threadman 0.1.0
Header-only C++23 managed threads, dynamic pools, futures, and executors
Loading...
Searching...
No Matches
manager.hpp
Go to the documentation of this file.
1#pragma once
2
14
15#include <threadman/config.hpp>
16#include <threadman/display.hpp>
19#include <threadman/log.hpp>
20#include <threadman/metrics.hpp>
21#include <threadman/stats.hpp>
22#include <threadman/task.hpp>
23#include <threadman/thread.hpp>
25
26#include <commons/display_info.hpp>
27
28#include <atomic>
29#include <chrono>
30#include <condition_variable>
31#include <cstdint>
32#include <functional>
33#include <memory>
34#include <mutex>
35#include <optional>
36#include <shared_mutex>
37#include <string>
38#include <string_view>
39#include <thread>
40#include <unordered_map>
41#include <unordered_set>
42#include <utility>
43#include <vector>
44
45namespace threadman {
46
48public:
49 struct Options {
50 std::string name = "tm::manager";
55 std::size_t default_pool_min_workers = 1;
56 std::size_t default_pool_max_workers = std::thread::hardware_concurrency();
57 };
58
59 using SummaryListener = std::function<void(const ManagerSummary&)>;
60 using StuckTaskListener = std::function<void(const StuckTaskEvent&)>;
61
65 public:
66 SubscriptionToken() noexcept = default;
68 SubscriptionToken& operator=(const SubscriptionToken&) = delete;
70 : mgr_(other.mgr_), id_(other.id_), kind_(other.kind_) {
71 other.mgr_ = nullptr;
72 other.id_ = 0;
73 }
75 if (this != &other) {
76 release();
77 mgr_ = other.mgr_;
78 id_ = other.id_;
79 kind_ = other.kind_;
80 other.mgr_ = nullptr;
81 other.id_ = 0;
82 }
83 return *this;
84 }
86 release();
87 }
88
89 [[nodiscard]] bool valid() const noexcept {
90 return mgr_ != nullptr && id_ != 0;
91 }
92 [[nodiscard]] std::uint64_t id() const noexcept {
93 return id_;
94 }
95
96 private:
97 friend class ThreadManager;
98 enum class Kind : std::uint8_t { Summary, StuckTask };
99 SubscriptionToken(ThreadManager* m, const std::uint64_t i, const Kind k) noexcept
100 : mgr_(m), id_(i), kind_(k) {}
101
102 void release() noexcept {
103 if (mgr_ != nullptr && id_ != 0) {
104 mgr_->unsubscribe(id_, kind_);
105 }
106 mgr_ = nullptr;
107 id_ = 0;
108 }
109
110 ThreadManager* mgr_ = nullptr;
111 std::uint64_t id_ = 0;
112 Kind kind_ = Kind::Summary;
113 };
114
116 explicit ThreadManager(Options opts) : opts_(std::move(opts)) {
117 if (opts_.start_housekeeper_eagerly) {
119 }
120 }
123 if (default_pool_) {
124 try {
125 default_pool_->shutdown();
126 } catch (...) {
127 // dtor must not propagate
128 }
129 default_pool_.reset();
130 }
131 }
132
133 ThreadManager(const ThreadManager&) = delete;
137
138 [[nodiscard]] static ThreadManager& instance() noexcept {
139 // Construct the metrics subsystem (scope, all metric handles, prom's
140 // adapter cell) *before* the singleton, so reverse-order static
141 // destruction tears the singleton (and joins its housekeeper) down
142 // first, while every metric it publishes is still alive.
144 static ThreadManager inst;
145 return inst;
146 }
147
148 // --- Registration (called by ManagedThread / ThreadPool) ---------------
149 void register_thread(const std::shared_ptr<ManagedThread::ControlBlock>& cb) {
150 if (!cb) {
151 return;
152 }
153 {
154 std::unique_lock guard(threads_mtx_);
155 threads_.insert_or_assign(cb->id, cb);
156 }
157 total_registered_.fetch_add(1, std::memory_order_relaxed);
159 }
160 void unregister_thread(const std::uint64_t thread_id) noexcept {
161 std::unique_lock guard(threads_mtx_);
162 threads_.erase(thread_id);
163 }
165 {
166 std::scoped_lock guard(pools_mtx_);
167 pools_.push_back(&p);
168 }
169 // Lazy housekeeper start on first pool registration.
171 }
172 void unregister_pool(const std::uint64_t pool_id) noexcept {
173 std::scoped_lock guard(pools_mtx_);
174 for (std::size_t i = 0; i < pools_.size(); ++i) {
175 if ((pools_[i] != nullptr) && pools_[i]->id() == pool_id) {
176 pools_.erase(pools_.begin() + static_cast<std::ptrdiff_t>(i));
177 return;
178 }
179 }
180 }
181
182 // --- Snapshots ---------------------------------------------------------
183 [[nodiscard]] std::vector<ThreadSnapshot> snapshot_threads() const {
184 std::vector<ThreadSnapshot> out;
185 std::shared_lock guard(threads_mtx_);
186 out.reserve(threads_.size());
187 for (const auto& weak : threads_ | std::views::values) {
188 if (const auto cb = weak.lock()) {
189 ThreadSnapshot snap;
190 snap.state = cb->state.load(std::memory_order_acquire);
191 snap.failed = (snap.state == ThreadState::Failed);
192 std::scoped_lock lk(cb->mtx);
193 snap.id = cb->id;
194 snap.name = cb->name;
195 snap.native_id = cb->native_id;
196 snap.pool_id = cb->pool_id;
197 snap.is_core = cb->is_core;
198 if (cb->started_at.time_since_epoch().count() != 0) {
199 const auto end = cb->ended_at.time_since_epoch().count() == 0
200 ? std::chrono::steady_clock::now()
201 : cb->ended_at;
202 snap.run_duration = end - cb->started_at;
203 }
204 out.push_back(std::move(snap));
205 }
206 }
207 return out;
208 }
209
210 [[nodiscard]] std::optional<ThreadSnapshot> find_by_id(const std::uint64_t id) const {
211 std::shared_lock guard(threads_mtx_);
212 if (const auto it = threads_.find(id); it != threads_.end()) {
213 if (const auto cb = it->second.lock()) {
214 ThreadSnapshot snap;
215 snap.state = cb->state.load(std::memory_order_acquire);
216 snap.failed = (snap.state == ThreadState::Failed);
217 std::scoped_lock lk(cb->mtx);
218 snap.id = cb->id;
219 snap.name = cb->name;
220 snap.native_id = cb->native_id;
221 snap.pool_id = cb->pool_id;
222 snap.is_core = cb->is_core;
223 if (cb->started_at.time_since_epoch().count() != 0) {
224 const auto end = cb->ended_at.time_since_epoch().count() == 0
225 ? std::chrono::steady_clock::now()
226 : cb->ended_at;
227 snap.run_duration = end - cb->started_at;
228 }
229 return snap;
230 }
231 }
232 return std::nullopt;
233 }
234
235 [[nodiscard]] std::vector<ThreadSnapshot> find_by_name(const std::string_view name) const {
236 std::vector<ThreadSnapshot> out;
237 std::shared_lock guard(threads_mtx_);
238 for (const auto& weak : threads_ | std::views::values) {
239 const auto cb = weak.lock();
240 if (!cb) {
241 continue;
242 }
243 std::scoped_lock lk(cb->mtx);
244 if (cb->name == name) {
245 ThreadSnapshot snap;
246 snap.state = cb->state.load(std::memory_order_acquire);
247 snap.failed = (snap.state == ThreadState::Failed);
248 snap.id = cb->id;
249 snap.name = cb->name;
250 snap.native_id = cb->native_id;
251 snap.pool_id = cb->pool_id;
252 snap.is_core = cb->is_core;
253 if (cb->started_at.time_since_epoch().count() != 0) {
254 const auto end = cb->ended_at.time_since_epoch().count() == 0
255 ? std::chrono::steady_clock::now()
256 : cb->ended_at;
257 snap.run_duration = end - cb->started_at;
258 }
259 out.push_back(std::move(snap));
260 }
261 }
262 return out;
263 }
264
265 [[nodiscard]] std::vector<ThreadPoolStats> snapshot_pools() const {
266 std::vector<ThreadPoolStats> out;
267 std::scoped_lock guard(pools_mtx_);
268 out.reserve(pools_.size());
269 for (const auto* p : pools_) {
270 if (p != nullptr) {
271 out.push_back(p->stats());
272 }
273 }
274 return out;
275 }
276
277 [[nodiscard]] ManagerSummary build_summary() const {
278 ManagerSummary summary;
279 summary.wall_clock = std::chrono::system_clock::now();
280 summary.per_pool_stats = snapshot_pools();
281 summary.total_pools = summary.per_pool_stats.size();
282 for (const auto& s : summary.per_pool_stats) {
283 summary.total_live_workers += s.workers;
284 summary.total_queued += s.queued;
285 summary.total_completed += s.completed;
286 summary.total_failed += s.failed;
287 }
288 {
289 std::shared_lock g(threads_mtx_);
290 for (const auto& w : threads_ | std::views::values) {
291 if (w.lock()) {
292 ++summary.live_threads;
293 }
294 }
295 }
296 return summary;
297 }
298
299 // --- Future-wait pool --------------------------------------------------
300
305 return FutureWaitPool{std::move(opts), this};
306 }
307
308 // --- Default pool ------------------------------------------------------
309 [[nodiscard]] ThreadPool& default_pool() {
310 std::call_once(default_pool_once_, [this] {
312 o.name = opts_.default_pool_name;
315 o.manager = this;
316 default_pool_ = std::make_unique<ThreadPool>(std::move(o));
317 });
318 return *default_pool_;
319 }
320
321 // --- Subscriptions ----------------------------------------------------
323 const auto id = next_sub_id_.fetch_add(1, std::memory_order_relaxed);
324 {
325 std::scoped_lock guard(subs_mtx_);
326 summary_subs_.emplace_back(id, std::move(cb));
327 }
328 return SubscriptionToken{this, id, SubscriptionToken::Kind::Summary};
329 }
331 const auto id = next_sub_id_.fetch_add(1, std::memory_order_relaxed);
332 {
333 std::scoped_lock guard(subs_mtx_);
334 stuck_subs_.emplace_back(id, std::move(cb));
335 }
336 return SubscriptionToken{this, id, SubscriptionToken::Kind::StuckTask};
337 }
338
339 // --- Housekeeper control ----------------------------------------------
341 if (bool expected = false; !housekeeper_started_.compare_exchange_strong(
342 expected, true, std::memory_order_acq_rel)) {
343 return;
344 }
346 topts.name = opts_.name + ".housekeeper";
347 topts.is_core = true;
348 topts.manager = this;
349 housekeeper_.emplace(std::move(topts),
350 [this](std::stop_token tok) { housekeeper_loop(std::move(tok)); });
351 if (const auto& lg = log::manager()) {
352 lg->info("started housekeeper (interval={}ms, summary={}ms)",
353 opts_.housekeeping_interval.count(),
354 opts_.summary_interval.count());
355 }
356 }
357 void stop_housekeeper() noexcept {
358 if (!housekeeper_started_.load(std::memory_order_acquire)) {
359 return;
360 }
361 if (housekeeper_.has_value()) {
362 housekeeper_->request_stop();
363 {
364 std::scoped_lock guard(hk_mtx_);
365 hk_cv_.notify_all();
366 }
367 if (housekeeper_->joinable()) {
368 housekeeper_->join();
369 }
370 housekeeper_.reset();
371 }
372 housekeeper_started_.store(false, std::memory_order_release);
373 }
374 [[nodiscard]] bool housekeeper_running() const noexcept {
375 return housekeeper_started_.load(std::memory_order_acquire);
376 }
377
382 [[nodiscard]] std::size_t stuck_task_count() const noexcept {
383 return known_stuck_size_.load(std::memory_order_relaxed);
384 }
385
386 [[nodiscard]] static const comms::DisplayInfo& display_info() {
387 static const comms::DisplayInfo info{
388 .name = "ThreadManager",
389 .description = "Central registry of managed threads and pools with periodic summary "
390 "and stuck-task event dispatch.",
391 .icon = comms::Icon::from("mdi:account-supervisor"),
392 };
393 return info;
394 }
395
396private:
397 void unsubscribe(const std::uint64_t id, const SubscriptionToken::Kind kind) noexcept {
398 std::scoped_lock guard(subs_mtx_);
399 if (kind == SubscriptionToken::Kind::Summary) {
400 for (std::size_t i = 0; i < summary_subs_.size(); ++i) {
401 if (summary_subs_[i].first == id) {
402 summary_subs_.erase(summary_subs_.begin() + static_cast<std::ptrdiff_t>(i));
403 return;
404 }
405 }
406 } else {
407 for (std::size_t i = 0; i < stuck_subs_.size(); ++i) {
408 if (stuck_subs_[i].first == id) {
409 stuck_subs_.erase(stuck_subs_.begin() + static_cast<std::ptrdiff_t>(i));
410 return;
411 }
412 }
413 }
414 }
415
416 void prune_expired_threads() {
417 std::unique_lock guard(threads_mtx_);
418 for (auto it = threads_.begin(); it != threads_.end();) {
419 if (it->second.expired()) {
420 it = threads_.erase(it);
421 } else {
422 ++it;
423 }
424 }
425 }
426
427 void housekeeper_loop(std::stop_token tok) {
428 last_summary_at_ = std::chrono::steady_clock::now();
429 std::unordered_set<std::uint64_t> known_stuck;
430
431 while (!tok.stop_requested()) {
432 {
433 std::unique_lock lk(hk_mtx_);
434 hk_cv_.wait_for(
435 lk, opts_.housekeeping_interval, [&] { return tok.stop_requested(); });
436 }
437 if (tok.stop_requested()) {
438 break;
439 }
440
441 const auto now = std::chrono::steady_clock::now();
442
443 // 1) Prune expired weak refs.
444 prune_expired_threads();
445
446 // 2) Pool scaling.
447 {
448 std::scoped_lock guard(pools_mtx_);
449 for (auto* p : pools_) {
450 if (p != nullptr) {
451 try {
452 p->scale_tick(now);
453 } catch (const std::exception& e) {
454 if (auto& lg = log::manager()) {
455 lg->warn("scale_tick on pool '{}' threw: {}", p->name(), e.what());
456 }
457 } catch (...) {
458 // ignore
459 }
460 }
461 }
462 }
463
464 // 3) Stuck-task detection.
465 std::vector<StuckTaskEvent> all_stuck;
466 {
467 std::scoped_lock guard(pools_mtx_);
468 for (auto* p : pools_) {
469 if (p == nullptr) {
470 continue;
471 }
472 for (auto stuck = p->detect_stuck_tasks(now); auto& s : stuck) {
473 all_stuck.push_back(std::move(s));
474 }
475 }
476 }
477
478 // Dispatch new stuck-task events.
479 std::vector<StuckTaskListener> stuck_listeners_copy;
480 {
481 std::scoped_lock guard(subs_mtx_);
482 stuck_listeners_copy.reserve(stuck_subs_.size());
483 for (const auto& cb : stuck_subs_ | std::views::values) {
484 stuck_listeners_copy.push_back(cb);
485 }
486 }
487 for (const auto& ev : all_stuck) {
488 if (known_stuck.insert(ev.task.id).second) {
490 .labels(prom::Labels{{"pool_id", std::to_string(ev.pool_id)}})
491 .inc();
492 if (stuck_listeners_copy.empty()) {
493 if (auto& lg = log::task()) {
494 lg->warn("task {} on pool {} stuck for {}ms",
495 ev.task.id,
496 ev.pool_id,
497 std::chrono::duration_cast<std::chrono::milliseconds>(
498 ev.running_for)
499 .count());
500 }
501 } else {
502 for (const auto& cb : stuck_listeners_copy) {
503 try {
504 cb(ev);
505 } catch (const std::exception& e) {
506 if (auto& lg = log::manager()) {
507 lg->warn("stuck-task listener threw: {}", e.what());
508 }
509 } catch (...) {
510 // ignore
511 }
512 }
513 }
514 }
515 }
516
517 // Reconcile the dedup set against tasks that are still stuck so it
518 // cannot grow without bound over the process lifetime: drop any id
519 // that no longer appears in this tick's stuck set. A task's
520 // run-duration only increases while it occupies a worker, so a
521 // still-stuck task stays in every tick's set and never re-fires;
522 // ids are only forgotten once their task leaves the running map.
523 {
524 std::unordered_set<std::uint64_t> still_stuck;
525 still_stuck.reserve(all_stuck.size());
526 for (const auto& ev : all_stuck) {
527 still_stuck.insert(ev.task.id);
528 }
529 std::erase_if(known_stuck,
530 [&](const std::uint64_t id) { return !still_stuck.contains(id); });
531 known_stuck_size_.store(known_stuck.size(), std::memory_order_relaxed);
532 metrics::tasks_stuck_current().set(known_stuck.size());
533 }
534
535 // 4) Periodic summary publish.
536 if (now - last_summary_at_ >= opts_.summary_interval) {
537 last_summary_at_ = now;
538 auto summary = build_summary();
539
540 // Publish per-pool gauges from the freshly-built summary.
541 for (const auto& s : summary.per_pool_stats) {
542 const auto pool_labels = prom::Labels{{"pool", s.name}};
543 metrics::pool_queue_depth().labels(pool_labels).set(s.queued);
544 metrics::pool_workers().labels(pool_labels).set(s.workers);
545 metrics::pool_workers_active().labels(pool_labels).set(s.active);
546 metrics::pool_workers_idle().labels(pool_labels).set(s.idle);
547 }
548 metrics::manager_pools_live().set(summary.total_pools);
550
551 summary.stuck_tasks.reserve(all_stuck.size());
552 for (const auto& ev : all_stuck) {
553 summary.stuck_tasks.push_back(ev.task);
554 }
555 std::vector<SummaryListener> summary_listeners_copy;
556 {
557 std::scoped_lock guard(subs_mtx_);
558 summary_listeners_copy.reserve(summary_subs_.size());
559 for (const auto& cb : summary_subs_ | std::views::values) {
560 summary_listeners_copy.push_back(cb);
561 }
562 }
563 for (const auto& cb : summary_listeners_copy) {
564 try {
565 cb(summary);
566 } catch (const std::exception& e) {
567 if (auto& lg = log::manager()) {
568 lg->warn("summary listener threw: {}", e.what());
569 }
570 } catch (...) {
571 // ignore
572 }
573 }
574 }
575
576 if (auto& lg = log::manager(); lg && lg->should_log(spdlog::level::debug)) {
577 std::size_t threads_n = 0;
578 std::size_t pools_n = 0;
579 {
580 std::shared_lock g(threads_mtx_);
581 threads_n = threads_.size();
582 }
583 {
584 std::scoped_lock g(pools_mtx_);
585 pools_n = pools_.size();
586 }
587 lg->debug("housekeeper tick: {} threads, {} pools", threads_n, pools_n);
588 }
589 }
590 }
591
592 Options opts_;
593
594 mutable std::shared_mutex threads_mtx_;
595 std::unordered_map<std::uint64_t, std::weak_ptr<ManagedThread::ControlBlock>> threads_;
596 std::atomic<std::uint64_t> total_registered_{0};
597
598 mutable std::mutex pools_mtx_;
599 std::vector<ThreadPool*> pools_;
600
601 mutable std::mutex subs_mtx_;
602 std::vector<std::pair<std::uint64_t, SummaryListener>> summary_subs_;
603 std::vector<std::pair<std::uint64_t, StuckTaskListener>> stuck_subs_;
604 std::atomic<std::uint64_t> next_sub_id_{1};
605
606 std::once_flag default_pool_once_;
607 std::unique_ptr<ThreadPool> default_pool_;
608
609 std::optional<ManagedThread> housekeeper_;
610 std::atomic<bool> housekeeper_started_{false};
611 std::chrono::steady_clock::time_point last_summary_at_;
612 std::atomic<std::size_t> known_stuck_size_{0};
613 mutable std::mutex hk_mtx_;
614 std::condition_variable hk_cv_;
615};
616
617// ---------------------------------------------------------------------------
618// Deferred manager-coupled definitions on ManagedThread / ThreadPool that
619// could not be inlined earlier because ThreadManager was incomplete.
620// ---------------------------------------------------------------------------
621
622inline void ManagedThread::register_with_manager() {
623 auto* m = (opts_manager_ != nullptr) ? opts_manager_ : &ThreadManager::instance();
624 m->register_thread(cb_);
625}
626
628 if (!cb_) {
629 return;
630 }
631 auto* m = (opts_manager_ != nullptr) ? opts_manager_ : &ThreadManager::instance();
632 m->unregister_thread(cb_->id);
633}
634
635inline void ThreadPool::register_with_manager() {
636 manager_ = (opts_.manager != nullptr) ? opts_.manager : &ThreadManager::instance();
637 manager_->register_pool(*this);
638}
639
640inline void ThreadPool::unregister_from_manager() noexcept {
641 if (manager_ != nullptr) {
642 manager_->unregister_pool(pool_id_);
643 }
644}
645
646} // namespace threadman
Elastic executor for blocking future-waits.
Definition future_wait_pool.hpp:81
void unregister_from_manager() noexcept
Definition manager.hpp:627
RAII handle returned by subscribe_*.
Definition manager.hpp:64
~SubscriptionToken()
Definition manager.hpp:85
friend class ThreadManager
Definition manager.hpp:97
bool valid() const noexcept
Definition manager.hpp:89
std::uint64_t id() const noexcept
Definition manager.hpp:92
SubscriptionToken & operator=(SubscriptionToken &&other) noexcept
Definition manager.hpp:74
Definition manager.hpp:47
void unregister_thread(const std::uint64_t thread_id) noexcept
Definition manager.hpp:160
SubscriptionToken subscribe_stuck_tasks(StuckTaskListener cb)
Definition manager.hpp:330
std::vector< ThreadPoolStats > snapshot_pools() const
Definition manager.hpp:265
ThreadManager(Options opts)
Definition manager.hpp:116
ThreadManager(ThreadManager &&)=delete
ThreadManager()
Definition manager.hpp:115
ManagerSummary build_summary() const
Definition manager.hpp:277
void register_thread(const std::shared_ptr< ManagedThread::ControlBlock > &cb)
Definition manager.hpp:149
void register_pool(ThreadPool &p)
Definition manager.hpp:164
ThreadManager & operator=(ThreadManager &&)=delete
static const comms::DisplayInfo & display_info()
Definition manager.hpp:386
void unregister_pool(const std::uint64_t pool_id) noexcept
Definition manager.hpp:172
FutureWaitPool make_future_wait_pool(FutureWaitPoolOptions opts={})
Create a FutureWaitPool attached to this manager.
Definition manager.hpp:304
SubscriptionToken subscribe_summary(SummaryListener cb)
Definition manager.hpp:322
std::function< void(const ManagerSummary &)> SummaryListener
Definition manager.hpp:59
~ThreadManager()
Definition manager.hpp:121
ThreadPool & default_pool()
Definition manager.hpp:309
std::optional< ThreadSnapshot > find_by_id(const std::uint64_t id) const
Definition manager.hpp:210
static ThreadManager & instance() noexcept
Definition manager.hpp:138
std::vector< ThreadSnapshot > snapshot_threads() const
Definition manager.hpp:183
std::size_t stuck_task_count() const noexcept
Number of tasks currently flagged as stuck (and already reported) by the housekeeper.
Definition manager.hpp:382
bool housekeeper_running() const noexcept
Definition manager.hpp:374
std::vector< ThreadSnapshot > find_by_name(const std::string_view name) const
Definition manager.hpp:235
ThreadManager(const ThreadManager &)=delete
std::function< void(const StuckTaskEvent &)> StuckTaskListener
Definition manager.hpp:60
void stop_housekeeper() noexcept
Definition manager.hpp:357
void start_housekeeper()
Definition manager.hpp:340
ThreadManager & operator=(const ThreadManager &)=delete
Definition thread_pool.hpp:92
Central feature-gate header for ThreadMan's optional integrations and tunable defaults.
#define THREADMAN_DEFAULT_SUMMARY_INTERVAL_MS
Periodic ManagerSummary cadence — listener publish interval (ms).
Definition config.hpp:103
#define THREADMAN_DEFAULT_POOL_NAME
Name used by ThreadManager::default_pool().
Definition config.hpp:71
#define THREADMAN_DEFAULT_SCALE_CHECK_INTERVAL_MS
Housekeeper tick interval / per-pool scale evaluation cadence (ms).
Definition config.hpp:93
Re-export <commons/display_info.hpp> and attach non-intrusive comms::HasDisplayInfo<> specializations...
Typed exception hierarchy thrown by ThreadMan.
threadman::FutureWaitPool — a dynamically-scaling executor specialized for blocking waits,...
Cached subsystem loggers for the tm.* logger names (e.g. tm.manager, tm.task).
Cached subsystem metrics for the tm_* Prometheus/OpenMetrics families, mirroring the tm.* loggers.
std::shared_ptr< spdlog::logger > & task()
Logger for tm.task — stuck-task warnings (only when no listener subscribes).
Definition log.hpp:51
std::shared_ptr< spdlog::logger > & manager()
Logger for tm.manager — housekeeper lifecycle, listener faults.
Definition log.hpp:45
void warm_up()
Eagerly construct the scope and every metric handle (and, transitively, prom's process-wide adapter c...
Definition metrics.hpp:278
prom::Counter & tasks_stuck_detected()
Distinct tasks newly detected as stuck (running past the threshold).
Definition metrics.hpp:212
prom::Counter & manager_threads_registered()
Thread control blocks registered with the manager.
Definition metrics.hpp:185
prom::Gauge & tasks_stuck_current()
Tasks currently flagged as stuck (set from the housekeeper tick).
Definition metrics.hpp:220
prom::Counter & manager_summaries_published()
Periodic ManagerSummary payloads published to listeners.
Definition metrics.hpp:193
prom::Gauge & pool_queue_depth()
Tasks currently waiting in a pool's queue (set from the summary tick).
Definition metrics.hpp:102
prom::Gauge & pool_workers()
Live workers in a pool (set from the summary tick).
Definition metrics.hpp:109
prom::Gauge & pool_workers_active()
Workers currently executing a task (set from the summary tick).
Definition metrics.hpp:116
prom::Gauge & pool_workers_idle()
Workers currently idle on the queue (set from the summary tick).
Definition metrics.hpp:123
prom::Gauge & manager_pools_live()
Pools currently registered with the manager (set from the summary tick).
Definition metrics.hpp:201
Definition exceptions.hpp:22
@ Failed
Body threw an exception; reason captured in ControlBlock.
Plain value snapshots of the live state of the ThreadMan world — threads, pools, tasks,...
Configuration for a FutureWaitPool.
Definition future_wait_pool.hpp:55
Definition thread.hpp:96
bool is_core
Definition thread.hpp:99
ThreadManager * manager
Definition thread.hpp:100
std::string name
Definition thread.hpp:97
Aggregate publish-payload produced periodically by the ThreadManager's housekeeper.
Definition stats.hpp:101
std::size_t total_live_workers
Definition stats.hpp:105
std::size_t live_threads
Definition stats.hpp:103
std::size_t total_pools
Definition stats.hpp:104
std::vector< ThreadPoolStats > per_pool_stats
Definition stats.hpp:109
std::chrono::system_clock::time_point wall_clock
Definition stats.hpp:102
std::size_t total_queued
Definition stats.hpp:106
std::uint64_t total_completed
Definition stats.hpp:107
std::uint64_t total_failed
Definition stats.hpp:108
A single stuck-task event published to StuckTaskListeners.
Definition stats.hpp:114
Definition manager.hpp:49
std::string default_pool_name
Definition manager.hpp:54
std::size_t default_pool_min_workers
Definition manager.hpp:55
std::string name
Definition manager.hpp:50
std::size_t default_pool_max_workers
Definition manager.hpp:56
bool start_housekeeper_eagerly
Definition manager.hpp:53
std::chrono::milliseconds summary_interval
Definition manager.hpp:52
std::chrono::milliseconds housekeeping_interval
Definition manager.hpp:51
Definition thread_pool.hpp:70
std::string name
Definition thread_pool.hpp:71
std::size_t max_workers
Definition thread_pool.hpp:75
std::size_t min_workers
Definition thread_pool.hpp:74
ThreadManager * manager
Definition thread_pool.hpp:89
A point-in-time snapshot of a ManagedThread's control block.
Definition stats.hpp:48
bool failed
True iff state == Failed.
Definition stats.hpp:55
std::uint64_t native_id
OS-level thread id, when known.
Definition stats.hpp:51
std::string name
User-chosen name.
Definition stats.hpp:50
std::optional< std::uint64_t > pool_id
Owning pool, when this is a pool worker.
Definition stats.hpp:52
std::uint64_t id
Monotonic per-process id.
Definition stats.hpp:49
std::chrono::nanoseconds run_duration
Time the body has been running (or ran).
Definition stats.hpp:54
bool is_core
Pool worker: not subject to idle retirement.
Definition stats.hpp:56
ThreadState state
Definition stats.hpp:53
threadman::TaskHandle — a ref-counted, shareable handle to a single task's lifecycle (id,...
threadman::ManagedThread — RAII wrapper around std::jthread that publishes lifecycle state via a shar...
threadman::ThreadPool — a dynamic-scaling worker-queue executor.