11#include <threadman/manager.hpp>
13#include <condition_variable>
47 const bool name_is_default =
48 cfg.name.empty() || cfg.name == threadman::ThreadPoolOptions{}.name;
53 if (name_is_default) {
54 cfg.name =
"conduit:local-queue";
56 }
else if (name_is_default) {
57 cfg.name =
"conduit:local";
62 capacity_ = cfg.max_queue_size;
63 cfg.max_queue_size = 0;
64 pool_ = std::make_unique<threadman::ThreadPool>(std::move(cfg));
72 cfg.max_queue_size = queue_capacity;
92 std::unique_lock lock(drain_mu_);
100 space_cv_.wait(lock, [
this] {
return outstanding_ < capacity_; });
105 pool_->execute([
this, snap]()
mutable {
125 std::unique_lock lock(drain_mu_);
126 drained_cv_.wait(lock, [
this] {
return outstanding_ == 0; });
134 void complete_one() {
135 std::scoped_lock lock(drain_mu_);
136 if (--outstanding_ == 0) {
137 drained_cv_.notify_all();
139 space_cv_.notify_one();
143 std::unique_ptr<threadman::ThreadPool> pool_;
144 std::size_t capacity_ = 0;
146 std::mutex drain_mu_;
147 std::condition_variable drained_cv_;
148 std::condition_variable space_cv_;
149 std::size_t outstanding_ = 0;
Bus — owns transports, middleware, listeners; dispatches envelopes.
Polymorphic envelope cell.
Definition envelope.hpp:62
const flags::FlagSet & flags() const noexcept
Definition envelope.hpp:103
Definition transport.hpp:30
void deliver_inbound(const EventEnvelopeView &v) const
Subclasses call this for inbound delivery instead of touching the bus directly.
Definition transport.hpp:62
Definition transport.hpp:37
void dispatch(const EventEnvelopeView &v) override
Definition transport.hpp:84
TransportScope scope() const noexcept override
Definition transport.hpp:76
void detach() noexcept override
Definition transport.hpp:80
Execution mode() const noexcept
Definition transport.hpp:129
static Transport queued(const std::size_t queue_capacity=0)
Definition transport.hpp:70
Transport(const Execution mode=Execution::Direct, ThreadPoolConfig cfg={})
Definition transport.hpp:39
void flush() override
Definition transport.hpp:121
static Transport thread_pool(ThreadPoolConfig cfg={})
Definition transport.hpp:67
EventEnvelope — a parcel cell carrying conduit's envelope metadata plus a polymorphic payload cell.
Conduit flag tags, built atop comms::Flag / comms::FlagSet.
Transport interface and the local/remote scope enum used for flag-based filtering.
Definition transport.hpp:21
threadman::ThreadPoolOptions ThreadPoolConfig
Configuration for the Queue / ThreadPool execution modes.
Definition transport.hpp:35
Execution
Definition transport.hpp:23
TransportScope
Distinguishes in-process transports from off-machine ones.
Definition transport.hpp:21
Force same-thread dispatch even in Queue / ThreadPool execution modes.
Definition flags.hpp:41