A lightweight, header-only C++ library for managing a thread pool optimized for microsecond-level realtime tasks (e.g. real-time robot control).
- Ultra-low latency: Designed for μs-level response times.
- Header-only: Easy integration, compiler optimization, no build dependencies.
- Real-time priority: Thread priority & CPU affinity support.
- Lock-free communication: Lock-free SPSC queues keep synchronization overhead minimal.
- Fallback strategy: Always returns a result, even under load.
Clone the repository into your project; here we place it in the workspace's submodule directory:
# Directly clone
git clone https://github.com/hwyao/realtime_calc_thread_pool
# Or, if your project is a git repository, add it as a submodule
git submodule add https://github.com/hwyao/realtime_calc_thread_pool
It is suggested to use CMake to manage the project. Add the following to your root project's CMakeLists.txt:
# add subdirectory to your CMakeLists.txt
add_subdirectory(submodule/realtime_calc_thread_pool)
# link the library to your target
target_link_libraries(your_target realtime_calc_thread_pool)
Then include the library header in your project sources:
#include <realtime_calc_thread_pool/thread_pool.hpp>
// Create thread pool with 4 workers.
// The template parameter is the task's return type. Defining a dedicated result struct here is strongly recommended.
realtime_calc_thread_pool::RealtimeThreadPool<double> pool(
realtime_calc_thread_pool::Config(4)
);
while(true) { // Your main loop code
// Submit a task as a lambda; it must accept the task ID as its argument.
pool.submitTask([](uint64_t task_id) -> double {
return expensive_computation();
});
// Wait roughly 300 microseconds for a result; the actual wait will be slightly longer than 300 μs.
// Prefer the provided waiting methods over implementing your own waiting logic.
double result = pool.getLatestResult(std::chrono::microseconds(300));
}
The above code creates a thread pool with 4 worker threads, submits a task that performs an expensive computation, and retrieves the latest result after waiting roughly 300 microseconds.
When submitTask is called:
- If there is an idle worker, the task is assigned to one of them immediately.
- If all workers are busy, the task is rejected.
When getLatestResult is called:
- If there are one or several completed results, the latest one is returned. The completed results are then cleared and the workers return to the idle state.
- If there is no completed result, the fallback strategy is applied. See the later "Advanced Configuration - Fallback Strategies" section for details, and the sketch below for how the two rules interact.
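The following is a minimal sketch of how the two rules play together in a control loop; running, expensive_computation() and use_result() are placeholders, not part of the library API. A submission rejected because all workers are busy does not block the loop, and getLatestResult still returns a value thanks to the fallback strategy.
// Sketch only: running, expensive_computation() and use_result() are placeholders.
realtime_calc_thread_pool::RealtimeThreadPool<double> pool(
    realtime_calc_thread_pool::Config(4)
);
while (running) {
    // The submission is rejected if all 4 workers are still busy; the loop is not blocked by this.
    pool.submitTask([](uint64_t task_id) -> double {
        return expensive_computation();
    });
    // Always yields a value: the newest completed result if one exists,
    // otherwise whatever the fallback strategy provides (see the section below).
    double result = pool.getLatestResult(std::chrono::microseconds(300));
    use_result(result);
}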
The thread pool can be configured with the Config structure:
realtime_calc_thread_pool::Config config(
2, // number of worker threads
1, // (worker) thread priority (default 1, range 1-99, higher means higher priority)
99, // main thread priority (default 99, range 1-99)
std::vector<int>{8, 10}, // (worker) CPU affinity (default empty, meaning no binding)
0, // main thread CPU affinity (default -1, meaning no binding)
true, // enable the (worker) thread priority. False will ignore the corresponding priority setting.
true // enable the main thread priority setting. False will ignore the corresponding priority setting.
);
// Create thread pool with custom configuration
// Error will be thrown if priority or CPU affinity setting fails.
realtime_calc_thread_pool::RealtimeThreadPool<double> pool(config);
The later section "Selecting Thread Priority and CPU Affinity" provides more information about how to select proper priority and CPU affinity.
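Since construction may throw when a priority or affinity setting cannot be applied (for example, missing real-time scheduling permissions or an invalid CPU index), you may want to guard it. This is only a sketch: the exact exception type is not documented here, so catching std::exception is an assumption.
// Sketch: reuses the config defined above; needs <exception> and <iostream>.
try {
    // Construction applies the priority and affinity settings.
    realtime_calc_thread_pool::RealtimeThreadPool<double> pool(config);
    // ... run your control loop with the pool
} catch (const std::exception &e) {
    // Assumption: the thrown error derives from std::exception; adjust the catch
    // if the library documents a more specific exception type.
    std::cerr << "Thread pool setup failed: " << e.what() << std::endl;
}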
When there are no completed results at the time getLatestResult is called, the following strategies are applied:
- All previous results are captured by the StatePredictorBase class, and the predictor provides a result. Currently we support: LatestCachePredictor (default), which works for all ResultType and returns the latest available result.
- If the fallback strategy is also unable to provide a result (e.g. no previous result exists), the default zero value is returned. Therefore, a custom ResultType is required to have a Zero() or setZero() method.
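As an illustration of that requirement, a custom result type could look like the sketch below; the struct name and fields are hypothetical, only the setZero()/Zero() convention comes from the library.
// Hypothetical result type; only the setZero() requirement comes from the library.
struct ControlResult {
    double torque = 0.0;
    double timestamp = 0.0;

    // Allows the zero-value fallback when no previous result exists.
    void setZero() {
        torque = 0.0;
        timestamp = 0.0;
    }
};

// Used as the pool's template parameter:
realtime_calc_thread_pool::RealtimeThreadPool<ControlResult> pool(
    realtime_calc_thread_pool::Config(4)
);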
When submitting a task, most of the configuration- and data-related information can be captured by value in the lambda function.
auto compute_func = [config1, config2, data1, data2](uint64_t task_id) -> ResultType {
// ... use config and data
return result;
};
pool.submitTask(compute_func);
Resources can also be captured by reference, but keep in mind that, for thread safety, they should not be modified by the tasks. C++17 provides a friendly compile-time check with std::as_const.
// C++17 or later
auto compute_func = [&const_config = std::as_const(config),
&const_data = std::as_const(data)]
(uint64_t task_id) -> ResultType {
// ... use const_config and const_data
return result;
};
pool.submitTask(compute_func);
However, some resources are unique, e.g. a class instance owned by a std::unique_ptr. Even with a std::shared_ptr or a raw pointer, some objects may modify their internal attributes when their methods are called. This happens quite often with calculation libraries built around class objects.
To solve this problem safely, our solution is the thread_local-based ThreadLocalObjectManager. To use it:
- The object itself should support copy construction.
- The object can be captured through a raw pointer (a pointer to const is suggested).
- Construct the thread-local copy with ThreadLocalObjectManager's getOrCreate() method at the beginning of the task function. This makes a copy of the object in thread-local storage.
std::unique_ptr<ExpensiveResource> resource = ...; // unique resource
const auto *resource_raw = resource.get(); // get raw pointer with constant pointee
auto compute_func = [resource_raw](uint64_t task_id) -> ResultType {
// Get or create thread local object
auto *local_resource = realtime_calc_thread_pool::ThreadLocalObjectManager<ExpensiveResource>::getOrCreate(resource_raw);
// Use local_resource safely ...
return result;
};
There are some general tips to improve the overall performance and reactivity of your machine.
- Buy a powerful machine. Make sure that CPU cooling works efficiently. Close other resource-hungry apps such as browsers while running your controller.
- Install a realtime kernel patch, or enable the realtime kernel feature with Ubuntu Pro on 22.04 or later.
- Use the performance CPU frequency scaling governor.
If you configure CMake with -DREALTIME_CTP_ENABLE_TEST=ON, a small performance test ./test/benchmark.cpp will be compiled. It starts a thread pool with 4 threads, repeats a loop every 1 ms, and lets a submitted task run for 0.3 ms after submission. The task itself should be easy for any modern CPU, so you can play with the configuration and use it to measure the latency of the thread pool itself.
Some important information:
- submit and retrieve are the true times spent by the thread pool itself, provided by its profiling feature. They are usually a few μs in 99% of cases. If they are too high, improve the performance of the machine itself.
- submit_out and retrieve_out (after subtracting the sleeping time) are measured outside the thread pool and should be close to the inside versions, and sleeping_overhead should be as small as possible. The extra overhead here is usually a few μs in 99% of cases. If the difference is too high, it usually indicates that something is eating the resources of the (not very resource-hungry) main thread. Raise the main thread priority and pin its affinity to a core with its own resources.
You can look at the source code to understand the rest of the statistics.
There are some general tips for selecting the configuration of the thread pool:
- Generally speaking, the main thread's priority should be as high as possible, or at least higher than the worker threads'.
- It is strongly suggested to assign affinity to all threads to separate their workloads. At the very least, the main thread should have a core to itself.
- You can use lscpu --all --extended to see the maximum frequency of each CPU and which CPUs share the same resources (e.g. a physical core or NUMA node). It is suggested to pin all threads to performance cores (usually those with higher maximum frequency) and to keep the threads on CPUs that do not share resources with each other. A configuration sketch following these tips is shown below.
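Putting these tips together, the sketch below configures a hypothetical machine where core 2 is reserved for the main thread and cores 3-6 for the workers; the core IDs and priority values are illustrative only and should be chosen from your own lscpu output.
// Sketch only: core IDs and priorities are examples, not recommendations.
realtime_calc_thread_pool::Config config(
    4,                             // 4 worker threads
    40,                            // worker thread priority
    99,                            // main thread priority (higher than the workers)
    std::vector<int>{3, 4, 5, 6},  // one dedicated core per worker
    2,                             // main thread alone on core 2
    true,                          // apply the worker priority settings
    true                           // apply the main thread priority settings
);
realtime_calc_thread_pool::RealtimeThreadPool<double> pool(config);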