Efficiently applying a function to a grouped pandas DataFrame in parallel

PythonPandasMultiprocessingShared Memory

Python Problem Overview


I often need to apply a function to the groups of a very large DataFrame (of mixed data types) and would like to take advantage of multiple cores.

I can create an iterator from the groups and use the multiprocessing module, but it is not efficient because every group and the results of the function must be pickled for messaging between processes.

Is there any way to avoid the pickling or even avoid the copying of the DataFrame completely? It looks like the shared memory functions of the multiprocessing modules are limited to numpy arrays. Are there any other options?

Python Solutions


Solution 1 - Python

From the comments above, it seems that this is planned for pandas some time (there's also an interesting-looking rosetta project which I just noticed).

However, until every parallel functionality is incorporated into pandas, I noticed that it's very easy to write efficient & non-memory-copying parallel augmentations to pandas directly using cython + OpenMP and C++.

Here's a short example of writing a parallel groupby-sum, whose use is something like this:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

and output is:

     sum
key     
0      6
1      11
2      4

Note Doubtlessly, this simple example's functionality will eventually be part of pandas. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine this into pandas.


To do this, I wrote a simple single-source-file extension whose code follows.

It starts with some imports and type definitions

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

The C++ unordered_map type is for summing by a single thread, and the vector is for summing by all threads.

Now to the function sum. It starts off with typed memory views for fast access:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

The function continues by dividing the semi-equally to the threads (here hardcoded to 4), and having each thread sum the entries in its range:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

All that's left is to create a DataFrame and return the results:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
Questionuser2303View Question on Stackoverflow
Solution 1 - PythonAmi TavoryView Answer on Stackoverflow