How to create an empty DataFrame with a specified schema?

Tags: Scala, Apache Spark, Dataframe, Apache Spark-Sql

Scala Problem Overview


I want to create a DataFrame with a specified schema in Scala. I have tried reading an empty JSON file, but I don't think that's the best practice.

Scala Solutions


Solution 1 - Scala

Let's assume you want a data frame with the following schema:

root
 |-- k: string (nullable = true)
 |-- v: integer (nullable = false)

You simply define the schema for the data frame and use an empty RDD[Row]:

import org.apache.spark.sql.types.{
    StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("k", StringType, true) ::
    StructField("v", IntegerType, false) :: Nil)

// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema) 
spark.createDataFrame(sc.emptyRDD[Row], schema)

The PySpark equivalent is almost identical:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("k", StringType(), True), StructField("v", IntegerType(), False)
])

# Spark < 2.0
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)

# or
# df = sc.parallelize([]).toDF(schema)

Using implicit encoders (Scala only) with Product types like Tuple:

import spark.implicits._

Seq.empty[(String, Int)].toDF("k", "v")

or case class:

case class KV(k: String, v: Int)

Seq.empty[KV].toDF

or

spark.emptyDataset[KV].toDF
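
All three variants produce the same schema as the explicit StructType above, which you can confirm with printSchema (spark-shell output; exact formatting may vary by version):

Seq.empty[(String, Int)].toDF("k", "v").printSchema
// root
//  |-- k: string (nullable = true)
//  |-- v: integer (nullable = false)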

Solution 2 - Scala

As of Spark 2.0.0, you can do the following.

Case Class

Let's define a Person case class:

scala> case class Person(id: Int, name: String)
defined class Person

Import the implicit Encoders from the SparkSession:

scala> import spark.implicits._
import spark.implicits._

And use SparkSession to create an empty Dataset[Person]:

scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]

Schema DSL

You could also use a Schema "DSL" (see Support functions for DataFrames in org.apache.spark.sql.ColumnName).

scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)

scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> emptyDF.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

Solution 3 - Scala

import scala.reflect.runtime.{universe => ru}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// Derive the schema from a case class via reflection
// (uses the pre-2.0 hiveContext; with Spark 2.x use spark instead)
def createEmptyDataFrame[T: ru.TypeTag] =
  hiveContext.createDataFrame(sc.emptyRDD[Row],
    ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
  )

case class RawData(id: String, firstname: String, lastname: String, age: Int)
val sourceDF = createEmptyDataFrame[RawData]

Solution 4 - Scala

Here you can create a schema using StructType in Scala and pass an empty RDD, so you will be able to create an empty table. The following code does the same.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object EmptyTable extends App {
  val conf = new SparkConf
  val sc = new SparkContext(conf)

  // Create a SparkSession with Hive support
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

  // Schema for three columns
  val schema = StructType(
    StructField("Emp_ID", LongType, true) ::
      StructField("Emp_Name", StringType, false) ::
      StructField("Emp_Salary", LongType, false) :: Nil)

  // Empty RDD
  val dataRDD = sc.emptyRDD[Row]

  // Pass the RDD and the schema to create an empty DataFrame
  val newDFSchema = sparkSession.createDataFrame(dataRDD, schema)

  newDFSchema.createOrReplaceTempView("tempSchema")

  sparkSession.sql("create table Finaltable AS select * from tempSchema")

}

Solution 5 - Scala

Java version to create an empty Dataset<Row>:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public Dataset<Row> emptyDataSet() {

    SparkSession spark = SparkSession.builder().appName("Simple Application")
            .config("spark.master", "local").getOrCreate();

    // An empty list of rows plus an explicit schema yields an empty DataFrame
    Dataset<Row> emptyDataSet = spark.createDataFrame(new ArrayList<Row>(), getSchema());

    return emptyDataSet;
}

public StructType getSchema() {

    String schemaString = "column1 column2 column3 column4 column5";

    List<StructField> fields = new ArrayList<>();

    StructField indexField = DataTypes.createStructField("column0", DataTypes.LongType, true);
    fields.add(indexField);

    for (String fieldName : schemaString.split(" ")) {
        StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
        fields.add(field);
    }

    StructType schema = DataTypes.createStructType(fields);

    return schema;
}

Solution 6 - Scala

This is helpful for testing purposes.

import spark.implicits._

Seq.empty[String].toDF()
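
With no column name supplied, toDF() gives the single column Spark's default name, value, which you can verify with printSchema:

Seq.empty[String].toDF().printSchema
// root
//  |-- value: string (nullable = true)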

Solution 7 - Scala

Here is a solution that creates an empty dataframe in PySpark 2.0.0 or later.

from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sc = spark.sparkContext
sqlContext = SQLContext(sc)
schema = StructType([StructField('col1', StringType(), False),
                     StructField('col2', IntegerType(), True)])
sqlContext.createDataFrame(sc.emptyRDD(), schema)

Solution 8 - Scala

I had a special requirement wherein I already had a dataframe, but given a certain condition I had to return an empty dataframe, so I returned df.limit(0) instead.
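
A minimal sketch of that pattern (the maybeEmpty helper and the shouldReturnEmpty flag are hypothetical placeholders, not from the original answer):

import org.apache.spark.sql.DataFrame

// limit(0) preserves the schema of df but returns no rows
def maybeEmpty(df: DataFrame, shouldReturnEmpty: Boolean): DataFrame =
  if (shouldReturnEmpty) df.limit(0) else df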

Solution 9 - Scala

As of Spark 2.4.3

val df = SparkSession.builder().getOrCreate().emptyDataFrame
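
Note that emptyDataFrame carries an empty schema (no columns), so printSchema shows only the root node; if you need named columns, use one of the schema-based approaches above:

df.printSchema()
// root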

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type       | Original Author  | Original Content on Stackoverflow
Question           | user1735076      | View Question on Stackoverflow
Solution 1 - Scala | zero323          | View Answer on Stackoverflow
Solution 2 - Scala | Jacek Laskowski  | View Answer on Stackoverflow
Solution 3 - Scala | Ravindra         | View Answer on Stackoverflow
Solution 4 - Scala | Nilesh Shinde    | View Answer on Stackoverflow
Solution 5 - Scala | Molay            | View Answer on Stackoverflow
Solution 6 - Scala | ss301            | View Answer on Stackoverflow
Solution 7 - Scala | braj             | View Answer on Stackoverflow
Solution 8 - Scala | iamsmkr          | View Answer on Stackoverflow
Solution 9 - Scala | Fox Fairy        | View Answer on Stackoverflow