How to melt Spark DataFrame?

Apache SparkPysparkApache Spark-SqlMelt

Apache Spark Problem Overview


Is there an equivalent of Pandas Melt function in Apache Spark in PySpark or at least in Scala?

I was running a sample dataset till now in Python and now I want to use Spark for the entire dataset.

Apache Spark Solutions


Solution 1 - Apache Spark

There is no built-in function (if you work with SQL and Hive support enabled you can use stack function, but it is not exposed in Spark and has no native implementation) but it is trivial to roll your own. Required imports:

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

Example implementation:

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

And some tests (based on Pandas doctests):

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])

   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6

sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()

+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

Note: For use with legacy Python versions remove type annotations.

Related:

Solution 2 - Apache Spark

Came across this question in my search for an implementation of melt in Spark for Scala.

Posting my Scala port in case someone also stumbles upon this.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {
    
    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
     *  @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
     *  
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted columns names
     *  @param value_name the name for the column holding the values of the melted columns
     *
     */

    def melt(
            id_vars: Seq[String], value_vars: Seq[String], 
            var_name: String = "variable", value_name: String = "value") : DataFrame = {

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}

        return _tmp.select(cols: _*)

    }
}

Since I'm am not that advanced considering Scala, I'm sure there is room for improvement.

Any comments are welcome.

Solution 3 - Apache Spark

Voted for user6910411's answer. It works as expected, however, it cannot handle None values well. thus I refactored his melt function to the following:

from pyspark.sql.functions import array, col, explode, lit
from pyspark.sql.functions import create_map
from pyspark.sql import DataFrame
from typing import Iterable 
from itertools import chain

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create map<key: value>
    _vars_and_vals = create_map(
        list(chain.from_iterable([
            [lit(c), col(c)] for c in value_vars]
        ))
    )

    _tmp = df.select(*id_vars, explode(_vars_and_vals)) \
        .withColumnRenamed('key', var_name) \
        .withColumnRenamed('value', value_name)
    
    return _tmp

Test is with the following dataframe:

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6},
                   'D': {1: 7, 2: 9}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])

A	variable	value
0	a	B	1.0
1	b	B	3.0
2	c	B	5.0
3	a	C	2.0
4	b	C	4.0
5	c	C	6.0
6	a	D	NaN
7	b	D	7.0
8	c	D	9.0

sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|  1.0|
|  a|       C|  2.0|
|  a|       D|  NaN|
|  b|       B|  3.0|
|  b|       C|  4.0|
|  b|       D|  7.0|
|  c|       B|  5.0|
|  c|       C|  6.0|
|  c|       D|  9.0|
+---+--------+-----+

Solution 4 - Apache Spark

UPD

Finally i've found most effective implementation for me. It uses all resources for cluster in my yarn configuration.

from pyspark.sql.functions import explode
def melt(df):
    sp = df.columns[1:]
    return (df
            .rdd
            .map(lambda x: [str(x[0]), [(str(i[0]), 
                                         float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]], 
                 preservesPartitioning = True)
            .toDF()
            .withColumn('_2', explode('_2'))
            .rdd.map(lambda x: [str(x[0]), 
                                str(x[1][0]), 
                                float(x[1][1] if x[1][1] else 0)], 
                     preservesPartitioning = True)
            .toDF()
            )

For very wide dataframe I've got performance decreasing at _vars_and_vals generation from user6910411 answer.

It was useful to implement melting via selectExpr

columns=['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  7|  9|  8|
|  7|  8|  9|  1|  2|  4|
|  8|  3|  9|  8|  7|  4|
+---+---+---+---+---+---+

cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols))))
+---+----+----+
|  a|col0|col1|
+---+----+----+
|  1|   b|   2|
|  1|   c|   3|
|  1|   d|   4|
|  1|   e|   5|
|  1|   f|   6|
|  4|   b|   5|
|  4|   c|   6|
|  4|   d|   7|
|  4|   e|   9|
|  4|   f|   8|
|  7|   b|   8|
|  7|   c|   9|
...

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionVenkatesh DurgumahanthiView Question on Stackoverflow
Solution 1 - Apache Sparkzero323View Answer on Stackoverflow
Solution 2 - Apache SparkAhueView Answer on Stackoverflow
Solution 3 - Apache SparkWei LiView Answer on Stackoverflow
Solution 4 - Apache SparkAnton AlekseevView Answer on Stackoverflow