threadman 0.1.0
Header-only C++23 managed threads, dynamic pools, futures, and executors
Loading...
Searching...
No Matches
thread_pool.hpp
Go to the documentation of this file.
1#pragma once
2
22
23#include <threadman/config.hpp>
24#include <threadman/display.hpp>
27#include <threadman/future.hpp>
28#include <threadman/log.hpp>
29#include <threadman/metrics.hpp>
30#include <threadman/stats.hpp>
31#include <threadman/task.hpp>
32#include <threadman/thread.hpp>
33
34#include <commons/display_info.hpp>
35
36#include <algorithm>
37#include <atomic>
38#include <chrono>
39#include <condition_variable>
40#include <cstddef>
41#include <cstdint>
42#include <deque>
43#include <functional>
44#include <memory>
45#include <mutex>
46#include <optional>
47#include <stdexcept>
48#include <stop_token>
49#include <string>
50#include <thread>
51#include <unordered_map>
52#include <utility>
53#include <vector>
54
55namespace threadman {
56
57class ThreadManager; // forward — defined in <threadman/manager.hpp>
58
namespace detail {

/// Issues the next pool identifier. The first call yields 1 and every later
/// call yields the previous value plus one; the counter is a function-local
/// atomic bumped with relaxed ordering.
inline std::uint64_t next_pool_id() noexcept {
    static std::atomic<std::uint64_t> next_id{1};
    const std::uint64_t issued = next_id.fetch_add(1, std::memory_order_relaxed);
    return issued;
}

/// Issues the next task identifier from its own counter (independent of the
/// pool-id counter), starting at 1.
inline std::uint64_t next_task_id() noexcept {
    static std::atomic<std::uint64_t> next_id{1};
    const std::uint64_t issued = next_id.fetch_add(1, std::memory_order_relaxed);
    return issued;
}

} // namespace detail
69
71 std::string name = "tm::pool";
72
73 // Scaling --------------------------------------------------------------
74 std::size_t min_workers = 1;
75 std::size_t max_workers = std::thread::hardware_concurrency();
80
81 // Back-pressure --------------------------------------------------------
82 std::size_t max_queue_size = 0; // 0 = unbounded.
83
84 // Observability --------------------------------------------------------
87
88 // Attachment -----------------------------------------------------------
90};
91
92class ThreadPool final : public IExecutor {
93public:
94 explicit ThreadPool(ThreadPoolOptions opts = {});
95 ~ThreadPool() override;
96
97 ThreadPool(const ThreadPool&) = delete;
98 ThreadPool& operator=(const ThreadPool&) = delete;
101
102 // IExecutor
103 void execute(std::function<void()> task) override;
104 [[nodiscard]] std::string_view name() const noexcept override {
105 return opts_.name;
106 }
107
108 // Native overloads
109 template <class Fn>
110 requires std::invocable<Fn>
112
113 template <class Fn>
114 requires std::invocable<Fn>
115 auto submit_named(std::string task_name, Fn&& f) -> Future<std::invoke_result_t<Fn>>;
116
117 template <class Fn>
118 requires std::invocable<Fn, std::stop_token>
120
121 // Lifecycle
122 void shutdown();
123 void shutdown_now();
124 void join();
125 [[nodiscard]] bool is_shutting_down() const noexcept;
126 [[nodiscard]] bool is_terminated() const noexcept;
127
128 // Inspection
129 [[nodiscard]] std::uint64_t id() const noexcept {
130 return pool_id_;
131 }
132 [[nodiscard]] std::size_t worker_count() const noexcept;
133 [[nodiscard]] std::size_t core_worker_count() const noexcept {
134 return opts_.min_workers;
135 }
136 [[nodiscard]] std::size_t max_worker_count() const noexcept {
137 return opts_.max_workers;
138 }
139 [[nodiscard]] ThreadPoolStats stats() const;
140 [[nodiscard]] std::vector<ThreadSnapshot> snapshot_workers() const;
141 [[nodiscard]] std::vector<TaskSnapshot> snapshot_recent_tasks() const;
142 [[nodiscard]] std::vector<TaskSnapshot> snapshot_queued_tasks() const;
143
144 [[nodiscard]] static const comms::DisplayInfo& display_info() {
145 static const comms::DisplayInfo info{
146 .name = "ThreadPool",
147 .description = "Dynamic-scaling thread pool executor with task tracking.",
148 .icon = comms::Icon::from("mdi:server-network"),
149 };
150 return info;
151 }
152
154 int scale_tick(std::chrono::steady_clock::time_point now);
157 [[nodiscard]] std::vector<StuckTaskEvent>
158 detect_stuck_tasks(std::chrono::steady_clock::time_point now) const;
159
160private:
161 struct QueuedTask {
162 TaskHandle handle;
163 std::function<void(std::stop_token)> work;
164 std::function<void()> on_cancel; // satisfies the future with TaskCancelledError
165 std::chrono::steady_clock::time_point enqueued_at;
166 };
167
168 template <class Fn, class R>
169 Future<R> submit_impl(std::optional<std::string> task_name, Fn&& f);
170 template <class Fn, class R>
171 Future<R> submit_stoppable_impl(std::optional<std::string> task_name, Fn&& f);
172
173 void enqueue(QueuedTask qt);
176 void throw_if_not_running() const {
177 if (state_.load(std::memory_order_acquire) == PoolState::Running) {
178 return;
179 }
180 metrics::pool_submit_rejected().labels(pool_labels_).inc();
181 throw PoolShuttingDownError("ThreadPool '" + opts_.name + "': shutting down or terminated");
182 }
183 void worker_loop(std::stop_token tok, std::uint64_t worker_index, bool is_core);
184 void spawn_worker(bool is_core);
185 void record_recent(TaskHandle h);
186 void register_with_manager();
187 void unregister_from_manager() noexcept;
188
189 ThreadPoolOptions opts_;
190 std::uint64_t pool_id_ = detail::next_pool_id();
191 ThreadManager* manager_ = nullptr;
192
193 mutable std::mutex workers_mtx_;
194 std::vector<std::unique_ptr<ManagedThread>> workers_;
195 std::vector<std::shared_ptr<ManagedThread::ControlBlock>> worker_cbs_; // parallel to workers_
196
197 mutable std::mutex queue_mtx_;
198 std::condition_variable queue_cv_;
199 std::deque<QueuedTask> queue_;
200
201 std::atomic<PoolState> state_{PoolState::Running};
202
203 std::atomic<std::size_t> active_{0}; // worker currently in user work
204 std::atomic<std::size_t> idle_{0}; // worker waiting on queue
205 std::atomic<std::uint64_t> completed_{0};
206 std::atomic<std::uint64_t> failed_{0};
207 std::atomic<std::uint64_t> scale_ups_{0};
208 std::atomic<std::uint64_t> scale_downs_{0};
209 std::atomic<std::uint64_t> total_exec_ns_{0};
210
211 mutable std::mutex recent_mtx_;
212 std::deque<TaskHandle> recent_;
213
214 mutable std::mutex running_mtx_;
215 std::unordered_map<std::uint64_t, TaskHandle> running_; // by task id
216
217 // Cached `{pool=<name>}` label set reused at every metric call site. Only
218 // the Labels are cached, never a labelled child: prom children are pinned to
219 // the adapter present at their creation, so building a fresh child per call
220 // keeps metrics flowing to whatever backend is installed at that moment.
221 prom::Labels pool_labels_{prom::Labels{{"pool", opts_.name}}};
222};
223
224// ---------------------------------------------------------------------------
225// Inline definitions
226// ---------------------------------------------------------------------------
227
228inline ThreadPool::ThreadPool(ThreadPoolOptions opts) : opts_(std::move(opts)) {
229 if (opts_.max_workers == 0) {
230 opts_.max_workers = 1;
231 }
232
233 opts_.min_workers = std::min(opts_.min_workers, opts_.max_workers);
234
235 if (opts_.recent_tasks_capacity == 0) {
236 opts_.recent_tasks_capacity = 1;
237 }
238
239 register_with_manager();
240
241 for (std::size_t i = 0; i < opts_.min_workers; ++i) {
242 spawn_worker(/*is_core=*/true);
243 }
244
245 if (const auto& lg = log::pool()) {
246 lg->debug("pool '{}' created (id={}, core={}, max={})",
247 opts_.name,
248 pool_id_,
249 opts_.min_workers,
250 opts_.max_workers);
251 }
252}
253
255 if (state_.load(std::memory_order_acquire) == PoolState::Running) {
256 try {
257 shutdown();
258 } catch (...) {
259 // dtor must not propagate
260 }
261 }
262 // Join all workers.
263 {
264 std::scoped_lock guard(workers_mtx_);
265 for (const auto& w : workers_) {
266 if (w && w->joinable()) {
267 w->request_stop();
268 }
269 }
270 }
271 {
272 std::unique_lock lk(queue_mtx_);
273 queue_cv_.notify_all();
274 }
275 {
276 std::scoped_lock guard(workers_mtx_);
277 for (const auto& w : workers_) {
278 if (w && w->joinable()) {
279 w->join();
280 }
281 }
282 workers_.clear();
283 worker_cbs_.clear();
284 }
285 state_.store(PoolState::Terminated, std::memory_order_release);
286 unregister_from_manager();
287}
288
289inline void ThreadPool::execute(std::function<void()> task) {
290 if (!task) {
291 return;
292 }
293 throw_if_not_running();
294 const TaskHandle handle(detail::next_task_id(), pool_id_);
295 QueuedTask qt;
296 qt.handle = handle;
297 qt.enqueued_at = std::chrono::steady_clock::now();
298 qt.work = [t = std::move(task)](const std::stop_token&) { t(); };
299 enqueue(std::move(qt));
300}
301
302template <class Fn, class R>
303Future<R> ThreadPool::submit_impl(std::optional<std::string> task_name, Fn&& f) {
304 throw_if_not_running();
305 auto promise = std::make_shared<Promise<R>>();
306 auto future = promise->get_future();
307 const TaskHandle handle(detail::next_task_id(), pool_id_, std::move(task_name));
308
309 auto fn_shared = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(f));
310
311 QueuedTask qt;
312 qt.handle = handle;
313 qt.enqueued_at = std::chrono::steady_clock::now();
314 qt.work = [promise, fn_shared](const std::stop_token&) {
315 try {
316 if constexpr (std::is_void_v<R>) {
317 (*fn_shared)();
318 promise->set_value();
319 } else {
320 promise->set_value((*fn_shared)());
321 }
322 } catch (...) {
323 promise->set_exception(std::current_exception());
324 }
325 };
326 qt.on_cancel = [promise]() {
327 try {
328 promise->set_exception(
329 std::make_exception_ptr(TaskCancelledError("task cancelled before it could run")));
330 } catch (...) {
331 // promise may already be satisfied via another path
332 }
333 };
334 enqueue(std::move(qt));
335 return future;
336}
337
338template <class Fn, class R>
339Future<R> ThreadPool::submit_stoppable_impl(std::optional<std::string> task_name, Fn&& f) {
340 throw_if_not_running();
341 auto promise = std::make_shared<Promise<R>>();
342 auto future = promise->get_future();
343 const TaskHandle handle(detail::next_task_id(), pool_id_, std::move(task_name));
344
345 auto fn_shared = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(f));
346
347 QueuedTask qt;
348 qt.handle = handle;
349 qt.enqueued_at = std::chrono::steady_clock::now();
350 // stop_token is taken by value (the std::function<void(std::stop_token)>
351 // signature) and forwarded via std::move into the user callable, which may
352 // accept it by value or by rvalue-ref — const& would break the latter.
353 // NOLINTNEXTLINE(performance-unnecessary-value-param)
354 qt.work = [promise, fn_shared](std::stop_token tok) {
355 try {
356 if constexpr (std::is_void_v<R>) {
357 (*fn_shared)(std::move(tok));
358 promise->set_value();
359 } else {
360 promise->set_value((*fn_shared)(std::move(tok)));
361 }
362 } catch (...) {
363 promise->set_exception(std::current_exception());
364 }
365 };
366 qt.on_cancel = [promise]() {
367 try {
368 promise->set_exception(
369 std::make_exception_ptr(TaskCancelledError("task cancelled before it could run")));
370 } catch (...) {
371 // promise may already be satisfied via another path
372 }
373 };
374 enqueue(std::move(qt));
375 return future;
376}
377
378template <class Fn>
379 requires std::invocable<Fn>
381 using R = std::invoke_result_t<Fn>;
382 return submit_impl<Fn, R>(std::nullopt, std::forward<Fn>(f));
383}
384
385template <class Fn>
386 requires std::invocable<Fn>
387auto ThreadPool::submit_named(std::string task_name, Fn&& f) -> Future<std::invoke_result_t<Fn>> {
388 using R = std::invoke_result_t<Fn>;
389 return submit_impl<Fn, R>(std::move(task_name), std::forward<Fn>(f));
390}
391
392template <class Fn>
393 requires std::invocable<Fn, std::stop_token>
395 using R = std::invoke_result_t<Fn, std::stop_token>;
396 return submit_stoppable_impl<Fn, R>(std::nullopt, std::forward<Fn>(f));
397}
398
399inline void ThreadPool::enqueue(QueuedTask qt) {
400 {
401 std::unique_lock lk(queue_mtx_);
402 if (opts_.max_queue_size != 0 && queue_.size() >= opts_.max_queue_size) {
403 // Emit outside the lock — release first, then count the rejection.
404 lk.unlock();
405 metrics::pool_queue_rejected().labels(pool_labels_).inc();
406 throw PoolQueueFullError("ThreadPool '" + opts_.name + "': queue full");
407 }
408 queue_.push_back(std::move(qt));
409 queue_cv_.notify_one();
410 }
411 metrics::pool_tasks_submitted().labels(pool_labels_).inc();
412}
413
414inline void ThreadPool::spawn_worker(bool is_core) {
415 ManagedThread::Options topts;
416 topts.name = opts_.name + "-worker-" + std::to_string(workers_.size());
417 topts.pool_id = pool_id_;
418 topts.is_core = is_core;
419 topts.manager = manager_;
420
421 const std::uint64_t worker_index = workers_.size();
422 auto worker = std::make_unique<ManagedThread>(
423 std::move(topts), [this, worker_index, is_core](std::stop_token tok) {
424 worker_loop(std::move(tok), worker_index, is_core);
425 });
426 worker_cbs_.push_back(worker->control_block());
427 workers_.push_back(std::move(worker));
428 if (!is_core) {
429 scale_ups_.fetch_add(1, std::memory_order_relaxed);
430 metrics::pool_scale_ups().labels(pool_labels_).inc();
431 }
432}
433
434inline void ThreadPool::worker_loop(std::stop_token tok, std::uint64_t worker_index, bool is_core) {
435 (void)worker_index;
436
437 auto& lg_pool = log::pool();
438 auto* cb_raw = ManagedThread::current();
439
440 // Until stopped, fetch tasks. Track per-iteration last-active time so an
441 // overflow worker can age out via idle_timeout.
442 auto last_active = std::chrono::steady_clock::now();
443
444 while (!tok.stop_requested()) {
445 QueuedTask task;
446 bool got_task = false;
447 bool drain = false;
448 std::uint64_t cancelled_now = 0;
449
450 {
451 std::unique_lock lk(queue_mtx_);
452
453 // Park as idle.
454 if (cb_raw != nullptr) {
455 cb_raw->state.store(ThreadState::Idle, std::memory_order_release);
456 }
457 idle_.fetch_add(1, std::memory_order_relaxed);
458
459 const bool waited = queue_cv_.wait_for(lk, opts_.scale_check_interval, [&] {
460 if (tok.stop_requested()) {
461 return true;
462 }
463 if (!queue_.empty()) {
464 return true;
465 }
466 const auto s = state_.load(std::memory_order_acquire);
467 if (s == PoolState::ShutdownNow) {
468 return true;
469 }
470 if (s == PoolState::ShuttingDown && queue_.empty()) {
471 return true;
472 }
473 return false;
474 });
475 (void)waited;
476
477 idle_.fetch_sub(1, std::memory_order_relaxed);
478 if (cb_raw != nullptr) {
479 cb_raw->state.store(ThreadState::Running, std::memory_order_release);
480 }
481
482 if (const auto s = state_.load(std::memory_order_acquire);
484 // Cancel any remaining queued tasks visible to us.
485 while (!queue_.empty()) {
486 auto cancelled = std::move(queue_.front());
487 queue_.pop_front();
488 cancelled.handle.mark_cancelled(std::make_exception_ptr(
489 TaskCancelledError("task cancelled by shutdown_now")));
490 if (cancelled.on_cancel) {
491 cancelled.on_cancel();
492 }
493 failed_.fetch_add(1, std::memory_order_relaxed);
494 ++cancelled_now;
495 record_recent(std::move(cancelled.handle));
496 }
497 drain = true;
498 } else if (!queue_.empty()) {
499 task = std::move(queue_.front());
500 queue_.pop_front();
501 got_task = true;
502 } else if (s == PoolState::ShuttingDown) {
503 drain = true;
504 }
505 }
506
507 // Emit cancellations outside the queue lock (per metrics policy).
508 if (cancelled_now != 0) {
509 metrics::pool_tasks_cancelled().labels(pool_labels_).inc(cancelled_now);
510 }
511
512 if (drain) {
513 break;
514 }
515
516 if (!got_task) {
517 // Idle-timeout retirement for non-core workers.
518 if (!is_core) {
519 if (const auto now = std::chrono::steady_clock::now();
520 now - last_active >= opts_.idle_timeout) {
521 // Self-retire.
522 if (cb_raw != nullptr) {
523 cb_raw->state.store(ThreadState::Retiring, std::memory_order_release);
524 }
525 scale_downs_.fetch_add(1, std::memory_order_relaxed);
526 metrics::pool_scale_downs().labels(pool_labels_).inc();
527 if (auto& lg = log::pool()) {
528 lg->info("pool '{}' retiring idle non-core worker", opts_.name);
529 }
530 return;
531 }
532 }
533 continue;
534 }
535
536 last_active = std::chrono::steady_clock::now();
537 active_.fetch_add(1, std::memory_order_relaxed);
538 task.handle.mark_running();
539 {
540 std::scoped_lock guard(running_mtx_);
541 running_.emplace(task.handle.id(), task.handle);
542 }
543 const auto t0 = std::chrono::steady_clock::now();
544 const auto queue_wait_ns =
545 std::chrono::duration_cast<std::chrono::nanoseconds>(t0 - task.enqueued_at).count();
546 bool task_failed = false;
547 try {
548 task.work(tok);
549 task.handle.mark_completed();
550 completed_.fetch_add(1, std::memory_order_relaxed);
551 } catch (...) {
552 task.handle.mark_failed(std::current_exception());
553 failed_.fetch_add(1, std::memory_order_relaxed);
554 task_failed = true;
555 }
556 const auto t1 = std::chrono::steady_clock::now();
557 const auto elap = std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
558 total_exec_ns_.fetch_add(static_cast<std::uint64_t>(elap), std::memory_order_relaxed);
559 active_.fetch_sub(1, std::memory_order_relaxed);
560 {
561 std::scoped_lock guard(running_mtx_);
562 running_.erase(task.handle.id());
563 }
564 record_recent(std::move(task.handle));
565 // Emit metrics only after record_recent so the small window callers
566 // observe right after Future::get() (recent-task snapshot) is not
567 // widened by per-task metric work.
569 .labels(pool_labels_)
570 .observe(static_cast<double>(queue_wait_ns) / 1e9);
572 .labels(pool_labels_)
573 .observe(static_cast<double>(elap) / 1e9);
574 if (task_failed) {
575 metrics::pool_tasks_failed().labels(pool_labels_).inc();
576 } else {
577 metrics::pool_tasks_completed().labels(pool_labels_).inc();
578 }
579 }
580
581 // Drain on shutdown (drain queued tasks with the worker's own loop).
582 std::uint64_t drain_cancelled = 0;
583 while (true) {
584 QueuedTask task;
585 bool got_task = false;
586 {
587 std::unique_lock lk(queue_mtx_);
588 if (state_.load(std::memory_order_acquire) == PoolState::ShutdownNow) {
589 while (!queue_.empty()) {
590 auto cancelled = std::move(queue_.front());
591 queue_.pop_front();
592 cancelled.handle.mark_cancelled(std::make_exception_ptr(
593 TaskCancelledError("task cancelled by shutdown_now")));
594 if (cancelled.on_cancel) {
595 cancelled.on_cancel();
596 }
597 failed_.fetch_add(1, std::memory_order_relaxed);
598 ++drain_cancelled;
599 record_recent(std::move(cancelled.handle));
600 }
601 break;
602 }
603 if (queue_.empty()) {
604 break;
605 }
606 task = std::move(queue_.front());
607 queue_.pop_front();
608 got_task = true;
609 }
610 if (!got_task) {
611 break;
612 }
613 active_.fetch_add(1, std::memory_order_relaxed);
614 task.handle.mark_running();
615 {
616 std::scoped_lock guard(running_mtx_);
617 running_.emplace(task.handle.id(), task.handle);
618 }
619 const auto t0 = std::chrono::steady_clock::now();
620 const auto queue_wait_ns =
621 std::chrono::duration_cast<std::chrono::nanoseconds>(t0 - task.enqueued_at).count();
622 bool task_failed = false;
623 try {
624 task.work(tok);
625 task.handle.mark_completed();
626 completed_.fetch_add(1, std::memory_order_relaxed);
627 } catch (...) {
628 task.handle.mark_failed(std::current_exception());
629 failed_.fetch_add(1, std::memory_order_relaxed);
630 task_failed = true;
631 }
632 const auto t1 = std::chrono::steady_clock::now();
633 const auto elap = std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
634 total_exec_ns_.fetch_add(static_cast<std::uint64_t>(elap), std::memory_order_relaxed);
635 active_.fetch_sub(1, std::memory_order_relaxed);
636 {
637 std::scoped_lock guard(running_mtx_);
638 running_.erase(task.handle.id());
639 }
640 record_recent(std::move(task.handle));
641 // Emit metrics only after record_recent so the small window callers
642 // observe right after Future::get() (recent-task snapshot) is not
643 // widened by per-task metric work.
645 .labels(pool_labels_)
646 .observe(static_cast<double>(queue_wait_ns) / 1e9);
648 .labels(pool_labels_)
649 .observe(static_cast<double>(elap) / 1e9);
650 if (task_failed) {
651 metrics::pool_tasks_failed().labels(pool_labels_).inc();
652 } else {
653 metrics::pool_tasks_completed().labels(pool_labels_).inc();
654 }
655 }
656
657 // Emit drain-time cancellations outside the queue lock (per metrics policy).
658 if (drain_cancelled != 0) {
659 metrics::pool_tasks_cancelled().labels(pool_labels_).inc(drain_cancelled);
660 }
661
662 (void)lg_pool;
663}
664
665inline void ThreadPool::record_recent(TaskHandle h) {
666 std::scoped_lock guard(recent_mtx_);
667 if (recent_.size() >= opts_.recent_tasks_capacity) {
668 recent_.pop_front();
669 }
670 recent_.push_back(std::move(h));
671}
672
673inline void ThreadPool::shutdown() {
674 if (auto expected = PoolState::Running; !state_.compare_exchange_strong(
675 expected, PoolState::ShuttingDown, std::memory_order_acq_rel)) {
676 return; // already shutting down or terminated
677 }
678 {
679 std::unique_lock lk(queue_mtx_);
680 queue_cv_.notify_all();
681 }
682 if (const auto& lg = log::pool()) {
683 lg->info("pool '{}' shutdown requested", opts_.name);
684 }
685}
686
687inline void ThreadPool::shutdown_now() {
688 auto current = state_.load(std::memory_order_acquire);
689 while (current == PoolState::Running || current == PoolState::ShuttingDown) {
690 if (state_.compare_exchange_weak(
691 current, PoolState::ShutdownNow, std::memory_order_acq_rel)) {
692 break;
693 }
694 }
695 if (state_.load(std::memory_order_acquire) != PoolState::ShutdownNow) {
696 return;
697 }
698 {
699 std::scoped_lock guard(workers_mtx_);
700 for (auto& w : workers_) {
701 if (w) {
702 w->request_stop();
703 }
704 }
705 }
706 {
707 std::unique_lock lk(queue_mtx_);
708 queue_cv_.notify_all();
709 }
710 if (const auto& lg = log::pool()) {
711 lg->info("pool '{}' shutdown_now requested", opts_.name);
712 }
713}
714
715inline void ThreadPool::join() {
716 if (state_.load(std::memory_order_acquire) == PoolState::Running) {
717 throw std::logic_error("ThreadPool::join() called without prior shutdown()");
718 }
719 std::scoped_lock guard(workers_mtx_);
720 for (const auto& w : workers_) {
721 if (w && w->joinable()) {
722 w->join();
723 }
724 }
725 state_.store(PoolState::Terminated, std::memory_order_release);
726}
727
728inline bool ThreadPool::is_shutting_down() const noexcept {
729 const auto s = state_.load(std::memory_order_acquire);
730 return s == PoolState::ShuttingDown || s == PoolState::ShutdownNow ||
731 s == PoolState::Terminated;
732}
733
734inline bool ThreadPool::is_terminated() const noexcept {
735 return state_.load(std::memory_order_acquire) == PoolState::Terminated;
736}
737
738inline std::size_t ThreadPool::worker_count() const noexcept {
739 std::scoped_lock guard(workers_mtx_);
740 return workers_.size();
741}
742
743inline ThreadPoolStats ThreadPool::stats() const {
745 s.pool_id = pool_id_;
746 s.name = opts_.name;
747 s.state = state_.load(std::memory_order_acquire);
748 s.core_workers = opts_.min_workers;
749 s.max_workers = opts_.max_workers;
750 s.active = active_.load(std::memory_order_relaxed);
751 s.idle = idle_.load(std::memory_order_relaxed);
752 s.completed = completed_.load(std::memory_order_relaxed);
753 s.failed = failed_.load(std::memory_order_relaxed);
754 s.scale_ups = scale_ups_.load(std::memory_order_relaxed);
755 s.scale_downs = scale_downs_.load(std::memory_order_relaxed);
756 {
757 std::scoped_lock guard(workers_mtx_);
758 s.workers = workers_.size();
759 }
760 {
761 std::scoped_lock lk(queue_mtx_);
762 s.queued = queue_.size();
763 }
764 if (const auto done = s.completed + s.failed; done > 0) {
765 const auto total = total_exec_ns_.load(std::memory_order_relaxed);
766 s.avg_execution_duration = std::chrono::nanoseconds{total / done};
767 }
768 return s;
769}
770
771inline std::vector<ThreadSnapshot> ThreadPool::snapshot_workers() const {
772 std::scoped_lock guard(workers_mtx_);
773 std::vector<ThreadSnapshot> out;
774 out.reserve(workers_.size());
775 for (const auto& w : workers_) {
776 if (w) {
777 out.push_back(w->snapshot());
778 }
779 }
780 return out;
781}
782
783inline std::vector<TaskSnapshot> ThreadPool::snapshot_recent_tasks() const {
784 std::scoped_lock guard(recent_mtx_);
785 std::vector<TaskSnapshot> out;
786 out.reserve(recent_.size());
787 for (const auto& h : recent_) {
788 out.push_back(h.snapshot());
789 }
790 return out;
791}
792
793inline std::vector<TaskSnapshot> ThreadPool::snapshot_queued_tasks() const {
794 std::scoped_lock lk(queue_mtx_);
795 std::vector<TaskSnapshot> out;
796 out.reserve(queue_.size());
797 for (const auto& q : queue_) {
798 out.push_back(q.handle.snapshot());
799 }
800 return out;
801}
802
803inline int ThreadPool::scale_tick(const std::chrono::steady_clock::time_point now) {
804 if (state_.load(std::memory_order_acquire) != PoolState::Running) {
805 return 0;
806 }
807
808 // Reap workers whose ManagedThread has exited (state Completed/Failed).
809 int reaped = 0;
810 {
811 std::scoped_lock guard(workers_mtx_);
812 for (std::size_t i = 0; i < workers_.size();) {
813 if (const auto st = workers_[i]->control_block()->state.load(std::memory_order_acquire);
814 st == ThreadState::Completed || st == ThreadState::Failed) {
815 workers_[i]->join();
816 workers_.erase(workers_.begin() + static_cast<std::ptrdiff_t>(i));
817 worker_cbs_.erase(worker_cbs_.begin() + static_cast<std::ptrdiff_t>(i));
818 ++reaped;
819 } else {
820 ++i;
821 }
822 }
823 }
824
825 int net = -reaped;
826
827 // Scale-up decision.
828 bool scale_up = false;
829 std::size_t queue_size = 0;
830 std::optional<std::chrono::steady_clock::time_point> oldest_enqueued;
831 {
832 std::scoped_lock lk(queue_mtx_);
833 queue_size = queue_.size();
834 if (!queue_.empty()) {
835 oldest_enqueued = queue_.front().enqueued_at;
836 }
837 }
838 const std::size_t idle_now = idle_.load(std::memory_order_relaxed);
839 std::size_t workers_now = 0;
840 {
841 std::scoped_lock guard(workers_mtx_);
842 workers_now = workers_.size();
843 }
844
845 if (workers_now < opts_.max_workers && idle_now == 0) {
846 const bool queue_pressure = queue_size > opts_.scale_up_when_queue_exceeds;
847 const bool wait_pressure =
848 oldest_enqueued.has_value() && (now - *oldest_enqueued) >= opts_.scale_up_wait;
849 scale_up = queue_pressure || wait_pressure;
850 }
851
852 if (scale_up) {
853 std::scoped_lock guard(workers_mtx_);
854 if (workers_.size() < opts_.max_workers) {
855 spawn_worker(/*is_core=*/false);
856 ++net;
857 if (const auto& lg = log::pool()) {
858 lg->info("pool '{}' scaled up to {} workers (idle=0, queued={})",
859 opts_.name,
860 workers_.size(),
861 queue_size);
862 }
863 }
864 }
865
866 return net;
867}
868
869inline std::vector<StuckTaskEvent>
870ThreadPool::detect_stuck_tasks(std::chrono::steady_clock::time_point now) const {
871 std::vector<StuckTaskEvent> out;
872 std::scoped_lock guard(running_mtx_);
873 for (const auto& h : running_ | std::views::values) {
874 if (const auto snap = h.snapshot();
875 snap.state == TaskState::Running && snap.started_at.has_value()) {
876 if (const auto running_for = now - *snap.started_at;
877 running_for >= opts_.stuck_task_threshold) {
879 ev.task = snap;
880 ev.running_for = std::chrono::duration_cast<std::chrono::nanoseconds>(running_for);
881 ev.pool_id = pool_id_;
882 out.push_back(std::move(ev));
883 }
884 }
885 }
886 return out;
887}
888
889// ---------------------------------------------------------------------------
890// SingleThreadExecutor — convenience wrapper around a ThreadPool with
891// min_workers == max_workers == 1. Owns its pool; tasks run FIFO on a single
892// thread.
893// ---------------------------------------------------------------------------
894
895class SingleThreadExecutor final : public IExecutor {
896public:
897 explicit SingleThreadExecutor(std::string nm = "tm::single") {
899 o.name = std::move(nm);
900 o.min_workers = 1;
901 o.max_workers = 1;
902 pool_ = std::make_unique<ThreadPool>(std::move(o));
903 }
904
905 void execute(std::function<void()> task) override {
906 pool_->execute(std::move(task));
907 }
908 [[nodiscard]] std::string_view name() const noexcept override {
909 return pool_->name();
910 }
911
912 template <class Fn>
913 requires std::invocable<Fn>
915 return pool_->submit(std::forward<Fn>(f));
916 }
917
918 [[nodiscard]] ThreadPool& pool() const noexcept {
919 return *pool_;
920 }
921
922 [[nodiscard]] static const comms::DisplayInfo& display_info() {
923 static const comms::DisplayInfo info{
924 .name = "SingleThreadExecutor",
925 .description = "Executor owning a thread pool of exactly one worker (FIFO).",
926 .icon = comms::Icon::from("mdi:speedometer-slow"),
927 };
928 return info;
929 }
930
931private:
932 std::unique_ptr<ThreadPool> pool_;
933};
934
935} // namespace threadman
Definition future.hpp:239
Virtual interface for "anything that can run a `std::function<void()>`".
Definition executor.hpp:29
static ControlBlock * current() noexcept
Returns a pointer to the ControlBlock of the currently-executing ManagedThread, or nullptr when calle...
Definition thread.hpp:245
Submitting work to a bounded pool whose queue is already at capacity.
Definition exceptions.hpp:71
Definition thread_pool.hpp:895
void execute(std::function< void()> task) override
Definition thread_pool.hpp:905
auto submit(Fn &&f) -> Future< std::invoke_result_t< Fn > >
Definition thread_pool.hpp:914
static const comms::DisplayInfo & display_info()
Definition thread_pool.hpp:922
SingleThreadExecutor(std::string nm="tm::single")
Definition thread_pool.hpp:897
std::string_view name() const noexcept override
Definition thread_pool.hpp:908
ThreadPool & pool() const noexcept
Definition thread_pool.hpp:918
Definition task.hpp:33
Definition manager.hpp:47
Definition thread_pool.hpp:92
bool is_terminated() const noexcept
Definition thread_pool.hpp:734
std::vector< StuckTaskEvent > detect_stuck_tasks(std::chrono::steady_clock::time_point now) const
Driven by ThreadManager housekeeper.
Definition thread_pool.hpp:870
ThreadPool & operator=(ThreadPool &&)=delete
~ThreadPool() override
Definition thread_pool.hpp:254
void shutdown()
Definition thread_pool.hpp:673
std::string_view name() const noexcept override
Definition thread_pool.hpp:104
std::size_t max_worker_count() const noexcept
Definition thread_pool.hpp:136
std::vector< TaskSnapshot > snapshot_recent_tasks() const
Definition thread_pool.hpp:783
static const comms::DisplayInfo & display_info()
Definition thread_pool.hpp:144
std::size_t core_worker_count() const noexcept
Definition thread_pool.hpp:133
void execute(std::function< void()> task) override
Definition thread_pool.hpp:289
auto submit_named(std::string task_name, Fn &&f) -> Future< std::invoke_result_t< Fn > >
Definition thread_pool.hpp:387
std::vector< ThreadSnapshot > snapshot_workers() const
Definition thread_pool.hpp:771
ThreadPool(ThreadPool &&)=delete
std::size_t worker_count() const noexcept
Definition thread_pool.hpp:738
ThreadPool(const ThreadPool &)=delete
auto submit_stoppable(Fn &&f) -> Future< std::invoke_result_t< Fn, std::stop_token > >
Definition thread_pool.hpp:394
void join()
Definition thread_pool.hpp:715
int scale_tick(std::chrono::steady_clock::time_point now)
Driven by ThreadManager housekeeper. Returns net worker change.
Definition thread_pool.hpp:803
ThreadPoolStats stats() const
Definition thread_pool.hpp:743
bool is_shutting_down() const noexcept
Definition thread_pool.hpp:728
ThreadPool(ThreadPoolOptions opts={})
Definition thread_pool.hpp:228
auto submit(Fn &&f) -> Future< std::invoke_result_t< Fn > >
Definition thread_pool.hpp:380
std::vector< TaskSnapshot > snapshot_queued_tasks() const
Definition thread_pool.hpp:793
void shutdown_now()
Definition thread_pool.hpp:687
ThreadPool & operator=(const ThreadPool &)=delete
Central feature-gate header for ThreadMan's optional integrations and tunable defaults.
#define THREADMAN_DEFAULT_SCALE_UP_QUEUE_THRESHOLD
Queue-depth threshold above which the housekeeper considers scaling up.
Definition config.hpp:88
#define THREADMAN_DEFAULT_STUCK_TASK_THRESHOLD_MS
A task that has been Running longer than this is flagged as stuck (ms).
Definition config.hpp:98
#define THREADMAN_RECENT_TASKS_CAPACITY
Per-pool recent-task ring-buffer capacity.
Definition config.hpp:66
#define THREADMAN_DEFAULT_IDLE_TIMEOUT_MS
Idle-timeout window: a non-core worker that sees no work for this long retires (milliseconds).
Definition config.hpp:83
#define THREADMAN_DEFAULT_SCALE_CHECK_INTERVAL_MS
Housekeeper tick interval / per-pool scale evaluation cadence (ms).
Definition config.hpp:93
#define THREADMAN_DEFAULT_SCALE_UP_WAIT_MS
How long the oldest queued task is allowed to wait before forcing a scale-up (milliseconds).
Definition config.hpp:77
Re-export <commons/display_info.hpp> and attach non-intrusive comms::HasDisplayInfo<> specializations...
Typed exception hierarchy thrown by ThreadMan.
Executor abstraction — IExecutor (virtual), the Executor concept (duck-typed), and three concrete exe...
threadman::Future<T> (one-shot), threadman::SharedFuture<T> (multi-shot), and threadman::Promise<T> —...
Cached subsystem loggers for the tm.* logger families (tm.pool, tm.future, tm.task).
Cached subsystem metrics for the tm_* Prometheus/OpenMetrics families, mirroring the tm.* logger families.
std::uint64_t next_task_id() noexcept
Definition thread_pool.hpp:64
std::uint64_t next_pool_id() noexcept
Definition thread_pool.hpp:60
std::shared_ptr< spdlog::logger > & task()
Logger for tm.task — stuck-task warnings (only when no listener subscribes).
Definition log.hpp:51
std::shared_ptr< spdlog::logger > & future()
Logger for tm.future — future satisfaction and continuation dispatch.
Definition log.hpp:39
std::shared_ptr< spdlog::logger > & pool()
Logger for tm.pool — scaling, shutdown, queue back-pressure.
Definition log.hpp:27
prom::Counter & pool_scale_downs()
Non-core workers retired after idling out.
Definition metrics.hpp:95
prom::Counter & pool_tasks_cancelled()
Tasks cancelled before running (drained by shutdown_now).
Definition metrics.hpp:64
prom::Counter & pool_scale_ups()
Non-core workers spawned by a scale-up decision.
Definition metrics.hpp:88
prom::Counter & pool_tasks_submitted()
Tasks submitted to a thread pool (all submit variants).
Definition metrics.hpp:41
prom::Histogram & pool_task_execution_seconds()
Wall-clock seconds a task body spent executing.
Definition metrics.hpp:130
prom::Counter & pool_submit_rejected()
Submissions rejected because the pool was shutting down or terminated.
Definition metrics.hpp:80
prom::Counter & pool_tasks_completed()
Tasks that ran to completion on a pool worker.
Definition metrics.hpp:49
prom::Histogram & pool_task_queue_wait_seconds()
Seconds a task waited in the queue before a worker picked it up.
Definition metrics.hpp:139
prom::Counter & pool_tasks_failed()
Tasks whose body threw an exception.
Definition metrics.hpp:57
prom::Counter & pool_queue_rejected()
Submissions rejected because a bounded queue was full.
Definition metrics.hpp:72
Definition exceptions.hpp:22
@ Retiring
Pool worker stop-requested; will exit on next loop check.
@ Running
User body is actively executing.
@ Idle
Pool worker sleeping on the queue, no task.
PoolState
Lifecycle state of a ThreadPool.
Definition stats.hpp:40
@ ShuttingDown
shutdown() requested; draining queue, no new submits.
@ Running
Accepting submissions.
@ ShutdownNow
shutdown_now() requested; queue cancelled, workers stopping.
@ Terminated
All workers joined.
Plain value snapshots of the live state of the ThreadMan world — threads, pools, tasks,...
A single stuck-task event published to StuckTaskListeners.
Definition stats.hpp:114
std::uint64_t pool_id
Definition stats.hpp:117
TaskSnapshot task
Definition stats.hpp:115
std::chrono::nanoseconds running_for
Definition stats.hpp:116
Definition thread_pool.hpp:70
std::size_t scale_up_when_queue_exceeds
Definition thread_pool.hpp:77
std::chrono::milliseconds scale_check_interval
Definition thread_pool.hpp:79
std::string name
Definition thread_pool.hpp:71
std::chrono::milliseconds idle_timeout
Definition thread_pool.hpp:78
std::size_t max_workers
Definition thread_pool.hpp:75
std::size_t min_workers
Definition thread_pool.hpp:74
ThreadManager * manager
Definition thread_pool.hpp:89
std::chrono::milliseconds scale_up_wait
Definition thread_pool.hpp:76
std::size_t recent_tasks_capacity
Definition thread_pool.hpp:86
std::chrono::milliseconds stuck_task_threshold
Definition thread_pool.hpp:85
std::size_t max_queue_size
Definition thread_pool.hpp:82
A snapshot of a pool's headline counters.
Definition stats.hpp:60
std::size_t queued
Tasks waiting in the queue.
Definition stats.hpp:69
std::uint64_t completed
Definition stats.hpp:70
std::uint64_t failed
Definition stats.hpp:71
std::size_t workers
Currently live workers.
Definition stats.hpp:64
std::uint64_t scale_ups
Definition stats.hpp:74
std::uint64_t scale_downs
Definition stats.hpp:75
std::uint64_t pool_id
Definition stats.hpp:61
std::size_t idle
Workers currently in Idle.
Definition stats.hpp:68
std::size_t active
Workers currently in Running.
Definition stats.hpp:67
PoolState state
Definition stats.hpp:63
std::size_t max_workers
Definition stats.hpp:66
std::string name
Definition stats.hpp:62
std::size_t core_workers
Core worker count (min_workers).
Definition stats.hpp:65
std::chrono::nanoseconds avg_execution_duration
Rolling average across completed tasks.
Definition stats.hpp:72
threadman::TaskHandle — a ref-counted, shareable handle to a single task's lifecycle (id,...
threadman::ManagedThread — RAII wrapper around std::jthread that publishes lifecycle state via a shar...