Spark sql how to explode without losing null values

JavaApache SparkNullApache Spark-Sql

Java Problem Overview


I have a Dataframe that I am trying to flatten. As part of the process, I want to explode it, so if I have a column of arrays, each value of the array will be used to create a separate row. For instance,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]

should become

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

This is my code

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}

The problem is that in my data, some of the array columns have nulls. In that case, the entire row is deleted. So this dataframe:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null

becomes

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

instead of

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null

How can I explode my arrays so that I don't lose the null rows?

I am using Spark 1.5.2 and Java 8

Java Solutions


Solution 1 - Java

Spark 2.2+

You can use explode_outer function:

import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+

Spark <= 2.1

In Scala but Java equivalent should be almost identical (to import individual functions use import static).

import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null explode an array<string> with a single null
    .otherwise(array(lit(null).cast("string")))))

The idea here is basically to replace NULL with an array(NULL) of a desired type. For complex type (a.k.a structs) you have to provide full schema:

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st =  StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))

or

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))

Note:

If array Column has been created with containsNull set to false you should change this first (tested with Spark 2.1):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))

Solution 2 - Java

You can use explode_outer() function.

Solution 3 - Java

Following up on the accepted answer, when the array elements are a complex type it can be difficult to define it by hand (e.g with large structs).

To do it automatically I wrote the following helper method:

  def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
      val arrayFields = df.schema.fields
          .map(field => field.name -> field.dataType)
          .collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])}
          .toMap

      columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
      dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
        .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))    
 }

Edit: it seems that spark 2.2 and newer have this built in.

Solution 4 - Java

To handle empty map type column: for Spark <= 2.1

 List((1, Array(2, 3, 4), Map(1 -> "a")),
(2, Array(5, 6, 7), Map(2 -> "b")),
(3, Array[Int](), Map[Int, String]())).toDF("col1", "col2", "col3").show()


 df.select('col1, explode(when(size(map_keys('col3)) === 0, map(lit("null"), lit("null"))).
otherwise('col3))).show()

Solution 5 - Java

from pyspark.sql.functions import *

def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    flat_df = nested_df.select(flat_cols +
                               [col(nc + '.' + c).alias(nc + '_' + c)
                                for nc in nested_cols
                                for c in nested_df.select(nc + '.*').columns])
    print("flatten_df_count :", flat_df.count())
    return flat_df

def explode_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct' and c[1][:5] != 'array']
    array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    for array_col in array_cols:
        schema = new_df.select(array_col).dtypes[0][1]
        nested_df = nested_df.withColumn(array_col, when(col(array_col).isNotNull(), col(array_col)).otherwise(array(lit(None)).cast(schema))) 
    nested_df = nested_df.withColumn("tmp", arrays_zip(*array_cols)).withColumn("tmp", explode("tmp")).select([col("tmp."+c).alias(c) for c in array_cols] + flat_cols)
    print("explode_dfs_count :", nested_df.count())
    return nested_df


new_df = flatten_df(myDf)
while True:
    array_cols = [c[0] for c in new_df.dtypes if c[1][:5] == 'array']
    if len(array_cols):
        new_df = flatten_df(explode_df(new_df))
    else:
        break
    
new_df.printSchema()

Used arrays_zip and explode to do it faster and address the null issue.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionalexgbelovView Question on Stackoverflow
Solution 1 - Javazero323View Answer on Stackoverflow
Solution 2 - JavaTopGuysView Answer on Stackoverflow
Solution 3 - JavansanglarView Answer on Stackoverflow
Solution 4 - JavaMohana B CView Answer on Stackoverflow
Solution 5 - JavaVijay Anand PandianView Answer on Stackoverflow