Derive multiple columns from a single column in a Spark DataFrame

ScalaApache SparkDataframeApache Spark-SqlUser Defined-Functions

Scala Problem Overview


I have a DF with a huge parseable metadata as a single string column in a Dataframe, lets call it DFA, with ColmnA.

I would like to break this column, ColmnA into multiple columns thru a function, ClassXYZ = Func1(ColmnA). This function returns a class ClassXYZ, with multiple variables, and each of these variables now has to be mapped to new Column, such a ColmnA1, ColmnA2 etc.

How would I do such a transformation from 1 Dataframe to another with these additional columns by calling this Func1 just once, and not have to repeat-it to create all the columns.

Its easy to solve if I were to call this huge function every time to add a new column, but that what I wish to avoid.

Kindly please advise with a working or pseudo code.

Thanks

Sanjay

Scala Solutions


Solution 1 - Scala

Generally speaking what you want is not directly possible. UDF can return only a single column at the time. There are two different ways you can overcome this limitation:

  1. Return a column of complex type. The most general solution is a StructType but you can consider ArrayType or MapType as well.

     import org.apache.spark.sql.functions.udf
    
     val df = Seq(
       (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
     ).toDF("x", "y", "z")
    
     case class Foobar(foo: Double, bar: Double)
    
     val foobarUdf = udf((x: Long, y: Double, z: String) => 
       Foobar(x * y, z.head.toInt * y))
    
     val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
     df1.show
     // +---+----+---+------------+
     // |  x|   y|  z|      foobar|
     // +---+----+---+------------+
     // |  1| 3.0|  a| [3.0,291.0]|
     // |  2|-1.0|  b|[-2.0,-98.0]|
     // |  3| 0.0|  c|   [0.0,0.0]|
     // +---+----+---+------------+
    
     df1.printSchema
     // root
     //  |-- x: long (nullable = false)
     //  |-- y: double (nullable = false)
     //  |-- z: string (nullable = true)
     //  |-- foobar: struct (nullable = true)
     //  |    |-- foo: double (nullable = false)
     //  |    |-- bar: double (nullable = false)
    

    This can be easily flattened later but usually there is no need for that.

  2. Switch to RDD, reshape and rebuild DF:

     import org.apache.spark.sql.types._
     import org.apache.spark.sql.Row
    
     def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = 
       Seq(x * y, z.head.toInt * y)
     
     val schema = StructType(df.schema.fields ++
       Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
    
     val rows = df.rdd.map(r => Row.fromSeq(
       r.toSeq ++
       foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
    
     val df2 = sqlContext.createDataFrame(rows, schema)
    
     df2.show
     // +---+----+---+----+-----+
     // |  x|   y|  z| foo|  bar|
     // +---+----+---+----+-----+
     // |  1| 3.0|  a| 3.0|291.0|
     // |  2|-1.0|  b|-2.0|-98.0|
     // |  3| 0.0|  c| 0.0|  0.0|
     // +---+----+---+----+-----+
    

Solution 2 - Scala

Assume that after your function there will be a sequence of elements, giving an example as below:

val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
df.show
+------------------+---+
|          infoComb|age|
+------------------+---+
|Mike,1986,Toronto| 30|
| Andre,1980,Ottawa| 36|
|  jill,1989,London| 27|
+------------------+---+

now what you can do with this infoComb is that you can start split the string and get more columns with:

df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show
+-----+----------+-------+---+
| name|yearOfBorn|   city|age|
+-----+----------+-------+---+
|Mike|      1986|Toronto| 30|
|Andre|      1980| Ottawa| 36|
| jill|      1989| London| 27|
+-----+----------+-------+---+

Hope this helps.

Solution 3 - Scala

If your resulting columns will be of the same length as the original one, you can create brand new columns with withColumn function and by applying an udf. After this you can drop your original column, eg:

 val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
.withColumn("newCol2", myFun2(myDf("originalColumn"))
.drop(myDf("originalColumn"))

where myFun is an udf defined like this:

   def myFun= udf(
    (originalColumnContent : String) =>  {
      // do something with your original column content and return a new one
    }
  )

Solution 4 - Scala

I opted to create a function to flatten one column and then just call it simultaneously with the udf.

First define this:

implicit class DfOperations(df: DataFrame) {

  def flattenColumn(col: String) = {
    def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
      if (cols.isEmpty) df
      else addColumns(
        df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
        cols.tail
      )
    }

    val field = df.select(col).schema.fields(0)
    val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)

    addColumns(df, newCols).drop(col)
  }

  def withColumnMany(colName: String, col: Column) = {
    df.withColumn(colName, col).flattenColumn(colName)
  }

}

Then usage is very simple:

case class MyClass(a: Int, b: Int)

val df = sc.parallelize(Seq(
  (0),
  (1)
)).toDF("x")

val f = udf((x: Int) => MyClass(x*2,x*3))

df.withColumnMany("test", f($"x")).show()

//  +---+------+------+
//  |  x|test_a|test_b|
//  +---+------+------+
//  |  0|     0|     0|
//  |  1|     2|     3|
//  +---+------+------+

Solution 5 - Scala

This can be easily achieved by using pivot function

df4.groupBy("year").pivot("course").sum("earnings").collect() 

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionsshroffView Question on Stackoverflow
Solution 1 - Scalazero323View Answer on Stackoverflow
Solution 2 - ScalaEdwinGuoView Answer on Stackoverflow
Solution 3 - ScalaTheMPView Answer on Stackoverflow
Solution 4 - ScalaPekkaView Answer on Stackoverflow
Solution 5 - ScalaAbhishek KgskView Answer on Stackoverflow