From b6eaf6de25f6fd7b1c37c33de66e5142be17f5f0 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 14:35:45 -0600 Subject: [PATCH 01/10] =?UTF-8?q?feat(supervise):=20bidirectional=20bus=20?= =?UTF-8?q?=E2=80=94=20down-leg=20(steer/answer/resume)=20+=20resume=5Fwor?= =?UTF-8?q?ker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Close the bus to 100% bidirectional. The parent→child down-leg routes to the child inbox (scope.send→deliver) AND records a queue:false event on the same bus: it lands in history() + reaches subscribers for the audit trail, but is never pulled back by the parent. New: resume_worker (continue a parked worker — the protocol had {resume} but no verb); answer_question now routes the answer DOWN to the asking worker, unparking it. EventBus gains PublishOptions.queue for record-only events. down-leg + bidirectional history tests; full suite 1000 pass; typecheck/build/lint clean. --- CLAUDE.md | 2 +- src/mcp/tools/coordination.ts | 85 ++++++++++++++++++++++++------ src/runtime/supervise/event-bus.ts | 7 ++- tests/loops/coordination.test.ts | 42 ++++++++++++++- 4 files changed, 117 insertions(+), 19 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index f3bba77a..8b416885 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -62,7 +62,7 @@ Types that stay in THIS repo because they're runtime-shaped (coupled to a runnin - `run-loop.ts` — `runLoop`, the round-synchronous leaf kernel. Per round: `driver.plan()`→N tasks→one sandbox/iteration (bounded by `maxConcurrency`, round-robin `agentRuns`)→`streamPrompt`→`output.parse`→`validator.validate`→`driver.decide`. Owns iteration accounting, concurrency, abort, cost+token aggregation, trace emission, box teardown. Exports `defaultSelectWinner` (best-valid-score, ties→earliest) — the single-sourced selection the personify combinators reuse. 
- `supervise/` — the recursive execution atom (keystone): `Scope` + `Supervisor` over the open `Executor` port, spawn/settle on a **conserved budget pool** so equal-compute holds by construction; journal→replay/resume. `runtime.ts` also holds `createExecutor({backend})` — the ONE built-in executor (backend-as-data: `router`/`router-tools`/`bridge`/`cli`/`sandbox`; `router-tools` is the off-box tool-using agentic loop — chat→tool_calls→`executeToolCall`→repeat — over the router's tool-calling, no sandbox); the per-backend bodies are internal case-arms, BYO agents implement `Executor` directly. - `personify/` — the content-free generic combinators (`fanout`/`loopUntil`/`widen`/`panel`/`verify`/`pipeline`) + `definePersona`/`runPersonified` + the cross-run `Corpus` + `createScopeAnalyst` (the analyst-on-scope steer firewall). -- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. Observability is first-class: every event is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). 
`analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. +- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. 
The bus is **bidirectional**: UP (settled/question/finding) is queued+pullable; DOWN (`steer_worker`/`answer_question`/`resume_worker`) routes to the child inbox via `scope.send`→`deliver` AND records a `queue:false` event (history + subscribers, never pulled back). Observability is first-class: every event both ways is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. Two substrates coexist for the same "recursive agent decision" atom: the round-synchronous `runLoop` kernel (the leaf, what most sandbox benches drive today) and the reactive `Scope`/`Supervisor`+combinators (the canonical core — the agent-driver, `runAgentic`/`defineStrategy`/`runPersonified`). Prefer the latter for new recursive/keystone work. Both run over the one `Executor` port. 
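The `queue:false` behavior described in the CLAUDE.md change above can be sketched in isolation. This is a minimal, synchronous illustration only: `createSketchBus`, `SketchEvent`, `SketchRecord`, and `SketchPublishOptions` are hypothetical names invented for this note and are not the real `createEventBus` API, whose `publish` is async and whose records also carry an `at` timestamp.

```typescript
// Hypothetical, simplified stand-in for the bus described above; NOT the real createEventBus.
type SketchEvent = { type: string }
interface SketchRecord {
  seq: number
  priority: number
  event: SketchEvent
}
interface SketchPublishOptions {
  priority?: number
  queue?: boolean
}

function createSketchBus() {
  let seq = 0
  const queue: SketchRecord[] = [] // pullable UP events only
  const log: SketchRecord[] = [] // every event, both directions
  const subscribers: Array<(r: SketchRecord) => void> = []
  return {
    publish(event: SketchEvent, opts?: SketchPublishOptions): void {
      const record: SketchRecord = { seq: seq++, priority: opts?.priority ?? 0, event }
      // Record-only events (the down-leg) skip the pull queue...
      if (opts?.queue !== false) queue.push(record)
      // ...but still land in the log and reach subscribers.
      log.push(record)
      for (const notify of subscribers) notify(record)
    },
    pull(): SketchEvent | undefined {
      // Highest priority first; ties are FIFO by seq.
      queue.sort((a, b) => b.priority - a.priority || a.seq - b.seq)
      return queue.shift()?.event
    },
    history: (): ReadonlyArray<SketchRecord> => log,
    subscribe(fn: (r: SketchRecord) => void): void {
      subscribers.push(fn)
    },
  }
}

const bus = createSketchBus()
const seen: string[] = []
bus.subscribe((r) => seen.push(r.event.type))

bus.publish({ type: 'settled' }) // UP: queued
bus.publish({ type: 'steer' }, { queue: false }) // DOWN: record-only
bus.publish({ type: 'question' }, { priority: 10 }) // UP: bumped ahead

// The blocking question is pulled first; the steer is never pulled back.
const pulled = [bus.pull()?.type, bus.pull()?.type, bus.pull()?.type]
// The audit trail still holds all three events in publish order.
const trail = bus.history().map((r) => r.event.type)
```

Under these assumptions the down-leg `steer` appears in `trail` and `seen` but never in `pulled`, which is the property the `queue:false` option exists to guarantee.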
diff --git a/src/mcp/tools/coordination.ts b/src/mcp/tools/coordination.ts index 41239fbd..726449ee 100644 --- a/src/mcp/tools/coordination.ts +++ b/src/mcp/tools/coordination.ts @@ -71,12 +71,24 @@ export interface AnalystFindingEvent { readonly findings: unknown } -/** Every message a worker/sub-driver/analyst sends up to the driver — the one typed pipe. New kinds - * are additive: a `{type:'question'}` consumer keeps matching. */ +/** A parent→child message (the down-leg): recorded for observability, delivered via the child inbox, + * never pulled back by the parent. `delivered` mirrors whether the live child accepted it. */ +export interface DownMessageEvent { + readonly toWorker: string + readonly instruction: string + readonly delivered: boolean +} + +/** Every message on the one typed pipe. UP (child→parent): question / settled / finding — queued for + * the driver to `pull`. DOWN (parent→child): steer / answer / resume — record-only (history + + * subscribers), routed to the child inbox. New kinds are additive. */ export type CoordinationEvent = | { readonly type: 'question'; readonly question: QuestionRecord } | { readonly type: 'settled'; readonly worker: SettledWorker } | { readonly type: 'finding'; readonly finding: AnalystFindingEvent } + | { readonly type: 'steer'; readonly down: DownMessageEvent } + | { readonly type: 'answer'; readonly down: DownMessageEvent; readonly questionId: string } + | { readonly type: 'resume'; readonly down: DownMessageEvent } export type MakeWorkerAgent = (profile: unknown) => SuperviseAgent @@ -189,6 +201,12 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin return true } + // The down-leg: record a parent→child message on the bus for the audit trail (history + + // subscribers) WITHOUT enqueuing it — the parent must never pull its own outbound message back. 
+ const sendDown = async (type: 'steer' | 'resume', down: DownMessageEvent): Promise<void> => { + await bus.publish({ type, down }, { queue: false }) + } + // Consumer projection: the wire shape the driver sees for a pulled bus event. const projectEvent = (ev: CoordinationEvent): Record<string, unknown> => { if (ev.type === 'settled') { @@ -205,7 +223,10 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin : { type: 'settled', settled: w.id, status: 'down', reason: w.reason } } if (ev.type === 'question') return { type: 'question', question: ev.question } - return { type: 'finding', ...ev.finding } + if (ev.type === 'finding') return { type: 'finding', ...ev.finding } + // Down-leg kinds are record-only (never queued), so the driver never pulls them; project + // defensively for completeness. + return { type: ev.type, ...ev.down } } const nextQuestionId = (from: string): string => `${from}:q${questionSeq++}` @@ -330,7 +351,7 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }, { name: 'steer_worker', - description: 'Deliver an out-of-band instruction to a running worker inbox.', + description: 'Deliver an out-of-band instruction to a running worker inbox (parent→child).', inputSchema: { type: 'object', properties: { @@ -339,12 +360,34 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }, required: ['workerId', 'instruction'], }, - handler: (raw) => { + handler: async (raw) => { const a = obj(raw) - const delivered = opts.scope.send(str(a.workerId, 'workerId'), { - steer: str(a.instruction, 'instruction'), - }) - return Promise.resolve({ delivered }) + const workerId = str(a.workerId, 'workerId') + const instruction = str(a.instruction, 'instruction') + const delivered = opts.scope.send(workerId, { steer: instruction }) + await sendDown('steer', { toWorker: workerId, instruction, delivered }) + return { delivered } + }, + }, + { + name: 'resume_worker', + description: + 'Resume a parked/idle 
worker with a follow-up message, continuing its session (parent→child).', + inputSchema: { + type: 'object', + properties: { + workerId: idArg, + message: { type: 'string', description: 'The follow-up to continue the worker with.' }, + }, + required: ['workerId', 'message'], + }, + handler: async (raw) => { + const a = obj(raw) + const workerId = str(a.workerId, 'workerId') + const message = str(a.message, 'message') + const delivered = opts.scope.send(workerId, { resume: message }) + await sendDown('resume', { toWorker: workerId, instruction: message, delivered }) + return { delivered } }, }, { @@ -422,17 +465,27 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }, required: ['questionId'], }, - handler: (raw) => { + handler: async (raw) => { const a = obj(raw) const questionId = str(a.questionId, 'questionId') if (typeof a.answer === 'string' && a.answer.length > 0) { - return Promise.resolve({ - question: decideQuestion(questionId, { - kind: 'answer', - answer: a.answer, - by: typeof a.by === 'string' && a.by.length > 0 ? a.by : 'user', - }), + const answer = a.answer + const question = decideQuestion(questionId, { + kind: 'answer', + answer, + by: typeof a.by === 'string' && a.by.length > 0 ? a.by : 'user', }) + // Route the answer DOWN to the worker that asked, unparking it, and record the down-leg. 
+ const delivered = opts.scope.send(question.from, { answer, questionId }) + await bus.publish( + { + type: 'answer', + questionId, + down: { toWorker: question.from, instruction: answer, delivered }, + }, + { queue: false }, + ) + return { question } } if (typeof a.deferReason === 'string' && a.deferReason.length > 0) { return Promise.resolve({ diff --git a/src/runtime/supervise/event-bus.ts b/src/runtime/supervise/event-bus.ts index c5d06702..d5ac392c 100644 --- a/src/runtime/supervise/event-bus.ts +++ b/src/runtime/supervise/event-bus.ts @@ -40,6 +40,10 @@ export interface PublishOptions { /** Higher = pulled ahead of lower-priority queued events (default 0). A blocking question sets * this so it bumps to the front of the driver's inbox. */ readonly priority?: number + /** Whether the event enters the pull queue (default true). Set `false` for record-only events — + * the parent→child down-leg (steer / answer / resume): they belong in `history()` and reach + * `subscribe` observers, but the parent must never `pull` its own outbound message back. */ + readonly queue?: boolean } export interface BusStats { @@ -99,7 +103,8 @@ export function createEventBus(now: () => number = Date.now) return { async publish(event, opts) { const record: BusRecord = { seq: seq++, at: now(), priority: opts?.priority ?? 0, event } - queue.push(record) + // Record-only events (the down-leg) skip the pull queue but still hit the log + subscribers. + if (opts?.queue !== false) queue.push(record) log.push(record) byKind[event.type] = (byKind[event.type] ?? 
0) + 1 // Sequential, not Promise.all: a subscriber that steers off this event must observe a diff --git a/tests/loops/coordination.test.ts b/tests/loops/coordination.test.ts index 96f0cea6..530792c4 100644 --- a/tests/loops/coordination.test.ts +++ b/tests/loops/coordination.test.ts @@ -188,7 +188,15 @@ describe('coordination tools', () => { stopped: true, }) expect(tb.questions()[0]).toMatchObject({ status: 'answered' }) - expect(emitted).toEqual([{ type: 'question', question: expect.objectContaining(r.question) }]) + // The pass-through trail records BOTH legs: the question up, then the answer routed down. + expect(emitted).toEqual([ + { type: 'question', question: expect.objectContaining(r.question) }, + { + type: 'answer', + questionId: r.question.id, + down: { toWorker: 'driver-1', instruction: 'Target v2.', delivered: false }, + }, + ]) }) it('list_analysts surfaces the menu and run_analyst applies a lens to a settled worker', async () => { @@ -264,6 +272,38 @@ describe('coordination tools', () => { expect(tb.stats()).toMatchObject({ published: 2, pulled: 2, byKind: { question: 2 } }) }) + it('steer_worker and resume_worker route down + record in history but are never pulled back', async () => { + const { scope, sent } = mockScope() + const emitted: Array<{ type: string }> = [] + const tb = createCoordinationTools({ + scope, + blobs, + makeWorkerAgent, + perWorker: { maxIterations: 1, maxTokens: 10 }, + onEvent: (e) => emitted.push(e), + }) + expect(await tool(tb, 'steer_worker').handler({ workerId: 'w0', instruction: 'do X' })).toEqual( + { + delivered: true, + }, + ) + expect( + await tool(tb, 'resume_worker').handler({ workerId: 'w0', message: 'continue' }), + ).toEqual({ + delivered: true, + }) + // Both reached the child inbox (down delivery)... + expect(sent).toEqual([ + { id: 'w0', msg: { steer: 'do X' } }, + { id: 'w0', msg: { resume: 'continue' } }, + ]) + // ...and were recorded for observability (pass-through + history)... 
+ expect(emitted.map((e) => e.type)).toEqual(['steer', 'resume']) + expect(tb.history().map((r) => r.event.type)).toEqual(['steer', 'resume']) + // ...but the parent never pulls its own outbound messages back. + expect(await tool(tb, 'await_event').handler({})).toEqual({ idle: true }) + }) + it('analyze-on-settle auto-runs lenses and await_event surfaces settled + finding', async () => { const { scope } = mockScope() const settlements = [ From 8b509e56364603107df086a986ff16993fddf0ba Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 15:41:44 -0600 Subject: [PATCH 02/10] fix(supervise): answer_question returns delivered; close down-leg review gaps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR #318 review: - BLOCKING: answer_question computed `delivered` but returned only { question } — now returns { question, delivered }, consistent with steer_worker/resume_worker (no longer hides whether the answer reached a live worker). - tests: answer routed down to a LIVE worker (delivered:true happy path); resume_worker delivered:false path; a focused event-bus queue:false unit test (history+subscribers see it, pull queue never does). - resume_worker added to OPERATOR_TOOLS + the driver system prompt so the driver is actually prompted to use it. 
--- bench/src/profiles.ts | 5 ++-- src/mcp/tools/coordination.ts | 4 +++- tests/loops/coordination.test.ts | 40 +++++++++++++++++++++++++++++--- tests/loops/event-bus.test.ts | 18 ++++++++++++++ 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/bench/src/profiles.ts b/bench/src/profiles.ts index 877db868..d0ca4ddf 100644 --- a/bench/src/profiles.ts +++ b/bench/src/profiles.ts @@ -21,7 +21,8 @@ export const OPERATOR_TOOLS = [ 'run_analyst', // run an analyst over a worker's trace → findings (selector≠judge: trace, not score) 'observe_worker', // a worker's in-flight trace, or its last finished episode/shot 'spawn_worker', // start a worker (or a sub-analyst) — drive many; parallelize when independent - 'steer_worker', // send a running/parked worker its next instruction / an interrupt + 'steer_worker', // send a running worker its next instruction / an interrupt (down-leg) + 'resume_worker', // continue a parked/idle worker with a follow-up message (down-leg) 'stop', // declare the task complete (verified) or abandon a line ] as const @@ -95,7 +96,7 @@ export const driverProfile: RoleProfile = { ' analysts are cheap; make them when a worker’s failure mode needs a focused lens.', '- observe_worker(worker): the worker’s IN-FLIGHT trace if it is still running, else its last', ' finished episode/shot.', - '- spawn_worker(profile, task) / steer_worker(worker, instruction) / stop.', + '- spawn_worker(profile, task) / steer_worker(worker, instruction) / resume_worker(worker, message) / stop.', '- the artifact’s own tools (read/edit/run) — use them to inspect the workspace and to contribute', ' decisive work yourself.', '', diff --git a/src/mcp/tools/coordination.ts b/src/mcp/tools/coordination.ts index 726449ee..0b20bd63 100644 --- a/src/mcp/tools/coordination.ts +++ b/src/mcp/tools/coordination.ts @@ -485,7 +485,9 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }, { queue: false }, ) - return { question } + // Surface 
`delivered` like steer_worker/resume_worker — the caller must see whether the + // answer actually reached a live worker (false when it parked/settled or has no inbox). + return { question, delivered } } if (typeof a.deferReason === 'string' && a.deferReason.length > 0) { return Promise.resolve({ diff --git a/tests/loops/coordination.test.ts b/tests/loops/coordination.test.ts index 530792c4..747f442f 100644 --- a/tests/loops/coordination.test.ts +++ b/tests/loops/coordination.test.ts @@ -292,18 +292,52 @@ describe('coordination tools', () => { ).toEqual({ delivered: true, }) + // A down-message to a worker with no live inbox reports delivered:false (mirrors steer_worker). + expect(await tool(tb, 'resume_worker').handler({ workerId: 'gone', message: 'x' })).toEqual({ + delivered: false, + }) // Both reached the child inbox (down delivery)... expect(sent).toEqual([ { id: 'w0', msg: { steer: 'do X' } }, { id: 'w0', msg: { resume: 'continue' } }, ]) - // ...and were recorded for observability (pass-through + history)... - expect(emitted.map((e) => e.type)).toEqual(['steer', 'resume']) - expect(tb.history().map((r) => r.event.type)).toEqual(['steer', 'resume']) + // ...and were recorded for observability (pass-through + history; the failed resume too)... + expect(emitted.map((e) => e.type)).toEqual(['steer', 'resume', 'resume']) + expect(tb.history().map((r) => r.event.type)).toEqual(['steer', 'resume', 'resume']) // ...but the parent never pulls its own outbound messages back. 
expect(await tool(tb, 'await_event').handler({})).toEqual({ idle: true }) }) + it('answer_question routes the answer down to a LIVE worker and surfaces delivered:true', async () => { + const { scope, sent } = mockScope() + const emitted: Array<{ type: string }> = [] + const tb = createCoordinationTools({ + scope, + blobs, + makeWorkerAgent, + perWorker: { maxIterations: 1, maxTokens: 10 }, + onEvent: (e) => emitted.push(e), + }) + // The question originates from the live worker w0, so the answer routes back to its inbox. + const r = (await tool(tb, 'ask_parent').handler({ + from: 'w0', + level: 'worker', + question: 'which path?', + reason: 'ambiguous', + urgency: 'blocks-step', + })) as { question: { id: string } } + expect( + await tool(tb, 'answer_question').handler({ questionId: r.question.id, answer: 'path B' }), + ).toEqual({ + question: expect.objectContaining({ id: r.question.id, status: 'answered' }), + delivered: true, + }) + // The answer reached w0's inbox shaped { answer, questionId }... + expect(sent).toEqual([{ id: 'w0', msg: { answer: 'path B', questionId: r.question.id } }]) + // ...and both legs are on the trail: question up, answer down. 
+ expect(emitted.map((e) => e.type)).toEqual(['question', 'answer']) + }) + it('analyze-on-settle auto-runs lenses and await_event surfaces settled + finding', async () => { const { scope } = mockScope() const settlements = [ diff --git a/tests/loops/event-bus.test.ts b/tests/loops/event-bus.test.ts index b8b2226a..85d081e8 100644 --- a/tests/loops/event-bus.test.ts +++ b/tests/loops/event-bus.test.ts @@ -82,6 +82,24 @@ describe('event bus', () => { }) }) + it('queue:false records to history + subscribers but never enters the pull queue', async () => { + const bus = createEventBus() + const seen: string[] = [] + bus.subscribe((r) => { + seen.push(r.event.type) + }) + await bus.publish({ type: 'settled', id: 'w1' }) + await bus.publish({ type: 'question', q: 'down-leg' }, { queue: false }) + // The record-only event reached subscribers + the audit log... + expect(seen).toEqual(['settled', 'question']) + expect(bus.history().map((r) => r.event.type)).toEqual(['settled', 'question']) + expect(bus.stats()).toMatchObject({ published: 2 }) + // ...but is invisible to the pull queue: only the queued settled is pending/pullable. + expect(bus.pending()).toBe(1) + expect(bus.pull()).toEqual({ type: 'settled', id: 'w1' }) + expect(bus.pull()).toBeUndefined() + }) + it('unsubscribe stops delivery', async () => { const bus = createEventBus() const seen: string[] = [] From cea04b56f67c01fb10ce99e2ba4227c0e16dcea4 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 16:45:16 -0600 Subject: [PATCH 03/10] =?UTF-8?q?feat(supervise):=20functional=20down-leg?= =?UTF-8?q?=20=E2=80=94=20workers=20drain=20a=20steerable=20inbox?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make the down-leg actually move a live worker (was observable-only). 
New createInbox (supervise/inbox.ts) is the receive end an executor exposes as Executor.deliver; the owned tool-loop (routerToolsInlineExecutor) drains it two ways: - QUEUED (default): flush at each step boundary AND before the worker may settle — it can't finish while a steer/answer it never read is pending. - FORCEFUL (steer_worker interrupt:true): aborts the in-flight turn so the worker re-plans immediately, breaking it off a wrong path mid-task. Black-box CLI harnesses can't be interrupted mid-step → down-leg degrades to next spawn. inbox 4 + executor-drains-inbox integration test (flush-before-settle proven end to end through the real executor); full suite 1008 pass; typecheck/build/lint clean. --- CLAUDE.md | 2 +- src/mcp/tools/coordination.ts | 9 ++- src/runtime/index.ts | 3 + src/runtime/supervise/inbox.ts | 85 +++++++++++++++++++++++++ src/runtime/supervise/runtime.ts | 62 +++++++++++++----- tests/loops/coordination.test.ts | 4 +- tests/loops/inbox.test.ts | 104 +++++++++++++++++++++++++++++++ 7 files changed, 248 insertions(+), 21 deletions(-) create mode 100644 src/runtime/supervise/inbox.ts create mode 100644 tests/loops/inbox.test.ts diff --git a/CLAUDE.md b/CLAUDE.md index 8b416885..d773c7a9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -62,7 +62,7 @@ Types that stay in THIS repo because they're runtime-shaped (coupled to a runnin - `run-loop.ts` — `runLoop`, the round-synchronous leaf kernel. Per round: `driver.plan()`→N tasks→one sandbox/iteration (bounded by `maxConcurrency`, round-robin `agentRuns`)→`streamPrompt`→`output.parse`→`validator.validate`→`driver.decide`. Owns iteration accounting, concurrency, abort, cost+token aggregation, trace emission, box teardown. Exports `defaultSelectWinner` (best-valid-score, ties→earliest) — the single-sourced selection the personify combinators reuse. 
- `supervise/` — the recursive execution atom (keystone): `Scope` + `Supervisor` over the open `Executor` port, spawn/settle on a **conserved budget pool** so equal-compute holds by construction; journal→replay/resume. `runtime.ts` also holds `createExecutor({backend})` — the ONE built-in executor (backend-as-data: `router`/`router-tools`/`bridge`/`cli`/`sandbox`; `router-tools` is the off-box tool-using agentic loop — chat→tool_calls→`executeToolCall`→repeat — over the router's tool-calling, no sandbox); the per-backend bodies are internal case-arms, BYO agents implement `Executor` directly. - `personify/` — the content-free generic combinators (`fanout`/`loopUntil`/`widen`/`panel`/`verify`/`pipeline`) + `definePersona`/`runPersonified` + the cross-run `Corpus` + `createScopeAnalyst` (the analyst-on-scope steer firewall). -- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. The bus is **bidirectional**: UP (settled/question/finding) is queued+pullable; DOWN (`steer_worker`/`answer_question`/`resume_worker`) routes to the child inbox via `scope.send`→`deliver` AND records a `queue:false` event (history + subscribers, never pulled back). 
Observability is first-class: every event both ways is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. +- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. 
The bus is **bidirectional**: UP (settled/question/finding) is queued+pullable; DOWN (`steer_worker`/`answer_question`/`resume_worker`) routes to the child inbox via `scope.send`→`deliver` AND records a `queue:false` event (history + subscribers, never pulled back). The receive end is `createInbox` (`supervise/inbox.ts`), which the owned tool-loop executor (`routerToolsInlineExecutor`) exposes as `Executor.deliver`: QUEUED messages flush at each step boundary AND before the worker may settle (it can't finish with an unread steer); a FORCEFUL `steer_worker({interrupt:true})` aborts the in-flight turn so the worker re-plans immediately. Black-box CLI harnesses can't be interrupted mid-step, so there the down-leg degrades to the next spawn. Observability is first-class: every event both ways is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. 
Two substrates coexist for the same "recursive agent decision" atom: the round-synchronous `runLoop` kernel (the leaf, what most sandbox benches drive today) and the reactive `Scope`/`Supervisor`+combinators (the canonical core — the agent-driver, `runAgentic`/`defineStrategy`/`runPersonified`). Prefer the latter for new recursive/keystone work. Both run over the one `Executor` port. diff --git a/src/mcp/tools/coordination.ts b/src/mcp/tools/coordination.ts index 0b20bd63..1d89605e 100644 --- a/src/mcp/tools/coordination.ts +++ b/src/mcp/tools/coordination.ts @@ -357,6 +357,12 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin properties: { workerId: idArg, instruction: { type: 'string', description: 'What the worker should do next.' }, + interrupt: { + type: 'boolean', + description: + 'true = forceful: break the worker out of its current step NOW to handle this. ' + + 'false/omitted = queued: it flushes at the next step boundary (and before it may settle).', + }, }, required: ['workerId', 'instruction'], }, @@ -364,7 +370,8 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin const a = obj(raw) const workerId = str(a.workerId, 'workerId') const instruction = str(a.instruction, 'instruction') - const delivered = opts.scope.send(workerId, { steer: instruction }) + const interrupt = a.interrupt === true + const delivered = opts.scope.send(workerId, { steer: instruction, interrupt }) await sendDown('steer', { toWorker: workerId, instruction, delivered }) return { delivered } }, diff --git a/src/runtime/index.ts b/src/runtime/index.ts index 904692c8..12f8b54e 100644 --- a/src/runtime/index.ts +++ b/src/runtime/index.ts @@ -336,6 +336,9 @@ export { type EventBus, type PublishOptions, } from './supervise/event-bus' +// The down-leg receive end: a per-worker inbox an executor exposes as `Executor.deliver`; the loop +// drains it at the step boundary + before settle (queued) or aborts the turn (forceful 
interrupt). +export { createInbox, type Inbox, type InboxMessage } from './supervise/inbox' // The production `DriverChat`: adapt the router's tool-calling to the seam a // `coordinationDriverAgent` drives. The one turnkey piece a consumer needs to run the driver // brain in-process — tests script a mock `DriverChat`, production passes `routerDriverChat(cfg)`. diff --git a/src/runtime/supervise/inbox.ts b/src/runtime/supervise/inbox.ts new file mode 100644 index 00000000..18193f40 --- /dev/null +++ b/src/runtime/supervise/inbox.ts @@ -0,0 +1,85 @@ +/** + * @experimental + * + * The worker-side receive end of the down-leg: a per-worker inbox an executor exposes as + * `Executor.deliver`. The driver's `steer_worker` / `answer_question` / `resume_worker` land here, + * and the worker's agent loop drains them at two points (Drew's two delivery modes): + * + * - QUEUED (default): the message accumulates and is FLUSHED at the next step boundary — folded + * into the conversation before the next think. A worker is also forced to flush BEFORE it may + * settle, so it can never finish while a steer/answer it never read is still pending. + * - FORCEFUL (`interrupt: true`): trips `freshInterrupt()`'s signal so the loop can abort its + * in-flight turn immediately, then re-plan with the message folded in — breaking the worker out + * of a wrong path mid-task instead of waiting for it to finish the step. + * + * `deliver` never throws — a malformed message is ignored, per the `Executor.deliver` contract. + */ + +export interface InboxMessage { + readonly kind: 'steer' | 'answer' | 'resume' + readonly text: string + /** Forceful messages abort the in-flight turn; queued ones wait for the boundary flush. */ + readonly interrupt: boolean + /** Present for an `answer` — the question id it resolves. */ + readonly questionId?: string +} + +export interface Inbox { + /** The `Executor.deliver` implementation — accept a raw down-message from `Scope.send`. 
*/ + deliver(msg: unknown): void + /** Remove and return all pending messages (the flush). */ + drain(): InboxMessage[] + pending(): number + /** Open a fresh per-turn interrupt signal; a later forceful `deliver` aborts it. The loop links + * this into the signal it passes to its inference call, then re-plans when it fires. */ + freshInterrupt(): AbortSignal + /** Render drained messages as ONE operator turn to fold into the worker's conversation. */ + fold(messages: ReadonlyArray<InboxMessage>): string +} + +function parseDown(msg: unknown): InboxMessage | undefined { + if (!msg || typeof msg !== 'object') return undefined + const m = msg as Record<string, unknown> + const interrupt = m.interrupt === true + if (typeof m.steer === 'string') return { kind: 'steer', text: m.steer, interrupt } + if (typeof m.resume === 'string') return { kind: 'resume', text: m.resume, interrupt } + if (typeof m.answer === 'string') + return { + kind: 'answer', + text: m.answer, + interrupt, + ...(typeof m.questionId === 'string' ? { questionId: m.questionId } : {}), + } + return undefined +} + +export function createInbox(): Inbox { + const pending: InboxMessage[] = [] + let live: AbortController | null = null + return { + deliver(msg) { + const m = parseDown(msg) + if (!m) return + pending.push(m) + // A forceful message aborts the turn currently in flight (if any). + if (m.interrupt && live && !live.signal.aborted) live.abort() + }, + drain() { + return pending.splice(0, pending.length) + }, + pending: () => pending.length, + freshInterrupt() { + live = new AbortController() + return live.signal + }, + fold(messages) { + const lines = messages.map((m) => { + if (m.kind === 'answer') + return `- Answer to your question${m.questionId ? 
` (${m.questionId})` : ''}: ${m.text}` + if (m.kind === 'resume') return `- Continue: ${m.text}` + return `- New instruction from your supervisor: ${m.text}` + }) + return `[SUPERVISOR] Out-of-band message(s) — address these before continuing:\n${lines.join('\n')}` + }, + } +} diff --git a/src/runtime/supervise/runtime.ts b/src/runtime/supervise/runtime.ts index b28431b5..7b7dc635 100644 --- a/src/runtime/supervise/runtime.ts +++ b/src/runtime/supervise/runtime.ts @@ -40,6 +40,7 @@ import type { SandboxClient, } from '../types' import { zeroTokenUsage } from '../util' +import { createInbox } from './inbox' import type { AgentSpec, DefaultVerdict, @@ -286,37 +287,59 @@ export const routerToolsInlineExecutor: ExecutorFactory = (spec, ctx) = abortIfSignalled() if (!ctx.signal.aborted) ctx.signal.addEventListener('abort', abortIfSignalled, { once: true }) + // The down-leg receive end: the driver's steer/answer/resume land here via `Scope.send`. + const inbox = createInbox() + let artifact: ExecutorResult | undefined return { runtime: 'router' as Runtime, + deliver: (m) => inbox.deliver(m), async execute(task, signal): Promise> { const started = Date.now() - const linked = linkSignals(signal, controller.signal) const messages: Array> = [ ...(taskToMessages(task, spec) as Array>), ] const tokens = zeroTokenUsage() let turns = 0 let lastText = '' + // Fold any queued down-messages into the conversation as one operator turn (the boundary flush). 
+ const flush = () => { + const pending = inbox.drain() + if (pending.length) messages.push({ role: 'user', content: inbox.fold(pending) }) + return pending.length > 0 + } for (let t = 0; t < maxTurns; t += 1) { turns += 1 - const res = await fetch(`${seam.routerBaseUrl.replace(/\/$/, '')}/chat/completions`, { - method: 'POST', - headers: { - 'content-type': 'application/json', - authorization: `Bearer ${seam.routerKey}`, - }, - body: JSON.stringify({ - model, - messages, - tools: seam.tools, - tool_choice: 'auto', - temperature: 0.2, - }), - ...(linked ? { signal: linked } : {}), - }) + // QUEUED messages flush at the step boundary, before this turn's inference. + flush() + // A forceful (interrupt) message aborts THIS turn so the worker re-plans immediately. + const interruptSig = inbox.freshInterrupt() + const turnSignal = AbortSignal.any([signal, controller.signal, interruptSig]) + let res: Response + try { + res = await fetch(`${seam.routerBaseUrl.replace(/\/$/, '')}/chat/completions`, { + method: 'POST', + headers: { + 'content-type': 'application/json', + authorization: `Bearer ${seam.routerKey}`, + }, + body: JSON.stringify({ + model, + messages, + tools: seam.tools, + tool_choice: 'auto', + temperature: 0.2, + }), + signal: turnSignal, + }) + } catch (e) { + // A forceful inbox message aborted this turn — discard it and re-plan (the next iteration's + // flush folds the message in). An EXTERNAL abort (teardown/budget) is fatal — rethrow. + if (interruptSig.aborted && !signal.aborted && !controller.signal.aborted) continue + throw e + } if (!res.ok) { throw new ValidationError( `routerToolsInlineExecutor: router ${res.status}: ${(await res.text()).slice(0, 200)}`, @@ -331,7 +354,12 @@ export const routerToolsInlineExecutor: ExecutorFactory = (spec, ctx) = const msg = data.choices?.[0]?.message if (msg?.content) lastText = msg.content const toolCalls = msg?.tool_calls ?? 
[] - if (toolCalls.length === 0) break // the model answered — loop done + if (toolCalls.length === 0) { + // Before settling, flush once more — a worker may not finish while a steer/answer it never + // read is still pending. If anything flushed, keep going; otherwise it is truly done. + if (flush()) continue + break + } // Record the assistant turn verbatim, then run each call on the host and // fold the result back as a `tool` message for the next turn. diff --git a/tests/loops/coordination.test.ts b/tests/loops/coordination.test.ts index 747f442f..4a90132e 100644 --- a/tests/loops/coordination.test.ts +++ b/tests/loops/coordination.test.ts @@ -114,7 +114,7 @@ describe('coordination tools', () => { expect( await tool(tb, 'steer_worker').handler({ workerId: 'w0', instruction: 'do X next' }), ).toEqual({ delivered: true }) - expect(sent).toEqual([{ id: 'w0', msg: { steer: 'do X next' } }]) + expect(sent).toEqual([{ id: 'w0', msg: { steer: 'do X next', interrupt: false } }]) expect(await tool(tb, 'steer_worker').handler({ workerId: 'gone', instruction: 'x' })).toEqual({ delivered: false, }) @@ -298,7 +298,7 @@ describe('coordination tools', () => { }) // Both reached the child inbox (down delivery)... expect(sent).toEqual([ - { id: 'w0', msg: { steer: 'do X' } }, + { id: 'w0', msg: { steer: 'do X', interrupt: false } }, { id: 'w0', msg: { resume: 'continue' } }, ]) // ...and were recorded for observability (pass-through + history; the failed resume too)... 
diff --git a/tests/loops/inbox.test.ts b/tests/loops/inbox.test.ts new file mode 100644 index 00000000..15c9804a --- /dev/null +++ b/tests/loops/inbox.test.ts @@ -0,0 +1,104 @@ +import type { AgentProfile } from '@tangle-network/sandbox' +import { afterEach, describe, expect, it, vi } from 'vitest' +import { type AgentSpec, createExecutor, createInbox } from '../../src/runtime' + +describe('worker inbox (down-leg receive end)', () => { + it('parses the down-message shapes; ignores malformed', () => { + const inbox = createInbox() + inbox.deliver({ steer: 'do X' }) + inbox.deliver({ resume: 'keep going' }) + inbox.deliver({ answer: 'use v2', questionId: 'q1' }) + inbox.deliver({ junk: true }) // ignored, never throws + inbox.deliver(null) + const drained = inbox.drain() + expect(drained).toEqual([ + { kind: 'steer', text: 'do X', interrupt: false }, + { kind: 'resume', text: 'keep going', interrupt: false }, + { kind: 'answer', text: 'use v2', interrupt: false, questionId: 'q1' }, + ]) + // drain is destructive + expect(inbox.pending()).toBe(0) + }) + + it('folds queued messages into one operator turn', () => { + const inbox = createInbox() + inbox.deliver({ steer: 'switch to recursion' }) + inbox.deliver({ answer: 'v2', questionId: 'q7' }) + const folded = inbox.fold(inbox.drain()) + expect(folded).toContain('[SUPERVISOR]') + expect(folded).toContain('New instruction from your supervisor: switch to recursion') + expect(folded).toContain('Answer to your question (q7): v2') + }) + + it('a forceful message aborts the live turn signal; a queued one does not', () => { + const inbox = createInbox() + const sig = inbox.freshInterrupt() + expect(sig.aborted).toBe(false) + inbox.deliver({ steer: 'note for later' }) // queued — no interrupt + expect(sig.aborted).toBe(false) + inbox.deliver({ steer: 'STOP, wrong path', interrupt: true }) // forceful + expect(sig.aborted).toBe(true) + }) + + it('each freshInterrupt is independent — a stale signal is not re-aborted', () => { + 
const inbox = createInbox() + const first = inbox.freshInterrupt() + inbox.deliver({ steer: 'x', interrupt: true }) + expect(first.aborted).toBe(true) + // A new turn opens a fresh signal; the prior forceful message does not abort it. + const second = inbox.freshInterrupt() + expect(second.aborted).toBe(false) + }) +}) + +describe('router-tools executor drains the inbox', () => { + afterEach(() => vi.unstubAllGlobals()) + + const noToolReply = () => + new Response( + JSON.stringify({ + choices: [{ message: { content: 'done', tool_calls: [] } }], + usage: { prompt_tokens: 1, completion_tokens: 1 }, + }), + { status: 200, headers: { 'content-type': 'application/json' } }, + ) + + it('a worker may not settle while a steer is pending — it flushes, folds it in, and continues', async () => { + const bodies: Array<{ messages: Array<{ role: string; content: string }> }> = [] + let calls = 0 + let deliver: (m: unknown) => void = () => {} + vi.stubGlobal( + 'fetch', + vi.fn(async (_url: string, init?: { body?: string }) => { + bodies.push(JSON.parse(init?.body ?? '{}')) + calls += 1 + // The driver steers the worker WHILE it is mid-turn, just as it first tries to finish. + if (calls === 1) deliver({ steer: 'also handle the wide-char edge case' }) + return noToolReply() + }), + ) + + const factory = createExecutor({ + backend: 'router-tools', + model: 'test-model', + routerBaseUrl: 'http://router.test', + routerKey: 'k', + tools: [], + executeToolCall: async () => '', + }) + const spec: AgentSpec = { + profile: { name: 'w', prompt: { systemPrompt: 'sys' } } as unknown as AgentProfile, + harness: null, + } as AgentSpec + const exec = factory(spec, { signal: new AbortController().signal, seams: {} }) + deliver = (m) => exec.deliver?.(m) + + await exec.execute('implement wcwidth', new AbortController().signal) + + // Turn 1 saw no tool calls but DID NOT settle — the pending steer forced a second turn... 
+ expect(calls).toBe(2) + // ...and that second turn's conversation carries the folded steer. + const turn2 = bodies[1]?.messages ?? [] + expect(turn2.some((m) => m.content?.includes('also handle the wide-char edge case'))).toBe(true) + }) +}) From 83da8dfaf6abef2c607a58578e70b078da32de6d Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 16:50:55 -0600 Subject: [PATCH 04/10] =?UTF-8?q?chore(supervise):=20address=20review=20ni?= =?UTF-8?q?ts=20=E2=80=94=20accurate=20resume=5Fworker=20desc,=20sendDown?= =?UTF-8?q?=20covers=20answer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #318 audit follow-ups (non-blocking): - resume_worker description no longer implies a park/resume lifecycle the scope model lacks — a settled (drained) worker is gone; says so and points to spawning fresh. - sendDown now covers the 'answer' down-leg too (removes the inline bus.publish duplication; one helper for all three down kinds). - history() docstring lists the down-leg event kinds. full suite 1008 pass; typecheck/lint clean. --- src/mcp/tools/coordination.ts | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/mcp/tools/coordination.ts b/src/mcp/tools/coordination.ts index 1d89605e..1d72dc46 100644 --- a/src/mcp/tools/coordination.ts +++ b/src/mcp/tools/coordination.ts @@ -113,8 +113,9 @@ export interface CoordinationTools { stopReason(): string | undefined settled(): ReadonlyArray questions(): ReadonlyArray - /** The full ordered log of every bus event (settled / question / finding) — the observability - * audit + replay trail. Each record carries seq, timestamp, and priority. */ + /** The full ordered log of every bus event — UP (settled / question / finding) and DOWN + * (steer / answer / resume) — the observability audit + replay trail. Each record carries seq, + * timestamp, and priority. 
*/ history(): ReadonlyArray> /** Bus throughput counters (published / pulled / by-kind) for live dashboards. */ stats(): BusStats @@ -203,8 +204,15 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin // The down-leg: record a parent→child message on the bus for the audit trail (history + // subscribers) WITHOUT enqueuing it — the parent must never pull its own outbound message back. - const sendDown = async (type: 'steer' | 'resume', down: DownMessageEvent): Promise => { - await bus.publish({ type, down }, { queue: false }) + const sendDown = async ( + type: 'steer' | 'resume' | 'answer', + down: DownMessageEvent, + questionId?: string, + ): Promise => { + await bus.publish( + type === 'answer' ? { type, down, questionId: questionId ?? '' } : { type, down }, + { queue: false }, + ) } // Consumer projection: the wire shape the driver sees for a pulled bus event. @@ -379,7 +387,9 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin { name: 'resume_worker', description: - 'Resume a parked/idle worker with a follow-up message, continuing its session (parent→child).', + 'Deliver a follow-up message to a still-LIVE worker, continuing its run (parent→child). ' + + 'A worker that already settled (drained via await_next/await_event) is gone from the live ' + + 'set and cannot be resumed — that returns delivered:false; spawn a fresh worker instead.', inputSchema: { type: 'object', properties: { @@ -484,13 +494,10 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }) // Route the answer DOWN to the worker that asked, unparking it, and record the down-leg. 
const delivered = opts.scope.send(question.from, { answer, questionId }) - await bus.publish( - { - type: 'answer', - questionId, - down: { toWorker: question.from, instruction: answer, delivered }, - }, - { queue: false }, + await sendDown( + 'answer', + { toWorker: question.from, instruction: answer, delivered }, + questionId, ) // Surface `delivered` like steer_worker/resume_worker — the caller must see whether the // answer actually reached a live worker (false when it parked/settled or has no inbox). From 809e53ceb08d4a008c8372ae7b1688dee0996181 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 17:17:28 -0600 Subject: [PATCH 05/10] =?UTF-8?q?refactor(supervise):=20unify=20the=20coor?= =?UTF-8?q?dination=20surface=20(12=E2=86=9210=20tools)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplify without losing capability: - MERGE steer_worker + resume_worker → one steer_worker (any live worker; the only real axis was interrupt forceful-vs-queued, already a param). 'Resume' = a non-interrupt steer. Removes a redundant verb + dissolves the resume-vs-steer prompt nits. - REMOVE await_next — it was a strict subset of await_event({kinds:['settled']}). One wait-verb now; callers/prompts pass kinds:['settled'] for the next finished worker. - DROP bus.peek() — speculative, only its own test used it (YAGNI). Down-leg event union + inbox shed the dead 'resume' kind. Full suite 1007 pass; typecheck/build/lint clean. 
--- CLAUDE.md | 2 +- bench/src/profiles.ts | 5 +- src/mcp/tools/coordination.ts | 70 ++++++----------------- src/runtime/supervise/authoring.ts | 2 +- src/runtime/supervise/coordination-mcp.ts | 2 +- src/runtime/supervise/event-bus.ts | 6 -- src/runtime/supervise/inbox.ts | 6 +- tests/loops/completion-gate.test.ts | 8 +-- tests/loops/coordination-driver.test.ts | 14 ++--- tests/loops/coordination-mcp.test.ts | 4 +- tests/loops/coordination.test.ts | 41 ++++++------- tests/loops/event-bus.test.ts | 9 --- tests/loops/inbox.test.ts | 2 - tests/loops/router-driver-chat.test.ts | 4 +- tests/loops/supervisor-authoring.test.ts | 4 +- 15 files changed, 59 insertions(+), 120 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index d773c7a9..efb652bc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -62,7 +62,7 @@ Types that stay in THIS repo because they're runtime-shaped (coupled to a runnin - `run-loop.ts` — `runLoop`, the round-synchronous leaf kernel. Per round: `driver.plan()`→N tasks→one sandbox/iteration (bounded by `maxConcurrency`, round-robin `agentRuns`)→`streamPrompt`→`output.parse`→`validator.validate`→`driver.decide`. Owns iteration accounting, concurrency, abort, cost+token aggregation, trace emission, box teardown. Exports `defaultSelectWinner` (best-valid-score, ties→earliest) — the single-sourced selection the personify combinators reuse. - `supervise/` — the recursive execution atom (keystone): `Scope` + `Supervisor` over the open `Executor` port, spawn/settle on a **conserved budget pool** so equal-compute holds by construction; journal→replay/resume. `runtime.ts` also holds `createExecutor({backend})` — the ONE built-in executor (backend-as-data: `router`/`router-tools`/`bridge`/`cli`/`sandbox`; `router-tools` is the off-box tool-using agentic loop — chat→tool_calls→`executeToolCall`→repeat — over the router's tool-calling, no sandbox); the per-backend bodies are internal case-arms, BYO agents implement `Executor` directly. 
- `personify/` — the content-free generic combinators (`fanout`/`loopUntil`/`widen`/`panel`/`verify`/`pipeline`) + `definePersona`/`runPersonified` + the cross-run `Corpus` + `createScopeAnalyst` (the analyst-on-scope steer firewall). -- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. The bus is **bidirectional**: UP (settled/question/finding) is queued+pullable; DOWN (`steer_worker`/`answer_question`/`resume_worker`) routes to the child inbox via `scope.send`→`deliver` AND records a `queue:false` event (history + subscribers, never pulled back). The receive end is `createInbox` (`supervise/inbox.ts`), which the owned tool-loop executor (`routerToolsInlineExecutor`) exposes as `Executor.deliver`: QUEUED messages flush at each step boundary AND before the worker may settle (it can't finish with an unread steer); a FORCEFUL `steer_worker({interrupt:true})` aborts the in-flight turn so the worker re-plans immediately. Black-box CLI harnesses can't be interrupted mid-step, so there the down-leg degrades to the next spawn. 
Observability is first-class: every event both ways is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. +- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. 
The bus is **bidirectional**: UP (settled/question/finding) is queued+pullable; DOWN (`steer_worker` for any live worker — instruction/correction/continuation; `answer_question` routes an answer down) goes to the child inbox via `scope.send`→`deliver` AND records a `queue:false` event (history + subscribers, never pulled back). The receive end is `createInbox` (`supervise/inbox.ts`), which the owned tool-loop executor (`routerToolsInlineExecutor`) exposes as `Executor.deliver`: QUEUED messages flush at each step boundary AND before the worker may settle (it can't finish with an unread steer); a FORCEFUL `steer_worker({interrupt:true})` aborts the in-flight turn so the worker re-plans immediately. Black-box CLI harnesses can't be interrupted mid-step, so there the down-leg degrades to the next spawn. The driver waits on ONE verb — `await_event({kinds?})` (`kinds:['settled']` = next finished worker; omit = also questions/findings). Observability is first-class: every event both ways is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. 
`loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. Two substrates coexist for the same "recursive agent decision" atom: the round-synchronous `runLoop` kernel (the leaf, what most sandbox benches drive today) and the reactive `Scope`/`Supervisor`+combinators (the canonical core — the agent-driver, `runAgentic`/`defineStrategy`/`runPersonified`). Prefer the latter for new recursive/keystone work. Both run over the one `Executor` port. diff --git a/bench/src/profiles.ts b/bench/src/profiles.ts index d0ca4ddf..00ae9838 100644 --- a/bench/src/profiles.ts +++ b/bench/src/profiles.ts @@ -21,8 +21,7 @@ export const OPERATOR_TOOLS = [ 'run_analyst', // run an analyst over a worker's trace → findings (selector≠judge: trace, not score) 'observe_worker', // a worker's in-flight trace, or its last finished episode/shot 'spawn_worker', // start a worker (or a sub-analyst) — drive many; parallelize when independent - 'steer_worker', // send a running worker its next instruction / an interrupt (down-leg) - 'resume_worker', // continue a parked/idle worker with a follow-up message (down-leg) + 'steer_worker', // send a live worker a message down: instruction, course-correction, or continuation (interrupt? for forceful) 'stop', // declare the task complete (verified) or abandon a line ] as const @@ -96,7 +95,7 @@ export const driverProfile: RoleProfile = { ' analysts are cheap; make them when a worker’s failure mode needs a focused lens.', '- observe_worker(worker): the worker’s IN-FLIGHT trace if it is still running, else its last', ' finished episode/shot.', - '- spawn_worker(profile, task) / steer_worker(worker, instruction) / resume_worker(worker, message) / stop.', + '- spawn_worker(profile, task) / steer_worker(worker, instruction, interrupt?) 
/ stop.', '- the artifact’s own tools (read/edit/run) — use them to inspect the workspace and to contribute', ' decisive work yourself.', '', diff --git a/src/mcp/tools/coordination.ts b/src/mcp/tools/coordination.ts index 1d72dc46..a5a5808b 100644 --- a/src/mcp/tools/coordination.ts +++ b/src/mcp/tools/coordination.ts @@ -17,7 +17,7 @@ import type { import { type BusRecord, type BusStats, createEventBus } from '../../runtime/supervise/event-bus' import type { McpToolDescriptor } from '../server' -/** A worker the driver has drained via `await_next`. */ +/** A worker the driver has drained via `await_event`. */ export interface SettledWorker { readonly id: string readonly status: 'done' | 'down' @@ -80,15 +80,14 @@ export interface DownMessageEvent { } /** Every message on the one typed pipe. UP (child→parent): question / settled / finding — queued for - * the driver to `pull`. DOWN (parent→child): steer / answer / resume — record-only (history + - * subscribers), routed to the child inbox. New kinds are additive. */ + * the driver to `pull`. DOWN (parent→child): steer / answer — record-only (history + subscribers), + * routed to the child inbox. New kinds are additive. */ export type CoordinationEvent = | { readonly type: 'question'; readonly question: QuestionRecord } | { readonly type: 'settled'; readonly worker: SettledWorker } | { readonly type: 'finding'; readonly finding: AnalystFindingEvent } | { readonly type: 'steer'; readonly down: DownMessageEvent } | { readonly type: 'answer'; readonly down: DownMessageEvent; readonly questionId: string } - | { readonly type: 'resume'; readonly down: DownMessageEvent } export type MakeWorkerAgent = (profile: unknown) => SuperviseAgent @@ -205,7 +204,7 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin // The down-leg: record a parent→child message on the bus for the audit trail (history + // subscribers) WITHOUT enqueuing it — the parent must never pull its own outbound message back. 
const sendDown = async ( - type: 'steer' | 'resume' | 'answer', + type: 'steer' | 'answer', down: DownMessageEvent, questionId?: string, ): Promise => { @@ -359,7 +358,11 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }, { name: 'steer_worker', - description: 'Deliver an out-of-band instruction to a running worker inbox (parent→child).', + description: + 'Send a message DOWN to a still-LIVE worker (parent→child): a new instruction, a course ' + + 'correction, or a continuation. The worker drains it at its next step boundary — and before ' + + 'it may settle, so it cannot finish while a message it never read is pending. A worker that ' + + 'already settled is gone (returns delivered:false) — spawn a fresh one instead.', inputSchema: { type: 'object', properties: { @@ -384,54 +387,15 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin return { delivered } }, }, - { - name: 'resume_worker', - description: - 'Deliver a follow-up message to a still-LIVE worker, continuing its run (parent→child). ' + - 'A worker that already settled (drained via await_next/await_event) is gone from the live ' + - 'set and cannot be resumed — that returns delivered:false; spawn a fresh worker instead.', - inputSchema: { - type: 'object', - properties: { - workerId: idArg, - message: { type: 'string', description: 'The follow-up to continue the worker with.' }, - }, - required: ['workerId', 'message'], - }, - handler: async (raw) => { - const a = obj(raw) - const workerId = str(a.workerId, 'workerId') - const message = str(a.message, 'message') - const delivered = opts.scope.send(workerId, { resume: message }) - await sendDown('resume', { toWorker: workerId, instruction: message, delivered }) - return { delivered } - }, - }, - { - name: 'await_next', - description: - 'Wait for the next spawned worker to settle. Returns { idle: true } when none are live. 
' + - '(A settle also fires any analyze-on-settle lenses, whose findings queue for await_event.)', - inputSchema: { type: 'object', properties: {} }, - handler: async () => { - if (bus.pending(['settled']) === 0 && !(await drainSettlement())) return { idle: true } - const ev = bus.pull(['settled']) - if (!ev || ev.type !== 'settled') return { idle: true } - const w = ev.worker - return w.status === 'done' - ? { settled: w.id, status: 'done', score: w.score, valid: w.valid, outRef: w.outRef } - : { settled: w.id, status: 'down', reason: w.reason } - }, - }, { name: 'await_event', description: - 'Pull the next message a worker, sub-driver, or analyst sent up — the unified inbox. An ' + - "event is one of: a settled worker output ('settled'), a question needing your answer " + - "('question', from ask_parent / the worker's ask-user), or a trace-analyst finding " + - "('finding', from analyze-on-settle). Optional `kinds` filters which to wait for. Returns " + - '{ idle: true } when nothing is queued and no workers are live. Prefer this over await_next ' + - 'when you also want questions and findings, not just settlements.', + 'Wait for and pull the next message a worker, sub-driver, or analyst sent up — the unified ' + + "inbox. An event is one of: a settled worker output ('settled'), a question needing your " + + "answer ('question', from ask_parent / the worker's ask-user), or a trace-analyst finding " + + "('finding', from analyze-on-settle). Pass kinds:['settled'] for just the next finished " + + 'worker; omit `kinds` to also receive questions and findings. 
Returns { idle: true } when ' + + 'nothing is queued and no workers are live.', inputSchema: { type: 'object', properties: { @@ -499,8 +463,8 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin { toWorker: question.from, instruction: answer, delivered }, questionId, ) - // Surface `delivered` like steer_worker/resume_worker — the caller must see whether the - // answer actually reached a live worker (false when it parked/settled or has no inbox). + // Surface `delivered` like steer_worker — the caller must see whether the answer actually + // reached a live worker (false when it already settled or has no inbox). return { question, delivered } } if (typeof a.deferReason === 'string' && a.deferReason.length > 0) { diff --git a/src/runtime/supervise/authoring.ts b/src/runtime/supervise/authoring.ts index 1239d688..4b5cf45d 100644 --- a/src/runtime/supervise/authoring.ts +++ b/src/runtime/supervise/authoring.ts @@ -53,7 +53,7 @@ export function supervisorSkill(opts?: { goal?: string }): string { ' • systemPrompt: rich, specific instructions for THIS sub-task — tell the worker exactly what to produce, how to use its tools fully, and what "done" means. Never a one-liner; write the prompt a power-user would write.', ' • model: the model best suited to this sub-task (omit to use the default).', ' NEVER spawn a worker with an empty profile. The quality of the worker IS the quality of the profile you write.', - '3. await_next to collect each worker. Its result says valid:true only if the deployable check passed.', + "3. await_event (kinds:['settled']) to collect each worker. Its result says valid:true only if the deployable check passed.", '4. If a worker did NOT deliver, AUTHOR A NEW worker whose systemPrompt names the SPECIFIC failure and how to fix it — never just retry the same prompt.', '5. Stop (reply with no tool call) once the work is delivered. 
You cannot declare done yourself — only a delivered (valid:true) worker counts.', ...(opts?.goal ? ['', `The goal: ${opts.goal}`] : []), diff --git a/src/runtime/supervise/coordination-mcp.ts b/src/runtime/supervise/coordination-mcp.ts index 67296b85..a87a478a 100644 --- a/src/runtime/supervise/coordination-mcp.ts +++ b/src/runtime/supervise/coordination-mcp.ts @@ -1,7 +1,7 @@ /** * @experimental * - * Serve the coordination verbs (spawn_worker / await_next / observe_worker / steer_worker / stop) + * Serve the coordination verbs (spawn_worker / await_event / observe_worker / steer_worker / stop) * as a real HTTP MCP server over a LIVE `Scope`. This is the keystone that lets a coding-harness * agent (opencode via the cli-bridge, claude-code, codex) BE the supervisor: it mounts this MCP * (`mcp.mcpServers.coordination`) and calls `spawn_worker` as a native tool, which lands on diff --git a/src/runtime/supervise/event-bus.ts b/src/runtime/supervise/event-bus.ts index d5ac392c..26522496 100644 --- a/src/runtime/supervise/event-bus.ts +++ b/src/runtime/supervise/event-bus.ts @@ -60,8 +60,6 @@ export interface EventBus { /** Remove and return the highest-priority QUEUED event whose type is in `kinds` (any if omitted), * ties broken FIFO by `seq`; `undefined` when nothing matches. */ pull(kinds?: ReadonlyArray): E | undefined - /** Like `pull` but non-destructive — inspect the next event without consuming it. */ - peek(kinds?: ReadonlyArray): E | undefined /** Register a pass-through handler; it receives the stamped record of every event published after * registration. Returns an unsubscribe fn. */ subscribe(handler: (record: BusRecord) => void | Promise): () => void @@ -118,10 +116,6 @@ export function createEventBus(now: () => number = Date.now) pulled++ return queue.splice(i, 1)[0]?.event }, - peek(kinds) { - const i = bestIndex(kinds) - return i < 0 ? 
undefined : queue[i]?.event - }, subscribe(handler) { subscribers.push(handler) return () => { diff --git a/src/runtime/supervise/inbox.ts b/src/runtime/supervise/inbox.ts index 18193f40..42ef61a7 100644 --- a/src/runtime/supervise/inbox.ts +++ b/src/runtime/supervise/inbox.ts @@ -2,7 +2,7 @@ * @experimental * * The worker-side receive end of the down-leg: a per-worker inbox an executor exposes as - * `Executor.deliver`. The driver's `steer_worker` / `answer_question` / `resume_worker` land here, + * `Executor.deliver`. The driver's `steer_worker` / `answer_question` land here, * and the worker's agent loop drains them at two points (Drew's two delivery modes): * * - QUEUED (default): the message accumulates and is FLUSHED at the next step boundary — folded @@ -16,7 +16,7 @@ */ export interface InboxMessage { - readonly kind: 'steer' | 'answer' | 'resume' + readonly kind: 'steer' | 'answer' readonly text: string /** Forceful messages abort the in-flight turn; queued ones wait for the boundary flush. */ readonly interrupt: boolean @@ -42,7 +42,6 @@ function parseDown(msg: unknown): InboxMessage | undefined { const m = msg as Record const interrupt = m.interrupt === true if (typeof m.steer === 'string') return { kind: 'steer', text: m.steer, interrupt } - if (typeof m.resume === 'string') return { kind: 'resume', text: m.resume, interrupt } if (typeof m.answer === 'string') return { kind: 'answer', @@ -76,7 +75,6 @@ export function createInbox(): Inbox { const lines = messages.map((m) => { if (m.kind === 'answer') return `- Answer to your question${m.questionId ? 
` (${m.questionId})` : ''}: ${m.text}` - if (m.kind === 'resume') return `- Continue: ${m.text}` return `- New instruction from your supervisor: ${m.text}` }) return `[SUPERVISOR] Out-of-band message(s) — address these before continuing:\n${lines.join('\n')}` diff --git a/tests/loops/completion-gate.test.ts b/tests/loops/completion-gate.test.ts index 7b3604d5..85203bd9 100644 --- a/tests/loops/completion-gate.test.ts +++ b/tests/loops/completion-gate.test.ts @@ -163,7 +163,7 @@ function gatedWorkerLeaf( const spawnAwaitStop: DriverTurn[] = [ { toolCalls: [{ name: 'spawn_worker', arguments: { profile: { kind: 'worker' }, task: 'go' } }] }, - { toolCalls: [{ name: 'await_next', arguments: {} }] }, + { toolCalls: [{ name: 'await_event', arguments: {} }] }, { content: 'stop' }, ] @@ -236,8 +236,8 @@ describe('completion-oracle settle — settled ⟺ DELIVERED (Foreman 0/18)', () }, { toolCalls: [ - { name: 'await_next', arguments: {} }, - { name: 'await_next', arguments: {} }, + { name: 'await_event', arguments: {} }, + { name: 'await_event', arguments: {} }, ], }, { content: 'stop' }, @@ -282,7 +282,7 @@ describe('completion-oracle settle — settled ⟺ DELIVERED (Foreman 0/18)', () { name: 'spawn_worker', arguments: { profile: { kind: 'driver' }, task: 'delegate' } }, ], }, - { toolCalls: [{ name: 'await_next', arguments: {} }] }, + { toolCalls: [{ name: 'await_event', arguments: {} }] }, { content: 'stop' }, ] const root = coordinationDriverAgent(driverOpts('root', scriptedChat(rootTurns), makeAgent)) diff --git a/tests/loops/coordination-driver.test.ts b/tests/loops/coordination-driver.test.ts index 720550cc..944898d6 100644 --- a/tests/loops/coordination-driver.test.ts +++ b/tests/loops/coordination-driver.test.ts @@ -127,7 +127,7 @@ describe('coordinationDriverAgent — the driver BRAIN (LLM tool-loop drives rea { name: 'spawn_worker', arguments: { profile: { kind: 'worker' }, task: 'go' } }, ], }, - { toolCalls: [{ name: 'await_next', arguments: {} }] }, + { toolCalls: 
[{ name: 'await_event', arguments: {} }] }, { content: 'done' }, ], seen, @@ -149,11 +149,11 @@ describe('coordinationDriverAgent — the driver BRAIN (LLM tool-loop drives rea expect(result.kind).toBe('winner') // Feed-back proof: by turn 2 (the 3rd chat call), the conversation the driver saw contains a - // `tool` message carrying the await_next settlement — i.e. the tool RESULT was folded back. + // `tool` message carrying the await_event settlement — i.e. the tool RESULT was folded back. const turn2Convo = seen[2]! const toolMsgs = turn2Convo.filter((m) => m.role === 'tool') - expect(toolMsgs.length).toBeGreaterThanOrEqual(2) // spawn_worker result + await_next result - expect(toolMsgs.some((m) => m.name === 'await_next' && m.content.includes('done'))).toBe(true) + expect(toolMsgs.length).toBeGreaterThanOrEqual(2) // spawn_worker result + await_event result + expect(toolMsgs.some((m) => m.name === 'await_event' && m.content.includes('done'))).toBe(true) // A real worker spawn is recorded in the journal (not a mock-bypassed result). const root_tree = (await journal.loadTree('cd')) as SpawnEvent[] @@ -200,7 +200,7 @@ describe('coordinationDriverAgent — the driver BRAIN (LLM tool-loop drives rea { name: 'spawn_worker', arguments: { profile: { kind: 'worker' }, task: 'sub' } }, ], }, - { toolCalls: [{ name: 'await_next', arguments: {} }] }, + { toolCalls: [{ name: 'await_event', arguments: {} }] }, { content: 'mid done' }, ], } @@ -213,7 +213,7 @@ describe('coordinationDriverAgent — the driver BRAIN (LLM tool-loop drives rea { name: 'spawn_worker', arguments: { profile: midProfile, task: 'delegate' } }, ], }, - { toolCalls: [{ name: 'await_next', arguments: {} }] }, + { toolCalls: [{ name: 'await_event', arguments: {} }] }, { content: 'root done' }, ], rootSeen, @@ -237,7 +237,7 @@ describe('coordinationDriverAgent — the driver BRAIN (LLM tool-loop drives rea // recorded the worker's settlement fed back — proof the inner agent reasoned, not scripted-bypassed. 
expect(midSeen.length).toBeGreaterThanOrEqual(2) const midToolMsgs = midSeen[midSeen.length - 1]!.filter((m) => m.role === 'tool') - expect(midToolMsgs.some((m) => m.name === 'await_next')).toBe(true) + expect(midToolMsgs.some((m) => m.name === 'await_event')).toBe(true) // A SEPARATE nested tree exists under the root — the mid driver's sub-tree, holding the // worker spawn. A non-recursive build (mid as a leaf) could not produce a nested tree. diff --git a/tests/loops/coordination-mcp.test.ts b/tests/loops/coordination-mcp.test.ts index 45de14ab..3fbbf03a 100644 --- a/tests/loops/coordination-mcp.test.ts +++ b/tests/loops/coordination-mcp.test.ts @@ -73,7 +73,7 @@ describe('coordination MCP over a live Scope — the real keystone (HTTP → MCP name: 'spawn_worker', arguments: { profile: {}, task: 'go' }, }) - await jsonRpc(mcp.url, 'tools/call', { name: 'await_next', arguments: {} }) + await jsonRpc(mcp.url, 'tools/call', { name: 'await_event', arguments: {} }) observed = { toolsList: toolsList.result, settled: mcp.settled() } const done = mcp.settled().filter((w) => w.status === 'done' && w.valid === true) return done[0]?.outRef ? 
await blobs.get(done[0].outRef) : undefined @@ -102,6 +102,6 @@ describe('coordination MCP over a live Scope — the real keystone (HTTP → MCP (t) => t.name, ) expect(names).toContain('spawn_worker') - expect(names).toContain('await_next') + expect(names).toContain('await_event') }) }) diff --git a/tests/loops/coordination.test.ts b/tests/loops/coordination.test.ts index 4a90132e..bea4943a 100644 --- a/tests/loops/coordination.test.ts +++ b/tests/loops/coordination.test.ts @@ -120,7 +120,7 @@ describe('coordination tools', () => { }) }) - it('await_next drains settlements into the driver ledger', async () => { + it('await_event(settled) drains settlements into the driver ledger', async () => { const { scope } = mockScope() const settlements = [ { @@ -143,14 +143,15 @@ describe('coordination tools', () => { makeWorkerAgent, perWorker: { maxIterations: 1, maxTokens: 10 }, }) - expect(await tool(tb, 'await_next').handler({})).toEqual({ + expect(await tool(tb, 'await_event').handler({ kinds: ['settled'] })).toEqual({ + type: 'settled', settled: 'w7', status: 'done', score: 0.83, valid: true, outRef: 'blob:w7', }) - expect(await tool(tb, 'await_next').handler({})).toEqual({ idle: true }) + expect(await tool(tb, 'await_event').handler({ kinds: ['settled'] })).toEqual({ idle: true }) expect(tb.settled()).toEqual([ { id: 'w7', status: 'done', score: 0.83, valid: true, outRef: 'blob:w7' }, ]) @@ -272,7 +273,7 @@ describe('coordination tools', () => { expect(tb.stats()).toMatchObject({ published: 2, pulled: 2, byKind: { question: 2 } }) }) - it('steer_worker and resume_worker route down + record in history but are never pulled back', async () => { + it('steer_worker routes down + records in history but is never pulled back', async () => { const { scope, sent } = mockScope() const emitted: Array<{ type: string }> = [] const tb = createCoordinationTools({ @@ -282,28 +283,22 @@ describe('coordination tools', () => { perWorker: { maxIterations: 1, maxTokens: 10 }, onEvent: (e) => 
emitted.push(e), }) - expect(await tool(tb, 'steer_worker').handler({ workerId: 'w0', instruction: 'do X' })).toEqual( - { - delivered: true, - }, - ) expect( - await tool(tb, 'resume_worker').handler({ workerId: 'w0', message: 'continue' }), - ).toEqual({ - delivered: true, - }) - // A down-message to a worker with no live inbox reports delivered:false (mirrors steer_worker). - expect(await tool(tb, 'resume_worker').handler({ workerId: 'gone', message: 'x' })).toEqual({ + await tool(tb, 'steer_worker').handler({ + workerId: 'w0', + instruction: 'do X', + interrupt: true, + }), + ).toEqual({ delivered: true }) + // A steer to a worker with no live inbox reports delivered:false. + expect(await tool(tb, 'steer_worker').handler({ workerId: 'gone', instruction: 'x' })).toEqual({ delivered: false, }) - // Both reached the child inbox (down delivery)... - expect(sent).toEqual([ - { id: 'w0', msg: { steer: 'do X', interrupt: false } }, - { id: 'w0', msg: { resume: 'continue' } }, - ]) - // ...and were recorded for observability (pass-through + history; the failed resume too)... - expect(emitted.map((e) => e.type)).toEqual(['steer', 'resume', 'resume']) - expect(tb.history().map((r) => r.event.type)).toEqual(['steer', 'resume', 'resume']) + // The forceful steer reached the child inbox (down delivery)... + expect(sent).toEqual([{ id: 'w0', msg: { steer: 'do X', interrupt: true } }]) + // ...and both attempts were recorded for observability (pass-through + history)... + expect(emitted.map((e) => e.type)).toEqual(['steer', 'steer']) + expect(tb.history().map((r) => r.event.type)).toEqual(['steer', 'steer']) // ...but the parent never pulls its own outbound messages back. 
expect(await tool(tb, 'await_event').handler({})).toEqual({ idle: true }) }) diff --git a/tests/loops/event-bus.test.ts b/tests/loops/event-bus.test.ts index 85d081e8..f121e16b 100644 --- a/tests/loops/event-bus.test.ts +++ b/tests/loops/event-bus.test.ts @@ -58,15 +58,6 @@ describe('event bus', () => { expect(bus.pull()).toEqual({ type: 'finding', claim: 'late finding' }) }) - it('peek is non-destructive and respects priority', async () => { - const bus = createEventBus() - await bus.publish({ type: 'settled', id: 'w1' }) - await bus.publish({ type: 'question', q: 'q' }, { priority: 10 }) - expect(bus.peek()).toEqual({ type: 'question', q: 'q' }) - expect(bus.pending()).toBe(2) // peek consumed nothing - expect(bus.pull()).toEqual({ type: 'question', q: 'q' }) - }) - it('history is the full ordered audit trail; stats count throughput', async () => { const bus = createEventBus(fakeClock()) await bus.publish({ type: 'settled', id: 'w1' }) diff --git a/tests/loops/inbox.test.ts b/tests/loops/inbox.test.ts index 15c9804a..6c69b377 100644 --- a/tests/loops/inbox.test.ts +++ b/tests/loops/inbox.test.ts @@ -6,14 +6,12 @@ describe('worker inbox (down-leg receive end)', () => { it('parses the down-message shapes; ignores malformed', () => { const inbox = createInbox() inbox.deliver({ steer: 'do X' }) - inbox.deliver({ resume: 'keep going' }) inbox.deliver({ answer: 'use v2', questionId: 'q1' }) inbox.deliver({ junk: true }) // ignored, never throws inbox.deliver(null) const drained = inbox.drain() expect(drained).toEqual([ { kind: 'steer', text: 'do X', interrupt: false }, - { kind: 'resume', text: 'keep going', interrupt: false }, { kind: 'answer', text: 'use v2', interrupt: false, questionId: 'q1' }, ]) // drain is destructive diff --git a/tests/loops/router-driver-chat.test.ts b/tests/loops/router-driver-chat.test.ts index 2ac433e7..0ff357d3 100644 --- a/tests/loops/router-driver-chat.test.ts +++ b/tests/loops/router-driver-chat.test.ts @@ -102,11 +102,11 @@ 
describe('routerDriverChat — the production DriverChat seam over the router to it('omits empty-string content (truthy check), not just null', async () => { routerMock.mockResolvedValue({ content: '', - toolCalls: [{ id: 'c1', name: 'await_next', arguments: '{}' }], + toolCalls: [{ id: 'c1', name: 'await_event', arguments: '{}' }], }) const turn = await routerDriverChat(cfg).next({ system: 'S', messages: [], tools: [] }) expect(turn.content).toBeUndefined() - expect(turn.toolCalls).toEqual([{ id: 'c1', name: 'await_next', arguments: {} }]) + expect(turn.toolCalls).toEqual([{ id: 'c1', name: 'await_event', arguments: {} }]) }) it('honors a custom temperature', async () => { diff --git a/tests/loops/supervisor-authoring.test.ts b/tests/loops/supervisor-authoring.test.ts index bb30a8e1..23de1991 100644 --- a/tests/loops/supervisor-authoring.test.ts +++ b/tests/loops/supervisor-authoring.test.ts @@ -97,8 +97,8 @@ describe('supervisor authoring — the supervisor DESIGNS each worker (profile), }, { toolCalls: [ - { name: 'await_next', arguments: {} }, - { name: 'await_next', arguments: {} }, + { name: 'await_event', arguments: {} }, + { name: 'await_event', arguments: {} }, ], }, { content: 'done' }, From 6f8b82d095b43b628a81a6abaaca216a3823c11e Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 18:25:33 -0600 Subject: [PATCH 06/10] feat(supervise): online detector monitor on the worker pipe (reuses agent-eval kernel) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit createDetectorMonitor (supervise/detector-monitor.ts) — the online analyst on the live worker pipe. Folds each tool step through agent-eval 0.93.0's published streaming kernel (repeatedActionDetector/errorStreakDetector — the SAME kernel control-runtime folds; no detection logic reimplemented) and fires onSignal → a finding on the bus the moment a worker loops or error-storms. routerToolsInlineExecutor feeds it via a new onToolStep seam. 
Bumps agent-eval ^0.93.0. monitor tests (4); full suite 1011 pass; typecheck/build/lint clean. --- CLAUDE.md | 2 +- package.json | 2 +- pnpm-lock.yaml | 10 ++-- src/runtime/index.ts | 9 +++ src/runtime/supervise/detector-monitor.ts | 67 +++++++++++++++++++++++ src/runtime/supervise/runtime.ts | 20 ++++++- tests/loops/detector-monitor.test.ts | 47 ++++++++++++++++ 7 files changed, 149 insertions(+), 8 deletions(-) create mode 100644 src/runtime/supervise/detector-monitor.ts create mode 100644 tests/loops/detector-monitor.test.ts diff --git a/CLAUDE.md b/CLAUDE.md index efb652bc..5efd4c1d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -62,7 +62,7 @@ Types that stay in THIS repo because they're runtime-shaped (coupled to a runnin - `run-loop.ts` — `runLoop`, the round-synchronous leaf kernel. Per round: `driver.plan()`→N tasks→one sandbox/iteration (bounded by `maxConcurrency`, round-robin `agentRuns`)→`streamPrompt`→`output.parse`→`validator.validate`→`driver.decide`. Owns iteration accounting, concurrency, abort, cost+token aggregation, trace emission, box teardown. Exports `defaultSelectWinner` (best-valid-score, ties→earliest) — the single-sourced selection the personify combinators reuse. - `supervise/` — the recursive execution atom (keystone): `Scope` + `Supervisor` over the open `Executor` port, spawn/settle on a **conserved budget pool** so equal-compute holds by construction; journal→replay/resume. `runtime.ts` also holds `createExecutor({backend})` — the ONE built-in executor (backend-as-data: `router`/`router-tools`/`bridge`/`cli`/`sandbox`; `router-tools` is the off-box tool-using agentic loop — chat→tool_calls→`executeToolCall`→repeat — over the router's tool-calling, no sandbox); the per-backend bodies are internal case-arms, BYO agents implement `Executor` directly. 
- `personify/` — the content-free generic combinators (`fanout`/`loopUntil`/`widen`/`panel`/`verify`/`pipeline`) + `definePersona`/`runPersonified` + the cross-run `Corpus` + `createScopeAnalyst` (the analyst-on-scope steer firewall). -- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. The bus is **bidirectional**: UP (settled/question/finding) is queued+pullable; DOWN (`steer_worker` for any live worker — instruction/correction/continuation; `answer_question` routes an answer down) goes to the child inbox via `scope.send`→`deliver` AND records a `queue:false` event (history + subscribers, never pulled back). The receive end is `createInbox` (`supervise/inbox.ts`), which the owned tool-loop executor (`routerToolsInlineExecutor`) exposes as `Executor.deliver`: QUEUED messages flush at each step boundary AND before the worker may settle (it can't finish with an unread steer); a FORCEFUL `steer_worker({interrupt:true})` aborts the in-flight turn so the worker re-plans immediately. Black-box CLI harnesses can't be interrupted mid-step, so there the down-leg degrades to the next spawn. 
The driver waits on ONE verb — `await_event({kinds?})` (`kinds:['settled']` = next finished worker; omit = also questions/findings). Observability is first-class: every event both ways is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. +- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event({kinds?})` — the ONE wait verb; `kinds:['settled']` = next finished worker, omit = also questions/findings). 
The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. The bus is **bidirectional**: UP (settled/question/finding) is queued+pullable; DOWN (`steer_worker` for any live worker — instruction/correction/continuation; `answer_question` routes an answer down) goes to the child inbox via `scope.send`→`deliver` AND records a `queue:false` event (history + subscribers, never pulled back). The receive end is `createInbox` (`supervise/inbox.ts`), which the owned tool-loop executor (`routerToolsInlineExecutor`) exposes as `Executor.deliver`: QUEUED messages flush at each step boundary AND before the worker may settle (it can't finish with an unread steer); a FORCEFUL `steer_worker({interrupt:true})` aborts the in-flight turn so the worker re-plans immediately. Black-box CLI harnesses can't be interrupted mid-step, so there the down-leg degrades to the next spawn. The driver waits on ONE verb — `await_event({kinds?})` (`kinds:['settled']` = next finished worker; omit = also questions/findings). Observability is first-class: every event both ways is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). ONLINE (mid-run) detection rides `createDetectorMonitor` (`supervise/detector-monitor.ts`) — the thin runtime adapter that folds each live tool step through agent-eval's published streaming kernel (`repeatedActionDetector`/`errorStreakDetector`, the SAME kernel `control-runtime` folds) and fires `onSignal` → a `finding`; `routerToolsInlineExecutor` feeds it via the `onToolStep` seam. 
Detection logic + the failure taxonomy live in agent-eval; never reimplement them here. The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. Two substrates coexist for the same "recursive agent decision" atom: the round-synchronous `runLoop` kernel (the leaf, what most sandbox benches drive today) and the reactive `Scope`/`Supervisor`+combinators (the canonical core — the agent-driver, `runAgentic`/`defineStrategy`/`runPersonified`). Prefer the latter for new recursive/keystone work. Both run over the one `Executor` port. 
diff --git a/package.json b/package.json index 91ff0110..053a581b 100644 --- a/package.json +++ b/package.json @@ -106,7 +106,7 @@ }, "devDependencies": { "@biomejs/biome": "^2.4.15", - "@tangle-network/agent-eval": "^0.92.0", + "@tangle-network/agent-eval": "^0.93.0", "@tangle-network/sandbox": "^0.6.0", "@types/node": "^25.9.3", "playwright": "^1.61.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 488953e0..867449b8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -16,8 +16,8 @@ importers: specifier: ^2.4.15 version: 2.4.15 '@tangle-network/agent-eval': - specifier: ^0.92.0 - version: 0.92.0(@tangle-network/sandbox@0.6.1(viem@2.48.8(typescript@5.9.3)(zod@4.4.2)))(typescript@5.9.3) + specifier: ^0.93.0 + version: 0.93.0(@tangle-network/sandbox@0.6.1(viem@2.48.8(typescript@5.9.3)(zod@4.4.2)))(typescript@5.9.3) '@tangle-network/sandbox': specifier: ^0.6.0 version: 0.6.1(viem@2.48.8(typescript@5.9.3)(zod@4.4.2)) @@ -467,8 +467,8 @@ packages: engines: {node: '>=20'} hasBin: true - '@tangle-network/agent-eval@0.92.0': - resolution: {integrity: sha512-Sj/ejnn74ILewRM+48gkWbZnEInB0xQgF6bJC5DwpVeMdankuCuljZVR34Y66Ba+eg+YwNxf3BJUEX0aV4LB5w==} + '@tangle-network/agent-eval@0.93.0': + resolution: {integrity: sha512-EPpluyrHTPnBQxi2Mbx6LGkmPLndk8XhsY4iPIxxhuv23GGoL6FYvrwPq/eu+uxTwmfIVwdJsKchk2nj/ke1tA==} engines: {node: '>=20'} hasBin: true peerDependencies: @@ -1394,7 +1394,7 @@ snapshots: - typescript - utf-8-validate - '@tangle-network/agent-eval@0.92.0(@tangle-network/sandbox@0.6.1(viem@2.48.8(typescript@5.9.3)(zod@4.4.2)))(typescript@5.9.3)': + '@tangle-network/agent-eval@0.93.0(@tangle-network/sandbox@0.6.1(viem@2.48.8(typescript@5.9.3)(zod@4.4.2)))(typescript@5.9.3)': dependencies: '@asteasolutions/zod-to-openapi': 8.5.0(zod@4.4.3) '@ax-llm/ax': 19.0.45(zod@4.4.3) diff --git a/src/runtime/index.ts b/src/runtime/index.ts index 12f8b54e..a9f357db 100644 --- a/src/runtime/index.ts +++ b/src/runtime/index.ts @@ -316,6 +316,15 @@ export { // Supervisor-as-MCP: 
serve the coordination verbs as a real HTTP MCP over a live Scope, so any // harness (claude-code / codex / opencode) BECOMES the supervisor by mounting one MCP server. export { type CoordinationMcpHandle, serveCoordinationMcp } from './supervise/coordination-mcp' +// The online analyst on the worker pipe: folds each live tool step through agent-eval's streaming +// detectors and fires `onSignal` (→ a `finding` on the bus) the moment a worker loops or error-storms. +export { + createDetectorMonitor, + type DetectorMonitor, + type DetectorMonitorOptions, + defaultToolDetectors, + type ToolStep, +} from './supervise/detector-monitor' // The recursive driver-executor: a spawned child can BE a driver (agents drive agents), // resolved through `withDriverExecutor` and run over a nested `Scope` one depth deeper on // the SAME conserved pool. diff --git a/src/runtime/supervise/detector-monitor.ts b/src/runtime/supervise/detector-monitor.ts new file mode 100644 index 00000000..257ca2f9 --- /dev/null +++ b/src/runtime/supervise/detector-monitor.ts @@ -0,0 +1,67 @@ +/** + * @experimental + * + * The online analyst on the worker pipe. As a worker streams its tool calls, this folds each one + * through agent-eval's streaming detector kernel (`repeatedActionDetector` etc. — the SAME kernel the + * control loop uses, published in @tangle-network/agent-eval) and fires `onSignal` the moment one + * trips. The wiring point: `onSignal` raises a `finding` on the coordination bus, so the driver learns + * mid-round that its worker is stuck — instead of finding out only at settle. + * + * Detection logic + the failure taxonomy live in agent-eval (substrate); this module is the thin + * runtime adapter that projects a live tool call into a `DetectorEvent` (the tool name + `argHash` of + * its args → an action fingerprint) and pumps the detectors. No detection logic is reimplemented here. 
+ */ + +import { + argHash, + type DetectorSignal, + errorStreakDetector, + observeAll, + repeatedActionDetector, + type StreamingDetector, +} from '@tangle-network/agent-eval' + +export interface ToolStep { + readonly toolName: string + readonly args: unknown + /** Whether the tool call errored (drives error-streak detection). Omit/`'ok'` when unknown. */ + readonly status?: 'ok' | 'error' +} + +export interface DetectorMonitor { + /** Fold one tool step through the detectors; returns (and fires `onSignal` for) every signal. */ + observeToolStep(step: ToolStep): DetectorSignal[] + reset(): void +} + +export interface DetectorMonitorOptions { + /** The detectors to run online. Defaults to a stuck-loop + error-streak panel. */ + readonly detectors?: ReadonlyArray<StreamingDetector> + /** Fired for each signal a detector raises — the seam that raises a `finding` on the bus. */ + readonly onSignal?: (signal: DetectorSignal) => void | Promise<void> +} + +/** The default online panel for a tool-call pipe: a worker repeating the same call, or hammering + * consecutive errors. (No-progress needs a domain progress-probe, so it is opt-in, not default.) */ +export function defaultToolDetectors(): StreamingDetector[] { + return [repeatedActionDetector({ maxRepeated: 3 }), errorStreakDetector({ maxErrors: 3 })] +} + +export function createDetectorMonitor(opts: DetectorMonitorOptions = {}): DetectorMonitor { + const detectors = opts.detectors ?? defaultToolDetectors() + return { + observeToolStep(step) { + const signals = observeAll(detectors, { + // Same fingerprint scheme as agent-eval's batch stuck-loop view: tool name + hashed args. + actionFingerprint: `${step.toolName}|${argHash(step.args)}`, + ...(step.status ?
{ status: step.status } : {}), + label: step.toolName, + }) + for (const s of signals) void opts.onSignal?.(s) + return signals + }, + reset() { + for (const d of detectors) d.reset() + }, + } +} diff --git a/src/runtime/supervise/runtime.ts b/src/runtime/supervise/runtime.ts index 7b7dc635..9d9a4421 100644 --- a/src/runtime/supervise/runtime.ts +++ b/src/runtime/supervise/runtime.ts @@ -244,6 +244,13 @@ export interface RouterToolsSeam { model?: string tools: ReadonlyArray executeToolCall: (name: string, args: Record, task: unknown) => Promise + /** Online observer of each tool step — the seam a `DetectorMonitor` taps to watch the live pipe + * (raise a `finding` when the worker loops/errors). Called after every tool call resolves. */ + onToolStep?: (step: { + toolName: string + args: Record + status: 'ok' | 'error' + }) => void /** Max inference turns. Default 200 (runaway backstop — set far above any * legitimate workflow). For tighter per-workflow limits use a cost budget * or wall-clock deadline at the call site. */ @@ -388,8 +395,19 @@ export const routerToolsInlineExecutor: ExecutorFactory = (spec, ctx) = }) continue } - const result = await seam.executeToolCall(tc?.function?.name ?? '', args, task) + const toolName = tc?.function?.name ?? '' + let result: string + let status: 'ok' | 'error' = 'ok' + try { + result = await seam.executeToolCall(toolName, args, task) + } catch (e) { + status = 'error' + result = `error: ${e instanceof Error ? e.message : String(e)}` + } messages.push({ role: 'tool', tool_call_id: id, content: result }) + // Feed the online detector pipe (stuck-loop / error-streak) — a worker repeating the same + // call or hammering errors is caught mid-run, not only at settle. 
+        seam.onToolStep?.({ toolName, args, status })
       }
     }
diff --git a/tests/loops/detector-monitor.test.ts b/tests/loops/detector-monitor.test.ts
new file mode 100644
index 00000000..9c4d13a5
--- /dev/null
+++ b/tests/loops/detector-monitor.test.ts
@@ -0,0 +1,47 @@
+import type { DetectorSignal } from '@tangle-network/agent-eval'
+import { describe, expect, it } from 'vitest'
+import { createDetectorMonitor } from '../../src/runtime'
+
+describe('detector monitor (online analyst on the worker pipe)', () => {
+  it('raises a stuck-loop signal when a worker repeats the same tool call', () => {
+    const signals: DetectorSignal[] = []
+    const monitor = createDetectorMonitor({ onSignal: (s) => signals.push(s) })
+    // The worker runs the SAME tool with the SAME args three times — a stuck loop.
+    monitor.observeToolStep({ toolName: 'run_tests', args: { path: 'src/' } })
+    monitor.observeToolStep({ toolName: 'run_tests', args: { path: 'src/' } })
+    monitor.observeToolStep({ toolName: 'run_tests', args: { path: 'src/' } }) // 3rd → trip
+    expect(signals).toHaveLength(1)
+    expect(signals[0]).toMatchObject({
+      detector: 'repeated-action',
+      streak: 3,
+      failureClass: 'tool_recovery_failure',
+      evidence: { action: 'run_tests' },
+    })
+  })
+
+  it('does NOT signal when args differ (real progress through distinct calls)', () => {
+    const signals: DetectorSignal[] = []
+    const monitor = createDetectorMonitor({ onSignal: (s) => signals.push(s) })
+    monitor.observeToolStep({ toolName: 'edit', args: { file: 'a.ts' } })
+    monitor.observeToolStep({ toolName: 'edit', args: { file: 'b.ts' } })
+    monitor.observeToolStep({ toolName: 'edit', args: { file: 'c.ts' } })
+    expect(signals).toHaveLength(0)
+  })
+
+  it('raises an error-streak signal when consecutive tool calls error', () => {
+    const signals: DetectorSignal[] = []
+    const monitor = createDetectorMonitor({ onSignal: (s) => signals.push(s) })
+    monitor.observeToolStep({ toolName: 'build', args: { n: 1 }, status: 'error' })
+    monitor.observeToolStep({ toolName: 'build', args: { n: 2 }, status: 'error' })
+    const fired = monitor.observeToolStep({ toolName: 'build', args: { n: 3 }, status: 'error' })
+    expect(fired.some((s) => s.detector === 'error-streak' && s.streak === 3)).toBe(true)
+  })
+
+  it('reset clears the streak so a fresh run starts clean', () => {
+    const monitor = createDetectorMonitor()
+    monitor.observeToolStep({ toolName: 't', args: {} })
+    monitor.observeToolStep({ toolName: 't', args: {} })
+    monitor.reset()
+    expect(monitor.observeToolStep({ toolName: 't', args: {} })).toHaveLength(0) // streak back to 1
+  })
+})
From d92fe1a65bb7198f854b2982defc313c655f17c9 Mon Sep 17 00:00:00 2001
From: Drew Stone
Date: Tue, 16 Jun 2026 18:36:23 -0600
Subject: [PATCH 07/10] fix(supervise): address #318 review + wire raiseFinding (last mile)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Last mile: createCoordinationTools.raiseFinding (exposed on the MCP handle) — the seam an ONLINE detector uses to publish a finding on the live bus mid-run. Proven end-to-end: a stuck-loop on the worker pipe → monitor → raiseFinding → await_event surfaces it.

Review fixes (audit on the earlier commit):
- HIGH: AbortSignal.any (needs Node 20.3, floor is 20) → portable mergeAbortSignals.
- forceful interrupt: docstring no longer overpromises (aborts in-flight inference, a tool mid-exec finishes first); interrupted turns no longer count toward maxTurns; added the e2e test (forceful steer aborts the turn, re-plans, aborted turn is free).
- answer to a BLOCKING question is now delivered forcefully (interrupt) to unpark the worker immediately, not at its next boundary.
- sendDown 'answer' now REQUIRES questionId (overload; no silent ?? '' mask).
- tool-step status captured (error vs ok) for the error-streak detector.
- stale await_next purged from bench prompts + docs; history() docstring drops 'resume'.
- added tests: answer delivered:false + return asserted; await_event idle-on-mismatch.

full suite 1014 pass; typecheck/build/lint clean.
---
 bench/src/atom-humaneval.mts              |  2 +-
 bench/src/atom-mcp-e2e.mts                |  2 +-
 bench/src/mcp-mount-probe.mts             |  8 +--
 docs/architecture-visual.md               |  2 +-
 docs/execution-model.md                   |  6 +-
 docs/glossary.md                          |  8 +--
 src/mcp/tools/coordination.ts             | 26 ++++++--
 src/runtime/supervise/coordination-mcp.ts |  3 +
 src/runtime/supervise/runtime.ts          | 23 ++++++-
 tests/loops/coordination.test.ts          | 78 +++++++++++++++++++++--
 tests/loops/inbox.test.ts                 | 44 +++++++++++++
 11 files changed, 173 insertions(+), 29 deletions(-)

diff --git a/bench/src/atom-humaneval.mts b/bench/src/atom-humaneval.mts
index 64c43f5f..43d4d6e4 100644
--- a/bench/src/atom-humaneval.mts
+++ b/bench/src/atom-humaneval.mts
@@ -143,7 +143,7 @@ function humanEvalWorker(task: HumanEvalTask, label: string): Agent