How do I skip a header from CSV files in Spark?

Scala, CSV, Apache Spark

Scala Problem Overview


Suppose I give three file paths to a Spark context to read, and each file has a schema (header) in its first row. How can we skip those header lines?

val rdd=sc.textFile("file1,file2,file3")

Now, how can we skip header lines from this rdd?

Scala Solutions


Solution 1 - Scala

val data = sc.textFile("path_to_data")
val header = data.first() // extract header
val rows = data.filter(row => row != header) // filter out every line equal to the header
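
To see what this filter does across several files, here is a minimal plain-Python sketch that models the RDD as an ordinary list (an assumption for illustration; no Spark is involved, and `skip_header_lines` is a hypothetical helper name). Because the comparison is by value, every file's header is removed when all files share the same header, but so is any data line that happens to be identical to it.

```python
def skip_header_lines(lines):
    """Drop every line equal to the first line, mirroring first() + filter()."""
    if not lines:
        return []
    header = lines[0]  # plays the role of rdd.first()
    return [line for line in lines if line != header]


# Three files with the same header, read back to back as sc.textFile would.
lines = [
    "id,name", "1,a", "2,b",   # file1
    "id,name", "3,c",          # file2
    "id,name", "4,d",          # file3
]
rows = skip_header_lines(lines)
print(rows)
```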

Solution 2 - Scala

If there were just one header line in the first record, then the most efficient way to filter it out would be:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
}

Of course, this doesn't help if there are many files, each with its own header line. You can, however, build one RDD per file this way and union the three.

You could also just write a filter that matches only a line that could be a header. This is quite simple, but less efficient.

Python equivalent:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)
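
The multi-file limitation can be illustrated without Spark. The following sketch models an RDD as a list of partitions (plain Python lists, an assumption for illustration) and applies the same index-0 rule; `map_partitions_with_index` here is a hypothetical stand-in for the RDD method, not the Spark API itself.

```python
from itertools import islice


def map_partitions_with_index(partitions, func):
    """Apply func(index, iterator) to each partition, like the RDD method."""
    return [list(func(idx, iter(part))) for idx, part in enumerate(partitions)]


def drop_first_of_partition_zero(idx, it):
    # Same rule as the Spark snippet: only partition 0 loses its first line.
    return islice(it, 1, None) if idx == 0 else it


# One file split across two partitions: the single header is removed.
single_file = [["id,name", "1,a"], ["2,b", "3,c"]]
cleaned = map_partitions_with_index(single_file, drop_first_of_partition_zero)

# Two files, one partition each: the second file's header survives.
two_files = [["id,name", "1,a"], ["id,name", "2,b"]]
leaky = map_partitions_with_index(two_files, drop_first_of_partition_zero)

print(cleaned)
print(leaky)
```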

Solution 3 - Scala

In Spark 2.0, a CSV reader is built into Spark, so you can easily load a CSV file as follows:

spark.read.option("header","true").csv("filePath")

Solution 4 - Scala

From Spark 2.0 onwards, you can use SparkSession to do this as a one-liner:

val spark = SparkSession.builder.config(conf).getOrCreate()

and then as @SandeepPurohit said:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)

I hope this answers your question!

P.S.: SparkSession is the entry point introduced in Spark 2.0; it is part of the spark-sql module (org.apache.spark.sql.SparkSession).

Solution 5 - Scala

Working in 2018 (Spark 2.3)

Python

df = (spark.read
    .option("header", "true")
    .format("csv")
    .schema(myManualSchema)
    .load("mycsv.csv"))

Scala

val myDf = spark.read
  .option("header", "true")
  .format("csv")
  .schema(myManualSchema)
  .load("mycsv.csv")

P.S.: myManualSchema is a schema I defined beforehand; you can omit that part of the code.

UPDATE 2021: The same code works for Spark 3.x

df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .format("csv")
    .load("mycsv.csv"))

Solution 6 - Scala

You could load each file separately, filter them with file.zipWithIndex().filter(_._2 > 0) and then union all the file RDDs.

If the number of files is too large, the union could throw a StackOverflowError.
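
As a rough sketch of this per-file approach, the plain-Python code below models each file as a list of lines (an assumption; in Spark each would be its own RDD), drops index 0 from each, and concatenates the results. `drop_first_line` and `union_all` are hypothetical names.

```python
from itertools import chain


def drop_first_line(lines):
    """Keep lines whose index is > 0, like zipWithIndex().filter(_._2 > 0)."""
    return [line for idx, line in enumerate(lines) if idx > 0]


def union_all(files):
    """Concatenate the header-free files in one flat pass."""
    return list(chain.from_iterable(drop_first_line(f) for f in files))


files = [
    ["id,name", "1,a", "2,b"],
    ["id,name", "3,c"],
    ["id,name"],  # header only, contributes nothing
]
rows = union_all(files)
print(rows)
```

In Spark itself, SparkContext.union accepts a whole sequence of RDDs and builds a single union over it, which is commonly suggested in place of a long chain of pairwise union calls when the number of files is large.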

Solution 7 - Scala

In PySpark you can use a dataframe and set header as True:

df = spark.read.csv(dataPath, header=True)

Solution 8 - Scala

Use the filter() method in PySpark to remove the header by filtering out lines that start with the first column name:

# Read file (change format for other file formats)
contentRDD = sc.textFile(<filepath>)

# Filter out lines that start with the first column name of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>))

# Check your result
for i in filterDD.take(5):
    print(i)

Solution 9 - Scala

Alternatively, you can use the spark-csv package (or in Spark 2.0 this is more or less available natively as CSV). Note that this expects the header on each file (as you desire):

from pyspark.sql.types import StructType, StructField, DoubleType

schema = StructType([
        StructField('lat',DoubleType(),True),
        StructField('lng',DoubleType(),True)])

df = sqlContext.read.format('com.databricks.spark.csv'). \
     options(header='true',
             delimiter="\t",
             treatEmptyValuesAsNulls=True,
             mode="DROPMALFORMED").load(input_file,schema=schema)

Solution 10 - Scala

It's an option that you pass to the reader:

val context = new org.apache.spark.sql.SQLContext(sc)

val data = context.read.option("header","true").csv("<path>")

Solution 11 - Scala

import java.io.{BufferedReader, InputStreamReader}

// Find the header of each file in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{
    case (fileName, stream)=>
        val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
        (fileName, header)
}.collect().toMap

val fileNameHeaderBr = sc.broadcast(fileNameHeader)

// Now let's skip the header. mapPartitions lets us inspect the first line
// of each partition, which is the only place a header is expected here
sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter =>
    if(iter.hasNext){
        val firstLine = iter.next()
        println(s"Comparing with firstLine $firstLine")
        // Compare against the headers of all files, not just the first map entry
        if(fileNameHeaderBr.value.values.exists(_ == firstLine))
            new WrappedIterator(null, iter)
        else
            new WrappedIterator(firstLine, iter)
    }
    else {
        iter
    }
).collect().foreach(println)

class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
    var isFirstIteration = true
    override def hasNext: Boolean = {
        if (isFirstIteration && firstLine != null){
            true
        }
        else{
            iter.hasNext
        }
    }

    override def next(): String = {
        if (isFirstIteration){
            println(s"For the first time $firstLine")
            isFirstIteration = false
            if (firstLine != null){
                firstLine
            }
            else{
                println(s"Every time $firstLine")
                iter.next()
            }
        }
        else {
          iter.next()
        }
    }
}

Solution 12 - Scala

For Python developers: I have tested this with Spark 2.0. Let's say you want to remove the first 14 rows.

sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
# zipWithIndex starts at 0, so the first 14 rows have indices 0 to 13
parts.zipWithIndex().filter(lambda tup: tup[1] >= 14).map(lambda x: x[0])

withColumn is a DataFrame function, so the following will not work on an RDD as used above:

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
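
The index arithmetic in the zipWithIndex approach is easy to get wrong, so here is a small plain-Python sketch of it (the RDD is modeled as a list, and `skip_first_rows` is a hypothetical helper name). Indices start at 0, so skipping n rows means keeping indices >= n; a filter of `> 14` would drop 15 rows, not 14.

```python
def skip_first_rows(rows, n):
    """Drop the first n rows, mirroring zipWithIndex().filter(...).map(...)."""
    indexed = [(row, idx) for idx, row in enumerate(rows)]  # (value, index) pairs
    return [row for row, idx in indexed if idx >= n]


rows = [f"row{i}" for i in range(20)]
kept = skip_first_rows(rows, 14)
print(len(kept), kept[0])
```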

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Hafiz Mujadid | View Question on Stackoverflow
Solution 1 - Scala | Jimmy | View Answer on Stackoverflow
Solution 2 - Scala | Sean Owen | View Answer on Stackoverflow
Solution 3 - Scala | Sandeep Purohit | View Answer on Stackoverflow
Solution 4 - Scala | Shivansh | View Answer on Stackoverflow
Solution 5 - Scala | Antonio Cachuan | View Answer on Stackoverflow
Solution 6 - Scala | pzecevic | View Answer on Stackoverflow
Solution 7 - Scala | hayj | View Answer on Stackoverflow
Solution 8 - Scala | kumara81205 | View Answer on Stackoverflow
Solution 9 - Scala | Adrian Bridgett | View Answer on Stackoverflow
Solution 10 - Scala | Sahan Jayasumana | View Answer on Stackoverflow
Solution 11 - Scala | RockSolid | View Answer on Stackoverflow
Solution 12 - Scala | kartik | View Answer on Stackoverflow