Event Ledger — CDC substrate

Substrate diagram · R553 Phase 52, R560 hardening · src/lib/events.ts (248 LOC)

Append-only events stream plus subscription-driven workflow dispatch. Every meaningful state change writes a row. Consumers drain by cursor, one cursor per drainer, and each event's type is glob-matched against event_subscriptions. This is the single substrate that lets workflows compose without producers needing to know who consumes.

REAL implementation · PLANNED · 5 producers wired · 2 subscriptions seeded (both enabled=0)

Current state

Events recorded: 120
Producers wired: 5
Subscriptions seeded: 2
Subscriptions enabled: 0

Schema — 3 tables (migrations 123 + 127)

table | key columns | purpose
events | event_id (ULID PK, client-generated), event_type, entity_type, entity_id, payload_json, caused_by, workflow_run_id, source_system, occurred_at, recorded_at, sequence_no, idempotency_key (R560) | append-only ledger; ULID PK gives lexicographic time order
event_subscriptions | subscription_id, event_type_glob (e.g. price.*), workflow_type, input_mapper (JSONPath recipe), filter_expr, enabled | maps event_type glob → workflow_type. Drainer reads enabled rows only
event_drain_state | drainer_id (PK), last_event_id, last_drained_at, events_processed_total | per-drainer cursor. Multiple consumers drain independently
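
The claim that a ULID primary key gives lexicographic time order can be checked with a small sketch. The generator below is hypothetical (sketchUlid and encodeTime are illustrative names, not the helpers in src/lib/events.ts) and uses Math.random only to stay dependency-free; it assumes the standard ULID layout of a 10-character Crockford base32 timestamp followed by 16 random characters.

```typescript
// Crockford base32 alphabet used by ULIDs (no I, L, O, U).
// Its characters are already in ASCII order, so string order equals numeric order.
const CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ";

// Encode a millisecond timestamp as the 10-character ULID time prefix.
function encodeTime(ms: number): string {
  let out = "";
  let rest = ms;
  for (let i = 0; i < 10; i++) {
    out = CROCKFORD.charAt(rest % 32) + out;
    rest = Math.floor(rest / 32);
  }
  return out;
}

// Hypothetical generator: 10 time chars + 16 random chars = 26 chars.
// Math.random is NOT cryptographically strong; it only keeps the sketch small.
function sketchUlid(ms: number): string {
  let rand = "";
  for (let i = 0; i < 16; i++) {
    rand += CROCKFORD.charAt(Math.floor(Math.random() * 32));
  }
  return encodeTime(ms) + rand;
}

const earlier = sketchUlid(1700000000000);
const later = sketchUlid(1700000000001);
console.log(earlier < later); // true: the time prefix decides the comparison
```

Two IDs minted in the same millisecond differ only in their random suffix, so this sketch does not guarantee their relative order; whether the real generator is monotonic within a millisecond is not stated here.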

R560 idempotency (migration 127): CREATE UNIQUE INDEX idx_events_idempotency ON events(idempotency_key) WHERE idempotency_key IS NOT NULL — partial unique. Producer retries with the same key collapse to one row via INSERT OR IGNORE.

Full pipeline — producers → ledger → drainer → runner

[Pipeline diagram; labels recovered from the figure]

PRODUCERS (5 wired)
  sync.tier_completed · src/index.ts:31302 (success)
  sync.tier_completed (error) · src/index.ts:31313 (error variant)
  hitl.approved · src/index.ts:25103 (decide endpoint)
  hitl.rejected · src/index.ts:25051
  price.changed · src/index.ts:25220 (bulk_cost_basis drain)
  email.parsed · src/email.ts:547

recordEvent(env, ev): ULID() → event_id, INSERT OR IGNORE (R560)

events table: append-only, PK: event_id (ULID)

drainEvents(drainerId)
  1. KV lock (TTL 300s)
  2. SELECT cursor
  3. SELECT events > lastId
  4. compile globs
  5. dispatch matched

event_drain_state: drainer_id → cursor
event_subscriptions: enabled=1 only

workflow_runner: executeWorkflowContract (per match), invoked_by = "event:<type>:<id>"

on dispatch failure (R560)
  1. recordEvent workflow.dispatch_failed
  2. halt drain at last success
  3. retry next pass

flow: producer → recordEvent → events table → drainer reads after cursor → glob-match subs → executeWorkflowContract

cursor advances ONLY past events successfully dispatched (R560 fix). Failure halts; next drain pass retries.
KV concurrency lock prevents two cron firings + a manual drain from double-invoking subscriptions.
idempotency_key (optional) collapses producer retries to one row via INSERT OR IGNORE (migration 127).
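
The KV concurrency lock in the pipeline above can be sketched as follows. This is a minimal illustration, not the code in src/lib/events.ts: KvLike, MemoryKv and withDrainLock are hypothetical names, and the interface is only assumed to resemble a Workers-KV-style get/put/delete API with a TTL option. The key format, the 300s TTL, the skipped_due_to_lock result and the cleanup in finally come from the description in this document.

```typescript
// Minimal shape of the KV operations this sketch needs (hypothetical interface).
interface KvLike {
  get(key: string): Promise<string | null>;
  put(key: string, value: string, opts: { expirationTtl: number }): Promise<void>;
  delete(key: string): Promise<void>;
}

// In-memory stand-in so the sketch runs without a real KV namespace.
class MemoryKv implements KvLike {
  private rows = new Map<string, { value: string; expiresAt: number }>();
  async get(key: string): Promise<string | null> {
    const row = this.rows.get(key);
    if (!row || row.expiresAt <= Date.now()) return null; // expired locks count as absent
    return row.value;
  }
  async put(key: string, value: string, opts: { expirationTtl: number }): Promise<void> {
    this.rows.set(key, { value, expiresAt: Date.now() + opts.expirationTtl * 1000 });
  }
  async delete(key: string): Promise<void> {
    this.rows.delete(key);
  }
}

type LockedResult<T> =
  | { skipped_due_to_lock: true }
  | { skipped_due_to_lock: false; result: T };

// Run `work` only if no other drain currently holds the lock for this drainer.
async function withDrainLock<T>(
  kv: KvLike,
  drainerId: string,
  work: () => Promise<T>
): Promise<LockedResult<T>> {
  const key = `lock:events_drain:${drainerId}`;
  if ((await kv.get(key)) !== null) return { skipped_due_to_lock: true };
  await kv.put(key, String(Date.now()), { expirationTtl: 300 }); // 300s TTL as a safety net
  try {
    return { skipped_due_to_lock: false, result: await work() };
  } finally {
    await kv.delete(key); // released even when the drain throws
  }
}
```

A get followed by a put is not atomic on a plain key-value store, so a lock of this shape is best-effort: it narrows the window for overlapping drains rather than excluding them outright.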

Cursor advance — R560 fix (the silent loss bug)

OLD — broken

// Pseudocode of the pre-R560 loop:
for (const ev of rows) {
  for (const sub of subs) {
    if (matches(sub, ev)) {
      await executeWorkflowContract(...)
      // errors silently swallowed
    }
  }
}
// cursor advanced past ALL rows in batch
// even ones that failed dispatch →
// triggers permanently lost.

NEW — R560

let lastSuccessfulEventId = lastId;
let haltedOnEventId = null;  // set when a dispatch fails
eventLoop:
for (const ev of rows) {
  for (const sub of compiledSubs) {
    if (!sub._matcher(ev.event_type)) continue;
    try {
      await executeWorkflowContract(...);
    } catch (e) {
      await recordEvent({
        event_type: 'workflow.dispatch_failed',
        ...
      });
      haltedOnEventId = ev.event_id;
      break eventLoop;       // halt drain
    }
  }
  lastSuccessfulEventId = ev.event_id;
}
// cursor ← lastSuccessfulEventId
// failed event re-picked next pass.
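
To see the halt-and-retry behaviour end to end, here is a self-contained sketch of the same loop over in-memory data. It is an illustration under assumptions: drainBatch, the Dispatch type and the shape of the failure record are hypothetical stand-ins for drainEvents, executeWorkflowContract and the workflow.dispatch_failed event.

```typescript
interface EventRow { event_id: string; event_type: string }
interface CompiledSub { workflow_type: string; _matcher: (eventType: string) => boolean }

// Hypothetical stand-in for executeWorkflowContract: any async function that may throw.
type Dispatch = (sub: CompiledSub, ev: EventRow) => Promise<void>;

async function drainBatch(rows: EventRow[], subs: CompiledSub[], lastId: string, dispatch: Dispatch) {
  let lastSuccessfulEventId = lastId;
  let haltedOnEventId: string | null = null;
  const triggered: string[] = [];
  const failures: { event_type: string; failed_event_id: string; error: string }[] = [];

  eventLoop:
  for (const ev of rows) {
    for (const sub of subs) {
      if (!sub._matcher(ev.event_type)) continue;
      try {
        await dispatch(sub, ev);
        triggered.push(`${sub.workflow_type}:${ev.event_id}`);
      } catch (e) {
        // Stand-in for recording a workflow.dispatch_failed event.
        failures.push({ event_type: "workflow.dispatch_failed", failed_event_id: ev.event_id, error: String(e) });
        haltedOnEventId = ev.event_id;
        break eventLoop; // halt the drain; the cursor stays at the last success
      }
    }
    // Reached only when every matching subscription dispatched (or none matched).
    lastSuccessfulEventId = ev.event_id;
  }
  return { lastSuccessfulEventId, haltedOnEventId, triggered, failures };
}

// Demo: the second event fails once, so the first pass stops before it.
const demoRows: EventRow[] = [
  { event_id: "01A", event_type: "price.changed" },
  { event_id: "01B", event_type: "price.changed" },
  { event_id: "01C", event_type: "email.parsed" },
];
const demoSubs: CompiledSub[] = [
  { workflow_type: "reprice", _matcher: (t: string) => t.indexOf("price.") === 0 },
];
let demoFailOnce = true;
const demoDispatch: Dispatch = async (_sub, ev) => {
  if (ev.event_id === "01B" && demoFailOnce) {
    demoFailOnce = false;
    throw new Error("transient failure");
  }
};
drainBatch(demoRows, demoSubs, "", demoDispatch).then((pass) => console.log(pass));
```

Because the cursor only moves after all matching subscriptions for an event succeed, an event that matches several subscriptions is retried as a whole: subscriptions that already ran for it will run again on the next pass, so dispatched workflows should tolerate repeats.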

Producers — 5 wired today

event_type | source_system | caused_by | fired at
sync.tier_completed | sync | sync:tier=<tier> | src/index.ts:31302
sync.tier_completed (error variant) | sync | sync:tier=<tier> | src/index.ts:31313
hitl.approved | hitl | hitl:api:decide | src/index.ts:25103
hitl.rejected | hitl | hitl:api:decide | src/index.ts:25051
price.changed | drain | drain:push_type=bulk_cost_basis | src/index.ts:25220
email.parsed | email | email:mailbox=<mailbox> | src/email.ts:547

The line at src/index.ts:14479 is the POST /api/events/record generic-producer endpoint — admin-write path for manual event injection (also used in self-test).

Consumer side — drainEvents pipeline

  1. KV concurrency lock: lock:events_drain:<drainerId> with 300s TTL. Existing lock → return { skipped_due_to_lock: true }. Cleaned in finally block.
  2. Read cursor: SELECT last_event_id FROM event_drain_state WHERE drainer_id=?1. Empty cursor → '' (first drain reads everything).
  3. Fetch new events: SELECT * FROM events WHERE event_id > ?1 ORDER BY event_id ASC LIMIT ?2. ULID lexicographic order = time order.
  4. Load + precompile subscriptions: SELECT * FROM event_subscriptions WHERE enabled=1; each subscription's glob is precompiled once into a regex matcher (R560 perf fix; previously per-cell).
  5. Per event × per sub: on a matcher hit, applyInputMapper(payload, sub.input_mapper, eventRow) translates the event into workflow inputs (JSONPath-like recipe: $.payload.x, $.entity_id, literal:foo).
  6. Invoke runner: executeWorkflowContract(env, sub.workflow_type, inputs, { invoked_by: 'event:<type>:<id>' }).
  7. Success: push to triggered[], mark lastSuccessfulEventId = ev.event_id, continue.
  8. Failure: record workflow.dispatch_failed event, set haltedOnEventId, break eventLoop (drain stops at last success).
  9. Update cursor: UPSERT event_drain_state SET last_event_id = lastSuccessfulEventId, increment events_processed_total.
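
Steps 4 and 5 above (glob precompilation and input mapping) can be sketched as below. Both functions are hypothetical illustrations rather than the code in src/lib/events.ts. Two assumptions are made: that * in a glob matches any run of characters, and that input_mapper is a JSON object mapping an input name to one of the expression forms listed in step 5, with the parsed payload reachable under $.payload.

```typescript
// Compile a glob such as "price.*" into a matcher once, outside the event loop.
// ASSUMPTION: "*" matches any run of characters (including dots).
function compileGlob(glob: string): (eventType: string) => boolean {
  const escaped = glob.replace(/[.+?^${}()|[\]\\]/g, "\\$&"); // escape regex metacharacters except "*"
  const re = new RegExp("^" + escaped.replace(/\*/g, ".*") + "$");
  return (eventType: string) => re.test(eventType);
}

type Json = string | number | boolean | null | Json[] | { [k: string]: Json };

// Resolve one recipe expression against the event row.
// Supports "literal:<text>" and dotted "$.a.b" paths; anything else yields null.
function resolveExpr(expr: string, root: { [k: string]: Json }): Json {
  if (expr.indexOf("literal:") === 0) return expr.slice("literal:".length);
  if (expr.indexOf("$.") !== 0) return null;
  let cur: Json = root;
  for (const part of expr.slice(2).split(".")) {
    if (cur === null || typeof cur !== "object" || Array.isArray(cur)) return null;
    const next: Json | undefined = cur[part];
    if (next === undefined) return null;
    cur = next;
  }
  return cur;
}

// Hypothetical mapper: build workflow inputs from a { inputName: expression } recipe.
function sketchInputMapper(
  recipe: { [input: string]: string },
  eventRow: { [k: string]: Json }
): { [input: string]: Json } {
  const inputs: { [input: string]: Json } = {};
  for (const name of Object.keys(recipe)) {
    const expr = recipe[name];
    if (typeof expr === "string") inputs[name] = resolveExpr(expr, eventRow);
  }
  return inputs;
}

const matchesPrice = compileGlob("price.*");
console.log(matchesPrice("price.changed")); // true
console.log(matchesPrice("email.parsed"));  // false

const inputs = sketchInputMapper(
  { amount: "$.payload.x", id: "$.entity_id", mode: "literal:foo" },
  { event_type: "price.changed", entity_id: "sku-1", payload: { x: 9.5 } }
);
console.log(inputs); // { amount: 9.5, id: 'sku-1', mode: 'foo' }
```

Compiling each glob once per drain keeps the inner loop to a single regex test per event and subscription pair.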

Idempotency — R560 / migration 127

Producers that may retry (queue consumers, sync re-runs) pass a stable idempotency_key like ${event_type}:${entity_id}:${trandate}. INSERT OR IGNORE against the partial UNIQUE index collapses duplicates to one row.

-- migration 127 (SQL)
ALTER TABLE events ADD COLUMN idempotency_key TEXT;
CREATE UNIQUE INDEX IF NOT EXISTS idx_events_idempotency
  ON events(idempotency_key) WHERE idempotency_key IS NOT NULL;

// recordEvent (src/lib/events.ts)
const result = await env.DB.prepare(
  `INSERT OR IGNORE INTO events (event_id, ..., idempotency_key)
   VALUES (?1, ..., ?11)`
).bind(...).run();
if (result?.meta?.changes === 0) {
  // Collapsed — look up existing row's event_id for caller's reference.
  const existing = await env.DB.prepare(
    `SELECT event_id FROM events WHERE idempotency_key = ?1 LIMIT 1`
  ).bind(ev.idempotency_key).first();
  return existing?.event_id || null;
}
return id;
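
The collapse behaviour can be modelled without a database. The sketch below is an in-memory analogy only: MemoryLedger and idempotencyKey are hypothetical names, a Map plays the role of the partial UNIQUE index, and a counter stands in for the ULID. It mirrors the two properties described above: rows without a key never collide, and a repeated key returns the existing event_id instead of inserting.

```typescript
interface NewEvent {
  event_type: string;
  entity_id: string;
  idempotency_key?: string; // optional, as in the events table
}

// Build a stable key in the `${event_type}:${entity_id}:${trandate}` shape.
function idempotencyKey(eventType: string, entityId: string, trandate: string): string {
  return `${eventType}:${entityId}:${trandate}`;
}

class MemoryLedger {
  private rows: { event_id: string; ev: NewEvent }[] = [];
  // Models the partial UNIQUE index: only events that carry a key are tracked.
  private byKey = new Map<string, string>();
  private nextId = 1;

  // Returns the event_id of the stored row, whether new or collapsed.
  record(ev: NewEvent): string {
    const key = ev.idempotency_key;
    if (key !== undefined) {
      const existing = this.byKey.get(key);
      if (existing !== undefined) return existing; // duplicate collapsed, nothing inserted
    }
    const id = "evt-" + this.nextId++; // stand-in for a ULID
    this.rows.push({ event_id: id, ev });
    if (key !== undefined) this.byKey.set(key, id);
    return id;
  }

  count(): number {
    return this.rows.length;
  }
}

const ledger = new MemoryLedger();
const key = idempotencyKey("price.changed", "sku-1", "2024-01-31");
const first = ledger.record({ event_type: "price.changed", entity_id: "sku-1", idempotency_key: key });
const retry = ledger.record({ event_type: "price.changed", entity_id: "sku-1", idempotency_key: key });
console.log(first === retry, ledger.count()); // true 1
```

The key only deduplicates if the producer derives it from stable business fields; a key that includes a retry counter or a wall-clock timestamp would defeat the collapse.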

HTTP surface

method | path | purpose | src
GET | /api/events/recent | Read recent events (filterable); read by timeline.html | src/index.ts:14509
POST | /api/events/record | Admin manual event injection (also used for self-test) | src/index.ts:14471
POST | /api/events/drain | Trigger drain for a drainer (default workflow_runner). Also fired by cron. | src/index.ts:14496

Source files