What can multiprocessing and dill do together?

Python, Multiprocessing, Pickle, Dill

Python Problem Overview


I would like to use the multiprocessing library in Python. Sadly, multiprocessing uses pickle, which doesn't support functions with closures, lambdas, or functions defined in __main__. All three of these are important to me:

In [1]: import pickle

In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
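
The same failure shows up for closures and for functions defined in __main__; for instance (a made-up make_adder closure, just to illustrate):

import pickle

def make_adder(n):
    def add(x):  # a closure over n, defined inside another function
        return x + n
    return add

pickle.dumps(make_adder(5))  # raises an error: the inner function can't be looked up as a top-level name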

Fortunately, there is dill, a more robust pickle. Apparently dill performs magic on import to make pickle work:

In [3]: import dill

In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...

This is very encouraging, particularly because I don't have access to the multiprocessing source code. Sadly, I still can't get this very basic example to work:

import multiprocessing as mp
import dill

p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))

Why is this? What am I missing? Exactly what are the limitations on the multiprocessing+dill combination?

Temporary edit for J.F. Sebastian

mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py 
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

^C
...lots of junk...

[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()

Python Solutions


Solution 1 - Python

multiprocessing makes some bad choices about pickling. Don't get me wrong, it makes some good choices that enable it to pickle certain types so they can be used in a pool's map function. However, since we have dill that can do the pickling, multiprocessing's own pickling becomes a bit limiting. Actually, if multiprocessing were to use pickle instead of cPickle... and also drop some of its own pickling overrides, then dill could take over and give a much fuller serialization for multiprocessing.
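
For illustration (my own minimal example, not from the original answer), dill can serialize the kinds of functions the stock pickler rejects:

import dill

def make_adder(n):
    def add(x):  # a closure, which the stock pickler refuses to serialize
        return x + n
    return add

payload = dill.dumps(make_adder(5))  # dill serializes the closure by value
restored = dill.loads(payload)
print(restored(10))                  # 15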

Until that happens, there's a fork of multiprocessing called pathos (the release version is a bit stale, unfortunately) that removes the above limitations. Pathos also adds some nice features that multiprocessing doesn't have, like taking multiple arguments in the map function. Pathos is due for a release, after some mild updating -- mostly conversion to Python 3.x.

Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>> from pathos.multiprocessing import ProcessingPool    
>>> pool = ProcessingPool(nodes=4)
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

and just to show off a little of what pathos.multiprocessing can do...

>>> def busy_add(x,y, delay=0.01):
...     for n in range(x):
...        x += n
...     for n in range(y):
...        y -= n
...     import time
...     time.sleep(delay)
...     return x + y
... 
>>> def busy_squared(x):
...     import time, random
...     time.sleep(2*random.random())
...     return x*x
... 
>>> def squared(x):
...     return x*x
... 
>>> def quad_factory(a=1, b=1, c=0):
...     def quad(x):
...         return a*x**2 + b*x + c
...     return quad
... 
>>> square_plus_one = quad_factory(2,0,1)
>>> 
>>> def test1(pool):
...     print pool
...     print "x: %s\n" % str(x)
...     print pool.map.__name__
...     start = time.time()
...     res = pool.map(squared, x)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...     print pool.imap.__name__
...     start = time.time()
...     res = pool.imap(squared, x)
...     print "time to queue:", time.time() - start
...     start = time.time()
...     res = list(res)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...     print pool.amap.__name__
...     start = time.time()
...     res = pool.amap(squared, x)
...     print "time to queue:", time.time() - start
...     start = time.time()
...     res = res.get()
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
... 
>>> def test2(pool, items=4, delay=0):
...     _x = range(-items/2,items/2,2)
...     _y = range(len(_x))
...     _d = [delay]*len(_x)
...     print map
...     res1 = map(busy_squared, _x)
...     res2 = map(busy_add, _x, _y, _d)
...     print pool.map
...     _res1 = pool.map(busy_squared, _x)
...     _res2 = pool.map(busy_add, _x, _y, _d)
...     assert _res1 == res1
...     assert _res2 == res2
...     print pool.imap
...     _res1 = pool.imap(busy_squared, _x)
...     _res2 = pool.imap(busy_add, _x, _y, _d)
...     assert list(_res1) == res1
...     assert list(_res2) == res2
...     print pool.amap
...     _res1 = pool.amap(busy_squared, _x)
...     _res2 = pool.amap(busy_add, _x, _y, _d)
...     assert _res1.get() == res1
...     assert _res2.get() == res2
...     print ""
... 
>>> def test3(pool): # test against a function that should fail in pickle
...     print pool
...     print "x: %s\n" % str(x)
...     print pool.map.__name__
...     start = time.time()
...     res = pool.map(square_plus_one, x)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
... 
>>> def test4(pool, maxtries, delay):
...     print pool
...     m = pool.amap(busy_add, x, x)
...     tries = 0
...     while not m.ready():
...         time.sleep(delay)
...         tries += 1
...         print "TRY: %s" % tries
...         if tries >= maxtries:
...             print "TIMEOUT"
...             break
...     print m.get()
... 
>>> import time
>>> x = range(18)
>>> delay = 0.01
>>> items = 20
>>> maxtries = 20
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> pool = Pool(nodes=4)
>>> test1(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

map
time to results: 0.0553691387177
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

imap
time to queue: 7.91549682617e-05
time to results: 0.102381229401
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

amap
time to queue: 7.08103179932e-05
time to results: 0.0489699840546
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

>>> test2(pool, items, delay)
<built-in function map>
<bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>

>>> test3(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

map
time to results: 0.0523059368134
y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]

>>> test4(pool, maxtries, delay)
<pool ProcessingPool(ncpus=4)>
TRY: 1
TRY: 2
TRY: 3
TRY: 4
TRY: 5
TRY: 6
TRY: 7
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]

Solution 2 - Python

You may want to try using the multiprocessing_on_dill library, which is a fork of multiprocessing that uses dill on the backend.

For example, you can run:

>>> import multiprocessing_on_dill as multiprocessing
>>> with multiprocessing.Pool() as pool:
...     pool.map(lambda x: x**2, range(10))
... 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
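
The answer only shows the lambda case; assuming the fork really is a drop-in replacement for multiprocessing, a closure defined in __main__ should go through the same way (my own sketch, not from the answer):

import multiprocessing_on_dill as multiprocessing

def make_adder(n):
    def add(x):  # a closure that the stock pickler cannot serialize
        return x + n
    return add

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        print(pool.map(make_adder(10), range(5)))  # expected: [10, 11, 12, 13, 14]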

Solution 3 - Python

Overwrite the multiprocessing module's pickler class:

import dill, multiprocessing

# expose dill's module-level dumps/loads as attributes of its Pickler class,
# so it can stand in for multiprocessing's ForkingPickler
dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
# swap dill's Pickler in for the pickler multiprocessing uses to send tasks
multiprocessing.reduction.ForkingPickler = dill.Pickler
multiprocessing.reduction.dump = dill.dump
# some Python versions keep a separate reference in multiprocessing.queues
multiprocessing.queues._ForkingPickler = dill.Pickler

Solution 4 - Python

I know this thread is old; however, you don't necessarily have to use the pathos module that Mike McKerns pointed out. I also find it quite annoying that multiprocessing uses pickle instead of dill, so you can do something like this:

import multiprocessing as mp
import dill
def helperFunction(f, inp, *args, **kwargs):
    import dill # reimport, just in case this is not available on the new processes
    f = dill.loads(f) # converts bytes to (potentially lambda) function
    return f(inp, *args, **kwargs)
def mapStuff(f, inputs, *args, **kwargs):
    pool = mp.Pool(6) # create a 6-worker pool
    f = dill.dumps(f) # converts (potentially lambda) function to bytes
    futures = [pool.apply_async(helperFunction, [f, inp, *args], kwargs) for inp in inputs]
    results = [future.get() for future in futures]
    pool.close() # release the worker processes once all results are in
    return results

Then, you can use it like this:

mapStuff(lambda x: x**2, [2, 3]) # returns [4, 9]
mapStuff(lambda x, b: x**2 + b, [2, 3], 1) # returns [5, 10]
mapStuff(lambda x, b: x**2 + b, [2, 3], b=1) # also returns [5, 10]

def f(x):
    return x**2
mapStuff(f, [4, 5]) # returns [16, 25]

How it works, basically, is that you convert the lambda function to a bytes object, pass that through to the child process, and have it reconstruct the lambda function. In the code, I have just used dill to serialize the function, but you can also serialize the arguments if you need to.
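
For instance, here is a minimal sketch (the map_with_dill_args and _run_serialized names are mine, not part of the answer) that extends the same idea by dill-serializing the arguments along with the function:

import multiprocessing as mp
import dill

def _run_serialized(payload):
    f, args, kwargs = dill.loads(payload)  # rebuild the function and its arguments
    return f(*args, **kwargs)

def map_with_dill_args(f, inputs, *args, **kwargs):
    # bundle the function together with each input and the shared arguments
    payloads = [dill.dumps((f, (inp,) + tuple(args), kwargs)) for inp in inputs]
    with mp.Pool(4) as pool:
        return pool.map(_run_serialized, payloads)

if __name__ == '__main__':
    print(map_with_dill_args(lambda x, b: x**2 + b, [2, 3], b=1))  # [5, 10]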

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type        | Original Author | Original Content on Stackoverflow
Question            | MRocklin        | View Question on Stackoverflow
Solution 1 - Python | Mike McKerns    | View Answer on Stackoverflow
Solution 2 - Python | eliotc          | View Answer on Stackoverflow
Solution 3 - Python | Devyl           | View Answer on Stackoverflow
Solution 4 - Python | 157 239n        | View Answer on Stackoverflow