In this post we explore the key techniques in thread pooling by implementing it step by step.
Preliminary discussion
Why do I need thread pools?
Using threads in C++ has been easy since C++11. At the most basic level, a thread can be managed with std::thread. For asynchronous execution of tasks, it is also convenient to use std::async and std::future. With all this infrastructure in place, why do we need a thread pool? Or rather, when do we need one?
Threads are a system resource: they take time to create and destroy. If that overhead is on the same order of magnitude as the time required to perform a task, the performance loss from frequently creating and destroying threads becomes significant. That is when we need to consider using a thread pool.
What should be the characteristics of a thread pool?
A thread pool is essentially a set of threads waiting to be used. In C++, it can be represented as an array of std::thread or as a vector. In practice, to allow for future extension, std::vector<std::thread> is clearly the more appropriate choice.
Each thread in the thread pool may receive a task at some point, but the exact task is not known when the thread is created. Expressed in C++ terms, this means that a thread in a thread pool:
- should be able to execute arbitrary functions, supporting any parameter list and any return type;
- should be able to send the result of a task's execution back to the task's publisher;
- should be able to be woken up to perform a task when needed, without consuming excessive CPU when idle;
- should be controllable by the master thread, so that at the appropriate time it can pause tasks, stop receiving tasks, discard unfinished tasks, and so on.
For the first requirement, the modern C++ approach is to use the infrastructure provided by the functional header (std::bind, std::function, etc.) in combination with template parameter packs. For the second, the old-fashioned approach is to register a callback function when the task is published; the modern C++ approach is to use std::packaged_task in combination with std::future. For the third, consider std::condition_variable for infrequent tasks, or std::this_thread::yield for very frequent ones. For the fourth, set an internal variable as a token and have each worker thread check that token periodically.
This brings the discussion to tasks. Obviously, we will need a thread-safe queue to manage the tasks posted by other threads.
Thread-safe queues
Let’s start directly with the code and analyze it step by step.
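The ideas analyzed below can be sketched as follows. The names blocking_queue, push, emplace, and pop follow the discussion; the exact member layout is an assumption, not the original listing:

```cpp
#include <cstddef>
#include <mutex>
#include <queue>
#include <shared_mutex>
#include <utility>

// A sketch of the thread-safe queue described below: it derives from
// std::queue and guards every operation with a std::shared_mutex, so that
// observers share a read lock while mutators take an exclusive one.
template <typename T>
class blocking_queue : protected std::queue<T> {
 public:
  blocking_queue() = default;
  blocking_queue(const blocking_queue&) = delete;             // copy/move disabled:
  blocking_queue& operator=(const blocking_queue&) = delete;  // not needed by the pool

  bool empty() const {
    std::shared_lock<std::shared_mutex> lock(mtx_);  // read-only lock
    return std::queue<T>::empty();
  }

  std::size_t size() const {
    std::shared_lock<std::shared_mutex> lock(mtx_);  // read-only lock
    return std::queue<T>::size();
  }

  void push(const T& value) {
    std::unique_lock<std::shared_mutex> lock(mtx_);  // exclusive lock
    std::queue<T>::push(value);
  }

  template <typename... Args>
  void emplace(Args&&... args) {
    std::unique_lock<std::shared_mutex> lock(mtx_);
    std::queue<T>::emplace(std::forward<Args>(args)...);  // perfect forwarding
  }

  // A try-pop: returns false and leaves `out` untouched when the queue is empty.
  bool pop(T& out) {
    std::unique_lock<std::shared_mutex> lock(mtx_);
    if (std::queue<T>::empty()) return false;
    out = std::move(std::queue<T>::front());
    std::queue<T>::pop();
    return true;
  }

 private:
  mutable std::shared_mutex mtx_;  // mutable so const observers can lock it
};
```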
- blocking_queue inherits from std::queue and leaves the implementation of the basic queue operations to the standard library.
- We use std::shared_mutex in combination with std::unique_lock and std::shared_lock to implement a read/write lock.
- The copy and move constructors and the corresponding assignment operators are disabled. This is purely because we do not use them while implementing the thread pool; they can be implemented on demand if needed.
- Both observers use read-only (shared) locks.
- push and emplace are similar operations: both append an element to the end of the queue. They differ in the same way as the corresponding standard-library container interfaces. Note that emplace uses perfect forwarding.
- pop here would more properly be called try_pop, because the pop action does not necessarily succeed: the function returns false when the queue is empty and makes no changes to the queue.
- This is a coarse-grained lock over the entire queue. In fact, since pushes and pops touch opposite ends of the queue, a fine-grained version of the lock can be implemented with some care, with significant performance gains when both push and pop operations are frequent. We may discuss this point in a separate article later.
Thread Pooling
Interface definition
Following the previous discussion, we can put together a rough idea of what a thread pool looks like.
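The rough idea can be sketched as an interface declaration. The method names (init, terminate, cancel, the observers, async) follow the discussion below; the member layout and the plain std::queue of std::function<void()> used here are assumptions, since the original listing is not reproduced:

```cpp
#include <cstddef>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <shared_mutex>
#include <thread>
#include <type_traits>
#include <vector>

class threadpool {
 public:
  threadpool() = default;

  // First group: the control interface.
  void init(std::size_t num);  // start `num` worker threads (declared only here)
  void terminate();            // stop accepting tasks, finish pending ones
  void cancel();               // stop accepting tasks, discard pending ones

  // Second group: observers (read-only, hence shared locks).
  bool inited() const {
    std::shared_lock<std::shared_mutex> lock(mtx_);
    return inited_;
  }
  bool is_running() const {
    std::shared_lock<std::shared_mutex> lock(mtx_);
    return inited_ && !stop_ && !cancel_;
  }
  std::size_t size() const {
    std::shared_lock<std::shared_mutex> lock(mtx_);
    return workers_.size();
  }

  // Third group: task submission, almost identical in spirit to std::async.
  template <class F, class... Args>
  auto async(F&& f, Args&&... args)
      -> std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>;
      // declared only here; implemented in the "Task interface" section

 private:
  mutable std::shared_mutex mtx_;               // guards the pool state
  std::vector<std::thread> workers_;            // the thread pool body itself
  std::queue<std::function<void()>> tasks_;     // the task queue
  bool inited_ = false;
  bool stop_ = false;    // set by terminate()
  bool cancel_ = false;  // set by cancel()
};
```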
- The first group of three interfaces is the control interface for the entire thread pool. The init interface starts the thread pool; its parameter num is the number of threads in the pool. The terminate interface shuts down the thread pool: it stops accepting new tasks but ensures that already-accepted tasks are processed. cancel is similar to terminate, but it discards accepted-but-unprocessed tasks.
- The three interfaces in the second group are all observers.
- The only interface in the third group is the thread pool's entry point for accepting external tasks. It is almost identical to std::async provided by the standard library: it accepts an arbitrary function and returns a std::future.
- This is the thread pool body itself, i.e. the worker threads.
- This is the task queue.
The control interface of the thread pool
Next we discuss the specific implementation of the control interface.
- The work done by init should logically happen only once. However, we cannot guarantee that user code will actually behave as we expect, so we use std::call_once to ensure the work in question executes exactly once.
- Because this modifies the state of the threadpool, we use a write (exclusive) lock here.
- The spawn interface is the thread function, that is, the function each worker thread runs for its entire lifetime after it is started.
- When a thread is woken up (either spuriously or by a notify_* call), if the thread pool has been neither canceled nor terminated and no task can be taken from the task queue, the thread should remain dormant; otherwise it should wake up and continue processing.
- If the thread pool is canceled, the current task is not executed; if the thread pool is stopped and no task was removed from the task queue, the current task is not executed either; otherwise, the current task is executed.
- The implementations of terminate and cancel are almost identical, except that terminate sets the stop_ variable while cancel sets the cancel_ variable. In addition, cancel explicitly empties the task queue.
Observer for thread pools
The observers are relatively simple; the only thing worth mentioning is the use of a read (shared) lock.
Task interface
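The steps analyzed below can be sketched as a runnable example: std::bind matches the argument list, std::packaged_task ties the call to a std::future, and a lambda erases the return type so the queue can hold std::function<void()>. The single-worker pool hosting it is a hypothetical minimal stand-in, not the original implementation:

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>

class one_worker_pool {
 public:
  one_worker_pool() : worker_([this] { run(); }) {}
  ~one_worker_pool() {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      stop_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  template <class F, class... Args>
  auto async(F&& f, Args&&... args)
      -> std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>> {
    using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
    // 1. std::bind matches the argument list to a zero-argument callable.
    auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    // 2. std::packaged_task associates the call with a std::future.
    auto task = std::make_shared<std::packaged_task<R()>>(std::move(bound));
    std::future<R> result = task->get_future();
    {
      std::lock_guard<std::mutex> lock(mtx_);
      // 3. A lambda executes the task and discards the return value
      //    (the future keeps it), so it matches std::function<void()>.
      tasks_.push([task] { (*task)(); });
    }
    cv_.notify_one();  // 4. Wake up a worker via the condition variable.
    return result;
  }

 private:
  void run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // stopped and drained
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();
    }
  }

  std::mutex mtx_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool stop_ = false;
  std::thread worker_;  // declared last: starts after the other members exist
};
```

The shared_ptr around the packaged_task is needed because std::function requires a copyable callable, while std::packaged_task is move-only.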
- Three type aliases are defined using using.
- No modification of the thread pool's state is involved here, so a read lock is sufficient. Obviously, posting tasks to a thread pool that has already been terminated or canceled is forbidden.
- Since the task queue only accepts callables of type std::function<void()>, we first use std::bind to bind the argument list.
- We use std::packaged_task to associate the task to be executed with a std::future, and return the std::future to the caller so that the task's publisher can retrieve the result of the execution in the future.
- We then wrap the task in a lambda that both executes it and discards the return value (which is managed by the future), so that it matches std::function<void()>.
- Finally, we wake up a worker thread via the condition variable.
Full implementation
The full implementation can be found at Liam0205/toy-threadpool, which includes unit tests and a performance comparison against std::async.
Finally
Here we've implemented a thread pool that you can study and use. But as the GitHub repo name implies, it is still just a toy. There are a number of optimizations to make before using it in a real project. For example:
- Optimize the thread-safe queue: use finer-grained locks (already present in the full implementation), or switch to a lock-free implementation.
- Build a more complete thread pool that supports not only the states mentioned in this article, but also the ability to pause, to expand (automatically grow when there are too many tasks), and to shrink (automatically shrink when there are too many idle threads).
All of these directions can be dug into and optimized further.