conduit 0.6.0
Modern C++23 header-only event-dispatching / event-transport library
Loading...
Searching...
No Matches
bus.hpp
Go to the documentation of this file.
1#pragma once
2
5
6#include <conduit/builder.hpp>
8#include <conduit/event.hpp>
9#include <conduit/flags.hpp>
10#include <conduit/glob.hpp>
11#include <conduit/listener.hpp>
12#include <conduit/metrics.hpp>
15#include <conduit/transport.hpp>
16
17#include <commons/prioritized.hpp>
18
19#include <atomic>
20#include <concepts>
21#include <exception>
22#include <functional>
23#include <memory>
24#include <mutex>
25#include <string>
26#include <string_view>
27#include <type_traits>
28#include <utility>
29#include <vector>
30
31#include <parcel/parcel.h>
32
33namespace conduit {
34
35namespace detail {
36
39 std::function<void(const EventEnvelope&)> handler;
40 std::string name_or_pattern; // empty if typed
41 bool is_pattern = false;
42 int listener_priority = comms::Prioritized::DEFAULT_PRIORITY;
43
45 return listener_priority;
46 }
47
51 bool operator==(const ListenerEntry& other) const noexcept {
52 return id == other.id;
53 }
54};
55
56} // namespace detail
57
58class Bus : public detail::SubscriptionBackref, public std::enable_shared_from_this<Bus> {
59public:
60 Bus() : registry_(std::make_shared<EventRegistry>()) {}
61
62 explicit Bus(std::shared_ptr<EventRegistry> registry)
63 : registry_(registry ? std::move(registry) : std::make_shared<EventRegistry>()) {}
64
65 ~Bus() override {
66 shutdown();
67 // Balance the always-on `conduit_listeners` gauge for any subscriptions
68 // still registered when the bus dies (they must not outlive it).
69 std::scoped_lock lock(mu_);
70 for (const auto& e : listeners_) {
71 metrics::listeners().labels(event_labels(e.name_or_pattern)).dec();
72 }
73 }
74
75 Bus(const Bus&) = delete;
76 Bus& operator=(const Bus&) = delete;
77 Bus(Bus&&) = delete;
78 Bus& operator=(Bus&&) = delete;
79
80 // -- Transports ---------------------------------------------------------
81
82 template <typename Tp, typename... Args>
83 Tp& use_transport(Args&&... args) {
84 auto t = std::make_shared<Tp>(std::forward<Args>(args)...);
85 Tp& ref = *t;
86 attach_transport(std::move(t));
87 return ref;
88 }
89
90 void use_transport(std::shared_ptr<Transport> t) {
91 attach_transport(std::move(t));
92 }
93
94 // -- Middleware ---------------------------------------------------------
95
96 template <typename M, typename... Args>
97 M& use_middleware(Args&&... args) {
98 auto m = std::make_shared<M>(std::forward<Args>(args)...);
99 M& ref = *m;
100 std::scoped_lock lock(mu_);
101 middleware_.insert(std::move(m));
102 return ref;
103 }
104
105 void use_middleware(std::shared_ptr<Middleware> m) {
106 std::scoped_lock lock(mu_);
107 middleware_.insert(std::move(m));
108 }
109
110 // -- Event registry ------------------------------------------------------
111
115 [[nodiscard]] const std::shared_ptr<EventRegistry>& registry() const noexcept {
116 return registry_;
117 }
118
123 template <typename T>
125 static_assert(std::derived_from<T, parcel::ICell>,
126 "Bus::register_event<T>(): T must derive from "
127 "conduit::Event<T, Name>");
128 registry_->add_descriptor(T::descriptor());
129 }
130
131 // -- Listener registration ----------------------------------------------
132
137 template <typename T, typename F>
138 [[nodiscard]] Subscription listen(F&& handler,
139 int priority = comms::Prioritized::DEFAULT_PRIORITY) {
140 static_assert(std::derived_from<T, parcel::ICell>,
141 "Bus::listen<T>(F): T must derive from conduit::Event<T, Name>");
142
143 register_event<T>();
144 const int effective_priority = derive_listener_priority(handler, priority);
145 auto wrapped = wrap_typed_handler<T>(std::forward<F>(handler));
146 return register_listener_for_name(std::string{T::event_name_v},
147 std::move(wrapped),
148 /*is_pattern=*/false,
149 effective_priority);
150 }
151
152 template <typename T>
153 [[nodiscard]] Subscription listen(std::shared_ptr<EventListener<T>> listener) {
154 register_event<T>();
155 const int priority = listener ? listener->priority() : comms::Prioritized::DEFAULT_PRIORITY;
156 auto wrapped = [listener](const EventEnvelope& v) {
157 if (auto typed = v.payload_as<T>(); typed) {
158 listener->on_event(*typed);
159 }
160 };
161 return register_listener_for_name(std::string{T::event_name_v},
162 std::move(wrapped),
163 /*is_pattern=*/false,
164 priority);
165 }
166
168 template <typename F>
169 [[nodiscard]] Subscription listen(const std::string_view pattern,
170 F&& handler,
171 const int priority = comms::Prioritized::DEFAULT_PRIORITY) {
172 const bool is_pattern = (pattern.contains('*'));
173 std::function<void(const EventEnvelope&)> wrapped =
174 [h = std::forward<F>(handler)](const EventEnvelope& v) mutable { h(v); };
175 return register_listener_for_name(
176 std::string{pattern}, std::move(wrapped), is_pattern, priority);
177 }
178
179 // -- Subscribers --------------------------------------------------------
180
182 s.register_to(*this);
183 }
184
185 // -- Publish ------------------------------------------------------------
186
187 void publish(const EventEnvelope& env) {
188 publish_impl(env);
189 }
190
191 template <typename T>
193 register_event<T>();
194 publish_impl(std::move(b).build());
195 }
196
197 template <typename T>
199 register_event<T>();
200 publish_impl(b.build());
201 }
202
203 template <typename T>
204 auto publish(T payload) -> void
205 requires(std::is_base_of_v<parcel::ICell, T>)
206 {
207 register_event<T>();
208 publish_impl(event(std::move(payload)).build());
209 }
210
211 // -- Lifecycle ----------------------------------------------------------
212
213 void drain() const {
214 std::vector<std::shared_ptr<Transport>> tcopy;
215 {
216 std::scoped_lock lock(mu_);
217 tcopy.assign(transports_.begin(), transports_.end());
218 }
219 for (const auto& t : tcopy) {
220 t->flush();
221 }
222 }
223
224 void shutdown() noexcept {
225 if (shutdown_.exchange(true)) {
226 return;
227 }
228 std::vector<std::shared_ptr<Transport>> tcopy;
229 {
230 std::scoped_lock lock(mu_);
231 tcopy.assign(transports_.begin(), transports_.end());
232 transports_.clear();
233 }
234 for (const auto& t : tcopy) {
235 try {
236 t->flush();
237 } catch (...) {
238 // ignore — shutdown must not throw
239 }
240 try {
241 t->detach();
242 } catch (...) {
243 // ignore — shutdown must not throw
244 }
245 }
246 }
247
251 void deliver_to_listeners(const EventEnvelope& v) const {
252 std::vector<detail::ListenerEntry> snapshot;
253 {
254 std::scoped_lock lock(mu_);
255 snapshot.assign(listeners_.begin(), listeners_.end());
256 }
257 const std::string_view name = v.name();
258 for (const auto& e : snapshot) {
259 const bool match =
260 e.is_pattern ? Glob::match(e.name_or_pattern, name) : (e.name_or_pattern == name);
261 if (!match) {
262 continue;
263 }
264 const auto labels = event_labels(name);
265 metrics::listener_invocations().labels(labels).inc();
266 try {
267 e.handler(v);
268 } catch (...) {
269 metrics::listener_errors().labels(labels).inc();
270 run_on_error(v, std::current_exception());
271 }
272 }
273 }
274
279 void report_transport_error(const std::string_view transport,
280 const std::exception_ptr& ep) const noexcept {
282 .labels(prom::Labels{{"transport", std::string{transport}}})
283 .inc();
284 std::vector<std::shared_ptr<Middleware>> snap;
285 {
286 std::scoped_lock lock(mu_);
287 snap.assign(middleware_.begin(), middleware_.end());
288 }
289 for (const auto& m : snap) {
290 try {
291 m->on_transport_error(transport, ep);
292 } catch (...) {
293 // ignore — error sink must be silent
294 }
295 }
296 }
297
299 void release(const detail::SubscriptionId id) noexcept override {
300 std::scoped_lock lock(mu_);
301 for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
302 if (it->id == id) {
303 const auto labels = event_labels(it->name_or_pattern);
304 listeners_.erase(it);
305 metrics::listeners().labels(labels).dec();
306 return;
307 }
308 }
309 }
310
311private:
317 [[nodiscard]] static prom::Labels event_labels(const std::string_view event) {
318 return prom::Labels{{"event", std::string{event}}};
319 }
320
321 void attach_transport(std::shared_ptr<Transport> t) {
322 t->attach(*this);
323 std::scoped_lock lock(mu_);
324 transports_.insert(std::move(t));
325 }
326
330 template <typename H>
331 static int derive_listener_priority(const H& handler, int explicit_priority) noexcept {
332 if (explicit_priority != comms::Prioritized::DEFAULT_PRIORITY) {
333 return explicit_priority;
334 }
335 if constexpr (requires { handler->priority(); }) {
336 return handler ? handler->priority() : explicit_priority;
337 } else {
338 return explicit_priority;
339 }
340 }
341
342 template <typename T, typename F>
343 static std::function<void(const EventEnvelope&)> wrap_typed_handler(F&& handler) {
344 using Handler = std::decay_t<F>;
345 if constexpr (std::is_invocable_v<Handler, const T&>) {
346 return [h = std::forward<F>(handler)](const EventEnvelope& v) mutable {
347 if (auto typed = v.payload_as<T>(); typed) {
348 h(*typed);
349 }
350 };
351 } else if constexpr (std::is_invocable_v<Handler, const EventEnvelope&>) {
352 return [h = std::forward<F>(handler)](const EventEnvelope& v) mutable { h(v); };
353 } else if constexpr (requires(Handler h, const T& t) { h->on_event(t); }) {
354 return [h = std::forward<F>(handler)](const EventEnvelope& v) mutable {
355 if (auto typed = v.payload_as<T>(); typed) {
356 h->on_event(*typed);
357 }
358 };
359 } else {
360 static_assert(sizeof(Handler) == 0,
361 "Bus::listen<T>(F): handler must take (const T&) or "
362 "(const EventEnvelope&), or be a pointer-like "
363 "EventListener<T>");
364 return {};
365 }
366 }
367
368 Subscription register_listener_for_name(std::string name_or_pattern,
369 std::function<void(const EventEnvelope&)> handler,
370 const bool is_pattern,
371 const int priority) {
372 const auto id = next_id_.fetch_add(1, std::memory_order_relaxed) + 1;
373 const auto labels = event_labels(name_or_pattern);
374 {
375 std::scoped_lock lock(mu_);
376 listeners_.insert(
377 priority,
378 detail::ListenerEntry{
379 id, std::move(handler), std::move(name_or_pattern), is_pattern, priority});
380 }
381 metrics::listeners().labels(labels).inc();
382 return Subscription{
383 std::static_pointer_cast<detail::SubscriptionBackref>(shared_from_this_safe()), id};
384 }
385
386 std::shared_ptr<Bus> shared_from_this_safe() {
387 // The bus may not yet be owned by a shared_ptr if the user constructed
388 // it as a stack/local. To support both, lazily create an alias-tracker
389 // that points to `this` but never deletes it.
390 if (!self_alias_) {
391 self_alias_ = std::shared_ptr<Bus>(this, [](Bus*) {});
392 }
393 return self_alias_;
394 }
395
396 void publish_impl(const EventEnvelope& v) const {
397 EventEnvelope local = v; // shared core/payload — cheap
398
399 const bool local_only = local.flags().contains<flags::LocalOnly>();
400 const bool remote_only = local.flags().contains<flags::RemoteOnly>();
401
402 const auto publish_started = std::chrono::system_clock::now();
403 local.timestamps().published_at = publish_started;
404 metrics::events_published().labels(event_labels(local.name())).inc();
405
406 // Middleware before_dispatch — unless NoMiddleware.
407 std::vector<std::shared_ptr<Middleware>> mw_snapshot;
408 const bool run_mw = !local.flags().contains<flags::NoMiddleware>();
409 if (run_mw) {
410 std::scoped_lock lock(mu_);
411 mw_snapshot.assign(middleware_.begin(), middleware_.end());
412 }
413
414 bool proceed = true;
415 if (run_mw) {
416 for (const auto& m : mw_snapshot) {
417 try {
418 if (!m->before_dispatch(local)) {
419 proceed = false;
420 metrics::events_dropped().labels(event_labels(local.name())).inc();
421 break;
422 }
423 } catch (...) {
424 try {
425 m->on_error(local, std::current_exception());
426 } catch (...) {
427 // suppress — middleware errors must not propagate
428 }
429 }
430 }
431 }
432
433 std::vector<std::shared_ptr<Transport>> transports_snapshot;
434 {
435 std::scoped_lock lock(mu_);
436 transports_snapshot.assign(transports_.begin(), transports_.end());
437 }
438
439 if (local_only && remote_only) {
440 run_on_error(local,
441 std::make_exception_ptr(std::runtime_error(
442 "LocalOnly + RemoteOnly conflict — envelope dropped")));
443 proceed = false;
444 metrics::events_dropped().labels(event_labels(local.name())).inc();
445 }
446
447 if (proceed) {
448 bool routed = false;
449 for (const auto& t : transports_snapshot) {
450 const auto sc = t->scope();
451 if (local_only && sc != TransportScope::Local)
452 continue;
453 if (remote_only && sc != TransportScope::Remote)
454 continue;
455 try {
456 t->dispatch(local);
457 routed = true;
458 } catch (...) {
459 run_on_error(local, std::current_exception());
460 }
461 }
462
463 // Default: if no transport was registered, perform local fan-out inline.
464 if (transports_snapshot.empty() && !remote_only) {
466 routed = true;
467 }
468 (void)routed;
469 }
470
471 if (run_mw) {
472 for (auto it = mw_snapshot.rbegin(); it != mw_snapshot.rend(); ++it) {
473 try {
474 (*it)->after_dispatch(local);
475 } catch (...) {
476 try {
477 (*it)->on_error(local, std::current_exception());
478 } catch (...) {}
479 }
480 }
481 }
482
483 const auto elapsed =
484 std::chrono::duration<double>(std::chrono::system_clock::now() - publish_started)
485 .count();
486 if (elapsed >= 0.0) {
487 metrics::dispatch_seconds().labels(event_labels(local.name())).observe(elapsed);
488 }
489 }
490
491 void run_on_error(const EventEnvelope& v, const std::exception_ptr& ep) const noexcept {
492 std::vector<std::shared_ptr<Middleware>> snap;
493 {
494 std::scoped_lock lock(mu_);
495 snap.assign(middleware_.begin(), middleware_.end());
496 }
497 EventEnvelope mv = v;
498 for (const auto& m : snap) {
499 try {
500 m->on_error(mv, ep);
501 } catch (...) {
502 // ignore — error sink must be silent
503 }
504 }
505 }
506
507 mutable std::mutex mu_;
508 std::shared_ptr<EventRegistry> registry_;
509 comms::PrioritizedSet<std::shared_ptr<Transport>> transports_;
510 comms::PrioritizedSet<std::shared_ptr<Middleware>> middleware_;
511 comms::PrioritizedSet<detail::ListenerEntry> listeners_;
512 std::atomic<detail::SubscriptionId> next_id_{0};
513 std::atomic<bool> shutdown_{false};
514 std::shared_ptr<Bus> self_alias_;
515};
516
517// -- Transport base-class definitions that depend on Bus -------------------
518
519inline void Transport::attach(Bus& bus) {
521}
522
523// -- EventSubscriber helpers (need Bus interface) --------------------------
524
525template <typename T, typename F>
526inline void EventSubscriber::on(Bus& bus, F&& handler) {
527 subscriptions_.push_back(bus.listen<T>(std::forward<F>(handler)));
528}
529
530template <typename F>
531inline void EventSubscriber::on(Bus& bus, std::string_view pattern, F&& handler) {
532 subscriptions_.push_back(bus.listen(pattern, std::forward<F>(handler)));
533}
534
535} // namespace conduit
Fluent builder for EventEnvelope plus the conduit::event(...) and conduit::make_event<T>(....
Definition bus.hpp:58
Bus & operator=(Bus &&)=delete
void register_event()
Register an event type T with the bus's event registry.
Definition bus.hpp:124
void register_subscriber(EventSubscriber &s)
Definition bus.hpp:181
~Bus() override
Definition bus.hpp:65
void use_middleware(std::shared_ptr< Middleware > m)
Definition bus.hpp:105
M & use_middleware(Args &&... args)
Definition bus.hpp:97
void publish(const EventEnvelope &env)
Definition bus.hpp:187
void deliver_to_listeners(const EventEnvelope &v) const
Called by transports (e.g.
Definition bus.hpp:251
Bus()
Definition bus.hpp:60
void drain() const
Definition bus.hpp:213
Subscription listen(F &&handler, int priority=comms::Prioritized::DEFAULT_PRIORITY)
Typed listener; the handler is called with const EventEnvelope& or const T&, or a shared_ptr<EventLis...
Definition bus.hpp:138
auto publish(T payload) -> void requires(std::is_base_of_v< parcel::ICell, T >)
Definition bus.hpp:204
Bus(Bus &&)=delete
void release(const detail::SubscriptionId id) noexcept override
SubscriptionBackref hook.
Definition bus.hpp:299
Bus(std::shared_ptr< EventRegistry > registry)
Definition bus.hpp:62
void shutdown() noexcept
Definition bus.hpp:224
Tp & use_transport(Args &&... args)
Definition bus.hpp:83
void publish(EventBuilder< T > &&b)
Definition bus.hpp:192
void report_transport_error(const std::string_view transport, const std::exception_ptr &ep) const noexcept
Surface a transport-level failure (e.g.
Definition bus.hpp:279
Bus & operator=(const Bus &)=delete
const std::shared_ptr< EventRegistry > & registry() const noexcept
Shared registry handle.
Definition bus.hpp:115
void use_transport(std::shared_ptr< Transport > t)
Definition bus.hpp:90
Subscription listen(const std::string_view pattern, F &&handler, const int priority=comms::Prioritized::DEFAULT_PRIORITY)
Runtime listener by exact name or glob pattern.
Definition bus.hpp:169
void publish(EventBuilder< T > &b)
Definition bus.hpp:198
Bus(const Bus &)=delete
Subscription listen(std::shared_ptr< EventListener< T > > listener)
Definition bus.hpp:153
Definition builder.hpp:25
EventEnvelope build()
Definition builder.hpp:76
Polymorphic envelope cell.
Definition envelope.hpp:62
std::string_view name() const noexcept
Definition envelope.hpp:91
Class-based listener — derive and override on_event.
Definition listener.hpp:104
Registers event cells for wire decoding.
Definition serialization.hpp:32
Multi-event subscriber base.
Definition listener.hpp:125
virtual void register_to(Bus &bus)=0
void on(Bus &bus, F &&handler)
Definition bus.hpp:526
static constexpr bool match(const std::string_view pattern, const std::string_view name) noexcept
Free function helper for one-shot matching.
Definition glob.hpp:103
RAII handle returned by Bus::listen(...).
Definition listener.hpp:47
Bus * bus() const noexcept
Definition transport.hpp:68
virtual void attach(Bus &bus)
Attach to a bus.
Definition bus.hpp:519
virtual void attach_with_sink(Bus &bus, InboundSink sink)
Attach to a bus using a caller-supplied inbound sink.
Definition transport.hpp:50
Erased back-reference the Subscription holds.
Definition listener.hpp:32
EventEnvelope — a parcel cell carrying conduit's envelope metadata plus a polymorphic payload cell.
Event<Self, Name> library base built on parcel::SelfStructCell.
Conduit flag tags, built atop comms::Flag / comms::FlagSet.
Event-name glob matcher.
Transport interface and the local/remote scope enum used for flag-based filtering.
Listener / Subscription / Subscriber primitives.
Always-on, backend-agnostic prom metrics for the conduit event system.
Middleware interface for the bus dispatch pipeline.
flags::FlagSet collect_default_flags()
Definition event.hpp:86
std::uint64_t SubscriptionId
Token used by the bus to unregister a listener.
Definition listener.hpp:27
prom::Counter & listener_invocations()
Listener handler calls during fan-out (counts every matched listener, summed over all delivered envel...
Definition metrics.hpp:66
prom::Counter & events_published()
Envelopes that entered the dispatch pipeline (one per Bus::publish).
Definition metrics.hpp:48
prom::Counter & transport_errors()
Transport-level inbound failures, selected by the transport label ("redis", "amqp",...
Definition metrics.hpp:82
prom::Counter & listener_errors()
Listener handlers that threw during fan-out.
Definition metrics.hpp:74
prom::Counter & events_dropped()
Envelopes dropped before listener fan-out (a middleware before_dispatch returned false,...
Definition metrics.hpp:57
prom::Gauge & listeners()
Live listener subscriptions across all buses in the process.
Definition metrics.hpp:90
prom::Histogram & dispatch_seconds()
Wall-clock seconds from publish to the end of the synchronous dispatch pipeline.
Definition metrics.hpp:99
Definition builder.hpp:22
EventBuilder< T > event(T payload)
Definition builder.hpp:99
Thin EventRegistry wrapper around parcel::ParcelRegistry, plus envelope encode/decode helpers for JSO...
Definition bus.hpp:37
bool is_pattern
Definition bus.hpp:41
SubscriptionId id
Definition bus.hpp:38
int listener_priority
Definition bus.hpp:42
int priority() const noexcept
Definition bus.hpp:44
std::string name_or_pattern
Definition bus.hpp:40
std::function< void(const EventEnvelope &)> handler
Definition bus.hpp:39
bool operator==(const ListenerEntry &other) const noexcept
Identity is the subscription id — that's what the bus uses to dedupe and remove a listener; two disti...
Definition bus.hpp:51