Logos.co
λogos · Messaging

Event & Request
Broker Design

NagyZoltanPeter/nim-brokers

Architecture · Guarantees · Usage Patterns

EventBroker RequestBroker MultiRequestBroker BrokerContext Multi-Thread Brokers
Broker FFI-API


01

The Problem

Tight coupling through raw callback pointers

Callbacks Wired at Init Time

A raw function pointer is captured during construction and stored without lifetime management.

waku/waku_core/subscription/push_handler.nim

type FilterPushHandler* =
  proc(pubsubTopic: PubsubTopic,
       message: WakuMessage)
    {.async, gcsafe, closure.}
        
waku/waku_core/subscription/subscription_manager.nim

type SubscriptionManager* = object
  subscriptions:
    TableRef[(string, ContentTopic),
              FilterPushHandler]   # raw proc pointer

proc registerSubscription*(m, pubsubTopic,
    contentTopic, handler: FilterPushHandler) =
  m.subscriptions[(pubsubTopic, contentTopic)]
    = handler   # stored forever, no cleanup API
        

Problems

  • Handler stored in a table with no expiry
  • If the owner is destroyed, the closure becomes a dangling capture
  • SubscriptionManager must be threaded through every layer
  • Init order matters — register too early and the sender doesn't exist yet
  • Hard to test in isolation

Raw Push Handlers in the Filter Client

waku/waku_filter_v2/client.nim

type WakuFilterClient* = ref object of LPProtocol
  rng: ref HmacDrbgContext
  peerManager: PeerManager
  pushHandlers: seq[FilterPushHandler] # ← raw list

# Anyone wiring up the node must call this
proc registerPushHandler*(
    wfc: WakuFilterClient,
    handler: FilterPushHandler) =
  wfc.pushHandlers.add(handler)     # add and forget

# Deep inside recv_service initialization:
node.wakuFilterClient
  .registerPushHandler(
    proc(pubsubTopic: PubsubTopic,
         message: WakuMessage) {.async, closure.} =
      recvService.recentReceivedMsgs.add(...)
  )
    

Raw Push Handlers — What Goes Wrong

pushHandlers is a plain seq. No way to remove a handler — not even when the owning service stops.

recvService is captured by the closure. If RecvService is destroyed first, the closure holds a stale reference.

Adding a new subscriber means touching filter client construction code — import dependency in both directions.

The Coupling Chain

Every new subscriber forces a new dependency across the entire init hierarchy.

WakuNode — creates → FilterClient — holds seq[Handler]
RecvService — registers handler → Closure — captures recvService

Direct dependency: RecvService must know about WakuFilterClient just to receive events.

No cleanup contract: stopRecvService cannot deregister — the callback lives as long as WakuFilterClient does.

The goal: modules should talk to each other without importing or knowing about each other. That's what the Broker pattern delivers.

02

EventBroker

Reactive · Pub/Sub · Many emitters → Many listeners

EventBroker at a Glance

What it is

  • A Nim macro that generates a type-safe, thread-local pub/sub broker for a single event type
  • Emitters call emit() — zero knowledge of who is listening
  • Listeners register with listen() and receive a handle for unregistering
  • Any number of listeners, any number of emitters

Generated API


-- Declare once:
EventBroker:
  type MyEvent = object
    field: string

-- Anywhere in your code:
MyEvent.listen(handler)        # register
MyEvent.listen(ctx, handler)   # context-aware
MyEvent.emit(event)            # fire
MyEvent.emit(ctx, field = "x") # shorthand ctor
await MyEvent.dropListener(handle)   # drain in-flight, then cleanup
MyEvent.dropAllListeners()           # drain all in-flight (5s timeout)
        

Thread-local storage — no locks needed

Async dispatch — listeners run as independent tasks

Handle-based — explicit, safe cleanup

Step 1 · Declare Your Events

One macro call per event type — can live in a shared events module.

waku/events/delivery_events.nim

import waku/common/broker/event_broker

EventBroker:
  type DeliveryFeedbackEvent* = ref object
    success*:  DeliverySuccess
    dir*:      DeliveryDirection
    comment*:  string
    msgHash*:  WakuMessageHash
    msg*:      WakuMessage

EventBroker:
  type OnFilterSubscribeEvent* = object
    pubsubTopic*:   string
    contentTopics*: seq[string]

EventBroker:
  type OnFilterUnSubscribeEvent* = object
    pubsubTopic*:   string
    contentTopics*: seq[string]
    

The macro exports the type plus a companion broker — both live in thread-local storage. No global state, no locks.

Step 2 · Emit Events

The emitter doesn't import or know about any listener.

waku/waku_filter_v2/client.nim

proc subscribe*(wfc: WakuFilterClient, ...) {.async.} =
  # ... network call ...
  ?await wfc.sendSubscribeRequest(peer, req)

  # Fire-and-forget: no knowledge of RecvService
  OnFilterSubscribeEvent.emit(
    wfc.brokerCtx, pubsubTopic, contentTopicSeq)

proc unsubscribe*(wfc: WakuFilterClient, ...) {.async.} =
  # ... network call ...
  ?await wfc.sendSubscribeRequest(peer, req)

  OnFilterUnSubscribeEvent.emit(
    wfc.brokerCtx, pubsubTopic, contentTopicSeq)
    

emit() spawns async tasks for each listener. The emitter returns immediately — no blocking, no coupling.

Step 2 · Emit Events (continued)

waku/node/delivery_service/recv_service/recv_service.nim

proc performDeliveryFeedback(
    self: RecvService,
    success: DeliverySuccess,
    dir: DeliveryDirection,
    comment: string,
    msgHash: WakuMessageHash,
    msg: WakuMessage,
) {.gcsafe, raises: [].} =
  info "recv monitor performDeliveryFeedback",
    success, dir, comment,
    msg_hash = shortLog(msgHash)

  DeliveryFeedbackEvent.emit(
    brokerCtx = self.brokerCtx,
    success   = success,
    dir       = dir,
    comment   = comment,
    msgHash   = msgHash,
    msg       = msg,
  )
    

The inline constructor overload (emit(brokerCtx, field = value, ...)) is auto-generated for object types — no manual construction needed.

Step 3 · Listen & Clean Up

RecvService manages its own listeners — zero coupling with the filter client.

waku/node/delivery_service/recv_service/recv_service.nim

type RecvService* = ref object of RootObj
  brokerCtx:             BrokerContext
  onSubscribeListener:   OnFilterSubscribeEventListener
  onUnsubscribeListener: OnFilterUnSubscribeEventListener

proc startRecvService*(self: RecvService) =
  self.onSubscribeListener =
    OnFilterSubscribeEvent.listen(
      self.brokerCtx,
      proc(ev: OnFilterSubscribeEvent)
          {.async: (raises: []).} =
        self.onSubscribe(
          ev.pubsubTopic, ev.contentTopics)
    ).valueOr:
      error "Failed to set listener"
      quit(QuitFailure)
    

Step 3 · Listen & Clean Up (continued)

waku/node/delivery_service/recv_service/recv_service.nim

  self.onUnsubscribeListener =
    OnFilterUnSubscribeEvent.listen(
      self.brokerCtx,
      proc(ev: OnFilterUnSubscribeEvent)
          {.async: (raises: []).} =
        self.onUnsubscribe(
          ev.pubsubTopic, ev.contentTopics)
    ).valueOr:
      error "Failed to set listener"
      quit(QuitFailure)

proc stopRecvService*(self: RecvService) {.async.} =
  await OnFilterSubscribeEvent.dropListener(
    self.brokerCtx, self.onSubscribeListener)
  await OnFilterUnSubscribeEvent.dropListener(
    self.brokerCtx, self.onUnsubscribeListener)
    

Symmetric start/stop — handles stored on the owner, cleanup is explicit and complete.

EventBroker Guarantees

Lifetime safety

  • listen() returns a typed handle
  • Owner stores the handle as a field — lifetime tied to the object
  • dropListener(handle) removes the registration and drains in-flight futures (5 s timeout)
  • After dropListener returns, no callbacks for that handle are running — safe to release resources

Error isolation

  • Each listener runs as an independent asyncSpawn task, tracked in inFlight
  • A panicking listener is caught and logged — others still fire
  • Emitter never awaits listener completion — dropListener drains in-flight on shutdown

No import cycle: emitter and listener only share the event type declaration module.

Thread-local: broker state per event-loop thread — no mutex for dispatch.

Type safety: handler signature enforced at compile time via generated proc type.

03

RequestBroker

Proactive · Request/Response · Many requesters → One provider

RequestBroker at a Glance

What it is

  • Decoupled request/response — one provider answers many requesters
  • Provider registers itself; requesters call the broker — neither imports the other
  • The call signature is enforced at compile time via a proc signature declaration
  • Two modes: async (default) and sync for simple state accessors

Generated API


-- Declare once:
RequestBroker:        # or RequestBroker(sync):
  type MyData = object
    value: string
  proc signature*():
    Future[Result[MyData, string]]

-- Provider side (e.g. in WakuNode):
MyData.setProvider(
  proc(): Future[...] {.async.} =
    ok(MyData(value: "hello")))

-- Requester side (anywhere):
let data = await MyData.request()
        

Step 1 · Declare Your Requests

Sync variants — simple state accessors, no Future.

waku/requests/health_requests.nim

# Zero-arg sync: simple state accessor
RequestBroker(sync):
  type RequestConnectionStatus* = object
    connectionStatus*: ConnectionStatus

# Arg-based sync: parameterised lookup
RequestBroker(sync):
  type RequestContentTopicsHealth* = object
    contentTopicHealth*:
      seq[tuple[topic: ContentTopic,
                health: TopicHealth]]
  proc signature(topics: seq[ContentTopic]):
    Result[RequestContentTopicsHealth, string]
    

Step 1 · Declare Your Requests (async)

Async variants — for IO-bound operations.

waku/requests/health_requests.nim

# Zero-arg async: may involve IO
RequestBroker:
  type RequestHealthReport* = object
    healthReport*: HealthReport

# Arg-based async: query a specific protocol
RequestBroker:
  type RequestProtocolHealth* = object
    healthStatus*: ProtocolHealth
  proc signature(protocol: WakuProtocol):
    Future[Result[RequestProtocolHealth, string]]
    

The proc signature declaration enforces the provider's type at compile time. If omitted, a zero-arg form is generated automatically.

Step 2 · Register a Provider (sync)

Provider registers at startup — requesters don't need to know who provides.

waku/factory/waku.nim

## Sync provider — zero-arg, no IO involved
RequestConnectionStatus.setProvider(
  threadGlobalBrokerContext(),
  proc(): Result[RequestConnectionStatus, string] =
    try:
      let report = waku[].healthMonitor
                         .getSyncNodeHealthReport()
      ok(RequestConnectionStatus(
        connectionStatus: report.connectionStatus))
    except:
      err("Failed: " & getCurrentExceptionMsg()),
).isOkOr:
  error "Failed to set provider"
    

Step 2 · Register a Provider (async)

waku/factory/waku.nim

## Async provider — may perform IO
RequestHealthReport.setProvider(
  threadGlobalBrokerContext(),
  proc(): Future[
      Result[RequestHealthReport, string]]
      {.async.} =
    try:
      let report = await waku[].healthMonitor
                               .getNodeHealthReport()
      ok(RequestHealthReport(healthReport: report))
    except:
      err("Failed: " & getCurrentExceptionMsg()),
).isOkOr:
  error "Failed to set provider"
    

Step 2 · Register a Provider (arg-based)

waku/node/waku_node.nim

## Arg-based sync provider
RequestShardTopicsHealth.setProvider(
  node.brokerCtx,
  proc(topics: seq[PubsubTopic]):
      Result[RequestShardTopicsHealth, string] =
    var response: RequestShardTopicsHealth

    for shard in topics:
      var healthStatus = TopicHealth.UNHEALTHY

      if not node.wakuRelay.isNil:
        healthStatus = node.wakuRelay.topicsHealth
          .getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED)

      response.topicHealth.add((shard, healthStatus))

    return ok(response),
).isOkOr:
  error "Can't set provider for RequestShardTopicsHealth",
    error = error
    

Step 3 · Make a Request

Any module can request without knowing who provides — or even if a provider is registered.


## Sync request — no await needed
let statusRes =
  RequestConnectionStatus.request()
statusRes.isOkOr:
  error "No connection status provider"
  return
echo statusRes.get().connectionStatus

## Sync with args
let healthRes =
  RequestShardTopicsHealth.request(topics)
healthRes.isOkOr:
  error "Shard health unavailable"; return

## Async request
let reportRes =
  await RequestHealthReport.request()
reportRes.isOkOr:
  error "Health report unavailable"; return
let report = reportRes.get().healthReport
        

If no provider is registered, request() returns err(...) — never panics.

Provider exceptions are caught and converted to err — requester always gets a Result.

One provider per context: calling setProvider twice returns an error — no silent overwrites.

04

MultiRequestBroker

Fan-out · Collect all · Many providers → One result set

MultiRequestBroker — Code


MultiRequestBroker:
  type CollectedStatus* = object
    label*: string

  proc signature*():
    Future[Result[CollectedStatus, string]]

# Register N providers:
let h1 = CollectedStatus.setProvider(
  proc(): Future[...]
      {.async: (raises: []).} =
    ok(CollectedStatus(label: "relay")))

let h2 = CollectedStatus.setProvider(
  proc(): Future[...]
      {.async: (raises: []).} =
    ok(CollectedStatus(label: "filter")))

# Request fans out — returns seq[T]:
let results = await CollectedStatus.request()
# Result[seq[CollectedStatus], string]
#   @["relay", "filter"]

# Remove individual provider via handle:
CollectedStatus.removeProvider(h1)
# or clear all:
CollectedStatus.clearProviders()
    

MultiRequestBroker — Use Case

  • Collecting status/metrics from multiple independent subsystems
  • Each subsystem registers itself — no central orchestrator
  • All providers called concurrently via allFinished
  • Succeeds only if all providers succeed — one failure short-circuits

Unlike RequestBroker, setProvider returns a handle (not just ok/err) so providers can be removed individually.

05

BrokerContext

Instance isolation · Multi-node · Same thread, separate worlds

Why We Needed BrokerContext

Scenario: two WakuNode instances on the same thread (integration tests, embedded usage). Both need their own listeners for OnFilterSubscribeEvent. Without context isolation, the single thread-local broker merges them — node A's events fire node B's listeners.

How it works

  • Every broker stores listeners in buckets, keyed by BrokerContext
  • Bucket 0 is always DefaultBrokerContext — used when no context is specified
  • Each component instance calls NewBrokerContext() and stores the result as a field
  • Context is propagated to submodules at init time

Context isolation is opt-in: omit the brokerCtx argument and the default context is used transparently.

BrokerContext Implementation

waku/common/broker/broker_context.nim

type BrokerContext* = distinct uint32

const DefaultBrokerContext* =
  BrokerContext(0xCAFFE14E'u32)

proc NewBrokerContext*(): BrokerContext =
  # Atomically increment global counter
  BrokerContext(
    gContextCounter.fetchAdd(1, moRelaxed))

# In component initialization:
type WakuNode* = ref object
  brokerCtx: BrokerContext  # unique per instance

proc new*(T: type WakuNode, ...): T =
  WakuNode(
    brokerCtx: NewBrokerContext(), ...)

# Thread-global context binding (sync, usable from {.thread.} init):
proc setThreadBrokerContext*(ctx: BrokerContext)
  ## Adopts ctx as this thread's global context
proc initThreadBrokerContext*(): BrokerContext
  ## Creates new ctx + sets as thread-global
proc threadGlobalBrokerContext*(): BrokerContext
  ## Returns this thread's global context (lock-free)

# Async scoped context (backward compat):
template lockGlobalBrokerContext*(
    brokerCtx: BrokerContext,
    body: untyped) =
  await noCancel(
    globalBrokerContextLock.acquire())
  globalBrokerContextValue = brokerCtx
  try: body
  finally: restore + release
    

Context Isolation — Code

Two nodes, same thread


let nodeA = WakuNode.new(...)  # ctx = 0x0001
let nodeB = WakuNode.new(...)  # ctx = 0x0002

let recvA = RecvService.new(nodeA, ...)
let recvB = RecvService.new(nodeB, ...)

# recvA registers under nodeA's context:
OnFilterSubscribeEvent.listen(
  nodeA.brokerCtx,
  proc(ev: ...) {.async.} =
    recvA.onSubscribe(...))

# recvB registers under nodeB's context:
OnFilterSubscribeEvent.listen(
  nodeB.brokerCtx,
  proc(ev: ...) {.async.} =
    recvB.onSubscribe(...))

# Emit from nodeA — only recvA fires:
OnFilterSubscribeEvent.emit(
  nodeA.brokerCtx,
  pubsubTopic, contentTopics)
    

Context Isolation — Bucket Structure

Thread-local Broker (OnFilterSubscribeEvent)
  Bucket 0 · DefaultBrokerContext · listeners: [ ]
  Bucket 1 · nodeA.brokerCtx · listeners: [recvA]
  Bucket 2 · nodeB.brokerCtx · listeners: [recvB]

Bucket cleanup is automatic: dropAllListeners(ctx) with a non-default context removes the bucket entirely.

Emit from nodeA.brokerCtx → only Bucket 1 listeners fire. Node B is completely unaffected.

06

Multi-Thread Brokers

Cross-thread messaging · Same API · New trade-offs

Why Go Multi-Thread?

Single-thread brokers keep listeners and emitters on one Chronos loop. When subsystems move to dedicated threads for CPU isolation or blocking I/O, those in-thread shortcuts break.

Use cases that push you across thread boundaries

  • CPU-bound work — heavy cryptography, proof generation, or tree hashing that would block the main event loop
  • Blocking FFI — C libraries (e.g. database drivers, hardware access) that don’t expose async APIs
  • Thread-per-service architecture — each node service owns a thread + Chronos loop; brokers bridge them
  • Integration tests — multiple node instances in one process, each on its own thread, sharing typed events

Enabling Multi-Thread Mode

Both EventBroker and RequestBroker accept an (mt) driver annotation. The public API stays almost identical — the macro swaps the internal plumbing.

Single-thread (default)

  • Listeners stored directly in threadvar seqs
  • Emit / request dispatched inline on the same Chronos loop
  • No locks, no channels, no shared memory

Multi-thread (mt)

  • Shared bucket registry under Lock
  • One AsyncChannel per (context, listener-thread)
  • Closures stay in threadvar — only metadata is shared
  • Same-thread fast-path bypasses channels entirely
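As a sketch (the event and request types here are hypothetical, and the form assumes the (mt) driver annotation is passed directly to the declaration macros as described above), switching drivers is a one-line change at the declaration site:

```nim
# Hypothetical types — only the (mt) annotation differs from the
# single-thread declarations shown in earlier sections.
EventBroker(mt):
  type WorkerDone* = object
    jobId*: int

RequestBroker(mt):
  type RequestJobStatus* = object
    status*: string
  proc signature*():
    Future[Result[RequestJobStatus, string]]
```

Call-sites (emit, listen, request, setProvider) stay the same; the macro swaps the dispatch plumbing underneath.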

Cross-Thread Delivery

Thread A (emit) · Thread B (emit)
        ↓ sendSync ↓
Shared Bucket Registry — lock-protected · createShared
        ↓ one AsyncChannel per listener thread ↓
Thread C (processLoop) · Thread D (processLoop)

Same-thread optimisation: when emitter and listener share a thread, the channel is bypassed — listeners are dispatched directly via asyncSpawn, giving near-zero overhead identical to the single-thread broker.

Costs of Multi-Thread Mode

  • Lock contention — every emit / listen / drop acquires the global lock to read or mutate the bucket registry. Mitigation: the lock is held only for the metadata scan; no callbacks run under it.
  • Channel overhead — cross-thread delivery copies the event message into an AsyncChannel via sendSync. Mitigation: the same-thread path skips channels entirely; keep hot-path listeners co-located.
  • Message copying — event / request payloads are value-copied into the channel buffer. Mitigation: keep payloads small or use ref types for large data.
  • Shared memory — the bucket array and channels live on the createShared heap: manual lifecycle, no GC. Mitigation: managed automatically by the macro-generated init / grow / drop procs.
  • Shutdown complexity — dropListener / dropAllListeners drain in-flight futures (5 s timeout per future) before returning, ensuring no callbacks touch released resources. Mitigation: call await dropListener(handle) before releasing resources, and allow an event-loop tick after dropAllListeners for processLoop cleanup.

Benefits of Multi-Thread Mode

  • Same public API — emit, listen, request, setProvider, dropAllListeners: identical call-sites; only the macro declaration changes. MT adds setRequestTimeout / requestTimeout.
  • Timeout protection — a cross-thread request returns err after a configurable timeout (default 5 s) if the provider is unresponsive; no indefinite hangs.
  • Transparent fan-out — one emit reaches listeners on every thread that registered; no manual channel wiring.
  • Same-thread fast path — co-located emitters and listeners pay zero channel cost: identical performance to single-thread mode.
  • GC safety — closures never cross heap boundaries; they stay in per-thread threadvars, and only plain-old-data metadata is shared.
  • Context isolation preserved — BrokerContext scoping works identically: two node instances on two threads stay fully isolated.
  • Clean shutdown — dropListener and dropAllListeners drain in-flight callbacks (5 s timeout), clean up channels, and remove remote threadvars; safe to release resources immediately after return.

Design Considerations

Thread ownership rules

  • listen must be called from the thread that will own the listener
  • dropListener must be called from the same thread that registered it; drains in-flight futures (async, 5 s timeout)
  • dropAllListeners can be called from any thread; drains in-flight futures before returning
  • emit can be called from any thread

EventBroker: emit is async — use await in Chronos contexts, waitFor from bare {.thread.} procs.
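A minimal sketch of both call styles (WorkerDone is a hypothetical event assumed to be declared with an (mt) EventBroker):

```nim
# From a bare {.thread.} proc there is no running Chronos loop,
# so waitFor drives the async emit to completion.
proc workerMain() {.thread.} =
  # ... CPU-bound work off the main loop ...
  waitFor WorkerDone.emit(WorkerDone(jobId: 42))

# From a Chronos context, simply await it.
proc serviceStep() {.async.} =
  await WorkerDone.emit(WorkerDone(jobId: 42))
```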

When to stay single-thread

  • All emitters and listeners live on the same Chronos loop
  • No blocking FFI or CPU-heavy work in the pipeline
  • Integration tests with multiple nodes can use BrokerContext isolation instead of threads

Rule of thumb: start with single-thread. Switch to (mt) only when a subsystem genuinely needs its own thread.

RequestBroker (mt) — Specifics

The multi-thread RequestBroker preserves the single-provider-per-context guarantee across threads.

  • Cross-thread request — caller sends request via channel, blocks (or awaits) until the provider thread responds via a separate response channel
  • Round-trip latency — two channel hops (request + response), both using AsyncChannel; expect ~2× the cost of a single emit
  • Provider thread must run a Chronos loop — processLoop receives requests and dispatches to the local provider closure
  • Same-thread fast path — when requester and provider share a thread, the provider is called directly with zero channel overhead
  • Timeout handling — cross-thread requests use withTimeout (default 5 s); if the provider is unresponsive, request returns err. Configure via T.setRequestTimeout(duration) / T.requestTimeout(). Same-thread requests are unaffected.
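The timeout knobs can be sketched as follows (RequestJobStatus is a hypothetical (mt) request type; the setRequestTimeout / request calls assume the API named above):

```nim
# Raise the cross-thread timeout from the 5 s default to 10 s.
RequestJobStatus.setRequestTimeout(10.seconds)

# err covers both "no provider registered" and "provider timed out" —
# the requester always gets a Result, never a hang.
let res = await RequestJobStatus.request()
if res.isErr:
  error "job status unavailable", error = res.error
```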

EventBroker (mt) — Specifics

The multi-thread EventBroker delivers events to all listeners on all threads (broadcast fan-out).

  • Per-listener-thread channel — one AsyncChannel per (context, listener-thread) pair; multiple listeners on the same thread share one channel
  • In-flight tracking — processLoop tracks running listener futures and drains them on shutdown (5 s timeout)
  • Listener concurrency — within a thread, all listeners for an event run concurrently (not serialised)
  • Async emitemit is {.async: (raises: []).}; cross-thread sends use sendSync (brief: buffer + signal), same-thread dispatches via asyncSpawn
  • Ordering — per-thread FIFO guaranteed by the channel; cross-thread ordering is per-target-thread, not global

Broker Family: Quick Reference

Aspect | EventBroker | RequestBroker | MultiRequestBroker
Pattern | Reactive pub/sub | Proactive req/resp | Proactive fan-out
Direction | N emitters → N listeners | N requesters → 1 provider | N requesters → N providers
Return type | void per listener | Result[T, string] | Result[seq[T], string]
Mode | Always async | Async or Sync | Always async
Providers / Listeners | Unlimited, handle per registration | Exactly 1 per context | Unlimited, handle per registration
Context-aware | Yes | Yes | Yes
Multi-thread (mt) | Yes | Yes | No
Use when | Something happened; notify who cares | Query from one authoritative source | Gather data from multiple subsystems

Before vs After

Before — Raw callbacks

  • Callback stored in seq with no cleanup path
  • Sender must import receiver's types
  • Init order matters — wire-up at the right moment
  • Stale captures after owner destruction
  • Adding a subscriber → modifying unrelated code

After — Brokers

  • Handle stored on owner; cleanup explicit (start/stop)
  • Emitter and listener share only the event module
  • Register/provide after construction — order doesn't matter
  • No stale captures — handle tracks registration
  • Adding a subscriber → one listen() in its own module

Modules interact through a shared vocabulary of typed events and requests — not through direct references or init-time wiring.

09

Broker FFI-API

Typed shared-library surface · C ABI · C++ and Python wrappers

The Concept

Clean contractual design: the library interface is described as a typed set of broker requests and broker events, so the contract stays explicit, reviewable, and compile-time checked.

Decoupled modules: the same broker vocabulary lets internal components collaborate without direct references, while BrokerContext keeps independent instances sandboxed from each other.

Less FFI boilerplate: request exports, event callback registration, wrappers, and lifecycle glue are all generated from the API definitions instead of being hand-written and repeated.

Safer by construction: the generated layer removes much of the manual synchronization, ownership plumbing, and calling-convention risk that usually makes FFI code fragile and easy to misuse.

In short: define the API once as brokers, then let the generator turn that contract into a usable shared-library interface.

What the Broker FFI-API Adds

Use brokers as the library contract

  • RequestBroker(API) turns typed requests into exported C ABI functions
  • EventBroker(API) turns typed events into callback registration functions
  • registerBrokerLibrary adds lifecycle, context management, header generation, and wrapper generation
  • The Nim implementation stays decoupled internally while foreign callers see a stable library interface
Foreign App (C / C++ / Python)
        ↓
Generated ABI (header + wrappers)
        ↓
API Brokers (requests + events)

Result: instead of hand-writing fragile callback tables and request trampolines, the broker declarations become the authoritative API schema.

Threading Model of a Created Library Context

Foreign caller — mylib_createContext() · blocking request calls
        ↓
Delivery thread — owns listener registration · runs foreign callbacks
        ←→
Processing thread — runs setupProviders(ctx) · executes request providers

Lifecycle semantics

  • mylib_createContext() performs one-time runtime initialization if needed
  • mylib_createContext() starts the delivery thread first, then the processing thread
  • Creation blocks until both threads report readiness
  • When create returns, listener registration and API requests are immediately usable
  • mylib_shutdown(ctx) first invokes ShutdownRequest on the processing thread
  • After that, shutdown joins the delivery thread and then the processing thread

Execution ownership

  • Request providers always run on the processing thread
  • Foreign event callbacks always run on the delivery thread
  • Generated C/C++/Python request calls block until the provider finishes
  • Events cross from Nim emitters to the delivery thread before invoking the FFI callback

Operational rule: keep foreign callbacks lightweight. Blocking in a callback stalls later callback delivery on the delivery thread.

Author the Library Interface in Nim

examples/ffiapi/nimlib/mylib.nim

when defined(BrokerFfiApi):
  import brokers/api_library

RequestBroker(API):
  type InitializeRequest = object
    configPath*: string
    initialized*: bool
  proc signature*(configPath: string):
    Future[Result[InitializeRequest, string]] {.async.}

RequestBroker(API):
  type ShutdownRequest = object
    status*: int32
  proc signature*():
    Future[Result[ShutdownRequest, string]] {.async.}

EventBroker(API):
  type DeviceDiscovered = object
    deviceId*: int64
    name*: string
    deviceType*: string
    address*: string

when defined(BrokerFfiApi):
  registerBrokerLibrary:
    name: "mylib"
    initializeRequest: InitializeRequest
    shutdownRequest: ShutdownRequest
        

Single source of truth: the request and event types define the foreign API surface as well as the internal broker contracts.

Provider setup: setupProviders(ctx) runs on the processing thread and installs the request handlers that back the exported ABI.

Lifecycle split: foreign callers use mylib_createContext() and mylib_shutdown(ctx); library authors implement InitializeRequest and ShutdownRequest.

Wrapper generation: from this one declaration set, the build emits libmylib, mylib.h, the C++ wrapper class, and optionally the Python ctypes wrapper.

Consume It from C++ Through the Generated Wrapper

examples/ffiapi/cpp_example/main.cpp

Mylib lib;
auto created = lib.createContext();
if (!created.ok()) {
  fprintf(stderr, "FATAL: %s\n", created.error().c_str());
  return 1;
}

auto h_disc = lib.onDeviceDiscovered(
  [](Mylib& owner, int64_t id, const std::string_view name,
     const std::string_view type, const std::string_view addr) {
    (void)owner;
    printf("discovered: %.*s\n",
           (int)name.size(), name.data());
  });

auto res = lib.initializeRequest("/opt/devices.yaml");

std::vector<AddDeviceSpec> fleet = {
  AddDeviceSpec{"Core-Router", "router", "10.0.0.1"}
};
auto add = lib.addDevice(fleet);
auto list = lib.listDevices();

lib.offDeviceDiscovered(h_disc);
lib.shutdown();
        

Why the wrapper matters

  • No manual free_*_result calls in user code
  • Result<T> wrappers preserve the broker success/error model
  • Callbacks are owner-aware in both wrappers: C++ lambdas receive Mylib& owner, Python callbacks receive owner: Mylib
  • Event callbacks are routed through the owning Mylib instance, so registration and delivery stay instance-specific
  • The library consumer never needs to call ShutdownRequest directly; shutdown() handles teardown

Threading implication: the callback above runs on the delivery thread, while initializeRequest(), addDevice(), and listDevices() synchronously wait for provider work on the processing thread.

Build Flags That Enable the Feature


nim c \
  -d:BrokerFfiApi \
  --threads:on \
  --app:lib \
  --nimMainPrefix:mylib \
  --mm:orc \
  --outdir:examples/ffiapi/nimlib/build \
  examples/ffiapi/nimlib/mylib.nim
        

Add -d:BrokerFfiApiGenPy when you also want the generated Python wrapper module.

Required knobs

  • -d:BrokerFfiApi enables the library generator path
  • --threads:on enables the delivery and processing thread runtime
  • --app:lib builds a shared library instead of an executable
  • --nimMainPrefix:mylib must match name: "mylib" in registerBrokerLibrary
  • --mm:orc or --mm:refc are the supported memory managers

In this repository the convenience entry points are nimble buildFfiExample, nimble buildFfiExampleCpp, and nimble runFfiExampleCpp.

Summary

A simple way to navigate the broker toolbox:

1. Start with single-thread brokers

Use async or sync brokers when you want mockable, type-safe decoupling between layers inside one module, library, or application without introducing thread complexity.

2. Move to multi-thread brokers

Use multi-thread brokers when you want lightweight in-process IPC, need to off-load work onto worker threads, and still want communication between those threads to remain structured and safe.

3. Choose the FFI-API layer

Use the Broker FFI-API when the library is meant to be consumed broadly from other languages or external runtimes and you want a safe shared-library surface without hand-written plumbing.

Pick the smallest layer that solves the problem: in-thread decoupling first, cross-thread messaging next, and generated FFI only when the boundary must leave Nim.

Opportunities!

A natural extension point beyond the core broker runtime:

Serializable by design: broker requests and events are plain structured data, which makes them a strong fit for explicit serialization formats.

  • Encode broker payloads with CBOR, JSON, or Protobuf
  • Reuse the same contract across process boundaries
  • Treat the broker interface as a stable schema for plugins, RPC, or transport layers
Broker request / event type

↓ serialize ↓

JSON / CBOR / Protobuf

↓ transport or storage ↓

RPC · plugin boundary · cross-process API
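A minimal illustration with Nim's standard library (std/json; the event type is copied from the earlier declaration as plain data, without the broker macro) shows how cleanly a broker payload round-trips:

```nim
import std/json

# Plain data — same shape as the OnFilterSubscribeEvent payload
# declared earlier; no broker machinery is needed to serialize it.
type OnFilterSubscribeEvent = object
  pubsubTopic: string
  contentTopics: seq[string]

let ev = OnFilterSubscribeEvent(
  pubsubTopic: "/waku/2/default-waku/proto",
  contentTopics: @["/app/1/chat/proto"])

let wire = $(%ev)   # object → JsonNode → JSON text
let back = parseJson(wire).to(OnFilterSubscribeEvent)
assert back.pubsubTopic == ev.pubsubTopic
assert back.contentTopics == ev.contentTopics
```

The same structural encoding applies to CBOR or Protobuf; the broker type declarations already serve as the schema.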

This is where brokers can evolve from an internal decoupling tool into a portable interface definition that survives beyond a single process.