Pyspark: Pass multiple columns in UDF
Apache Spark Problem Overview
I am writing a User Defined Function (UDF) that takes all the columns of a dataframe except the first one and computes their sum (or any other operation). The dataframe can sometimes have 3 columns, 4 columns, or more; it varies.
I know I can hard-code 4 column names and pass them to the UDF, but since the number of columns varies, I would like to know how to get this done.
Here are two examples: in the first we have two columns to add, and in the second we have three.
Apache Spark Solutions
Solution 1 - Apache Spark
If all the columns you want to pass to the UDF have the same data type, you can use an array as the input parameter, for example:
>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf, array
>>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
... .withColumn('Result', sum_cols(array('A', 'B'))).show()
+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|    17|
+---+---+---+------+
>>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])\
... .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
+---+---+---+---+------+
| ID|  A|  B|  C|Result|
+---+---+---+---+------+
|101|  1| 16|  8|    25|
+---+---+---+---+------+
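Since the question is about a varying number of columns, the same idea can be made independent of the column count by building the array from df.columns instead of naming each column. The following is only a sketch: the helper names sum_values and add_row_sum are made up for illustration, LongType is chosen on the assumption that the columns hold integers, and skipping nulls is my own choice rather than part of the answer above.

```python
def sum_values(values):
    """Plain Python function wrapped by the UDF: add up one row's values.

    Skipping None is an assumption made here so that null cells do not
    raise a TypeError inside the UDF.
    """
    return sum(v for v in values if v is not None)


def add_row_sum(df, result_col="Result"):
    """Append the sum of every column except the first, however many there are."""
    # Imported here so that sum_values can be tried out without Spark.
    from pyspark.sql.functions import array, udf
    from pyspark.sql.types import LongType

    sum_cols = udf(sum_values, LongType())
    value_cols = df.columns[1:]  # everything except the first column
    return df.withColumn(result_col, sum_cols(array(*value_cols)))
```

With an existing session this would be called as add_row_sum(spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])).show(), and the same call works unchanged for a dataframe with two or five value columns.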
Solution 2 - Apache Spark
Another simple way, without array or struct. Note that this UDF takes a fixed number of columns (two here).
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# Named add_cols so that it does not shadow Python's built-in sum.
def add_cols(x, y):
    return x + y
sum_cols = udf(add_cols, IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols('A', 'B')).show()
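The two-argument function above can be generalized to any number of columns by letting the Python function accept variable arguments and unpacking the column names at the call site. This is a hedged sketch, not part of the original answer: sum_args and add_row_sum are illustrative names, and LongType assumes non-null integer columns.

```python
def sum_args(*values):
    """Plain Python function: add up however many values one row supplies."""
    return sum(values)


def add_row_sum(df, result_col="Result"):
    """Append the sum of every column except the first."""
    # Deferred import: sum_args itself does not need Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import LongType

    sum_cols = udf(sum_args, LongType())
    # Unpack the column names so each column becomes one positional argument.
    return df.withColumn(result_col, sum_cols(*df.columns[1:]))
```

Calling add_row_sum(a).show() on the dataframe from this answer should add A and B; with a third value column the same code passes three arguments to the function.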
Solution 3 - Apache Spark
Use struct instead of array
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct
sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
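One practical difference from the array approach is that a struct can hold columns of different types, and inside the UDF its fields can be read by name as well as by position. A small sketch of that, assuming a hypothetical string column called name next to the integer columns A and B (label_with_sum and add_label are illustrative names):

```python
def label_with_sum(row):
    """Receives one struct (a Row in PySpark) and reads its fields by name."""
    return "{}: {}".format(row["name"], row["A"] + row["B"])


def add_label(df):
    """Append a string column built from a text column and two numeric ones."""
    # Deferred import: label_with_sum itself does not need Spark.
    from pyspark.sql.functions import struct, udf
    from pyspark.sql.types import StringType

    label_udf = udf(label_with_sum, StringType())
    return df.withColumn("Label", label_udf(struct("name", "A", "B")))
```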
Solution 4 - Apache Spark
Maybe it's a late answer, but I don't like using UDFs without necessity, so:
from pyspark.sql.functions import col
from functools import reduce
data = [["a",1,2,5],["b",2,3,7],["c",3,4,8]]
df = spark.createDataFrame(data,["id","v1","v2",'v3'])
calculate = reduce(lambda a, x: a+x, map(col, ["v1","v2",'v3']))
df.withColumn("Result", calculate).show()
#
#id v1 v2 v3 Result
#a 1 2 5 8
#b 2 3 7 12
#c 3 4 8 15
Here you can use any operation that is implemented on Column. If you want to write a custom UDF with specific logic, you can still combine it in the same way, because Column expressions build up a tree of operations for execution, without collecting the values into an array and summing over it.
Compared with this, processing the columns as an array through a UDF is worse from a performance perspective. Let's take a look at the physical plans for my case and for the array case.
my case:
== Physical Plan ==
*(1) Project [id#355, v1#356L, v2#357L, v3#358L, ((v1#356L + v2#357L) + v3#358L) AS Result#363L]
+- *(1) Scan ExistingRDD[id#355,v1#356L,v2#357L,v3#358L]
array case:
== Physical Plan ==
*(2) Project [id#339, v1#340L, v2#341L, v3#342L, pythonUDF0#354 AS Result#348]
+- BatchEvalPython [<lambda>(array(v1#340L, v2#341L, v3#342L))], [pythonUDF0#354]
   +- *(1) Scan ExistingRDD[id#339,v1#340L,v2#341L,v3#342L]
Whenever possible, we should avoid UDFs, since Catalyst does not know how to optimize them.
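The column list in this answer is written out by hand, but the same reduce can be driven by df.columns so that it follows whatever columns the dataframe has. A minimal sketch, assuming every column except the first is numeric; combine and with_row_result are names invented for this example:

```python
import operator
from functools import reduce


def combine(exprs, op=operator.add):
    """Fold a sequence of expressions into one with a binary operator.

    Works on anything that supports the operator, including Spark Columns.
    """
    return reduce(op, exprs)


def with_row_result(df, op=operator.add, name="Result"):
    """Append op applied across every column except the first, without a UDF."""
    # Deferred import: combine itself does not need Spark.
    from pyspark.sql.functions import col

    return df.withColumn(name, combine([col(c) for c in df.columns[1:]], op))
```

with_row_result(df).explain() should then show a plain Project step like the one in "my case" above, and passing op=operator.mul would give a row-wise product instead of a sum.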
Solution 5 - Apache Spark
If you don't want to type out all your column names and would rather just dump all the columns into your UDF, you'll need to wrap a list comprehension within a struct.
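from pyspark.sql.functions import struct, udf
from pyspark.sql.types import LongType
# Declare the return type (LongType assumes integer columns); udf defaults to StringType.
sum_udf = udf(lambda x: sum(x[1:]), LongType())
df_sum = df.withColumn("result", sum_udf(struct([df[c] for c in df.columns])))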
Solution 6 - Apache Spark
This is what I tried, and it seemed to work:
colsToSum = df.columns[1:]
df_sum = df.withColumn("rowSum", sum([df[col] for col in colsToSum]))
Solution 7 - Apache Spark
udf_ = spark.udf.register("udf_",self.funct)
print("registered udf................:",udf_)
df = df.withColumn('result',udf_(struct([df[col] for col in df.columns])))
print("after df call")
Here self.funct is defined in another class. I am trying to register it with spark.udf.register, call it from df.withColumn, and get its return value as the result column, but it is not working.
Output:
registered udf................:
But in reality, execution never enters the funct method of the function class.
The function class is as follows:
class function():
    def init:
    def funct(self,df):
        print("inside funct function")
        return F.col(S)*F.col(S)
S is a column of the df data frame and is of type int.
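A likely reason this does not work: a UDF is called once per row with that row's values, not with the dataframe, so it has to return a plain Python value rather than a Column expression such as F.col(S)*F.col(S). In addition, withColumn is lazy, so the function is not called until an action such as show() runs, and anything printed inside a UDF is written by the Python worker process, which is not necessarily the driver console. A minimal sketch under those assumptions (the class name Squarer and the helper add_result are hypothetical, and S is assumed to be a non-null integer column):

```python
class Squarer:
    """Hypothetical stand-in for the class that owns funct."""

    def funct(self, row):
        # row is the struct built from the dataframe columns (a Row in PySpark).
        # Return a plain value here, not a Column expression.
        return row["S"] * row["S"]


def add_result(spark, df):
    """Register the bound method as a UDF and apply it to a struct of all columns."""
    # Deferred import: Squarer itself does not need Spark.
    from pyspark.sql.functions import struct
    from pyspark.sql.types import LongType

    udf_ = spark.udf.register("udf_", Squarer().funct, LongType())
    return df.withColumn("result", udf_(struct([df[c] for c in df.columns])))
```

add_result(spark, df).show() would trigger the computation. For something as simple as squaring one column, df.withColumn("result", F.col("S") * F.col("S")) does the same without any UDF.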