PySpark create new column with mapping from a dict

PythonApache SparkDictionaryPysparkApache Spark-Sql

Python Problem Overview


Using Spark 1.6, I have a Spark DataFrame column (named let's say col1) with values A, B, C, DS, DNS, E, F, G and H and I want to create a new column (say col2) with the values from the dict here below, how do I map this? (so f.i. 'A' needs to be mapped to 'S' etc..)

dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

Python Solutions


Solution 1 - Python

Inefficient solution with UDF (version independent):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, StringType())

df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
mapping = {
    'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 
    'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

df.withColumn("value", translate(mapping)("key"))

with the result:

+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+

Much more efficient (Spark >= 2.0, Spark < 3.0) is to create a MapType literal:

from pyspark.sql.functions import col, create_map, lit
from itertools import chain

mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

df.withColumn("value", mapping_expr.getItem(col("key")))

with the same result:

+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+

but more efficient execution plan:

== Physical Plan ==
*Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
+- Scan ExistingRDD[key#15]

compared to UDF version:

== Physical Plan ==
*Project [key#15, pythonUDF0#61 AS value#57]
+- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
   +- Scan ExistingRDD[key#15]

In Spark >= 3.0 getItem should be replaced with __getitem__ ([]), i.e:

df.withColumn("value", mapping_expr[col("key")]).show()

Solution 2 - Python

Sounds like the simplest solution would be to use the replace function: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace

mapping= {
        'A': '1',
        'B': '2'
    }
df2 = df.replace(to_replace=mapping, subset=['yourColName'])

Solution 3 - Python

If you want to create a map col from a nested dictionary you can use this:

def create_map(d,):
    if type(d) != dict:
        return F.lit(d)

    level_map = []
    for k in d:
        level_map.append(F.lit(k))
        level_map.append(create_map(d[k]))
    return F.create_map(level_map)

d = {'a': 1, 'b': {'c': 2, 'd': 'blah'}}
print(create_map(d)) # <- Column<b'map(a, 1, b, map(c, 2, d, blah))'>

Solution 4 - Python

you can use the function which convert dictionary into case syntax in Spark SQL

func_mapper = lambda dic,col,default : f"(CASE {col} WHEN " + " WHEN ".join([ f"'{k}' THEN '{v}'" for (k,v) in dic.items() ]) + f" ELSE '{default}' END)"

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
Questionad_sView Question on Stackoverflow
Solution 1 - Pythonzero323View Answer on Stackoverflow
Solution 2 - PythonHaim BendananView Answer on Stackoverflow
Solution 3 - PythonBenView Answer on Stackoverflow
Solution 4 - PythonAshwini JindalView Answer on Stackoverflow