Provide schema while reading csv file as a dataframe

Tags: Scala, Apache Spark, Dataframe, Apache Spark-Sql, Spark Csv

Scala Problem Overview


I am trying to read a csv file into a dataframe. I know what the schema of my dataframe should be, since I know my csv file. I am using the spark-csv package to read the file, and I am trying to specify the schema like below.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

But when I check the schema of the dataframe I created, it seems to have used its own schema. Am I doing anything wrong? How can I make Spark pick up the schema I specified?

> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)

Scala Solutions


Solution 1 - Scala

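Try the code below; you need not specify the schema. When inferSchema is set to true, Spark infers the column types from the data in your csv file. Note that header is set to true here, which makes Spark treat the first line as column names; set it to false if your file has no header row.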

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

If you want to manually specify the schema, you can do it as below:

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", LongType, true))  // long, as requested in the question
)

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .schema(customSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

  

Solution 2 - Scala

For those interested in doing this in Python here is a working version.

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

customSchema = StructType([
    StructField("IDGC", StringType(), True),
    StructField("SEARCHNAME", StringType(), True),
    StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)

testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66

Hope this helps.

Solution 3 - Scala

I'm using the solution provided by Arunakiran Nulu in my analysis (see the code below). Although it assigns the correct types to the columns, all the values returned are null. Previously, I tried .option("inferSchema", "true"), and it returned the correct values in the dataframe (although with different types).

val customSchema = StructType(Array(
    StructField("numicu", StringType, true),
    StructField("fecha_solicitud", TimestampType, true),
    StructField("codtecnica", StringType, true),
    StructField("tecnica", StringType, true),
    StructField("finexploracion", TimestampType, true),
    StructField("ultimavalidacioninforme", TimestampType, true),
    StructField("validador", StringType, true)))

val df_explo = spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", "\t")
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") 
        .schema(customSchema)
        .load(filename)

Result

root
 |-- numicu: string (nullable = true)
 |-- fecha_solicitud: timestamp (nullable = true)
 |-- codtecnica: string (nullable = true)
 |-- tecnica: string (nullable = true)
 |-- finexploracion: timestamp (nullable = true)
 |-- ultimavalidacioninforme: timestamp (nullable = true)
 |-- validador: string (nullable = true)

and the table is:

|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
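
A table of nulls like this can mean that the rows did not match the declared schema, for example a timestamp that does not match timestampFormat. One way to investigate is to make the parser report what it could not convert. The sketch below is not from the original answer: it assumes Spark 2.3 or later with spark-sql on the classpath, a local SparkSession, and a hypothetical two-column tab-separated sample written to a temporary file. The object and helper names are made up for illustration. It reads the file once in FAILFAST mode, which throws on a record that cannot be parsed, and once in the default PERMISSIVE mode with an extra `_corrupt_record` string column that keeps the raw text of any row that did not fit the schema.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
import org.apache.spark.sql.types._

object DiagnoseNullRows {
  // Cut-down version of the schema above (two columns only, for illustration).
  val schema: StructType = StructType(Array(
    StructField("numicu", StringType, true),
    StructField("fecha_solicitud", TimestampType, true)))

  // Hypothetical sample: the second data row holds a value that is not a timestamp.
  def writeSample(): String = {
    val path = Files.createTempFile("explo", ".tsv")
    val text = "numicu\tfecha_solicitud\nA1\t2019/01/02 10:00:00\nA2\tnot-a-date\n"
    Files.write(path, text.getBytes("UTF-8"))
    path.toString
  }

  // A fresh reader per call, with the same options as the answer above.
  def reader(spark: SparkSession): DataFrameReader = spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")

  // FAILFAST: collecting throws when a record does not fit the schema.
  def failFast(spark: SparkSession, path: String): Try[Array[Row]] =
    Try(reader(spark).option("mode", "FAILFAST").schema(schema).load(path).collect())

  // PERMISSIVE (the default): keep every row and store the raw text of bad ones
  // in the extra _corrupt_record column. The result is cached before it is queried.
  def permissive(spark: SparkSession, path: String): DataFrame =
    reader(spark).option("mode", "PERMISSIVE")
      .schema(schema.add("_corrupt_record", StringType))
      .load(path)
      .cache()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("diagnose-null-rows").master("local[1]").getOrCreate()
    val path = writeSample()
    println(s"FAILFAST succeeded: ${failFast(spark, path).isSuccess}")
    permissive(spark, path).show(false)
    spark.stop()
  }
}
```

If FAILFAST throws, the exception message usually points at the offending record, which helps to tell a wrong delimiter from a wrong timestampFormat.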

Solution 4 - Scala

Thanks to the answer by @Nulu, it works for pyspark with minimal tweaking:

from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType

# Python uses a list, type instances such as StringType(), and True instead of true
customSchema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", LongType(), True)])

# the reader lives on sqlContext (or a SparkSession), not on the SparkContext sc
pagecount = (sqlContext.read.format("com.databricks.spark.csv")
         .option("delimiter"," ")
         .option("quote","")
         .option("header", "false")
         .schema(customSchema)
         .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000"))

Solution 5 - Scala

The previous solutions have used the custom StructType.

With spark-sql 2.4.5 (Scala 2.12.10), the schema can also be given as a string passed to the schema function:

import org.apache.spark.sql.SparkSession;

val sparkSession = SparkSession.builder()
            .appName("sample-app")
            .master("local[2]")
            .getOrCreate();

val pageCount = sparkSession.read
  .format("csv")
  .option("delimiter"," ")  // the pagecounts sample is space-delimited, as in the question
  .option("quote","")
  .schema("project string ,article string ,requests integer ,bytes_served long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
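
To check that the string form describes the same columns as a hand-built StructType, the string can be parsed directly. The sketch below is an illustration rather than part of the original answer: it assumes spark-sql 2.3 or later on the classpath, uses `StructType.fromDDL` to parse the schema string from the code above, and compares the resulting names and types with an explicit StructType. The object name `DdlSchemaCheck` is hypothetical.

```scala
import org.apache.spark.sql.types._

object DdlSchemaCheck {
  // The schema string from the answer above.
  val ddl = "project string, article string, requests integer, bytes_served long"

  // Parse the string into a StructType.
  def parsed: StructType = StructType.fromDDL(ddl)

  // The same schema written out by hand.
  val explicit: StructType = StructType(Array(
    StructField("project", StringType, true),
    StructField("article", StringType, true),
    StructField("requests", IntegerType, true),
    StructField("bytes_served", LongType, true)))

  // Compare column names and data types, field by field.
  def sameNamesAndTypes: Boolean =
    parsed.fields.map(f => (f.name, f.dataType))
      .sameElements(explicit.fields.map(f => (f.name, f.dataType)))

  def main(args: Array[String]): Unit = {
    println(parsed.treeString)
    println(s"same names and types: $sameNamesAndTypes")
  }
}
```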

Solution 6 - Scala

> schema definition as simple string

Just in case someone is interested in defining the schema as a simple string that includes date and timestamp columns.

> data file creation from Terminal or shell

echo " 
2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc  
2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz 
" > data.csv

> Defining the schema as String

    user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'

> reading the data

    df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')

    df.show(10, False)
    
    +-----------------------+----------+----------+---------+
    |timesta                |date      |first_name|last_name|
    +-----------------------+----------+----------+---------+
    |2019-07-02 22:11:11.999|2019-01-01| Suresh   | abc     |
    |2019-01-02 22:11:11.001|2020-01-01| Aadi     | xyz     |
    +-----------------------+----------+----------+---------+

> Please note that defining the schema explicitly, instead of letting Spark infer it, also improves read performance, because inference requires an extra pass over the data.

Solution 7 - Scala

Here's a complete demo of working with a custom schema.

Shell code:

echo "
Slingo, iOS 
Slingo, Android
" > game.csv

Scala code:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{asc, desc}  // for asc and desc used below

val customSchema = StructType(Array(
  StructField("game_id", StringType, true),
  StructField("os_id", StringType, true)
))

val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv")
csv_df.show 

csv_df.orderBy(asc("game_id"), desc("os_id")).show
csv_df.createOrReplaceTempView("game_view")
val sort_df = sql("select * from game_view order by game_id, os_id desc")
sort_df.show 

Solution 8 - Scala

If your Spark version is 3.0.1, you can use the following Scala code:

val df = spark.read.format("csv").option("delimiter",",").option("header",true).load("file:///LOCAL_CSV_FILE_PATH")

However, since no schema is given and inferSchema is not set, all columns will be read as String.
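
Columns loaded as strings can still be converted afterwards. A minimal sketch, assuming Spark 2.x or later, a local SparkSession, and a hypothetical two-column sample file; the column names `id` and `price` and the object name are made up for illustration. It reads everything as strings and then uses `Column.cast` to change the types.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CastAfterLoad {
  // Read every column as a string, then cast selected columns to the intended types.
  def load(spark: SparkSession, path: String): DataFrame =
    spark.read.format("csv")
      .option("delimiter", ",")
      .option("header", "true")
      .load(path)
      .withColumn("id", col("id").cast("int"))
      .withColumn("price", col("price").cast("double"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cast-after-load").master("local[1]").getOrCreate()

    // Hypothetical sample data written to a temporary file.
    val path = Files.createTempFile("sample", ".csv")
    Files.write(path, "id,price\n1,9.5\n2,12.25\n".getBytes("UTF-8"))

    load(spark, path.toString).printSchema()
    spark.stop()
  }
}
```

A value that cannot be cast becomes null, so this approach hides bad input in the same way a mismatched schema can.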

Solution 9 - Scala

// import Library
import java.io.StringReader ;

import au.com.bytecode.opencsv.CSVReader

//filename

var train_csv = "/Path/train.csv";

//read as text file

val train_rdd = sc.textFile(train_csv)   

//use string reader to convert in proper format

var full_train_data  = train_rdd.map{line =>  var csvReader = new CSVReader(new StringReader(line)) ; csvReader.readNext();  }   

//declares  types

type s = String

// declare case class for schema

case class trainSchema (Loan_ID :s ,Gender :s, Married :s, Dependents :s,Education :s,Self_Employed :s,ApplicantIncome :s,CoapplicantIncome :s,
	LoanAmount :s,Loan_Amount_Term :s, Credit_History :s, Property_Area :s,Loan_Status :s)

//create DF RDD with custom schema 

import sqlContext.implicits._   // needed for .toDF

var full_train_data_with_schema = full_train_data.mapPartitionsWithIndex{(idx,itr)=>
	val rows = if (idx==0) itr.drop(1) else itr   // skip the header line, which is in the first partition
	rows.map(x=> trainSchema(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12))) }.toDF

Solution 10 - Scala

From PySpark 2.4 onwards, you can simply use the header parameter to take the column names from the first line of the file:

data = spark.read.csv('data.csv', header=True)

Similarly, in Scala you can set the header option as well.
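
A minimal Scala sketch of the same read, assuming a local SparkSession and the same data.csv path as the Python line above (the object and helper names are hypothetical). Note that header only sets the column names; without a schema or inferSchema, the columns are still read as strings.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadWithHeader {
  // Use the first line of the file as the column names.
  def read(spark: SparkSession, path: String): DataFrame =
    spark.read.option("header", "true").csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("read-with-header").master("local[1]").getOrCreate()
    read(spark, "data.csv").printSchema()
    spark.stop()
  }
}
```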

Solution 11 - Scala

You can also do it like this, using sparkSession and toDF to name the columns:

import org.apache.spark.sql.DataFrame
import sparkSession.implicits._
val pagecount:DataFrame = sparkSession.read
.option("delimiter"," ")
.option("quote","")
.option("inferSchema","true")
.csv("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
.toDF("project","article","requests","bytes_served")

Solution 12 - Scala

This is one option, using pandas rather than Spark, where we can pass the column names to the dataframe while loading the CSV.

import pandas
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pandas.read_csv("C:/Users/NS00606317/Downloads/Iris.csv", names=names, header=0)
print(dataset.head(10))

Output

    sepal-length  sepal-width  petal-length  petal-width        class
1            5.1          3.5           1.4          0.2  Iris-setosa
2            4.9          3.0           1.4          0.2  Iris-setosa
3            4.7          3.2           1.3          0.2  Iris-setosa
4            4.6          3.1           1.5          0.2  Iris-setosa
5            5.0          3.6           1.4          0.2  Iris-setosa
6            5.4          3.9           1.7          0.4  Iris-setosa
7            4.6          3.4           1.4          0.3  Iris-setosa
8            5.0          3.4           1.5          0.2  Iris-setosa
9            4.4          2.9           1.4          0.2  Iris-setosa
10           4.9          3.1           1.5          0.1  Iris-setosa

Solution 13 - Scala

Here is my solution:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

val spark = org.apache.spark.sql.SparkSession.builder.
  master("local[*]").
  appName("Spark CSV Reader").
  getOrCreate()

val movie_rating_schema = StructType(Array(
  StructField("UserID", IntegerType, true),
  StructField("MovieID", IntegerType, true),
  StructField("Rating", DoubleType, true),
  StructField("Timestamp", TimestampType, true)))

val df_ratings: DataFrame = spark.read.format("csv").
  option("header", "true").
  option("mode", "DROPMALFORMED").
  option("delimiter", ",").
  //option("inferSchema", "true").
  option("nullValue", "null").
  schema(movie_rating_schema).
  load(args(0)) //"file:///home/hadoop/spark-workspace/data/ml-20m/ratings.csv"

val movie_avg_scores = df_ratings.rdd.map(_.toString()).
  map(line => {
    // drop "[", "]" and then split the str 
    val fields = line.substring(1, line.length() - 1).split(",")
    //extract (movie id, rating)
    (fields(1).toInt, fields(2).toDouble)
  }).
  groupByKey().
  map(data => {
    val avg: Double = data._2.sum / data._2.size
    (data._1, avg)
  })

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Pa1 | View Question on Stackoverflow
Solution 1 - Scala | Arunakiran Nulu | View Answer on Stackoverflow
Solution 2 - Scala | user3008410 | View Answer on Stackoverflow
Solution 3 - Scala | Alberto Castelo Becerra | View Answer on Stackoverflow
Solution 4 - Scala | X.X | View Answer on Stackoverflow
Solution 5 - Scala | ggordon | View Answer on Stackoverflow
Solution 6 - Scala | Suresh | View Answer on Stackoverflow
Solution 7 - Scala | Charlie 木匠 | View Answer on Stackoverflow
Solution 8 - Scala | Ray | View Answer on Stackoverflow
Solution 9 - Scala | dalwinder singh | View Answer on Stackoverflow
Solution 10 - Scala | YOLO | View Answer on Stackoverflow
Solution 11 - Scala | whoisthis | View Answer on Stackoverflow
Solution 12 - Scala | Nilesh Shinde | View Answer on Stackoverflow
Solution 13 - Scala | SnailDove | View Answer on Stackoverflow