threadman 0.1.0
Header-only C++23 managed threads, dynamic pools, futures, and executors
Loading...
Searching...
No Matches
thread.hpp
Go to the documentation of this file.
1#pragma once
2
15
16#include <threadman/config.hpp>
17#include <threadman/display.hpp>
18#include <threadman/log.hpp>
19#include <threadman/metrics.hpp>
20#include <threadman/stats.hpp>
21
22#include <commons/display_info.hpp>
23
24#include <atomic>
25#include <chrono>
26#include <concepts>
27#include <cstdint>
28#include <exception>
29#include <memory>
30#include <mutex>
31#include <optional>
32#include <stop_token>
33#include <string>
34#include <thread>
35#include <type_traits>
36#include <utility>
37
38#if THREADMAN_PLATFORM_APPLE || THREADMAN_PLATFORM_LINUX
39#include <pthread.h>
40#endif
41
42namespace threadman {
43
44class ThreadManager; // forward
45
46namespace detail {
47
/// Hands out the next process-wide ManagedThread id.
///
/// Ids start at 1 and grow by one per call. The counter only has to produce
/// distinct numbers, so a relaxed read-modify-write is used.
inline std::uint64_t next_thread_id() noexcept {
    static std::atomic<std::uint64_t> next_id{1};
    const std::uint64_t issued = next_id.fetch_add(1, std::memory_order_relaxed);
    return issued;
}
52
/// Returns a 64-bit identifier for the calling thread.
///
/// - Apple: the value written by pthread_threadid_np() for the current thread
///   (0 if the call leaves the output untouched).
/// - Linux: pthread_self() converted to an integer.
/// - Elsewhere: the std::hash of std::this_thread::get_id().
inline std::uint64_t current_native_id() noexcept {
#if THREADMAN_PLATFORM_APPLE
    std::uint64_t kernel_tid = 0;
    pthread_threadid_np(nullptr, &kernel_tid);
    return kernel_tid;
#elif THREADMAN_PLATFORM_LINUX
    const auto self = pthread_self();
    return static_cast<std::uint64_t>(self);
#else
    const std::hash<std::thread::id> hasher{};
    const auto hashed = hasher(std::this_thread::get_id());
    return static_cast<std::uint64_t>(hashed);
#endif
}
64
/// Best-effort: publishes `name` as the OS-level name of the calling thread.
///
/// The name is truncated to the platform limit (63 bytes on Apple, 15 bytes on
/// Linux, both excluding the terminating NUL). On other platforms this is a
/// no-op.
///
/// The truncated copy is built in a fixed stack buffer rather than through
/// std::string::substr, so this noexcept function performs no heap allocation
/// and cannot terminate the process on std::bad_alloc. The status returned by
/// pthread_setname_np is intentionally ignored.
inline void apply_native_name(const std::string& name) noexcept {
#if THREADMAN_PLATFORM_APPLE
    // macOS allows setting only the current thread's name; truncate to 63 chars.
    char buf[64] = {};  // zero-filled, so the copy below stays NUL-terminated
    name.copy(buf, sizeof(buf) - 1);
    pthread_setname_np(buf);
#elif THREADMAN_PLATFORM_LINUX
    // Linux limits the name to 16 bytes including NUL.
    char buf[16] = {};  // zero-filled, so the copy below stays NUL-terminated
    name.copy(buf, sizeof(buf) - 1);
    pthread_setname_np(pthread_self(), buf);
#else
    (void)name;
#endif
}
76
77} // namespace detail
78
80public:
83 struct ControlBlock {
84 std::uint64_t id = 0;
85 std::string name;
86 std::uint64_t native_id = 0;
87 std::optional<std::uint64_t> pool_id;
88 bool is_core = true;
89 std::atomic<ThreadState> state{ThreadState::Starting};
90 std::chrono::steady_clock::time_point started_at;
91 std::chrono::steady_clock::time_point ended_at;
92 std::exception_ptr exception;
93 mutable std::mutex mtx;
94 };
95
96 struct Options {
97 std::string name;
98 std::optional<std::uint64_t> pool_id;
99 bool is_core = true;
100 ThreadManager* manager = nullptr; // null → singleton
101 };
102
105 template <class Fn, class... Args>
106 requires(std::invocable<Fn, std::stop_token, Args...> || std::invocable<Fn, Args...>)
107 explicit ManagedThread(Options opts, Fn&& fn, Args&&... args)
108 : cb_(std::make_shared<ControlBlock>()), opts_manager_(opts.manager) {
109 cb_->id = detail::next_thread_id();
110 cb_->name = std::move(opts.name);
111 cb_->pool_id = opts.pool_id;
112 cb_->is_core = opts.is_core;
113
114 register_with_manager();
115
116 if constexpr (std::invocable<Fn, std::stop_token, Args...>) {
117 jth_ = std::jthread(
118 [cb = cb_, fn = std::forward<Fn>(fn), ... captured_args = std::forward<Args>(args)](
119 // forwarded via std::move below; const& would break bodies
120 // taking std::stop_token&&.
121 // NOLINTNEXTLINE(performance-unnecessary-value-param)
122 std::stop_token tok) mutable {
123 enter(cb);
124 try {
125 fn(std::move(tok), std::move(captured_args)...);
126 exit_completed(cb);
127 } catch (...) {
128 exit_failed(cb, std::current_exception());
129 }
130 });
131 } else {
132 jth_ = std::jthread(
133 [cb = cb_, fn = std::forward<Fn>(fn), ... captured_args = std::forward<Args>(args)](
134 const std::stop_token&) mutable {
135 enter(cb);
136 try {
137 fn(std::move(captured_args)...);
138 exit_completed(cb);
139 } catch (...) {
140 exit_failed(cb, std::current_exception());
141 }
142 });
143 }
144 }
145
146 ManagedThread(const ManagedThread&) = delete;
148 void unregister_from_manager() noexcept;
149 ManagedThread(ManagedThread&&) noexcept = default;
150 ManagedThread& operator=(ManagedThread&&) noexcept = default;
151
155
156 [[nodiscard]] std::uint64_t id() const noexcept {
157 return cb_ ? cb_->id : 0;
158 }
159 [[nodiscard]] const std::string& name() const noexcept {
160 static constexpr std::string empty;
161 return cb_ ? cb_->name : empty;
162 }
163 [[nodiscard]] std::uint64_t native_id() const noexcept {
164 return cb_ ? cb_->native_id : 0;
165 }
166 [[nodiscard]] std::optional<std::uint64_t> pool_id() const noexcept {
167 return cb_ ? cb_->pool_id : std::nullopt;
168 }
169 [[nodiscard]] bool is_core() const noexcept {
170 return cb_ ? cb_->is_core : true;
171 }
172 [[nodiscard]] ThreadState state() const noexcept {
173 return cb_ ? cb_->state.load(std::memory_order_acquire) : ThreadState::Completed;
174 }
175 [[nodiscard]] bool failed() const noexcept {
176 return state() == ThreadState::Failed;
177 }
178
181 [[nodiscard]] std::chrono::nanoseconds run_duration() const noexcept {
182 if (!cb_) {
183 return std::chrono::nanoseconds{0};
184 }
185 std::scoped_lock guard(cb_->mtx);
186 if (cb_->started_at.time_since_epoch().count() == 0) {
187 return std::chrono::nanoseconds{0};
188 }
189 const auto end = cb_->ended_at.time_since_epoch().count() == 0
190 ? std::chrono::steady_clock::now()
191 : cb_->ended_at;
192 return end - cb_->started_at;
193 }
194
195 [[nodiscard]] std::stop_source get_stop_source() noexcept {
196 return jth_.get_stop_source();
197 }
198 [[nodiscard]] std::stop_token get_stop_token() const noexcept {
199 return jth_.get_stop_token();
200 }
201
203 bool request_stop() noexcept {
204 return jth_.request_stop();
205 }
206 [[nodiscard]] bool joinable() const noexcept {
207 return jth_.joinable();
208 }
209 void join() {
210 jth_.join();
211 }
212 void detach() {
213 jth_.detach();
214 }
215
217 [[nodiscard]] std::thread::id native_thread_id() const noexcept {
218 return jth_.get_id();
219 }
220
221 [[nodiscard]] std::shared_ptr<ControlBlock> control_block() const noexcept {
222 return cb_;
223 }
224
225 [[nodiscard]] ThreadSnapshot snapshot() const {
226 ThreadSnapshot snap;
227 if (!cb_) {
228 return snap;
229 }
230 snap.state = cb_->state.load(std::memory_order_acquire);
231 snap.failed = (snap.state == ThreadState::Failed);
232 snap.run_duration = run_duration();
233 std::scoped_lock guard(cb_->mtx);
234 snap.id = cb_->id;
235 snap.name = cb_->name;
236 snap.native_id = cb_->native_id;
237 snap.pool_id = cb_->pool_id;
238 snap.is_core = cb_->is_core;
239 return snap;
240 }
241
245 [[nodiscard]] static ControlBlock* current() noexcept {
246 return current_cb();
247 }
248
249 [[nodiscard]] static const comms::DisplayInfo& display_info() {
250 static const comms::DisplayInfo info{
251 .name = "ManagedThread",
252 .description = "RAII wrapper around std::jthread with lifecycle "
253 "introspection and ThreadManager integration.",
254 .icon = comms::Icon::from("mdi:cog-outline"),
255 };
256 return info;
257 }
258
259private:
260 static ControlBlock*& current_cb() noexcept {
261 thread_local ControlBlock* tls = nullptr;
262 return tls;
263 }
264
265 static void enter(const std::shared_ptr<ControlBlock>& cb) noexcept {
266 current_cb() = cb.get();
267 const auto now = std::chrono::steady_clock::now();
268 const auto native = detail::current_native_id();
269 if (!cb->name.empty()) {
271 }
272 {
273 std::scoped_lock guard(cb->mtx);
274 cb->started_at = now;
275 cb->native_id = native;
276 }
277 cb->state.store(ThreadState::Running, std::memory_order_release);
279 metrics::threads_live().inc();
280 if (const auto& lg = ::threadman::log::thread();
281 lg && lg->should_log(spdlog::level::debug)) {
282 lg->debug("started '{}' (id={}, native={})", cb->name, cb->id, native);
283 }
284 }
285
286 static void exit_completed(const std::shared_ptr<ControlBlock>& cb) noexcept {
287 {
288 std::scoped_lock guard(cb->mtx);
289 cb->ended_at = std::chrono::steady_clock::now();
290 }
291 cb->state.store(ThreadState::Completed, std::memory_order_release);
293 metrics::threads_live().dec();
294 current_cb() = nullptr;
295 }
296
297 static void exit_failed(const std::shared_ptr<ControlBlock>& cb,
298 const std::exception_ptr& ex) noexcept {
299 std::string what_msg = "(unknown exception)";
300 try {
301 if (ex) {
302 std::rethrow_exception(ex);
303 }
304 } catch (const std::exception& e) {
305 what_msg = e.what();
306 } catch (...) {
307 // keep default
308 }
309 {
310 std::scoped_lock guard(cb->mtx);
311 cb->ended_at = std::chrono::steady_clock::now();
312 cb->exception = ex;
313 }
314 cb->state.store(ThreadState::Failed, std::memory_order_release);
316 metrics::threads_live().dec();
317 if (const auto& lg = ::threadman::log::thread()) {
318 lg->warn("'{}' failed: {}", cb->name, what_msg);
319 }
320 current_cb() = nullptr;
321 }
322
323 void register_with_manager();
324 void unregister_from_manager() const noexcept;
325
326 std::shared_ptr<ControlBlock> cb_;
327 ThreadManager* opts_manager_ = nullptr;
328 std::jthread jth_;
329};
330
331} // namespace threadman
Definition thread.hpp:79
static ControlBlock * current() noexcept
Returns a pointer to the ControlBlock of the currently-executing ManagedThread, or nullptr when called from a thread that is not running a ManagedThread body.
Definition thread.hpp:245
void join()
Definition thread.hpp:209
std::optional< std::uint64_t > pool_id() const noexcept
Definition thread.hpp:166
std::shared_ptr< ControlBlock > control_block() const noexcept
Definition thread.hpp:221
ManagedThread(Options opts, Fn &&fn, Args &&... args)
Construct a managed thread; fn may take an optional leading std::stop_token.
Definition thread.hpp:107
ManagedThread & operator=(const ManagedThread &)=delete
bool request_stop() noexcept
Request the thread to stop. The body sees tok.stop_requested().
Definition thread.hpp:203
void detach()
Definition thread.hpp:212
ManagedThread(const ManagedThread &)=delete
std::chrono::nanoseconds run_duration() const noexcept
Time elapsed since the worker body entered; once exited, the duration from started_at to ended_at.
Definition thread.hpp:181
std::stop_token get_stop_token() const noexcept
Definition thread.hpp:198
const std::string & name() const noexcept
Definition thread.hpp:159
std::uint64_t id() const noexcept
Definition thread.hpp:156
std::uint64_t native_id() const noexcept
Definition thread.hpp:163
ThreadSnapshot snapshot() const
Definition thread.hpp:225
bool failed() const noexcept
Definition thread.hpp:175
ThreadState state() const noexcept
Definition thread.hpp:172
bool is_core() const noexcept
Definition thread.hpp:169
std::thread::id native_thread_id() const noexcept
Identifier of the underlying std::jthread, as returned by std::jthread::get_id().
Definition thread.hpp:217
static const comms::DisplayInfo & display_info()
Definition thread.hpp:249
std::stop_source get_stop_source() noexcept
Definition thread.hpp:195
bool joinable() const noexcept
Definition thread.hpp:206
void unregister_from_manager() noexcept
Definition manager.hpp:627
Definition manager.hpp:47
Central feature-gate header for ThreadMan's optional integrations and tunable defaults.
Re-export <commons/display_info.hpp> and attach non-intrusive comms::HasDisplayInfo<> specializations...
Cached subsystem loggers for the tm.
Cached subsystem metrics for the tm_* Prometheus/OpenMetrics families, mirroring the tm.
std::uint64_t next_thread_id() noexcept
Definition thread.hpp:48
std::uint64_t current_native_id() noexcept
Definition thread.hpp:53
void apply_native_name(const std::string &name) noexcept
Definition thread.hpp:65
std::shared_ptr< spdlog::logger > & thread()
Logger for tm.thread — ManagedThread lifecycle and faults.
Definition log.hpp:33
prom::Counter & threads_failed()
ManagedThread bodies that exited by throwing.
Definition metrics.hpp:167
prom::Gauge & threads_live()
ManagedThread bodies currently running (inc on enter, dec on exit).
Definition metrics.hpp:174
prom::Counter & threads_completed()
ManagedThread bodies that returned normally.
Definition metrics.hpp:159
prom::Counter & threads_created()
ManagedThread bodies that entered (started running).
Definition metrics.hpp:152
Definition exceptions.hpp:22
ThreadState
Lifecycle state of a ManagedThread / pool worker.
Definition stats.hpp:21
@ Completed
Body returned normally; thread joined or about to.
@ Running
User body is actively executing.
@ Starting
Constructed; user body not yet entered.
@ Failed
Body threw an exception; reason captured in ControlBlock.
Plain value snapshots of the live state of the ThreadMan world — threads, pools, tasks,...
Heap-resident publishing block — shared by the thread, the manager's registry (as weak_ptr),...
Definition thread.hpp:83
std::atomic< ThreadState > state
Definition thread.hpp:89
std::mutex mtx
Definition thread.hpp:93
std::chrono::steady_clock::time_point ended_at
Definition thread.hpp:91
std::uint64_t native_id
Definition thread.hpp:86
std::optional< std::uint64_t > pool_id
Definition thread.hpp:87
bool is_core
Definition thread.hpp:88
std::exception_ptr exception
Definition thread.hpp:92
std::chrono::steady_clock::time_point started_at
Definition thread.hpp:90
std::string name
Definition thread.hpp:85
Definition thread.hpp:96
bool is_core
Definition thread.hpp:99
ThreadManager * manager
Definition thread.hpp:100
std::string name
Definition thread.hpp:97
std::optional< std::uint64_t > pool_id
Definition thread.hpp:98
A point-in-time snapshot of a ManagedThread's control block.
Definition stats.hpp:48
bool failed
True iff state == Failed.
Definition stats.hpp:55
std::uint64_t native_id
OS-level thread id, when known.
Definition stats.hpp:51
std::string name
User-chosen name.
Definition stats.hpp:50
std::optional< std::uint64_t > pool_id
Owning pool, when this is a pool worker.
Definition stats.hpp:52
std::uint64_t id
Monotonic per-process id.
Definition stats.hpp:49
std::chrono::nanoseconds run_duration
Time the body has been running (or ran).
Definition stats.hpp:54
bool is_core
Pool worker: not subject to idle retirement.
Definition stats.hpp:56
ThreadState state
Definition stats.hpp:53