conduit 0.6.0
Modern C++23 header-only event-dispatching / event-transport library
Loading...
Searching...
No Matches
transport.hpp
Go to the documentation of this file.
1#pragma once
2
5
6#include <conduit/bus.hpp>
8#include <conduit/flags.hpp>
10
11#include <threadman/manager.hpp>
12
13#include <condition_variable>
14#include <cstddef>
15#include <cstdint>
16#include <memory>
17#include <mutex>
18#include <string>
19#include <utility>
20
21namespace conduit::local {
22
/// Selects how a local transport runs delivery for a dispatched envelope.
enum class Execution : std::uint8_t {
    Direct,      ///< No worker pool is created for this mode.
    Queue,       ///< Pool pinned to exactly one worker (min = max = 1).
    ThreadPool,  ///< Pool built from the caller-supplied configuration.
};
24
35using ThreadPoolConfig = threadman::ThreadPoolOptions;
36
38public:
40 : mode_(mode) {
41 if (mode_ == Execution::Direct) {
42 return;
43 }
44 // Give the pool a conduit-branded name (so threadman's tm_pool_* series
45 // are attributable) unless the caller set one of their own. threadman's
46 // default name isn't empty, so treat it as "unset" too.
47 const bool name_is_default =
48 cfg.name.empty() || cfg.name == threadman::ThreadPoolOptions{}.name;
49 if (mode_ == Execution::Queue) {
50 // A queue is a single serialized worker, whatever counts were given.
51 cfg.min_workers = 1;
52 cfg.max_workers = 1;
53 if (name_is_default) {
54 cfg.name = "conduit:local-queue";
55 }
56 } else if (name_is_default) { // ThreadPool
57 cfg.name = "conduit:local";
58 }
59 // Capture the requested bound for our own blocking back-pressure, then
60 // hand threadman an unbounded queue so it never *rejects* (drops) an
61 // envelope on overflow — dispatch() blocks the producer instead.
62 capacity_ = cfg.max_queue_size;
63 cfg.max_queue_size = 0;
64 pool_ = std::make_unique<threadman::ThreadPool>(std::move(cfg));
65 }
66
68 return Transport{Execution::ThreadPool, std::move(cfg)};
69 }
70 static Transport queued(const std::size_t queue_capacity = 0) {
72 cfg.max_queue_size = queue_capacity;
73 return Transport{Execution::Queue, std::move(cfg)};
74 }
75
76 [[nodiscard]] TransportScope scope() const noexcept override {
78 }
79
80 void detach() noexcept override {
81 flush();
82 }
83
84 void dispatch(const EventEnvelopeView& v) override {
85 if (const bool force_direct = v.flags().contains<flags::Direct>();
86 mode_ == Execution::Direct || force_direct || !pool_) {
88 return;
89 }
90 EventEnvelopeView snap = v; // shared core/payload
91 {
92 std::unique_lock lock(drain_mu_);
93 // Bounded-queue back-pressure: block the producer while there are
94 // already `capacity_` units in flight. threadman's own
95 // `max_queue_size` *rejects* on overflow (throws) rather than
96 // blocking, which would drop events — so the pool queue stays
97 // unbounded and this semaphore preserves the blocking semantics
98 // without ever losing an envelope.
99 if (capacity_ != 0) {
100 space_cv_.wait(lock, [this] { return outstanding_ < capacity_; });
101 }
102 ++outstanding_;
103 }
104 try {
105 pool_->execute([this, snap]() mutable {
106 try {
107 deliver_inbound(snap);
108 } catch (...) {
109 // deliver_inbound routes listener exceptions through the bus
110 // middleware; guard here so nothing escapes the pool worker.
111 }
112 complete_one();
113 });
114 } catch (...) {
115 // Submission failed (e.g. pool shutting down) — undo the reservation
116 // so flush() can still reach zero.
117 complete_one();
118 }
119 }
120
121 void flush() override {
122 if (!pool_) {
123 return;
124 }
125 std::unique_lock lock(drain_mu_);
126 drained_cv_.wait(lock, [this] { return outstanding_ == 0; });
127 }
128
129 [[nodiscard]] Execution mode() const noexcept {
130 return mode_;
131 }
132
133private:
134 void complete_one() {
135 std::scoped_lock lock(drain_mu_);
136 if (--outstanding_ == 0) {
137 drained_cv_.notify_all();
138 }
139 space_cv_.notify_one();
140 }
141
142 Execution mode_;
143 std::unique_ptr<threadman::ThreadPool> pool_;
144 std::size_t capacity_ = 0; // blocking back-pressure bound (0 = unbounded)
145
146 std::mutex drain_mu_;
147 std::condition_variable drained_cv_; // fires when outstanding_ hits 0
148 std::condition_variable space_cv_; // fires when a bounded-queue slot frees
149 std::size_t outstanding_ = 0;
150};
151
152} // namespace conduit::local
Bus — owns transports, middleware, listeners; dispatches envelopes.
Polymorphic envelope cell.
Definition envelope.hpp:62
const flags::FlagSet & flags() const noexcept
Definition envelope.hpp:103
Definition transport.hpp:30
void deliver_inbound(const EventEnvelopeView &v) const
Subclasses call this for inbound delivery instead of touching the bus directly.
Definition transport.hpp:62
Definition transport.hpp:37
void dispatch(const EventEnvelopeView &v) override
Definition transport.hpp:84
TransportScope scope() const noexcept override
Definition transport.hpp:76
void detach() noexcept override
Definition transport.hpp:80
Execution mode() const noexcept
Definition transport.hpp:129
static Transport queued(const std::size_t queue_capacity=0)
Definition transport.hpp:70
Transport(const Execution mode=Execution::Direct, ThreadPoolConfig cfg={})
Definition transport.hpp:39
void flush() override
Definition transport.hpp:121
static Transport thread_pool(ThreadPoolConfig cfg={})
Definition transport.hpp:67
EventEnvelope — a parcel cell carrying conduit's envelope metadata plus a polymorphic payload cell.
Conduit flag tags, built atop comms::Flag / comms::FlagSet.
Transport interface and the local/remote scope enum used for flag-based filtering.
Definition transport.hpp:21
threadman::ThreadPoolOptions ThreadPoolConfig
Configuration for the Queue / ThreadPool execution modes.
Definition transport.hpp:35
Execution
Definition transport.hpp:23
TransportScope
Distinguishes in-process transports from off-machine ones.
Definition transport.hpp:21
Force same-thread dispatch even in Queue / ThreadPool execution modes.
Definition flags.hpp:41