Spark unionAll multiple dataframes

Scala, Apache Spark, Apache Spark-Sql

Scala Problem Overview


For a set of dataframes

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")

to union all of them I do

df1.unionAll(df2).unionAll(df3)

Is there a more elegant and scalable way of doing this for any number of dataframes, for example starting from the following sequence?

Seq(df1, df2, df3) 

Scala Solutions


Solution 1 - Scala

For pyspark you can do the following:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

It's also worth noting that the order of the columns in the dataframes must be the same for this to work. If the column orders differ, this can silently give unexpected results!

If you are using pyspark 2.3 or greater, you can use unionByName so you don't have to reorder the columns.
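Since the question is about Scala, here is a minimal sketch of the same pitfall with the Scala API. It assumes Spark 2.3 or later is on the classpath (for example in spark-shell); the names `a`, `b`, `positionalRows` and `byNameRows` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the existing session; elsewhere it starts a local one.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("union-by-name-sketch")
  .getOrCreate()
import spark.implicits._

// Same column names, but in a different order.
val a = Seq((1, 10)).toDF("id", "x")
val b = Seq((200, 2)).toDF("x", "id")

// union matches columns by position: b's "x" value ends up in the "id" column, with no error.
val positionalRows = a.union(b)
  .collect().map(r => (r.getInt(0), r.getInt(1))).toSet
// positionalRows == Set((1, 10), (200, 2))

// unionByName (Spark 2.3+) matches columns by name instead.
val byNameRows = Seq(a, b).reduce(_ unionByName _)
  .collect().map(r => (r.getInt(0), r.getInt(1))).toSet
// byNameRows == Set((1, 10), (2, 200))
```

The reduce call has the same shape as with union, so switching between positional and name-based matching only means changing the combining function.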

Solution 2 - Scala

The simplest solution is to reduce with union (unionAll in Spark < 2.0):

val dfs = Seq(df1, df2, df3)
dfs.reduce(_ union _)

This is relatively concise and shouldn't move data out of off-heap storage, but it extends the lineage with each union and requires non-linear time to perform plan analysis, which can be a problem if you try to merge a large number of DataFrames.
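One edge case the one-liner does not cover is an empty sequence: reduce throws an UnsupportedOperationException when there is nothing to combine. A small sketch of a safer variant follows, using reduceOption from the Scala standard library. The helper name `unionAllOption` is hypothetical, and plain vectors stand in for DataFrames so the snippet runs without Spark.

```scala
// Hypothetical helper: returns None for empty input instead of throwing.
def unionAllOption[T](items: Seq[T])(union: (T, T) => T): Option[T] =
  items.reduceOption(union)

// Vectors stand in for DataFrames here; with Spark you would pass `_ union _`.
val merged = unionAllOption(Seq(Vector(1, 2), Vector(3), Vector(4, 5)))(_ ++ _)
// merged == Some(Vector(1, 2, 3, 4, 5))

val noInput = unionAllOption(Seq.empty[Vector[Int]])(_ ++ _)
// noInput == None
```

Returning an Option pushes the "no dataframes at all" decision to the caller, which can fall back to an empty DataFrame or skip the step entirely.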

You can also convert to RDDs and use SparkContext.union:

dfs match {
  case h :: Nil => Some(h)
  case h :: _   => Some(h.sqlContext.createDataFrame(
                     h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                     h.schema
                   ))
  case Nil  => None
}

It keeps the lineage short and the analysis cost low, but otherwise it is less efficient than merging DataFrames directly.

Solution 3 - Scala

Under the hood, Spark flattens union expressions, so it takes longer when the union is built up linearly.

The best solution would be for Spark to have a union function that accepts multiple DataFrames.

Until then, the following code might speed up the union of multiple DataFrames (or Datasets) somewhat.

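  import scala.reflect.ClassTag
  import org.apache.spark.sql.Dataset

  // Union any number of Datasets by combining them pairwise.
  def union[T : ClassTag](datasets : TraversableOnce[Dataset[T]]) : Dataset[T] = {
    binaryReduce[Dataset[T]](datasets, _.union(_))
  }

  // Reduce in rounds: each pass combines neighbouring elements,
  // roughly halving the number of elements until one is left.
  def binaryReduce[T : ClassTag](ts : TraversableOnce[T], op: (T, T) => T) : T = {
    if (ts.isEmpty) {
      throw new IllegalArgumentException("cannot reduce an empty collection")
    }
    val array = ts.toArray
    var size = array.length
    while (size > 1) {
      val newSize = (size + 1) / 2
      for (i <- 0 until newSize) {
        val index = i * 2
        val index2 = index + 1
        if (index2 >= size) {
          array(i) = array(index)  // odd element out, carried to the next round
        } else {
          array(i) = op(array(index), array(index2))
        }
      }
      size = newSize
    }
    array(0)
  }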
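To see how the pairing differs from a plain reduce, the sketch below applies both strategies to strings, so that the nesting of the result is visible. It is a condensed restatement of the pairwise idea rather than the exact code above: `pairwiseReduce` and `labels` are names introduced here for illustration, and no Spark is needed to run it.

```scala
import scala.annotation.tailrec

// Combine neighbours round by round until a single element is left.
@tailrec
def pairwiseReduce[T](items: Seq[T])(op: (T, T) => T): T = items match {
  case Seq()       => throw new IllegalArgumentException("nothing to reduce")
  case Seq(single) => single
  case _ =>
    val nextRound = items.grouped(2).map {
      case Seq(a, b) => op(a, b)
      case Seq(a)    => a // odd element out is carried to the next round
    }.toVector
    pairwiseReduce(nextRound)(op)
}

val labels = Vector("a", "b", "c", "d", "e")

// Plain reduce nests to the left, one level per element.
val linear = labels.reduce((x, y) => s"($x+$y)")
// linear == "((((a+b)+c)+d)+e)"

// Pairwise reduction builds a shallower, roughly balanced tree.
val balanced = pairwiseReduce(labels)((x, y) => s"($x+$y)")
// balanced == "(((a+b)+(c+d))+e)"
```

With five inputs the left-nested form is four levels deep and the pairwise form three; the gap widens as the number of inputs grows, since the pairwise depth grows logarithmically.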

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type        Original Author   Original Content on Stackoverflow
Question            echo              View Question on Stackoverflow
Solution 1 - Scala  TH22              View Answer on Stackoverflow
Solution 2 - Scala  zero323           View Answer on Stackoverflow
Solution 3 - Scala  S. Biedermann     View Answer on Stackoverflow