Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors the threadpool implementation to use variadic templates and standard library components (std::future, std::packaged_task) instead of custom return_value_handle and task classes. The refactoring aims to simplify the codebase and provide a more modern C++ interface with better template support for arbitrary function arguments.
Changes:
- Replaced custom
return_value_handle<T>andtaskclasses withstd::futureandstd::packaged_task - Implemented variadic template
submit()method to accept functions with arbitrary arguments - Removed
poll_task()helper method in favor of inline task polling - Updated tests to use new API and removed obsolete test files (
task_tests.h,return_value_tests.h) - Changed
queue_size()return type frominttosize_t - Added
explicitkeyword to constructor
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| src/threadpool.h | Core refactor: new variadic template submit() using std::future/std::packaged_task, removed custom return value handling classes |
| src/threadpool.cpp | Simplified task polling logic, updated queue_size() return type to size_t |
| tests/threadpool_tests.h | Updated tests to use new std::future-based API, added tests for variadic arguments and function pointers |
| tests/task_tests.h | Removed (task class no longer exists) |
| tests/return_value_tests.h | Removed (return_value_handle class no longer exists) |
| tests/all_tests.h | Updated to reflect removed test files and new API |
| demo/demo.h | Updated to use std::future instead of return_value_handle, reduced fibonacci test values |
| CMakeLists.txt | Removed references to deleted test files |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/threadpool.h
Outdated
|
|
||
| [[nodiscard]] | ||
| int queue_size(); | ||
| write_task([task]() mutable { (*task)(); }); |
There was a problem hiding this comment.
The 'mutable' keyword in the lambda capture is unnecessary. The lambda captures task by value (a shared_ptr), and calling (*task)() doesn't require mutable since it's calling through a pointer, not modifying the shared_ptr itself. This keyword can be removed for clarity.
| write_task([task]() mutable { (*task)(); }); | |
| write_task([task]() { (*task)(); }); |
| auto submit(Function &&F, Args &&...ArgList) { | ||
|
|
||
| template<class T> | ||
| [[nodiscard]] | ||
| return_value_handle<T> submit(const std::function<T()>& ptr, int dependency_id) { | ||
| return_value_handle<T> rv_handle{}; | ||
| return rv_handle; | ||
| } | ||
| using ReturnType = std::invoke_result_t<Function, Args...>; | ||
|
|
||
| std::shared_ptr<std::packaged_task<ReturnType()>> task = std::make_shared<std::packaged_task<ReturnType()>>(( | ||
| std::bind(std::forward<Function>(F), | ||
| std::forward<Args>(ArgList)...) | ||
| )); | ||
|
|
||
| void shutdown(); // finish queued tasks | ||
| void shutdown_now(); // cancel pending tasks | ||
| auto future = task->get_future(); | ||
|
|
||
| [[nodiscard]] | ||
| int queue_size(); | ||
| write_task([task]() mutable { (*task)(); }); | ||
|
|
||
| // Dependency DAG API | ||
| template<typename... Args> | ||
| return_value_handle<void> when_all(Args... args) { | ||
| // Todo: actually implement the logic | ||
| return_value_handle<void> rv{}; | ||
| return rv; | ||
|
|
||
| return future; // Return type is future<ReturnType> | ||
| } |
There was a problem hiding this comment.
The submit() method is missing critical thread-safety checks. The old implementation verified that m_Stop was false before accepting tasks and threw a std::runtime_error if called after shutdown. The new variadic template implementation in write_task() doesn't acquire any lock or check m_Stop, which means tasks can be submitted after shutdown has been called. This violates the expected behavior and could lead to tasks being added to the queue after workers have stopped processing.
| ); | ||
| return rv_handle; | ||
| } | ||
| auto submit(Function &&F, Args &&...ArgList) { |
There was a problem hiding this comment.
The submit() method is missing the [[nodiscard]] attribute that was present in the old implementation. Since submit() returns a std::future that the caller should use to retrieve results or wait for completion, ignoring the return value is likely a programming error. The [[nodiscard]] attribute helps catch such mistakes at compile time.
| auto submit(Function &&F, Args &&...ArgList) { | |
| [[nodiscard]] auto submit(Function &&F, Args &&...ArgList) { |
| // template<std::invocable Fn> | ||
| // [[nodiscard]] | ||
| // auto submit(const Fn&& fn) { | ||
| // using return_type = std::invoke_result_t<Fn>; | ||
| // | ||
| // std::unique_lock lock(queue_stop_mutex); | ||
| // if (m_Stop) { | ||
| // throw std::runtime_error{"ThreadPool::submit() after shutdown called"}; | ||
| // } | ||
| // | ||
| // std::packaged_task<return_type> task{fn}; | ||
| // write_task([&task]() { task(); }); | ||
| // return task.get_future(); | ||
| // } |
There was a problem hiding this comment.
Commented-out code should be removed rather than left in the production codebase. This alternative implementation appears to be experimental or obsolete and clutters the header file. If this code needs to be preserved for reference, it should be moved to documentation or version control history.
| // Dependency DAG API | ||
| // template<typename... Args> | ||
| // return_value_handle<void> when_all(Args... args) { | ||
| // // Todo: actually implement the logic | ||
| // return_value_handle<void> rv{}; | ||
| // return rv; | ||
| // } |
There was a problem hiding this comment.
Commented-out code for the when_all() dependency API should be removed. This dead code adds no value and makes the codebase harder to maintain. If this functionality is planned for the future, track it in an issue or documentation instead.
|
|
||
| } | ||
| }); | ||
| //std::function<int()> f1 = []() -> int {return 5;}; |
There was a problem hiding this comment.
The commented-out line with std::function declaration serves no purpose and should be removed. If this was used for debugging or testing, it should not remain in the final code.
| //std::function<int()> f1 = []() -> int {return 5;}; |
| inline void threadpool_tests() { | ||
|
|
||
| // Ensure shutdown finishes all remaining tasks | ||
| // Submit syntax | ||
| { | ||
| threadpool tp{1}; | ||
| int i{0}; | ||
| auto f1 = []() { std::this_thread::sleep_for(std::chrono::milliseconds(100));}; | ||
| auto f2 = [&i]() mutable{ i = 5; }; | ||
|
|
||
| auto rv1 = tp.submit<void>(f1); | ||
| auto rv2 = tp.submit<void>(f2); | ||
| auto future = tp.submit([]() {}); | ||
| tp.shutdown(); | ||
|
|
||
| assert(i == 5); | ||
| } | ||
|
|
||
| // Ensure shutdown_now clears the remaining tasks | ||
| // Verify work is done on a submit | ||
| { | ||
| threadpool tp{1}; | ||
| int i{0}; | ||
| auto f1 = []() { std::this_thread::sleep_for(std::chrono::milliseconds(100));}; | ||
| auto f2 = [&i]() mutable{ i = 5; }; | ||
|
|
||
| auto rv1 = tp.submit<void>(f1); | ||
| auto rv2 = tp.submit<void>(f2); | ||
| tp.shutdown_now(); | ||
|
|
||
| assert(i == 0); | ||
| int i = 0; | ||
| auto future = tp.submit([&i](){i = 42;}); | ||
| tp.shutdown(); | ||
| assert(i == 42); | ||
| } | ||
|
|
||
|
|
||
| // Submit after shutdown | ||
| // Return type syntax | ||
| { | ||
| threadpool tp{1}; | ||
| auto future = tp.submit([]() {return 42;}); | ||
| tp.shutdown(); | ||
| try { | ||
| auto rv = tp.submit<void>([]() {}); | ||
| assert(false); | ||
| } catch (std::runtime_error) { | ||
|
|
||
| } | ||
| int work = future.get(); | ||
| assert(work == 42); | ||
| } | ||
|
|
||
| // Submit after shutdown_now | ||
| // Variadic arguments works | ||
| { | ||
| threadpool tp{1}; | ||
| tp.shutdown_now(); | ||
| try { | ||
| auto rv = tp.submit<void>([]() {}); | ||
| assert(false); | ||
| } catch (std::runtime_error) { | ||
|
|
||
| } | ||
| auto future = tp.submit([](int num1, int num2, int num3) {return num1 + num2 + num3;}, 1, 2, 3); | ||
| tp.shutdown(); | ||
| assert(future.valid() && future.get() == 6); | ||
| } | ||
|
|
||
| // Destructor stress tests | ||
| // Function pointer works | ||
| { | ||
| for (int i = 0; i < 10'000; ++i) { | ||
| threadpool tp{4}; | ||
| auto rv = tp.submit<void>([]{}); | ||
| } | ||
| threadpool tp{2}; | ||
| auto future1 = tp.submit(string_test); | ||
| auto future2 = tp.submit(int_test, 1, 2); | ||
| tp.shutdown(); | ||
| assert(future1.valid() && future1.get() == "Hello world!"); | ||
| assert(future2.valid() && future2.get() == 3); | ||
|
|
||
| } | ||
|
|
||
| // Nested submission | ||
| /* | ||
| The invariant here is a little more subtle | ||
| What happens here is the task has a sub-task to put another task onto the threadpool queue | ||
| However, what most of the time happens is that shutdown() in the main thread gets called before the task gets processed | ||
| Which means that the queue no longer accepts any tasks, and therefore would throw the runtime_error exception | ||
| This invariant is kept here - for the DAG aware pools, there would be a private internal enqueing function that would bypass this check | ||
| */ | ||
| // Function pointers with variadic arguments | ||
| { | ||
| threadpool tp{1}; | ||
| auto rv1 = tp.submit<void>([&]{ | ||
|
|
||
| try { | ||
| auto rv2 = tp.submit<void>([](){ /* work */ }); | ||
| assert(false); | ||
| } catch (std::runtime_error) { | ||
|
|
||
| } | ||
| }); | ||
| //std::function<int()> f1 = []() -> int {return 5;}; | ||
| std::future<int> future = tp.submit([]() {return 5;}); | ||
| tp.shutdown(); | ||
| assert(future.get() == 5); | ||
| } | ||
|
|
||
|
|
||
| // Shutdown now test | ||
|
|
||
| // Ensure shutdown finishes all remaining tasks | ||
| { | ||
| threadpool tp{1}; | ||
| auto rv1 = tp.submit<void>([]{ std::this_thread::sleep_for(std::chrono::milliseconds(50));}); | ||
| auto rv2 = tp.submit<void>( []() { }); | ||
| int i{0}; | ||
| auto f1 = []() { std::this_thread::sleep_for(std::chrono::milliseconds(100));}; | ||
| auto f2 = [&i]() mutable{ i = 5; }; | ||
|
|
||
| std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Wait for thread to pick up the task | ||
| tp.shutdown_now(); | ||
| auto rv1 = tp.submit(f1); | ||
| auto rv2 = tp.submit(f2); | ||
| tp.shutdown(); | ||
|
|
||
| // Not sure exactly why, but this fails | ||
| int i; | ||
| //assert(rv1.is_valid()); | ||
| assert(!rv2.is_valid()); | ||
| assert(i == 5); | ||
| } |
There was a problem hiding this comment.
Critical test coverage has been removed. The old tests verified that submit() throws std::runtime_error when called after shutdown() or shutdown_now(), which is important for ensuring proper API usage. The new tests no longer cover this behavior, even though the implementation should maintain this safety check. This missing test coverage could allow regression of important error handling behavior.
| inline void threadpool_tests() { | ||
|
|
||
| // Ensure shutdown finishes all remaining tasks | ||
| // Submit syntax | ||
| { | ||
| threadpool tp{1}; | ||
| int i{0}; | ||
| auto f1 = []() { std::this_thread::sleep_for(std::chrono::milliseconds(100));}; | ||
| auto f2 = [&i]() mutable{ i = 5; }; | ||
|
|
||
| auto rv1 = tp.submit<void>(f1); | ||
| auto rv2 = tp.submit<void>(f2); | ||
| auto future = tp.submit([]() {}); | ||
| tp.shutdown(); | ||
|
|
||
| assert(i == 5); | ||
| } | ||
|
|
||
| // Ensure shutdown_now clears the remaining tasks | ||
| // Verify work is done on a submit | ||
| { | ||
| threadpool tp{1}; | ||
| int i{0}; | ||
| auto f1 = []() { std::this_thread::sleep_for(std::chrono::milliseconds(100));}; | ||
| auto f2 = [&i]() mutable{ i = 5; }; | ||
|
|
||
| auto rv1 = tp.submit<void>(f1); | ||
| auto rv2 = tp.submit<void>(f2); | ||
| tp.shutdown_now(); | ||
|
|
||
| assert(i == 0); | ||
| int i = 0; | ||
| auto future = tp.submit([&i](){i = 42;}); | ||
| tp.shutdown(); | ||
| assert(i == 42); | ||
| } | ||
|
|
||
|
|
||
| // Submit after shutdown | ||
| // Return type syntax | ||
| { | ||
| threadpool tp{1}; | ||
| auto future = tp.submit([]() {return 42;}); | ||
| tp.shutdown(); | ||
| try { | ||
| auto rv = tp.submit<void>([]() {}); | ||
| assert(false); | ||
| } catch (std::runtime_error) { | ||
|
|
||
| } | ||
| int work = future.get(); | ||
| assert(work == 42); | ||
| } | ||
|
|
||
| // Submit after shutdown_now | ||
| // Variadic arguments works | ||
| { | ||
| threadpool tp{1}; | ||
| tp.shutdown_now(); | ||
| try { | ||
| auto rv = tp.submit<void>([]() {}); | ||
| assert(false); | ||
| } catch (std::runtime_error) { | ||
|
|
||
| } | ||
| auto future = tp.submit([](int num1, int num2, int num3) {return num1 + num2 + num3;}, 1, 2, 3); | ||
| tp.shutdown(); | ||
| assert(future.valid() && future.get() == 6); | ||
| } | ||
|
|
||
| // Destructor stress tests | ||
| // Function pointer works | ||
| { | ||
| for (int i = 0; i < 10'000; ++i) { | ||
| threadpool tp{4}; | ||
| auto rv = tp.submit<void>([]{}); | ||
| } | ||
| threadpool tp{2}; | ||
| auto future1 = tp.submit(string_test); | ||
| auto future2 = tp.submit(int_test, 1, 2); | ||
| tp.shutdown(); | ||
| assert(future1.valid() && future1.get() == "Hello world!"); | ||
| assert(future2.valid() && future2.get() == 3); | ||
|
|
||
| } | ||
|
|
||
| // Nested submission | ||
| /* | ||
| The invariant here is a little more subtle | ||
| What happens here is the task has a sub-task to put another task onto the threadpool queue | ||
| However, what most of the time happens is that shutdown() in the main thread gets called before the task gets processed | ||
| Which means that the queue no longer accepts any tasks, and therefore would throw the runtime_error exception | ||
| This invariant is kept here - for the DAG aware pools, there would be a private internal enqueing function that would bypass this check | ||
| */ | ||
| // Function pointers with variadic arguments | ||
| { | ||
| threadpool tp{1}; | ||
| auto rv1 = tp.submit<void>([&]{ | ||
|
|
||
| try { | ||
| auto rv2 = tp.submit<void>([](){ /* work */ }); | ||
| assert(false); | ||
| } catch (std::runtime_error) { | ||
|
|
||
| } | ||
| }); | ||
| //std::function<int()> f1 = []() -> int {return 5;}; | ||
| std::future<int> future = tp.submit([]() {return 5;}); | ||
| tp.shutdown(); | ||
| assert(future.get() == 5); | ||
| } | ||
|
|
||
|
|
||
| // Shutdown now test | ||
|
|
||
| // Ensure shutdown finishes all remaining tasks | ||
| { | ||
| threadpool tp{1}; | ||
| auto rv1 = tp.submit<void>([]{ std::this_thread::sleep_for(std::chrono::milliseconds(50));}); | ||
| auto rv2 = tp.submit<void>( []() { }); | ||
| int i{0}; | ||
| auto f1 = []() { std::this_thread::sleep_for(std::chrono::milliseconds(100));}; | ||
| auto f2 = [&i]() mutable{ i = 5; }; | ||
|
|
||
| std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Wait for thread to pick up the task | ||
| tp.shutdown_now(); | ||
| auto rv1 = tp.submit(f1); | ||
| auto rv2 = tp.submit(f2); | ||
| tp.shutdown(); | ||
|
|
||
| // Not sure exactly why, but this fails | ||
| int i; | ||
| //assert(rv1.is_valid()); | ||
| assert(!rv2.is_valid()); | ||
| assert(i == 5); | ||
| } |
There was a problem hiding this comment.
Test coverage for shutdown_now() behavior has been completely removed. The old tests verified that shutdown_now() clears pending tasks and that return values become invalid when tasks are cancelled. Additionally, tests that verified shutdown_now() prevents new task submission are missing. This functionality is critical for proper resource cleanup and should be tested.
| futures.emplace_back( tp.submit( []() -> int { return recursive_fibonacci(10);})); | ||
| futures.emplace_back( tp.submit( []() -> int { return recursive_fibonacci(20);})); | ||
| futures.emplace_back( tp.submit( []() -> int { return recursive_fibonacci(30);})); |
There was a problem hiding this comment.
The Fibonacci numbers were changed from computing fib(20), fib(30), fib(40) to computing fib(10), fib(20), fib(30). While this change makes the demo run faster, it significantly reduces the computational workload and may not effectively demonstrate the threadpool's ability to handle parallel CPU-intensive tasks. The original values provided a better stress test for the threadpool.
src/threadpool.h
Outdated
| std::shared_ptr<std::packaged_task<ReturnType()>> task = std::make_shared<std::packaged_task<ReturnType()>>(( | ||
| std::bind(std::forward<Function>(F), | ||
| std::forward<Args>(ArgList)...) | ||
| )); | ||
|
|
||
| void shutdown(); // finish queued tasks | ||
| void shutdown_now(); // cancel pending tasks | ||
| auto future = task->get_future(); | ||
|
|
||
| [[nodiscard]] | ||
| int queue_size(); | ||
| write_task([task]() mutable { (*task)(); }); |
There was a problem hiding this comment.
Using std::shared_ptr for each packaged_task adds unnecessary overhead. Each task submission creates a heap allocation for the shared_ptr and its control block. The lambda capture of the shared_ptr by value in write_task is necessary to keep the task alive, but this design could impact performance for high-frequency task submissions. Consider whether the shared_ptr is truly necessary or if there's a more efficient ownership model.
No description provided.