Dead simple example of using Multiprocessing Queue, Pool and Locking

Python, Python 2.7, Multiprocessing

Python Problem Overview


I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html, but I'm still struggling with multiprocessing's Queue, Pool and Lock. For now, I was able to build the example below.

Regarding Queue and Pool, I'm not sure I've understood the concepts correctly, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list has 8 items in this example). So, what should I use: a Pool that creates 2 processes handling two different queues (2 at most), or should I just use Queue to process 2 inputs at a time? The lock would be used to print the outputs correctly.

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()


def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)

Python Solutions


Solution 1 - Python

The best solution for your problem is to utilize a Pool. Using Queues and having a separate "queue feeding" functionality is probably overkill.

Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

Note that the mp_worker() function now accepts a single argument (the two previous arguments bundled together), because map() passes each element of data to the worker function as one argument; the parenthesised parameter in the def line then unpacks that element into inputs and the_time.

Output:

Processs a	Waiting 2 seconds
Processs b	Waiting 4 seconds
Process a	DONE
Processs c	Waiting 6 seconds
Process b	DONE
Processs d	Waiting 8 seconds
Process c	DONE
Processs e	Waiting 1 seconds
Process e	DONE
Processs f	Waiting 3 seconds
Process d	DONE
Processs g	Waiting 5 seconds
Process f	DONE
Processs h	Waiting 7 seconds
Process g	DONE
Process h	DONE
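
A side note on the function signature: the tuple-unpacking parameter in def mp_worker((inputs, the_time)) is Python 2 only syntax; it was removed in Python 3 (PEP 3113). A minimal sketch of the same program for Python 3, unpacking inside the worker body instead, might look like this:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(args):
    # Python 3: unpack the single (inputs, the_time) element inside the body.
    inputs, the_time = args
    print("Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print("Process %s\tDONE" % inputs)

def mp_handler():
    # Pool supports the context-manager protocol in Python 3.3+.
    with multiprocessing.Pool(2) as p:
        p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()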

Edit as per @Thales comment below:

If you want "a lock for each pool limit" so that your processes run in tandem pairs, ala:

A waiting, B waiting | A done, B done | C waiting, D waiting | C done, D done | ...

then change the handler function to launch pools (of 2 processes) for each pair of data:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))

Now your output is:

 Processs a	Waiting 2 seconds
 Processs b	Waiting 4 seconds
 Process a	DONE
 Process b	DONE
 Processs c	Waiting 6 seconds
 Processs d	Waiting 8 seconds
 Process c	DONE
 Process d	DONE
 Processs e	Waiting 1 seconds
 Processs f	Waiting 3 seconds
 Process e	DONE
 Process f	DONE
 Processs g	Waiting 5 seconds
 Processs h	Waiting 7 seconds
 Process g	DONE
 Process h	DONE
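
If you would rather not create a new Pool on every pass through the loop, a single reused pool gives the same tandem-pair behaviour, because map() blocks until both tasks in the current pair have finished. A minimal sketch, using the same data and mp_worker as above:

def mp_handler():
    pool = multiprocessing.Pool(2)
    for pair in zip(data[0::2], data[1::2]):
        # map() does not return until both tasks in this pair are done.
        pool.map(mp_worker, pair)
    pool.close()
    pool.join()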

Solution 2 - Python

Here is my personal go-to for this topic:

Gist here, (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

import multiprocessing
import sys

THREADS = 3

# Used to prevent multiple worker processes from mixing their output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each thread.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...



    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create a pool with the desired number of worker processes
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',), 
        ('Goodbye', 'World',), 
    ]


    try:
        pool.map_async(func_worker, func_args).get()
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')  # reset any terminal colour codes
        sys.stdout.write('User Interrupt\n')
    pool.close()

if __name__ == '__main__':
    main()
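
One caveat with the module-level lock above: under the spawn start method (the default on Windows, and on macOS since Python 3.8), each worker process re-imports the module and so gets its own, unrelated GLOBALLOCK, which means printing is not actually serialized there. A common workaround, sketched below under that assumption, is to create the lock in main() and hand it to the workers through the pool's initializer:

import multiprocessing
import sys

def init_worker(lock):
    # Make the one shared lock visible inside each worker process.
    global GLOBALLOCK
    GLOBALLOCK = lock

def func_worker(args):
    str1, str2 = args
    # Serial-only portion: hold the shared lock while printing.
    GLOBALLOCK.acquire()
    try:
        print(str1)
        print(str2)
    finally:
        GLOBALLOCK.release()

def main():
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(3, initializer=init_worker, initargs=(lock,))
    func_args = [
        ('Hello', 'World'),
        ('Goodbye', 'World'),
    ]
    try:
        pool.map_async(func_worker, func_args).get()
    except KeyboardInterrupt:
        sys.stdout.write('User Interrupt\n')
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()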

Solution 3 - Python

This might not be 100% related to the question, but in my search for an example of using multiprocessing with a queue, this shows up first on Google.

This is a basic example class that you can instantiate, put items into its queue, and wait until the queue is finished. That's all I needed.

from multiprocessing import JoinableQueue, Process


class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break

            # process your item here

            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()

r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
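
The upload() loop already checks for a None sentinel, but terminate() never sends one; it simply kills the workers. If a graceful shutdown is preferred, one possible addition (my own sketch, not part of the original answer) is a stop() method that drains the queue and then asks each worker to exit on its own:

    def stop(self):
        """ wait until the queue is empty, then let each worker exit cleanly """
        self.queue.join()
        for _ in self.processes:
            # upload() breaks out of its loop when it sees None.
            self.queue.put(None)
        for p in self.processes:
            p.join()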

Solution 4 - Python

For everyone using editors like Komodo Edit on Windows 10, add sys.stdout.flush() (after importing sys) to:

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs
    sys.stdout.flush()

or as the first line of:

    if __name__ == '__main__':
       sys.stdout.flush()

This helps you see what goes on during the run of the script, instead of having to look at the black command-line box.
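
On Python 3, an alternative, assuming you control the print calls, is to pass flush=True to print(), or to run the whole script unbuffered with python -u. Applied to the Python 3 worker sketched earlier:

def mp_worker(args):
    inputs, the_time = args
    # flush=True forces the output to appear immediately, even when buffered.
    print("Process %s\tWaiting %s seconds" % (inputs, the_time), flush=True)
    time.sleep(int(the_time))
    print("Process %s\tDONE" % inputs, flush=True)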

Solution 5 - Python

Here is an example from my code (it uses a thread pool, but change ThreadPoolExecutor to ProcessPoolExecutor and you'll have a process pool):

from concurrent.futures import ThreadPoolExecutor

def execute_run(rp):
    ...  # do something

pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)
pool.shutdown(wait=True)  # executors have no join(); shutdown(wait=True) waits for all tasks

Basically:

  • pool = ThreadPoolExecutor(6) creates a pool of 6 threads.
  • Then you have a bunch of for loops that add tasks to the pool.
  • pool.submit(execute_run, rp) adds a task to the pool; the first argument is the function to be called in a thread/process, and the remaining arguments are passed to that function.
  • pool.shutdown(wait=True) waits until all tasks are done.
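
For a self-contained process-based variant, here is a minimal sketch using concurrent.futures.ProcessPoolExecutor; the task list and worker body are placeholders, not part of the original code:

from concurrent.futures import ProcessPoolExecutor

def execute_run(params):
    # Placeholder worker: replace with the real simulation call.
    return sum(params)

if __name__ == '__main__':
    tasks = [(1, 2), (3, 4), (5, 6)]  # placeholder parameter sets
    with ProcessPoolExecutor(max_workers=6) as pool:
        futures = [pool.submit(execute_run, t) for t in tasks]
        results = [f.result() for f in futures]
    print(results)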

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type          Original Author    Original Content on Stackoverflow
Question              thclpr             View Question on Stackoverflow
Solution 1 - Python   Velimir Mlaker     View Answer on Stackoverflow
Solution 2 - Python   ThorSummoner       View Answer on Stackoverflow
Solution 3 - Python   linqu              View Answer on Stackoverflow
Solution 4 - Python   ZF007              View Answer on Stackoverflow
Solution 5 - Python   jb.                View Answer on Stackoverflow