How do I use an initializer to set up my multiprocessing pool?

Python Problem Overview


I'm trying to use the multiprocessing Pool object. I'd like each worker process to open a database connection when it starts, then use that connection to process the data that is passed in (rather than opening and closing the connection for each piece of data). This seems to be what the initializer is for, but I can't work out how the worker and the initializer communicate. So I have something like this:

import psycopg2
from multiprocessing import Pool

def get_cursor():
    return psycopg2.connect(...).cursor()

def process_data(data):
    # here I'd like to have the cursor so that I can do things with the data
    ...

if __name__ == "__main__":
    pool = Pool(initializer=get_cursor, initargs=())
    pool.map(process_data, get_some_data_iterator())

How do I (or do I) get the cursor back from get_cursor() into process_data()?

Python Solutions


Solution 1 - Python

The initializer function is called like this inside each worker:

def worker(...):
    ...
    if initializer is not None:
        initializer(*initargs)  # the return value is discarded

so there is no return value saved anywhere. You might think this dooms you, but no! Each worker is in a separate process. Thus, you can use an ordinary global variable.

This is not exactly pretty, but it works:

cursor = None
def set_global_cursor(...):
    global cursor
    cursor = ...

Now you can just use cursor in your process_data function. The cursor variable inside each separate process is separate from all the other processes, so they do not step on each other.

(I have no idea whether psycopg2 has a different way to deal with this that does not involve using multiprocessing in the first place; this is meant as a general answer to a general problem with the multiprocessing module.)
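Putting the pieces of this answer together, a minimal runnable sketch might look like the following. It uses the standard-library sqlite3 module as a stand-in for psycopg2 so that it runs without a database server; the names init_worker, process_data and DB_PATH are illustrative assumptions, not part of the original answer.

```python
import sqlite3
from multiprocessing import Pool

DB_PATH = ":memory:"  # hypothetical; a real setup would pass credentials or a DSN

# One connection per worker process, filled in by the initializer.
connection = None


def init_worker(db_path):
    """Runs once in each worker process when the pool starts it."""
    global connection
    connection = sqlite3.connect(db_path)


def process_data(value):
    """Uses the worker's own connection; nothing is opened per item."""
    row = connection.execute("SELECT ? * ?", (value, value)).fetchone()
    return row[0]


if __name__ == "__main__":
    with Pool(processes=2, initializer=init_worker, initargs=(DB_PATH,)) as pool:
        print(pool.map(process_data, range(5)))
```

Anything the initializer needs (here the database path) travels through initargs, and the global is assigned inside the worker, so the parent process never has to share a connection object with its children.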

Solution 2 - Python

You can also pass the worker function itself to the initializer, create the connection there, and attach the cursor to the function as an attribute.

def init_worker(function):
    function.cursor = db.conn()

Now you can access the db through function.cursor without using globals, for example:

from multiprocessing import Pool

def use_db(i):
    print(use_db.cursor)  # process-local attribute set by init_worker

if __name__ == "__main__":
    pool = Pool(initializer=init_worker, initargs=(use_db,))
    pool.map(use_db, range(10))

Solution 3 - Python

torek has already given a good explanation of why the initializer is not working in this case. However, I am not a fan of global variables personally, so I'd like to offer another solution here.

The idea is to use a class to wrap the function and initialize the class with the "global" variable.

class Processor(object):
  """Process the data and save it to database."""

  def __init__(self, credentials):
    """Initialize the class with 'global' variables"""
    self.cursor = psycopg2.connect(credentials).cursor()

  def __call__(self, data):
    """Do something with the cursor and data"""
    self.cursor.find(data.key)

And then call with

p = Pool(5)
p.map(Processor(credentials), list_of_data)

So the first argument to map constructs the class with the credentials and returns an instance, and map then calls that instance with each item of data.

Though this is not as straightforward as the global variable solution, I strongly suggest avoiding global variables and encapsulating the state in some safer way. (And I really wish they would support lambda expressions one day; it would make things much easier...)
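One caveat worth checking with this approach: Pool.map pickles the callable in order to send it to the workers, so every attribute of the instance has to be picklable, and database connection objects often are not. The sketch below illustrates this with sqlite3 as a stand-in (whether psycopg2 cursors behave the same way is an assumption to verify); HoldsConnection, HoldsPath and can_pickle are hypothetical names.

```python
import pickle
import sqlite3


class HoldsConnection:
    """Mirrors the pattern above: the connection is opened in __init__."""

    def __init__(self, db_path):
        self.connection = sqlite3.connect(db_path)


class HoldsPath:
    """Keeps only picklable configuration; connects on first use."""

    def __init__(self, db_path):
        self.db_path = db_path
        self.connection = None

    def __call__(self, value):
        if self.connection is None:
            self.connection = sqlite3.connect(self.db_path)
        return self.connection.execute("SELECT ? + 1", (value,)).fetchone()[0]


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, as Pool.map requires."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    # sqlite3 connections are not picklable, so the first check fails.
    print(can_pickle(HoldsConnection(":memory:")))
    print(can_pickle(HoldsPath(":memory:")))
```

Note that every unpickled copy of HoldsPath starts without a connection, and as far as I understand Pool.map ships the callable along with each chunk of tasks, so this variant may reconnect once per chunk rather than once per worker. The initializer remains the mechanism for setup that must happen strictly once per worker.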

Solution 4 - Python

Since defining global variables in the initializer is generally undesirable, we can avoid them, and still avoid repeating costly initialization on every call, with simple caching within each subprocess:

from functools import lru_cache
from multiprocessing.pool import Pool
from time import sleep


# The cache lives in each process, so the body runs once per worker for a given (a, b).
@lru_cache(maxsize=None)
def _initializer(a, b):
    print(f'Initialized with {a}, {b}')


def _pool_func(a, b, i):
    _initializer(a, b)
    sleep(1)
    print(f'got {i}')


arg_a = 1
arg_b = 2

if __name__ == "__main__":
    with Pool(processes=5) as pool:
        pool.starmap(_pool_func, ((arg_a, arg_b, i) for i in range(0, 20)))

Output (the exact order varies between runs):

Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
got 1
got 0
got 4
got 2
got 3
got 5
got 7
got 8
got 6
got 9
got 10
got 11
got 12
got 14
got 13
got 15
got 16
got 17
got 18
got 19
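The example above only prints from the cached function. To hand an actual resource to the task, the cached function can return it. The following sketch assumes sqlite3 as a stand-in for a real database driver; get_connection and square_in_db are hypothetical names.

```python
import sqlite3
from functools import lru_cache
from multiprocessing import Pool


@lru_cache(maxsize=None)
def get_connection(db_path):
    """Opened on the first call in each process, then served from the cache."""
    return sqlite3.connect(db_path)


def square_in_db(db_path, value):
    """Fetch the cached connection and use it for one unit of work."""
    connection = get_connection(db_path)  # cache hit after the first task
    return connection.execute("SELECT ? * ?", (value, value)).fetchone()[0]


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        print(pool.starmap(square_in_db, [(":memory:", i) for i in range(5)]))
```

Because the cache is keyed on the arguments, they must be hashable, and each worker process keeps its own cache, so every worker opens exactly one connection per distinct db_path it sees.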

Solution 5 - Python

If the first answer wasn't clear, here is a snippet that runs:

import multiprocessing

n_proc = 5
cursor = [0 for _ in range(n_proc)]

def set_global_cursor():
    global cursor
    # _identity is a private attribute of Process; it is (1,) for the first worker, (2,) for the next, etc.
    cursor[multiprocessing.current_process()._identity[0] - 1] = 1

def process_data(data):
    print(cursor)
    return data**2

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=n_proc, initializer=set_global_cursor)
    pool.map(process_data, list(range(10)))

Output:

[1, 0, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 1, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 0, 0, 1]
[1, 0, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 0, 1, 0]
[0, 1, 0, 0, 0]
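The snippet above relies on the private _identity attribute. A variant of the same demonstration that sticks to public API could record the worker's process name and PID instead. This is only a sketch; worker_info, remember_worker and square_with_worker are hypothetical names.

```python
import multiprocessing
import os

# Filled in separately inside every worker process by the initializer.
worker_info = None


def remember_worker():
    """Initializer: store this worker's public name and PID in a global."""
    global worker_info
    worker_info = (multiprocessing.current_process().name, os.getpid())


def square_with_worker(value):
    """Return the identity of the worker that ran the task, plus the result."""
    return worker_info, value ** 2


if __name__ == "__main__":
    with multiprocessing.Pool(processes=3, initializer=remember_worker) as pool:
        for info, result in pool.map(square_with_worker, range(6)):
            print(info, result)
```

Each printed line pairs a result with the worker that produced it, which makes it visible that the global holds a different value in every process.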

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Chris Curvey | View Question on Stackoverflow
Solution 1 - Python | torek | View Answer on Stackoverflow
Solution 2 - Python | The Unfun Cat | View Answer on Stackoverflow
Solution 3 - Python | yeelan | View Answer on Stackoverflow
Solution 4 - Python | mcguip | View Answer on Stackoverflow
Solution 5 - Python | Ohad Rubin | View Answer on Stackoverflow