32#include <commons/display_info.hpp>
36#include <condition_variable>
63class FutureState :
public std::enable_shared_from_this<FutureState<T>> {
69 return ready_.load(std::memory_order_acquire);
72 return exception_set_.load(std::memory_order_acquire);
75 std::scoped_lock guard(mtx_);
76 return static_cast<std::uint32_t
>(continuations_.size());
88 template <
class U = T>
89 requires(!std::is_void_v<U>)
91 std::vector<Continuation> to_run;
93 std::unique_lock lock(mtx_);
94 if (ready_.load(std::memory_order_relaxed)) {
97 slot_.value.emplace(std::forward<U>(v));
98 ready_.store(
true, std::memory_order_release);
99 to_run.swap(continuations_);
103 for (
auto& c : to_run) {
108 template <
class U = T>
109 requires std::is_void_v<U>
111 std::vector<Continuation> to_run;
113 std::unique_lock lock(mtx_);
114 if (ready_.load(std::memory_order_relaxed)) {
117 ready_.store(
true, std::memory_order_release);
118 to_run.swap(continuations_);
122 for (
auto& c : to_run) {
128 std::vector<Continuation> to_run;
130 std::unique_lock lock(mtx_);
131 if (ready_.load(std::memory_order_relaxed)) {
135 exception_set_.store(
true, std::memory_order_release);
136 ready_.store(
true, std::memory_order_release);
137 to_run.swap(continuations_);
141 for (
auto& c : to_run) {
147 std::unique_lock lock(mtx_);
148 cv_.wait(lock, [
this] {
return ready_.load(std::memory_order_acquire); });
151 template <
class Rep,
class Period>
152 bool wait_for(
const std::chrono::duration<Rep, Period>& dur)
const {
153 std::unique_lock lock(mtx_);
154 return cv_.wait_for(lock, dur, [
this] {
return ready_.load(std::memory_order_acquire); });
157 template <
class Clock,
class Duration>
158 bool wait_until(
const std::chrono::time_point<Clock, Duration>& tp)
const {
159 std::unique_lock lock(mtx_);
160 return cv_.wait_until(lock, tp, [
this] {
return ready_.load(std::memory_order_acquire); });
165 template <
class U = T>
166 requires(!std::is_void_v<U>)
170 std::rethrow_exception(exception_);
172 return std::move(*slot_.value);
175 template <
class U = T>
176 requires(!std::is_void_v<U>)
180 std::rethrow_exception(exception_);
185 template <
class U = T>
186 requires std::is_void_v<U>
190 std::rethrow_exception(exception_);
200 std::unique_lock lock(mtx_);
201 if (ready_.load(std::memory_order_relaxed)) {
204 continuations_.push_back(std::move(cb));
213 mutable std::mutex mtx_;
214 mutable std::condition_variable cv_;
215 std::atomic<bool> ready_{
false};
216 std::atomic<bool> exception_set_{
false};
218 std::exception_ptr exception_;
219 std::vector<Continuation> continuations_;
// Forward declaration of the dispatch helper. The continuation code later in
// this listing (the then()/on_error() implementations) calls it with the
// target executor and the closure to run. The cross-reference index at the
// end of the listing places its definition in executor.hpp.
226void execute_on(IExecutor& exec, std::function<
void()> task);
250 [[nodiscard]]
bool valid() const noexcept {
251 return static_cast<bool>(state_);
254 return state_ && state_->ready();
262 template <
class Rep,
class Period>
263 [[nodiscard]]
bool wait_for(
const std::chrono::duration<Rep, Period>& dur)
const {
265 return state_->wait_for(dur);
268 template <
class Clock,
class Duration>
269 [[nodiscard]]
bool wait_until(
const std::chrono::time_point<Clock, Duration>& tp)
const {
271 return state_->wait_until(tp);
275 template <
class U = T>
276 requires(!std::is_void_v<U>)
279 auto s = std::move(state_);
280 return s->get_value_move();
283 template <
class U = T>
284 requires std::is_void_v<U>
287 auto s = std::move(state_);
// Declaration only. The cross-reference index at the end of this listing
// describes this overload as "Register a continuation; consumes this future."
// and locates the out-of-class definition at future.hpp:568.
296 template <
class Exec,
class Fn>
297 auto then(Exec& exec, Fn&& fn);
302 template <
class Exec,
class Fn>
310 static const comms::DisplayInfo info{
312 .description =
"One-shot, move-only async result with single .get and single .then.",
313 .icon = comms::Icon::from(
"mdi:clock-fast"),
328 void check_valid()
const {
330 throw FutureError(
"future has no state (default-constructed or moved-from)");
334 std::shared_ptr<detail::FutureState<T>> state_;
335 bool has_continuation_{
false};
353 [[nodiscard]]
bool valid() const noexcept {
354 return static_cast<bool>(state_);
357 return state_ && state_->ready();
365 template <
class Rep,
class Period>
366 [[nodiscard]]
bool wait_for(
const std::chrono::duration<Rep, Period>& dur)
const {
368 return state_->wait_for(dur);
374 template <
class U = T>
375 requires(!std::is_void_v<U>)
378 return state_->get_value_ref();
381 template <
class U = T>
382 requires std::is_void_v<U>
385 state_->get_value_void();
// Const-qualified then() declaration. The cross-reference index at the end of
// this listing locates the matching out-of-class definition at
// future.hpp:672.
388 template <
class Exec,
class Fn>
389 auto then(Exec& exec, Fn&& fn)
const;
391 template <
class Exec,
class Fn>
399 static const comms::DisplayInfo info{
400 .name =
"SharedFuture",
401 .description =
"Multi-shot, copyable async result supporting many .get and .then.",
402 .icon = comms::Icon::from(
"mdi:clock-multiple"),
417 void check_valid()
const {
419 throw FutureError(
"shared_future has no state (default-constructed)");
423 std::shared_ptr<detail::FutureState<T>> state_;
440 Promise() : state_(std::make_shared<detail::FutureState<T>>()) {
449 if (state_ && !state_->ready()) {
451 state_->set_exception(std::make_exception_ptr(
459 [[nodiscard]]
bool valid() const noexcept {
460 return static_cast<bool>(state_);
469 if (future_retrieved_) {
472 future_retrieved_ =
true;
476 template <
class U = T>
477 requires(!std::is_void_v<U>)
482 state_->set_value(std::forward<U>(v));
485 template <
class U = T>
486 requires std::is_void_v<U>
498 state_->set_exception(ex);
502 static const comms::DisplayInfo info{
504 .description =
"Producer side of a Future/SharedFuture pair.",
505 .icon = comms::Icon::from(
"mdi:handshake"),
511 std::shared_ptr<detail::FutureState<T>> state_;
512 bool future_retrieved_ =
false;
520template <
class Fn,
class Arg>
521struct invoke_result_helper {
522 using type = std::invoke_result_t<Fn, Arg>;
526struct invoke_result_helper<Fn, void> {
527 using type = std::invoke_result_t<Fn>;
// Shorthand for invoke_result_helper<Fn, Arg>::type. Per the helper shown
// immediately before this alias, that is std::invoke_result_t<Fn, Arg> in the
// general case and std::invoke_result_t<Fn> when Arg is void (the callable is
// then invoked with no argument).
530template <
class Fn,
class Arg>
531using future_invoke_result_t =
typename invoke_result_helper<Fn, Arg>::type;
533template <
class T,
class Fn,
class Sink>
534inline void run_continuation(
const std::shared_ptr<FutureState<T>>& src,
536 const std::shared_ptr<Sink>& sink_promise) {
538 using SinkT =
typename Sink::value_type;
540 if (src->has_exception()) {
541 sink_promise->set_exception(src->exception());
544 if constexpr (std::is_void_v<T>) {
545 if constexpr (std::is_void_v<SinkT>) {
547 sink_promise->set_value();
549 sink_promise->set_value(fn());
552 if constexpr (std::is_void_v<SinkT>) {
553 fn(src->get_value_ref());
554 sink_promise->set_value();
556 sink_promise->set_value(fn(src->get_value_ref()));
560 sink_promise->set_exception(std::current_exception());
567template <
class Exec,
class Fn>
570 if (has_continuation_) {
572 "Future::then already registered; use share() for multi-shot");
574 has_continuation_ =
true;
576 using U =
typename detail::invoke_result_helper<std::decay_t<Fn>, T>::type;
578 auto sink_promise = std::make_shared<Promise<U>>();
579 auto sink_future = sink_promise->get_future();
585 auto fn_shared = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(fn));
587 auto schedule = [&exec, src, fn_shared, sink_promise]() {
589 detail::run_continuation<T>(src, *fn_shared, sink_promise);
593 auto result = src->add_continuation([schedule = std::move(schedule)]()
mutable { schedule(); });
596 detail::run_continuation<T>(src, *fn_shared, sink_promise);
603template <
class Exec,
class Fn>
606 if (has_continuation_) {
608 "Future::on_error already registered (a Future supports a single continuation)");
610 has_continuation_ =
true;
612 auto sink_promise = std::make_shared<Promise<T>>();
613 auto sink_future = sink_promise->get_future();
616 auto fn_shared = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(fn));
618 auto run = [src, fn_shared, sink_promise]() {
621 if (src->has_exception()) {
622 if constexpr (std::is_void_v<T>) {
623 (*fn_shared)(src->exception());
624 sink_promise->set_value();
626 auto recovered = (*fn_shared)(src->exception());
627 sink_promise->set_value(std::move(recovered));
630 if constexpr (std::is_void_v<T>) {
631 sink_promise->set_value();
633 sink_promise->set_value(T{src->get_value_ref()});
637 sink_promise->set_exception(std::current_exception());
641 auto schedule = [&exec, run = std::move(run)]()
mutable {
detail::execute_on(exec, run); };
643 auto result = src->add_continuation([schedule = std::move(schedule)]()
mutable { schedule(); });
647 if (src->has_exception()) {
648 if constexpr (std::is_void_v<T>) {
649 (*fn_shared)(src->exception());
650 sink_promise->set_value();
652 auto recovered = (*fn_shared)(src->exception());
653 sink_promise->set_value(std::move(recovered));
656 if constexpr (std::is_void_v<T>) {
657 sink_promise->set_value();
659 sink_promise->set_value(T{src->get_value_ref()});
663 sink_promise->set_exception(std::current_exception());
671template <
class Exec,
class Fn>
674 metrics::future_continuations_registered().inc();
675 using U =
typename detail::invoke_result_helper<std::decay_t<Fn>, T>::type;
677 auto sink_promise = std::make_shared<Promise<U>>();
678 auto sink_future = sink_promise->get_future();
681 auto fn_shared = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(fn));
683 auto schedule = [&exec, src, fn_shared, sink_promise]() {
684 detail::execute_on(exec, [src, fn_shared, sink_promise]() {
685 detail::run_continuation<T>(src, *fn_shared, sink_promise);
689 auto result = src->add_continuation([schedule = std::move(schedule)]()
mutable { schedule(); });
691 detail::execute_on(exec, [src, fn_shared, sink_promise]() {
692 detail::run_continuation<T>(src, *fn_shared, sink_promise);
699template <
class Exec,
class Fn>
702 metrics::future_continuations_registered().inc();
703 auto sink_promise = std::make_shared<Promise<T>>();
704 auto sink_future = sink_promise->get_future().share();
707 auto fn_shared = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(fn));
709 auto run = [src, fn_shared, sink_promise]() {
710 metrics::future_continuations_dispatched().inc();
712 if (src->has_exception()) {
713 if constexpr (std::is_void_v<T>) {
714 (*fn_shared)(src->exception());
715 sink_promise->set_value();
717 auto recovered = (*fn_shared)(src->exception());
718 sink_promise->set_value(std::move(recovered));
721 if constexpr (std::is_void_v<T>) {
722 sink_promise->set_value();
724 sink_promise->set_value(T{src->get_value_ref()});
728 sink_promise->set_exception(std::current_exception());
732 auto schedule = [&exec, run = std::move(run)]()
mutable { detail::execute_on(exec, run); };
734 auto result = src->add_continuation([schedule = std::move(schedule)]()
mutable { schedule(); });
736 detail::execute_on(exec, run);
A Promise was dropped before it satisfied its Future.
Definition exceptions.hpp:39
A second .then() was registered on a one-shot threadman::Future.
Definition exceptions.hpp:59
Promise::get_future() was called more than once on the same promise.
Definition exceptions.hpp:45
Generic future/promise error.
Definition exceptions.hpp:32
Definition future.hpp:239
Future(Future &&) noexcept=default
auto then(Exec &exec, Fn &&fn)
Register a continuation; consumes this future.
Definition future.hpp:568
Future< T > on_error(Exec &exec, Fn &&fn)
Recover from a captured exception by running fn(exception_ptr).
Definition future.hpp:604
FutureSnapshot snapshot() const
Definition future.hpp:305
void wait() const
Definition future.hpp:257
Future(const Future &)=delete
bool wait_for(const std::chrono::duration< Rep, Period > &dur) const
Definition future.hpp:263
friend class Future
Definition future.hpp:324
Future & operator=(const Future &)=delete
bool is_ready() const noexcept
Definition future.hpp:253
bool wait_until(const std::chrono::time_point< Clock, Duration > &tp) const
Definition future.hpp:269
SharedFuture< T > share()
Convert to a multi-shot SharedFuture. Future is invalidated.
Definition future.hpp:427
bool valid() const noexcept
Definition future.hpp:250
static const comms::DisplayInfo & display_info()
Definition future.hpp:309
void get()
Definition future.hpp:285
U get()
Block, then consume — second get() throws.
Definition future.hpp:277
T value_type
Definition future.hpp:241
set_value() / set_exception() was called twice on the same promise.
Definition exceptions.hpp:51
Definition future.hpp:436
Promise()
Definition future.hpp:440
T value_type
Definition future.hpp:438
void set_value(U &&v)
Definition future.hpp:478
Future< T > get_future()
Retrieve the future.
Definition future.hpp:465
void set_value()
Definition future.hpp:487
Promise & operator=(const Promise &)=delete
Promise(const Promise &)=delete
void set_exception(const std::exception_ptr &ex)
Definition future.hpp:494
Promise(Promise &&) noexcept=default
static const comms::DisplayInfo & display_info()
Definition future.hpp:501
bool valid() const noexcept
Definition future.hpp:459
Definition future.hpp:342
FutureSnapshot snapshot() const
Definition future.hpp:394
T value_type
Definition future.hpp:344
SharedFuture(const SharedFuture &)=default
bool is_ready() const noexcept
Definition future.hpp:356
void wait() const
Definition future.hpp:360
SharedFuture< T > on_error(Exec &exec, Fn &&fn) const
Definition future.hpp:700
SharedFuture(SharedFuture &&) noexcept=default
bool valid() const noexcept
Definition future.hpp:353
static const comms::DisplayInfo & display_info()
Definition future.hpp:398
const U & get() const
Non-consuming read.
Definition future.hpp:376
auto then(Exec &exec, Fn &&fn) const
Definition future.hpp:672
friend class SharedFuture
Definition future.hpp:411
void get() const
Definition future.hpp:383
bool wait_for(const std::chrono::duration< Rep, Period > &dur) const
Definition future.hpp:366
T value_type
Definition future.hpp:65
std::function< void()> Continuation
Definition future.hpp:66
U get_value_move()
Block until ready, then either return the value (move/copy depending on T) or rethrow the captured exception.
Definition future.hpp:167
void set_value(U &&v)
Definition future.hpp:90
bool has_exception() const noexcept
Definition future.hpp:71
void set_exception(const std::exception_ptr &ex)
Definition future.hpp:127
AddResult add_continuation(Continuation cb)
Definition future.hpp:199
std::exception_ptr exception() const noexcept
Definition future.hpp:208
void wait() const
Definition future.hpp:146
FutureSnapshot snapshot(const bool shared) const
Definition future.hpp:79
void set_value()
Definition future.hpp:110
bool wait_for(const std::chrono::duration< Rep, Period > &dur) const
Definition future.hpp:152
bool ready() const noexcept
Definition future.hpp:68
std::uint32_t continuation_count() const noexcept
Definition future.hpp:74
AddResult
Register a continuation.
Definition future.hpp:197
void get_value_void() const
Definition future.hpp:187
const U & get_value_ref() const
Definition future.hpp:177
bool wait_until(const std::chrono::time_point< Clock, Duration > &tp) const
Definition future.hpp:158
Typed exception hierarchy thrown by ThreadMan.
Cached subsystem loggers for the tm.
Cached subsystem metrics for the tm_* Prometheus/OpenMetrics families, mirroring the tm.
void execute_on(IExecutor &exec, std::function< void()> task)
Dispatch helper used by Future::then / SharedFuture::then.
Definition executor.hpp:48
prom::Counter & future_continuations_registered()
.then/.on_error continuations registered against a future.
Definition metrics.hpp:252
prom::Counter & futures_satisfied_value()
Futures satisfied with a value.
Definition metrics.hpp:238
prom::Counter & future_continuations_dispatched()
Continuations dispatched through an executor.
Definition metrics.hpp:260
prom::Counter & futures_satisfied_exception()
Futures satisfied with an exception.
Definition metrics.hpp:245
prom::Counter & futures_created()
Promise/Future pairs created.
Definition metrics.hpp:231
Definition exceptions.hpp:22
Plain value snapshots of the live state of the ThreadMan world — threads, pools, tasks,...
A snapshot of a future-state object. Useful for diagnostics.
Definition stats.hpp:91
bool shared
Definition stats.hpp:95
std::uint32_t continuation_count
Definition stats.hpp:94
bool has_exception
Definition stats.hpp:93
bool ready
Definition stats.hpp:92
std::optional< T > value
Definition future.hpp:56