34#include <commons/display_info.hpp>
39#include <condition_variable>
51#include <unordered_map>
61 static std::atomic<std::uint64_t> counter{1};
62 return counter.fetch_add(1, std::memory_order_relaxed);
65 static std::atomic<std::uint64_t> counter{1};
66 return counter.fetch_add(1, std::memory_order_relaxed);
71 std::string
name =
"tm::pool";
75 std::size_t
max_workers = std::thread::hardware_concurrency();
103 void execute(std::function<
void()> task)
override;
104 [[nodiscard]] std::string_view
name() const noexcept
override {
110 requires std::invocable<Fn>
114 requires std::invocable<Fn>
118 requires std::invocable<Fn, std::stop_token>
129 [[nodiscard]] std::uint64_t
id() const noexcept {
132 [[nodiscard]] std::size_t
worker_count() const noexcept;
145 static const comms::DisplayInfo info{
146 .name =
"ThreadPool",
147 .description =
"Dynamic-scaling thread pool executor with task tracking.",
148 .icon = comms::Icon::from(
"mdi:server-network"),
154 int scale_tick(std::chrono::steady_clock::time_point now);
157 [[nodiscard]] std::vector<StuckTaskEvent>
163 std::function<void(std::stop_token)> work;
164 std::function<void()> on_cancel;
165 std::chrono::steady_clock::time_point enqueued_at;
168 template <
class Fn,
class R>
169 Future<R> submit_impl(std::optional<std::string> task_name, Fn&& f);
170 template <
class Fn,
class R>
171 Future<R> submit_stoppable_impl(std::optional<std::string> task_name, Fn&& f);
173 void enqueue(QueuedTask qt);
176 void throw_if_not_running()
const {
181 throw PoolShuttingDownError(
"ThreadPool '" + opts_.
name +
"': shutting down or terminated");
183 void worker_loop(std::stop_token tok, std::uint64_t worker_index,
bool is_core);
184 void spawn_worker(
bool is_core);
185 void record_recent(TaskHandle h);
186 void register_with_manager();
187 void unregister_from_manager() noexcept;
189 ThreadPoolOptions opts_;
190 std::uint64_t pool_id_ = detail::next_pool_id();
191 ThreadManager* manager_ =
nullptr;
193 mutable std::mutex workers_mtx_;
194 std::vector<std::unique_ptr<ManagedThread>> workers_;
195 std::vector<std::shared_ptr<ManagedThread::ControlBlock>> worker_cbs_;
197 mutable std::mutex queue_mtx_;
198 std::condition_variable queue_cv_;
199 std::deque<QueuedTask> queue_;
203 std::atomic<std::size_t> active_{0};
204 std::atomic<std::size_t> idle_{0};
205 std::atomic<std::uint64_t> completed_{0};
206 std::atomic<std::uint64_t> failed_{0};
207 std::atomic<std::uint64_t> scale_ups_{0};
208 std::atomic<std::uint64_t> scale_downs_{0};
209 std::atomic<std::uint64_t> total_exec_ns_{0};
211 mutable std::mutex recent_mtx_;
212 std::deque<TaskHandle> recent_;
214 mutable std::mutex running_mtx_;
215 std::unordered_map<std::uint64_t, TaskHandle> running_;
221 prom::Labels pool_labels_{prom::Labels{{
"pool", opts_.
name}}};
239 register_with_manager();
241 for (std::size_t i = 0; i < opts_.
min_workers; ++i) {
246 lg->debug(
"pool '{}' created (id={}, core={}, max={})",
264 std::scoped_lock guard(workers_mtx_);
265 for (
const auto& w : workers_) {
266 if (w && w->joinable()) {
272 std::unique_lock lk(queue_mtx_);
273 queue_cv_.notify_all();
276 std::scoped_lock guard(workers_mtx_);
277 for (
const auto& w : workers_) {
278 if (w && w->joinable()) {
286 unregister_from_manager();
293 throw_if_not_running();
297 qt.enqueued_at = std::chrono::steady_clock::now();
298 qt.work = [t = std::move(task)](
const std::stop_token&) { t(); };
299 enqueue(std::move(qt));
302template <
class Fn,
class R>
303Future<R> ThreadPool::submit_impl(std::optional<std::string> task_name, Fn&& f) {
304 throw_if_not_running();
305 auto promise = std::make_shared<Promise<R>>();
306 auto future = promise->get_future();
309 auto fn_shared = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(f));
313 qt.enqueued_at = std::chrono::steady_clock::now();
314 qt.work = [promise, fn_shared](
const std::stop_token&) {
316 if constexpr (std::is_void_v<R>) {
318 promise->set_value();
320 promise->set_value((*fn_shared)());
323 promise->set_exception(std::current_exception());
326 qt.on_cancel = [promise]() {
328 promise->set_exception(
329 std::make_exception_ptr(TaskCancelledError(
"task cancelled before it could run")));
334 enqueue(std::move(qt));
338template <
class Fn,
class R>
339Future<R> ThreadPool::submit_stoppable_impl(std::optional<std::string> task_name, Fn&& f) {
340 throw_if_not_running();
341 auto promise = std::make_shared<Promise<R>>();
342 auto future = promise->get_future();
345 auto fn_shared = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(f));
349 qt.enqueued_at = std::chrono::steady_clock::now();
354 qt.work = [promise, fn_shared](std::stop_token tok) {
356 if constexpr (std::is_void_v<R>) {
357 (*fn_shared)(std::move(tok));
358 promise->set_value();
360 promise->set_value((*fn_shared)(std::move(tok)));
363 promise->set_exception(std::current_exception());
366 qt.on_cancel = [promise]() {
368 promise->set_exception(
369 std::make_exception_ptr(TaskCancelledError(
"task cancelled before it could run")));
374 enqueue(std::move(qt));
379 requires std::invocable<Fn>
381 using R = std::invoke_result_t<Fn>;
382 return submit_impl<Fn, R>(std::nullopt, std::forward<Fn>(f));
386 requires std::invocable<Fn>
388 using R = std::invoke_result_t<Fn>;
389 return submit_impl<Fn, R>(std::move(task_name), std::forward<Fn>(f));
393 requires std::invocable<Fn, std::stop_token>
395 using R = std::invoke_result_t<Fn, std::stop_token>;
396 return submit_stoppable_impl<Fn, R>(std::nullopt, std::forward<Fn>(f));
399inline void ThreadPool::enqueue(QueuedTask qt) {
401 std::unique_lock lk(queue_mtx_);
408 queue_.push_back(std::move(qt));
409 queue_cv_.notify_one();
414inline void ThreadPool::spawn_worker(
bool is_core) {
415 ManagedThread::Options topts;
416 topts.name = opts_.
name +
"-worker-" + std::to_string(workers_.size());
417 topts.pool_id = pool_id_;
418 topts.is_core = is_core;
419 topts.manager = manager_;
421 const std::uint64_t worker_index = workers_.size();
422 auto worker = std::make_unique<ManagedThread>(
423 std::move(topts), [
this, worker_index, is_core](std::stop_token tok) {
424 worker_loop(std::move(tok), worker_index, is_core);
426 worker_cbs_.push_back(worker->control_block());
427 workers_.push_back(std::move(worker));
429 scale_ups_.fetch_add(1, std::memory_order_relaxed);
434inline void ThreadPool::worker_loop(std::stop_token tok, std::uint64_t worker_index,
bool is_core) {
442 auto last_active = std::chrono::steady_clock::now();
444 while (!tok.stop_requested()) {
446 bool got_task =
false;
448 std::uint64_t cancelled_now = 0;
451 std::unique_lock lk(queue_mtx_);
454 if (cb_raw !=
nullptr) {
457 idle_.fetch_add(1, std::memory_order_relaxed);
460 if (tok.stop_requested()) {
463 if (!queue_.empty()) {
466 const auto s = state_.load(std::memory_order_acquire);
477 idle_.fetch_sub(1, std::memory_order_relaxed);
478 if (cb_raw !=
nullptr) {
482 if (
const auto s = state_.load(std::memory_order_acquire);
485 while (!queue_.empty()) {
486 auto cancelled = std::move(queue_.front());
488 cancelled.handle.mark_cancelled(std::make_exception_ptr(
489 TaskCancelledError(
"task cancelled by shutdown_now")));
490 if (cancelled.on_cancel) {
491 cancelled.on_cancel();
493 failed_.fetch_add(1, std::memory_order_relaxed);
495 record_recent(std::move(cancelled.handle));
498 }
else if (!queue_.empty()) {
499 task = std::move(queue_.front());
508 if (cancelled_now != 0) {
519 if (
const auto now = std::chrono::steady_clock::now();
522 if (cb_raw !=
nullptr) {
525 scale_downs_.fetch_add(1, std::memory_order_relaxed);
528 lg->info(
"pool '{}' retiring idle non-core worker", opts_.
name);
536 last_active = std::chrono::steady_clock::now();
537 active_.fetch_add(1, std::memory_order_relaxed);
538 task.handle.mark_running();
540 std::scoped_lock guard(running_mtx_);
541 running_.emplace(
task.handle.id(),
task.handle);
543 const auto t0 = std::chrono::steady_clock::now();
544 const auto queue_wait_ns =
545 std::chrono::duration_cast<std::chrono::nanoseconds>(t0 -
task.enqueued_at).count();
546 bool task_failed =
false;
549 task.handle.mark_completed();
550 completed_.fetch_add(1, std::memory_order_relaxed);
552 task.handle.mark_failed(std::current_exception());
553 failed_.fetch_add(1, std::memory_order_relaxed);
556 const auto t1 = std::chrono::steady_clock::now();
557 const auto elap = std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
558 total_exec_ns_.fetch_add(
static_cast<std::uint64_t
>(elap), std::memory_order_relaxed);
559 active_.fetch_sub(1, std::memory_order_relaxed);
561 std::scoped_lock guard(running_mtx_);
562 running_.erase(
task.handle.id());
564 record_recent(std::move(
task.handle));
569 .labels(pool_labels_)
570 .observe(
static_cast<double>(queue_wait_ns) / 1e9);
572 .labels(pool_labels_)
573 .observe(
static_cast<double>(elap) / 1e9);
582 std::uint64_t drain_cancelled = 0;
585 bool got_task =
false;
587 std::unique_lock lk(queue_mtx_);
589 while (!queue_.empty()) {
590 auto cancelled = std::move(queue_.front());
592 cancelled.handle.mark_cancelled(std::make_exception_ptr(
593 TaskCancelledError(
"task cancelled by shutdown_now")));
594 if (cancelled.on_cancel) {
595 cancelled.on_cancel();
597 failed_.fetch_add(1, std::memory_order_relaxed);
599 record_recent(std::move(cancelled.handle));
603 if (queue_.empty()) {
606 task = std::move(queue_.front());
613 active_.fetch_add(1, std::memory_order_relaxed);
614 task.handle.mark_running();
616 std::scoped_lock guard(running_mtx_);
617 running_.emplace(
task.handle.id(),
task.handle);
619 const auto t0 = std::chrono::steady_clock::now();
620 const auto queue_wait_ns =
621 std::chrono::duration_cast<std::chrono::nanoseconds>(t0 -
task.enqueued_at).count();
622 bool task_failed =
false;
625 task.handle.mark_completed();
626 completed_.fetch_add(1, std::memory_order_relaxed);
628 task.handle.mark_failed(std::current_exception());
629 failed_.fetch_add(1, std::memory_order_relaxed);
632 const auto t1 = std::chrono::steady_clock::now();
633 const auto elap = std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
634 total_exec_ns_.fetch_add(
static_cast<std::uint64_t
>(elap), std::memory_order_relaxed);
635 active_.fetch_sub(1, std::memory_order_relaxed);
637 std::scoped_lock guard(running_mtx_);
638 running_.erase(
task.handle.id());
640 record_recent(std::move(
task.handle));
645 .labels(pool_labels_)
646 .observe(
static_cast<double>(queue_wait_ns) / 1e9);
648 .labels(pool_labels_)
649 .observe(
static_cast<double>(elap) / 1e9);
658 if (drain_cancelled != 0) {
665inline void ThreadPool::record_recent(TaskHandle h) {
666 std::scoped_lock guard(recent_mtx_);
667 if (recent_.size() >= opts_.recent_tasks_capacity) {
670 recent_.push_back(std::move(h));
673inline void ThreadPool::shutdown() {
674 if (
auto expected = PoolState::Running; !state_.compare_exchange_strong(
675 expected, PoolState::ShuttingDown, std::memory_order_acq_rel)) {
679 std::unique_lock lk(queue_mtx_);
680 queue_cv_.notify_all();
682 if (
const auto& lg = log::pool()) {
683 lg->info(
"pool '{}' shutdown requested", opts_.name);
687inline void ThreadPool::shutdown_now() {
688 auto current = state_.load(std::memory_order_acquire);
689 while (current == PoolState::Running || current == PoolState::ShuttingDown) {
690 if (state_.compare_exchange_weak(
691 current, PoolState::ShutdownNow, std::memory_order_acq_rel)) {
695 if (state_.load(std::memory_order_acquire) != PoolState::ShutdownNow) {
699 std::scoped_lock guard(workers_mtx_);
700 for (
auto& w : workers_) {
707 std::unique_lock lk(queue_mtx_);
708 queue_cv_.notify_all();
710 if (
const auto& lg = log::pool()) {
711 lg->info(
"pool '{}' shutdown_now requested", opts_.name);
715inline void ThreadPool::join() {
716 if (state_.load(std::memory_order_acquire) == PoolState::Running) {
717 throw std::logic_error(
"ThreadPool::join() called without prior shutdown()");
719 std::scoped_lock guard(workers_mtx_);
720 for (
const auto& w : workers_) {
721 if (w && w->joinable()) {
725 state_.store(PoolState::Terminated, std::memory_order_release);
728inline bool ThreadPool::is_shutting_down() const noexcept {
729 const auto s = state_.load(std::memory_order_acquire);
730 return s == PoolState::ShuttingDown || s == PoolState::ShutdownNow ||
731 s == PoolState::Terminated;
734inline bool ThreadPool::is_terminated() const noexcept {
735 return state_.load(std::memory_order_acquire) == PoolState::Terminated;
738inline std::size_t ThreadPool::worker_count() const noexcept {
739 std::scoped_lock guard(workers_mtx_);
740 return workers_.size();
747 s.
state = state_.load(std::memory_order_acquire);
750 s.
active = active_.load(std::memory_order_relaxed);
751 s.
idle = idle_.load(std::memory_order_relaxed);
752 s.
completed = completed_.load(std::memory_order_relaxed);
753 s.
failed = failed_.load(std::memory_order_relaxed);
754 s.
scale_ups = scale_ups_.load(std::memory_order_relaxed);
755 s.
scale_downs = scale_downs_.load(std::memory_order_relaxed);
757 std::scoped_lock guard(workers_mtx_);
761 std::scoped_lock lk(queue_mtx_);
765 const auto total = total_exec_ns_.load(std::memory_order_relaxed);
771inline std::vector<ThreadSnapshot> ThreadPool::snapshot_workers()
const {
772 std::scoped_lock guard(workers_mtx_);
773 std::vector<ThreadSnapshot> out;
774 out.reserve(workers_.size());
775 for (
const auto& w : workers_) {
777 out.push_back(w->snapshot());
783inline std::vector<TaskSnapshot> ThreadPool::snapshot_recent_tasks()
const {
784 std::scoped_lock guard(recent_mtx_);
785 std::vector<TaskSnapshot> out;
786 out.reserve(recent_.size());
787 for (
const auto& h : recent_) {
788 out.push_back(h.snapshot());
793inline std::vector<TaskSnapshot> ThreadPool::snapshot_queued_tasks()
const {
794 std::scoped_lock lk(queue_mtx_);
795 std::vector<TaskSnapshot> out;
796 out.reserve(queue_.size());
797 for (
const auto& q : queue_) {
798 out.push_back(q.handle.snapshot());
803inline int ThreadPool::scale_tick(
const std::chrono::steady_clock::time_point now) {
804 if (state_.load(std::memory_order_acquire) != PoolState::Running) {
811 std::scoped_lock guard(workers_mtx_);
812 for (std::size_t i = 0; i < workers_.size();) {
813 if (
const auto st = workers_[i]->control_block()->state.load(std::memory_order_acquire);
814 st == ThreadState::Completed || st == ThreadState::Failed) {
816 workers_.erase(workers_.begin() +
static_cast<std::ptrdiff_t
>(i));
817 worker_cbs_.erase(worker_cbs_.begin() +
static_cast<std::ptrdiff_t
>(i));
828 bool scale_up =
false;
829 std::size_t queue_size = 0;
830 std::optional<std::chrono::steady_clock::time_point> oldest_enqueued;
832 std::scoped_lock lk(queue_mtx_);
833 queue_size = queue_.size();
834 if (!queue_.empty()) {
835 oldest_enqueued = queue_.front().enqueued_at;
838 const std::size_t idle_now = idle_.load(std::memory_order_relaxed);
839 std::size_t workers_now = 0;
841 std::scoped_lock guard(workers_mtx_);
842 workers_now = workers_.size();
845 if (workers_now < opts_.max_workers && idle_now == 0) {
846 const bool queue_pressure = queue_size > opts_.scale_up_when_queue_exceeds;
847 const bool wait_pressure =
848 oldest_enqueued.has_value() && (now - *oldest_enqueued) >= opts_.scale_up_wait;
849 scale_up = queue_pressure || wait_pressure;
853 std::scoped_lock guard(workers_mtx_);
854 if (workers_.size() < opts_.max_workers) {
857 if (
const auto& lg = log::pool()) {
858 lg->info(
"pool '{}' scaled up to {} workers (idle=0, queued={})",
869inline std::vector<StuckTaskEvent>
870ThreadPool::detect_stuck_tasks(std::chrono::steady_clock::time_point now)
const {
871 std::vector<StuckTaskEvent> out;
872 std::scoped_lock guard(running_mtx_);
873 for (
const auto& h : running_ | std::views::values) {
874 if (
const auto snap = h.snapshot();
875 snap.state == TaskState::Running && snap.started_at.has_value()) {
876 if (
const auto running_for = now - *snap.started_at;
877 running_for >= opts_.stuck_task_threshold) {
880 ev.
running_for = std::chrono::duration_cast<std::chrono::nanoseconds>(running_for);
882 out.push_back(std::move(ev));
899 o.
name = std::move(nm);
902 pool_ = std::make_unique<ThreadPool>(std::move(o));
905 void execute(std::function<
void()> task)
override {
906 pool_->execute(std::move(task));
908 [[nodiscard]] std::string_view
name() const noexcept
override {
909 return pool_->name();
913 requires std::invocable<Fn>
915 return pool_->submit(std::forward<Fn>(f));
923 static const comms::DisplayInfo info{
924 .name =
"SingleThreadExecutor",
925 .description =
"Executor owning a thread pool of exactly one worker (FIFO).",
926 .icon = comms::Icon::from(
"mdi:speedometer-slow"),
932 std::unique_ptr<ThreadPool> pool_;
Definition future.hpp:239
Virtual interface for "anything that can run a `std::function<void()>`".
Definition executor.hpp:29
static ControlBlock * current() noexcept
Returns a pointer to the ControlBlock of the currently-executing ManagedThread, or nullptr when calle...
Definition thread.hpp:245
Submitting work to a bounded pool whose queue is already at capacity.
Definition exceptions.hpp:71
Definition thread_pool.hpp:895
void execute(std::function< void()> task) override
Definition thread_pool.hpp:905
auto submit(Fn &&f) -> Future< std::invoke_result_t< Fn > >
Definition thread_pool.hpp:914
static const comms::DisplayInfo & display_info()
Definition thread_pool.hpp:922
SingleThreadExecutor(std::string nm="tm::single")
Definition thread_pool.hpp:897
std::string_view name() const noexcept override
Definition thread_pool.hpp:908
ThreadPool & pool() const noexcept
Definition thread_pool.hpp:918
Definition manager.hpp:47
Definition thread_pool.hpp:92
bool is_terminated() const noexcept
Definition thread_pool.hpp:734
std::vector< StuckTaskEvent > detect_stuck_tasks(std::chrono::steady_clock::time_point now) const
Driven by ThreadManager housekeeper.
Definition thread_pool.hpp:870
ThreadPool & operator=(ThreadPool &&)=delete
~ThreadPool() override
Definition thread_pool.hpp:254
void shutdown()
Definition thread_pool.hpp:673
std::string_view name() const noexcept override
Definition thread_pool.hpp:104
std::size_t max_worker_count() const noexcept
Definition thread_pool.hpp:136
std::vector< TaskSnapshot > snapshot_recent_tasks() const
Definition thread_pool.hpp:783
static const comms::DisplayInfo & display_info()
Definition thread_pool.hpp:144
std::size_t core_worker_count() const noexcept
Definition thread_pool.hpp:133
void execute(std::function< void()> task) override
Definition thread_pool.hpp:289
auto submit_named(std::string task_name, Fn &&f) -> Future< std::invoke_result_t< Fn > >
Definition thread_pool.hpp:387
std::vector< ThreadSnapshot > snapshot_workers() const
Definition thread_pool.hpp:771
ThreadPool(ThreadPool &&)=delete
std::size_t worker_count() const noexcept
Definition thread_pool.hpp:738
ThreadPool(const ThreadPool &)=delete
auto submit_stoppable(Fn &&f) -> Future< std::invoke_result_t< Fn, std::stop_token > >
Definition thread_pool.hpp:394
void join()
Definition thread_pool.hpp:715
int scale_tick(std::chrono::steady_clock::time_point now)
Driven by ThreadManager housekeeper. Returns net worker change.
Definition thread_pool.hpp:803
ThreadPoolStats stats() const
Definition thread_pool.hpp:743
bool is_shutting_down() const noexcept
Definition thread_pool.hpp:728
ThreadPool(ThreadPoolOptions opts={})
Definition thread_pool.hpp:228
auto submit(Fn &&f) -> Future< std::invoke_result_t< Fn > >
Definition thread_pool.hpp:380
std::vector< TaskSnapshot > snapshot_queued_tasks() const
Definition thread_pool.hpp:793
void shutdown_now()
Definition thread_pool.hpp:687
ThreadPool & operator=(const ThreadPool &)=delete
Central feature-gate header for ThreadMan's optional integrations and tunable defaults.
#define THREADMAN_DEFAULT_SCALE_UP_QUEUE_THRESHOLD
Queue-depth threshold above which the housekeeper considers scaling up.
Definition config.hpp:88
#define THREADMAN_DEFAULT_STUCK_TASK_THRESHOLD_MS
A task that has been Running longer than this is flagged as stuck (ms).
Definition config.hpp:98
#define THREADMAN_RECENT_TASKS_CAPACITY
Per-pool recent-task ring-buffer capacity.
Definition config.hpp:66
#define THREADMAN_DEFAULT_IDLE_TIMEOUT_MS
Idle-timeout window: a non-core worker that sees no work for this long retires (milliseconds).
Definition config.hpp:83
#define THREADMAN_DEFAULT_SCALE_CHECK_INTERVAL_MS
Housekeeper tick interval / per-pool scale evaluation cadence (ms).
Definition config.hpp:93
#define THREADMAN_DEFAULT_SCALE_UP_WAIT_MS
How long the oldest queued task is allowed to wait before forcing a scale-up (milliseconds).
Definition config.hpp:77
Re-export <commons/display_info.hpp> and attach non-intrusive comms::HasDisplayInfo<> specializations...
Typed exception hierarchy thrown by ThreadMan.
Executor abstraction — IExecutor (virtual), the Executor concept (duck-typed), and three concrete exe...
threadman::Future<T> (one-shot), threadman::SharedFuture<T> (multi-shot), and threadman::Promise<T> —...
Cached subsystem loggers for the tm.
Cached subsystem metrics for the tm_* Prometheus/OpenMetrics families, mirroring the tm.
std::uint64_t next_task_id() noexcept
Definition thread_pool.hpp:64
std::uint64_t next_pool_id() noexcept
Definition thread_pool.hpp:60
std::shared_ptr< spdlog::logger > & task()
Logger for tm.task — stuck-task warnings (only when no listener subscribes).
Definition log.hpp:51
std::shared_ptr< spdlog::logger > & future()
Logger for tm.future — future satisfaction and continuation dispatch.
Definition log.hpp:39
std::shared_ptr< spdlog::logger > & pool()
Logger for tm.pool — scaling, shutdown, queue back-pressure.
Definition log.hpp:27
prom::Counter & pool_scale_downs()
Non-core workers retired after idling out.
Definition metrics.hpp:95
prom::Counter & pool_tasks_cancelled()
Tasks cancelled before running (drained by shutdown_now).
Definition metrics.hpp:64
prom::Counter & pool_scale_ups()
Non-core workers spawned by a scale-up decision.
Definition metrics.hpp:88
prom::Counter & pool_tasks_submitted()
Tasks submitted to a thread pool (all submit variants).
Definition metrics.hpp:41
prom::Histogram & pool_task_execution_seconds()
Wall-clock seconds a task body spent executing.
Definition metrics.hpp:130
prom::Counter & pool_submit_rejected()
Submissions rejected because the pool was shutting down or terminated.
Definition metrics.hpp:80
prom::Counter & pool_tasks_completed()
Tasks that ran to completion on a pool worker.
Definition metrics.hpp:49
prom::Histogram & pool_task_queue_wait_seconds()
Seconds a task waited in the queue before a worker picked it up.
Definition metrics.hpp:139
prom::Counter & pool_tasks_failed()
Tasks whose body threw an exception.
Definition metrics.hpp:57
prom::Counter & pool_queue_rejected()
Submissions rejected because a bounded queue was full.
Definition metrics.hpp:72
Definition exceptions.hpp:22
@ Retiring
Pool worker stop-requested; will exit on next loop check.
@ Running
User body is actively executing.
@ Idle
Pool worker sleeping on the queue, no task.
PoolState
Lifecycle state of a ThreadPool.
Definition stats.hpp:40
@ ShuttingDown
shutdown() requested; draining queue, no new submits.
@ Running
Accepting submissions.
@ ShutdownNow
shutdown_now() requested; queue cancelled, workers stopping.
@ Terminated
All workers joined.
Plain value snapshots of the live state of the ThreadMan world — threads, pools, tasks,...
A single stuck-task event published to StuckTaskListeners.
Definition stats.hpp:114
std::uint64_t pool_id
Definition stats.hpp:117
TaskSnapshot task
Definition stats.hpp:115
std::chrono::nanoseconds running_for
Definition stats.hpp:116
Definition thread_pool.hpp:70
std::size_t scale_up_when_queue_exceeds
Definition thread_pool.hpp:77
std::chrono::milliseconds scale_check_interval
Definition thread_pool.hpp:79
std::string name
Definition thread_pool.hpp:71
std::chrono::milliseconds idle_timeout
Definition thread_pool.hpp:78
std::size_t max_workers
Definition thread_pool.hpp:75
std::size_t min_workers
Definition thread_pool.hpp:74
ThreadManager * manager
Definition thread_pool.hpp:89
std::chrono::milliseconds scale_up_wait
Definition thread_pool.hpp:76
std::size_t recent_tasks_capacity
Definition thread_pool.hpp:86
std::chrono::milliseconds stuck_task_threshold
Definition thread_pool.hpp:85
std::size_t max_queue_size
Definition thread_pool.hpp:82
A snapshot of a pool's headline counters.
Definition stats.hpp:60
std::size_t queued
Tasks waiting in the queue.
Definition stats.hpp:69
std::uint64_t completed
Definition stats.hpp:70
std::uint64_t failed
Definition stats.hpp:71
std::size_t workers
Currently live workers.
Definition stats.hpp:64
std::uint64_t scale_ups
Definition stats.hpp:74
std::uint64_t scale_downs
Definition stats.hpp:75
std::uint64_t pool_id
Definition stats.hpp:61
std::size_t idle
Workers currently in Idle.
Definition stats.hpp:68
std::size_t active
Workers currently in Running.
Definition stats.hpp:67
PoolState state
Definition stats.hpp:63
std::size_t max_workers
Definition stats.hpp:66
std::string name
Definition stats.hpp:62
std::size_t core_workers
min_workers.
Definition stats.hpp:65
std::chrono::nanoseconds avg_execution_duration
Rolling average across completed tasks.
Definition stats.hpp:72
threadman::TaskHandle — a ref-counted, shareable handle to a single task's lifecycle (id,...
threadman::ManagedThread — RAII wrapper around std::jthread that publishes lifecycle state via a shar...