|
threadman 0.1.0
Header-only C++23 managed threads, dynamic pools, futures, and executors
|
ThreadMan is a header-only C++23 library for running work on background threads and getting results back. It gives you a dynamically-scaling thread pool, composable futures (Future<T> / SharedFuture<T> with .then and .on_error), a managed-thread RAII wrapper, and a central manager that watches every thread and pool in your process. It also bridges std::future into its own future type, exposes plain-value snapshots for diagnostics, and emits structured logs and Prometheus-style metrics — both of which stay silent until your application turns them on.
Everything lives in namespace threadman.
You reach for ThreadMan when raw std::thread / std::async stops being enough and you want a small, inspectable layer on top.
submit(...).then(...).then(...) instead of manually chaining std::promise/std::future.std::futures (from std::async, a database driver, an HTTP client) and want to fold their blocking waits into one elastic pool instead of burning a dedicated thread each.join() on a still-running pool throws instead of hanging.commons, logman, prom); they are always linked.The smallest useful program: make a pool, submit one task, read the result.
Why this works:
ThreadPool pool; starts immediately with min_workers (default 1) core worker threads. Construction never blocks.submit takes any zero-argument callable, runs it on a worker, and hands you a Future<R> where R is the callable's return type (here int).future.get() blocks the calling thread until the task finishes, then returns the value. If the task had thrown, get() would rethrow that exception here.Chaining continuations and recovering from errors, without touching threads directly:
ThreadMan is header-only and ships as a CMake INTERFACE library exporting the target threadman::threadman. Its three required dependencies (commons, logman, prom, plus prom's own dimval and spdlog) are fetched automatically when you build it through CMake.
A minimal complete consumer project:
If you vendor the source (e.g. a git submodule):
When ThreadMan and all its dependencies come from system packages (not FetchContent), it installs a CMake package config:
Note:
THREADMAN_INSTALLis automatically disabled if any dependency was pulled in via FetchContent, because a FetchContent-built dependency cannot be re-exported. To produce installable artifacts, provide the dependencies as installed packages.
Both are off by default and enabled via CMake options:
They also self-enable by header detection: if <nlohmann/json.hpp> (or <parcel/parcel.h>) is already on the include path, the corresponding hooks turn on automatically. See Optional: JSON and Optional: parcel.
CMAKE_CXX_STANDARD 23 with extensions off.Threads::Threads / pthread).commons (≥ 0.1.4, for DisplayInfo), logman (≥ 0.1.0, spdlog-backed logging), and prom (≥ 0.1.0, metrics; pulls in dimval). spdlog 1.17.0 is fetched transitively.nlohmann/json (3.12.0) and cpp-parcel (0.2.0).pthread on Linux and macOS; other platforms fall back to a hashed std::thread::id and skip naming. The library otherwise has no OS-specific behavior.A pool of worker threads with a FIFO task queue. It starts with min_workers core threads that never retire, and scales up to max_workers non-core threads when the queue backs up; non-core threads retire after idle_timeout. Scaling is driven by the ThreadManager housekeeper (see below), so a pool only grows while a manager is ticking it.
ThreadPool is non-copyable and non-movable (workers hold a pointer back to it). Construct it where it will live, or keep it on the heap behind a std::unique_ptr. It implements IExecutor, so it can be passed anywhere an executor is expected.
Default options: min_workers = 1, max_workers = hardware_concurrency(), unbounded queue. max_workers == 0 is coerced to 1, and min_workers is clamped down to max_workers.
Future<T>** is move-only and one-shot: exactly one get() (which consumes it) and at most one continuation (then or on_error).SharedFuture<T>** is copyable and multi-shot: many get()s (returning const T&) and many continuations. Get one with Future::share().Promise<T>** is the producer side. You rarely create one directly — ThreadPool::submit and FutureWaitPool::add make them for you — but it's there when you need to bridge a callback-based API.Lifetime: the result lives in a heap-allocated shared state owned jointly by the promise and its future(s). A Future/SharedFuture keeps that state alive on its own, so it stays valid even after the producing pool is gone.
An executor is "anything that can run a `std::function<void()>`". ThreadMan offers a virtual interface and a concept-based duck-typed version:
IExecutor — abstract base with execute(...) and name().Executor — a concept matching any type with execute(std::function<void()>).InlineExecutor — runs the task synchronously on the calling thread. Singleton: threadman::InlineExecutor::instance().SingleThreadExecutor — owns a one-worker pool; tasks run FIFO on a single thread.ExecutorRef<E> — wraps a concept-conforming executor behind IExecutor.Future::then(exec, fn) takes the executor as its first argument and dispatches the continuation through it. ThreadPool, FutureWaitPool, SingleThreadExecutor, and InlineExecutor are all valid executors.
An RAII wrapper around std::jthread. It publishes lifecycle state (Starting → Running → Completed/Failed) through a shared ControlBlock, applies a best-effort OS thread name, captures any escaped exception, and registers itself with a ThreadManager so it shows up in snapshots.
The body may take an optional leading std::stop_token; the constructor selects the right overload automatically. ManagedThread is move-only.
A registry of every live ManagedThread and ThreadPool, with a dedicated housekeeper thread that, on each tick:
scale_tick(now) on every registered pool (this is what makes pools scale),stuck_task_threshold,ManagerSummary to subscribed listeners.There is a process-wide singleton (ThreadManager::instance()) used by default. You can also construct a private ThreadManager for tests or isolation and point pools/threads at it via the manager option.
The housekeeper starts lazily when the first pool registers (or eagerly if Options::start_housekeeper_eagerly is set). A pool with no manager ticking it keeps only its core workers and never scales.
Every major type produces a plain, copyable value snapshot — no locks, no references into live state — so you can pass them across threads, log them, or serialize them: ThreadSnapshot, ThreadPoolStats, TaskSnapshot, FutureSnapshot, ManagerSummary, and StuckTaskEvent.
Submission flavours:
| Call | Returns | Use when |
|---|---|---|
pool.execute(fn) | void | Fire-and-forget; fn takes no args, result discarded. |
pool.submit(fn) | Future<R> | You want the result or to chain continuations. |
pool.submit_named("n", fn) | Future<R> | Same, but tags the task name for snapshots. |
pool.submit_stoppable(fn) | Future<R> | fn receives a std::stop_token to observe cancellation. |
execute silently ignores an empty std::function. submit* accept any callable; the return type is deduced from the callable.
Each .then(exec, fn) returns a new Future<U> where U is fn's return type. The continuation receives the previous stage's value. If any stage throws (or fn's argument stage failed), the exception propagates straight through — later then stages are skipped and the final get() rethrows.
void stages work too: a Future<void> continuation takes no argument, and a continuation returning void produces a Future<void>.
on_error(exec, fn) returns a Future<T>. If the source succeeded, the value passes through unchanged and fn is not called. If it failed, fn receives the std::exception_ptr and its return value becomes the result. Note that then and on_error each count as the one continuation a Future allows — you can't attach both to the same one-shot future (use share()).
Future::share() converts a one-shot future into a copyable SharedFuture that supports many get()s and many continuations off the same result. share() consumes the original Future.
submit_stoppable hands your callable a std::stop_token. A running task is responsible for checking it — ThreadMan can't preempt CPU-bound work, it can only request a stop. Tasks still sitting in the queue when shutdown_now() fires are cancelled; their futures rethrow TaskCancelledError.
A std::future has no completion callback, so the only way to know it's ready is to block a thread on get(). A fixed-size pool is a bad host for that — a few slow futures can occupy every worker. FutureWaitPool is a ThreadPool tuned for blocking waits: a worker parked in get() counts as busy (never idle), so saturation reliably triggers scale-up, and it adds capacity eagerly (scale_up_when_queue_exceeds == 0 by default).
add accepts std::future<T> (including move-only T) and std::shared_future<T>. add_blocking(fn) runs an arbitrary blocking callable on the same elastic pool without first wrapping it in a std::future. The returned Future<T> is fully first-class.
ThreadMan's
Future/SharedFuturedo not implicitly convert to or fromstd::future.FutureWaitPool::addis the supported bridge.
subscribe_summary / subscribe_stuck_tasks return a move-only SubscriptionToken. The listener stays active until the token is destroyed — keep it alive for as long as you want callbacks. Listeners run on the housekeeper thread; an exception thrown from one is caught and logged, never propagated.
Snapshots are point-in-time copies; reading them never blocks workers for longer than a short critical section.
A ThreadPool moves through Running → ShuttingDown/ShutdownNow → Terminated.
shutdown()** — stop accepting new work, drain everything already queued, then let workers exit. Graceful.shutdown_now()** — stop accepting new work, cancel queued tasks (their futures throw TaskCancelledError), and request stop on running tasks.join()** — block until all workers have exited. You must call shutdown() or shutdown_now() first.Running, calls shutdown() (the draining variant) and joins. So the common case "just let the pool go out of scope" drains queued work.Submitting to a pool that is no longer Running throws PoolShuttingDownError.
ThreadMan reports problems through a typed exception hierarchy rooted at threadman::Exception (itself a std::runtime_error). Catch the base to handle anything from the library, or narrow down:
| Exception | Thrown when |
|---|---|
FutureError | Operating on a future/promise with no state (default-constructed or moved-from). Base of the future errors below. |
BrokenPromiseError | A Promise is destroyed before being satisfied; its future's get() rethrows this. |
FutureAlreadyRetrievedError | Promise::get_future() called a second time. |
PromiseAlreadySatisfiedError | set_value/set_exception called twice on one promise. |
ContinuationAlreadyRegisteredError | A second .then/.on_error on a one-shot Future. Use share(). |
PoolShuttingDownError | submit/execute on a pool past shutdown(). |
PoolQueueFullError | A bounded pool's queue is at max_queue_size. |
TaskCancelledError | A queued task was cancelled (e.g. by shutdown_now()). |
HousekeeperError | Reserved for internal housekeeper bookkeeping failures. |
Task exceptions propagate through the future. A callable that throws does not crash the worker — the exception is captured and rethrown at get() (or routed to on_error):
Backpressure on a bounded pool:
get() consumes a Future.** A second get() (or any use after get()) throws FutureError, because the first get() moved the shared state out. Use share() if you need to read a result more than once.Future. then and on_error together allow a single continuation on a one-shot future; a second attempt throws ContinuationAlreadyRegisteredError. SharedFuture has no such limit..then/.on_error always dispatch through the supplied executor — even when the source future is already ready, dispatch is deferred to the executor rather than running on the calling thread. The one exception is InlineExecutor, which runs the continuation synchronously on whatever thread satisfies the future.then(exec, fn) captures exec by reference. If exec is destroyed before the source future is satisfied, dispatch is undefined. Pass a long-lived pool, not a temporary.scale_tick. The default singleton starts its housekeeper on first pool registration; a private ThreadManager starts its housekeeper lazily on first pool registration too, or eagerly if configured. Without a ticking manager, the pool keeps just its core workers.join() before shutdown() throws.** This is deliberate — it surfaces a logic error (std::logic_error) instead of deadlocking.shutdown_now() cannot preempt CPU work.** It cancels queued tasks and requests stop on running ones; a running task only stops if it checks its std::stop_token. Use submit_stoppable for cancellable work.Promise breaks its Future. If you create a Promise manually and let it die without set_value/set_exception, the future's get() throws BrokenPromiseError.SharedFuture::get() returns const T&.** The value is owned by the shared state; the reference is valid as long as a SharedFuture keeps that state alive. Don't hold the reference past the last SharedFuture copy.PoolQueueFullError.FutureWaitPool::add(std::future<T>) supports move-only T. std::shared_future<T> requires copyable T (the value is copied out).ThreadMan logs through logman channels, all prefixed tm.:
| Channel | Covers |
|---|---|
tm.pool | pool scaling, shutdown, queue backpressure |
tm.thread | ManagedThread lifecycle and faults |
tm.future | future satisfaction, continuation dispatch |
tm.manager | housekeeper lifecycle, listener faults |
tm.task | stuck-task warnings (only when no listener is subscribed) |
tm.executor | executor wiring |
Initialization is your responsibility — ThreadMan only ever calls logman::get(...), which lazily creates a channel using whatever defaults logman has when first touched. Set it up once at startup:
If you never initialize logman, logging falls back to logman's own defaults.
ThreadMan is instrumented with prom Prometheus/OpenMetrics families under the scope "threadman" (prefix tm_), mirroring the tm.* log channels: tm_pool_*, tm_threads_*, tm_manager_*, tm_tasks_stuck_*, tm_futures_*, and tm_future_continuations_*.
Metrics are always-on but inert by default — exactly like logging with no sinks. Until your application installs a prom backend adapter, every metric operation is a noexcept no-op through prom's NullAdapter. The host owns backend selection:
The only dynamic labels are pool (the operator-chosen pool name) and pool_id (numeric) — never task ids, thread ids, or exception text, which would be unbounded cardinality. examples/threadman_metrics.cpp runs a workload and lists the registered tm_* families.
Every public type exposes static const comms::DisplayInfo& display_info() (a name, description, and icon, from commons): ManagedThread, TaskHandle, Future, SharedFuture, Promise, ThreadPool, FutureWaitPool, SingleThreadExecutor, InlineExecutor, and ThreadManager. The enums (ThreadState, TaskState, PoolState) are made displayable non-intrusively in <threadman/display.hpp>. This is purely for tooling/diagnostics and has no runtime cost if unused.
Enable with -DTHREADMAN_WITH_NLOHMANN_JSON=ON (or by placing <nlohmann/json.hpp> on the include path). Every snapshot type and the three enums round-trip:
Enable with -DTHREADMAN_WITH_PARCEL=ON (which forces JSON on, since the parcel cells reuse the JSON hooks). Five cells are provided — ThreadSnapshotCell, ThreadPoolStatsCell, TaskSnapshotCell, FutureSnapshotCell, ManagerSummaryCell — with wire kinds tm:thread_snapshot, tm:thread_pool_stats, and so on. Register them once per registry:
Defaults live in <threadman/config.hpp> and can be redefined by the consumer before including the umbrella header. Per-pool ThreadPoolOptions always take precedence over these compile-time defaults.
| Macro | Default | Meaning |
|---|---|---|
THREADMAN_RECENT_TASKS_CAPACITY | 256 | Ring-buffer size for snapshot_recent_tasks() |
THREADMAN_DEFAULT_POOL_NAME | "tm::default" | Name of ThreadManager::default_pool() |
THREADMAN_DEFAULT_SCALE_UP_WAIT_MS | 50 | Oldest-queued wait before forcing scale-up |
THREADMAN_DEFAULT_IDLE_TIMEOUT_MS | 30000 | Non-core worker retirement window |
THREADMAN_DEFAULT_SCALE_UP_QUEUE_THRESHOLD | 4 | Queue depth that triggers scale-up |
THREADMAN_DEFAULT_SCALE_CHECK_INTERVAL_MS | 100 | Housekeeper tick / scale cadence |
THREADMAN_DEFAULT_STUCK_TASK_THRESHOLD_MS | 60000 | Running duration that flags a task as stuck |
THREADMAN_DEFAULT_SUMMARY_INTERVAL_MS | 1000 | Summary publish cadence |
THREADMAN_DEFAULT_FUTURE_WAIT_SCALE_UP_WAIT_MS | 10 | FutureWaitPool scale-up wait window |
| API | Purpose | Notes |
|---|---|---|
ThreadPool | Dynamic-scaling worker pool | Non-copyable, non-movable; IExecutor |
ThreadPoolOptions | Pool configuration | name, workers, scaling, queue bound |
Future<T> | One-shot async result | move-only; one get, one continuation |
SharedFuture<T> | Multi-shot async result | copyable; get() → const T& |
Promise<T> | Producer side of a future | get_future, set_value, set_exception |
FutureWaitPool | Elastic pool for blocking waits | add(std::future), add_blocking |
ManagedThread | RAII std::jthread wrapper | move-only; lifecycle + stop-token |
ThreadManager | Registry + housekeeper | instance() singleton or private |
IExecutor / Executor | Executor interface / concept | ThreadPool implements IExecutor |
InlineExecutor | Runs tasks on the calling thread | singleton instance() |
SingleThreadExecutor | One-worker FIFO executor | owns its pool |
ThreadPoolStats, ThreadSnapshot, TaskSnapshot, FutureSnapshot, ManagerSummary, StuckTaskEvent | Plain-value snapshots | copyable, lock-free |
All live under examples/ and are built when ThreadMan is the top-level project (or with -DTHREADMAN_BUILD_EXAMPLES=ON).
| Example | Demonstrates |
|---|---|
threadman_hello.cpp | Minimal pool + single submit + get |
threadman_pool_submit.cpp | Fan-out of named tasks, gathering results, reading stats |
threadman_then_chain.cpp | .then chaining and .on_error recovery |
threadman_shared_future.cpp | share() fan-out to multiple continuations |
threadman_shutdown_now.cpp | Stoppable task + queued-task cancellation |
threadman_dynamic_scaling.cpp | Watch a pool scale up under load and retire idle workers |
threadman_future_wait_pool.cpp | Adopt std::futures into the elastic wait pool |
threadman_listener_summary.cpp | Subscribe to periodic summaries and stuck-task events |
threadman_log_setup.cpp | Initialize logman and raise tm.* log levels |
threadman_metrics.cpp | Run a workload and list the registered tm_* metric families |
ThreadMan optimizes for clarity and introspection over raw scheduler throughput.
std::deque per pool, with a condition variable to wake workers. This is simple and predictable, not lock-free or work-stealing.get() blocks** the calling thread on a condition variable until the result is ready.shared_ptr), so a long .then chain trades a few heap allocations for composability.scale_check_interval, default 100 ms), so growth is not instantaneous — a burst shorter than a tick may never trigger scale-up.Do I need to link a library, or is it header-only? Header-only, but it's not zero-config: link the CMake target threadman::threadman, which transitively brings in commons, logman, and prom.
What happens if a task throws? The exception is captured and rethrown when you call Future::get(), or routed to .on_error if you attached one. The worker thread keeps running.
Can I use a pool from multiple threads? Yes. submit/execute/stats/snapshots are safe to call concurrently. A single Future<T> is not meant to be shared across threads (it's move-only and one-shot); use SharedFuture<T> if multiple threads need the result.
Does a future own its result, or borrow it? It owns it. The shared state outlives the producing pool, so a Future stays valid even after the pool is destroyed.
Why isn't my pool growing past min_workers? Scaling runs on the ThreadManager housekeeper. Make sure a manager is ticking the pool (the default singleton does this automatically once the pool registers), and that the load actually exceeds scale_up_when_queue_exceeds or that the oldest task waits longer than scale_up_wait.
Why does join() throw? Because you called it before shutdown()/shutdown_now(). Shut the pool down first; the library refuses to silently deadlock.
How do I see what's running? Call pool.stats(), pool.snapshot_workers(), pool.snapshot_recent_tasks(), or subscribe to ThreadManager::subscribe_summary.
Which compilers work? Any C++23 toolchain with std::jthread. Consult the CI workflow for the exact tested versions.
Contributions to the library are welcome! If you encounter any issues or have suggestions for improvements, please feel free to submit a pull request or open an issue on the project's repository.
This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.