This is a text-only version of the following page on https://raymii.org: --- Title : My go-to C++ code for asynchronous work processing on a separate thread Author : Remy van Elst Date : 17-12-2024 23:59 URL : https://raymii.org/s/software/My_go-to_Cpp_code_for_asynchronous_work_processing_on_a_separate_thread.html Format : Markdown/HTML --- ![threads](/s/inc/img/threads.jpg) You probably recognise this situation. You're working on your code and realise that the thing you're writing might take long, be blocking or is batch-wise. It might be resizing images, calling some API or processing hardware inputs or a stream of incoming messages. These tasks, if not handled efficiently, can halt your application, leaving it unresponsive. To avoid this, one solution is to offload these time-consuming operations to a separate thread, allowing the main application to continue executing without interruptions. In this article, I'll show you how you can implement asynchronous work processing in C++ using a worker thread. This example class is my go-to for this situation and is easily adapted to handle more complex use cases. It has a queue of work items and uses a `std::thread`, a `std::mutex` combined with a `std::condition_variable` to manage work asynchronously, processing items one by one.

Recently I removed all Google Ads from this site due to their invasive tracking, as well as Google Analytics. Please, if you found this content useful, consider a small donation using any of the options below. It means the world to me if you show your appreciation and you'll help pay the server costs:

GitHub Sponsorship

