One append-only stream, many consumers
The event ledger is the platform's CDC (change data capture) substrate. Every meaningful state change writes one row to the events table: append-only, ULID-keyed, with a structured JSON payload. Producers don't care who's reading; consumers don't care who's writing. That decoupling is what lets us add a new behavior (customer health watcher, anomaly detector, timeline indexer) without changing the workflows that fire the underlying events.
The ledger landed in R553 / Phase 52, per Mike's high-leverage moves list. Before it, workflows called downstream code synchronously, which tightly coupled everything. Now they fan out events; downstream subscribers register against event_type globs, and the workflow runner drains the queue on a cron.
Schema lives in migrations/schema/123_events_ledger.sql. Three tables: events (the stream), event_subscriptions (who listens to what), event_drain_state (per-consumer cursor).
Who writes events
- sync — sync.tier_completed fires after each hot/warm/cold tier run.
- HITL approval flow — hitl.approved, hitl.rejected, hitl.deferred per decision.
- Pricing — price.changed, price.bid_line_changed, price.quote_line_changed.
- Email — email.parsed, email.bounced, email.replied from the inbound triage pipeline.
- Workflow runner — workflow.run_started, workflow.run_completed, workflow.run_failed.
- AR/Customer — ar.bucket_moved, ar.collection_sent, customer.updated.
- Vendor/Item — vendor.cost_changed, item.created, spec.updated.
Who reads events
- workflow_runner — drains pending events on cron, matches them against event_subscriptions globs, starts subscribed workflows.
- reflexion — reads hitl.rejected to learn what proposals to avoid; reads workflow.run_failed to flag patterns.
- customer health watcher — subscribes to ar.*, email.*, order.* to recompute health signals in near real-time.
- timeline UI — per-entity timeline reads events filtered by entity_type/entity_id.
- anomaly detector — windowed reads to flag unusual sequences.
- replay tooling — can re-fire a window of events for debugging.
Event shape
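The field names used elsewhere on this page suggest a row shape roughly like the one below. This is a minimal TypeScript sketch, not the schema in migrations/schema/123_events_ledger.sql: the field names (event_id, event_type, entity_type, entity_id, payload_json, caused_by, occurred_at, recorded_at) appear in this document, but every type, the nullability of caused_by, and all example values are assumptions.

```typescript
// Hypothetical TypeScript view of one row in the events table.
// Field names come from this page; every type below is an assumption.
interface LedgerEvent {
  event_id: string; // ULID, generated by the producer
  event_type: string; // e.g. "hitl.approved"
  entity_type: string; // used with entity_id for per-entity timelines
  entity_id: string;
  payload_json: string; // structured JSON, assumed to be stored as text
  caused_by: string | null; // filled from the calling context by the emitter
  occurred_at: string; // when the change happened (ISO 8601 assumed)
  recorded_at: string; // when the row was inserted (ISO 8601 assumed)
}

// Made-up example values, for illustration only.
const exampleEvent: LedgerEvent = {
  event_id: "01HZX3K8Q2V7M4N9P0R5S6T1W3",
  event_type: "hitl.approved",
  entity_type: "proposed_action",
  entity_id: "pa_123",
  payload_json: JSON.stringify({ idempotency_key: "pa_123" }),
  caused_by: "workflow_run_456",
  occurred_at: "2024-06-01T12:00:00.000Z",
  recorded_at: "2024-06-01T12:00:00.050Z",
};
```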
The idempotency_key contract
Some events represent operations that must be exactly-once on consumers. For those, the producer sets an idempotency_key in payload_json. Consumers store the seen keys in their own per-consumer dedupe table and skip events they've already processed.
The canonical example: hitl.approved with key proposed_action_id. If the consumer (e.g. NS push handler) crashes mid-process and the workflow runner re-fires the event, the dedupe check makes the second fire a no-op. Combined with the R560 atomic claim at the approval boundary, we get end-to-end exactly-once semantics.
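The dedupe check described above can be sketched as follows. This is an illustration under stated assumptions, not the platform's handler: DedupeStore is an in-memory stand-in for a per-consumer dedupe table, and handleOnce, the consumer name ns_push, and the "processed"/"skipped" return values are hypothetical.

```typescript
// In-memory stand-in for a per-consumer dedupe table (assumption: the real
// store is a database table keyed by consumer and idempotency key).
class DedupeStore {
  private seen = new Set<string>();

  private composite(consumer: string, key: string): string {
    return JSON.stringify([consumer, key]);
  }

  has(consumer: string, key: string): boolean {
    return this.seen.has(this.composite(consumer, key));
  }

  record(consumer: string, key: string): void {
    this.seen.add(this.composite(consumer, key));
  }
}

interface IncomingEvent {
  event_type: string;
  payload_json: string;
}

// Runs the handler at most once per (consumer, idempotency_key).
// Events without a key are always processed.
function handleOnce(
  store: DedupeStore,
  consumer: string,
  event: IncomingEvent,
  handler: (e: IncomingEvent) => void,
): "processed" | "skipped" {
  const payload = JSON.parse(event.payload_json) as { idempotency_key?: unknown };
  const key = payload.idempotency_key;
  if (typeof key !== "string") {
    handler(event);
    return "processed";
  }
  if (store.has(consumer, key)) return "skipped";
  handler(event);
  // Recorded only after the handler succeeds, so a throw leaves the key unseen.
  store.record(consumer, key);
  return "processed";
}
```

In this sketch the key is recorded after the handler returns. With a real database, the key write and the handler's side effects would ideally commit in one transaction, so that a crash mid-process leaves neither behind; whether the platform does this is not stated here.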
ULIDs are time-sortable, client-generatable, and globally unique without a coordinator. Producers can emit events at the edge without round-tripping to D1 for an ID. The occurred_at ordering survives backfills because recorded_at tracks insert time separately.
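To make the time-sortable property concrete, here is a small sketch of ULID generation: a 48-bit millisecond timestamp encoded as 10 Crockford base32 characters, followed by 16 random characters. It is an illustration, not the helper the platform uses. Math.random is used only to keep the example dependency-free; a real generator should draw from a cryptographically strong source.

```typescript
// Crockford base32 alphabet (no I, L, O, U). Its characters are in ascending
// ASCII order, so string comparison matches numeric comparison.
const CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ";

// Encodes a millisecond timestamp as 10 base32 characters, most significant first.
function encodeTime(ms: number): string {
  let out = "";
  let t = Math.floor(ms);
  for (let i = 0; i < 10; i++) {
    out = CROCKFORD.charAt(t % 32) + out;
    t = Math.floor(t / 32);
  }
  return out;
}

// Sketch only: `rand` defaults to Math.random, which is NOT cryptographically
// strong. Swap in a CSPRNG-backed source for real use.
function ulid(nowMs: number = Date.now(), rand: () => number = Math.random): string {
  let randomPart = "";
  for (let i = 0; i < 16; i++) {
    randomPart += CROCKFORD.charAt(Math.floor(rand() * 32));
  }
  return encodeTime(nowMs) + randomPart;
}
```

Two IDs generated in different milliseconds sort in time order as plain strings, which is what lets a cursor compare event_id values directly. IDs generated within the same millisecond are ordered only by their random part in this sketch.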
How a workflow subscribes
The event_subscriptions table maps event_type_glob patterns to workflow_type. The drain loop on cron walks new events, matches against enabled subscriptions, applies the optional filter_expr, transforms the payload via input_mapper, and starts the workflow.
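The matching step can be sketched as below. The column names event_type_glob and workflow_type come from this page; everything else is an assumption made for illustration: that * matches any run of characters (including dots), that filter_expr and input_mapper can be modeled as plain functions, and the names Subscription, globToRegExp, and planStarts.

```typescript
type Payload = Record<string, unknown>;

interface Subscription {
  event_type_glob: string;
  workflow_type: string;
  enabled: boolean;
  filter?: (payload: Payload) => boolean; // stand-in for filter_expr
  mapInput?: (payload: Payload) => unknown; // stand-in for input_mapper
}

interface WorkflowStart {
  workflow_type: string;
  input: unknown;
}

// Converts a glob such as "ar.*" into an anchored regular expression.
// Assumption: "*" matches any sequence of characters.
function globToRegExp(glob: string): RegExp {
  const escaped = glob
    .replace(/[.+?^${}()|[\]\\]/g, "\\$&")
    .replace(/\*/g, ".*");
  return new RegExp(`^${escaped}$`);
}

// Returns one start request per enabled subscription that matches the event
// type and passes its optional filter.
function planStarts(eventType: string, payload: Payload, subs: Subscription[]): WorkflowStart[] {
  const starts: WorkflowStart[] = [];
  for (const sub of subs) {
    if (!sub.enabled) continue;
    if (!globToRegExp(sub.event_type_glob).test(eventType)) continue;
    if (sub.filter && !sub.filter(payload)) continue;
    const input = sub.mapInput ? sub.mapInput(payload) : payload;
    starts.push({ workflow_type: sub.workflow_type, input });
  }
  return starts;
}
```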
Cron schedule + cursors
The drain loop runs on cron schedule */2 * * * * (every 2 minutes) for the workflow_runner drainer. Each drainer tracks its position via event_drain_state.last_event_id. The next run reads events where event_id > last_event_id ORDER BY event_id ASC LIMIT 500.
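The cursor read can be sketched with an in-memory array standing in for the events table. The names StoredEvent, readBatch, and drainOnce are hypothetical, and advancing the cursor once per batch is an assumption about what "commit" means here.

```typescript
interface StoredEvent {
  event_id: string; // ULID, so string order is time order
  event_type: string;
}

interface DrainCursor {
  last_event_id: string; // "" sorts before every ULID
}

const BATCH_LIMIT = 500;

// In-memory equivalent of:
//   WHERE event_id > last_event_id ORDER BY event_id ASC LIMIT 500
function readBatch(all: StoredEvent[], lastEventId: string, limit: number = BATCH_LIMIT): StoredEvent[] {
  return all
    .filter((e) => e.event_id > lastEventId)
    .sort((a, b) => (a.event_id < b.event_id ? -1 : a.event_id > b.event_id ? 1 : 0))
    .slice(0, limit);
}

// Processes one batch and advances the cursor only after every event in the
// batch was handled. If the handler throws, the cursor stays put and the
// batch is read again on the next run.
function drainOnce(
  all: StoredEvent[],
  cursor: DrainCursor,
  handle: (e: StoredEvent) => void,
  limit: number = BATCH_LIMIT,
): number {
  const batch = readBatch(all, cursor.last_event_id, limit);
  for (const e of batch) handle(e);
  const last = batch[batch.length - 1];
  if (last !== undefined) cursor.last_event_id = last.event_id;
  return batch.length;
}
```

Because a failed batch is retried from the same cursor, handlers can see an event more than once, which is where the idempotency_key contract comes in.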
Other drainers may run on different schedules: timeline indexer hourly, anomaly detector every 15 min, reflexion analyzer every 30 min. Each has its own cursor row so they don't interfere.
- workflow_runner — every 2 min — starts subscribed workflows.
- timeline_indexer — hourly — materializes per-entity event lists for UI.
- anomaly_detector — every 15 min — windowed pattern matching.
- reflexion_analyzer — every 30 min — feeds reflexion rules.
- customer_health_watcher — every 5 min — incremental score recompute.
How an event flows
- 01 Producer emits — The originating workflow calls emitEvent({type, entity, payload}). The helper generates a ULID, fills caused_by from the calling context, and inserts. ~5ms.
- 02 Drainer wakes on cron — The 2-minute cron fires the workflow_runner drainer. It reads its cursor from event_drain_state and selects up to 500 newer events.
- 03 Match against subscriptions — For each event, the drainer fans against event_subscriptions where the glob matches the event_type. Each match generates a workflow start request.
- 04 Idempotency check + start — If the subscription's workflow has an idempotency_key declared, the runner checks per-consumer dedupe. If unseen, it starts the workflow run via the same path as a manual trigger. Cursor advances on commit.
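Step 01 can be sketched as follows. This is not the emitEvent helper in src/index.ts. The call shape {type, entity, payload} comes from this page, but the inner shape of entity, the second deps argument (added so the example runs without a database), and setting occurred_at and recorded_at to the same instant are all assumptions.

```typescript
interface EmitArgs {
  type: string;
  entity: { type: string; id: string }; // assumed shape of `entity`
  payload: Record<string, unknown>;
}

interface EventRow {
  event_id: string;
  event_type: string;
  entity_type: string;
  entity_id: string;
  payload_json: string;
  caused_by: string | null;
  occurred_at: string;
  recorded_at: string;
}

// Hypothetical dependencies, injected so the sketch is self-contained.
interface EmitDeps {
  newId: () => string; // ULID generator
  now: () => Date;
  causedBy: string | null; // assumed to come from the calling context
  insert: (row: EventRow) => void; // stand-in for the database insert
}

function emitEvent(args: EmitArgs, deps: EmitDeps): EventRow {
  const timestamp = deps.now().toISOString();
  const row: EventRow = {
    event_id: deps.newId(), // generated locally, no round trip for an ID
    event_type: args.type,
    entity_type: args.entity.type,
    entity_id: args.entity.id,
    payload_json: JSON.stringify(args.payload),
    caused_by: deps.causedBy,
    occurred_at: timestamp,
    recorded_at: timestamp,
  };
  deps.insert(row);
  return row;
}
```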
What the substrate enables
- New consumers can be added without touching producers — just register a subscription.
- Per-entity timelines come for free — filter the events table by entity.
- Replay tooling can re-fire historical events into a fresh consumer to backfill state.
- Anomaly detection has a single uniform stream to watch.
What can go wrong
If a cron run fails, events accumulate but aren't lost; the next run picks up at the cursor. Long stalls (> 1 hour) trigger alerting via the platform health endpoint.
Payloads are typed by event_type, but the types are not enforced at insert, so consumers should read fields defensively. Schema docs live in code comments next to each event_type producer.
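One way to read fields defensively is to treat the parsed payload as unknown and narrow each field before use. The helpers below (parsePayload, readString, readNumber) and the field names in the usage note are hypothetical; they are a sketch of the idea, not code from the platform.

```typescript
type LoosePayload = Record<string, unknown>;

// Parses payload_json without trusting it: malformed JSON, arrays, and
// non-object values all come back as an empty object.
function parsePayload(payloadJson: string): LoosePayload {
  try {
    const value: unknown = JSON.parse(payloadJson);
    if (typeof value === "object" && value !== null && !Array.isArray(value)) {
      return value as LoosePayload;
    }
    return {};
  } catch {
    return {};
  }
}

// Returns the field only if it is actually a string.
function readString(payload: LoosePayload, key: string): string | undefined {
  const value = payload[key];
  return typeof value === "string" ? value : undefined;
}

// Returns the field only if it is a finite number.
function readNumber(payload: LoosePayload, key: string): number | undefined {
  const value = payload[key];
  return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
```

A consumer of price.changed might call readNumber(payload, "new_price") and skip the event when the result is undefined, instead of assuming the producer always sends a number.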
A subscription on '*' would match every event and start a workflow for each one. As a safety net, the drainer caps fanout at 8 subscribers per event and alerts when the cap is hit.
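The fanout cap can be sketched in a few lines. The limit of 8 comes from this page; the function name capFanout, and the choice to keep the first 8 matches in match order and drop the rest, are assumptions.

```typescript
const MAX_FANOUT_PER_EVENT = 8;

// Keeps at most `cap` matches for one event and reports whether any were
// dropped, so the caller can raise an alert.
function capFanout<T>(matches: T[], cap: number = MAX_FANOUT_PER_EVENT): { allowed: T[]; capHit: boolean } {
  return {
    allowed: matches.slice(0, cap),
    capHit: matches.length > cap,
  };
}
```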
Code paths + invariants
| Concern | Where |
|---|---|
| Schema | migrations/schema/123_events_ledger.sql |
| Emitter | src/index.ts emitEvent helper |
| Drainer | src/lib/workflow_runner.ts drainEvents |
| Cron | wrangler.toml — */2 * * * * for workflow_runner |
| ULID | client-generated for time-sortable ordering |
| Idempotency | payload_json.idempotency_key + per-consumer dedupe |
| Subscriptions | event_subscriptions table — glob matching |
| Per-event fanout cap | 8 subscribers max — anti-storm safety |