threadman 0.1.0
Header-only C++23 managed threads, dynamic pools, futures, and executors
Loading...
Searching...
No Matches
future_wait_pool.hpp
Go to the documentation of this file.
1#pragma once
2
25
26#include <threadman/config.hpp>
28#include <threadman/future.hpp>
29#include <threadman/stats.hpp>
31
32#include <commons/display_info.hpp>
33
34#include <chrono>
35#include <cstddef>
36#include <cstdint>
37#include <exception>
38#include <functional>
39#include <future>
40#include <memory>
41#include <string>
42#include <string_view>
43#include <thread>
44#include <type_traits>
45#include <utility>
46#include <vector>
47
48namespace threadman {
49
50class ThreadManager; // forward — defined in <threadman/manager.hpp>
51
56 std::string name = "tm::future-wait";
57 std::size_t min_workers = 1;
58 std::size_t max_workers = std::thread::hardware_concurrency();
59
71
74 std::size_t max_pending = 0;
75};
76
81class FutureWaitPool final : public IExecutor {
82public:
83 explicit FutureWaitPool(FutureWaitPoolOptions opts = {}, ThreadManager* manager = nullptr) {
85 po.name = std::move(opts.name);
86 po.min_workers = opts.min_workers;
87 po.max_workers = opts.max_workers;
88 po.scale_up_wait = opts.scale_up_wait;
89 po.scale_up_when_queue_exceeds = opts.scale_up_when_queue_exceeds;
90 po.idle_timeout = opts.idle_timeout;
91 po.scale_check_interval = opts.scale_check_interval;
92 po.max_queue_size = opts.max_pending;
93 po.manager = manager;
94 pool_ = std::make_unique<ThreadPool>(std::move(po));
95 }
96
99 FutureWaitPool(FutureWaitPool&&) noexcept = default;
100 FutureWaitPool& operator=(FutureWaitPool&&) noexcept = default;
101 ~FutureWaitPool() override = default;
102
103 // --- IExecutor --------------------------------------------------------
104 void execute(std::function<void()> task) override {
105 pool_->execute(std::move(task));
106 }
107 [[nodiscard]] std::string_view name() const noexcept override {
108 return pool_->name();
109 }
110
111 // --- Adoption ---------------------------------------------------------
112
116 template <class T>
117 [[nodiscard]] Future<T> add(std::future<T> fut) {
118 auto promise = std::make_shared<Promise<T>>();
119 auto future = promise->get_future();
120 // std::future is move-only, so it can't be captured directly into the
121 // std::function the pool wants — hold it behind a shared_ptr so the
122 // closure stays copyable.
123 auto fut_sp = std::make_shared<std::future<T>>(std::move(fut));
124 pool_->execute([promise, fut_sp]() mutable {
125 try {
126 if constexpr (std::is_void_v<T>) {
127 fut_sp->get();
128 promise->set_value();
129 } else {
130 promise->set_value(fut_sp->get());
131 }
132 } catch (...) {
133 promise->set_exception(std::current_exception());
134 }
135 });
136 return future;
137 }
138
141 template <class T>
142 [[nodiscard]] Future<T> add(std::shared_future<T> fut) {
143 auto promise = std::make_shared<Promise<T>>();
144 auto future = promise->get_future();
145 pool_->execute([promise, fut = std::move(fut)]() mutable {
146 try {
147 if constexpr (std::is_void_v<T>) {
148 fut.get();
149 promise->set_value();
150 } else {
151 promise->set_value(fut.get());
152 }
153 } catch (...) {
154 promise->set_exception(std::current_exception());
155 }
156 });
157 return future;
158 }
159
163 template <class Fn>
164 requires std::invocable<Fn>
165 [[nodiscard]] auto add_blocking(Fn&& fn) -> Future<std::invoke_result_t<Fn>> {
166 return pool_->submit(std::forward<Fn>(fn));
167 }
168
169 // --- Inspection / lifecycle (delegated to the backing pool) -----------
170 [[nodiscard]] std::uint64_t id() const noexcept {
171 return pool_->id();
172 }
173 [[nodiscard]] std::size_t worker_count() const noexcept {
174 return pool_->worker_count();
175 }
176 [[nodiscard]] std::size_t core_worker_count() const noexcept {
177 return pool_->core_worker_count();
178 }
179 [[nodiscard]] std::size_t max_worker_count() const noexcept {
180 return pool_->max_worker_count();
181 }
182 [[nodiscard]] ThreadPoolStats stats() const {
183 return pool_->stats();
184 }
185 [[nodiscard]] std::vector<ThreadSnapshot> snapshot_workers() const {
186 return pool_->snapshot_workers();
187 }
188 [[nodiscard]] ThreadPool& pool() const noexcept {
189 return *pool_;
190 }
191
192 void shutdown() const {
193 pool_->shutdown();
194 }
195 void shutdown_now() const {
196 pool_->shutdown_now();
197 }
198 void join() const {
199 pool_->join();
200 }
201 [[nodiscard]] bool is_shutting_down() const noexcept {
202 return pool_->is_shutting_down();
203 }
204 [[nodiscard]] bool is_terminated() const noexcept {
205 return pool_->is_terminated();
206 }
207
208 [[nodiscard]] static const comms::DisplayInfo& display_info() {
209 static const comms::DisplayInfo info{
210 .name = "FutureWaitPool",
211 .description = "Elastic executor for blocking std::future waits; scales 1..N under "
212 "wait pressure.",
213 .icon = comms::Icon::from("mdi:timer-sand"),
214 };
215 return info;
216 }
217
218private:
219 std::unique_ptr<ThreadPool> pool_;
220};
221
222} // namespace threadman
Elastic executor for blocking future-waits.
Definition future_wait_pool.hpp:81
FutureWaitPool(FutureWaitPool &&) noexcept=default
FutureWaitPool(const FutureWaitPool &)=delete
Future< T > add(std::future< T > fut)
Adopt a std::future<T>.
Definition future_wait_pool.hpp:117
std::string_view name() const noexcept override
Definition future_wait_pool.hpp:107
std::size_t max_worker_count() const noexcept
Definition future_wait_pool.hpp:179
bool is_shutting_down() const noexcept
Definition future_wait_pool.hpp:201
std::size_t core_worker_count() const noexcept
Definition future_wait_pool.hpp:176
FutureWaitPool(FutureWaitPoolOptions opts={}, ThreadManager *manager=nullptr)
Definition future_wait_pool.hpp:83
std::vector< ThreadSnapshot > snapshot_workers() const
Definition future_wait_pool.hpp:185
void shutdown() const
Definition future_wait_pool.hpp:192
ThreadPool & pool() const noexcept
Definition future_wait_pool.hpp:188
void execute(std::function< void()> task) override
Definition future_wait_pool.hpp:104
auto add_blocking(Fn &&fn) -> Future< std::invoke_result_t< Fn > >
Run an arbitrary blocking callable on the wait pool, returning a Future<R>.
Definition future_wait_pool.hpp:165
void shutdown_now() const
Definition future_wait_pool.hpp:195
FutureWaitPool & operator=(const FutureWaitPool &)=delete
std::uint64_t id() const noexcept
Definition future_wait_pool.hpp:170
ThreadPoolStats stats() const
Definition future_wait_pool.hpp:182
void join() const
Definition future_wait_pool.hpp:198
Future< T > add(std::shared_future< T > fut)
Adopt a std::shared_future<T>.
Definition future_wait_pool.hpp:142
static const comms::DisplayInfo & display_info()
Definition future_wait_pool.hpp:208
bool is_terminated() const noexcept
Definition future_wait_pool.hpp:204
std::size_t worker_count() const noexcept
Definition future_wait_pool.hpp:173
Definition future.hpp:239
Virtual interface for "anything that can run a `std::function<void()>`".
Definition executor.hpp:29
Definition manager.hpp:47
Definition thread_pool.hpp:92
Central feature-gate header for ThreadMan's optional integrations and tunable defaults.
#define THREADMAN_DEFAULT_FUTURE_WAIT_SCALE_UP_WAIT_MS
FutureWaitPool scale-up wait window (ms).
Definition config.hpp:110
#define THREADMAN_DEFAULT_IDLE_TIMEOUT_MS
Idle-timeout window: a non-core worker that sees no work for this long retires (milliseconds).
Definition config.hpp:83
#define THREADMAN_DEFAULT_SCALE_CHECK_INTERVAL_MS
Housekeeper tick interval / per-pool scale evaluation cadence (ms).
Definition config.hpp:93
Executor abstraction — IExecutor (virtual), the Executor concept (duck-typed), and three concrete exe...
threadman::Future<T> (one-shot), threadman::SharedFuture<T> (multi-shot), and threadman::Promise<T> —...
Definition exceptions.hpp:22
Plain value snapshots of the live state of the ThreadMan world — threads, pools, tasks,...
Configuration for a FutureWaitPool.
Definition future_wait_pool.hpp:55
std::chrono::milliseconds scale_up_wait
Scale up once the oldest pending wait has queued for this long while every worker is busy.
Definition future_wait_pool.hpp:62
std::chrono::milliseconds idle_timeout
A non-core worker idle this long retires.
Definition future_wait_pool.hpp:68
std::chrono::milliseconds scale_check_interval
Scale-evaluation cadence (driven by the manager housekeeper).
Definition future_wait_pool.hpp:70
std::size_t min_workers
Definition future_wait_pool.hpp:57
std::size_t max_workers
Definition future_wait_pool.hpp:58
std::string name
Definition future_wait_pool.hpp:56
std::size_t max_pending
Maximum number of pending waits; 0 = unbounded.
Definition future_wait_pool.hpp:74
std::size_t scale_up_when_queue_exceeds
Queue depth above which the pool scales up while saturated.
Definition future_wait_pool.hpp:66
Definition thread_pool.hpp:70
std::size_t scale_up_when_queue_exceeds
Definition thread_pool.hpp:77
std::chrono::milliseconds scale_check_interval
Definition thread_pool.hpp:79
std::string name
Definition thread_pool.hpp:71
std::chrono::milliseconds idle_timeout
Definition thread_pool.hpp:78
std::size_t max_workers
Definition thread_pool.hpp:75
std::size_t min_workers
Definition thread_pool.hpp:74
ThreadManager * manager
Definition thread_pool.hpp:89
std::chrono::milliseconds scale_up_wait
Definition thread_pool.hpp:76
std::size_t max_queue_size
Definition thread_pool.hpp:82
A snapshot of a pool's headline counters.
Definition stats.hpp:60
threadman::ThreadPool — a dynamic-scaling worker-queue executor.