17#include <commons/prioritized.hpp>
31#include <parcel/parcel.h>
52 return id ==
other.id;
69 std::scoped_lock lock(mu_);
70 for (
const auto& e : listeners_) {
82 template <
typename Tp,
typename... Args>
84 auto t = std::make_shared<Tp>(std::forward<Args>(args)...);
86 attach_transport(std::move(t));
91 attach_transport(std::move(t));
96 template <
typename M,
typename... Args>
98 auto m = std::make_shared<M>(std::forward<Args>(args)...);
100 std::scoped_lock lock(mu_);
101 middleware_.insert(std::move(m));
106 std::scoped_lock lock(mu_);
107 middleware_.insert(std::move(m));
115 [[nodiscard]]
const std::shared_ptr<EventRegistry>&
registry() const noexcept {
123 template <
typename T>
125 static_assert(std::derived_from<T, parcel::ICell>,
126 "Bus::register_event<T>(): T must derive from "
127 "conduit::Event<T, Name>");
128 registry_->add_descriptor(T::descriptor());
137 template <
typename T,
typename F>
139 int priority = comms::Prioritized::DEFAULT_PRIORITY) {
140 static_assert(std::derived_from<T, parcel::ICell>,
141 "Bus::listen<T>(F): T must derive from conduit::Event<T, Name>");
144 const int effective_priority = derive_listener_priority(handler, priority);
145 auto wrapped = wrap_typed_handler<T>(std::forward<F>(handler));
146 return register_listener_for_name(std::string{T::event_name_v},
152 template <
typename T>
155 const int priority = listener ? listener->priority() : comms::Prioritized::DEFAULT_PRIORITY;
157 if (
auto typed = v.payload_as<T>(); typed) {
158 listener->on_event(*typed);
161 return register_listener_for_name(std::string{T::event_name_v},
168 template <
typename F>
171 const int priority = comms::Prioritized::DEFAULT_PRIORITY) {
172 const bool is_pattern = (pattern.contains(
'*'));
174 [h = std::forward<F>(handler)](
const EventEnvelope& v)
mutable { h(v); };
175 return register_listener_for_name(
176 std::string{pattern}, std::move(wrapped), is_pattern, priority);
191 template <
typename T>
194 publish_impl(std::move(b).build());
197 template <
typename T>
200 publish_impl(b.
build());
203 template <
typename T>
205 requires(std::is_base_of_v<parcel::ICell, T>)
208 publish_impl(
event(std::move(payload)).build());
214 std::vector<std::shared_ptr<Transport>> tcopy;
216 std::scoped_lock lock(mu_);
217 tcopy.assign(transports_.begin(), transports_.end());
219 for (
const auto& t : tcopy) {
225 if (shutdown_.exchange(
true)) {
228 std::vector<std::shared_ptr<Transport>> tcopy;
230 std::scoped_lock lock(mu_);
231 tcopy.assign(transports_.begin(), transports_.end());
234 for (
const auto& t : tcopy) {
252 std::vector<detail::ListenerEntry> snapshot;
254 std::scoped_lock lock(mu_);
255 snapshot.assign(listeners_.begin(), listeners_.end());
257 const std::string_view name = v.
name();
258 for (
const auto& e : snapshot) {
260 e.is_pattern ?
Glob::match(e.name_or_pattern, name) : (e.name_or_pattern == name);
264 const auto labels = event_labels(name);
270 run_on_error(v, std::current_exception());
280 const std::exception_ptr& ep)
const noexcept {
282 .labels(prom::Labels{{
"transport", std::string{transport}}})
284 std::vector<std::shared_ptr<Middleware>> snap;
286 std::scoped_lock lock(mu_);
287 snap.assign(middleware_.begin(), middleware_.end());
289 for (
const auto& m : snap) {
291 m->on_transport_error(transport, ep);
300 std::scoped_lock lock(mu_);
301 for (
auto it = listeners_.begin(); it != listeners_.end(); ++it) {
303 const auto labels = event_labels(it->name_or_pattern);
304 listeners_.erase(it);
317 [[nodiscard]]
static prom::Labels event_labels(
const std::string_view
event) {
318 return prom::Labels{{
"event", std::string{
event}}};
321 void attach_transport(std::shared_ptr<Transport> t) {
323 std::scoped_lock lock(mu_);
324 transports_.insert(std::move(t));
330 template <
typename H>
331 static int derive_listener_priority(
const H& handler,
int explicit_priority)
noexcept {
332 if (explicit_priority != comms::Prioritized::DEFAULT_PRIORITY) {
333 return explicit_priority;
335 if constexpr (
requires { handler->priority(); }) {
336 return handler ? handler->priority() : explicit_priority;
338 return explicit_priority;
342 template <
typename T,
typename F>
343 static std::function<void(
const EventEnvelope&)> wrap_typed_handler(F&& handler) {
344 using Handler = std::decay_t<F>;
345 if constexpr (std::is_invocable_v<Handler, const T&>) {
346 return [h = std::forward<F>(handler)](
const EventEnvelope& v)
mutable {
347 if (
auto typed = v.payload_as<T>(); typed) {
351 }
else if constexpr (std::is_invocable_v<Handler, const EventEnvelope&>) {
352 return [h = std::forward<F>(handler)](
const EventEnvelope& v)
mutable { h(v); };
353 }
else if constexpr (
requires(Handler h,
const T& t) { h->on_event(t); }) {
354 return [h = std::forward<F>(handler)](
const EventEnvelope& v)
mutable {
355 if (
auto typed = v.payload_as<T>(); typed) {
360 static_assert(
sizeof(Handler) == 0,
361 "Bus::listen<T>(F): handler must take (const T&) or "
362 "(const EventEnvelope&), or be a pointer-like "
368 Subscription register_listener_for_name(std::string name_or_pattern,
369 std::function<
void(
const EventEnvelope&)> handler,
370 const bool is_pattern,
371 const int priority) {
372 const auto id = next_id_.fetch_add(1, std::memory_order_relaxed) + 1;
373 const auto labels = event_labels(name_or_pattern);
375 std::scoped_lock lock(mu_);
378 detail::ListenerEntry{
379 id, std::move(handler), std::move(name_or_pattern), is_pattern, priority});
383 std::static_pointer_cast<detail::SubscriptionBackref>(shared_from_this_safe()),
id};
386 std::shared_ptr<Bus> shared_from_this_safe() {
391 self_alias_ = std::shared_ptr<Bus>(
this, [](
Bus*) {});
396 void publish_impl(
const EventEnvelope& v)
const {
397 EventEnvelope local = v;
399 const bool local_only = local.flags().contains<flags::LocalOnly>();
400 const bool remote_only = local.flags().contains<flags::RemoteOnly>();
402 const auto publish_started = std::chrono::system_clock::now();
403 local.timestamps().published_at = publish_started;
407 std::vector<std::shared_ptr<Middleware>> mw_snapshot;
408 const bool run_mw = !local.flags().contains<flags::NoMiddleware>();
410 std::scoped_lock lock(mu_);
411 mw_snapshot.assign(middleware_.begin(), middleware_.end());
416 for (
const auto& m : mw_snapshot) {
418 if (!m->before_dispatch(local)) {
425 m->on_error(local, std::current_exception());
433 std::vector<std::shared_ptr<Transport>> transports_snapshot;
435 std::scoped_lock lock(mu_);
436 transports_snapshot.assign(transports_.begin(), transports_.end());
439 if (local_only && remote_only) {
441 std::make_exception_ptr(std::runtime_error(
442 "LocalOnly + RemoteOnly conflict — envelope dropped")));
449 for (
const auto& t : transports_snapshot) {
450 const auto sc = t->scope();
459 run_on_error(local, std::current_exception());
464 if (transports_snapshot.empty() && !remote_only) {
472 for (
auto it = mw_snapshot.rbegin(); it != mw_snapshot.rend(); ++it) {
474 (*it)->after_dispatch(local);
477 (*it)->on_error(local, std::current_exception());
484 std::chrono::duration<double>(std::chrono::system_clock::now() - publish_started)
486 if (elapsed >= 0.0) {
491 void run_on_error(
const EventEnvelope& v,
const std::exception_ptr& ep)
const noexcept {
492 std::vector<std::shared_ptr<Middleware>> snap;
494 std::scoped_lock lock(mu_);
495 snap.assign(middleware_.begin(), middleware_.end());
497 EventEnvelope mv = v;
498 for (
const auto& m : snap) {
507 mutable std::mutex mu_;
508 std::shared_ptr<EventRegistry> registry_;
509 comms::PrioritizedSet<std::shared_ptr<Transport>> transports_;
510 comms::PrioritizedSet<std::shared_ptr<Middleware>> middleware_;
511 comms::PrioritizedSet<detail::ListenerEntry> listeners_;
512 std::atomic<detail::SubscriptionId> next_id_{0};
513 std::atomic<bool> shutdown_{
false};
514 std::shared_ptr<Bus> self_alias_;
525template <
typename T,
typename F>
527 subscriptions_.push_back(bus.
listen<T>(std::forward<F>(handler)));
532 subscriptions_.push_back(bus.
listen(pattern, std::forward<F>(handler)));
Fluent builder for EventEnvelope plus the conduit::event(...) and conduit::make_event<T>(....
Bus & operator=(Bus &&)=delete
void register_event()
Register an event type T with the bus's event registry.
Definition bus.hpp:124
void register_subscriber(EventSubscriber &s)
Definition bus.hpp:181
~Bus() override
Definition bus.hpp:65
void use_middleware(std::shared_ptr< Middleware > m)
Definition bus.hpp:105
M & use_middleware(Args &&... args)
Definition bus.hpp:97
void publish(const EventEnvelope &env)
Definition bus.hpp:187
void deliver_to_listeners(const EventEnvelope &v) const
Called by transports (e.g.
Definition bus.hpp:251
Bus()
Definition bus.hpp:60
void drain() const
Definition bus.hpp:213
Subscription listen(F &&handler, int priority=comms::Prioritized::DEFAULT_PRIORITY)
Typed listener; the handler is called with const EventEnvelope& or const T&, or a shared_ptr<EventLis...
Definition bus.hpp:138
auto publish(T payload) -> void requires(std::is_base_of_v< parcel::ICell, T >)
Definition bus.hpp:204
void release(const detail::SubscriptionId id) noexcept override
SubscriptionBackref hook.
Definition bus.hpp:299
Bus(std::shared_ptr< EventRegistry > registry)
Definition bus.hpp:62
void shutdown() noexcept
Definition bus.hpp:224
Tp & use_transport(Args &&... args)
Definition bus.hpp:83
void publish(EventBuilder< T > &&b)
Definition bus.hpp:192
void report_transport_error(const std::string_view transport, const std::exception_ptr &ep) const noexcept
Surface a transport-level failure (e.g.
Definition bus.hpp:279
Bus & operator=(const Bus &)=delete
const std::shared_ptr< EventRegistry > & registry() const noexcept
Shared registry handle.
Definition bus.hpp:115
void use_transport(std::shared_ptr< Transport > t)
Definition bus.hpp:90
Subscription listen(const std::string_view pattern, F &&handler, const int priority=comms::Prioritized::DEFAULT_PRIORITY)
Runtime listener by exact name or glob pattern.
Definition bus.hpp:169
void publish(EventBuilder< T > &b)
Definition bus.hpp:198
Subscription listen(std::shared_ptr< EventListener< T > > listener)
Definition bus.hpp:153
Definition builder.hpp:25
EventEnvelope build()
Definition builder.hpp:76
Polymorphic envelope cell.
Definition envelope.hpp:62
std::string_view name() const noexcept
Definition envelope.hpp:91
Class-based listener — derive and override on_event.
Definition listener.hpp:104
Registers event cells for wire decoding.
Definition serialization.hpp:32
Multi-event subscriber base.
Definition listener.hpp:125
virtual void register_to(Bus &bus)=0
void on(Bus &bus, F &&handler)
Definition bus.hpp:526
static constexpr bool match(const std::string_view pattern, const std::string_view name) noexcept
Free function helper for one-shot matching.
Definition glob.hpp:103
RAII handle returned by Bus::listen(...).
Definition listener.hpp:47
Bus * bus() const noexcept
Definition transport.hpp:68
virtual void attach(Bus &bus)
Attach to a bus.
Definition bus.hpp:519
virtual void attach_with_sink(Bus &bus, InboundSink sink)
Attach to a bus using a caller-supplied inbound sink.
Definition transport.hpp:50
Erased back-reference the Subscription holds.
Definition listener.hpp:32
EventEnvelope — a parcel cell carrying conduit's envelope metadata plus a polymorphic payload cell.
Event<Self, Name> library base built on parcel::SelfStructCell.
Conduit flag tags, built atop comms::Flag / comms::FlagSet.
Transport interface and the local/remote scope enum used for flag-based filtering.
Listener / Subscription / Subscriber primitives.
Always-on, backend-agnostic prom metrics for the conduit event system.
Middleware interface for the bus dispatch pipeline.
flags::FlagSet collect_default_flags()
Definition event.hpp:86
std::uint64_t SubscriptionId
Token used by the bus to unregister a listener.
Definition listener.hpp:27
prom::Counter & listener_invocations()
Listener handler calls during fan-out (counts every matched listener, summed over all delivered envel...
Definition metrics.hpp:66
prom::Counter & events_published()
Envelopes that entered the dispatch pipeline (one per Bus::publish).
Definition metrics.hpp:48
prom::Counter & transport_errors()
Transport-level inbound failures, selected by the transport label ("redis", "amqp",...
Definition metrics.hpp:82
prom::Counter & listener_errors()
Listener handlers that threw during fan-out.
Definition metrics.hpp:74
prom::Counter & events_dropped()
Envelopes dropped before listener fan-out (a middleware before_dispatch returned false,...
Definition metrics.hpp:57
prom::Gauge & listeners()
Live listener subscriptions across all buses in the process.
Definition metrics.hpp:90
prom::Histogram & dispatch_seconds()
Wall-clock seconds from publish to the end of the synchronous dispatch pipeline.
Definition metrics.hpp:99
Definition builder.hpp:22
EventBuilder< T > event(T payload)
Definition builder.hpp:99
Thin EventRegistry wrapper around parcel::ParcelRegistry, plus envelope encode/decode helpers for JSO...
bool is_pattern
Definition bus.hpp:41
SubscriptionId id
Definition bus.hpp:38
int listener_priority
Definition bus.hpp:42
int priority() const noexcept
Definition bus.hpp:44
std::string name_or_pattern
Definition bus.hpp:40
std::function< void(const EventEnvelope &)> handler
Definition bus.hpp:39
bool operator==(const ListenerEntry &other) const noexcept
Identity is the subscription id — that's what the bus uses to dedupe and remove a listener; two disti...
Definition bus.hpp:51