conduit 0.6.0
Modern C++23 header-only event-dispatching / event-transport library
conduit is a C++23 event bus and event-transport library. You declare events as C++ types with compile-time names, hand them to a Bus, and the bus delivers them — synchronously, on a thread pool, or out over MQTT / AMQP / NATS / Redis / ZeroMQ. The same event object also serializes itself to JSON or CBOR so it can leave the process and come back.
- `[](const OrderCreated& o){...})`.
- `relay` transport hands matching envelopes to a user callback; point it at your websocket / HTTP webhook / log sink.
- `bus.listen("order.**", ...)`) without rolling your own matcher.
- `std::function`, `std::mutex`, and (for broker transports) heap-allocated wire buffers. It is fire-and-forget by default: failures are reported through middleware, not thrown at the publisher.
- `Durable` / `Persistent` / `RequireAck` flags through, but actual durability is the broker adapter's job (MQTT QoS, AMQP `confirm.select`, etc.).

A few things to notice in this example:
- `Greeted` derives from `conduit::Event<Greeted, "greeted">`. The CRTP parameter is the event class itself; the fixed-string is the event's wire-stable name.
- `event_field_descriptors` is required (even for empty events: `return b;`). It is how parcel learns to encode the event for transport.
- `bus.publish(...)` is `void` and does not throw on listener errors; exceptions are routed to middleware (see Error handling).
- The `sub` handle is RAII: when it is destroyed the listener is unregistered.

conduit is a CMake project. The core library is header-only; each broker adapter is an opt-in static library gated by `CONDUIT_TRANSPORT_<NAME>=ON`.
To opt into a transport adapter — for example MQTT — set the option before fetching:
conduit generates an install rule when it is built top-level and none of its dependencies were pulled via FetchContent. After a regular cmake --install, downstream projects can:
Drop the repo into a third_party/ folder and add_subdirectory(third_party/conduit). The library exports conduit::conduit and one conduit::transport_<name> target per enabled adapter.
- nlohmann/json 3.12.0
- cpp-ulid 1.0.0
- cpp-parcel 0.2.2
- cpp-metadata 0.2.0
- cpp-commons 0.1.5
- cpp-threadman 0.1.0: all threading (in-process executor pool + transport background threads)
- cpp-prom 0.1.0: always-on event metrics. `Bus` records `conduit_*` series directly from its publish / fan-out / error / subscription paths.
- `CONDUIT_BUILD_TESTS=ON`, default on top-level builds).

| Option | Pulls in |
|---|---|
| `CONDUIT_TRANSPORT_MQTT` | Paho MQTT C++ + Paho MQTT C |
| `CONDUIT_TRANSPORT_AMQP` | AMQP-CPP (optional OpenSSL with `CONDUIT_TRANSPORT_AMQP_TLS`) |
| `CONDUIT_TRANSPORT_NATS` | nats.c (optional OpenSSL with `CONDUIT_TRANSPORT_NATS_TLS`) |
| `CONDUIT_TRANSPORT_REDIS` | redis-plus-plus + hiredis (optional TLS) |
| `CONDUIT_TRANSPORT_ZMQ` | libzmq + cppzmq (optional libsodium with `CONDUIT_TRANSPORT_ZMQ_CURVE`) |

`find_package(Threads REQUIRED)` is unconditional.

User events derive from this CRTP base. `Self` is the event class itself; `Name` is the wire-stable name as a `parcel::fixed_string`. Each event must provide a static `event_field_descriptors` hook that declares its fields; parcel uses it for JSON/CBOR encode and decode.
An event with no fields still needs the hook — return the builder unchanged:
Events must be default-constructible: the registry decodes by calling std::make_shared<T>() and then populating fields.
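To see why the default constructor matters, here is a minimal sketch of a name-keyed decode registry. `MiniRegistry`, `EventBase`, and the string-based `populate` step are hypothetical stand-ins invented for illustration; they are not conduit's `EventRegistry` or parcel's decoder.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <type_traits>

// Hypothetical stand-ins for illustration; not conduit's real types.
struct EventBase { virtual ~EventBase() = default; };
struct Greeted : EventBase { std::string who; };

class MiniRegistry {
public:
    // Registers a decoder for T under a wire name.
    template <typename T>
    void add(const std::string& name,
             std::function<void(T&, const std::string&)> populate) {
        static_assert(std::is_default_constructible<T>::value,
                      "events must be default-constructible");
        decoders_[name] = [populate](const std::string& wire) {
            auto ev = std::make_shared<T>();  // step 1: default-construct
            populate(*ev, wire);              // step 2: fill in the fields
            return std::shared_ptr<EventBase>(ev);
        };
    }

    // Returns nullptr when no type is registered under that name.
    std::shared_ptr<EventBase> decode(const std::string& name,
                                      const std::string& wire) const {
        auto it = decoders_.find(name);
        if (it == decoders_.end()) return nullptr;
        return it->second(wire);
    }

private:
    std::map<std::string,
             std::function<std::shared_ptr<EventBase>(const std::string&)>> decoders_;
};

inline void example() {
    MiniRegistry reg;
    reg.add<Greeted>("greeted",
                     [](Greeted& g, const std::string& wire) { g.who = wire; });
    auto ev = std::dynamic_pointer_cast<Greeted>(reg.decode("greeted", "world"));
    assert(ev && ev->who == "world");
}
```

An event type without a default constructor would fail at the `std::make_shared<T>()` line, which is the same shape of failure the text describes.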
The polymorphic wrapper that flows through the bus. It carries:
- `id()`.
- `flags()`: see Flags.
- `metadata()`: a JSON-shaped key/value tree backed by `md::Metadata`. Values may be strings, booleans, integers, floats, arrays, or nested objects; access via `require_string("k")` / `get_string_if("k")` / `at("k").as_int()` etc., or path-style with `require_path("device.firmware.major")`.
- `timestamps()`: `created_at`, `published_at`, `received_at`, `delivered_at`, `failed_at`.
- `correlation_id()` and `causation_id()` (ULIDs).
- `payload_as<T>()` returning `std::shared_ptr<const T>` (or `nullptr` if the payload is not a `T`).

`EventEnvelopeView` is a source-compatibility alias for `EventEnvelope`. The bus stores envelopes by `shared_ptr` internally, so copying an envelope is cheap and the core is shared: mutations through a non-const accessor on a copy show up on every other copy.
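The shared-core behaviour and the null-returning `payload_as<T>()` can be modelled in a few lines. This is a toy under stated assumptions: `MiniEnvelope`, its `make` factory, and the string-to-string metadata map are hypothetical and far simpler than conduit's envelope and `md::Metadata`.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

struct OrderCreated   { int id = 0; };
struct OrderCancelled { int id = 0; };

// Toy envelope: every copy points at the same Core.
class MiniEnvelope {
    struct HolderBase { virtual ~HolderBase() = default; };
    template <typename T>
    struct Holder : HolderBase {
        explicit Holder(T v) : value(std::move(v)) {}
        T value;
    };
    struct Core {
        std::shared_ptr<const HolderBase> payload;
        std::map<std::string, std::string> metadata;
    };
    std::shared_ptr<Core> core_ = std::make_shared<Core>();

public:
    template <typename T>
    static MiniEnvelope make(T payload) {
        MiniEnvelope e;
        e.core_->payload = std::make_shared<Holder<T>>(std::move(payload));
        return e;
    }

    // Returns nullptr when the stored payload is not a T.
    template <typename T>
    std::shared_ptr<const T> payload_as() const {
        auto h = std::dynamic_pointer_cast<const Holder<T>>(core_->payload);
        if (!h) return nullptr;
        return std::shared_ptr<const T>(h, &h->value);  // aliasing constructor
    }

    std::map<std::string, std::string>& metadata() { return core_->metadata; }
};

inline void example() {
    MiniEnvelope a = MiniEnvelope::make(OrderCreated{42});
    MiniEnvelope b = a;                      // cheap copy, same core
    b.metadata()["trace_id"] = "abc";        // mutate through the copy
    assert(a.metadata().at("trace_id") == "abc");
    assert(a.payload_as<OrderCreated>()->id == 42);
    assert(a.payload_as<OrderCancelled>() == nullptr);
}
```

The same sharing is what lets a middleware stamp a value into metadata and have listeners see it, so treat any non-const accessor as a write to every copy.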
A fluent builder. Returned by conduit::event(payload); finalized with .build() or by implicit conversion to EventEnvelope (used by bus.publish(builder)):
Owns transports, middleware, and listeners. Constructible on the stack or via shared_ptr — the bus tracks subscriptions through a self-aliasing pointer so both work. Bus is non-copyable and non-movable; pass it by reference.
The bus is destroyed last: its destructor calls shutdown(), which flushes and detaches every transport. shutdown() is idempotent and noexcept.
An abstract base for anything that carries envelopes. The library ships with:
- `conduit::local::Transport`: in-process. Three modes: `Direct` (same thread), `Queue` (single worker), `ThreadPool` (N workers).
- `conduit::relay::Transport`: hands envelopes whose name matches a glob to a user callback.
- `conduit::FilteredTransport`: wraps another transport with outbound/inbound predicates.
- `conduit::mqtt::Transport`, `amqp::Transport`, `nats::Transport`, `redis::Transport`, `zmq::Transport`: opt-in broker adapters.

A transport returns `TransportScope::Local` or `TransportScope::Remote` from `scope()`. The bus uses that together with the `LocalOnly` / `RemoteOnly` flags on an envelope to decide where it goes.
Hooks invoked around every publish:
- `before_dispatch` can return `false` to drop the envelope.
- `on_error` fires when a listener throws (or when an invariant like `LocalOnly` + `RemoteOnly` is violated).
- `on_transport_error` fires when a transport adapter fails to decode an inbound message. There is no envelope at that point, so the hook receives the transport's short name and the exception.

`Bus::listen(...)` returns a `Subscription` (a move-only RAII handle). Drop it to unregister. `Subscription::detach()` and `Subscription::release()` exist; prefer letting the handle's destructor do the work.
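The RAII contract of a subscription handle can be sketched with a toy bus. `MiniBus` and its nested `Subscription` are hypothetical and only model "the destructor unregisters"; conduit's real handle also offers `detach()` and `release()`, which are not modelled here, and the toy requires the bus to outlive the handle.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <utility>

// Toy bus for illustration only.
class MiniBus {
public:
    class Subscription {
    public:
        Subscription() = default;
        Subscription(MiniBus* bus, int id) : bus_(bus), id_(id) {}
        Subscription(Subscription&& o) noexcept
            : bus_(std::exchange(o.bus_, nullptr)), id_(o.id_) {}
        Subscription& operator=(Subscription&& o) noexcept {
            if (this != &o) {
                reset();
                bus_ = std::exchange(o.bus_, nullptr);
                id_ = o.id_;
            }
            return *this;
        }
        Subscription(const Subscription&) = delete;
        Subscription& operator=(const Subscription&) = delete;
        ~Subscription() { reset(); }  // unregister on destruction

    private:
        void reset() {
            if (bus_) { bus_->handlers_.erase(id_); bus_ = nullptr; }
        }
        MiniBus* bus_ = nullptr;
        int id_ = 0;
    };

    [[nodiscard]] Subscription listen(std::function<void(int)> handler) {
        int id = next_id_++;
        handlers_.emplace(id, std::move(handler));
        return Subscription(this, id);
    }

    void publish(int value) {
        for (auto& entry : handlers_) entry.second(value);
    }

private:
    std::map<int, std::function<void(int)>> handlers_;
    int next_id_ = 0;
};

inline int example() {
    MiniBus bus;
    int hits = 0;
    {
        auto sub = bus.listen([&hits](int) { ++hits; });
        bus.publish(1);  // delivered: sub is alive
    }                    // sub destroyed here, listener removed
    bus.publish(2);      // nobody is listening any more
    assert(hits == 1);
    return hits;
}
```

Discarding the return value of `listen` would destroy the temporary handle at the end of the statement, which is why a `[[nodiscard]]` handle is a useful guard.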
EventSubscriber is a base class that owns several Subscriptions in one place — convenient for projection / aggregator objects that listen to many events.
bus.listen<T>(handler) registers the event type with the bus's registry automatically, so a remote transport on the same bus can decode incoming T envelopes too.
When projection is destroyed, every subscription it owns is cleaned up.
max_queue_size = 0 is unbounded — convenient for tests, hazardous in production if a consumer falls behind. Set a positive cap so dispatch() blocks the producer when the queue is full (conduit blocks rather than dropping; threadman's native reject-on-overflow is not used here).
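The "block the producer instead of dropping" behaviour can be illustrated with a plain bounded queue. `BoundedQueue` and `pump` are hypothetical names for this sketch; they show the general technique with a mutex and two condition variables, not how conduit or threadman implement it.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical capped queue: push() blocks while the queue is full.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t cap) : cap_(cap) {}

    // cap_ == 0 means unbounded, mirroring max_queue_size = 0.
    void push(T value) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return cap_ == 0 || q_.size() < cap_; });
        q_.push_back(std::move(value));
        high_water_ = std::max(high_water_, q_.size());
        not_empty_.notify_one();
    }

    // Blocks until an item is available.
    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !q_.empty(); });
        T value = std::move(q_.front());
        q_.pop_front();
        not_full_.notify_one();
        return value;
    }

    // Largest backlog observed so far.
    std::size_t high_water() const {
        std::lock_guard<std::mutex> lock(m_);
        return high_water_;
    }

private:
    mutable std::mutex m_;
    std::condition_variable not_full_, not_empty_;
    std::deque<T> q_;
    std::size_t cap_;
    std::size_t high_water_ = 0;
};

// One producer thread pushes 0..n-1 while the caller consumes.
inline std::vector<int> pump(BoundedQueue<int>& q, int n) {
    assert(n >= 0);
    std::thread producer([&q, n] { for (int i = 0; i < n; ++i) q.push(i); });
    std::vector<int> seen;
    for (int i = 0; i < n; ++i) seen.push_back(q.pop());
    producer.join();
    return seen;
}
```

With a cap of 2 the backlog never grows past two items no matter how fast the producer runs; with a cap of 0 the same code lets the backlog grow without limit, which is the hazard described above.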
flags::Direct on an envelope overrides the mode and forces inline delivery even when the local transport is queued or pooled — useful for "this must happen synchronously" events like config reload:
relay::Transport accepts a glob; you can add_route / remove_route at runtime. Callbacks run on whatever thread dispatch() happens to be called from — usually the publisher's, unless a local::Transport in Queue/ThreadPool mode is in front of you. Exceptions thrown from a relay callback are swallowed so the bus's fire-and-forget contract holds.
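For readers who want a feel for segment-aware globbing, here is a small recursive matcher. It follows the rule this README gives for `conduit::Glob` (`*` stays within one dot-separated segment, `**` crosses segments, anything else is literal). `glob_match` is a hypothetical helper, and letting a wildcard match an empty run is an assumption of this sketch, not a documented property of conduit.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Sketch of a segment-aware glob; '.' separates segments.
//   "*"  matches any run of characters without a '.'
//   "**" matches any run of characters, '.' included
// Assumption: both wildcards may match an empty run.
inline bool glob_match(std::string_view pat, std::string_view text) {
    if (pat.empty()) return text.empty();

    if (pat.substr(0, 2) == "**") {
        // Try every split point, including consuming nothing.
        for (std::size_t i = 0; i <= text.size(); ++i)
            if (glob_match(pat.substr(2), text.substr(i))) return true;
        return false;
    }

    if (pat.front() == '*') {
        for (std::size_t i = 0; i <= text.size(); ++i) {
            if (glob_match(pat.substr(1), text.substr(i))) return true;
            // '*' must not consume a segment separator.
            if (i < text.size() && text[i] == '.') break;
        }
        return false;
    }

    // Literal character: must match exactly.
    return !text.empty() && pat.front() == text.front() &&
           glob_match(pat.substr(1), text.substr(1));
}

inline void example() {
    assert(glob_match("order.*", "order.created"));
    assert(!glob_match("order.*", "order.line.added"));
    assert(glob_match("order.**", "order.line.added"));
}
```

The recursion is exponential in the worst case, which is acceptable for short event names but would need memoisation for long patterns.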
FilteredTransport wraps an inner transport with up to two predicates. Outbound gating decides what dispatch() actually sends; inbound gating decides what the bus sees from arrivals.
Either predicate may be empty ({}) — meaning "pass everything".
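The "empty predicate passes everything" rule maps naturally onto an empty `std::function`. The sketch below is a toy: `MiniFilter`, `NamePredicate`, and `starts_with` are hypothetical, and the predicates here look only at an event name, whereas the real predicate signature of `FilteredTransport` is not shown in this README.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Assumption: predicates take the event name; the real ones may take an envelope.
using NamePredicate = std::function<bool(const std::string&)>;

class MiniFilter {
public:
    MiniFilter(NamePredicate outbound, NamePredicate inbound)
        : outbound_(std::move(outbound)), inbound_(std::move(inbound)) {}

    // An empty std::function converts to false, so "!pred" means "no filter".
    bool allows_outbound(const std::string& name) const {
        return !outbound_ || outbound_(name);
    }
    bool allows_inbound(const std::string& name) const {
        return !inbound_ || inbound_(name);
    }

private:
    NamePredicate outbound_, inbound_;
};

inline bool starts_with(const std::string& s, const std::string& prefix) {
    return s.compare(0, prefix.size(), prefix) == 0;
}

inline void example() {
    // Outbound: only "order." events. Inbound: {} means pass everything.
    MiniFilter f([](const std::string& n) { return starts_with(n, "order."); }, {});
    assert(f.allows_outbound("order.created"));
    assert(!f.allows_outbound("invoice.paid"));
    assert(f.allows_inbound("invoice.paid"));
}
```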
Skip the whole pipeline for one envelope with flags::NoMiddleware:
Every published AppConfigReloadEvent automatically has LocalOnly set; the bus will route it through local listeners but skip any TransportScope::Remote transport. If you can't modify the event type, specialize the trait:
Routing matrix (from Bus::publish_impl):
| envelope flags | transport scope | dispatched? |
|---|---|---|
| `LocalOnly` | Local | yes |
| `LocalOnly` | Remote | no (silently skipped) |
| `RemoteOnly` | Local | no |
| `RemoteOnly` | Remote | yes |
| both set | any | dropped + routed to on_error |
| neither set | any | yes |
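The routing matrix can be restated as a small pure function. `Scope`, `RouteFlags`, `Decision`, and `route` are hypothetical stand-ins written for this sketch; the actual logic lives in `Bus::publish_impl` and may be structured differently.

```cpp
#include <cassert>

// Stand-in types for this sketch; not conduit's declarations.
enum class Scope { Local, Remote };

struct RouteFlags {
    bool local_only = false;
    bool remote_only = false;
};

enum class Decision { Dispatch, Skip, Conflict };

// One call per (envelope, transport) pair.
inline Decision route(RouteFlags f, Scope s) {
    if (f.local_only && f.remote_only)
        return Decision::Conflict;  // dropped and routed to on_error
    if (f.local_only)
        return s == Scope::Local ? Decision::Dispatch : Decision::Skip;
    if (f.remote_only)
        return s == Scope::Remote ? Decision::Dispatch : Decision::Skip;
    return Decision::Dispatch;      // neither flag set: goes everywhere
}

inline void example() {
    assert(route({true, false}, Scope::Local) == Decision::Dispatch);
    assert(route({true, false}, Scope::Remote) == Decision::Skip);
    assert(route({true, true}, Scope::Local) == Decision::Conflict);
}
```

Checking the conflict first keeps the two single-flag branches simple and matches the "both set" row, which applies regardless of transport scope.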
Custom flags follow the same Flag<"name"> pattern:
The fixed-string name is the flag's stable identity on the wire — no registration step.
The wire shape is stable:
Each flag's wire name is the literal passed to Flag<"...">, so flag identity survives across processes and language boundaries without registration. serialization::encode_json / encode_cbor / EventRegistry are also exposed under the conduit::serialization:: namespace for backwards compatibility.
Separate from the per-Bus decode EventRegistry above, conduit keeps a process-wide type catalog for introspection: for each event type its name, its shape, and its optional display_info — plus a JSON schema derived from the underlying parcel descriptor. It decodes nothing, and it is not fed by the Bus: register a type explicitly with add<T>() or self-register it at static-init with CONDUIT_REGISTER_EVENT(T).
EventTypeRegistry is also usable standalone (conduit::EventTypeRegistry local; local.add<OrderCreated>();), independent of the global instance. schema(name) is the per-type descriptor schema:
One mqtt::Transport instance binds to a single topic and carries traffic in both directions. To route different events onto different topics, attach a second instance with its own Config::topic, optionally wrapped in FilteredTransport. Other broker adapters (AMQP, NATS, Redis, ZMQ) follow the same shape — one transport instance per logical channel; see the per-adapter examples under transports/<name>/examples/.
conduit is fire-and-forget. Bus::publish returns void and does not throw under normal operation. Failures are surfaced through middleware:
| Failure | Path |
|---|---|
| Listener throws | Middleware::on_error(envelope, exception_ptr) |
| `LocalOnly` + `RemoteOnly` set on the same envelope | `on_error` with `std::runtime_error("LocalOnly + RemoteOnly conflict — envelope dropped")` |
| Transport's `dispatch` throws | `on_error` with the offending envelope |
| Transport fails to decode an inbound message (no envelope yet) | Middleware::on_transport_error(transport_name, exception_ptr) |
| Relay callback throws | swallowed — the bus contract is fire-and-forget |
| Middleware itself throws | swallowed inside the pipeline |
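The first and last rows of the table describe a publish path that never throws. A toy version of that contract is sketched below; `FireAndForgetBus` and its string-based events are hypothetical, and continuing with the remaining listeners after one of them throws is an assumption of this sketch, not something the table states.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Toy publisher; names are illustrative, not conduit's API.
class FireAndForgetBus {
public:
    using Handler = std::function<void(const std::string&)>;
    using ErrorHook = std::function<void(const std::string&, std::exception_ptr)>;

    void listen(Handler h) { handlers_.push_back(std::move(h)); }
    void on_error(ErrorHook h) { on_error_ = std::move(h); }

    // Never throws: listener failures go to the hook,
    // and a failing hook is swallowed.
    void publish(const std::string& event) noexcept {
        for (auto& h : handlers_) {
            try {
                h(event);
            } catch (...) {
                try {
                    if (on_error_) on_error_(event, std::current_exception());
                } catch (...) {
                    // Swallowed: the publisher must not see hook failures.
                }
            }
        }
    }

private:
    std::vector<Handler> handlers_;
    ErrorHook on_error_;
};

inline std::string example() {
    FireAndForgetBus bus;
    std::string log;
    bus.on_error([&log](const std::string& ev, std::exception_ptr ep) {
        try {
            std::rethrow_exception(ep);  // recover the concrete exception
        } catch (const std::exception& e) {
            log += ev + ": " + e.what();
        }
    });
    bus.listen([](const std::string&) { throw std::runtime_error("boom"); });
    bus.listen([&log](const std::string&) { log += " | second listener ran"; });
    bus.publish("order.created");
    assert(!log.empty());
    return log;
}
```

Passing a `std::exception_ptr` to the hook, as `on_error(envelope, exception_ptr)` does, lets the hook rethrow and inspect the concrete type without the publisher ever seeing it.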
Configuration-time failures do throw — and every exception conduit raises derives from conduit::Exception (defined in conduit/exception.hpp), so a single catch (const conduit::Exception&) handles them all:
- `mqtt::Transport`, `nats::Transport`, `amqp::Transport`, `redis::Transport`, `zmq::Transport` throw `conduit::ConfigError` from their constructor if required fields are empty (topic / subject / channel / etc.), or `conduit::TlsNotSupportedError` when TLS is requested in a build that disabled it.
- `attach()` throws the relevant per-transport `*Error` (e.g. `conduit::mqtt::MqttError`), all of which are `conduit::TransportError` subtypes, if it cannot connect to the broker.

A typical pattern:
- **A `Subscription` you forget to keep alive cancels immediately.** `bus.listen(...)` returns a `[[nodiscard]]` handle whose destructor unsubscribes. `bus.listen<T>([]{ ... });` (no assignment) is almost always a bug.
- **`bus.publish(...)` with no transport attached** still delivers to local listeners: there is a fallback that fans out inline. This is convenient for tests; in production, attach at least one transport so the routing matrix runs.
- **`local::Transport` in `Queue` / `ThreadPool` mode** runs listeners on a different thread. Capture-by-reference into `bus.listen` requires that the captured object outlive the bus. `bus.drain()` waits for the current backlog; `bus.shutdown()` is implicit in the destructor and is idempotent.
- **`max_queue_size = 0` is unbounded.** A slow consumer + fast producer + unbounded queue is the classic memory-leak shape. Pick a real number.
- **Events must be default-constructible.** The registry calls `std::make_shared<T>()` and then populates fields. A missing default constructor is a compile error inside parcel's machinery; the message is long, the fix is short.
- **`payload_as<T>()` returns `nullptr` when the envelope's payload is some other type.** This happens when a pattern listener (`"order.*"`) fires for an event whose C++ type the listener does not know. Always null-check before dereferencing.
- **Inbound decode failures are reported through `Middleware::on_transport_error`.** If you do not install a middleware that handles it, malformed wire data is silently dropped.
- **`LocalOnly` and `RemoteOnly` set on the same envelope** is treated as an invariant violation: the envelope is dropped and `on_error` fires. The builder will happily let you do it, so prefer setting one or the other.
- **`Bus` cannot be moved or copied.** Use a `shared_ptr<Bus>` if you need shared ownership; the bus keeps an internal self-alias so `shared_from_this()` works either way.
- **Envelope copies share one core through a `shared_ptr`.** This is intentional: it is how middleware can stamp `trace_id` into metadata and have it reach the listeners. It also means `local.timestamps().received_at = ...` in a transport is visible everywhere.
- **`flags::NoMiddleware` skips the whole middleware pipeline**, including your audit logging. Use it carefully: it is meant for noisy traffic that is already accounted for, not as a generic opt-out.
- **Broker transports connect during `attach`.** `bus.use_transport<conduit::mqtt::Transport>(cfg)` will block until it connects or fails, so be prepared for `conduit::TransportError` (or the per-transport subclass) at startup.
- `*` matches within one segment (`order.*` matches `order.created` but not `order.line.added`); `**` crosses segments. Anything else matches literally.

| API | Lives in | Purpose |
|---|---|---|
| `conduit::Event<Self, "name">` | `conduit/event.hpp` | CRTP base for user events. |
| `conduit::DefaultFlags<...>` / `event_traits<T>` | `conduit/event.hpp` | Attach default flags to an event type. |
| `conduit::EventEnvelope` (alias `EventEnvelopeView`) | `conduit/envelope.hpp` | The envelope passed around the bus. |
| `conduit::EventBuilder<T>` / `conduit::event(T)` | `conduit/builder.hpp` | Fluent builder for envelopes. |
| `conduit::Bus` | `conduit/bus.hpp` | Dispatch root; owns transports, middleware, listeners. |
| `conduit::Transport` | `conduit/transport.hpp` | Abstract transport base; returns `Local` / `Remote` scope. |
| `conduit::local::Transport` | `conduit/local/transport.hpp` | In-process delivery (`Direct` / `Queue` / `ThreadPool`). |
| `conduit::relay::Transport` | `conduit/relay/transport.hpp` | Callback transport, glob-routed. |
| `conduit::FilteredTransport` | `conduit/filtered_transport.hpp` | Bidirectional outbound/inbound filter wrapper. |
| `conduit::mqtt::Transport` (etc.) | `conduit/mqtt/transport.hpp` (etc.) | Broker adapters, opt-in via CMake flags. |
| `conduit::Middleware` | `conduit/middleware.hpp` | `before_dispatch` / `after_dispatch` / `on_error` / `on_transport_error`. |
| `conduit::EventListener<T>` / `EventSubscriber` | `conduit/listener.hpp` | Class-based listener / multi-event subscriber. |
| `conduit::Subscription` | `conduit/listener.hpp` | RAII unsubscribe handle. |
| `conduit::Glob` | `conduit/glob.hpp` | `*` (within segment) / `**` (across) matcher. |
| `conduit::flags::Flag<"name">`, `FlagSet`, built-in flags | `conduit/flags.hpp` | Type-tag-based flag bitset. |
| `conduit::EventRegistry` | `conduit/serialization.hpp` | Registers event types for wire decode. |
| `conduit::encode_json` / `encode_cbor` | `conduit/serialization.hpp` | Encode an envelope to the wire. |
| `conduit::Exception` + `ConfigError`, `TransportError`, … | `conduit/exception.hpp` | Root exception hierarchy thrown by conduit (catch one type, not many). |
| `conduit::EventTypeRegistry` / `global_event_types()` | `conduit/event_type_registry.hpp` | Process-wide event type catalog (introspection + JSON schema). |
| `CONDUIT_REGISTER_EVENT(T)` / `registered_event_types()` | `conduit/event_type_registry.hpp` | Self-register a type into the catalog / snapshot all registered types. |
| `conduit::Metadata` (= `md::Metadata`), `Timestamps` | `conduit/metadata.hpp` | Envelope metadata (typed JSON-shaped tree) + timestamp struct. |

Built-in flags: Direct, Durable, Persistent, NoMiddleware, RequireAck, Broadcast, LocalOnly, RemoteOnly.
The examples/ directory contains compact programs that map to specific concepts. Each builds as conduit_<name> when CONDUIT_BUILD_EXAMPLES=ON (the default at top level).
| Example | Demonstrates |
|---|---|
| `examples/hello.cpp` | Minimal: event, listen, publish. |
| `examples/typed_listener.cpp` | Listener receives the envelope instead of the payload. |
| `examples/subscriber.cpp` | `EventSubscriber` wires up several listeners as one unit. |
| `examples/pattern_listener.cpp` | `bus.listen("order.*", ...)` glob subscription. |
| `examples/middleware_logging.cpp` | A logging middleware with `before_dispatch` / `after_dispatch`. |
| `examples/threadpool_local.cpp` | `ThreadPool` execution + `bus.drain()`. |
| `examples/flags_direct.cpp` | `flags::Direct` forces inline delivery even in `Queue` mode. |
| `examples/local_only_event.cpp` | `DefaultFlags<LocalOnly>` keeps an event off remote transports. |
| `examples/relay_to_callback.cpp` | `relay::Transport` routes matching events to a callback. |
| `examples/filtered_transport.cpp` | Per-leg `FilteredTransport` predicates. |
| `examples/serialization_roundtrip.cpp` | JSON + CBOR encode/decode via `EventRegistry`. |
| `examples/metrics.cpp` | Always-on `conduit_*` prom metrics; inspect the `conduit` scope. |
| `transports/<name>/examples/*.cpp` | Broker-specific recipes: publish/subscribe, multi-topic, queue groups. |

The repository ships a Makefile that wraps the common workflows:
Each broker smoke test (MqttSmoke, AmqpSmoke, NatsSmoke, RedisSmoke, ZmqSmoke) is skipped if its environment variable is not set:
| Env var | Example |
|---|---|
| `CONDUIT_MQTT_TEST_BROKER` | `tcp://localhost:1883` |
| `CONDUIT_AMQP_TEST_BROKER` | `amqp://guest:guest@localhost:5672/` |
| `CONDUIT_NATS_TEST_BROKER` | `nats://localhost:4222` |
| `CONDUIT_REDIS_TEST_BROKER` | `tcp://localhost:6379` |
| `CONDUIT_ZMQ_TEST_ENDPOINT` | `tcp://127.0.0.1:25557` |

CI brings each broker up as a service container and runs the corresponding smoke test against it.
GitHub Actions runs:
- `build`: Ubuntu + macOS, Debug + Release, GCC 14 / Clang 20.
- `sanitizers`: ASan + UBSan on Ubuntu / GCC 14.
- `clang-tidy`: macOS / Homebrew LLVM.
- `format`: clang-format-22 dry-run.
- `mqtt`, `amqp`, `nats`, `redis`, `zmq`: each boots its broker as a service container (or, for ZMQ, uses a loopback endpoint) and runs the adapter's smoke test.
- `docs`: Doxygen build, deployed to GitHub Pages.

Is the core header-only? Yes. The `conduit::conduit` CMake target is `INTERFACE`. Each broker adapter is a separate static library because it pulls in heavy C dependencies (paho, nats.c, libzmq, etc.).
Does the bus take ownership of listeners? It stores the handler. The returned Subscription is the owner of the registration — destroy it to unregister. Capture-by-reference into a handler is fine as long as the captured objects outlive the bus.
Can I use it from multiple threads? Yes. Bus::publish, Bus::listen, and Bus::use_transport / use_middleware all take an internal mutex. Listeners themselves may run on any thread depending on the local-transport mode; treat handler bodies as multi-threaded code unless you are in Direct mode.
What is parcel? A separate serialization library (cpp-parcel) used to encode the typed payload. Events use its FieldsBuilder to declare schema; conduit builds the envelope JSON/CBOR around it.
Can I use it without any transport? Yes — if no transport is attached the bus performs an inline local fan-out as a fallback. This is mostly useful for tests; for real applications attach at least local::Transport so the routing matrix and scope filtering run.
How do I send the same event to two different MQTT topics? Attach two mqtt::Transport instances, each with its own Config::topic. Wrap each in a FilteredTransport if you only want certain envelope names to go down each path.
What happens when the broker disconnects mid-publish? The dispatch attempt is caught and surfaced through Middleware::on_error. The reconnect policy belongs to the underlying broker client (paho, nats.c, etc.); see their docs.
Where does retention / durability live? In the broker adapter, not the core. Durable / Persistent / RequireAck are carried as flags so adapters can honor them — for MQTT that means QoS and retain; for AMQP it means delivery_mode=2 and confirm.select; etc. The core promises only to carry the flags.
Can I add my own transport? Yes. Inherit from conduit::Transport, implement scope() and dispatch(const EventEnvelopeView&), and optionally override attach_with_sink, detach, and flush. For inbound delivery, call deliver_inbound(envelope) from your read path; for decode failures, call bus()->report_transport_error("my_transport", std::current_exception()).
Contributions to the library are welcome! If you encounter any issues or have suggestions for improvements, please feel free to submit a pull request or open an issue on the project's repository.
This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.