PCBWay referral link (You get $5, I get $20 after you've placed an order)

Digital Ocea referral link ($200 credit for 60 days. Spend $25 after your credit expires and I'll get $25!)

You can easily customise and template this class so that it has a generic queue and accepts a callback. By doing so, you can inherit from it in any of your own classed that need to do some batch work. For this example I've chosen not to do so since that would make it more complex. For even more complex stuff I have great experiences using [concurrentqueue](https://github.com/cameron314/concurrentqueue). ### The class explained This class, `classWithWorkerThread`, is a multi-threaded worker model that processes a queue of work items one by one using a dedicated thread. The class has a `std::queue` to store work items. For this example those work items are of type `exampleWorkItem`, which have a name and a wait time (in seconds). The items to work on and the work-method is completely up to you. Could be resizing images, processing messages, whatever you fancy. The constructor starts a worker thread, which runs the method `functionThatProcessesWorkOnThread()`. The worker thread uses a `std::unique_lock` with a `std::condition_variable` to wait for new work to be added to the queue or for a shutdown signal. The `unique_lock` is released during the `condition_variable` wait, allowing other threads to modify the queue. Once notified, two things happen in the worker thread. First it checks if [it was not a `spurious wakeup`](https://web.archive.org/web/20241217194215/https://en.wikipedia.org/wiki/Spurious_wakeup) by making sure the work queue is not empty and the thread is not shutting down. Then the thread proceeds to process the next item. If no work is available, it waits again, preventing unnecessary CPU consumption. The `addSomethingToProcess` method is used to add work items to the queue. It locks the `mutex` to ensure thread safety, adds the new work item to the queue, and then notifies the worker thread that new work is available via the `condition_variable`. After adding the item, the lock is released, allowing the worker thread to process the item. When the class is destroyed, the destructor ensures that the worker thread is stopped gracefully. It sets the `_threadRunning` flag to false, notifies the worker thread, and joins the worker thread to wait for its completion before exiting, ensuring that all resources are cleaned up properly. The worker thread itself waits on the `condition_variable` and processes the work items one by one. It first `pop's` an item from the queue, then the `mutex` is temporarily released during processing of the item to allow others to add more items to the queue. Processing of work items can be time consuming and we don't want others to wait before adding items. Once the work is completed, the worker thread starts again, waiting on the `condition_variable`. Note that a `std::queue` itself is not thread safe. But due to my use of mutex locks in this class there are no race conditions. Reading or writing via other methods without proper locking could result in race conditions. ### The code This is the class. For this article example it is header only and the work processing is based on example items. #pragma once /* * Copyright (c) 2024 Remy van Elst * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, version 3. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * */ #include #include #include #include #include #include #include using namespace std::chrono_literals; namespace raymii { struct exampleWorkItem { std::string name; int waitSec = 5; }; class classWithWorkerThread { public: classWithWorkerThread() { // Start worker thread _threadRunning = true; _workerThread = std::thread(&classWithWorkerThread::functionThatProcessesWorkOnThread, this); } classWithWorkerThread(const classWithWorkerThread&) = delete; classWithWorkerThread& operator=(const classWithWorkerThread&) = delete; classWithWorkerThread(classWithWorkerThread&&) = delete; classWithWorkerThread& operator=(classWithWorkerThread&&) = delete; ~classWithWorkerThread() { // Stop worker thread { std::lock_guard lock(_threadMutex); _threadRunning = false; } _threadCV.notify_one(); if (_workerThread.joinable()) { _workerThread.join(); } } void addSomethingToProcess(const exampleWorkItem &work) { { // scoped lock mutex before adding work item std::lock_guard lock(_threadMutex); // add item to queue _itemsToProcess.emplace(work); } // lock is out of scope here // notify worker thread that new work is available _threadCV.notify_one(); } private: // methods related to Worker Thread void functionThatProcessesWorkOnThread() { // only process work while running. Otherwise, exit method. while (_threadRunning) { // Acquire mutex. Is unlocked during the cv wait call. When woken up, its locked again std::unique_lock lock(_threadMutex); // wait until notified that new work is available. _threadCV.wait(lock, [&] { // When woken, check if it's not a spurious wakeup. // Either there is work to process: !_itemsToProcess.empty() // or the thread is stopping: !_threadRunning return !_itemsToProcess.empty() || !_threadRunning; }); if (!_threadRunning) // extra check to see if we're stopped after wakeup break; // if so, gracefully do an early exit. // Start processing work item. Make sure queue is not empty if (!_itemsToProcess.empty()) { // Retrieve item from queue exampleWorkItem workItem = std::move(_itemsToProcess.front()); _itemsToProcess.pop(); // Unlock mutex because processing can be time-consuming lock.unlock(); // Process Item std::cout << "Start Processing item " << workItem.name << ", sleeping " << std::to_string(workItem.waitSec) << " seconds " ; for (int i = 0; i < workItem.waitSec; ++i) { std::cout << "."; std::this_thread::sleep_for(1s); } std::cout << "; Finished Processing item " << workItem.name << ";" << std::endl; // Lock mutex again after finishing time-consuming work lock.lock(); } // End processing work item. } } std::queue _itemsToProcess; std::thread _workerThread; std::mutex _threadMutex; std::condition_variable _threadCV; std::atomic _threadRunning = false; }; } //namespace raymii Here is an example `main.ccp` program that uses this class. If you press `ENTER` 5 work items are added to the queue and processed. If you type `exit` the program quits. You can watch work being done while you can enter any text. If you enter `exit`, the program finishes the current work item and then quits without crashing. int main() { raymii::classWithWorkerThread exampleClass; std::cout << "Press ENTER to add 5 work items. " "Type 'exit' and press ENTER to quit.\n"; std::string input; int taskCounter = 1; while (true) { std::getline(std::cin, input); if (input == "exit") break; std::cout << "Adding 5 work items..." << std::endl; for (int i = 0; i < 5; ++i) { raymii::exampleWorkItem item; item.name = "Task_" + std::to_string(taskCounter++); item.waitSec = 5 + (i % 10); exampleClass.addSomethingToProcess(item); } } std::cout << "Exiting program.\n"; return 0; } Example output: Press ENTER to add 5 work items. Type 'exit' and press ENTER to quit. ** PRESSES ENTER** Adding 5 work items... Start Processing item Task_1, sleeping 5 seconds .....; Finished Processing item Task_1; Start Processing item Task_2, sleeping 6 seconds ......; Finished Processing item Task_2; Start Processing item Task_3, sleeping 7 seconds .......; Finished Processing item Task_3; Start Processing item Task_4, sleeping 8 seconds ........; Finished Processing item Task_4; Start Processing item Task_5, sleeping 9 seconds .........; Finished Processing item Task_5; ** PRESSES ENTER AGAIN** Adding 5 work items... Start Processing item Task_6, sleeping 5 seconds .....; Finished Processing item Task_6; ** TYPES exit** Start Processing item Task_7, sleeping 6 seconds ..Exiting program. ....; Finished Processing item Task_7; Process finished with exit code 0 --- License: All the text on this website is free as in freedom unless stated otherwise. This means you can use it in any way you want, you can copy it, change it the way you like and republish it, as long as you release the (modified) content under the same license to give others the same freedoms you've got and place my name and a link to this site with the article as source. This site uses Google Analytics for statistics and Google Adwords for advertisements. You are tracked and Google knows everything about you. Use an adblocker like ublock-origin if you don't want it. All the code on this website is licensed under the GNU GPL v3 license unless already licensed under a license which does not allows this form of licensing or if another license is stated on that page / in that software: This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . Just to be clear, the information on this website is for meant for educational purposes and you use it at your own risk. I do not take responsibility if you screw something up. Use common sense, do not 'rm -rf /' as root for example. If you have any questions then do not hesitate to contact me. See https://raymii.org/s/static/About.html for details.