How do I convert a CSV file to an RDD
Scala Problem Overview
I'm new to Spark. I want to perform some operations on particular data in a CSV record.
I'm trying to read a CSV file and convert it to an RDD. My further operations are based on the header provided in the CSV file.
(From comments) This is my code so far:
final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String s) {
        return Arrays.asList(EOL.split(s));
    }
});
final String heading=lines.first().toString();
I can get the header values like this. I want to map this to each record in CSV file.
final String[] header=heading.split(" ");
In Java I'm using CSVReader's record.getColumnValue(Column header) to get the particular value. I need to do something similar to that here.
Scala Solutions
Solution 1 - Scala
A simplistic approach would be to have a way to preserve the header.
Let's say you have a file.csv like:
user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1
We can define a header class that uses a parsed version of the first row:
class SimpleCSVHeader(header:Array[String]) extends Serializable {
val index = header.zipWithIndex.toMap
def apply(array:Array[String], key:String):String = array(index(key))
}
We can then use that header to address the data further down the road:
val csv = sc.textFile("file.csv") // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user"))
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...
Note that the header is not much more than a simple map of a mnemonic to the array index. Pretty much all this could be done on the ordinal place of the element in the array, like user = row(0).
PS: Welcome to Scala :-)
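To try the header lookup without a cluster, here is a minimal plain-Scala sketch (no Spark needed) that applies the same idea to rows held in a local Seq. The object name HeaderLookupDemo and the sample lines are made up for illustration; the class is the one from the answer above.

```scala
object HeaderLookupDemo {
  // Same idea as SimpleCSVHeader above: column name -> array index
  class SimpleCSVHeader(header: Array[String]) extends Serializable {
    val index: Map[String, Int] = header.zipWithIndex.toMap
    def apply(array: Array[String], key: String): String = array(index(key))
  }

  // Local stand-in for the lines of file.csv (hypothetical sample data)
  val lines: Seq[String] =
    Seq("user, topic, hits", "om, scala, 120", "daniel, spark, 80")

  // Split each line and trim the fields, as in the RDD version
  val data: Seq[Array[String]] = lines.map(_.split(",").map(_.trim))
  val header = new SimpleCSVHeader(data.head)
  val rows: Seq[Array[String]] = data.tail // the first line is the header

  // Look fields up by name instead of by position
  val usersByHits: Seq[(String, Int)] =
    rows.map(row => header(row, "user") -> header(row, "hits").toInt)
}
```

With an RDD the only real difference is that data comes from sc.textFile and the header row has to be filtered out rather than taken off with tail.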
Solution 2 - Scala
You can use the spark-csv library: https://github.com/databricks/spark-csv
This is directly from the documentation (the Java example):
import java.util.HashMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");
DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
Solution 3 - Scala
Firstly I must say that it's much much simpler if you put your headers in separate files - this is the convention in big data.
Anyway Daniel's answer is pretty good, but it has an inefficiency and a bug, so I'm going to post my own. The inefficiency is that you don't need to check every record to see if it's the header, you just need to check the first record for each partition. The bug is that by using .split(",") you could get an exception thrown when the last entries of a record are empty strings, because split drops trailing empty strings; to correct that you need to use .split(",", -1). So here is the full code:
import org.apache.hadoop.fs.{FileSystem, Path}

// read only the first line of the file on the driver
val header =
  scala.io.Source.fromInputStream(
    FileSystem.get(new java.net.URI(path), sc.hadoopConfiguration)
      .open(new Path(path)))
    .getLines.next()
val columnIndex = header.split(",").indexOf(columnName)
sc.textFile(path).mapPartitions(iterator => {
  // an empty partition has no first record to inspect
  if (iterator.hasNext) {
    val head = iterator.next()
    if (head == header) iterator else Iterator(head) ++ iterator
  } else iterator
})
.map(_.split(",", -1)(columnIndex))
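To see what the -1 limit changes, the two split calls can be compared in plain Scala (no Spark involved). This is just a small sketch; the object name SplitDemo and the sample records are made up.

```scala
object SplitDemo {
  // A record whose last two fields are empty (hypothetical sample)
  val line = "om,scala,,"

  // Default split drops trailing empty strings
  val dropped: Array[String] = line.split(",")

  // A negative limit keeps them
  val kept: Array[String] = line.split(",", -1)

  // A leading empty field is kept either way
  val leading: Array[String] = ",scala,120".split(",")
}
```

Here dropped has 2 elements and kept has 4, so indexing column 2 or 3 only works on kept.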
Final points: consider Parquet if you only want to fish out certain columns, or at least consider implementing a lazily evaluated split function if you have wide rows.
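One way to read the "lazily evaluated split" suggestion is to scan only as far as the wanted column instead of splitting the whole row. A rough sketch under that assumption: nthField is a hypothetical helper (not a Spark or library function), idx is assumed to be non-negative, and quoted fields containing the separator are not handled.

```scala
object LazyFieldDemo {
  // Hypothetical helper: returns field number idx (0-based) of a delimited
  // line, or None if the line has fewer fields. It stops scanning as soon
  // as the wanted field has been found.
  def nthField(line: String, idx: Int, sep: Char = ','): Option[String] = {
    var start = 0 // position where the current field begins
    var i = 0     // number of separators skipped so far
    while (i < idx) {
      val next = line.indexOf(sep, start)
      if (next < 0) return None // ran out of fields
      start = next + 1
      i += 1
    }
    val end = line.indexOf(sep, start)
    Some(if (end < 0) line.substring(start) else line.substring(start, end))
  }
}
```

It could then be used in place of the split, for example .flatMap(line => LazyFieldDemo.nthField(line, columnIndex)), which silently skips rows that are too short.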
Solution 4 - Scala
We can use the DataFrame API (via the spark-csv library) for reading and writing the CSV data. It has a few advantages over a plain RDD:
- DataFrames can be faster than plain RDDs, since the schema is known and Spark can use it to optimize the query at runtime.
- Even if the columns shift in the CSV, the correct column is still picked up, because columns are addressed by name rather than by a hard-coded index, as they are when reading the data with textFile and splitting each line.
- You can read the CSV file directly in a few lines of code.
You will need this library; add it to build.sbt:
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
Spark Scala code for it:
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
//format is for specifying the type of file you are reading
//header = true indicates that the first line is header in it
To convert to a normal RDD, taking some of the columns from it:
// spark-csv reads every column as a String unless a schema is given
val rddData = df.map(x => Row(x.getAs[String]("colA")))
//Do other RDD operation on it
Saving the RDD to CSV format:
val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
aDf.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
Since the header is set to true we will be getting the header name in all the output files.
Solution 5 - Scala
I'd recommend reading the header directly from the driver, not through Spark. Two reasons for this: 1) It's a single line. There's no advantage to a distributed approach. 2) We need this line in the driver, not the worker nodes.
It goes something like this:
// Ridiculous amount of code to read one line.
val uri = new java.net.URI(filename)
val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(uri, conf)
val path = new org.apache.hadoop.fs.Path(filename)
val stream = fs.open(path)
val source = scala.io.Source.fromInputStream(stream)
val header = source.getLines.head
Now when you make the RDD you can discard the header.
val csvRDD = sc.textFile(filename).filter(_ != header)
Then we can make an RDD from one column, for example:
val idx = header.split(",").indexOf(columnName)
val columnRDD = csvRDD.map(_.split(",", -1)(idx)) // -1 keeps trailing empty fields
Solution 6 - Scala
Here is another example using Spark/Scala to convert a CSV to an RDD.
def main(args: Array[String]): Unit = {
  val csv = sc.textFile("/path/to/your/file.csv")
  // split / clean data
  val headerAndRows = csv.map(line => line.split(",").map(_.trim))
  // get header
  val header = headerAndRows.first
  // filter out header (eh. just check if the first val matches the first header name)
  val data = headerAndRows.filter(_(0) != header(0))
  // splits to map (header/value pairs)
  val maps = data.map(splits => header.zip(splits).toMap)
  // filter out the user "me"
  val result = maps.filter(map => map("user") != "me")
  // print result
  result.foreach(println)
}
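One thing worth knowing about header.zip(splits).toMap is that zip stops at the shorter of the two sides, so a row with missing fields gives a map with missing keys, and map("hits") would then throw. A small plain-Scala sketch of that behaviour; the names RowMapDemo and rowToMap and the sample rows are made up for illustration.

```scala
object RowMapDemo {
  val header: Array[String] = Array("user", "topic", "hits")

  // Pair each header name with the value in the same position
  def rowToMap(splits: Array[String]): Map[String, String] =
    header.zip(splits).toMap

  // A complete row gets one entry per column
  val full: Map[String, String] =
    rowToMap("om,scala,120".split(",").map(_.trim))

  // zip stops at the shorter side, so a short row yields a smaller map
  val short: Map[String, String] =
    rowToMap("daniel,spark".split(",").map(_.trim))

  // get returns an Option instead of throwing on a missing key
  val missingHits: Option[String] = short.get("hits")
}
```

If rows can be ragged, using get or getOrElse in the filter is the safer choice.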
Solution 7 - Scala
Another alternative is to use the mapPartitionsWithIndex method, as you'll get the partition index number and an iterator over all lines within that partition. Line 0 of partition 0 will be the header.
import scala.collection.mutable.ArrayBuffer

val rows = sc.textFile(path)
  .mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) =>
    val results = new ArrayBuffer[String] // data lines of this partition
    var first = true
    while (rows.hasNext) {
      // check for first line
      if (index == 0 && first) {
        first = false
        rows.next() // skip the first row
      } else {
        results += rows.next()
      }
    }
    results.toIterator
  }, true)
rows.flatMap { row => row.split(",") }
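The same per-partition logic can be tried locally, and it can also be written without buffering the whole partition by dropping the first element of the iterator lazily. A minimal sketch, assuming the header is the first line of partition 0; PartitionHeaderDemo, dropHeader and the sample partitions are hypothetical and only stand in for an RDD.

```scala
object PartitionHeaderDemo {
  // Same shape as the function passed to mapPartitionsWithIndex:
  // only partition 0 can start with the header, so only it drops a line
  def dropHeader(index: Int, rows: Iterator[String]): Iterator[String] =
    if (index == 0) rows.drop(1) else rows

  // Local stand-in for an RDD's partitions (hypothetical sample data)
  val partitions: Seq[Seq[String]] = Seq(
    Seq("user,topic,hits", "om,scala,120"),
    Seq("daniel,spark,80")
  )

  // Apply the function to each partition together with its index
  val rows: Seq[String] = partitions.zipWithIndex.flatMap {
    case (part, index) => dropHeader(index, part.iterator).toList
  }
}
```

With Spark this would be sc.textFile(path).mapPartitionsWithIndex(PartitionHeaderDemo.dropHeader _, true).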
Solution 8 - Scala
How about this?
val Delimiter = ","
val textFile = sc.textFile("data.csv").map(line => line.split(Delimiter))
Solution 9 - Scala
For Spark with Scala, this is what I typically use when I can't use the spark-csv package:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
val header = rawdata.first()
val tbldata = rawdata.filter(_ != header) // drop the header line
Solution 10 - Scala
I would suggest you try the approach described here:
https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
    new Function<String, Person>() {
        public Person call(String line) throws Exception {
            String[] parts = line.split(",");
            Person person = new Person();
            person.setName(parts[0]);
            person.setAge(Integer.parseInt(parts[1].trim()));
            return person;
        }
    });
You need a class (Person in this example) that matches the spec of your file header. You then associate your data with that schema and apply criteria, as you would in MySQL, to get the desired result.
Solution 11 - Scala
I think you can try loading the CSV into an RDD and then creating a DataFrame from that RDD. Here is the documentation on creating a DataFrame from an RDD: http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds
Solution 12 - Scala
As of Spark 2.0, CSV can be read directly into a DataFrame.
If the data file does not have a header row, then it would be:
val df = spark.read.csv("file:///path/to/data.csv")
That will load the data, but give each column generic names like _c0, _c1, etc.
If there are headers then adding .option("header", "true") will use the first row to define the columns in the DataFrame:
val df = spark.read
.option("header", "true")
.csv("file:///path/to/data.csv")
For a concrete example, let's say you have a file with the contents:
user,topic,hits
om,scala,120
daniel,spark,80
3754978,spark,1
Then the following will get the total hits grouped by topic:
import org.apache.spark.sql.functions._
import spark.implicits._
val rawData = spark.read
.option("header", "true")
.csv("file:///path/to/data.csv")
// specifies the query, but does not execute it
val grouped = rawData.groupBy($"topic").agg(sum($"hits"))
// runs the query, pulling the data to the driver
// can fail if the amount of data is too much to fit
// into the driver's memory!
val collected = grouped.collect
// runs the query, writing the result back out
// in this case, changing format to Parquet since that can
// be nicer to work with in Spark
grouped.write.parquet("hdfs://some/output/directory/")
// runs the query, writing the result back out
// in this case, in CSV format with a header and
// coalesced to a single file. This is easier for human
// consumption but usually much slower.
grouped.coalesce(1)
.write
.option("header", "true")
.csv("hdfs://some/output/directory/")