26#include <commons/display_info.hpp>
30#include <condition_variable>
36#include <shared_mutex>
40#include <unordered_map>
41#include <unordered_set>
50 std::string
name =
"tm::manager";
70 : mgr_(other.mgr_), id_(other.id_), kind_(other.kind_) {
89 [[nodiscard]]
bool valid() const noexcept {
90 return mgr_ !=
nullptr && id_ != 0;
92 [[nodiscard]] std::uint64_t
id() const noexcept {
98 enum class Kind : std::uint8_t { Summary, StuckTask };
100 : mgr_(m), id_(i), kind_(k) {}
102 void release() noexcept {
103 if (mgr_ !=
nullptr && id_ != 0) {
104 mgr_->unsubscribe(id_, kind_);
111 std::uint64_t id_ = 0;
112 Kind kind_ = Kind::Summary;
125 default_pool_->shutdown();
129 default_pool_.reset();
154 std::unique_lock guard(threads_mtx_);
155 threads_.insert_or_assign(cb->id, cb);
157 total_registered_.fetch_add(1, std::memory_order_relaxed);
161 std::unique_lock guard(threads_mtx_);
162 threads_.erase(thread_id);
166 std::scoped_lock guard(pools_mtx_);
167 pools_.push_back(&p);
173 std::scoped_lock guard(pools_mtx_);
174 for (std::size_t i = 0; i < pools_.size(); ++i) {
175 if ((pools_[i] !=
nullptr) && pools_[i]->
id() == pool_id) {
176 pools_.erase(pools_.begin() +
static_cast<std::ptrdiff_t
>(i));
184 std::vector<ThreadSnapshot> out;
185 std::shared_lock guard(threads_mtx_);
186 out.reserve(threads_.size());
187 for (
const auto& weak : threads_ | std::views::values) {
188 if (
const auto cb = weak.lock()) {
190 snap.
state = cb->state.load(std::memory_order_acquire);
192 std::scoped_lock lk(cb->mtx);
194 snap.
name = cb->name;
198 if (cb->started_at.time_since_epoch().count() != 0) {
199 const auto end = cb->ended_at.time_since_epoch().count() == 0
200 ? std::chrono::steady_clock::now()
204 out.push_back(std::move(snap));
210 [[nodiscard]] std::optional<ThreadSnapshot>
find_by_id(
const std::uint64_t
id)
const {
211 std::shared_lock guard(threads_mtx_);
212 if (
const auto it = threads_.find(
id); it != threads_.end()) {
213 if (
const auto cb = it->second.lock()) {
215 snap.
state = cb->state.load(std::memory_order_acquire);
217 std::scoped_lock lk(cb->mtx);
219 snap.
name = cb->name;
223 if (cb->started_at.time_since_epoch().count() != 0) {
224 const auto end = cb->ended_at.time_since_epoch().count() == 0
225 ? std::chrono::steady_clock::now()
235 [[nodiscard]] std::vector<ThreadSnapshot>
find_by_name(
const std::string_view name)
const {
236 std::vector<ThreadSnapshot> out;
237 std::shared_lock guard(threads_mtx_);
238 for (
const auto& weak : threads_ | std::views::values) {
239 const auto cb = weak.lock();
243 std::scoped_lock lk(cb->mtx);
244 if (cb->name == name) {
246 snap.
state = cb->state.load(std::memory_order_acquire);
249 snap.
name = cb->name;
253 if (cb->started_at.time_since_epoch().count() != 0) {
254 const auto end = cb->ended_at.time_since_epoch().count() == 0
255 ? std::chrono::steady_clock::now()
259 out.push_back(std::move(snap));
266 std::vector<ThreadPoolStats> out;
267 std::scoped_lock guard(pools_mtx_);
268 out.reserve(pools_.size());
269 for (
const auto* p : pools_) {
271 out.push_back(p->stats());
279 summary.
wall_clock = std::chrono::system_clock::now();
289 std::shared_lock g(threads_mtx_);
290 for (
const auto& w : threads_ | std::views::values) {
310 std::call_once(default_pool_once_, [
this] {
316 default_pool_ = std::make_unique<ThreadPool>(std::move(o));
318 return *default_pool_;
323 const auto id = next_sub_id_.fetch_add(1, std::memory_order_relaxed);
325 std::scoped_lock guard(subs_mtx_);
326 summary_subs_.emplace_back(
id, std::move(cb));
331 const auto id = next_sub_id_.fetch_add(1, std::memory_order_relaxed);
333 std::scoped_lock guard(subs_mtx_);
334 stuck_subs_.emplace_back(
id, std::move(cb));
341 if (
bool expected =
false; !housekeeper_started_.compare_exchange_strong(
342 expected,
true, std::memory_order_acq_rel)) {
346 topts.
name = opts_.
name +
".housekeeper";
349 housekeeper_.emplace(std::move(topts),
350 [
this](std::stop_token tok) { housekeeper_loop(std::move(tok)); });
352 lg->info(
"started housekeeper (interval={}ms, summary={}ms)",
358 if (!housekeeper_started_.load(std::memory_order_acquire)) {
361 if (housekeeper_.has_value()) {
362 housekeeper_->request_stop();
364 std::scoped_lock guard(hk_mtx_);
367 if (housekeeper_->joinable()) {
368 housekeeper_->join();
370 housekeeper_.reset();
372 housekeeper_started_.store(
false, std::memory_order_release);
375 return housekeeper_started_.load(std::memory_order_acquire);
383 return known_stuck_size_.load(std::memory_order_relaxed);
387 static const comms::DisplayInfo info{
388 .name =
"ThreadManager",
389 .description =
"Central registry of managed threads and pools with periodic summary "
390 "and stuck-task event dispatch.",
391 .icon = comms::Icon::from(
"mdi:account-supervisor"),
397 void unsubscribe(
const std::uint64_t
id,
const SubscriptionToken::Kind kind)
noexcept {
398 std::scoped_lock guard(subs_mtx_);
399 if (kind == SubscriptionToken::Kind::Summary) {
400 for (std::size_t i = 0; i < summary_subs_.size(); ++i) {
401 if (summary_subs_[i].first ==
id) {
402 summary_subs_.erase(summary_subs_.begin() +
static_cast<std::ptrdiff_t
>(i));
407 for (std::size_t i = 0; i < stuck_subs_.size(); ++i) {
408 if (stuck_subs_[i].first ==
id) {
409 stuck_subs_.erase(stuck_subs_.begin() +
static_cast<std::ptrdiff_t
>(i));
416 void prune_expired_threads() {
417 std::unique_lock guard(threads_mtx_);
418 for (
auto it = threads_.begin(); it != threads_.end();) {
419 if (it->second.expired()) {
420 it = threads_.erase(it);
427 void housekeeper_loop(std::stop_token tok) {
428 last_summary_at_ = std::chrono::steady_clock::now();
429 std::unordered_set<std::uint64_t> known_stuck;
431 while (!tok.stop_requested()) {
433 std::unique_lock lk(hk_mtx_);
437 if (tok.stop_requested()) {
441 const auto now = std::chrono::steady_clock::now();
444 prune_expired_threads();
448 std::scoped_lock guard(pools_mtx_);
449 for (
auto* p : pools_) {
453 }
catch (
const std::exception& e) {
455 lg->warn(
"scale_tick on pool '{}' threw: {}", p->name(), e.what());
465 std::vector<StuckTaskEvent> all_stuck;
467 std::scoped_lock guard(pools_mtx_);
468 for (
auto* p : pools_) {
472 for (
auto stuck = p->detect_stuck_tasks(now);
auto& s : stuck) {
473 all_stuck.push_back(std::move(s));
479 std::vector<StuckTaskListener> stuck_listeners_copy;
481 std::scoped_lock guard(subs_mtx_);
482 stuck_listeners_copy.reserve(stuck_subs_.size());
483 for (
const auto& cb : stuck_subs_ | std::views::values) {
484 stuck_listeners_copy.push_back(cb);
487 for (
const auto& ev : all_stuck) {
488 if (known_stuck.insert(ev.task.id).second) {
490 .labels(prom::Labels{{
"pool_id", std::to_string(ev.pool_id)}})
492 if (stuck_listeners_copy.empty()) {
494 lg->warn(
"task {} on pool {} stuck for {}ms",
497 std::chrono::duration_cast<std::chrono::milliseconds>(
502 for (
const auto& cb : stuck_listeners_copy) {
505 }
catch (
const std::exception& e) {
507 lg->warn(
"stuck-task listener threw: {}", e.what());
524 std::unordered_set<std::uint64_t> still_stuck;
525 still_stuck.reserve(all_stuck.size());
526 for (
const auto& ev : all_stuck) {
527 still_stuck.insert(ev.task.id);
529 std::erase_if(known_stuck,
530 [&](
const std::uint64_t
id) {
return !still_stuck.contains(
id); });
531 known_stuck_size_.store(known_stuck.size(), std::memory_order_relaxed);
537 last_summary_at_ = now;
541 for (
const auto& s : summary.per_pool_stats) {
542 const auto pool_labels = prom::Labels{{
"pool", s.name}};
551 summary.stuck_tasks.reserve(all_stuck.size());
552 for (
const auto& ev : all_stuck) {
553 summary.stuck_tasks.push_back(ev.task);
555 std::vector<SummaryListener> summary_listeners_copy;
557 std::scoped_lock guard(subs_mtx_);
558 summary_listeners_copy.reserve(summary_subs_.size());
559 for (
const auto& cb : summary_subs_ | std::views::values) {
560 summary_listeners_copy.push_back(cb);
563 for (
const auto& cb : summary_listeners_copy) {
566 }
catch (
const std::exception& e) {
568 lg->warn(
"summary listener threw: {}", e.what());
576 if (
auto& lg =
log::manager(); lg && lg->should_log(spdlog::level::debug)) {
577 std::size_t threads_n = 0;
578 std::size_t pools_n = 0;
580 std::shared_lock g(threads_mtx_);
581 threads_n = threads_.size();
584 std::scoped_lock g(pools_mtx_);
585 pools_n = pools_.size();
587 lg->debug(
"housekeeper tick: {} threads, {} pools", threads_n, pools_n);
// --- Thread registry -------------------------------------------------------
// Taken shared by the read paths (snapshot_threads, find_by_id, find_by_name,
// build_summary) and exclusively by register_thread, unregister_thread and
// prune_expired_threads.
594 mutable std::shared_mutex threads_mtx_;
// Thread id -> weak reference to the thread's control block. Readers lock()
// each entry before use; expired entries are erased by prune_expired_threads().
595 std::unordered_map<std::uint64_t, std::weak_ptr<ManagedThread::ControlBlock>> threads_;
// Bumped (relaxed) once per register_thread call.
596 std::atomic<std::uint64_t> total_registered_{0};
// --- Pool registry ---------------------------------------------------------
// Guards pools_.
598 mutable std::mutex pools_mtx_;
// Raw pointers to registered pools: register_pool stores the address of the
// pool it is handed, unregister_pool erases the entry whose id() matches.
599 std::vector<ThreadPool*> pools_;
// --- Listener subscriptions ------------------------------------------------
// Guards both subscription vectors below.
601 mutable std::mutex subs_mtx_;
// (subscription id, callback) pairs; the housekeeper copies the callbacks out
// under subs_mtx_ before invoking them.
602 std::vector<std::pair<std::uint64_t, SummaryListener>> summary_subs_;
603 std::vector<std::pair<std::uint64_t, StuckTaskListener>> stuck_subs_;
// Source of subscription ids. Starts at 1 because SubscriptionToken::valid()
// and release() treat id 0 as "no subscription".
604 std::atomic<std::uint64_t> next_sub_id_{1};
// --- Default pool ----------------------------------------------------------
// default_pool() constructs default_pool_ under this once_flag.
606 std::once_flag default_pool_once_;
// Shut down and reset in the destructor.
607 std::unique_ptr<ThreadPool> default_pool_;
// --- Housekeeper -----------------------------------------------------------
// Emplaced by start_housekeeper(); stopped, joined and reset by stop_housekeeper().
609 std::optional<ManagedThread> housekeeper_;
// Flipped false -> true by a compare-exchange in start_housekeeper(), stored
// back to false by stop_housekeeper(), and read by housekeeper_running().
610 std::atomic<bool> housekeeper_started_{
false};
// Written by housekeeper_loop(): on entry and again when it records a summary time.
611 std::chrono::steady_clock::time_point last_summary_at_;
// Size of the housekeeper's known-stuck set, stored after each stuck-task pass
// and returned by stuck_task_count().
612 std::atomic<std::size_t> known_stuck_size_{0};
// Locked by housekeeper_loop() (unique_lock) and by stop_housekeeper().
613 mutable std::mutex hk_mtx_;
// NOTE(review): presumably the loop waits on this under hk_mtx_ and
// stop_housekeeper() notifies it — the wait/notify calls are not visible here; confirm.
614 std::condition_variable hk_cv_;
622inline void ManagedThread::register_with_manager() {
623 auto* m = (opts_manager_ !=
nullptr) ? opts_manager_ : &ThreadManager::instance();
624 m->register_thread(cb_);
632 m->unregister_thread(cb_->id);
635inline void ThreadPool::register_with_manager() {
640inline void ThreadPool::unregister_from_manager() noexcept {
641 if (manager_ !=
nullptr) {
Elastic executor for blocking future-waits.
Definition future_wait_pool.hpp:81
void unregister_from_manager() noexcept
Definition manager.hpp:627
RAII handle returned by subscribe_*.
Definition manager.hpp:64
~SubscriptionToken()
Definition manager.hpp:85
friend class ThreadManager
Definition manager.hpp:97
bool valid() const noexcept
Definition manager.hpp:89
std::uint64_t id() const noexcept
Definition manager.hpp:92
SubscriptionToken & operator=(SubscriptionToken &&other) noexcept
Definition manager.hpp:74
SubscriptionToken() noexcept=default
Definition manager.hpp:47
void unregister_thread(const std::uint64_t thread_id) noexcept
Definition manager.hpp:160
SubscriptionToken subscribe_stuck_tasks(StuckTaskListener cb)
Definition manager.hpp:330
std::vector< ThreadPoolStats > snapshot_pools() const
Definition manager.hpp:265
ThreadManager(Options opts)
Definition manager.hpp:116
ThreadManager(ThreadManager &&)=delete
ThreadManager()
Definition manager.hpp:115
ManagerSummary build_summary() const
Definition manager.hpp:277
void register_thread(const std::shared_ptr< ManagedThread::ControlBlock > &cb)
Definition manager.hpp:149
void register_pool(ThreadPool &p)
Definition manager.hpp:164
ThreadManager & operator=(ThreadManager &&)=delete
static const comms::DisplayInfo & display_info()
Definition manager.hpp:386
void unregister_pool(const std::uint64_t pool_id) noexcept
Definition manager.hpp:172
FutureWaitPool make_future_wait_pool(FutureWaitPoolOptions opts={})
Create a FutureWaitPool attached to this manager.
Definition manager.hpp:304
SubscriptionToken subscribe_summary(SummaryListener cb)
Definition manager.hpp:322
std::function< void(const ManagerSummary &)> SummaryListener
Definition manager.hpp:59
~ThreadManager()
Definition manager.hpp:121
ThreadPool & default_pool()
Definition manager.hpp:309
std::optional< ThreadSnapshot > find_by_id(const std::uint64_t id) const
Definition manager.hpp:210
static ThreadManager & instance() noexcept
Definition manager.hpp:138
std::vector< ThreadSnapshot > snapshot_threads() const
Definition manager.hpp:183
std::size_t stuck_task_count() const noexcept
Number of tasks currently flagged as stuck (and already reported) by the housekeeper.
Definition manager.hpp:382
bool housekeeper_running() const noexcept
Definition manager.hpp:374
std::vector< ThreadSnapshot > find_by_name(const std::string_view name) const
Definition manager.hpp:235
ThreadManager(const ThreadManager &)=delete
std::function< void(const StuckTaskEvent &)> StuckTaskListener
Definition manager.hpp:60
void stop_housekeeper() noexcept
Definition manager.hpp:357
void start_housekeeper()
Definition manager.hpp:340
ThreadManager & operator=(const ThreadManager &)=delete
Definition thread_pool.hpp:92
Central feature-gate header for ThreadMan's optional integrations and tunable defaults.
#define THREADMAN_DEFAULT_SUMMARY_INTERVAL_MS
Periodic ManagerSummary cadence — listener publish interval (ms).
Definition config.hpp:103
#define THREADMAN_DEFAULT_POOL_NAME
Name used by ThreadManager::default_pool().
Definition config.hpp:71
#define THREADMAN_DEFAULT_SCALE_CHECK_INTERVAL_MS
Housekeeper tick interval / per-pool scale evaluation cadence (ms).
Definition config.hpp:93
Re-export <commons/display_info.hpp> and attach non-intrusive comms::HasDisplayInfo<> specializations...
Typed exception hierarchy thrown by ThreadMan.
threadman::FutureWaitPool — a dynamically-scaling executor specialized for blocking waits,...
Cached subsystem loggers for the tm.* logger names (e.g. tm.task, tm.manager).
Cached subsystem metrics for the tm_* Prometheus/OpenMetrics families, mirroring the tm.* loggers.
std::shared_ptr< spdlog::logger > & task()
Logger for tm.task — stuck-task warnings (only when no listener subscribes).
Definition log.hpp:51
std::shared_ptr< spdlog::logger > & manager()
Logger for tm.manager — housekeeper lifecycle, listener faults.
Definition log.hpp:45
void warm_up()
Eagerly construct the scope and every metric handle (and, transitively, prom's process-wide adapter c...
Definition metrics.hpp:278
prom::Counter & tasks_stuck_detected()
Distinct tasks newly detected as stuck (running past the threshold).
Definition metrics.hpp:212
prom::Counter & manager_threads_registered()
Thread control blocks registered with the manager.
Definition metrics.hpp:185
prom::Gauge & tasks_stuck_current()
Tasks currently flagged as stuck (set from the housekeeper tick).
Definition metrics.hpp:220
prom::Counter & manager_summaries_published()
Periodic ManagerSummary payloads published to listeners.
Definition metrics.hpp:193
prom::Gauge & pool_queue_depth()
Tasks currently waiting in a pool's queue (set from the summary tick).
Definition metrics.hpp:102
prom::Gauge & pool_workers()
Live workers in a pool (set from the summary tick).
Definition metrics.hpp:109
prom::Gauge & pool_workers_active()
Workers currently executing a task (set from the summary tick).
Definition metrics.hpp:116
prom::Gauge & pool_workers_idle()
Workers currently idle on the queue (set from the summary tick).
Definition metrics.hpp:123
prom::Gauge & manager_pools_live()
Pools currently registered with the manager (set from the summary tick).
Definition metrics.hpp:201
Definition exceptions.hpp:22
@ Failed
Body threw an exception; reason captured in ControlBlock.
Plain value snapshots of the live state of the ThreadMan world — threads, pools, tasks,...
Configuration for a FutureWaitPool.
Definition future_wait_pool.hpp:55
bool is_core
Definition thread.hpp:99
ThreadManager * manager
Definition thread.hpp:100
std::string name
Definition thread.hpp:97
Aggregate publish-payload produced periodically by the ThreadManager's housekeeper.
Definition stats.hpp:101
std::size_t total_live_workers
Definition stats.hpp:105
std::size_t live_threads
Definition stats.hpp:103
std::size_t total_pools
Definition stats.hpp:104
std::vector< ThreadPoolStats > per_pool_stats
Definition stats.hpp:109
std::chrono::system_clock::time_point wall_clock
Definition stats.hpp:102
std::size_t total_queued
Definition stats.hpp:106
std::uint64_t total_completed
Definition stats.hpp:107
std::uint64_t total_failed
Definition stats.hpp:108
A single stuck-task event published to StuckTaskListeners.
Definition stats.hpp:114
Definition manager.hpp:49
std::string default_pool_name
Definition manager.hpp:54
std::size_t default_pool_min_workers
Definition manager.hpp:55
std::string name
Definition manager.hpp:50
std::size_t default_pool_max_workers
Definition manager.hpp:56
bool start_housekeeper_eagerly
Definition manager.hpp:53
std::chrono::milliseconds summary_interval
Definition manager.hpp:52
std::chrono::milliseconds housekeeping_interval
Definition manager.hpp:51
Definition thread_pool.hpp:70
std::string name
Definition thread_pool.hpp:71
std::size_t max_workers
Definition thread_pool.hpp:75
std::size_t min_workers
Definition thread_pool.hpp:74
ThreadManager * manager
Definition thread_pool.hpp:89
A point-in-time snapshot of a ManagedThread's control block.
Definition stats.hpp:48
bool failed
True iff state == Failed.
Definition stats.hpp:55
std::uint64_t native_id
OS-level thread id, when known.
Definition stats.hpp:51
std::string name
User-chosen name.
Definition stats.hpp:50
std::optional< std::uint64_t > pool_id
Owning pool, when this is a pool worker.
Definition stats.hpp:52
std::uint64_t id
Monotonic per-process id.
Definition stats.hpp:49
std::chrono::nanoseconds run_duration
Time the body has been running (or ran).
Definition stats.hpp:54
bool is_core
Pool worker: not subject to idle retirement.
Definition stats.hpp:56
ThreadState state
Definition stats.hpp:53
threadman::TaskHandle — a ref-counted, shareable handle to a single task's lifecycle (id,...
threadman::ManagedThread — RAII wrapper around std::jthread that publishes lifecycle state via a shar...
threadman::ThreadPool — a dynamic-scaling worker-queue executor.