This is a text-only version of the following page on https://raymii.org:
---
Title : My go-to C++ code for asynchronous work processing on a separate thread
Author : Remy van Elst
Date : 17-12-2024 23:59
URL : https://raymii.org/s/software/My_go-to_Cpp_code_for_asynchronous_work_processing_on_a_separate_thread.html
Format : Markdown/HTML
---

You probably recognise this situation. You're working on your code and realise that the thing you're writing might take long, be blocking or is batch-wise. It might be resizing images, calling some API or processing hardware inputs or a stream of incoming messages. These tasks, if not handled efficiently, can halt your application, leaving it unresponsive. To avoid this, one solution is to offload these time-consuming operations to a separate thread, allowing the main application to continue executing without interruptions.
In this article, I'll show you how you can implement asynchronous work processing in C++ using a worker thread. This example class is my go-to for this situation and is easily adapted to handle more complex use cases. It has a queue of work items and uses a `std::thread`, a `std::mutex` combined with a `std::condition_variable` to manage work asynchronously, processing items one by one.
Recently I removed all Google Ads from this site due to their invasive tracking, as well as Google Analytics. Please, if you found this content useful, consider a small donation using any of the options below. It means the world to me if you show your appreciation and you'll help pay the server costs:
GitHub Sponsorship
PCBWay referral link (You get $5, I get $20 after you've placed an order)
Digital Ocea referral link ($200 credit for 60 days. Spend $25 after your credit expires and I'll get $25!)
You can easily customise and template this class so that it has a generic
queue and accepts a callback. By doing so, you can inherit from it in any of
your own classed that need to do some batch work. For this example I've
chosen not to do so since that would make it more complex.
For even more complex stuff I have great experiences using [concurrentqueue](https://github.com/cameron314/concurrentqueue).
### The class explained
This class, `classWithWorkerThread`, is a multi-threaded worker model that
processes a queue of work items one by one using a dedicated thread. The
class has a `std::queue` to store work items. For this example those work
items are of type `exampleWorkItem`, which have a name and a wait time
(in seconds). The items to work on and the work-method is completely up to
you. Could be resizing images, processing messages, whatever you fancy.
The constructor starts a worker thread, which runs the method
`functionThatProcessesWorkOnThread()`. The worker thread uses a
`std::unique_lock` with a `std::condition_variable` to wait for new work to
be added to the queue or for a shutdown signal.
The `unique_lock` is released during the `condition_variable` wait, allowing
other threads to modify the queue. Once notified, two things happen in the
worker thread. First it checks if [it was not a `spurious wakeup`](https://web.archive.org/web/20241217194215/https://en.wikipedia.org/wiki/Spurious_wakeup)
by making sure the work queue is not empty and the thread is not shutting
down. Then the thread proceeds to process the next item. If no work is
available, it waits again, preventing unnecessary CPU consumption.
The `addSomethingToProcess` method is used to add work items to the queue. It
locks the `mutex` to ensure thread safety, adds the new work item to the
queue, and then notifies the worker thread that new work is available via the
`condition_variable`. After adding the item, the lock is released, allowing
the worker thread to process the item.
When the class is destroyed, the destructor ensures that the worker thread is
stopped gracefully. It sets the `_threadRunning` flag to false, notifies the
worker thread, and joins the worker thread to wait for its completion before
exiting, ensuring that all resources are cleaned up properly.
The worker thread itself waits on the `condition_variable` and processes the
work items one by one. It first `pop's` an item from the queue, then the
`mutex` is temporarily released during processing of the item to allow others
to add more items to the queue. Processing of work items can be time
consuming and we don't want others to wait before adding items. Once the work
is completed, the worker thread starts again, waiting on the `condition_variable`.
Note that a `std::queue` itself is not thread safe. But due to my use of mutex
locks in this class there are no race conditions. Reading or writing via
other methods without proper locking could result in race conditions.
### The code
This is the class. For this article example it is header only and the work
processing is based on example items.
#pragma once
/*
* Copyright (c) 2024 Remy van Elst
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*
*/
#include
#include
#include
#include
#include
#include
#include
using namespace std::chrono_literals;
namespace raymii {
struct exampleWorkItem {
std::string name;
int waitSec = 5;
};
class classWithWorkerThread {
public:
classWithWorkerThread()
{
// Start worker thread
_threadRunning = true;
_workerThread = std::thread(&classWithWorkerThread::functionThatProcessesWorkOnThread, this);
}
classWithWorkerThread(const classWithWorkerThread&) = delete;
classWithWorkerThread& operator=(const classWithWorkerThread&) = delete;
classWithWorkerThread(classWithWorkerThread&&) = delete;
classWithWorkerThread& operator=(classWithWorkerThread&&) = delete;
~classWithWorkerThread()
{
// Stop worker thread
{
std::lock_guard lock(_threadMutex);
_threadRunning = false;
}
_threadCV.notify_one();
if (_workerThread.joinable()) {
_workerThread.join();
}
}
void addSomethingToProcess(const exampleWorkItem &work)
{
{ // scoped lock mutex before adding work item
std::lock_guard lock(_threadMutex);
// add item to queue
_itemsToProcess.emplace(work);
} // lock is out of scope here
// notify worker thread that new work is available
_threadCV.notify_one();
}
private:
// methods related to Worker Thread
void functionThatProcessesWorkOnThread()
{
// only process work while running. Otherwise, exit method.
while (_threadRunning) {
// Acquire mutex. Is unlocked during the cv wait call. When woken up, its locked again
std::unique_lock lock(_threadMutex);
// wait until notified that new work is available.
_threadCV.wait(lock, [&] {
// When woken, check if it's not a spurious wakeup.
// Either there is work to process: !_itemsToProcess.empty()
// or the thread is stopping: !_threadRunning
return !_itemsToProcess.empty() || !_threadRunning;
});
if (!_threadRunning) // extra check to see if we're stopped after wakeup
break; // if so, gracefully do an early exit.
// Start processing work item. Make sure queue is not empty
if (!_itemsToProcess.empty())
{
// Retrieve item from queue
exampleWorkItem workItem = std::move(_itemsToProcess.front());
_itemsToProcess.pop();
// Unlock mutex because processing can be time-consuming
lock.unlock();
// Process Item
std::cout << "Start Processing item " << workItem.name << ", sleeping " << std::to_string(workItem.waitSec) << " seconds " ;
for (int i = 0; i < workItem.waitSec; ++i)
{
std::cout << ".";
std::this_thread::sleep_for(1s);
}
std::cout << "; Finished Processing item " << workItem.name << ";" << std::endl;
// Lock mutex again after finishing time-consuming work
lock.lock();
}
// End processing work item.
}
}
std::queue _itemsToProcess;
std::thread _workerThread;
std::mutex _threadMutex;
std::condition_variable _threadCV;
std::atomic _threadRunning = false;
};
} //namespace raymii
Here is an example `main.ccp` program that uses this class. If you press
`ENTER` 5 work items are added to the queue and processed. If you type `exit`
the program quits. You can watch work being done while you can enter any
text. If you enter `exit`, the program finishes the current work item and
then quits without crashing.
int main() {
raymii::classWithWorkerThread exampleClass;
std::cout << "Press ENTER to add 5 work items. "
"Type 'exit' and press ENTER to quit.\n";
std::string input;
int taskCounter = 1;
while (true) {
std::getline(std::cin, input);
if (input == "exit") break;
std::cout << "Adding 5 work items..." << std::endl;
for (int i = 0; i < 5; ++i) {
raymii::exampleWorkItem item;
item.name = "Task_" + std::to_string(taskCounter++);
item.waitSec = 5 + (i % 10);
exampleClass.addSomethingToProcess(item);
}
}
std::cout << "Exiting program.\n";
return 0;
}
Example output:
Press ENTER to add 5 work items. Type 'exit' and press ENTER to quit.
** PRESSES ENTER**
Adding 5 work items...
Start Processing item Task_1, sleeping 5 seconds .....; Finished Processing item Task_1;
Start Processing item Task_2, sleeping 6 seconds ......; Finished Processing item Task_2;
Start Processing item Task_3, sleeping 7 seconds .......; Finished Processing item Task_3;
Start Processing item Task_4, sleeping 8 seconds ........; Finished Processing item Task_4;
Start Processing item Task_5, sleeping 9 seconds .........; Finished Processing item Task_5;
** PRESSES ENTER AGAIN**
Adding 5 work items...
Start Processing item Task_6, sleeping 5 seconds .....; Finished Processing item Task_6;
** TYPES exit**
Start Processing item Task_7, sleeping 6 seconds ..Exiting program.
....; Finished Processing item Task_7;
Process finished with exit code 0
---
License:
All the text on this website is free as in freedom unless stated otherwise.
This means you can use it in any way you want, you can copy it, change it
the way you like and republish it, as long as you release the (modified)
content under the same license to give others the same freedoms you've got
and place my name and a link to this site with the article as source.
This site uses Google Analytics for statistics and Google Adwords for
advertisements. You are tracked and Google knows everything about you.
Use an adblocker like ublock-origin if you don't want it.
All the code on this website is licensed under the GNU GPL v3 license
unless already licensed under a license which does not allows this form
of licensing or if another license is stated on that page / in that software:
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
Just to be clear, the information on this website is for meant for educational
purposes and you use it at your own risk. I do not take responsibility if you
screw something up. Use common sense, do not 'rm -rf /' as root for example.
If you have any questions then do not hesitate to contact me.
See https://raymii.org/s/static/About.html for details.