Make Pandas DataFrame apply() use all cores?

Pandas, Dask

Pandas Problem Overview


As of August 2017, Pandas DataFrame.apply() is unfortunately still limited to working with a single core, meaning that a multi-core machine will waste the majority of its compute time when you run df.apply(myfunc, axis=1).

How can you use all your cores to run apply on a dataframe in parallel?

Pandas Solutions


Solution 1 - Pandas

The simplest way is to use Dask's map_partitions. You need these imports (you will need to pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

and the syntax is

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(I believe that 30 is a suitable number of partitions if you have 16 cores). Just for completeness, I timed the difference on my machine (16 cores):

import numpy as np
import timeit

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2'])

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

> 28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

> 2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

> 0.010668013244867325

That gives a factor-of-10 speedup going from pandas apply to Dask apply on partitions. Of course, if you have a function you can vectorize, you should: in this case the function (y*(x**2+1)) is trivially vectorized, but there are plenty of things that are impossible to vectorize.
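
Note that newer Dask releases have deprecated the get= keyword used above; if that call errors on your Dask version, the equivalent (as far as I know) is to select the scheduler by name:

import dask.dataframe as dd

ddata = dd.from_pandas(data, npartitions=30)
# Newer Dask: pick the multiprocessing scheduler by name instead of get=get
res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(scheduler='processes')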

Solution 2 - Pandas

You may use the swifter package:

pip install swifter

(Note that you may want to use this in a virtualenv to avoid version conflicts with installed dependencies.)

Swifter works as a plugin for pandas, allowing you to reuse the apply function:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

It will automatically figure out the most efficient way to parallelize the function, whether or not it is vectorized (as in the example above).

More examples and a performance comparison are available on GitHub. Note that the package is under active development, so the API may change.

Also note that this will not work automatically for string columns. When using strings, Swifter will fall back to a “simple” Pandas apply, which will not be parallel. In this case, even forcing it to use Dask will not yield performance improvements, and you would be better off just splitting your dataset manually and parallelizing with multiprocessing, as sketched below.
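
As a rough sketch of that manual approach (the column name 'text' and the clean_text function are placeholders, not part of Swifter), splitting the DataFrame yourself and using multiprocessing directly looks something like this:

import multiprocessing as mp
import numpy as np
import pandas as pd

def clean_text(s):
    # placeholder string transformation
    return s.strip().lower()

def apply_to_chunk(chunk):
    # run the ordinary (serial) apply on one chunk of rows
    return chunk['text'].apply(clean_text)

if __name__ == '__main__':
    chunks = np.array_split(data, mp.cpu_count())  # one chunk per core
    with mp.Pool(mp.cpu_count()) as pool:
        data['text_clean'] = pd.concat(pool.map(apply_to_chunk, chunks))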

Solution 3 - Pandas

You can try pandarallel instead: a simple and efficient tool to parallelize your pandas operations across all your CPUs (on Linux & macOS).

  • Parallelization has a cost (instantiating new processes, sending data via shared memory, etc.), so parallelization is efficient only if the amount of computation to parallelize is large enough. For very small amounts of data, parallelization is not always worth it.
  • Functions applied should NOT be lambda functions.

from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

See https://github.com/nalepae/pandarallel for more details.

Solution 4 - Pandas

If you want to stay in native python:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

This will apply the function f in parallel to column col of DataFrame df.
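
A minimal self-contained version of the same idea (the data and the function are illustrative; the __main__ guard matters on platforms that spawn worker processes, e.g. Windows and macOS):

import multiprocessing as mp
import pandas as pd

def f(x):
    # example per-value function
    return x ** 2 + 1

if __name__ == '__main__':
    df = pd.DataFrame({'col': range(1_000_000)})
    with mp.Pool(mp.cpu_count()) as pool:
        df['newcol'] = pool.map(f, df['col'])
    print(df.head())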

Solution 5 - Pandas

Just want to give an updated answer for Dask:

import dask.dataframe as dd

def your_func(row):
    # do something
    return row

ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()

On my 100,000 records, without Dask:

CPU times: user 6min 32s, sys: 100 ms, total: 6min 32s
Wall time: 6min 32s

With Dask:

CPU times: user 5.19 s, sys: 784 ms, total: 5.98 s
Wall time: 1min 3s
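
If you are unsure what to pass for npartitions, a common starting point (an assumption, not a hard rule) is a small multiple of your core count:

import multiprocessing
import dask.dataframe as dd

# e.g. a couple of partitions per core keeps all workers busy
ddf = dd.from_pandas(df, npartitions=2 * multiprocessing.cpu_count())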

Solution 6 - Pandas

To use all (physical or logical) cores, you could try mapply as an alternative to swifter and pandarallel.

You can set the number of cores (and the chunking behaviour) at init time:

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

By default (n_workers=-1), the package uses all physical CPUs available on the system. If your system uses hyper-threading (usually twice the number of physical CPUs would show up), mapply will spawn one extra worker to prioritise the multiprocessing pool over other processes on the system.

Depending on your definition of "all your cores", you could also use all logical cores instead (beware that CPU-bound processes will then be fighting over the physical CPUs, which might slow down your operation):

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)

Solution 7 - Pandas

Here is an example of an sklearn base transformer in which the pandas apply is parallelized:

import multiprocessing as mp

import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin, BaseEstimator

class ParallelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # the reduce step - concatenation of the transformed batches
        data = pd.concat(
            pool.map(self._transform_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformation here
        return line

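A short usage sketch for the transformer above (df stands in for your DataFrame):

transformer = ParallelTransformer(n_jobs=-1)  # -1 means use all cores
transformed = transformer.fit_transform(df)   # fit_transform comes from TransformerMixin
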
For more info, see https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

Solution 8 - Pandas

Here is another one using joblib and some helper code from scikit-learn. It is lightweight (if you already have scikit-learn) and good if you prefer more control over what it is doing, since joblib is easily hackable.

import pandas as pd
from joblib import Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples


def parallel_apply(df, func, n_jobs= -1, **kwargs):
    """ Pandas apply in parallel using joblib. 
    Uses sklearn.utils to partition input evenly.
    
    Args:
        df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
        func: Callable to apply
        n_jobs: Desired number of workers. Default value -1 means use all available cores.
        **kwargs: Any additional parameters will be supplied to the apply function
        
    Returns:
        Same as for normal Pandas DataFrame.apply()
        
    """
    
    if effective_n_jobs(n_jobs) == 1:
        return df.apply(func, **kwargs)
    else:
        ret = Parallel(n_jobs=n_jobs)(
            delayed(type(df).apply)(df[s], func, **kwargs)
            for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
        return pd.concat(ret)

Usage: result = parallel_apply(my_dataframe, my_func)
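
Because **kwargs are forwarded to apply, row-wise application and Series input work the same way (my_dataframe, my_series, and my_func are placeholders):

# Row-wise apply: extra keyword arguments go straight through to .apply()
result = parallel_apply(my_dataframe, my_func, axis=1)

# Works on a Series too, since type(df).apply is resolved dynamically
result = parallel_apply(my_series, my_func)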

Solution 9 - Pandas

Instead of

df["new"] = df["old"].map(fun)

do

from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])

To me this is a slight improvement over

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    df["new"] = pool.map(fun, df["old"])

as you get a progress indication and automatic batching if the jobs are very small.
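
If you want to control that batching yourself, Parallel also accepts a batch_size argument (the value below is only illustrative):

from joblib import Parallel, delayed

# verbose=10 prints progress; batch_size groups many tiny jobs per dispatch
df["new"] = Parallel(n_jobs=-1, verbose=10, batch_size=1000)(
    delayed(fun)(i) for i in df["old"]
)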

Solution 10 - Pandas

A native Python solution (with numpy) that can be applied to the whole DataFrame, as the original question asks (not only to a single column):

import numpy as np
import pandas as pd
import multiprocessing as mp

dfs = np.array_split(df, 8000) # divide the dataframe as desired

def f_app(df):
    return df.apply(myfunc, axis=1)

with mp.Pool(mp.cpu_count()) as pool:
    res = pd.concat(pool.map(f_app, dfs))

Solution 11 - Pandas

Since the question was "How can you use all your cores to run apply on a dataframe in parallel?", the answer can also be Modin. You can run all cores in parallel, though in my test the wall-clock time was worse.

See https://github.com/modin-project/modin. It runs on top of Dask or Ray. They say "Modin is a DataFrame designed for datasets from 1MB to 1TB+." I tried pip3 install "modin[ray]". In my test, Modin vs. pandas was 12 sec on six cores vs. 6 sec.
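
Modin is intended as a drop-in replacement, so (assuming a Ray or Dask backend is installed) the only change is the import; the rest stays pandas-style. The file name and column names below are placeholders:

import modin.pandas as pd  # drop-in replacement for `import pandas as pd`

df = pd.read_csv("my_data.csv")      # illustrative input
df["out"] = df["in"].apply(myfunc)   # apply is distributed across cores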

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type         | Original Author  | Original Content on Stackoverflow
Question             | Roko Mijic       | View Question on Stackoverflow
Solution 1 - Pandas  | Roko Mijic       | View Answer on Stackoverflow
Solution 2 - Pandas  | slhck            | View Answer on Stackoverflow
Solution 3 - Pandas  | kkkobelief24     | View Answer on Stackoverflow
Solution 4 - Pandas  | Olivier Cruchant | View Answer on Stackoverflow
Solution 5 - Pandas  | LYu              | View Answer on Stackoverflow
Solution 6 - Pandas  | ddelange         | View Answer on Stackoverflow
Solution 7 - Pandas  | Maxim Balatsko   | View Answer on Stackoverflow
Solution 8 - Pandas  | Yaroslav         | View Answer on Stackoverflow
Solution 9 - Pandas  | 0-_-0            | View Answer on Stackoverflow
Solution 10 - Pandas | Pavel Prochazka  | View Answer on Stackoverflow
Solution 11 - Pandas | jaromrax         | View Answer on Stackoverflow