Pyspark: Pass multiple columns in UDF

Apache Spark, Pyspark, Spark Dataframe

Apache Spark Problem Overview


I am writing a user-defined function that takes all the columns except the first one in a dataframe and sums them (or applies any other operation). The dataframe can sometimes have 3 columns, 4 columns, or more; it will vary.

I know I can hard-code the column names and pass them to the UDF, but in this case the number of columns varies, so how can I do this dynamically?

Here are two examples: in the first we have two columns to add, and in the second we have three.


Apache Spark Solutions


Solution 1 - Apache Spark

If all the columns you want to pass to the UDF have the same data type, you can use array as the input parameter, for example:

>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf, array
>>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|    17|
+---+---+---+------+

>>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])\
...     .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
+---+---+---+---+------+
| ID|  A|  B|  C|Result|
+---+---+---+---+------+
|101|  1| 16|  8|    25|
+---+---+---+---+------+
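
Since the question asks for a varying number of columns, the same UDF can also be fed every column except the first by unpacking df.columns[1:] into array. A minimal sketch, assuming the first column is always the ID:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, array

sum_cols = udf(lambda arr: sum(arr), IntegerType())

df = spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])
# Unpack every column name except the first into array()
df.withColumn('Result', sum_cols(array(*df.columns[1:]))).show()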

Solution 2 - Apache Spark

Another simple way, without array or struct:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Named sum_two to avoid shadowing Python's built-in sum
def sum_two(x, y):
    return x + y

sum_cols = udf(sum_two, IntegerType())

a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols('A', 'B')).show()
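
This version fixes the number of arguments at two. For the same style with a varying number of columns, a Python UDF can also be variadic; a sketch, assuming all inputs share the integer type:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Each column passed at the call site arrives as a separate argument
sum_cols = udf(lambda *cols: sum(cols), IntegerType())

a = spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])
a.withColumn('Result', sum_cols('A', 'B', 'C')).show()

Calling sum_cols(*a.columns[1:]) instead would cover however many value columns the frame has.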

Solution 3 - Apache Spark

Use struct instead of array

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

# The struct is passed to the UDF as a single Row argument
sum_cols = udf(lambda x: x[0] + x[1], IntegerType())

a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
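
A nice property of struct, worth noting as an aside: the UDF receives the struct as a Row, so fields can be read by name as well as by position. A sketch of the same sum:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

# The struct arrives as a Row, so fields are accessible by name
sum_cols = udf(lambda row: row['A'] + row['B'], IntegerType())

a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()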

Solution 4 - Apache Spark

This may be a late answer, but I don't like using UDFs unless necessary, so:

from functools import reduce
from pyspark.sql.functions import col

data = [["a", 1, 2, 5], ["b", 2, 3, 7], ["c", 3, 4, 8]]
df = spark.createDataFrame(data, ["id", "v1", "v2", "v3"])

# Fold the columns into one native expression: v1 + v2 + v3
calculate = reduce(lambda a, x: a + x, map(col, ["v1", "v2", "v3"]))

df.withColumn("Result", calculate).show()
# +---+---+---+---+------+
# | id| v1| v2| v3|Result|
# +---+---+---+---+------+
# |  a|  1|  2|  5|     8|
# |  b|  2|  3|  7|    12|
# |  c|  3|  4|  8|    15|
# +---+---+---+---+------+

Here you can use any operation implemented on Column. And if you need custom logic, you can often express it with Column operations directly, because Column builds an expression tree that Spark executes natively, without collecting the values into an array and summing them in Python.

Compared with this, processing through an array and a UDF is bad from a performance perspective. Let's look at the physical plans for my case and for the array case.

my case:

== Physical Plan ==
*(1) Project [id#355, v1#356L, v2#357L, v3#358L, ((v1#356L + v2#357L) + v3#358L) AS Result#363L]
+- *(1) Scan ExistingRDD[id#355,v1#356L,v2#357L,v3#358L]

array case:

== Physical Plan ==
*(2) Project [id#339, v1#340L, v2#341L, v3#342L, pythonUDF0#354 AS Result#348]
+- BatchEvalPython [<lambda>(array(v1#340L, v2#341L, v3#342L))], [pythonUDF0#354]
   +- *(1) Scan ExistingRDD[id#339,v1#340L,v2#341L,v3#342L]

When possible, we should avoid UDFs, because Catalyst does not know how to optimize them.
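
To make this expression handle however many columns the dataframe has, the reduce can run over df.columns[1:] instead of a hard-coded list; a sketch, assuming the first column is the id:

from functools import reduce
from pyspark.sql.functions import col

# Still a single native Column expression, so Catalyst can optimize it
calculate = reduce(lambda a, b: a + b, [col(c) for c in df.columns[1:]])
df.withColumn("Result", calculate).show()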

Solution 5 - Apache Spark

If you don't want to type out all your column names and would rather just dump all the columns into your UDF, you'll need to wrap a list comprehension within a struct.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import struct, udf

# Without an explicit return type, the UDF would default to returning a string
sum_udf = udf(lambda x: sum(x[1:]), IntegerType())
df_sum = df.withColumn("result", sum_udf(struct([df[col] for col in df.columns])))

Solution 6 - Apache Spark

This is the way I tried, and it seemed to work:

# Python's built-in sum folds the Column objects with +, producing one expression
colsToSum = df.columns[1:]
df_sum = df.withColumn("rowSum", sum([df[col] for col in colsToSum]))
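
One caveat: with Column addition, a null in any of the summed columns makes that row's sum null. If that is not what you want, a sketch that coalesces each value to zero first (F is the usual pyspark.sql.functions alias):

from pyspark.sql import functions as F

colsToSum = df.columns[1:]
# Coalesce each value to 0 so a single null does not null out the row sum
df_sum = df.withColumn("rowSum", sum([F.coalesce(df[c], F.lit(0)) for c in colsToSum]))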

Solution 7 - Apache Spark

udf_ = spark.udf.register("udf_", self.funct)
print("registered udf................:", udf_)
df = df.withColumn('result', udf_(struct([df[col] for col in df.columns])))
print("after df call")

Here self.funct is defined in another class. I am registering the function with spark.udf.register and calling it from df.withColumn, but returning the result is not working.

Output: registered udf................: after df call

but in reality execution never enters the funct function of the function class.

The function class is as follows:

class function():
    def __init__(self):
        pass

    def funct(self, df):
        print("inside funct function")
        return F.col("S") * F.col("S")

The column S is part of the df dataframe and holds an int.
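
A hedged diagnosis rather than a confirmed fix: a Python UDF must return a plain Python value, but funct returns a Column expression (F.col("S") * F.col("S")), which cannot be evaluated inside a UDF. For a column multiplied by itself, no UDF is needed at all:

from pyspark.sql import functions as F

# Column arithmetic runs natively in Spark; no UDF or registration needed
df = df.withColumn('result', F.col('S') * F.col('S'))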

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type: Original Author (on Stackoverflow)
Question: jishan
Solution 1 - Apache Spark: Mariusz
Solution 2 - Apache Spark: Neeraj Bhadani
Solution 3 - Apache Spark: kavi
Solution 4 - Apache Spark: Роберт Надь
Solution 5 - Apache Spark: mdrishan
Solution 6 - Apache Spark: Travis Hensrud
Solution 7 - Apache Spark: Deb