How do I convert a CSV file to an RDD

Scala, Apache Spark

Scala Problem Overview


I'm new to Spark. I want to perform some operations on particular data in a CSV record.

I'm trying to read a CSV file and convert it to an RDD. My further operations are based on the heading provided in the CSV file.

(From comments) This is my code so far:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        // EOL is an end-of-line Pattern defined elsewhere in my code
        return Arrays.asList(EOL.split(s));
    }
});
final String heading=lines.first().toString();

I can get the header values by splitting the heading like this:

final String[] header = heading.split(" ");

I want to map these header values to each record in the CSV file.

In Java I'm using CSVReader, where record.getColumnValue(columnHeader) gives the value for a particular column. I need to do something similar to that here.

Scala Solutions


Solution 1 - Scala

A simplistic approach would be to have a way to preserve the header.

Let's say you have a file.csv like:

user, topic, hits
om,  scala, 120
daniel, spark, 80
3754978, spark, 1

We can define a header class that uses a parsed version of the first row:

class SimpleCSVHeader(header:Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array:Array[String], key:String):String = array(index(key))
}

We can then use that header to address the data further down the road:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row, "user"))
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...

Note that the header is little more than a simple map from a mnemonic to the array index. Pretty much all of this could be done using the ordinal position of the element in the array, like user = row(0).

PS: Welcome to Scala :-)

Solution 2 - Scala

You can use the spark-csv library: https://github.com/databricks/spark-csv

This is directly from the documentation (note that this snippet uses the Java API):

import java.util.HashMap;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");

DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
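
For a Scala application the equivalent would be something along these lines (a sketch, assuming Spark 1.4+ and the spark-csv package on the classpath):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// read cars.csv, treating the first line as the header
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("cars.csv")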

Solution 3 - Scala

Firstly I must say that it's much much simpler if you put your headers in separate files - this is the convention in big data.

Anyway, Daniel's answer is pretty good, but it has an inefficiency and a bug, so I'm going to post my own. The inefficiency is that you don't need to check every record to see if it's the header; you just need to check the first record of each partition. The bug is that by using .split(",") you could get an exception thrown, or get the wrong column, when entries are the empty string and occur at the start or end of the record. To correct that you need to use .split(",", -1).
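
A quick illustration of why the -1 limit matters (by default Java's String.split drops trailing empty fields):

"a,b,,".split(",")      // Array(a, b)           -- trailing empty columns are lost
"a,b,,".split(",", -1)  // Array(a, b, "", "")   -- the column count is preserved

So here is the full code: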

import org.apache.hadoop

val header =
  scala.io.Source.fromInputStream(
    hadoop.fs.FileSystem.get(new java.net.URI(path), sc.hadoopConfiguration)
      .open(new hadoop.fs.Path(path)))
    .getLines.head

val columnIndex = header.split(",").indexOf(columnName) // columnName = the header you want

sc.textFile(path).mapPartitions(iterator => {
  val head = iterator.next()
  if (head == header) iterator else Iterator(head) ++ iterator
})
.map(_.split(",", -1)(columnIndex))

Final points: consider Parquet if you only want to fish out certain columns. Or at least consider implementing a lazily evaluated split function if you have wide rows (a sketch of that idea follows).
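
A minimal sketch of such a lazy extraction, not from the answer above and assuming no quoted commas in the fields: it scans only as far as the requested column instead of splitting the whole row.

def nthField(line: String, n: Int, sep: Char = ','): String = {
  var start = 0
  var i = 0
  // advance past the first n separators
  while (i < n) {
    val next = line.indexOf(sep, start)
    if (next < 0) return ""   // the row has fewer columns than expected
    start = next + 1
    i += 1
  }
  val end = line.indexOf(sep, start)
  if (end < 0) line.substring(start) else line.substring(start, end)
}

// e.g. nthField("om,scala,120", 2) == "120"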

Solution 4 - Scala

We can use the new DataFrame API for reading and writing the CSV data. There are a few advantages of DataFrames over normal RDDs:

  1. DataFrames are a bit faster than normal RDDs, since the schema is determined, which helps optimize a lot at runtime and gives a significant performance gain.
  2. Even if a column shifts in the CSV, it will automatically take the correct column, as we are not hard-coding the column number the way we would when reading the data as a textFile, splitting it, and then using the column number to get the data.
  3. In a few lines of code you can read the CSV file directly.

You will need this library; add it to build.sbt:

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"

Spark Scala code for it:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(conf) // conf is your SparkConf
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load(csvInPath)
// format specifies the type of file you are reading
// header = "true" indicates that the first line is the header

To convert it to a normal RDD, taking some of the columns from it:

import org.apache.spark.sql.Row

val rddData = df.map(x => Row(x.getAs("colA")))
// Do other RDD operations on it

Saving the RDD to CSV format:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val aDF = sqlContext.createDataFrame(rddData, StructType(Array(StructField("colANew", StringType, true))))
aDF.write.format("com.databricks.spark.csv").option("header", "true").save("/csvOutPath/aCSVOp")

Since header is set to true, we will get the header names in all the output files.

Solution 5 - Scala

I'd recommend reading the header directly from the driver, not through Spark. Two reasons for this: 1) It's a single line. There's no advantage to a distributed approach. 2) We need this line in the driver, not the worker nodes.

It goes something like this:

// Ridiculous amount of code to read one line.
import org.apache.hadoop

val uri = new java.net.URI(filename)
val conf = sc.hadoopConfiguration
val fs = hadoop.fs.FileSystem.get(uri, conf)
val path = new hadoop.fs.Path(filename)
val stream = fs.open(path)
val source = scala.io.Source.fromInputStream(stream)
val header = source.getLines.head

Now when you make the RDD you can discard the header.

val csvRDD = sc.textFile(filename).filter(_ != header)

Then we can make an RDD from one column, for example:

val idx = header.split(",").indexOf(columnName)
val columnRDD = csvRDD.map(_.split(",")(idx))

Solution 6 - Scala

Here is another example using Spark/Scala to convert a CSV to RDD. For a more detailed description see this post.

def main(args: Array[String]): Unit = {
  val csv = sc.textFile("/path/to/your/file.csv")

  // split / clean data
  val headerAndRows = csv.map(line => line.split(",").map(_.trim))
  // get header
  val header = headerAndRows.first
  // filter out header (eh. just check if the first val matches the first header name)
  val data = headerAndRows.filter(_(0) != header(0))
  // splits to map (header/value pairs)
  val maps = data.map(splits => header.zip(splits).toMap)
  // filter out the user "me"
  val result = maps.filter(map => map("user") != "me")
  // print result
  result.foreach(println)
}

Solution 7 - Scala

Another alternative is to use the mapPartitionsWithIndex method, as you'll get the partition index number and a list of all lines within that partition. Partition 0, line 0 will be the header.

import scala.collection.mutable.ArrayBuffer

val rows = sc.textFile(path)
  .mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) =>
    val results = new ArrayBuffer[String]

    var first = true
    while (rows.hasNext) {
      // check for first line
      if (index == 0 && first) {
        first = false
        rows.next // skip the first row (the header)
      } else {
        results += rows.next
      }
    }

    results.toIterator
  }, true)

rows.flatMap { row => row.split(",") }

Solution 8 - Scala

How about this?

val Delimiter = ","
val textFile = sc.textFile("data.csv").map(line => line.split(Delimiter))

Solution 9 - Scala

For Spark with Scala, here is what I typically use when I can't use the spark-csv packages...

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
val header = rawdata.first()
// drop the header by comparing each line's first character with the header's first character
val tbldata = rawdata.filter(_(0) != header(0))
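
To then pull out a particular column by its header name, a small follow-up sketch (not part of the original answer; columnName is a hypothetical placeholder):

val columnName = "user"
val idx = header.split(",").indexOf(columnName)
val colData = tbldata.map(_.split(",")(idx))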

Solution 10 - Scala

I would suggest you try

https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

In this example you have to have a Person class matching the spec of your file header; you then associate your data with that schema and apply criteria, like in MySQL, to get the desired result (a Scala sketch of the same idea follows).
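
Since the rest of this page is Scala, here is a rough Scala sketch of the same idea, assuming a Person case class with a name and an age column as in the Spark docs example:

case class Person(name: String, age: Int)

val people = sc.textFile("examples/src/main/resources/people.txt").map { line =>
  val parts = line.split(",")
  Person(parts(0), parts(1).trim.toInt)
}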

Solution 11 - Scala

I think you can try loading that CSV into an RDD and then creating a DataFrame from that RDD; here is the documentation on creating a DataFrame from an RDD: http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds (a rough sketch of that approach follows).
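
A minimal sketch of that approach, assuming the file.csv from Solution 1 and a Spark 1.x SQLContext (with Spark 2+ you would import spark.implicits._ instead):

import sqlContext.implicits._

case class Record(user: String, topic: String, hits: Int)

val rdd = sc.textFile("file.csv")
  .map(_.split(",").map(_.trim))
  .filter(row => row(0) != "user")                  // drop the header row
  .map(row => Record(row(0), row(1), row(2).toInt))

val df = rdd.toDF()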

Solution 12 - Scala

As of Spark 2.0, CSV can be read directly into a DataFrame.

If the data file does not have a header row, then it would be:

val df = spark.read.csv("file://path/to/data.csv")

That will load the data, but give each column generic names like _c0, _c1, etc.

If there are headers then adding .option("header", "true") will use the first row to define the columns in the DataFrame:

val df = spark.read
  .option("header", "true")
  .csv("file://path/to/data.csv")

For a concrete example, let's say you have a file with the contents:

user,topic,hits
om,scala,120
daniel,spark,80
3754978,spark,1

Then the following will get the total hits grouped by topic:

import org.apache.spark.sql.functions._
import spark.implicits._

val rawData = spark.read
  .option("header", "true")
  .csv("file://path/to/data.csv")

// specifies the query, but does not execute it
val grouped = rawData.groupBy($"topic").agg(sum($"hits"))

// runs the query, pulling the data to the master node
// can fail if the amount of data is too much to fit 
// into the master node's memory!
val collected = grouped.collect

// runs the query, writing the result back out
// in this case, changing format to Parquet since that can
//   be nicer to work with in Spark
grouped.write.parquet("hdfs://some/output/directory/")

// runs the query, writing the result back out
// in this case, in CSV format with a header and 
// coalesced to a single file.  This is easier for human 
// consumption but usually much slower.
grouped.coalesce(1)
  .write
  .option("header", "true")
  .csv("hdfs://some/output/directory/")

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type        | Original Author | Original Content on Stackoverflow
Question            | Ramya           | View Question on Stackoverflow
Solution 1 - Scala  | maasg           | View Answer on Stackoverflow
Solution 2 - Scala  | Saman           | View Answer on Stackoverflow
Solution 3 - Scala  | samthebest      | View Answer on Stackoverflow
Solution 4 - Scala  | Ajay Gupta      | View Answer on Stackoverflow
Solution 5 - Scala  | Daniel Darabos  | View Answer on Stackoverflow
Solution 6 - Scala  | cmd             | View Answer on Stackoverflow
Solution 7 - Scala  | mightymephisto  | View Answer on Stackoverflow
Solution 8 - Scala  | om-nom-nom      | View Answer on Stackoverflow
Solution 9 - Scala  | Bruce Nelson    | View Answer on Stackoverflow
Solution 10 - Scala | mithra          | View Answer on Stackoverflow
Solution 11 - Scala | taotao.li       | View Answer on Stackoverflow
Solution 12 - Scala | hayden.sikh     | View Answer on Stackoverflow