Workflow Runner — executeWorkflowContract

Substrate diagram · R549/R553 ship, R560 hardening · src/lib/workflow_runner.ts (615 LOC)

The single function every workflow contract runs through. It reads one row from workflow_definitions, walks 7 stages, and writes audit rows to workflow_step_log and workflow_run_log, plus reflexion_log (optional), proposed_actions (when risk ≥ 3) and workflow_verify_results (one per verify check). The caller decides what triggered the run; the runner decides what happens next.

Legend: REAL = implemented · STUB = not implemented · HITL-gated · 22 contracts

Entrypoints — 4 ways a contract gets executed

1. HTTP API REAL

POST /api/workflow/execute (optional ?preview=true)

Direct invocation. preview=true sets dry_run=true in opts — runs through all stages but skips writes inside executeFanOut (each step traces with status:'dry_run'). Used by ops console + manual triggers. src/index.ts:14304.
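A minimal sketch of the preview path, assuming a hypothetical `ExecuteOpts` type with a single `dry_run` flag and a hypothetical `traceStep` helper; the real handler at src/index.ts:14304 and executeFanOut may be shaped differently. It shows the two halves described above: the query flag becoming `dry_run`, and a fan-out step being traced as 'dry_run' without its write.

```typescript
// Hypothetical subset of the runner's opts; only dry_run is named in the doc.
interface ExecuteOpts {
  dry_run: boolean;
}

// Only the literal string "true" turns on preview mode in this sketch.
function optsFromRequestUrl(rawUrl: string): ExecuteOpts {
  const url = new URL(rawUrl);
  return { dry_run: url.searchParams.get("preview") === "true" };
}

interface StepTrace {
  kind: string;
  status: "ok" | "dry_run";
}

// Inside fan-out, a dry run records the step but never calls the side effect.
function traceStep(kind: string, opts: ExecuteOpts, perform: () => void): StepTrace {
  if (opts.dry_run) {
    return { kind, status: "dry_run" };
  }
  perform();
  return { kind, status: "ok" };
}

// Usage: a preview request leaves the (pretend) KV delete counter untouched.
const previewOpts = optsFromRequestUrl("https://ops.example/api/workflow/execute?preview=true");
let kvDeletes = 0;
const previewTrace = traceStep("kv_invalidate", previewOpts, () => {
  kvDeletes += 1;
});
console.log(previewTrace.status, kvDeletes); // dry_run 0
```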

2. Event-driven REAL

drainEvents() → subscription match → executeWorkflowContract

Cron-fired drainer reads events table since last cursor, matches against event_subscriptions, applies input_mapper JSONPath translation, then invokes the runner with invoked_by: "event:<type>:<id>". KV concurrency lock prevents double-fire. See substrate-event-ledger.
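The double-fire guard can be sketched as a check-then-set key with a TTL. `FakeKv`, `tryAcquireDrainLock`, `invokedByForEvent` and the lock key name are hypothetical stand-ins for illustration, not the drainer's code; the 300 s TTL comes from the R560 table below.

```typescript
// Minimal in-memory stand-in for a KV namespace with expiring keys.
// Hypothetical: it mimics get/put-with-TTL semantics, not the Workers KV API surface.
class FakeKv {
  private readonly store = new Map<string, { value: string; expiresAt: number }>();
  private readonly now: () => number;

  constructor(now: () => number) {
    this.now = now;
  }

  get(key: string): string | null {
    const hit = this.store.get(key);
    if (hit === undefined || hit.expiresAt <= this.now()) return null;
    return hit.value;
  }

  put(key: string, value: string, ttlSeconds: number): void {
    this.store.set(key, { value, expiresAt: this.now() + ttlSeconds * 1000 });
  }
}

const LOCK_TTL_SECONDS = 300; // TTL quoted in the R560 hardening table

// Check-then-set lock: a second firing inside the TTL sees the key and backs off.
function tryAcquireDrainLock(kv: FakeKv, lockKey: string): boolean {
  if (kv.get(lockKey) !== null) return false; // caller reports skipped_due_to_lock
  kv.put(lockKey, "held", LOCK_TTL_SECONDS);
  return true;
}

// invoked_by format described above: "event:<type>:<id>".
function invokedByForEvent(eventType: string, eventId: string | number): string {
  return `event:${eventType}:${eventId}`;
}

// Usage with a controllable clock (milliseconds).
let clockMs = 0;
const kv = new FakeKv(() => clockMs);
const firstFiring = tryAcquireDrainLock(kv, "lock:event_drainer");
const secondFiring = tryAcquireDrainLock(kv, "lock:event_drainer");
clockMs += (LOCK_TTL_SECONDS + 1) * 1000;
const afterTtl = tryAcquireDrainLock(kv, "lock:event_drainer");
console.log(firstFiring, secondFiring, afterTtl); // true false true
```

One caveat on the design: Workers KV is eventually consistent and has no atomic compare-and-set, so a check-then-set lock narrows the race window rather than closing it completely.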

3. Chat-tool invocation STUB

execute_workflow chat tool (admin-gated)

Tool registered in palette; calls into the same executeWorkflowContract entry. Most chat surfaces stage a workflow_* proposed_action first and run on approval (HITL invariant per ADR-031).
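A sketch of the HITL-first chat path, with hypothetical names (`handleExecuteWorkflowTool`, `needsHitlProposal`, `StagedAction`). It pairs the stage-4 gate condition from the flowchart (risk_level ≥ 3 and not yet approved) with the chat behaviour described above: an unapproved request becomes a pending `workflow_<type>` proposed action rather than a run.

```typescript
// Hypothetical shapes; only action_type='workflow_<type>', status='pending'
// and the risk >= 3 / hitl_approved rule come from this document.
interface StagedAction {
  action_type: string;
  status: "pending";
}

type ChatToolOutcome =
  | { kind: "staged"; action: StagedAction }
  | { kind: "execute"; workflowType: string };

// Stage-4 gate as drawn in the flowchart: risk >= 3 without prior approval needs a human.
function needsHitlProposal(riskLevel: number, hitlApproved: boolean): boolean {
  return riskLevel >= 3 && !hitlApproved;
}

// Chat path: an unapproved request is parked as a proposed action instead of being run.
function handleExecuteWorkflowTool(workflowType: string, approved: boolean): ChatToolOutcome {
  if (!approved) {
    return { kind: "staged", action: { action_type: `workflow_${workflowType}`, status: "pending" } };
  }
  return { kind: "execute", workflowType };
}

const chatOutcome = handleExecuteWorkflowTool("customer_quote", false);
console.log(chatOutcome.kind); // staged
```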

4. Cron-driven REAL

scheduled handler → verify scheduler + cron-triggered drains

Scheduled workflows (e.g. monthly_margin_review) fire from the cron handler. Also: the verify-check scheduler at 45 5 * * * reads pending workflow_verify_results rows and runs the verify SQL. src/index.ts:33255.
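`45 5 * * *` fires once a day at 05:45 (Cloudflare Workers evaluates cron triggers in UTC). The sketch below shows one way the scheduler could select due rows: a pending row is due once its configured window has elapsed. `dueVerifyRows` and the field names `scheduled_at_ms` and `window_hours` are assumptions; the document only says expected_json carries the window, the expected value and sql_check.

```typescript
// Hypothetical row shape. The doc names status and expected_json (window + expected + sql_check);
// scheduled_at_ms and window_hours are assumed field names for this sketch.
interface VerifyRow {
  id: number;
  status: string;
  scheduled_at_ms: number;
  expected_json: string;
}

interface VerifyExpectation {
  window_hours: number;
  expected: unknown;
  sql_check: string;
}

const MS_PER_HOUR = 60 * 60 * 1000;

// Pending rows whose configured window has fully elapsed by the time the cron fires.
function dueVerifyRows(rows: VerifyRow[], nowMs: number): VerifyRow[] {
  return rows.filter((row) => {
    if (row.status !== "pending") return false;
    const exp = JSON.parse(row.expected_json) as VerifyExpectation;
    return row.scheduled_at_ms + exp.window_hours * MS_PER_HOUR <= nowMs;
  });
}

// Usage: row 3 carries an arbitrary non-pending status, so it is never picked up.
const expectation = (hours: number): string =>
  JSON.stringify({ window_hours: hours, expected: 0, sql_check: "SELECT 0" });
const sampleRows: VerifyRow[] = [
  { id: 1, status: "pending", scheduled_at_ms: 0, expected_json: expectation(24) },
  { id: 2, status: "pending", scheduled_at_ms: 0, expected_json: expectation(48) },
  { id: 3, status: "passed", scheduled_at_ms: 0, expected_json: expectation(1) },
];
// 30 hours after scheduling, only row 1 has cleared its window.
console.log(dueVerifyRows(sampleRows, 30 * MS_PER_HOUR).map((r) => r.id));
```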

The 7 stages — vertical flowchart

Legend: stage with real side-effects · conditional / HITL-gated · stub or planned
1. loadContract(workflowType)
   SELECT * FROM workflow_definitions WHERE workflow_type=?1 AND enabled=1
   returns null → status='failed' (workflow_type not found)

2. loadContext(): N parallel SELECTs
   for each q in context_to_load_json:
   - resolve q.binds (R560 fix) → ordered positional binds to ? placeholders
   - q.if = "X present" → skip if input X is null/empty
   - errors no longer swallowed: contextErrors[] surfaces in result.errors
   writes: none · reads: many

3. checkPreconditions()
   evaluateRule grammar: "X present" | "X is null" | "X >= N" (also <, <=, =, !=, >)
   severity:'block' + (fail OR unevaluable) → blocked=true
   severity:'warn' → push to warnings[], continue
   blocked → return status='aborted' (no fan-out, no HITL)

4. stageHitlProposal(): only if risk_level ≥ 3 AND !opts.hitl_approved
   INSERT proposed_actions (action_type='workflow_<type>', entity_type='workflow_run', entity_ref=run_id, status='pending', proposed_by='workflow_runner') RETURNING action_id
   return status='pending_hitl'; stages 5-7 wait for approval

5. executeFanOut(): per target in fan_out_targets_json
   t.if (evaluateIf) → skip / continue
   REAL kinds: kv_invalidate, stage_proposed_action (R560: throws on INSERT failure)
   STUB kinds (R560: marked status='stub', NOT counted as executed): d1_write, ns_push, http_call, chat_tool, hitl_email_draft, flag, workflow_class_invoke, loop_over_*, dispatch_workflow
   per-step rows INSERTed into workflow_step_log on entry and exit
   on_failure='abort' → break (subsequent steps marked unreached)

6. scheduleVerifyChecks()
   per check in verify_checks_json: INSERT workflow_verify_results (status='pending', expected_json=<window+expected+sql_check>, notes='scheduled by runner')
   the verify scheduler (cron 45 5 * * *) picks the rows up after the configured window

7. executePostActions()
   ALWAYS: INSERT workflow_run_log (run_id, status, duration_ms, errors_count, ...)
   if reflexion_enabled=1: INSERT reflexion_log ('workflow_run', run_id, observation, tags)
   return status: completed | partial | failed | aborted | pending_hitl
   NOTE: no event.recordEvent fired here today (see "factual inconsistencies" note)
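The stage-3 precondition grammar is small enough to sketch directly. This is a hypothetical re-implementation for illustration, not the runner's evaluateRule; treating null, undefined and "" alike as "not present", and reporting a missing or non-numeric comparison input as unevaluable, are assumptions. The warning suffix matches the one quoted in the R560 table.

```typescript
// Hypothetical re-implementation of the documented grammar:
//   "X present" | "X is null" | "X <op> N" with <op> in >=, <=, !=, =, <, >
type RuleResult = "pass" | "fail" | "unevaluable";
type Input = Record<string, unknown>;

// Assumption: null, undefined and "" all count as "not present".
function isPresent(v: unknown): boolean {
  return v !== null && v !== undefined && v !== "";
}

function evaluateRule(rule: string, input: Input): RuleResult {
  const text = rule.trim();
  let m = /^(\w+) present$/.exec(text);
  if (m) return isPresent(input[m[1] as string]) ? "pass" : "fail";

  m = /^(\w+) is null$/.exec(text);
  if (m) return isPresent(input[m[1] as string]) ? "fail" : "pass";

  // Two-character operators are listed first so ">=" is not read as ">".
  m = /^(\w+)\s*(>=|<=|!=|=|<|>)\s*(-?\d+(?:\.\d+)?)$/.exec(text);
  if (m) {
    const raw = input[m[1] as string];
    const lhs = Number(raw);
    if (!isPresent(raw) || Number.isNaN(lhs)) return "unevaluable"; // missing or non-numeric input
    const rhs = Number(m[3]);
    const op = m[2];
    const ok =
      op === ">=" ? lhs >= rhs :
      op === "<=" ? lhs <= rhs :
      op === "!=" ? lhs !== rhs :
      op === "=" ? lhs === rhs :
      op === "<" ? lhs < rhs : lhs > rhs;
    return ok ? "pass" : "fail";
  }
  return "unevaluable"; // rule text outside the grammar
}

interface Precondition {
  rule: string;
  severity: "block" | "warn";
}

// block + (fail or unevaluable) blocks the run; warn only records a warning.
function checkPreconditions(pre: Precondition[], input: Input) {
  const blockedBy: string[] = [];
  const warnings: string[] = [];
  for (const p of pre) {
    const result = evaluateRule(p.rule, input);
    if (result === "pass") continue;
    if (p.severity === "block") {
      blockedBy.push(p.rule);
    } else {
      warnings.push(result === "unevaluable" ? `${p.rule} (unevaluable, downgraded to warning)` : p.rule);
    }
  }
  return { blocked: blockedBy.length > 0, blockedBy, warnings };
}
```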

Fan-out target kinds — the kind-dispatcher inside executeFanOut

| kind | status | what it does today | side effects |
|---|---|---|---|
| kv_invalidate | REAL | iterates t.keys[], calls env.CACHE.delete(k) | KV deletes |
| stage_proposed_action | REAL | INSERT proposed_actions with status='pending'. R560 fix: INSERT failure now throws (was a silent .catch) | proposed_actions row |
| d1_write | STUB | echoes {kind, note:'stub_not_implemented'}; marks step status='stub' | none |
| ns_push | STUB | same as d1_write; this is the Cal-Maine-class bug R560 closed (it used to report ok with zero NS writes) | none |
| http_call | STUB | stub | none |
| chat_tool | STUB | stub | none |
| hitl_email_draft | STUB | stub | none |
| flag | STUB | stub | none |
| workflow_class_invoke | STUB | stub (would invoke a CF Workflows class, e.g. annual_roll_workflow) | none |
| loop_over_recipients | STUB | stub | none |
| loop_over_aging | STUB | stub | none |
| dispatch_workflow | STUB | stub (would recursively invoke executeWorkflowContract) | none |
| <unknown> | STUB | stepStatus='failed', output.note='unrecognized_step_kind' | workflow_step_log row with status='failed' |

R560 invariant: stubs no longer count toward steps_executed. Pre-R560, an all-stub workflow returned ok with executed=N; post-R560 it returns ok with executed=0 and the step trace shows status='stub'. This is how Mike spots which contracts still need wiring.
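The R560 counting rule is easiest to see in a cut-down dispatcher. This sketch is synchronous and hypothetical (`runTarget`, `fanOut`, `CacheLike` are illustrative names): `CacheLike` stands in for env.CACHE, whose real delete is asynchronous, and stage_proposed_action, t.if and on_failure are left out because they need D1 or contract context.

```typescript
type StepStatus = "ok" | "stub" | "failed";

interface FanOutTarget {
  kind: string;
  keys?: string[];
}

interface StepOutcome {
  kind: string;
  status: StepStatus;
  note?: string;
}

// Synchronous stand-in for env.CACHE (Workers KV's delete is async in reality).
interface CacheLike {
  delete(key: string): void;
}

// STUB kinds from the table above.
const STUB_KINDS = new Set<string>([
  "d1_write", "ns_push", "http_call", "chat_tool", "hitl_email_draft",
  "flag", "workflow_class_invoke", "loop_over_recipients", "loop_over_aging", "dispatch_workflow",
]);

function runTarget(t: FanOutTarget, cache: CacheLike): StepOutcome {
  if (t.kind === "kv_invalidate") {
    for (const k of t.keys ?? []) cache.delete(k);
    return { kind: t.kind, status: "ok" };
  }
  if (STUB_KINDS.has(t.kind)) {
    return { kind: t.kind, status: "stub", note: "stub_not_implemented" };
  }
  return { kind: t.kind, status: "failed", note: "unrecognized_step_kind" };
}

// R560 invariant: stubs are traced but do not count toward steps_executed.
function fanOut(targets: FanOutTarget[], cache: CacheLike): { executed: number; trace: StepOutcome[] } {
  const trace = targets.map((t) => runTarget(t, cache));
  const executed = trace.filter((s) => s.status === "ok").length;
  return { executed, trace };
}

// Usage: an all-stub contract now reports executed=0, and the trace shows why.
const allStubRun = fanOut([{ kind: "d1_write" }, { kind: "ns_push" }], { delete: () => {} });
console.log(allStubRun.executed, allStubRun.trace.map((s) => s.status).join(",")); // 0 stub,stub
```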

Side effects per stage — every storage touched

| stage | D1 writes | KV touches | events fired | error class on failure |
|---|---|---|---|---|
| 1. loadContract | none (1 SELECT) | none | none | workflow_type_not_found → status='failed' |
| 2. loadContext | none (N parallel SELECTs) | none | none | context.<name>:<msg> pushed to result.errors; non-fatal |
| 3. checkPreconditions | none | none | none | block-severity fail or unevaluable → status='aborted' |
| 4. stageHitlProposal | INSERT proposed_actions | none | none directly (HITL endpoint fires hitl.approved later) | action_id null → status='pending_hitl' with hitl_proposed_action_id=undefined |
| 5. executeFanOut (per step) | INSERT workflow_step_log (entry + exit); plus per kind: INSERT proposed_actions (stage_proposed_action only) or D1 writes (none yet for stubs) | DELETE per kv_invalidate.keys[] | none | per-step error string in trace[]; on_failure='abort' breaks loop |
| 6. scheduleVerifyChecks | INSERT workflow_verify_results (status='pending', one per verify check) | none | none | silently caught (run still completes); ids stripped from verify_check_ids |
| 7. executePostActions | INSERT workflow_run_log; if reflexion_enabled: INSERT reflexion_log | none | none today (see inconsistencies note) | silently caught; run already complete |
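The stage-6 containment ("silently caught; ids stripped") can be sketched as a per-check try/catch. `scheduleVerifyChecks` here is a hypothetical illustration, `insertRow` is a synchronous stand-in for the D1 INSERT that returns the new row id, and the expected_json field names are assumptions.

```typescript
// Hypothetical check shape; the doc says expected_json holds window + expected + sql_check.
interface VerifyCheck {
  window_hours: number;
  expected: unknown;
  sql_check: string;
}

// Stand-in for the D1 INSERT INTO workflow_verify_results; returns the new id or throws.
type InsertVerifyRow = (status: "pending", expectedJson: string, notes: string) => number;

function scheduleVerifyChecks(checks: VerifyCheck[], insertRow: InsertVerifyRow): number[] {
  const verifyCheckIds: number[] = [];
  for (const check of checks) {
    const expectedJson = JSON.stringify(check);
    try {
      verifyCheckIds.push(insertRow("pending", expectedJson, "scheduled by runner"));
    } catch {
      // Silently caught: the run still completes and this check simply has no id.
    }
  }
  return verifyCheckIds;
}

// Usage: the second INSERT fails, so only two ids come back.
let nextVerifyId = 100;
const verifyIds = scheduleVerifyChecks(
  [
    { window_hours: 24, expected: 0, sql_check: "SELECT 1" },
    { window_hours: 24, expected: 0, sql_check: "SELECT 2" },
    { window_hours: 48, expected: 0, sql_check: "SELECT 3" },
  ],
  (_status, expectedJson) => {
    if (expectedJson.includes("SELECT 2")) throw new Error("D1 write failed");
    return nextVerifyId++;
  },
);
console.log(verifyIds.join(",")); // 100,101
```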

R560 error containment — the hardening pass

| bug class (pre-R560) | fix (R560) | how the operator sees it |
|---|---|---|
| loadContext silently swallowed query errors (.catch(()=>null)) | errors pushed to contextErrors[] | result.errors: operator sees context.<name>: <msg> in the result body |
| checkPreconditions evaluated unevaluable rules as 'pass' (advisory) | block-severity unevaluable now blocks; warn-severity downgrades with a marker | aborted runs surface the offending rule with the "(unevaluable, downgraded to warning)" suffix |
| stage_proposed_action INSERT used .catch(()=>null), so a failed INSERT bypassed HITL | try/catch → explicit throw; outer catch marks the step 'failed' | Mike sees a missing approval card; step_log row with status='failed' + error message |
| stub kinds (ns_push, d1_write, ...) returned ok and incremented executed | marked status='stub', NOT counted as executed | completed runs with executed=0 mean the contract still needs wiring |
| (event-driven) concurrent drainer firings could double-invoke the runner | KV concurrency lock per drainer (TTL 300s); a 2nd firing returns skipped_due_to_lock | see substrate-event-ledger |
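The stage_proposed_action fix is the difference between swallowing and propagating the INSERT error. Below is a synchronous sketch with hypothetical function names (the real code uses a promise `.catch`): before R560 the step reports ok with no approval row; after R560 the outer step wrapper records 'failed' together with the message.

```typescript
// Stand-in for INSERT proposed_actions ... RETURNING action_id; may throw.
type InsertProposedAction = () => number;

// Pre-R560 shape (hypothetical name): the error is swallowed, so the step
// reports ok even though no approval row exists.
function stageProposedActionPreR560(insert: InsertProposedAction): { status: "ok"; actionId: number | null } {
  let actionId: number | null = null;
  try {
    actionId = insert();
  } catch {
    actionId = null; // synchronous equivalent of .catch(() => null)
  }
  return { status: "ok", actionId };
}

// R560 shape: the failure propagates instead of being swallowed.
function stageProposedAction(insert: InsertProposedAction): number {
  try {
    return insert();
  } catch (err) {
    throw new Error(`stage_proposed_action INSERT failed: ${String(err)}`);
  }
}

// Outer per-step catch: a thrown step becomes status 'failed' with its message.
function runStep(step: () => unknown): { status: "ok" | "failed"; error?: string } {
  try {
    step();
    return { status: "ok" };
  } catch (err) {
    return { status: "failed", error: err instanceof Error ? err.message : String(err) };
  }
}

const failingInsert: InsertProposedAction = () => {
  throw new Error("D1 unavailable");
};
console.log(stageProposedActionPreR560(failingInsert).status); // ok (approval row silently missing)
console.log(runStep(() => stageProposedAction(failingInsert)).status); // failed
```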

What changes when a workflow runs end-to-end

Aggregate of every persisted row and side effect a successful run can produce. "Always" rows are written on every non-aborted run; conditional rows depend on the contract config.

| storage | row(s) | when |
|---|---|---|
| workflow_run_log | 1 row (UPSERT on run_id): status, duration_ms, errors_count, output_json | always (stage 7) |
| workflow_step_log | 2 rows per fan-out target (entry status='running' + exit status='ok'/'stub'/'failed'/'skipped'/'dry_run') | always (stage 5, per step) |
| workflow_verify_results | N rows, one per verify_checks_json entry; status='pending' | per contract (stage 6) |
| proposed_actions | 1 row from the HITL gate (risk ≥ 3), plus 1 row per stage_proposed_action fan-out target | conditional (stages 4 + 5) |
| reflexion_log | 1 row, entity_type='workflow_run', tags='workflow:<type>,status:<status>' | if reflexion_enabled=1 (stage 7) |
| events | 0 rows from the runner today; workflow.completed is documented intent but NOT wired (see inconsistencies) | n/a today |
| KV env.CACHE | per kv_invalidate step: N keys deleted | conditional (stage 5) |
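The table can be turned into a rough row-count estimate for a single invocation. This is an illustrative sketch with hypothetical names (`expectedRows`, `ContractShape`); it assumes the invocation reaches stage 7 with no dry_run and no on_failure='abort' break, and it leaves out the stage-4 gate row, which per the flowchart is written by the invocation that returns pending_hitl rather than by the one that runs fan-out.

```typescript
// Hypothetical, simplified contract shape for counting purposes only.
interface ContractShape {
  fan_out_targets: { kind: string; keys?: string[] }[];
  verify_check_count: number; // number of verify_checks_json entries
  reflexion_enabled: boolean;
}

interface RowCounts {
  workflow_run_log: number;
  workflow_step_log: number;
  workflow_verify_results: number;
  proposed_actions: number;
  reflexion_log: number;
  events: number;
  kv_keys_deleted: number;
}

function expectedRows(c: ContractShape): RowCounts {
  const targets = c.fan_out_targets;
  return {
    workflow_run_log: 1,
    workflow_step_log: 2 * targets.length, // entry + exit row per target
    workflow_verify_results: c.verify_check_count,
    proposed_actions: targets.filter((t) => t.kind === "stage_proposed_action").length,
    reflexion_log: c.reflexion_enabled ? 1 : 0,
    events: 0, // workflow.completed is not wired today
    kv_keys_deleted: targets
      .filter((t) => t.kind === "kv_invalidate")
      .reduce((n, t) => n + (t.keys?.length ?? 0), 0),
  };
}

// Usage: 4 targets (one kv_invalidate with 2 keys, one stage_proposed_action, two stubs),
// 2 verify checks, reflexion on.
const exampleCounts = expectedRows({
  fan_out_targets: [
    { kind: "kv_invalidate", keys: ["k1", "k2"] },
    { kind: "stage_proposed_action" },
    { kind: "ns_push" },
    { kind: "d1_write" },
  ],
  verify_check_count: 2,
  reflexion_enabled: true,
});
// D1 rows: 1 run_log + 8 step_log + 2 verify + 1 proposed_action + 1 reflexion = 13; 2 KV deletes.
```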

22 contracts that run on this substrate

22 contracts carry 94 fan-out targets in total: 20 REAL, 74 STUB. 18 of the contracts are HITL-gated (risk ≥ 3). Per-workflow detail: see index → Workflow contracts (v2).

| category | workflows | doc |
|---|---|---|
| AR & finance | ar_aging_action_plan, service_hold_trigger | ar-aging-action-plan, service-hold-trigger |
| Anomaly & guardrail | anomaly_remediation | anomaly-remediation |
| Bid & spec | bid_amendment_arrives, bid_award_notification, bid_price_update, spec_deviation_flagged, usda_drawdown_commit | bid-amendment-arrives, bid-award-notification, bid-price-update, spec-deviation-flagged, usda-drawdown-commit |
| Comms & triage | batch_notify, inbound_email_triage | batch-notify, inbound-email-triage |
| Customer ops | customer_invoice_dispute, customer_quote, draft_quote, new_customer | customer-invoice-dispute, customer-quote, draft-quote, new-customer |
| Item & vendor | new_assembly_item, new_ingredient_vendor_onboard, vendor_cost_update | new-assembly-item, new-ingredient-vendor-onboard, vendor-cost-update |
| Other | assembly_build_review, item_record_review | assembly-build-review, item-record-review |
| Scheduled cycle | annual_price_roll, monthly_margin_review, quarterly_bid_eligibility_refresh | annual-price-roll, monthly-margin-review, quarterly-bid-eligibility-refresh |

Source files