Show the progress of a Python multiprocessing pool imap_unordered call?


Python Problem Overview


I have a script that successfully runs a set of tasks on a multiprocessing Pool with an imap_unordered() call:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

However, my num_tasks is around 250,000, so the join() blocks the main thread for 10 seconds or so, and I'd like to echo progress to the command line incrementally to show that the main process isn't stuck. Something like:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

Is there a method on the result object or on the pool itself that reports the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work runs counter.value += 1 after finishing its task), but the counter only reaches ~85% of the total before it stops incrementing.
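A likely cause of the counter falling short is that counter.value += 1 is a separate read and write, so two workers can interleave and lose an increment; the multiprocessing documentation recommends holding the Value's own lock around the update. Below is a minimal sketch under a few assumptions: do_work is a stand-in that just sleeps, the counter reaches the workers through the pool initializer, and the parent polls it while map_async runs. The init_worker and run names, and the pool_factory parameter (so the same code also runs with multiprocessing.pool.ThreadPool), are hypothetical.

```python
import multiprocessing
import time

counter = None  # set in each worker by init_worker()


def init_worker(shared_counter):
    # Runs once in every worker; keeps a reference to the shared Value.
    global counter
    counter = shared_counter


def do_work(x):
    time.sleep(0.001)  # hypothetical stand-in for the real task
    # "+=" is a read followed by a write; holding the lock makes the pair atomic.
    with counter.get_lock():
        counter.value += 1
    return x


def run(pool_factory, num_tasks, poll_interval=0.5):
    shared = multiprocessing.Value('i', 0)
    with pool_factory(initializer=init_worker, initargs=(shared,)) as pool:
        result = pool.map_async(do_work, range(num_tasks))
        while not result.ready():
            print("Waiting for", num_tasks - shared.value, "tasks to complete...")
            result.wait(poll_interval)  # returns early once everything is done
        result.get()  # re-raises an exception from any worker
    return shared.value


if __name__ == '__main__':
    print(run(multiprocessing.Pool, 2000), "tasks counted")
```

Passing the Value through initializer/initargs matters: synchronized objects like Value are meant to be shared with workers by inheritance when the pool starts, not sent along with each task.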

Python Solutions


Solution 1 - Python

My personal favorite -- gives you a nice little progress bar and completion ETA while things run and commit in parallel.

from multiprocessing import Pool
import tqdm

# do_work and tasks are defined elsewhere in your script
if __name__ == '__main__':
    with Pool(processes=8) as pool:
        for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
            pass
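With around 250,000 small tasks, much of the run time can go to handing tasks to workers one at a time, because imap_unordered uses chunksize=1 unless told otherwise. A minimal stdlib-only sketch (it prints a plain counter rather than a tqdm bar) that passes a larger chunksize; square, imap_with_progress and the chunk size of 1,000 are illustrative assumptions. The same chunksize argument can go straight into the pool.imap_unordered(...) call wrapped by tqdm.tqdm above.

```python
import multiprocessing
import sys


def square(x):
    return x * x  # hypothetical stand-in for do_work


def imap_with_progress(pool, func, items, chunksize=1000, stream=sys.stderr):
    results = []
    total = len(items)
    # Each worker now receives `chunksize` items per round trip instead of one.
    for done, value in enumerate(pool.imap_unordered(func, items, chunksize=chunksize), 1):
        results.append(value)
        # Results of a chunk arrive together, so redraw roughly once per chunk.
        if done % chunksize == 0 or done == total:
            stream.write(f"\rdone {done}/{total}")
    stream.write("\n")
    return results


if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        out = imap_with_progress(pool, square, range(250_000))
    print(len(out))
```

The trade-off is granularity: with a large chunksize the count advances in jumps of about chunksize, so a value around total / (4 × number of workers) or smaller keeps the display reasonably smooth.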

Solution 2 - Python

There is no need to access private attributes of the result set:

from __future__ import division  # only needed on Python 2
import sys

for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
    sys.stderr.write('\rdone {0:.1%}'.format(i/num_tasks))
sys.stderr.write('\n')
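With num_tasks around 250,000, that loop writes to stderr once per finished task. One way to cut this down, sketched minimally here: a generic generator (the name progress is hypothetical) that wraps any iterator, including the one returned by imap_unordered, and redraws the line at most once per interval seconds, plus once at the end.

```python
import sys
import time


def progress(iterable, total, interval=0.5, stream=sys.stderr):
    def show(done, end=""):
        fraction = done / total if total else 1.0
        stream.write("\rdone {0:.1%}{1}".format(fraction, end))

    last = float("-inf")
    done = 0
    for done, item in enumerate(iterable, 1):
        now = time.monotonic()
        if now - last >= interval:  # redraw at most once per interval
            show(done)
            last = now
        yield item
    show(done, end="\n")  # final count, then move to a new line
```

Usage would look like `for result in progress(p.imap_unordered(do_work, range(num_tasks)), num_tasks): ...`, consuming results exactly as before.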

Solution 3 - Python

I found that the work was already done by the time I tried to check its progress. This is what worked for me using tqdm.

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

This should work with all flavors of multiprocessing, whether they block or not.
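One caveat with the code above: do_work runs in a worker process, so pbar.update(1) changes that worker's own copy of pbar, and the bar in the parent never moves (each worker may print a separate bar instead). A minimal sketch that keeps the "report from inside do_work" idea but sends the updates to the parent through a queue. The _init_worker, _progress_q and run names are hypothetical, and a plain counter stands in for tqdm.

```python
import multiprocessing
import sys

_progress_q = None  # set in each worker by _init_worker()


def _init_worker(q):
    global _progress_q
    _progress_q = q


def do_work(x):
    try:
        return x * x  # placeholder for the real work
    finally:
        _progress_q.put(1)  # report to the parent even if the work raised


def run(pool_factory, q, tasks, stream=sys.stderr):
    with pool_factory(initializer=_init_worker, initargs=(q,)) as pool:
        async_result = pool.map_async(do_work, tasks)
        for done in range(1, len(tasks) + 1):
            q.get()  # blocks until some worker reports a finished task
            stream.write(f"\rdone {done}/{len(tasks)}")
        stream.write("\n")
        return async_result.get()  # results in input order; re-raises errors


if __name__ == '__main__':
    print(run(multiprocessing.Pool, multiprocessing.Queue(), list(range(20))))
```

Putting the report in a finally block means a failing task still counts as finished, so the loop cannot wait forever, and async_result.get() then re-raises the error.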

Solution 4 - Python

Found an answer myself with some more digging: Taking a look at the __dict__ of the imap_unordered result object, I found it has a _index attribute that increments with each task completion. So this works for logging, wrapped in the while loop:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, range(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print("Waiting for", num_tasks-completed, "tasks to complete...")
  time.sleep(2)

However, I did find that swapping the imap_unordered for a map_async resulted in much faster execution, though the result object is a bit different. Instead, the result object from map_async has a _number_left attribute, and a ready() method:

p = multiprocessing.Pool()
rs = p.map_async(do_work, range(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left  # counts remaining chunks, not individual tasks
  print("Waiting for", remaining, "chunks to complete...")
  time.sleep(0.5)
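Two details from CPython's multiprocessing/pool.py help explain this (they are implementation details and may change between versions). First, map_async chooses a chunksize automatically when none is given, while imap_unordered defaults to chunksize=1, which likely accounts for much of the speed difference. Second, _number_left counts remaining chunks rather than tasks. The sketch below mirrors both rules in plain functions (the function names are hypothetical) to show what _number_left starts at for 250,000 tasks on 8 workers.

```python
def default_chunksize(n_items, n_workers):
    # Mirrors the heuristic Pool.map/map_async apply when chunksize is None.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def initial_number_left(n_items, chunksize):
    # What MapResult._number_left starts at: one count per chunk.
    if chunksize <= 0:
        return 0
    return n_items // chunksize + bool(n_items % chunksize)


if __name__ == '__main__':
    size = default_chunksize(250_000, 8)
    print(size, initial_number_left(250_000, size))  # 7813 32
```

So on an 8-worker pool the loop above starts by reporting 32, not 250,000. To get comparable speed while counting individual results through public APIs only, the same chunksize can be passed explicitly to imap_unordered.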

Solution 5 - Python

As suggested by Tim, you can use tqdm and imap to solve this issue. I've just stumbled upon this problem and tweaked the imap_unordered solution, so that I can access the results of the mapping. Here's how it works:

from multiprocessing import Pool
import tqdm

pool = Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=num_tasks))

In case you don't care about the values returned from your jobs, you don't need to assign the list to any variable.
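Note that imap_unordered yields results in completion order, so mapped_values will generally not line up with range(num_tasks). pool.imap takes the same arguments and keeps input order, but its progress can pause behind one slow early task. A minimal sketch of the other option, keeping imap_unordered and tagging each result with its input index before sorting at the end; square, _with_index and ordered_results are hypothetical names.

```python
import multiprocessing


def square(x):
    return x * x  # hypothetical stand-in for do_work


def _with_index(pair):
    # Top-level (so picklable) wrapper that returns the input index with the result.
    index, item = pair
    return index, square(item)


def ordered_results(pool, items):
    pairs = []
    for pair in pool.imap_unordered(_with_index, enumerate(items)):
        pairs.append(pair)  # a progress bar can be advanced here, once per result
    pairs.sort(key=lambda p: p[0])
    return [value for _, value in pairs]


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        print(ordered_results(pool, range(10)))
```

Wrapping the imap_unordered(...) call in tqdm.tqdm(..., total=len(items)) works the same way as in the answer above.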

Solution 6 - Python

I know that this is a rather old question, but here is what I do when I want to track the progress of a pool of tasks in Python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

pool.close()
pool.join()
print(results)

Basically, you use apply_async with a callback (in this case, it appends the returned value to a list), so you don't have to wait before doing something else. Then, within a while loop, you check the progress of the work. In this case, I added a widget to make it look nicer.

The output:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Hope it helps.
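One risk with the while loop in this answer: if my_function raises for some input, callback is never called for that task, len(results) never reaches len(dummy_args), and the loop spins forever. apply_async also accepts an error_callback (Python 3.2+), so failures can be counted as finished as well. A minimal sketch; risky and its failure on x == 3 are made up for illustration, and run is a hypothetical helper.

```python
import multiprocessing
import time


def risky(x):
    if x == 3:
        raise ValueError("bad input: %r" % x)  # hypothetical failure
    return x * 10


def run(pool, items, poll_interval=0.5):
    results, errors = [], []
    for x in items:
        # Both callbacks run in the parent process once the task finishes.
        pool.apply_async(risky, (x,), callback=results.append,
                         error_callback=errors.append)
    while len(results) + len(errors) < len(items):
        print(len(results) + len(errors), "of", len(items))
        time.sleep(poll_interval)
    return results, errors


if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        ok, failed = run(pool, range(6))
    print(sorted(ok), failed)
```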

Solution 7 - Python

A simple solution with Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.2)
    return x**2


if __name__ == '__main__':
    n = 10

    with Pool(4) as p, tqdm(total=n) as pbar:
        res = [p.apply_async(
            work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
        results = [r.get() for r in res]

Solution 8 - Python

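I created a custom class that prints progress as each task completes. Maybe this helps:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        # Called in the parent process each time a task finishes.
        # extend() expects func to return a list; use append() for single values.
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results


def squares(x):
    # hypothetical example task; returns a list because complete() uses extend()
    return [x * x]


if __name__ == '__main__':
    sim = ParallelSim(processes=2)
    for i in range(5):
        sim.add(squares, (i,))
    sim.run()
    print(sim.get_results())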

Solution 9 - Python

Try this simple Queue-based approach, which can also be used with pooling. Be mindful that printing anything after the progress bar starts will cause the bar to be moved, at least for this particular progress bar (PyPI's progress 1.5).

import multiprocessing
from progress.bar import Bar


def status_bar(queue_stat, n_groups, n):
    # Runs in its own process and owns the progress bar.
    bar = Bar('progress', max=n)

    finished = 0
    while finished < n_groups:
        gotten = queue_stat.get()  # blocks until a worker reports something
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data(queue_data, queue_stat, group):
    new_data = []
    for i in group:
        new_data.append(i * 2)  # placeholder for "do stuff resulting in new_data"
        queue_stat.put(1)

    queue_stat.put('finished')
    queue_data.put(new_data)


def multiprocess():
    new_data = []
    processes = []

    groups = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    combined = sum(groups, [])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    # One process drives the progress bar...
    p = multiprocessing.Process(target=status_bar,
                                args=(queue_stat, len(groups), len(combined)))
    processes.append(p)
    p.start()

    # ...and one process handles each group of work items.
    for group in groups:
        p = multiprocessing.Process(target=process_data,
                                    args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    # Collect results before joining, so no worker blocks on a full queue.
    for _ in range(len(groups)):
        new_data += queue_data.get()

    for p in processes:
        p.join()

    return new_data


if __name__ == '__main__':
    print(multiprocess())

Solution 10 - Python

After doing some research, I wrote a small module called parallelbar. It lets you display both the overall progress of the pool and the progress of each core separately. It is easy to use and has a good description.

For example:

from parallelbar import progress_map
from parallelbar.tools import cpu_bench


if __name__=='__main__':
    # create a list of tasks
    tasks = [1_000_000 + i for i in range(100)]
    progress_map(cpu_bench, tasks)

Solution 11 - Python

Some answers work with the progress bar, but I could not get the results out of the pool.

I used tqdm to create the progress bar; you can install it with pip install tqdm.

The simple code below works well with a progress bar, and you can get the results as well:

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

tasks = range(5)
result = []

def do_work(x):
    # do something with x and return the result
    sleep(2)
    return x + 2

if __name__ == '__main__':
    pbar = tqdm(total=len(tasks))

    with Pool(2) as p:
        for i in p.imap_unordered(do_work, tasks):
            result.append(i)
            pbar.update(1)  # advance by one finished task, not by the result value
    
    pbar.close()
    print(result)

Solution 12 - Python

Quick start

Using tqdm and multiprocessing.Pool

Install

pip install tqdm

Example

import time
import threading
from multiprocessing import Pool

from tqdm import tqdm


def do_work(x):
    time.sleep(x)
    return x


def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: {pbar.total} finish:{pbar.n}')


tasks = range(10)
pbar = tqdm(total=len(tasks))

if __name__ == '__main__':
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    print(results)
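One caveat with this Quick start code: pbar is created at module level, and on platforms where multiprocessing uses the spawn start method (the default on Windows and macOS) each worker re-imports the module and so builds its own bar. A minimal stdlib sketch of an alternative, assuming only the parent needs the numbers: keep the count in a small lock-protected object that the imap_unordered loop advances and that any other thread (such as the progress() checker above, or a Flask handler) can read. The Progress class and run function are hypothetical.

```python
import threading
import time
from multiprocessing import Pool


class Progress:
    """Lock-protected counter shared between threads of the parent process."""

    def __init__(self, total):
        self.total = total
        self._done = 0
        self._lock = threading.Lock()

    def advance(self, n=1):
        with self._lock:
            self._done += n

    def snapshot(self):
        with self._lock:
            return self._done, self.total


def do_work(x):
    time.sleep(x * 0.01)
    return x


def run(pool, tasks, progress):
    results = []
    for result in pool.imap_unordered(do_work, tasks):
        results.append(result)
        progress.advance()  # updated only in the parent process
    return results


if __name__ == '__main__':
    tasks = range(10)
    progress = Progress(total=len(tasks))  # created under the main guard
    checker = threading.Timer(0.05, lambda: print("done/total:", progress.snapshot()))
    checker.start()
    with Pool(processes=5) as pool:
        print(run(pool, tasks, progress))
    checker.join()
```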


Flask

Install

pip install flask

main.py

import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)


def do_work(x):
    time.sleep(x)
    return x


total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))


@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response


@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

Run (on Windows, for example)

set FLASK_APP=main
flask run

test.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Progress Bar</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
         style="width: 10%">0.00%
    </div>
</div>
</body>
<script>
    function set_progress_rate(n, total) {
        //Set the rate of progress bar
        var rate = (n / total * 100).toFixed(2);
        if (n > 0) {
            $(".progress-bar").attr("aria-valuenow", n);
            $(".progress-bar").attr("aria-valuemax", total);
            $(".progress-bar").text(rate + "%");
            $(".progress-bar").css("width", rate + "%");
        }
    }

    $("#run").click(function () {
        //Run the task
        $.ajax({
            url: "http://127.0.0.1:5000/run/",
            type: "GET",
            success: function (response) {
                set_progress_rate(100, 100);
                console.log('Results:' + response['results']);
            }
        });
    });
    setInterval(function () {
        //Show progress every 1 second
        $.ajax({
            url: "http://127.0.0.1:5000/progress/",
            type: "GET",
            success: function (response) {
                console.log(response);
                var n = response["n"];
                var total = response["total"];
                set_progress_rate(n, total);
            }
        });
    }, 1000);
</script>
</html>

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | MidnightLightning | View Question on Stackoverflow
Solution 1 - Python | Tim | View Answer on Stackoverflow
Solution 2 - Python | jfs | View Answer on Stackoverflow
Solution 3 - Python | reubano | View Answer on Stackoverflow
Solution 4 - Python | MidnightLightning | View Answer on Stackoverflow
Solution 5 - Python | mrapacz | View Answer on Stackoverflow
Solution 6 - Python | Julien Tourille | View Answer on Stackoverflow
Solution 7 - Python | zeawoas | View Answer on Stackoverflow
Solution 8 - Python | Aronstef | View Answer on Stackoverflow
Solution 9 - Python | user11186769 | View Answer on Stackoverflow
Solution 10 - Python | padu | View Answer on Stackoverflow
Solution 11 - Python | mohammad H | View Answer on Stackoverflow
Solution 12 - Python | XerCis | View Answer on Stackoverflow