Wiki · Substrate piece

Event ledger

An append-only stream of everything that happens. Workflows publish; the workflow runner, reflexion, timeline UI, and anomaly detector subscribe. Decoupling producers from consumers is what makes the platform extensible.

Real · R553 Phase 52
What this is

One append-only stream, many consumers

The event ledger is the platform's CDC (change data capture) substrate. Every meaningful state change writes one row to the events table — append-only, ULID-keyed, with structured JSON payload. Producers don't care who's reading; consumers don't care who's writing. That decoupling is what lets us add a new behavior (customer health watcher, anomaly detector, timeline indexer) without changing the workflows that fire the underlying events.

The ledger landed in R553 / Phase 52, per Mike's high-leverage moves list. Before it, workflows called downstream code synchronously, which tightly coupled everything. Now they fan out events; downstream subscribers register against event_type globs, and the workflow runner drains the queue on a cron.

Schema lives in migrations/schema/123_events_ledger.sql. Three tables: events (the stream), event_subscriptions (who listens to what), event_drain_state (per-consumer cursor).

Producers

Who writes events

Consumers

Who reads events

Anatomy of an event

Event shape

{
  "event_id": "01J0XYZ...",            // ULID, sorts lexicographically by time
  "event_type": "price.changed",
  "entity_type": "item",
  "entity_id": "10472",                // NS item id
  "payload_json": {
    "old_price": 1.42,
    "new_price": 1.48,
    "bid_id": "B5875",
    "customer_id": "2147",
    "approved_by": "mike",
    "proposed_action_id": 8421
  },
  "caused_by": "workflow:bid_price_update",
  "workflow_run_id": "run_01J0...",
  "source_system": "platform",
  "occurred_at": "2026-05-25T14:23:11Z",
  "sequence_no": 142                   // optional per-entity counter
}

Idempotency

The idempotency_key contract

Some events represent operations that must be exactly-once on consumers. For those, the producer sets an idempotency_key in payload_json. Consumers store the seen keys in their own per-consumer dedupe table and skip events they've already processed.

The canonical example: hitl.approved with key proposed_action_id. If the consumer (e.g. NS push handler) crashes mid-process and the workflow runner re-fires the event, the dedupe check makes the second fire a no-op. Combined with the R560 atomic claim at the approval boundary, we get end-to-end exactly-once semantics.
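The dedupe contract can be illustrated with a minimal sketch. Only the idempotency_key field in payload_json comes from the description above; the LedgerEvent shape, the DedupeStore class, and the handleOnce helper are hypothetical names, and an in-memory Set stands in for the per-consumer dedupe table.

```typescript
// Hypothetical shapes; only payload_json.idempotency_key comes from the text above.
interface LedgerEvent {
  event_id: string;
  event_type: string;
  payload_json: Record<string, unknown>;
}

// Stand-in for a per-consumer dedupe table. A real consumer would use a
// table with a unique constraint on the key rather than an in-memory Set.
class DedupeStore {
  private seen = new Set<string>();

  // Returns true the first time a key is claimed, false on every repeat.
  claim(key: string): boolean {
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    return true;
  }
}

// Runs `handler` at most once per idempotency key. Events without a key are
// always processed, since the producer did not ask for exactly-once handling.
function handleOnce(
  event: LedgerEvent,
  store: DedupeStore,
  handler: (e: LedgerEvent) => void,
): boolean {
  const key = event.payload_json["idempotency_key"];
  const hasKey = key !== undefined && key !== null;
  if (hasKey && !store.claim(`${event.event_type}:${String(key)}`)) {
    return false; // duplicate delivery: no-op
  }
  handler(event);
  return true;
}
```

Note that this sketch claims the key before running the handler, so a crash between the two would leave the work undone. A durable version would presumably record the key in the same transaction as the handler's side effect.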

Why ULID and not autoincrement?

ULIDs are time-sortable, client-generatable, and globally unique without a coordinator. Producers can emit events at the edge without round-tripping to D1 for an ID. The occurred_at ordering survives backfills because recorded_at tracks insert time separately.
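To show why a ULID sorts by time and needs no coordinator, here is a minimal generator sketch. It follows the public ULID layout (a 48-bit millisecond timestamp as 10 Crockford base32 characters, followed by 80 random bits as 16 characters). The function names are illustrative and are not the platform's actual helper, and Math.random is used only to keep the sketch dependency-free.

```typescript
// Crockford base32 alphabet used by ULID (omits I, L, O, U).
// The characters are in ascending ASCII order, so string order follows numeric order.
const CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ";

// Encodes a millisecond timestamp as 10 base32 characters, most significant first.
function encodeTime(ms: number): string {
  let out = "";
  let t = ms;
  for (let i = 0; i < 10; i++) {
    out = CROCKFORD.charAt(t % 32) + out;
    t = Math.floor(t / 32);
  }
  return out;
}

// Encodes 80 bits of randomness as 16 base32 characters.
// Assumption: Math.random is good enough for a sketch; a real emitter should use a CSPRNG.
function encodeRandom(): string {
  let out = "";
  for (let i = 0; i < 16; i++) {
    out += CROCKFORD.charAt(Math.floor(Math.random() * 32));
  }
  return out;
}

// Builds a 26-character ULID from the given clock reading.
function ulid(nowMs: number = Date.now()): string {
  return encodeTime(nowMs) + encodeRandom();
}
```

Two IDs generated in different milliseconds compare in time order as plain strings. Within the same millisecond the order in this sketch is random, since it does not implement the optional monotonic variant.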

Subscriptions

How a workflow subscribes

The event_subscriptions table maps event_type_glob patterns to workflow_type. The drain loop on cron walks new events, matches against enabled subscriptions, applies the optional filter_expr, transforms the payload via input_mapper, and starts the workflow.

-- Example: vendor cost changes trigger margin re-check workflow
INSERT INTO event_subscriptions (event_type_glob, workflow_type, input_mapper)
VALUES ('vendor.cost_changed', 'vendor_cost_review', '{"vendor_id":"$.entity_id"}');

-- Wildcard: any AR bucket move pings customer health
INSERT INTO event_subscriptions (event_type_glob, workflow_type)
VALUES ('ar.*', 'recompute_customer_health');

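The glob match and the input_mapper transform might look roughly like the sketch below. The exact semantics are assumptions, not confirmed by this page: here "*" matches any run of characters (including further dots), and mapper values beginning with "$." are resolved as dotted paths into the event. The names globToRegExp, matchesGlob, and applyInputMapper are hypothetical.

```typescript
// Converts a glob such as "ar.*" into an anchored RegExp.
// Assumption: "*" matches any run of characters, including further dots.
function globToRegExp(glob: string): RegExp {
  const escaped = glob
    .split("*")
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, "\\$&"))
    .join(".*");
  return new RegExp(`^${escaped}$`);
}

function matchesGlob(glob: string, eventType: string): boolean {
  return globToRegExp(glob).test(eventType);
}

// Resolves "$.a.b" style paths against the event; any other value is kept as a literal.
function applyInputMapper(
  mapper: Record<string, string>,
  event: Record<string, unknown>,
): Record<string, unknown> {
  const input: Record<string, unknown> = {};
  for (const [target, path] of Object.entries(mapper)) {
    if (!path.startsWith("$.")) {
      input[target] = path;
      continue;
    }
    let value: unknown = event;
    for (const segment of path.slice(2).split(".")) {
      value =
        typeof value === "object" && value !== null
          ? (value as Record<string, unknown>)[segment]
          : undefined;
    }
    input[target] = value;
  }
  return input;
}
```

With the first subscription above, an event whose entity_id is "10472" would start vendor_cost_review with the input {"vendor_id": "10472"}.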
Drain

Cron schedule + cursors

The drain loop runs on cron schedule */2 * * * * (every 2 minutes) for the workflow_runner drainer. Each drainer tracks its position via event_drain_state.last_event_id. The next run reads events where event_id > last_event_id ORDER BY event_id ASC LIMIT 500.

Other drainers may run on different schedules: timeline indexer hourly, anomaly detector every 15 min, reflexion analyzer every 30 min. Each has its own cursor row so they don't interfere.
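The per-consumer cursor behavior can be sketched with in-memory stand-ins for the events table and the event_drain_state rows. The Ledger type and the readBatch and drainOnce functions are hypothetical. Advancing the cursor after each handled event is an assumption; the real drainer may commit once per batch.

```typescript
interface StoredEvent {
  event_id: string; // ULID, so string comparison follows time order
  event_type: string;
}

// In-memory stand-ins for the events table and event_drain_state rows.
interface Ledger {
  events: StoredEvent[];
  cursors: Map<string, string>; // consumer name -> last_event_id
}

// Mirrors: WHERE event_id > last_event_id ORDER BY event_id ASC LIMIT 500
function readBatch(ledger: Ledger, consumer: string, limit = 500): StoredEvent[] {
  const last = ledger.cursors.get(consumer) ?? "";
  return ledger.events
    .filter((e) => e.event_id > last)
    .sort((a, b) => (a.event_id < b.event_id ? -1 : a.event_id > b.event_id ? 1 : 0))
    .slice(0, limit);
}

// Processes one batch and moves the cursor only past events that were handled,
// so a failure partway through leaves the remaining events for the next run.
function drainOnce(
  ledger: Ledger,
  consumer: string,
  handle: (e: StoredEvent) => void,
  limit = 500,
): number {
  const batch = readBatch(ledger, consumer, limit);
  for (const event of batch) {
    handle(event);
    ledger.cursors.set(consumer, event.event_id);
  }
  return batch.length;
}
```

Because each consumer name has its own cursor entry, a slow hourly drainer and the 2-minute workflow_runner drainer read the same events without affecting each other's position.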

Step-by-step: from emit to action

How an event flows

  1. Producer emits

    The originating workflow calls emitEvent({type, entity, payload}). The helper generates a ULID, fills caused_by from the calling context, and inserts the row.

    Writes: events
    Time: ~5 ms

  2. Drainer wakes on cron

    The 2-minute cron fires the workflow_runner drainer. It reads its cursor from event_drain_state and selects up to 500 newer events.

    Reads: event_drain_state, events
    Time: ~100 ms

  3. Match against subscriptions

    For each event, the drainer finds the rows in event_subscriptions whose glob matches the event_type. Each match generates a workflow start request.

    Reads: event_subscriptions

  4. Idempotency check + start

    If the subscription's workflow has an idempotency_key declared, the runner checks per-consumer dedupe. If the key is unseen, it starts the workflow run via the same path as a manual trigger. The cursor advances on commit.

    Writes: workflow_run_log, event_drain_state.last_event_id
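Step 1 above can be sketched as a pure function that builds the row before it is inserted. This is not the actual emitEvent in src/index.ts. The EmitInput and EmitContext shapes, the buildEventRow name, and the "workflow:" prefix convention (inferred from the sample event) are assumptions, and the ID generator is passed in so the sketch stays self-contained.

```typescript
interface EmitInput {
  type: string;
  entity: { type: string; id: string };
  payload: Record<string, unknown>;
}

// Hypothetical calling context; the real helper derives this from the running workflow.
interface EmitContext {
  workflowType: string;
  workflowRunId: string;
}

interface EventRow {
  event_id: string;
  event_type: string;
  entity_type: string;
  entity_id: string;
  payload_json: Record<string, unknown>;
  caused_by: string;
  workflow_run_id: string;
  source_system: string;
  occurred_at: string;
}

// Builds the row that would be appended to the events table.
// `newId` is expected to return a ULID; it is injected to keep this sketch testable.
function buildEventRow(
  input: EmitInput,
  ctx: EmitContext,
  newId: () => string,
  now: Date = new Date(),
): EventRow {
  return {
    event_id: newId(),
    event_type: input.type,
    entity_type: input.entity.type,
    entity_id: input.entity.id,
    payload_json: input.payload,
    caused_by: `workflow:${ctx.workflowType}`,
    workflow_run_id: ctx.workflowRunId,
    source_system: "platform",
    // Drop milliseconds to match the sample event's timestamp format.
    occurred_at: now.toISOString().replace(/\.\d{3}Z$/, "Z"),
  };
}
```

Keeping row construction separate from the insert means the producer never needs to read from the database before writing, which fits the client-generated ID design.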
Outcomes

What the substrate enables

Decoupling: Total (producers don't know consumers)
Replay: Possible (cursor rewind)
Drain lag: ≤ 2 min (workflow runner cadence)
Idempotency: Per consumer (exactly-once where needed)

Failure modes

What can go wrong

Drainer stalls

If the cron run fails, events accumulate but aren't lost. Next run picks up at the cursor. Long stalls (> 1 hour) trigger alerting via the platform health endpoint.

Schema drift in payload

Payloads are typed by event_type, but the types are not enforced at insert. Consumers should read fields defensively. Schema docs live in code comments next to each event_type producer.
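One way a consumer might read a payload defensively is shown below for price.changed, using the field names from the sample event. The PriceChange type and the parsePriceChanged function are hypothetical. Treating old_price and new_price as required and bid_id as optional is an assumption made for the example.

```typescript
interface PriceChange {
  oldPrice: number;
  newPrice: number;
  bidId: string | null;
}

function isFiniteNumber(value: unknown): value is number {
  return typeof value === "number" && Number.isFinite(value);
}

// Returns a typed view of the payload, or null if required fields are
// missing or have the wrong type. Unknown extra fields are ignored.
function parsePriceChanged(payload: unknown): PriceChange | null {
  if (typeof payload !== "object" || payload === null) return null;
  const p = payload as Record<string, unknown>;
  const oldPrice = p["old_price"];
  const newPrice = p["new_price"];
  if (!isFiniteNumber(oldPrice) || !isFiniteNumber(newPrice)) return null;
  const bidId = p["bid_id"];
  return {
    oldPrice,
    newPrice,
    bidId: typeof bidId === "string" ? bidId : null,
  };
}
```

Returning null instead of throwing lets the consumer decide whether a malformed event should be skipped, logged, or retried.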

Wildcard storms

A subscription on '*' would fire on every event. As a safety net, the drainer caps fanout per event at 8 subscribers and alerts when the cap is hit.
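The cap itself is simple to sketch. The limit of 8 comes from this page; the capFanout name, the result shape, and the choice to keep the first matches in the order given are assumptions.

```typescript
const MAX_FANOUT = 8; // per-event subscriber cap stated above

interface Subscription {
  event_type_glob: string;
  workflow_type: string;
}

interface FanoutResult {
  selected: Subscription[]; // subscriptions that will actually be started
  capHit: boolean; // true when some matches were dropped, so the caller can alert
}

// Keeps at most `max` matching subscriptions for a single event.
// Assumption: matches are kept in the order given and the rest are dropped.
function capFanout(matches: Subscription[], max: number = MAX_FANOUT): FanoutResult {
  return {
    selected: matches.slice(0, max),
    capHit: matches.length > max,
  };
}
```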

Related

Adjacent substrate

For developers

Code paths + invariants

| Concern | Where |
| --- | --- |
| Schema | migrations/schema/123_events_ledger.sql |
| Emitter | src/index.ts emitEvent helper |
| Drainer | src/lib/workflow_runner.ts drainEvents |
| Cron | wrangler.toml: */2 * * * * for workflow_runner |
| ULID | client-generated for time-sortable ordering |
| Idempotency | payload_json.idempotency_key + per-consumer dedupe |
| Subscriptions | event_subscriptions table, glob matching |
| Per-event fanout cap | 8 subscribers max (anti-storm safety) |