Thread pooling in C++11

C++, Multithreading, C++11, Threadpool, Stdthread

C++ Problem Overview


How do I get a pool of threads that I can send tasks to, without creating and deleting them over and over again? In other words, I want persistent worker threads that I can resynchronize with, without having to join and re-create them.


I have code that looks like this:

#include <algorithm>
#include <thread>
#include <vector>

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[5] = {0}; // arr[0..3] hold per-thread results, arr[4] holds the minimum

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = *std::min_element(arr, arr + 4); // min_element returns an iterator, so dereference it
  }
  return 0;
}

Instead of creating and joining threads each iteration, I'd prefer to send tasks to my worker threads each iteration and only create them once.

C++ Solutions


Solution 1 - C++

This is adapted from my answer to another very similar post.

Let's build a ThreadPool class:

#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
public:
    void Start();
    void QueueJob(const std::function<void()>& job);
    void Stop();
    bool busy();

private:
    void ThreadLoop();

    bool should_terminate = false;           // Tells threads to stop looking for jobs
    std::mutex queue_mutex;                  // Prevents data races to the job queue
    std::condition_variable mutex_condition; // Allows threads to wait on new jobs or termination
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;
};

  1. ThreadPool::Start

For an efficient threadpool implementation, once threads are created according to num_threads, it's better not to create new ones or destroy old ones (by joining): doing so carries a performance penalty and might even make your application slower than the serial version. Instead, we keep a pool of threads that can be used at any time (if they aren't already running a job).

Each thread should be running its own infinite loop, constantly waiting for new tasks to grab and run.

void ThreadPool::Start() {
    const uint32_t num_threads = std::thread::hardware_concurrency(); // Max # of threads the system supports
    threads.resize(num_threads);
    for (uint32_t i = 0; i < num_threads; i++) {
        threads.at(i) = std::thread(&ThreadPool::ThreadLoop, this); // ThreadLoop is a member function, so bind it to this
    }
}

  2. ThreadPool::ThreadLoop

The infinite loop function. This is a while (true) loop waiting for the task queue to open up.

void ThreadPool::ThreadLoop() {
    while (true) {
        std::function<void()> job;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            mutex_condition.wait(lock, [this] {
                return !jobs.empty() || should_terminate;
            });
            if (should_terminate) {
                return;
            }
            job = jobs.front();
            jobs.pop();
        }
        job();
    }
}

  3. ThreadPool::QueueJob

Add a new job to the pool; use a lock so that there isn't a data race.

void ThreadPool::QueueJob(const std::function<void()>& job) {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        jobs.push(job);
    }
    mutex_condition.notify_one();
}

To use it:

thread_pool->QueueJob([] { /* ... */ });

  4. ThreadPool::busy

bool ThreadPool::busy() {
    bool poolbusy;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        poolbusy = !jobs.empty();
    }
    return poolbusy;
}

The busy() function can be used in a while loop, so that the main thread can wait for the thread pool to complete all the queued tasks before calling the thread pool destructor.
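
For example, the waiting loop might look like this (a minimal sketch; pool is an instance of the ThreadPool above, and the 10 ms sleep is arbitrary, just to avoid busy-spinning):

// Wait until the job queue has drained before shutting the pool down.
// Note: jobs that have already been dequeued may still be running;
// Stop() joins the threads, so it waits for those jobs to finish.
while (pool.busy()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
pool.Stop();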

  5. ThreadPool::Stop

Stop the pool.

void ThreadPool::Stop() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        should_terminate = true;
    }
    mutex_condition.notify_all();
    for (std::thread& active_thread : threads) {
        active_thread.join();
    }
    threads.clear();
}

Once you integrate these ingredients, you have your own dynamic threading pool. These threads always run, waiting for jobs to do; a complete usage sketch follows the notes below.

I apologize if there are some syntax errors; I typed this code from memory, and my memory is bad. Sorry that I cannot provide you the complete thread pool code; that would violate my job integrity.

Notes:

  • The anonymous code blocks are used so that when they are exited, the std::unique_lock variables created within them go out of scope, unlocking the mutex.
  • ThreadPool::Stop will not terminate any currently running jobs; it just waits for them to finish via active_thread.join().
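
Putting the pieces together, a usage sketch could look like the following (this assumes the corrected bool busy() above; the job count and the work done per job are placeholders):

#include <chrono>
#include <thread>

int main() {
    ThreadPool pool;
    pool.Start();

    for (int i = 0; i < 16; ++i) {
        pool.QueueJob([] { /* do some work */ });
    }

    // Poll busy() until the queue drains, then shut the pool down.
    while (pool.busy()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    pool.Stop();
    return 0;
}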

Solution 2 - C++

You can use the C++ Thread Pool Library (CTPL), https://github.com/vit-vit/ctpl.

Then the code you wrote can be replaced with the following:

#include <ctpl.h>  // or <ctpl_stl.h> if you do not have the Boost library

#include <algorithm>
#include <future>
#include <vector>

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[5] = {0}; // arr[0..3] hold per-task results, arr[4] holds the minimum
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            // The int parameter is the id of the thread running the task; ctpl passes it automatically.
            results[j] = p.push([&arr, j](int){ arr[j] += 2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = *std::min_element(arr, arr + 4); // min_element returns an iterator, so dereference it
    }
}

You will get the desired number of threads, and they will not be created and deleted over and over again across the iterations.

Solution 3 - C++

A pool of threads means that all your threads are running, all the time – in other words, the thread function never returns. To give the threads something meaningful to do, you have to design a system of inter-thread communication, both for the purpose of telling the thread that there's something to do, as well as for communicating the actual work data.

Typically this will involve some kind of concurrent data structure, and each thread would presumably sleep on some kind of condition variable, which would be notified when there's work to do. Upon receiving the notification, one or several of the threads wake up, recover a task from the concurrent data structure, process it, and store the result in an analogous fashion.

The thread would then go on to check whether there's even more work to do, and if not go back to sleep.
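
In C++11 terms, the heart of such a scheme is usually a mutex-protected queue plus a condition variable. A minimal sketch of the consumer side (my illustration only, with arbitrary names and no error handling):

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>

std::mutex m;
std::condition_variable cv;
std::queue<std::function<void()>> work;
bool done = false;

void worker() {
    for (;;) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(m);
            // Sleep until there is work to do or we are told to shut down.
            cv.wait(lock, [] { return done || !work.empty(); });
            if (done && work.empty())
                return;
            task = std::move(work.front());
            work.pop();
        }
        task(); // Run the task outside the lock.
    }
}

A producer pushes a task under the same mutex and calls cv.notify_one(); shutdown sets done under the mutex and calls cv.notify_all().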

The upshot is that you have to design all this yourself, since there isn't a natural notion of "work" that's universally applicable. It's quite a bit of work, and there are some subtle issues you have to get right. (You can program in Go if you like a system which takes care of thread management for you behind the scenes.)

Solution 4 - C++

A thread pool is, at its core, a set of threads all bound to a function that works as an event loop. These threads will endlessly wait for a task to execute, or for their own termination.

The thread pool's job is to provide an interface to submit jobs, to define (and perhaps modify) the policy for running these jobs (scheduling rules, thread instantiation, size of the pool), and to monitor the status of the threads and related resources.

So for a versatile pool, one must start by defining what a task is, how it is launched, interrupted, what is the result (see the notion of promise and future for that question), what sort of events the threads will have to respond to, how they will handle them, how these events shall be discriminated from the ones handled by the tasks. This can become quite complicated as you can see, and impose restrictions on how the threads will work, as the solution becomes more and more involved.
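
For the "what is the result" part, std::packaged_task and std::future are the usual standard building blocks. A minimal sketch of the task-with-result idea, not a pool (the names and the sample computation are illustrative):

#include <future>
#include <thread>

int compute() { return 42; }

int main() {
    std::packaged_task<int()> task(compute);     // Wrap the callable.
    std::future<int> result = task.get_future(); // Handle to the eventual result.

    std::thread t(std::move(task));              // In a pool, a worker thread would run the task instead.
    int value = result.get();                    // Blocks until compute() has finished.
    t.join();
    return value == 42 ? 0 : 1;
}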

The current tooling for handling events is fairly barebones(*): primitives like mutexes, condition variables, and a few abstractions on top of that (locks, barriers). But in some cases, these abstractions may turn out to be unfit (see this related question), and one must revert to using the primitives.

Other problems have to be managed too:

  • signals
  • I/O
  • hardware (processor affinity, heterogeneous setup)

How would these play out in your setting?

This answer to a similar question points to an existing implementation meant for Boost and the STL.

I offered a very crude implementation of a threadpool for another question, which doesn't address many problems outlined above. You might want to build on it. You might also want to have a look at existing frameworks in other languages, to find inspiration.


(*) I don't see that as a problem, quite to the contrary. I think it's the very spirit of C++ inherited from C.

Solution 5 - C++

Following [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

> function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{
	
private:
	std::queue<std::function<void()>> m_function_queue;
	std::mutex m_lock;
	std::condition_variable m_data_condition;
	std::atomic<bool> m_accept_functions;

public:
	
	Function_pool();
	~Function_pool();
	void push(std::function<void()> func);
	void done();
	void infinite_loop_func();
};

> function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
	std::unique_lock<std::mutex> lock(m_lock);
	m_function_queue.push(func);
	// Unlock before notifying; otherwise the woken consumer would immediately block trying to acquire the lock we still hold.
	lock.unlock();
	m_data_condition.notify_one();
}

void Function_pool::done()
{
	std::unique_lock<std::mutex> lock(m_lock);
	m_accept_functions = false;
	lock.unlock();
	// Unlock before notifying; otherwise the woken consumers would immediately block trying to acquire the lock we still hold.
	m_data_condition.notify_all();
	//notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
	std::function<void()> func;
	while (true)
	{
		{
			std::unique_lock<std::mutex> lock(m_lock);
			m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
			if (!m_accept_functions && m_function_queue.empty())
			{
			//lock will be released automatically.
			//finish the thread loop so the main thread can join this thread.
				return;
			}
			func = m_function_queue.front();
			m_function_queue.pop();
			//release the lock
		}
		func();
	}
}

> main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
	std::cout << "bla" << std::endl;
}

int main()
{
	std::cout << "starting operation" << std::endl;
	int num_threads = std::thread::hardware_concurrency();
	std::cout << "number of threads = " << num_threads << std::endl;
	std::vector<std::thread> thread_pool;
	for (int i = 0; i < num_threads; i++)
	{
		thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
	}

	//here we should send our functions
	for (int i = 0; i < 50; i++)
	{
		func_pool.push(example_function);
	}
	func_pool.done();
	for (unsigned int i = 0; i < thread_pool.size(); i++)
	{
		thread_pool.at(i).join();
	}
}

Solution 6 - C++

You can use boost::asio::thread_pool from the Boost library:

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <thread>

void my_task(){...}

int main(){
    int threadNumbers = std::thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });

    // Wait for all outstanding tasks to complete before destroying the pool.
    pool.join();
}

You can also use the threadpool library from the open-source community:

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = std::thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}

Solution 7 - C++

Something like this might help (taken from a working app).

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }

  template<class F>
  void enqueue(F f) {
    service.post(f);
  }

  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }

private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

You can use it like this:

thread_pool pool(2);

pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});

pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

Keep in mind that reinventing an efficient asynchronous queuing mechanism is not trivial.

boost::asio::io_service is a very efficient implementation; in fact, it is a collection of platform-specific wrappers (e.g. it wraps I/O completion ports on Windows).

Solution 8 - C++

Edit: This now requires C++17 and concepts. (As of 9/12/16, only g++ 6.0+ is sufficient.)

The template deduction is a lot more accurate because of it, though, so it's worth the effort of getting a newer compiler. I've not yet found a function that requires explicit template arguments.

It also now takes any appropriate callable object (and is still statically typesafe!!!).

It also now includes an optional green threading priority thread pool using the same API. This class is POSIX only, though. It uses the ucontext_t API for userspace task switching.


I created a simple library for this. An example of usage is given below. (I'm answering this because it was one of the things I found before I decided it was necessary to write it myself.)

#include <iostream>
#include <list>
#include <future>

#include "thread_pool.hpp" // the pool header from the library (exact file name assumed)

using namespace std;

bool is_prime(int n){
  // Determine if n is prime by trial division.
  if(n < 2)
    return false;
  for(int d = 2; d * d <= n; d++)
    if(n % d == 0)
      return false;
  return true;
}

int main(){
  thread_pool pool(8); // 8 threads

  list<future<bool>> results;
  for(int n = 2;n < 10000;n++){
    // Submit a job to the pool.
    results.emplace_back(pool.async(is_prime, n));
  }

  int n = 2;
  for(auto i = results.begin();i != results.end();i++, n++){
    // i is an iterator pointing to a future representing the result of is_prime(n)
    cout << n << " ";
    bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
    if(prime)
      cout << "is prime";
    else
      cout << "is not prime";
    cout << endl;
  }
}

You can pass async any function with any (or void) return value and any (or no) arguments and it will return a corresponding std::future. To get the result (or just wait until a task has completed) you call get() on the future.
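
For example (a sketch based on the description above; the pool size and the lambdas are arbitrary):

thread_pool pool(4);

// A callable with arguments and a non-void result.
auto sum = pool.async([](int a, int b) { return a + b; }, 1, 2);

// A callable with no arguments and no result.
auto done = pool.async([] { /* side effects only */ });

std::cout << sum.get() << std::endl; // prints 3
done.get();                          // just waits for the task to finish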

Here's the github: https://github.com/Tyler-Hardin/thread_pool.

Solution 9 - C++

It looks like a thread pool is a very popular problem/exercise :-)

I recently wrote one in modern C++; it is my own work and it is publicly available here: https://github.com/yurir-dev/threadpool

It supports templated return values, core pinning, and ordering of some tasks. The whole implementation is in two .h files.

So, the code from the original question would look something like this:

#include "tp/threadpool.h"

#include <algorithm>
#include <future>
#include <thread>
#include <vector>

int main() {
	int arr[5] = { 0 };

	concurency::threadPool<void> tp;
	tp.start(std::thread::hardware_concurrency());

	std::vector<std::future<void>> futures;
	for (int i = 0; i < 8; ++i) { // for 8 iterations,
		for (int j = 0; j < 4; ++j) {
			futures.push_back(tp.push([&arr, j]() {
				arr[j] += 2;
			}));
		}
	}

	// wait until all pushed tasks are finished.
	for (auto& f : futures)
		f.get();
	// or just tp.end(); // will kill all the threads

	arr[4] = *std::min_element(arr, arr + 4);
}

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type     | Original Author | Original Content on Stackoverflow
Question         | Yktula          | View Question on Stackoverflow
Solution 1 - C++ | PhD AP EcE      | View Answer on Stackoverflow
Solution 2 - C++ | vit-vit         | View Answer on Stackoverflow
Solution 3 - C++ | Kerrek SB       | View Answer on Stackoverflow
Solution 4 - C++ | didierc         | View Answer on Stackoverflow
Solution 5 - C++ | pio             | View Answer on Stackoverflow
Solution 6 - C++ | Amir Fo         | View Answer on Stackoverflow
Solution 7 - C++ | rustyx          | View Answer on Stackoverflow
Solution 8 - C++ | Tyler           | View Answer on Stackoverflow
Solution 9 - C++ | yurir           | View Answer on Stackoverflow