Wiki · Substrate piece

Workflow runner

One function (executeWorkflowContract) that every contract on the platform runs through. Seven stages, four entrypoints, 22 contracts wired today, 94 fan-out targets. R552 shipped it; R560 hardened it; R564/R571/R576 polished the runtime.

Real · R552/R560 substrate
What this is

The single execution surface for every workflow contract

The workflow runner is executeWorkflowContract in src/lib/workflow_runner.ts. It's the one function every workflow on the platform passes through: bid_price_update, vendor_cost_update, ar_aging_action_plan, all 22 of them. It reads a row from workflow_definitions, walks the 7 stages, and writes audit rows to workflow_step_log, workflow_run_log, and (conditionally) reflexion_log, proposed_actions, and workflow_verify_results.
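As a rough orientation, the contract row can be pictured as the TypeScript shape below. This is a minimal sketch, not the actual schema: the column names are the ones this page mentions, while ParsedContract, parseJsonArray, parseContract, and the assumption that each *_json column holds a JSON array are illustrative only.

```typescript
// Column names are taken from this page. The element shapes inside the JSON
// columns are not documented here, so they are left as unknown[].
interface WorkflowDefinitionRow {
  workflow_type: string;
  enabled: number;           // 1 = enabled (loadContract filters on enabled=1)
  risk_level: number;        // stage 4 gates on risk_level >= 3
  reflexion_enabled: number; // 1 = stage 7 also writes reflexion_log
  context_to_load_json: string;
  preconditions_json: string;
  fan_out_targets_json: string;
  verify_checks_json: string;
}

// Hypothetical parsed view; these field names are not from the real code.
interface ParsedContract {
  workflowType: string;
  riskLevel: number;
  needsHitl: boolean;
  reflexion: boolean;
  contextQueries: unknown[];
  preconditions: unknown[];
  fanOutTargets: unknown[];
  verifyChecks: unknown[];
}

// Assumption: every *_json column stores a JSON array.
function parseJsonArray(column: string, raw: string): unknown[] {
  const value: unknown = JSON.parse(raw);
  if (!Array.isArray(value)) {
    throw new Error(column + " is not a JSON array");
  }
  return value;
}

function parseContract(row: WorkflowDefinitionRow): ParsedContract {
  return {
    workflowType: row.workflow_type,
    riskLevel: row.risk_level,
    needsHitl: row.risk_level >= 3,
    reflexion: row.reflexion_enabled === 1,
    contextQueries: parseJsonArray("context_to_load_json", row.context_to_load_json),
    preconditions: parseJsonArray("preconditions_json", row.preconditions_json),
    fanOutTargets: parseJsonArray("fan_out_targets_json", row.fan_out_targets_json),
    verifyChecks: parseJsonArray("verify_checks_json", row.verify_checks_json),
  };
}

// Illustrative row; the JSON payloads are made up for the example.
const sampleRow: WorkflowDefinitionRow = {
  workflow_type: "bid_price_update",
  enabled: 1,
  risk_level: 3,
  reflexion_enabled: 1,
  context_to_load_json: "[]",
  preconditions_json: "[]",
  fan_out_targets_json: "[{\"kind\":\"kv_invalidate\"}]",
  verify_checks_json: "[]",
};
const parsedSample = parseContract(sampleRow);
```

Parsing the JSON columns once, right after the row is loaded, would let a malformed contract fail early instead of midway through fan-out.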

It shipped in R552 as the Tier 1 "every role becomes a workflow-driven agent" move. R560 was the hardening pass — closing five bug classes the Codex audit found, including the silent-swallow on stage_proposed_action INSERT failures (the Cal-Maine-class bug: workflow reports "ok" with zero NS writes). R564 documented the workflow.completed event intent (not yet wired). R571 added Play-animation runtime hooks. R576 tuned the active-state dwell timer for the UI.

Diagram: substrate-workflow-runner.html. The runner is 615 LOC; it's small because the heavy lifting is in the contracts themselves.

When it fires

Four entrypoints

The seven stages

What every workflow goes through

  1. 01

    loadContract(workflowType)

    Single SELECT against workflow_definitions WHERE workflow_type=?1 AND enabled=1. Returns the contract row: precondition rules, fan-out targets, verify checks, risk level, reflexion flag. Null result → status='failed' with workflow_type_not_found.

    Reads workflow_definitions
  2. 02

    loadContext() — N parallel SELECTs

    For each query in context_to_load_json, the runner resolves binds (R560 fix: ordered positional binding to ? placeholders instead of string substitution) and skips the query when its optional q.if condition (for example "X present") evaluates false. Query errors are no longer silently swallowed: they are pushed to contextErrors[], which surfaces in result.errors.

    Reads N parallel SELECTs
    Writes none
  3. 03

    checkPreconditions()

    Walks preconditions_json and evaluates each rule. Grammar: "X present", "X is null", "X >= N" (also < <= = != >). severity:'block' + (fail OR unevaluable) → blocked=true, returns status='aborted'. severity:'warn' → push to warnings, continue.

    Writes none (early return possible)
  4. 04

    stageHitlProposal() — if risk_level ≥ 3

    If risk_level ≥ 3 AND !opts.hitl_approved: INSERT proposed_actions with action_type='workflow_<type>', entity_type='workflow_run', entity_ref=run_id, status='pending', proposed_by='workflow_runner'. Return status='pending_hitl'. Stages 5-7 wait for approval. The decide endpoint re-invokes the runner with opts.hitl_approved=true.

    Writes proposed_actions
    Gate risk_level ≥ 3
  5. 05

    executeFanOut() — per target

    For each target in fan_out_targets_json: evaluate t.if → skip / continue. Dispatch by kind. REAL kinds: kv_invalidate, stage_proposed_action (R560: throws on INSERT failure). STUB kinds (R560 marks status='stub', NOT counted as executed): d1_write, ns_push, http_call, chat_tool, hitl_email_draft, flag, workflow_class_invoke, loop_*, dispatch_workflow. Per-step rows INSERTed into workflow_step_log on entry & exit. on_failure='abort' → break.

    Writes workflow_step_log (2/step), conditional
    R560 stubs ↔ executed=0
  6. 06

    scheduleVerifyChecks()

    Per check in verify_checks_json: INSERT workflow_verify_results with status='pending', expected_json=<window+expected+sql_check>, notes='scheduled by runner'. The verify scheduler at cron 45 5 * * * picks these up after the configured window and runs the SQL.

    Writes workflow_verify_results
  7. 07

    executePostActions()

    Always: INSERT workflow_run_log (run_id, status, duration_ms, errors_count, output_json). If reflexion_enabled=1: INSERT reflexion_log with entity_type='workflow_run', tags='workflow:<type>,status:<status>'. Return status: completed | partial | failed | aborted | pending_hitl. Note: workflow.completed event documented (R564) but NOT yet wired.

    Writes workflow_run_log, optionally reflexion_log
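The stage 3 precondition grammar is small enough to sketch in full. The snippet below is a hedged illustration, not the runner's code: the Rule shape (rule text plus severity), the Verdict type, and the helper names are assumptions, since the real preconditions_json schema is not shown on this page. It follows the stated behavior that a block-severity rule blocks on either a fail or an unevaluable result, while warn-severity rules only add a warning.

```typescript
// Hypothetical shapes; the real preconditions_json schema is not shown here.
type Severity = "block" | "warn";
interface Rule { rule: string; severity: Severity; }
type Verdict = "pass" | "fail" | "unevaluable";
type Context = { [key: string]: unknown };

// Evaluates one rule of the form "X present", "X is null", or "X <op> N".
function evaluateRule(rule: string, ctx: Context): Verdict {
  const present = /^(\w+) present$/.exec(rule);
  if (present) {
    const v = ctx[present[1]];
    return v !== undefined && v !== null ? "pass" : "fail";
  }
  const isNull = /^(\w+) is null$/.exec(rule);
  if (isNull) {
    const v = ctx[isNull[1]];
    return v === undefined || v === null ? "pass" : "fail";
  }
  const cmp = /^(\w+) (>=|<=|!=|>|<|=) (-?\d+(?:\.\d+)?)$/.exec(rule);
  if (cmp) {
    const left = ctx[cmp[1]];
    // A non-numeric left side cannot be compared, so it is unevaluable.
    if (typeof left !== "number" || !isFinite(left)) return "unevaluable";
    const right = parseFloat(cmp[3]);
    switch (cmp[2]) {
      case ">=": return left >= right ? "pass" : "fail";
      case "<=": return left <= right ? "pass" : "fail";
      case "!=": return left !== right ? "pass" : "fail";
      case ">": return left > right ? "pass" : "fail";
      case "<": return left < right ? "pass" : "fail";
      case "=": return left === right ? "pass" : "fail";
      default: return "unevaluable";
    }
  }
  return "unevaluable"; // rule text outside the grammar
}

function checkPreconditions(rules: Rule[], ctx: Context) {
  const warnings: string[] = [];
  const failures: string[] = [];
  let blocked = false;
  for (const r of rules) {
    const verdict = evaluateRule(r.rule, ctx);
    if (verdict === "pass") continue;
    const note = r.rule + " (" + verdict + ")";
    if (r.severity === "block") {
      blocked = true; // fail OR unevaluable both block
      failures.push(note);
    } else {
      warnings.push(note); // warn: record and continue
    }
  }
  return { blocked, warnings, failures };
}

const rules: Rule[] = [
  { rule: "bid_id present", severity: "block" },
  { rule: "item_id present", severity: "block" },
  { rule: "new_price > 0", severity: "block" },
  { rule: "customer_id present", severity: "warn" },
];
const good = checkPreconditions(rules, {
  bid_id: "B5875", item_id: 10472, new_price: 1.48, customer_id: "C1",
});
const bad = checkPreconditions(rules, {
  bid_id: "B5875", item_id: 10472, new_price: "1.48",
});
```

In the second call new_price arrives as a string, so the comparison is unevaluable and the block-severity rule aborts the run. The missing customer_id only produces a warning.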
Worked example

Mike approves Driscoll's $0.06 bump on B5875

Mike taps Approve on a bid_price_update proposed_action for SKU 10472, $1.42 → $1.48 on bid B5875. The runner fires.

Stage 1 loads the bid_price_update contract from workflow_definitions (~2ms). Stage 2 loads context: the bid row, the customer row, the item row, the current price (~80ms, 4 parallel SELECTs). Stage 3 checks preconditions: "bid_id present" — pass; "item_id present" — pass; "new_price > 0" — pass. Stage 4 sees risk_level=3 but Mike's approval came in as opts.hitl_approved=true — skip. Stage 5 fans out 7 targets: kv_invalidate (real, deletes HUB_CACHE keys) + 6 STUB pushes (ns_push, d1_write, etc. — marked status='stub', NOT counted). 14 rows in workflow_step_log. Stage 6 schedules 2 verify checks ("NS price matches after 5 min", "hub page reflects new price after 60s"). Stage 7 writes workflow_run_log with status='completed', executed=1, stubbed=6 and a reflexion_log entry.

Mike sees: ok with executed=1. He knows the 6 stubs mean the actual NS push runs through the legacy NS_PUSH_QUEUE path, not the runner's stub dispatcher. That's the migration runway — gradually wire each stub to its real implementation without changing contract definitions.
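The counts in this example (executed=1, stubbed=6, 14 step-log rows) follow from the stage 5 rules: two workflow_step_log rows per target, and stub kinds marked 'stub' without incrementing executed. A minimal sketch of that bookkeeping follows. The types, the 'running' entry status, the 'continue' value, and the in-memory stepLog array are assumptions for illustration, and the six stub kinds are just examples from the stub list, since the page does not say which six this contract uses.

```typescript
type StepStatus = "running" | "ok" | "stub" | "failed";

interface Target {
  kind: string;
  on_failure?: "abort" | "continue"; // "continue" is an assumed name
}

// Stand-in for a workflow_step_log row; the real columns are not shown here.
interface StepLogRow {
  kind: string;
  phase: "entry" | "exit";
  status: StepStatus;
}

type Handlers = { [kind: string]: () => void };

function executeFanOut(targets: Target[], realHandlers: Handlers) {
  const stepLog: StepLogRow[] = [];
  let executed = 0;
  let stubbed = 0;
  let failed = 0;

  for (const t of targets) {
    stepLog.push({ kind: t.kind, phase: "entry", status: "running" });

    let status: StepStatus;
    const isReal = Object.prototype.hasOwnProperty.call(realHandlers, t.kind);
    if (!isReal) {
      status = "stub"; // wired in shape only, so NOT counted as executed
      stubbed++;
    } else {
      try {
        realHandlers[t.kind]();
        status = "ok";
        executed++;
      } catch (err) {
        status = "failed";
        failed++;
      }
    }

    stepLog.push({ kind: t.kind, phase: "exit", status: status });
    if (status === "failed" && t.on_failure === "abort") break;
  }
  return { executed, stubbed, failed, stepLog };
}

// One real kind plus six illustrative stub kinds.
const targets: Target[] = [
  { kind: "kv_invalidate" },
  { kind: "ns_push" },
  { kind: "d1_write" },
  { kind: "http_call" },
  { kind: "chat_tool" },
  { kind: "hitl_email_draft" },
  { kind: "flag" },
];
const fanOut = executeFanOut(targets, {
  kv_invalidate: () => { /* would delete cache keys here */ },
});
```

Keeping stubbed as its own counter is what lets a run report completed while still showing how much of the contract is not yet implemented.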

Outcomes

What the substrate enables

Contracts: 22 (on the runner today)
Fan-out targets: 94 (20 real, 74 stub)
HITL-gated: 18 (risk ≥ 3)
Code size: 615 LOC (single function)
R560 hardening

The five bug classes closed

Bug class (pre-R560) | Fix
loadContext silently swallowed query errors | errors push to contextErrors[] → result.errors
checkPreconditions treated unevaluable as 'pass' | block-severity unevaluable now blocks; warn downgrades with marker
stage_proposed_action INSERT used .catch(()=>null), a HITL bypass | try/catch → explicit throw; outer catch marks step 'failed'
Stub kinds returned ok & incremented executed | marked status='stub', NOT counted; surfaces wiring gaps
Concurrent drainer firings could double-invoke runner | KV concurrency lock per drainer (TTL 300s)
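The stage_proposed_action fix is easiest to see side by side. The sketch below is illustrative only: InsertFn is a hypothetical stand-in for the proposed_actions INSERT, and it is written synchronously to stay short, whereas the original bug involved a promise .catch(()=>null). The point is the shape of the change: the swallowing variant reports ok even when nothing was written, while the strict variant rethrows so the outer step handler can mark the step 'failed'.

```typescript
// Hypothetical stand-in for the proposed_actions INSERT.
type InsertFn = () => void;

// Pre-R560 shape: the failure is swallowed and the step still reports ok.
function stageSwallowing(insert: InsertFn): "ok" {
  try {
    insert();
  } catch (err) {
    // swallowed: no proposed_actions row exists, yet the caller cannot tell
  }
  return "ok";
}

// Post-R560 shape: the failure is rethrown with context.
function stageStrict(insert: InsertFn): void {
  try {
    insert();
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    throw new Error("stage_proposed_action INSERT failed: " + reason);
  }
}

// Outer per-step handler: a thrown error marks the step 'failed'.
function runStep(insert: InsertFn): "ok" | "failed" {
  try {
    stageStrict(insert);
    return "ok";
  } catch (err) {
    return "failed";
  }
}

const failingInsert: InsertFn = () => { throw new Error("constraint violation"); };
const workingInsert: InsertFn = () => { /* row written */ };

const before = stageSwallowing(failingInsert);
const after = runStep(failingInsert);
const healthy = runStep(workingInsert);
```

With the failing insert, the old shape yields ok and the new shape yields failed, which is the difference between a silent HITL bypass and a visible step failure.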
Failure modes

What can go wrong

Contract row missing

loadContract returns null → status='failed' with workflow_type_not_found. Detection: ops console "failed" filter. Recovery: migration to add the contract row; re-trigger.

Stub fanout reports completed with executed=0

This is by design (R560). It tells Mike a contract is wired in shape but not in implementation. Detection: workflow_run_log.steps_executed=0 AND steps_total>0. Recovery: wire the stub kinds for that contract's fan-out targets.

workflow.completed event not fired (documented intent only)

R564 documented the intent; not yet wired in stage 7. Downstream consumers waiting for workflow.completed currently poll workflow_run_log instead. Tracked in punch list.

HITL pending past expectation window

Stage 4 returns pending_hitl; stages 5-7 never fire until decide. Detection: workflow_run_log rows in pending_hitl > N hours. Recovery: ping Mike via recap.
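Two of the detection rules in this section reduce to simple filters over workflow_run_log rows. The sketch below assumes rows have already been fetched into memory. steps_executed, steps_total, and status come from this page, while started_at_ms (epoch milliseconds) and the helper names are hypothetical, since the real timestamp column is not named here.

```typescript
interface RunLogRow {
  run_id: string;
  status: string;
  steps_executed: number;
  steps_total: number;
  started_at_ms: number; // hypothetical column: epoch milliseconds
}

// Stub-only runs: steps_executed = 0 AND steps_total > 0.
function findStubOnlyRuns(rows: RunLogRow[]): RunLogRow[] {
  return rows.filter(function (r) {
    return r.steps_executed === 0 && r.steps_total > 0;
  });
}

// Runs sitting in pending_hitl for longer than maxHours.
function findStalePendingHitl(
  rows: RunLogRow[],
  nowMs: number,
  maxHours: number
): RunLogRow[] {
  const cutoffMs = nowMs - maxHours * 60 * 60 * 1000;
  return rows.filter(function (r) {
    return r.status === "pending_hitl" && r.started_at_ms < cutoffMs;
  });
}

// Illustrative data only.
const HOUR_MS = 60 * 60 * 1000;
const nowMs = 100 * HOUR_MS;
const runRows: RunLogRow[] = [
  { run_id: "r1", status: "completed", steps_executed: 0, steps_total: 4, started_at_ms: nowMs - HOUR_MS },
  { run_id: "r2", status: "completed", steps_executed: 1, steps_total: 7, started_at_ms: nowMs - HOUR_MS },
  { run_id: "r3", status: "pending_hitl", steps_executed: 0, steps_total: 0, started_at_ms: nowMs - 30 * HOUR_MS },
  { run_id: "r4", status: "pending_hitl", steps_executed: 0, steps_total: 0, started_at_ms: nowMs - 2 * HOUR_MS },
];
const stubOnly = findStubOnlyRuns(runRows);
const stale = findStalePendingHitl(runRows, nowMs, 24);
```

Passing the current time in as a parameter keeps both helpers deterministic, which makes the thresholds easy to test.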

Related

Adjacent substrate

For developers

Code paths + invariants

Concern | Where
Runner | src/lib/workflow_runner.ts (615 LOC)
Schema | migrations/schema/117_workflow_definitions_v2.sql, 118_workflow_contracts_v2.sql
run_log | migrations/schema/121_workflow_run_log.sql
reflexion_log | migrations/schema/122_reflexion_log.sql
HTTP entry | src/index.ts:14304 (POST /api/workflow/execute)
Cron entry | src/index.ts:33255 (45 5 * * * verify scheduler)
Event drainer | src/lib/events.ts drainEvents()
R552 commit | substrate ship: workflow runner + 7-stage execution
R560 hardening | 5 bug classes closed, stub-vs-real surface
R564 event intent | workflow.completed (documented, not wired)
R571 Play animation | runtime hooks for diagram play-through
R576 dwell timer | active-state dwell tuning in UI
// 7-stage outline (executeWorkflowContract)
const contract = await loadContract(workflowType);            // 1
const context = await loadContext(contract, input);           // 2
const pre = checkPreconditions(contract, context);            // 3
if (pre.blocked) return { status: 'aborted', ... };
if (contract.risk_level >= 3 && !opts.hitl_approved) {        // 4
  await stageHitlProposal(contract, run_id);
  return { status: 'pending_hitl' };
}
const fan = await executeFanOut(contract, context);           // 5
await scheduleVerifyChecks(contract, run_id);                 // 6
await executePostActions(contract, run_id, fan);              // 7