Spark functions vs UDF performance?


Performance Problem Overview


Spark now offers predefined functions that can be used in DataFrames, and they appear to be highly optimized. My original question was going to be which is faster, but I did some testing myself and found the Spark functions to be about 10 times faster, at least in one instance. Does anyone know why this is so, and when would a UDF be faster (only for cases where an identical Spark function exists)?

Here is my testing code (ran on Databricks community ed):

# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat, udf
fake = Factory.create()
fake.seed(4321)

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
  name = fake.name().split()
  return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in range(times):  # range works on both Python 2 and Python 3
        yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print(len(data))
data[0]

dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()

UDF function:

concat_s = udf(lambda s: s + 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()

Spark Function:

spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()

Ran both multiple times, the udf usually took about 1.1 - 1.4 s, and the Spark concat function always took under 0.15 s.

Performance Solutions


Solution 1 - Performance

> when would a udf be faster

If you are asking about Python UDFs, the answer is probably never*. Since SQL functions are relatively simple and are not designed for complex tasks, it is pretty much impossible to compensate for the cost of repeated serialization, deserialization and data movement between the Python interpreter and the JVM.

> Does anyone know why this is so

The main reasons are already enumerated above and can be reduced to a simple fact: a Spark DataFrame is natively a JVM structure, and the standard access methods are implemented as simple calls to the Java API. UDFs, on the other hand, are implemented in Python and require moving data back and forth.

While PySpark in general requires data movement between the JVM and Python, in the case of the low-level RDD API it typically doesn't require expensive serde activity. Spark SQL adds the cost of serialization and deserialization, as well as the cost of moving data to and from the unsafe representation on the JVM. The latter is common to all UDFs (Python, Scala and Java), but the former is specific to non-native languages.

Unlike UDFs, Spark SQL functions operate directly on the JVM and are typically well integrated with both Catalyst and Tungsten. This means they can be optimized in the execution plan and most of the time can benefit from codegen and other Tungsten optimizations. Moreover, they can operate on data in its "native" representation.

So in a sense the problem here is that Python UDF has to bring data to the code while SQL expressions go the other way around.


* According to rough estimates, a PySpark window UDF can beat a Scala window function.

Solution 2 - Performance

Years later, with more Spark knowledge, I took a second look at the question and realized what @alfredox really wanted to ask. So I have revised the answer and divided it into two parts:


To answer why a native DataFrame function (a native Spark SQL function) is faster:

Basically, a native Spark function is ALWAYS faster than a Spark UDF, regardless of whether your UDF is implemented in Python or Scala.

First, we need to understand what Tungsten is. It was first introduced in Spark 1.4.

It is an execution backend, and this is what it focuses on:

> 1. Off-Heap Memory Management using binary in-memory data representation aka Tungsten row format and managing memory explicitly,
> 2. Cache Locality which is about cache-aware computations with cache-aware layout for high cache hit rates,
> 3. Whole-Stage Code Generation (aka CodeGen).

One of the biggest Spark performance killers is GC. A GC pause stops every thread in the JVM until the collection has finished. This is exactly why off-heap memory management was introduced.

When executing native Spark SQL functions, the data stays in the Tungsten backend. With a Spark UDF, however, the data is moved out of Tungsten into the JVM (for Scala) or into the JVM and a Python process (for Python) to do the actual processing, and is then moved back into Tungsten. As a result:

  1. Inevitably, there is an overhead / penalty to:
     - deserialize the input from Tungsten;
     - serialize the output back into Tungsten.
  2. Even with Scala, the first-class citizen in Spark, a UDF increases the memory footprint within the JVM, which is likely to involve more GC. This is exactly the issue that Tungsten's "Off-Heap Memory Management" feature tries to address.

To answer whether Python is necessarily slower than Scala:

On 30 October 2017, Spark introduced vectorized UDFs for PySpark:

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

The reason a Python UDF is slow is probably that the PySpark UDF is not implemented in the most optimized way. According to this paragraph from the link:

> Spark added a Python API in version 0.7, with support for user-defined functions. These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead.

However, the new vectorized UDFs seem to improve performance a lot:

> ranging from 3x to over 100x.


Solution 3 - Performance

Use the higher-level standard Column-based functions with Dataset operators whenever possible before resorting to your own custom UDFs, since UDFs are a black box for Spark and so it does not even try to optimize them.

What actually happens behind the scenes is that Catalyst can't process and optimize UDFs at all. It treats them as a black box, which results in losing many optimizations such as predicate pushdown, constant folding and many others.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

| Content Type | Original Author | Original Content on Stackoverflow |
| --- | --- | --- |
| Question | alfredox | View Question on Stackoverflow |
| Solution 1 - Performance | zero323 | View Answer on Stackoverflow |
| Solution 2 - Performance | Tom Tang | View Answer on Stackoverflow |
| Solution 3 - Performance | Sharhabeel Hamdan | View Answer on Stackoverflow |