Append-only events stream + subscription-driven workflow dispatch. Every meaningful state change writes a row. Consumers drain by cursor — one cursor per drainer, glob-matched against event_subscriptions. The single substrate that lets workflows compose without producers needing to know who consumes.
| table | key columns | purpose |
|---|---|---|
| events | event_id (ULID PK, client-generated), event_type, entity_type, entity_id, payload_json, caused_by, workflow_run_id, source_system, occurred_at, recorded_at, sequence_no, idempotency_key (R560) | append-only ledger; ULID PK gives lexicographic time order |
| event_subscriptions | subscription_id, event_type_glob (e.g. price.*), workflow_type, input_mapper (JSONPath recipe), filter_expr, enabled | maps event_type glob → workflow_type. Drainer reads enabled rows only |
| event_drain_state | drainer_id (PK), last_event_id, last_drained_at, events_processed_total | per-drainer cursor. Multiple consumers drain independently |
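
To make the event_type_glob column concrete, here is a minimal sketch of compiling each enabled subscription's glob into a reusable matcher once per drain. The names compileGlob and compileSubscriptions are hypothetical, and the glob semantics (only `*` is a wildcard, and it may span dots) are an assumption, not something the source specifies.

```typescript
// Hypothetical helper: turn an event_type glob such as "price.*" into a
// reusable predicate. Assumption: "*" is the only wildcard and may span dots.
function compileGlob(glob: string): (eventType: string) => boolean {
  // Escape every regex metacharacter except "*", then expand "*" to ".*".
  const escaped = glob.replace(/[.+?^${}()|[\]\\]/g, "\\$&");
  const pattern = new RegExp("^" + escaped.replace(/\*/g, ".*") + "$");
  return (eventType) => pattern.test(eventType);
}

// Subset of the event_subscriptions columns needed for matching.
interface Subscription {
  subscription_id: string;
  event_type_glob: string;
  workflow_type: string;
  enabled: boolean;
}

// Compile each enabled subscription once, before the event loop starts,
// so the regex is not rebuilt for every (event, subscription) pair.
function compileSubscriptions(subs: Subscription[]) {
  return subs
    .filter((s) => s.enabled)
    .map((s) => ({ ...s, _matcher: compileGlob(s.event_type_glob) }));
}
```

Under these assumptions, compileGlob("price.*") accepts price.changed and rejects sync.tier_completed; the dot in the glob is matched literally.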
R560 idempotency (migration 127): CREATE UNIQUE INDEX idx_events_idempotency ON events(idempotency_key) WHERE idempotency_key IS NOT NULL — partial unique. Producer retries with the same key collapse to one row via INSERT OR IGNORE.
// Pseudocode of the pre-R560 loop:
for (const ev of rows) {
  for (const sub of subs) {
    if (matches(sub, ev)) {
      await executeWorkflowContract(...)
      // errors silently swallowed
    }
  }
}
// cursor advanced past ALL rows in batch
// even ones that failed dispatch →
// triggers permanently lost.
// Loop after the R560 fix: halt the drain on the first failed dispatch.
let lastSuccessfulEventId = lastId;
let haltedOnEventId = null;
eventLoop:
for (const ev of rows) {
  for (const sub of compiledSubs) {
    if (!sub._matcher(ev.event_type)) continue;
    try {
      await executeWorkflowContract(...);
    } catch (e) {
      await recordEvent({
        event_type: 'workflow.dispatch_failed',
        ...
      });
      haltedOnEventId = ev.event_id;
      break eventLoop; // halt drain
    }
  }
  // reached only when every matching subscription dispatched without error
  lastSuccessfulEventId = ev.event_id;
}
// cursor ← lastSuccessfulEventId
// failed event re-picked next pass.
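
The halt-on-failure behaviour above can be exercised in isolation. The following is a minimal in-memory sketch, not the production drainer: drainBatch, the Dispatch callback, and the DrainResult shape are hypothetical names introduced here, and the recording of the workflow.dispatch_failed event is left out to keep the example self-contained.

```typescript
interface EventRow { event_id: string; event_type: string; }
interface CompiledSub { workflow_type: string; _matcher: (t: string) => boolean; }

// Hypothetical stand-in for executeWorkflowContract; rejects on failure.
type Dispatch = (workflowType: string, ev: EventRow) => Promise<void>;

interface DrainResult {
  cursor: string;                 // value to persist as last_event_id
  triggered: string[];            // "<workflow_type>:<event_id>" per success
  haltedOnEventId: string | null; // event that stopped the drain, if any
}

async function drainBatch(
  rows: EventRow[],
  subs: CompiledSub[],
  lastId: string,
  dispatch: Dispatch,
): Promise<DrainResult> {
  let lastSuccessfulEventId = lastId;
  let haltedOnEventId: string | null = null;
  const triggered: string[] = [];

  eventLoop: for (const ev of rows) {
    for (const sub of subs) {
      if (!sub._matcher(ev.event_type)) continue;
      try {
        await dispatch(sub.workflow_type, ev);
        triggered.push(`${sub.workflow_type}:${ev.event_id}`);
      } catch {
        // Stop here so the cursor never moves past a failed event.
        haltedOnEventId = ev.event_id;
        break eventLoop;
      }
    }
    // Reached only if no dispatch for this event threw.
    lastSuccessfulEventId = ev.event_id;
  }
  return { cursor: lastSuccessfulEventId, triggered, haltedOnEventId };
}
```

One consequence worth noting, as far as this sketch goes: if an event matches two subscriptions and the second dispatch fails, the first is dispatched again on the next pass, so workflows triggered this way should tolerate a repeated trigger.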
| event_type | source_system | caused_by | fired at |
|---|---|---|---|
| sync.tier_completed | sync | sync:tier=<tier> | src/index.ts:31302 |
| sync.tier_completed (error variant) | sync | sync:tier=<tier> | src/index.ts:31313 |
| hitl.approved | hitl | hitl:api:decide | src/index.ts:25103 |
| hitl.rejected | hitl | hitl:api:decide | src/index.ts:25051 |
| price.changed | drain | drain:push_type=bulk_cost_basis | src/index.ts:25220 |
| email.parsed | email | email:mailbox=<mailbox> | src/email.ts:547 |
The line at src/index.ts:14479 is the POST /api/events/record generic-producer endpoint — admin-write path for manual event injection (also used in self-test).
1. Acquire lock:events_drain:<drainerId> with 300s TTL. Existing lock → return { skipped_due_to_lock: true }. Cleaned in finally block.
2. SELECT last_event_id FROM event_drain_state WHERE drainer_id=?1. Empty cursor → '' (first drain reads everything).
3. SELECT * FROM events WHERE event_id > ?1 ORDER BY event_id ASC LIMIT ?2. ULID lexicographic order = time order.
4. SELECT * FROM event_subscriptions WHERE enabled=1; each subscription's glob is precompiled once into a regex matcher (R560 perf fix; previously per-cell).
5. applyInputMapper(payload, sub.input_mapper, eventRow) translates the event into workflow inputs (JSONPath-like recipe: $.payload.x, $.entity_id, literal:foo).
6. executeWorkflowContract(env, sub.workflow_type, inputs, { invoked_by: 'event:<type>:<id>' }).
7. On success: append to triggered[], mark lastSuccessfulEventId = ev.event_id, continue.
8. On failure: record a workflow.dispatch_failed event, set haltedOnEventId, break eventLoop (drain stops at last success).
9. UPSERT event_drain_state SET last_event_id = lastSuccessfulEventId, increment events_processed_total.

Producers that may retry (queue consumers, sync re-runs) pass a stable idempotency_key like ${event_type}:${entity_id}:${trandate}. INSERT OR IGNORE against the partial UNIQUE index collapses duplicates to one row.
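
The input-mapper step of the drain pass described above can be illustrated with a small sketch. It covers only the three expression forms the text names ($.payload.x, $.entity_id, literal:foo). The function name mapInputsSketch, its two-argument signature, and the assumption that a recipe is a flat object of input name → expression are all hypothetical; the real applyInputMapper may differ.

```typescript
interface EventRow {
  event_id: string;
  event_type: string;
  entity_id: string;
  payload: Record<string, unknown>; // assumed to be the parsed payload_json
}

// Walk a "$.a.b" style path from the event row; missing segments yield undefined.
function resolvePath(root: unknown, path: string): unknown {
  const keys = path.slice(2).split("."); // drop the leading "$."
  let cur: unknown = root;
  for (const k of keys) {
    if (cur === null || typeof cur !== "object") return undefined;
    cur = (cur as Record<string, unknown>)[k];
  }
  return cur;
}

// Hypothetical mapper: recipe maps workflow input names to expressions.
function mapInputsSketch(
  recipe: Record<string, string>,
  ev: EventRow,
): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const [name, expr] of Object.entries(recipe)) {
    if (expr.startsWith("literal:")) {
      out[name] = expr.slice("literal:".length); // constant string
    } else if (expr.startsWith("$.")) {
      out[name] = resolvePath(ev, expr);         // value read from the event
    } else {
      throw new Error(`unsupported mapper expression: ${expr}`);
    }
  }
  return out;
}
```

Rejecting unknown expression forms outright, rather than passing them through, is a design choice of this sketch: a typo in a subscription's recipe then surfaces as a dispatch failure instead of a silently wrong workflow input.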
// migration 127
ALTER TABLE events ADD COLUMN idempotency_key TEXT;
CREATE UNIQUE INDEX IF NOT EXISTS idx_events_idempotency
  ON events(idempotency_key) WHERE idempotency_key IS NOT NULL;

// recordEvent (src/lib/events.ts)
const result = await env.DB.prepare(
  `INSERT OR IGNORE INTO events (event_id, ..., idempotency_key)
   VALUES (?1, ..., ?11)`
).bind(...).run();
if (result?.meta?.changes === 0) {
  // Collapsed: look up the existing row's event_id for the caller's reference.
  const existing = await env.DB.prepare(
    `SELECT event_id FROM events WHERE idempotency_key = ?1 LIMIT 1`
  ).bind(ev.idempotency_key).first();
  return existing?.event_id || null;
}
return id;
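
To show the collapse semantics without a database, here is an in-memory model of the same contract: a keyed insert that returns the existing event_id on a duplicate, while rows without a key are never deduplicated (mirroring the WHERE idempotency_key IS NOT NULL clause of the partial index). buildIdempotencyKey and InMemoryEvents are hypothetical names for illustration, and the evt-N ids stand in for ULIDs.

```typescript
interface NewEvent {
  event_type: string;
  entity_id: string;
  idempotency_key?: string;
}

// Hypothetical helper following the key shape given in the text.
function buildIdempotencyKey(eventType: string, entityId: string, trandate: string): string {
  return `${eventType}:${entityId}:${trandate}`;
}

class InMemoryEvents {
  private rows: { event_id: string; idempotency_key: string | null }[] = [];
  private byKey = new Map<string, string>(); // models the partial unique index
  private seq = 0;

  // Returns the event_id of the stored row: new on insert, existing on collapse.
  record(ev: NewEvent): string {
    const key = ev.idempotency_key ?? null;
    if (key !== null) {
      const existing = this.byKey.get(key);
      if (existing !== undefined) return existing; // duplicate: no new row
    }
    const id = "evt-" + String(++this.seq); // placeholder id, not a real ULID
    this.rows.push({ event_id: id, idempotency_key: key });
    if (key !== null) this.byKey.set(key, id);
    return id;
  }

  count(): number {
    return this.rows.length;
  }
}
```

A producer retry that reuses the key from buildIdempotencyKey gets back the first call's id and leaves the row count unchanged, whereas two events recorded without a key always produce two rows.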
| method | path | purpose | src |
|---|---|---|---|
| GET | /api/events/recent | Read recent events (filterable); read by timeline.html | src/index.ts:14509 |
| POST | /api/events/record | Admin manual event injection (also used for self-test) | src/index.ts:14471 |
| POST | /api/events/drain | Trigger drain for a drainer (default workflow_runner). Also fired by cron. | src/index.ts:14496 |
src/index.ts @ 25051, 25103, 25220, 31302, 31313; src/email.ts:547