pandas multiprocessing apply
PythonPandasMultiprocessingPython Problem Overview
I'm trying to use multiprocessing with pandas dataframe, that is split the dataframe to 8 parts. apply some function to each part using apply (with each part processed in different process).
EDIT: Here's the solution I finally found:
import multiprocessing as mp
import pandas.util.testing as pdt
def process_apply(x):
# do some stuff to data here
def process(df):
res = df.apply(process_apply, axis=1)
return res
if __name__ == '__main__':
p = mp.Pool(processes=8)
split_dfs = np.array_split(big_df,8)
pool_results = p.map(aoi_proc, split_dfs)
p.close()
p.join()
# merging parts processed by different processes
parts = pd.concat(pool_results, axis=0)
# merging newly calculated parts to big_df
big_df = pd.concat([big_df, parts], axis=1)
# checking if the dfs were merged correctly
pdt.assert_series_equal(parts['id'], big_df['id'])
Python Solutions
Solution 1 - Python
You can use https://github.com/nalepae/pandarallel, as in the following example:
from pandarallel import pandarallel
from math import sin
pandarallel.initialize()
def func(x):
return sin(x**2)
df.parallel_apply(func, axis=1)
Solution 2 - Python
A more generic version based on the author solution, that allows to run it on every function and dataframe:
from multiprocessing import Pool
from functools import partial
import numpy as np
def parallelize(data, func, num_of_processes=8):
data_split = np.array_split(data, num_of_processes)
pool = Pool(num_of_processes)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data
def run_on_subset(func, data_subset):
return data_subset.apply(func, axis=1)
def parallelize_on_rows(data, func, num_of_processes=8):
return parallelize(data, partial(run_on_subset, func), num_of_processes)
So the following line:
df.apply(some_func, axis=1)
Will become:
parallelize_on_rows(df, some_func)
Solution 3 - Python
This is some code that I found useful. Automatically splits the dataframe into however many cpu cores you have.
import pandas as pd
import numpy as np
import multiprocessing as mp
def parallelize_dataframe(df, func):
num_processes = mp.cpu_count()
df_split = np.array_split(df, num_processes)
with mp.Pool(num_processes) as p:
df = pd.concat(p.map(func, df_split))
return df
def parallelize_function(df):
df[column_output] = df[column_input].apply(example_function)
return df
def example_function(x):
x = x*2
return x
To run:
df_output = parallelize_dataframe(df, parallelize_function)
Solution 4 - Python
Since I don't have much of your data script, this is a guess, but I'd suggest using p.map
instead of apply_async
with the callback.
p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df,8))
p.close()
p.join()
results = []
for result in pool_results:
results.extend(result)
Solution 5 - Python
This worked well for me:
rows_iter = (row for _, row in df.iterrows())
with multiprocessing.Pool() as pool:
df['new_column'] = pool.map(process_apply, rows_iter)
Solution 6 - Python
To use all (physical or logical) cores, you could try mapply
as an alternative to swifter
and pandarallel
.
You can set the amount of cores (and the chunking behaviour) upon init:
import pandas as pd
import mapply
mapply.init(n_workers=-1)
def process_apply(x):
# do some stuff to data here
def process(df):
# spawns a pathos.multiprocessing.ProcessPool if sensible
res = df.mapply(process_apply, axis=1)
return res
By default (n_workers=-1
), the package uses all physical CPUs available on the system. If your system uses hyper-threading (usually twice the amount of physical CPUs would show up), mapply
will spawn one extra worker to prioritise the multiprocessing pool over other processes on the system.
You could also use all logical cores instead (beware that like this the CPU-bound processes will be fighting for physical CPUs, which might slow down your operation):
import multiprocessing
n_workers = multiprocessing.cpu_count()
# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)
Solution 7 - Python
I also run into the same problem when I use multiprocessing.map()
to apply function to different chunk of a large dataframe.
I just want to add several points just in case other people run into the same problem as I do.
- remember to add
if __name__ == '__main__':
- execute the file in a
.py
file, if you useipython/jupyter notebook
, then you can not runmultiprocessing
(this is true for my case, though I have no clue)
Solution 8 - Python
Install Pyxtension that simplifies using parallel map and use like this:
from pyxtension.streams import stream
big_df = pd.concat(stream(np.array_split(df, multiprocessing.cpu_count())).mpmap(process))
Solution 9 - Python
I ended up using concurrent.futures.ProcessPoolExecutor.map
in place of multiprocessing.Pool.map
which took 316 microseconds for some code that took 12 seconds in serial.
Solution 10 - Python
Python's pool.starmap() method can be used to succinctly introduce parallelism also to apply
use cases where column values are passed as arguments, i.e. to cases like:
df.apply(lambda row: my_func(row["col_1"], row["col_2"], ...), axis=1)
Full example and benchmarking:
import time
from multiprocessing import Pool
import numpy as np
import pandas as pd
def mul(a, b, c):
# For illustration, could obviously be vectorized
return a * b * c
df = pd.DataFrame(np.random.randint(0, 100, size=(10_000_000, 3)), columns=list('ABC'))
# Standard apply
start = time.time()
df["mul"] = df.apply(lambda row: mul(row["A"], row["B"], row["C"]), axis=1)
print(f"Standard apply took {time.time() - start:.0f} seconds.")
# Starmap apply
start = time.time()
with Pool(10) as pool:
df["mul_pool"] = pool.starmap(mul, zip(df["A"], df["B"], df["C"]))
print(f"Starmap apply took {time.time() - start:.0f} seconds.")
pd.testing.assert_series_equal(df["mul"], df["mul_pool"], check_names=False)
>>> Standard apply took 72 seconds.
>>> Starmap apply took 5 seconds.
This has the benefit of not relying on external libraries, plus being very readable.
Solution 11 - Python
Tom Raz's answer https://stackoverflow.com/a/53135031/11847090 misses an edge case where there are fewer rows in the dataframe than processes
use this parallelize method instead
def parallelize(data, func, num_of_processes=8):
# check if the number of rows is less than the number of processes
# to avoid the following error
# ValueError: Expected a 1D array, got an array with shape
num_rows = len(data)
if num_rows == 0:
return None
elif num_rows < num_of_processes:
num_of_processes = num_rows
data_split = np.array_split(data, num_of_processes)
pool = Pool(num_of_processes)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data
and also I used dask bag to multithread this instead of this custom code