Architecture · Guarantees · Usage Patterns
Tight coupling through raw callback pointers
A raw function pointer is captured during construction and stored without lifetime management.
type FilterPushHandler* =
proc(pubsubTopic: PubsubTopic,
message: WakuMessage)
{.async, gcsafe, closure.}
waku/waku_core/subscription/subscription_manager.nim
type SubscriptionManager* = object
subscriptions:
TableRef[(string, ContentTopic),
FilterPushHandler] # raw proc pointer
proc registerSubscription*(m: SubscriptionManager,
    pubsubTopic: PubsubTopic, contentTopic: ContentTopic,
    handler: FilterPushHandler) =
  m.subscriptions[(pubsubTopic, contentTopic)] =
    handler # stored forever, no cleanup API
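The hazard is easy to reproduce outside Waku. A stdlib-only sketch (the `Service` type and `register` proc are invented for illustration, not from the codebase) shows how a closure stored in an unmanaged table keeps firing for a service that has already been "stopped":

```nim
import std/tables

type
  Handler = proc(msg: string) {.closure.}
  Service = ref object
    name: string
    stopped: bool

# mirrors SubscriptionManager.subscriptions: raw closures, no removal API
var handlers = initTable[string, Handler]()

proc register(topic: string, h: Handler) =
  handlers[topic] = h # stored forever

proc start(svc: Service) =
  register("topic-a",
    proc(msg: string) =
      # `svc` is captured: the table now keeps it reachable,
      # and the callback still runs after the service stops
      if not svc.stopped:
        echo svc.name, " got: ", msg)

let svc = Service(name: "recv", stopped: false)
svc.start()
svc.stopped = true          # "stop" the service...
handlers["topic-a"]("late") # ...but the stale registration remains
```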
Problems
SubscriptionManager must be threaded through every layer
type WakuFilterClient* = ref object of LPProtocol
rng: ref HmacDrbgContext
peerManager: PeerManager
pushHandlers: seq[FilterPushHandler] # ← raw list
# Anyone wiring up the node must call this
proc registerPushHandler*(
wfc: WakuFilterClient,
handler: FilterPushHandler) =
wfc.pushHandlers.add(handler) # add and forget
# Deep inside recv_service initialization:
node.wakuFilterClient
.registerPushHandler(
proc(pubsubTopic: PubsubTopic,
message: WakuMessage) {.async, closure.} =
recvService.recentReceivedMsgs.add(...)
)
pushHandlers is a plain seq. No way to remove a handler — not even when the owning service stops.
recvService is captured by the closure. If RecvService is destroyed first, the closure holds a stale reference.
Adding a new subscriber means touching filter client construction code — import dependency in both directions.
Every new subscriber forces a new dependency across the entire init hierarchy.
Direct dependency: RecvService must know about WakuFilterClient just to receive events.
No cleanup contract: stopRecvService cannot deregister — the callback lives as long as WakuFilterClient does.
The goal: modules should talk to each other without importing or knowing about each other. That's what the Broker pattern delivers.
Reactive · Pub/Sub · Many emitters → Many listeners
emit() — zero knowledge of who is listening
listen() — register and receive a handle for unregistering
-- Declare once:
EventBroker:
type MyEvent = object
field: string
-- Anywhere in your code:
MyEvent.listen(handler) # register
MyEvent.listen(ctx, handler) # context-aware
MyEvent.emit(event) # fire
MyEvent.emit(ctx, field = "x") # shorthand ctor
await MyEvent.dropListener(handle) # drain in-flight, then cleanup
MyEvent.dropAllListeners() # drain all in-flight (5s timeout)
Thread-local storage — no locks needed
Async dispatch — listeners run as independent tasks
Handle-based — explicit, safe cleanup
One macro call per event type — can live in a shared events module.
waku/events/delivery_events.nim
import waku/common/broker/event_broker
EventBroker:
type DeliveryFeedbackEvent* = ref object
success*: DeliverySuccess
dir*: DeliveryDirection
comment*: string
msgHash*: WakuMessageHash
msg*: WakuMessage
EventBroker:
type OnFilterSubscribeEvent* = object
pubsubTopic*: string
contentTopics*: seq[string]
EventBroker:
type OnFilterUnSubscribeEvent* = object
pubsubTopic*: string
contentTopics*: seq[string]
The macro exports the type plus a companion broker — both live in thread-local storage. No global state, no locks.
The emitter doesn't import or know about any listener.
waku/waku_filter_v2/client.nim
proc subscribe*(wfc: WakuFilterClient, ...) {.async.} =
# ... network call ...
?await wfc.sendSubscribeRequest(peer, req)
# Fire-and-forget: no knowledge of RecvService
OnFilterSubscribeEvent.emit(
wfc.brokerCtx, pubsubTopic, contentTopicSeq)
proc unsubscribe*(wfc: WakuFilterClient, ...) {.async.} =
# ... network call ...
?await wfc.sendSubscribeRequest(peer, req)
OnFilterUnSubscribeEvent.emit(
wfc.brokerCtx, pubsubTopic, contentTopicSeq)
emit() spawns async tasks for each listener. The emitter returns immediately — no blocking, no coupling.
proc performDeliveryFeedback(
self: RecvService,
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
msgHash: WakuMessageHash,
msg: WakuMessage,
) {.gcsafe, raises: [].} =
info "recv monitor performDeliveryFeedback",
success, dir, comment,
msg_hash = shortLog(msgHash)
DeliveryFeedbackEvent.emit(
brokerCtx = self.brokerCtx,
success = success,
dir = dir,
comment = comment,
msgHash = msgHash,
msg = msg,
)
The inline constructor overload (emit(brokerCtx, field = value, ...)) is auto-generated for object types — no manual construction needed.
RecvService manages its own listeners — zero coupling with the filter client.
type RecvService* = ref object of RootObj
brokerCtx: BrokerContext
onSubscribeListener: OnFilterSubscribeEventListener
onUnsubscribeListener: OnFilterUnSubscribeEventListener
proc startRecvService*(self: RecvService) =
self.onSubscribeListener =
OnFilterSubscribeEvent.listen(
self.brokerCtx,
proc(ev: OnFilterSubscribeEvent)
{.async: (raises: []).} =
self.onSubscribe(
ev.pubsubTopic, ev.contentTopics)
).valueOr:
error "Failed to set listener"
quit(QuitFailure)
self.onUnsubscribeListener =
OnFilterUnSubscribeEvent.listen(
self.brokerCtx,
proc(ev: OnFilterUnSubscribeEvent)
{.async: (raises: []).} =
self.onUnsubscribe(
ev.pubsubTopic, ev.contentTopics)
).valueOr:
error "Failed to set listener"
quit(QuitFailure)
proc stopRecvService*(self: RecvService) {.async.} =
  await OnFilterSubscribeEvent.dropListener(
    self.brokerCtx, self.onSubscribeListener)
  await OnFilterUnSubscribeEvent.dropListener(
    self.brokerCtx, self.onUnsubscribeListener)
Symmetric start/stop — handles stored on the owner, cleanup is explicit and complete.
- listen() returns a typed handle
- dropListener(handle) removes the registration and drains in-flight futures (5 s timeout)
- Once dropListener returns, no callbacks for that handle are running — safe to release resources
- Each dispatch runs as an asyncSpawn task, tracked in inFlight
- dropListener drains in-flight on shutdown
- No import cycle: emitter and listener only share the event type declaration module.
Thread-local: broker state per event-loop thread — no mutex for dispatch.
Type safety: handler signature enforced at compile time via generated proc type.
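The guarantees above can be condensed into one lifecycle sketch. This is illustrative rather than verbatim library code: the import path, `valueOr` handling, and the `{.async: (raises: []).}` handler signature are taken from the slides, while the `PingEvent` type is invented for the example.

```nim
import chronos
import waku/common/broker/event_broker

EventBroker:
  type PingEvent = object
    n*: int

proc demo() {.async.} =
  # register: listen() returns a typed handle (or an error)
  let handle = PingEvent.listen(
    proc(ev: PingEvent) {.async: (raises: []).} =
      echo "ping #", ev.n
  ).valueOr:
    return

  # fire-and-forget: the listener runs as its own async task
  PingEvent.emit(PingEvent(n: 1))

  # drain in-flight dispatches, then remove the registration;
  # after this returns, no callback for `handle` is running
  await PingEvent.dropListener(handle)

waitFor demo()
```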
Proactive · Request/Response · Many requesters → One provider
The contract is a proc signature declaration. Providers can be async (default) or sync for simple state accessors.
-- Declare once:
RequestBroker: # or RequestBroker(sync):
type MyData = object
value: string
proc signature*():
Future[Result[MyData, string]]
-- Provider side (e.g. in WakuNode):
MyData.setProvider(
proc(): Future[...] {.async.} =
ok(MyData(value: "hello")))
-- Requester side (anywhere):
let data = await MyData.request()
Sync variants — simple state accessors, no Future.
# Zero-arg sync: simple state accessor
RequestBroker(sync):
type RequestConnectionStatus* = object
connectionStatus*: ConnectionStatus
# Arg-based sync: parameterised lookup
RequestBroker(sync):
type RequestContentTopicsHealth* = object
contentTopicHealth*:
seq[tuple[topic: ContentTopic,
health: TopicHealth]]
proc signature(topics: seq[ContentTopic]):
Result[RequestContentTopicsHealth, string]
Async variants — for IO-bound operations.
waku/requests/health_requests.nim
# Zero-arg async: may involve IO
RequestBroker:
type RequestHealthReport* = object
healthReport*: HealthReport
# Arg-based async: query a specific protocol
RequestBroker:
type RequestProtocolHealth* = object
healthStatus*: ProtocolHealth
proc signature(protocol: WakuProtocol):
Future[Result[RequestProtocolHealth, string]]
The proc signature declaration enforces the provider's type at compile time. If omitted, a zero-arg form is generated automatically.
Provider registers at startup — requesters don't need to know who provides.
waku/factory/waku.nim
## Sync provider — zero-arg, no IO involved
RequestConnectionStatus.setProvider(
threadGlobalBrokerContext(),
proc(): Result[RequestConnectionStatus, string] =
try:
let report = waku[].healthMonitor
.getSyncNodeHealthReport()
ok(RequestConnectionStatus(
connectionStatus: report.connectionStatus))
except:
err("Failed: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set provider"
## Async provider — may perform IO
RequestHealthReport.setProvider(
threadGlobalBrokerContext(),
proc(): Future[
Result[RequestHealthReport, string]]
{.async.} =
try:
let report = await waku[].healthMonitor
.getNodeHealthReport()
ok(RequestHealthReport(healthReport: report))
except:
err("Failed: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set provider"
## Arg-based sync provider
RequestShardTopicsHealth.setProvider(
node.brokerCtx,
proc(topics: seq[PubsubTopic]):
Result[RequestShardTopicsHealth, string] =
var response: RequestShardTopicsHealth
for shard in topics:
var healthStatus = TopicHealth.UNHEALTHY
if not node.wakuRelay.isNil:
healthStatus = node.wakuRelay.topicsHealth
.getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED)
response.topicHealth.add((shard, healthStatus))
return ok(response),
).isOkOr:
error "Can't set provider for RequestShardTopicsHealth",
error = error
Any module can request without knowing who provides — or even if a provider is registered.
## Sync request — no await needed
let statusRes =
RequestConnectionStatus.request()
statusRes.isOkOr:
error "No connection status provider"
return
echo statusRes.get().connectionStatus
## Sync with args
let healthRes =
RequestShardTopicsHealth.request(topics)
healthRes.isOkOr:
error "Shard health unavailable"; return
## Async request
let reportRes =
await RequestHealthReport.request()
reportRes.isOkOr:
error "Health report unavailable"; return
let report = reportRes.get().healthReport
If no provider is registered, request() returns err(...) — never panics.
Provider exceptions are caught and converted to err — requester always gets a Result.
One provider per context: calling setProvider twice returns an error — no silent overwrites.
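These three guarantees can be exercised directly. A hedged sketch using the sync variant — the `RequestAnswer` type is invented, and the exact return shape of `setProvider` is assumed from the `isOkOr` usage in the slides:

```nim
import waku/common/broker/request_broker

RequestBroker(sync):
  type RequestAnswer = object
    value*: int

# 1. No provider registered yet: request() returns err, never panics
doAssert RequestAnswer.request().isErr

# 2. First registration succeeds
doAssert RequestAnswer.setProvider(
  proc(): Result[RequestAnswer, string] =
    ok(RequestAnswer(value: 42))).isOk

# 3. A second registration in the same context is rejected —
#    no silent overwrite of the existing provider
doAssert RequestAnswer.setProvider(
  proc(): Result[RequestAnswer, string] =
    ok(RequestAnswer(value: 0))).isErr

# the original provider still answers
doAssert RequestAnswer.request().get().value == 42
```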
Fan-out · Collect all · Many providers → One result set
MultiRequestBroker:
type CollectedStatus* = object
label*: string
proc signature*():
Future[Result[CollectedStatus, string]]
# Register N providers:
let h1 = CollectedStatus.setProvider(
proc(): Future[...]
{.async: (raises: []).} =
ok(CollectedStatus(label: "relay")))
let h2 = CollectedStatus.setProvider(
proc(): Future[...]
{.async: (raises: []).} =
ok(CollectedStatus(label: "filter")))
# Request fans out — returns seq[T]:
let results = await CollectedStatus.request()
# Result[seq[CollectedStatus], string]
# @["relay", "filter"]
# Remove individual provider via handle:
CollectedStatus.removeProvider(h1)
# or clear all:
CollectedStatus.clearProviders()
Fan-out awaits all providers via allFinished. Unlike RequestBroker, setProvider returns a handle (not just ok/err) so providers can be removed individually.
Instance isolation · Multi-node · Same thread, separate worlds
Scenario: two WakuNode instances on the same thread (integration tests, embedded usage). Both need their own listeners for OnFilterSubscribeEvent. Without context isolation, the single thread-local broker merges them — node A's events fire node B's listeners.
- Every broker call accepts an optional BrokerContext
- DefaultBrokerContext — used when no context is specified
- A component creates its own context via NewBrokerContext() and stores it as a field
- Context isolation is opt-in: omit the brokerCtx argument and the default context is used transparently.
type BrokerContext* = distinct uint32
const DefaultBrokerContext* =
BrokerContext(0xCAFFE14E'u32)
proc NewBrokerContext*(): BrokerContext =
# Atomically increment global counter
BrokerContext(
gContextCounter.fetchAdd(1, moRelaxed))
# In component initialization:
type WakuNode* = ref object
brokerCtx: BrokerContext # unique per instance
proc new*(T: type WakuNode, ...): T =
WakuNode(
brokerCtx: NewBrokerContext(), ...)
# Thread-global context binding (sync, usable from {.thread.} init):
proc setThreadBrokerContext*(ctx: BrokerContext)
## Adopts ctx as this thread's global context
proc initThreadBrokerContext*(): BrokerContext
## Creates new ctx + sets as thread-global
proc threadGlobalBrokerContext*(): BrokerContext
## Returns this thread's global context (lock-free)
# Async scoped context (backward compat):
template lockGlobalBrokerContext*(
brokerCtx: BrokerContext,
body: untyped) =
await noCancel(
globalBrokerContextLock.acquire())
globalBrokerContextValue = brokerCtx
try: body
finally: restore + release
Two nodes, same thread
let nodeA = WakuNode.new(...) # ctx = 0x0001
let nodeB = WakuNode.new(...) # ctx = 0x0002
let recvA = RecvService.new(nodeA, ...)
let recvB = RecvService.new(nodeB, ...)
# recvA registers under nodeA's context:
OnFilterSubscribeEvent.listen(
nodeA.brokerCtx,
proc(ev: ...) {.async.} =
recvA.onSubscribe(...))
# recvB registers under nodeB's context:
OnFilterSubscribeEvent.listen(
nodeB.brokerCtx,
proc(ev: ...) {.async.} =
recvB.onSubscribe(...))
# Emit from nodeA — only recvA fires:
OnFilterSubscribeEvent.emit(
nodeA.brokerCtx,
pubsubTopic, contentTopics)
Bucket cleanup is automatic: dropAllListeners(ctx) with a non-default context removes the bucket entirely.
Emit from nodeA.brokerCtx → only Bucket 1 listeners fire. Node B is completely unaffected.
Cross-thread messaging · Same API · New trade-offs
Single-thread brokers keep listeners and emitters on one Chronos loop. When subsystems move to dedicated threads for CPU isolation or blocking I/O, those in-thread shortcuts break.
Both EventBroker and RequestBroker accept an (mt) driver annotation. The public API stays almost identical — the macro swaps the internal plumbing.
- Single-thread: listeners live in threadvar seqs
- (mt): a Lock guards the shared bucket registry; one AsyncChannel per (context, listener-thread) pair carries cross-thread messages
- Closures stay in each thread's threadvar — only metadata is shared
- Same-thread optimisation: when emitter and listener share a thread, the channel is bypassed — listeners are dispatched directly via asyncSpawn, giving near-zero overhead identical to the single-thread broker.
| Cost | Where it comes from | Mitigation |
|---|---|---|
| Lock contention | Every emit / listen / drop acquires the global lock to read or mutate the bucket registry | Lock is held only for the metadata scan — no callbacks run under lock |
| Channel overhead | Cross-thread delivery copies the event message into an AsyncChannel via sendSync | Same-thread path skips channels entirely; keep hot-path listeners co-located |
| Message copying | Event / request payloads are value-copied into the channel buffer | Keep payloads small or use ref types for large data |
| Shared memory | Bucket array & channels live in the createShared heap — manual lifecycle, no GC | Managed automatically by the macro-generated init / grow / drop procs |
| Shutdown complexity | dropListener / dropAllListeners drain in-flight futures (5 s timeout per future) before returning — ensures no callbacks touch released resources | Call await dropListener(handle) before releasing resources; allow an event-loop tick after dropAllListeners for processLoop cleanup |
| Benefit | Detail |
|---|---|
| Same public API | emit, listen, request, setProvider, dropAllListeners — identical call-sites; only the macro declaration changes. MT adds setRequestTimeout / requestTimeout. |
| Timeout protection | Cross-thread request returns err after a configurable timeout (default 5 s) if the provider is unresponsive — no indefinite hangs |
| Transparent fan-out | One emit reaches listeners on every thread that registered — no manual channel wiring |
| Same-thread fast path | Co-located emitters and listeners pay zero channel cost — identical performance to single-thread mode |
| GC safety | Closures never cross heap boundaries — stored in per-thread threadvar; only plain-old-data metadata is shared |
| Context isolation preserved | BrokerContext scoping works identically — two node instances on two threads stay fully isolated |
| Clean shutdown | dropListener and dropAllListeners drain in-flight callbacks (5 s timeout), clean up channels, and remove remote threadvars — safe to release resources immediately after return |
- listen must be called from the thread that will own the listener
- dropListener must be called from the same thread that registered it; drains in-flight futures (async, 5 s timeout)
- dropAllListeners can be called from any thread; drains in-flight futures before returning
- emit can be called from any thread
- EventBroker: emit is async — use await in Chronos contexts, waitFor from bare {.thread.} procs.
For multiple instances on one thread, prefer BrokerContext isolation instead of threads. Rule of thumb: start with single-thread; switch to (mt) only when a subsystem genuinely needs its own thread.
RequestBroker (mt) — Specifics
The multi-thread RequestBroker preserves the single-provider-per-context guarantee across threads.
- Cross-thread requests round-trip through an AsyncChannel; expect ~2× the cost of a single emit
- A processLoop on the provider's thread receives requests and dispatches to the local provider closure
- Requests are guarded by withTimeout (default 5 s); if the provider is unresponsive, request returns err. Configure via T.setRequestTimeout(duration) / T.requestTimeout(). Same-thread requests are unaffected.

EventBroker (mt) — Specifics
The multi-thread EventBroker delivers events to all listeners on all threads (broadcast fan-out).
- One AsyncChannel per (context, listener-thread) pair; multiple listeners on the same thread share one channel
- The processLoop tracks running listener futures and drains them on shutdown (5 s timeout)
- emit is {.async: (raises: []).}; cross-thread sends use sendSync (brief: buffer + signal), same-thread dispatches via asyncSpawn

| Aspect | EventBroker | RequestBroker | MultiRequestBroker |
|---|---|---|---|
| Pattern | Reactive pub/sub | Proactive req/resp | Proactive fan-out |
| Direction | N emitters → N listeners | N requesters → 1 provider | N requesters → N providers |
| Return type | void per listener | Result[T, string] | Result[seq[T], string] |
| Mode | Always async | Async or Sync | Always async |
| Providers / Listeners | Unlimited, handle per reg | Exactly 1 per context | Unlimited, handle per reg |
| Context-aware | Yes | Yes | Yes |
| Multi-thread (mt) | Yes | Yes | No |
| Use when | Something happened; notify who cares | Query from one authoritative source | Gather data from multiple subsystems |
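The deck describes the (mt) driver annotation but never shows a declaration using it. Assuming it is passed the same way as the (sync) variant of RequestBroker, a declaration might look like the following; the event and request types, and the exact spelling `EventBroker(mt)` / `RequestBroker(mt)`, are assumptions:

```nim
import waku/common/broker/event_broker
import waku/common/broker/request_broker

# Hypothetical: (mt) selects the multi-thread driver
EventBroker(mt):
  type WorkerDone = object
    jobId*: int

RequestBroker(mt):
  type RequestJobStatus = object
    running*: int

# Call-sites stay the same as single-thread mode:
#   WorkerDone.listen(...)      - from the thread that will own the listener
#   await WorkerDone.emit(...)  - from any thread (emit is async in mt mode)
# MT-only knob mentioned in the slides:
#   RequestJobStatus.setRequestTimeout(10.seconds)
```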
Before — Raw callbacks: handlers accumulate in a plain seq with no cleanup path.
After — Brokers: symmetric lifecycle (start/stop); each service registers via listen() in its own module.
Modules interact through a shared vocabulary of typed events and requests — not through direct references or init-time wiring.
Typed shared-library surface · C ABI · C++ and Python wrappers
Clean contractual design: the library interface is described as a typed set of broker requests and broker events, so the contract stays explicit, reviewable, and compile-time checked.
Decoupled modules: the same broker vocabulary lets internal components collaborate without direct references, while BrokerContext keeps independent instances sandboxed from each other.
Less FFI boilerplate: request exports, event callback registration, wrappers, and lifecycle glue are all generated from the API definitions instead of being hand-written and repeated.
Safer by construction: the generated layer removes much of the manual synchronization, ownership plumbing, and calling-convention risk that usually makes FFI code fragile and easy to misuse.
In short: define the API once as brokers, then let the generator turn that contract into a usable shared-library interface.
- RequestBroker(API) turns typed requests into exported C ABI functions
- EventBroker(API) turns typed events into callback registration functions
- registerBrokerLibrary adds lifecycle, context management, header generation, and wrapper generation
- Result: instead of hand-writing fragile callback tables and request trampolines, the broker declarations become the authoritative API schema.
- mylib_createContext() performs one-time runtime initialization if needed
- mylib_createContext() starts the delivery thread first, then the processing thread
- mylib_shutdown(ctx) first invokes ShutdownRequest on the processing thread
- Operational rule: keep foreign callbacks lightweight. Blocking in a callback stalls later callback delivery on the delivery thread.
when defined(BrokerFfiApi):
import brokers/api_library
RequestBroker(API):
type InitializeRequest = object
configPath*: string
initialized*: bool
proc signature*(configPath: string):
Future[Result[InitializeRequest, string]] {.async.}
RequestBroker(API):
type ShutdownRequest = object
status*: int32
proc signature*():
Future[Result[ShutdownRequest, string]] {.async.}
EventBroker(API):
type DeviceDiscovered = object
deviceId*: int64
name*: string
deviceType*: string
address*: string
when defined(BrokerFfiApi):
registerBrokerLibrary:
name: "mylib"
initializeRequest: InitializeRequest
shutdownRequest: ShutdownRequest
Single source of truth: the request and event types define the foreign API surface as well as the internal broker contracts.
Provider setup: setupProviders(ctx) runs on the processing thread and installs the request handlers that back the exported ABI.
Lifecycle split: foreign callers use mylib_createContext() and mylib_shutdown(ctx); library authors implement InitializeRequest and ShutdownRequest.
Wrapper generation: from this one declaration set, the build emits libmylib, mylib.h, the C++ wrapper class, and optionally the Python ctypes wrapper.
Mylib lib;
auto created = lib.createContext();
if (!created.ok()) {
fprintf(stderr, "FATAL: %s\n", created.error().c_str());
return 1;
}
auto h_disc = lib.onDeviceDiscovered(
[](Mylib& owner, int64_t id, const std::string_view name,
const std::string_view type,
const std::string_view addr) {
(void)owner;
printf("discovered: %.*s\n",
(int)name.size(), name.data());
});
auto res = lib.initializeRequest("/opt/devices.yaml");
std::vector fleet = {
AddDeviceSpec{"Core-Router", "router", "10.0.0.1"}
};
auto add = lib.addDevice(fleet);
auto list = lib.listDevices();
lib.offDeviceDiscovered(h_disc);
lib.shutdown();
- No manual free_*_result calls in user code — the wrappers manage result memory
- Result<T> wrappers preserve the broker success/error model
- C++ callbacks receive Mylib& owner; Python callbacks receive owner: Mylib
- Handles are bound to the Mylib instance, so registration and delivery stay instance-specific
- User code never calls ShutdownRequest directly; shutdown() handles teardown
- Threading implication: the callback above runs on the delivery thread, while initializeRequest(), addDevice(), and listDevices() synchronously wait for provider work on the processing thread.
nim c \
-d:BrokerFfiApi \
--threads:on \
--app:lib \
--nimMainPrefix:mylib \
--mm:orc \
--outdir:examples/ffiapi/nimlib/build \
examples/ffiapi/nimlib/mylib.nim
Add -d:BrokerFfiApiGenPy when you also want the generated Python wrapper module.
- -d:BrokerFfiApi enables the library generator path
- --threads:on enables the delivery and processing thread runtime
- --app:lib builds a shared library instead of an executable
- --nimMainPrefix:mylib must match name: "mylib" in registerBrokerLibrary
- --mm:orc or --mm:refc are the supported memory managers
- In this repository the convenience entry points are nimble buildFfiExample, nimble buildFfiExampleCpp, and nimble runFfiExampleCpp.
A simple way to navigate the broker toolbox:
1. Start with single-thread brokers
Use async or sync brokers when you want mockable, type-safe decoupling between layers inside one module, library, or application without introducing thread complexity.
2. Move to multi-thread brokers
Use multi-thread brokers when you want lightweight in-process IPC, need to off-load work onto worker threads, and still want communication between those threads to remain structured and safe.
3. Choose the FFI-API layer
Use the Broker FFI-API when the library is meant to be consumed broadly from other languages or external runtimes and you want a safe shared-library surface without hand-written plumbing.
Pick the smallest layer that solves the problem: in-thread decoupling first, cross-thread messaging next, and generated FFI only when the boundary must leave Nim.
A natural extension point beyond the core broker runtime:
Serializable by design: broker requests and events are plain structured data, which makes them a strong fit for explicit serialization formats.
This is where brokers can evolve from an internal decoupling tool into a portable interface definition that survives beyond a single process.
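To make the "serializable by design" point concrete, here is a stdlib-only sketch: the payload reuses the field shape of the slide's own OnFilterSubscribeEvent (declared as a plain object here, without the EventBroker macro) and round-trips it through JSON — the kind of explicit wire format such a portable interface definition could use.

```nim
import std/json

type OnFilterSubscribeEvent = object
  pubsubTopic: string
  contentTopics: seq[string]

let ev = OnFilterSubscribeEvent(
  pubsubTopic: "/waku/2/rs/0/1",
  contentTopics: @["/app/1/chat/proto"])

# plain structured data serializes with no extra machinery:
# `%` builds a JsonNode from the object, `$` renders the wire form
let wire = $(%ev)

# the receiving side reconstructs the typed event losslessly
let back = to(parseJson(wire), OnFilterSubscribeEvent)
doAssert back == ev
```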