conduit 0.6.0
Modern C++23 header-only event-dispatching / event-transport library
Loading...
Searching...
No Matches
envelope.hpp
Go to the documentation of this file.
1#pragma once
2
6
7#include <conduit/event.hpp>
9#include <conduit/flags.hpp>
10#include <conduit/metadata.hpp>
11
12#include <ulid/ulid.h>
13
14#include <chrono>
15#include <compare>
16#include <cstdint>
17#include <memory>
18#include <optional>
19#include <string>
20#include <string_view>
21#include <utility>
22
23#include <parcel/parcel.h>
24
25namespace conduit {
26
27namespace detail {
28
33 ulid::Ulid id;
37 std::optional<ulid::Ulid> correlation_id;
38 std::optional<ulid::Ulid> causation_id;
39 parcel::cell_t payload_cell;
40};
41
42[[nodiscard]] inline std::int64_t
43to_ms_since_epoch(const std::chrono::system_clock::time_point tp) noexcept {
44 return std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch()).count();
45}
46
47[[nodiscard]] inline std::chrono::system_clock::time_point
48from_ms_since_epoch(const std::int64_t ms) noexcept {
49 return std::chrono::system_clock::time_point{std::chrono::milliseconds{ms}};
50}
51
52} // namespace detail
53
62 : public parcel::BaseCell<EventEnvelope, std::shared_ptr<detail::EnvelopeCore>> {
63 using base_t = parcel::BaseCell<EventEnvelope, std::shared_ptr<detail::EnvelopeCore>>;
64
65public:
66 static constexpr std::string_view kind_id = "conduit:envelope";
67
68 EventEnvelope() : base_t(std::make_shared<detail::EnvelopeCore>()) {}
69
72 template <typename T>
73 requires std::derived_from<T, parcel::ICell>
74 explicit EventEnvelope(T payload) : base_t(std::make_shared<detail::EnvelopeCore>()) {
75 this->value->payload_cell = std::make_shared<T>(std::move(payload));
76 }
77
78 EventEnvelope(detail::EnvelopeCore core, parcel::cell_t payload)
79 : base_t(std::make_shared<detail::EnvelopeCore>(std::move(core))) {
80 this->value->payload_cell = std::move(payload);
81 }
82
83 explicit EventEnvelope(std::shared_ptr<detail::EnvelopeCore> core) : base_t(std::move(core)) {}
84
85 // -- accessors ----------------------------------------------------------
86
87 [[nodiscard]] const ulid::Ulid& id() const noexcept {
88 return this->value->id;
89 }
90
91 [[nodiscard]] std::string_view name() const noexcept {
92 if (!this->value || !this->value->payload_cell) {
93 return {};
94 }
95 const std::string_view k = this->value->payload_cell->kind();
96 if (k.starts_with(event_kind_prefix)) {
97 return k.substr(event_kind_prefix.size());
98 }
99
100 return k;
101 }
102
103 [[nodiscard]] const flags::FlagSet& flags() const noexcept {
104 return this->value->flags;
105 }
106 [[nodiscard]] const Metadata& metadata() const noexcept {
107 return this->value->metadata;
108 }
109 [[nodiscard]] const Timestamps& timestamps() const noexcept {
110 return this->value->timestamps;
111 }
112
113 [[nodiscard]] const std::optional<ulid::Ulid>& correlation_id() const noexcept {
114 return this->value->correlation_id;
115 }
116 [[nodiscard]] const std::optional<ulid::Ulid>& causation_id() const noexcept {
117 return this->value->causation_id;
118 }
119
120 [[nodiscard]] const parcel::cell_t& payload_cell() const noexcept {
121 return this->value->payload_cell;
122 }
123
126 template <typename T>
127 [[nodiscard]] std::shared_ptr<const T> payload_as() const noexcept {
128 if (!this->value || !this->value->payload_cell) {
129 return nullptr;
130 }
131 return std::dynamic_pointer_cast<const T>(this->value->payload_cell);
132 }
133
134 // Transport-mutable views. Mutating through a non-const accessor on a
135 // copy (e.g. `EventEnvelope local = v; local.timestamps().received_at = ...`)
136 // updates the shared core, so all envelope copies observe the change.
137 Timestamps& timestamps() noexcept {
138 return this->value->timestamps;
139 }
140 Metadata& metadata() noexcept {
141 return this->value->metadata;
142 }
143 flags::FlagSet& flags() noexcept {
144 return this->value->flags;
145 }
146
147 [[nodiscard]] std::shared_ptr<detail::EnvelopeCore> core_ptr() const noexcept {
148 return this->value;
149 }
150
151 [[nodiscard]] bool valid() const noexcept {
152 return this->value != nullptr && this->value->payload_cell != nullptr;
153 }
154
155 // -- parcel cell overrides ---------------------------------------------
156
157 [[nodiscard]] std::string to_string() const override {
158 return std::string{name()};
159 }
160
161 [[nodiscard]] parcel::json_t to_json() const override;
162
163 static parcel::cell_t from_json(parcel::json_t const& j, parcel::ParcelRegistry const& reg);
164
165 [[nodiscard]] std::partial_ordering compare(parcel::ICell const& other) const override {
166 if (const auto kc = this->kind() <=> other.kind(); kc != 0) {
167 return kc;
168 }
169 // Envelope core/payload comparison isn't meaningful for delivery —
170 // skip a deep compare to keep the cell instantiable.
171 return std::partial_ordering::unordered;
172 }
173
174 static parcel::cell_type_descriptor_t descriptor() {
175 static const auto d =
176 std::make_shared<parcel::SimpleCellTypeDescriptor<EventEnvelope>>(parcel::DisplayInfo{
177 .name = "EventEnvelope",
178 .description = "Conduit event envelope (custom wire layout).",
179 });
180 return d;
181 }
182};
183
189
190// ---------------------------------------------------------------------------
191// JSON layout — hand-written, kept in one place for the whole envelope wire.
192// ---------------------------------------------------------------------------
193
194inline parcel::json_t EventEnvelope::to_json() const {
195 if (!this->value || !this->value->payload_cell) {
196 throw SerializationError{"EventEnvelope::to_json: envelope has no payload"};
197 }
199 *this->value;
200
201 parcel::json_t v_obj = parcel::json_t::object();
202 v_obj["id"] = id.string();
203 v_obj["name"] = std::string{name()};
204
205 parcel::json_t flags_arr = parcel::json_t::array();
206 for (const auto& f : flags) {
207 flags_arr.push_back(std::string{f.name});
208 }
209 v_obj["flags"] = std::move(flags_arr);
210
211 if (correlation_id.has_value()) {
212 v_obj["correlation_id"] = correlation_id->string();
213 }
214 if (causation_id.has_value()) {
215 v_obj["causation_id"] = causation_id->string();
216 }
217
218 v_obj["metadata"] = md::to_json(metadata);
219
220 parcel::json_t ts = parcel::json_t::object();
224 }
227 }
230 }
231 if (timestamps.failed_at) {
233 }
234 v_obj["timestamps"] = std::move(ts);
235
236 v_obj["payload"] = payload_cell->to_json();
237
238 parcel::json_t out{
239 {parcel::ICell::KEY_KIND, kind_id},
240 {parcel::ICell::KEY_VALUE, std::move(v_obj)},
241 };
242 this->inject_display_info(out);
243 return out;
244}
245
246inline parcel::cell_t EventEnvelope::from_json(parcel::json_t const& j,
247 parcel::ParcelRegistry const& reg) {
248 if (!j.is_object()) {
249 throw parcel::InvalidJsonException("Expected JSON object for EventEnvelope",
250 std::string(kind_id));
251 }
252 const auto it_k = j.find(parcel::ICell::KEY_KIND);
253 if (it_k == j.end() || !it_k->is_string()) {
254 throw parcel::InvalidJsonException("EventEnvelope: missing/invalid 'k'",
255 std::string(kind_id));
256 }
257 if (it_k->get<std::string_view>() != kind_id) {
258 throw parcel::KindMismatchException("EventEnvelope: kind mismatch", std::string(kind_id));
259 }
260 const auto it_v = j.find(parcel::ICell::KEY_VALUE);
261 if (it_v == j.end() || !it_v->is_object()) {
262 throw parcel::InvalidJsonException("EventEnvelope: missing/invalid 'v' (expected object)",
263 std::string(kind_id));
264 }
265 const auto& v = *it_v;
266
267 auto core = std::make_shared<detail::EnvelopeCore>();
268
269 if (!v.contains("id") || !v.at("id").is_string()) {
270 throw parcel::InvalidJsonException("EventEnvelope: missing/invalid 'id'",
271 std::string(kind_id));
272 }
273 const auto id_opt = ulid::Ulid::from_string(v.at("id").get<std::string>());
274 if (!id_opt.has_value()) {
275 throw parcel::InvalidJsonException("EventEnvelope: invalid ULID for 'id'",
276 std::string(kind_id));
277 }
278 core->id = *id_opt;
279
280 if (v.contains("correlation_id")) {
281 const auto cid = ulid::Ulid::from_string(v.at("correlation_id").get<std::string>());
282 if (!cid.has_value()) {
283 throw parcel::InvalidJsonException("EventEnvelope: invalid ULID for 'correlation_id'",
284 std::string(kind_id));
285 }
286 core->correlation_id = *cid;
287 }
288 if (v.contains("causation_id")) {
289 const auto cid = ulid::Ulid::from_string(v.at("causation_id").get<std::string>());
290 if (!cid.has_value()) {
291 throw parcel::InvalidJsonException("EventEnvelope: invalid ULID for 'causation_id'",
292 std::string(kind_id));
293 }
294 core->causation_id = *cid;
295 }
296
297 if (v.contains("flags")) {
298 for (const auto& fn : v.at("flags")) {
299 if (auto ref = comms::GlobalFlagRegistry::instance().find(fn.get<std::string>())) {
300 core->flags.insert(*ref);
301 }
302 }
303 }
304
305 if (v.contains("metadata")) {
306 if (const auto& md_json = v.at("metadata"); md_json.is_object()) {
307 md::from_json(md_json, core->metadata);
308 }
309 }
310
311 if (v.contains("timestamps")) {
312 const auto& ts = v.at("timestamps");
313 if (ts.contains("created_at")) {
314 core->timestamps.created_at =
315 detail::from_ms_since_epoch(ts.at("created_at").get<std::int64_t>());
316 }
317 if (ts.contains("published_at")) {
318 core->timestamps.published_at =
319 detail::from_ms_since_epoch(ts.at("published_at").get<std::int64_t>());
320 }
321 if (ts.contains("received_at")) {
322 core->timestamps.received_at =
323 detail::from_ms_since_epoch(ts.at("received_at").get<std::int64_t>());
324 }
325 if (ts.contains("delivered_at")) {
326 core->timestamps.delivered_at =
327 detail::from_ms_since_epoch(ts.at("delivered_at").get<std::int64_t>());
328 }
329 if (ts.contains("failed_at")) {
330 core->timestamps.failed_at =
331 detail::from_ms_since_epoch(ts.at("failed_at").get<std::int64_t>());
332 }
333 }
334
335 if (!v.contains("payload")) {
336 throw parcel::InvalidJsonException("EventEnvelope: missing 'payload'",
337 std::string(kind_id));
338 }
339 core->payload_cell = reg.cell_from_json(v.at("payload"));
340
341 auto out = std::make_shared<EventEnvelope>(std::move(core));
342 base_t::absorb_display_info(j, out);
343 return out;
344}
345
346} // namespace conduit
Polymorphic envelope cell.
Definition envelope.hpp:62
static parcel::cell_type_descriptor_t descriptor()
Definition envelope.hpp:174
EventEnvelope()
Definition envelope.hpp:68
bool valid() const noexcept
Definition envelope.hpp:151
const flags::FlagSet & flags() const noexcept
Definition envelope.hpp:103
flags::FlagSet & flags() noexcept
Definition envelope.hpp:143
EventEnvelope(std::shared_ptr< detail::EnvelopeCore > core)
Definition envelope.hpp:83
EventEnvelope(T payload)
Build an envelope around a typed payload event.
Definition envelope.hpp:74
const std::optional< ulid::Ulid > & correlation_id() const noexcept
Definition envelope.hpp:113
Metadata & metadata() noexcept
Definition envelope.hpp:140
std::shared_ptr< detail::EnvelopeCore > core_ptr() const noexcept
Definition envelope.hpp:147
const Timestamps & timestamps() const noexcept
Definition envelope.hpp:109
const parcel::cell_t & payload_cell() const noexcept
Definition envelope.hpp:120
parcel::json_t to_json() const override
Definition envelope.hpp:194
Timestamps & timestamps() noexcept
Definition envelope.hpp:137
static parcel::cell_t from_json(parcel::json_t const &j, parcel::ParcelRegistry const &reg)
Definition envelope.hpp:246
static constexpr std::string_view kind_id
Definition envelope.hpp:66
std::string_view name() const noexcept
Definition envelope.hpp:91
std::partial_ordering compare(parcel::ICell const &other) const override
Definition envelope.hpp:165
const Metadata & metadata() const noexcept
Definition envelope.hpp:106
std::shared_ptr< const T > payload_as() const noexcept
Typed payload accessor: returns shared_ptr<const T> if the underlying payload is a T cell,...
Definition envelope.hpp:127
const std::optional< ulid::Ulid > & causation_id() const noexcept
Definition envelope.hpp:116
std::string to_string() const override
Definition envelope.hpp:157
const ulid::Ulid & id() const noexcept
Definition envelope.hpp:87
EventEnvelope(detail::EnvelopeCore core, parcel::cell_t payload)
Definition envelope.hpp:78
Raised by envelope/cell deserialization when the wire data is malformed.
Definition exception.hpp:56
Event<Self, Name> library base built on parcel::SelfStructCell.
Root exception hierarchy for the conduit library.
Conduit flag tags, built atop comms::Flag / comms::FlagSet.
Envelope metadata container and lifecycle timestamps.
flags::FlagSet collect_default_flags()
Definition event.hpp:86
std::chrono::system_clock::time_point from_ms_since_epoch(const std::int64_t ms) noexcept
Definition envelope.hpp:48
std::int64_t to_ms_since_epoch(const std::chrono::system_clock::time_point tp) noexcept
Definition envelope.hpp:43
comms::FlagSet FlagSet
Definition flags.hpp:32
Definition builder.hpp:22
md::Metadata Metadata
Envelope metadata: a typed JSON-shaped key/value tree.
Definition metadata.hpp:20
constexpr std::string_view event_kind_prefix
Wire-kind prefix shared by every conduit event type.
Definition event.hpp:21
Lifecycle timestamps tracked by the bus / middleware / transports.
Definition metadata.hpp:23
std::optional< std::chrono::system_clock::time_point > received_at
Definition metadata.hpp:26
std::optional< std::chrono::system_clock::time_point > published_at
Definition metadata.hpp:25
std::optional< std::chrono::system_clock::time_point > failed_at
Definition metadata.hpp:28
std::optional< std::chrono::system_clock::time_point > delivered_at
Definition metadata.hpp:27
std::chrono::system_clock::time_point created_at
Definition metadata.hpp:24
Internal core shared between envelope copies — accessors return references into this struct so transp...
Definition envelope.hpp:32
parcel::cell_t payload_cell
Definition envelope.hpp:39
std::optional< ulid::Ulid > correlation_id
Definition envelope.hpp:37
std::optional< ulid::Ulid > causation_id
Definition envelope.hpp:38
ulid::Ulid id
Definition envelope.hpp:33
flags::FlagSet flags
Definition envelope.hpp:34
Metadata metadata
Definition envelope.hpp:35
Timestamps timestamps
Definition envelope.hpp:36