Is there a simple process-based parallel map for python?


Python Problem Overview


I'm looking for a simple process-based parallel map for python, that is, a function

parmap(function,[data])

that would run function on each element of [data] on a different process (well, on a different core, but AFAIK, the only way to run stuff on different cores in python is to start multiple interpreters), and return a list of results.

Does something like this exist? I would like something simple, so a simple module would be nice. Of course, if no such thing exists, I will settle for a big library :-/

Python Solutions


Solution 1 - Python

It seems like what you need is the map method in multiprocessing.Pool():

> map(func, iterable[, chunksize])
>
> A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks till the result is ready.
>
> This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

For example, if you wanted to map this function:

def f(x):
    return x**2

to range(10), you could do it using the built-in map() function:

map(f, range(10))

or using a multiprocessing.Pool() object's method map():

import multiprocessing

if __name__ == "__main__":  # required where workers are spawned (e.g. Windows)
    with multiprocessing.Pool() as pool:
        print(pool.map(f, range(10)))
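
The chunksize behaviour described in the quoted documentation can be tried with a short script. This is a minimal sketch, assuming Python 3; the chunk size of 3 is an arbitrary illustrative value, not a recommendation.

```python
import multiprocessing


def f(x):
    return x**2


if __name__ == "__main__":
    # The guard keeps worker processes from re-running this block
    # on platforms that start workers by importing the main module.
    with multiprocessing.Pool() as pool:
        # chunksize=3 (arbitrary) hands the workers batches of about 3 items
        # instead of letting the pool choose the batch size itself.
        print(pool.map(f, range(10), chunksize=3))
        # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Larger chunks mean fewer round trips between the parent and the workers, which tends to help when each call to f is cheap.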

Solution 2 - Python

This can be done elegantly with Ray, a system that allows you to easily parallelize and distribute your Python code.

To parallelize your example, you'd need to define your map function with the @ray.remote decorator, and then invoke it with .remote. This ensures that every instance of the remote function is executed in a different process.

import time
import ray

ray.init()

# Define the function you want to apply map on, as remote function. 
@ray.remote
def f(x):
    # Do some work...
    time.sleep(1)
    return x*x

# Define a helper parmap(f, items) function.
# This function executes a copy of f() on each element of "items".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e.,
# an identifier of the result) rather than the result itself.
def parmap(f, items):
    return [f.remote(x) for x in items]

# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))

# Get the results
results = ray.get(result_ids)
print(results)

This will print:

[1, 4, 9, 16, 25]

and it will finish in approximately n/p seconds (rounded up to the nearest integer), where n is the number of elements in the list and p is the number of cores on your machine, because each call sleeps for 1 second. Assuming a machine with 2 cores, our example will execute in 5/2 rounded up, i.e., in approximately 3 seconds.
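
The estimate above is just a ceiling division. As a rough sketch (the helper name estimated_seconds is hypothetical, and it assumes every task takes the same time and ignores scheduling overhead):

```python
import math


def estimated_seconds(num_tasks, num_cores, seconds_per_task=1.0):
    """Rough wall-clock estimate: tasks run in waves of num_cores at a time."""
    waves = math.ceil(num_tasks / num_cores)
    return waves * seconds_per_task


print(estimated_seconds(5, 2))  # 3.0
```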

There are a number of advantages of using Ray over the multiprocessing module. In particular, the same code will run on a single machine as well as on a cluster of machines. For more advantages of Ray see this related post.

Solution 3 - Python

For those looking for a Python equivalent of R's mclapply(), here is my implementation. It improves on two earlier examples and can be applied to map functions with a single argument or with multiple arguments.

import numpy as np, pandas as pd
from scipy import sparse
import functools, multiprocessing
from multiprocessing import Pool

num_cores = multiprocessing.cpu_count()

def parallelize_dataframe(df, func, U=None, V=None):
    
    #blockSize = 5000
    num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
    blocks = np.array_split(df, num_partitions)
    
    pool = Pool(num_cores)
    if V is not None and U is not None:
        # apply func with multiple arguments to dataframe (i.e. involves multiple columns)
        df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
    else:
        # apply func with one argument to dataframe (i.e. involves single column)
        df = pd.concat(pool.map(func, blocks))
    
    pool.close()
    pool.join()
    
    return df

def square(x):
    return x**2

def test_func(data):
    print("Process working on: ", data.shape)
    data["squareV"] = data["testV"].apply(square)
    return data

def vecProd(row, U, V):
    return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )

def mProd_func(data, U, V):
    data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
    return data

def generate_simulated_data():
    
    N, D, nnz, K = [302, 184, 5000, 5]
    I = np.random.choice(N, size=nnz, replace=True)
    J = np.random.choice(D, size=nnz, replace=True)
    vals = np.random.sample(nnz)
    
    sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])

    # Generate parameters U and V which could be used to reconstruct the matrix Y
    U = np.random.sample(N*K).reshape([N,K])
    V = np.random.sample(D*K).reshape([D,K])
    
    return sparseY, U, V

def main():
    Y, U, V = generate_simulated_data()
    
    # find row and column indices and observed values for sparse matrix Y
    (testI, testJ, testV) = sparse.find(Y)
    
    colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
    dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}
    
    obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
    obsValDF["obsI"] = testI
    obsValDF["obsJ"] = testJ
    obsValDF["testV"] = testV
    obsValDF = obsValDF.astype(dtype=dtypes)
    
    print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))

    # calculate the square of testVals    
    obsValDF = parallelize_dataframe(obsValDF, test_func)
    
    # reconstruct prediction of testVals using parameters U and V
    obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)
    
    print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
    print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])
 
if __name__ == '__main__':
    main()
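
The key trick in parallelize_dataframe() is binding the extra arguments with functools.partial so that pool.map(), which passes one argument per call, can still drive a multi-argument function. A stripped-down sketch of just that idea, using a plain list instead of a DataFrame (scale and factor are hypothetical names for illustration):

```python
import functools
from multiprocessing import Pool


def scale(x, factor):
    return x * factor


if __name__ == "__main__":
    # Freeze factor=3 so the pool only has to supply x for each call.
    triple = functools.partial(scale, factor=3)
    with Pool() as pool:
        print(pool.map(triple, [1, 2, 3, 4]))  # [3, 6, 9, 12]
```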

Solution 4 - Python

Python3's Pool class has a map() method and that's all you need to parallelize map:

from multiprocessing import Pool

with Pool() as P:
    xtransList = P.map(some_func, a_list)

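The with Pool() as P block creates a process pool and shuts it down on exit; inside it, P.map() applies some_func to the items of a_list in parallel. You can set the number of worker processes (by default, the number of CPUs reported by the system):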

with Pool(processes=4) as P:
    xtransList = P.map(some_func, a_list)
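
Putting this together, the parmap(function, data) the question asks for can be written as a thin wrapper. A minimal sketch, assuming Python 3; the processes parameter and the some_func body are illustrative additions, not part of the original answer:

```python
from multiprocessing import Pool


def parmap(function, data, processes=None):
    """Apply function to each element of data in worker processes; return a list."""
    # processes=None lets Pool pick its default worker count.
    with Pool(processes=processes) as pool:
        return pool.map(function, data)


def some_func(x):
    # Placeholder workload; defined at module level so it can be pickled.
    return x + 1


if __name__ == "__main__":
    print(parmap(some_func, [1, 2, 3], processes=2))  # [2, 3, 4]
```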

Solution 5 - Python

I know this is an old post, but just in case, I wrote a tool to make this super, super easy called parmapper (I actually call it parmap in my use but the name was taken).

It handles a lot of the setup and teardown of processes and adds many features. In rough order of importance:

  • Can take lambda and other unpickleable functions
  • Can apply starmap and other similar call methods to make it very easy to directly use.
  • Can split amongst both threads and/or processes
  • Includes features such as progress bars

It does incur a small cost but for most uses, that is negligible.

I hope you find it useful.

(Note: It, like map in Python 3+, returns an iterable so if you expect all results to pass through it immediately, use list())
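
The first bullet matters because multiprocessing sends functions to workers with pickle, and pickle stores a function by its importable name, which a lambda does not have. A small standard-library sketch of that limitation (is_picklable is a hypothetical helper; it does not use parmapper, whose API is not shown here):

```python
import pickle


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_picklable(len))          # True: built-ins are pickled by name
print(is_picklable(lambda x: x))  # False: a lambda has no importable name
```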

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | static_rtti | View Question on Stackoverflow
Solution 1 - Python | Flávio Amieiro | View Answer on Stackoverflow
Solution 2 - Python | Ion Stoica | View Answer on Stackoverflow
Solution 3 - Python | Good Will | View Answer on Stackoverflow
Solution 4 - Python | bresson | View Answer on Stackoverflow
Solution 5 - Python | Justin Winokur | View Answer on Stackoverflow