Merged
2 changes: 1 addition & 1 deletion CLAUDE.md
@@ -62,7 +62,7 @@ Types that stay in THIS repo because they're runtime-shaped (coupled to a runnin
- `run-loop.ts` — `runLoop`, the round-synchronous leaf kernel. Per round: `driver.plan()`→N tasks→one sandbox/iteration (bounded by `maxConcurrency`, round-robin `agentRuns`)→`streamPrompt`→`output.parse`→`validator.validate`→`driver.decide`. Owns iteration accounting, concurrency, abort, cost+token aggregation, trace emission, box teardown. Exports `defaultSelectWinner` (best-valid-score, ties→earliest) — the single-sourced selection the personify combinators reuse.
- `supervise/` — the recursive execution atom (keystone): `Scope` + `Supervisor` over the open `Executor` port, spawn/settle on a **conserved budget pool** so equal-compute holds by construction; journal→replay/resume. `runtime.ts` also holds `createExecutor({backend})` — the ONE built-in executor (backend-as-data: `router`/`router-tools`/`bridge`/`cli`/`sandbox`; `router-tools` is the off-box tool-using agentic loop — chat→tool_calls→`executeToolCall`→repeat — over the router's tool-calling, no sandbox); the per-backend bodies are internal case-arms, BYO agents implement `Executor` directly.
- `personify/` — the content-free generic combinators (`fanout`/`loopUntil`/`widen`/`panel`/`verify`/`pipeline`) + `definePersona`/`runPersonified` + the cross-run `Corpus` + `createScopeAnalyst` (the analyst-on-scope steer firewall).
- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend.
- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. Observability is first-class: every event is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend.

Two substrates coexist for the same "recursive agent decision" atom: the round-synchronous `runLoop` kernel (the leaf, what most sandbox benches drive today) and the reactive `Scope`/`Supervisor`+combinators (the canonical core — the agent-driver, `runAgentic`/`defineStrategy`/`runPersonified`). Prefer the latter for new recursive/keystone work. Both run over the one `Executor` port.
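
The priority-ordered pull lane described in the agent-driver bullet can be modelled in isolation. The following is a minimal standalone sketch, not the repo's `createEventBus`: `MiniQueue`, `Rec`, and `urgencyToPriority` are hypothetical names introduced for illustration. Only the priority values (`blocks-run`=20, `blocks-step`=10, otherwise 0) and the FIFO-by-`seq` tie-break are taken from the text.

```typescript
// Standalone model of the pull lane: highest priority first, ties FIFO by seq.
// All names here are hypothetical; this is not the repo's createEventBus.
type Urgency = 'blocks-run' | 'blocks-step' | 'continue-without'

interface Rec {
  readonly seq: number
  readonly priority: number
  readonly label: string
}

// Urgency -> priority mapping as stated in the text above.
const urgencyToPriority = (u: Urgency): number =>
  u === 'blocks-run' ? 20 : u === 'blocks-step' ? 10 : 0

class MiniQueue {
  private readonly queue: Rec[] = []
  private seq = 0

  publish(label: string, priority = 0): Rec {
    const rec: Rec = { seq: this.seq++, priority, label }
    this.queue.push(rec) // the queue stays in publish order
    return rec
  }

  // Scan for the highest priority. The strict `>` comparison keeps the
  // earliest-published record when several share the same priority.
  pull(): Rec | undefined {
    let best = -1
    for (let i = 0; i < this.queue.length; i++) {
      const candidate = this.queue[i]
      const current = best >= 0 ? this.queue[best] : undefined
      if (candidate && (!current || candidate.priority > current.priority)) best = i
    }
    return best < 0 ? undefined : this.queue.splice(best, 1)[0]
  }
}

const q = new MiniQueue()
q.publish('settled:w-1')
q.publish('question:nice-to-know', urgencyToPriority('continue-without'))
q.publish('question:which-api', urgencyToPriority('blocks-run'))
q.publish('question:step-detail', urgencyToPriority('blocks-step'))

// Drain the queue and record the order in which labels come out.
const order: string[] = []
for (let r = q.pull(); r !== undefined; r = q.pull()) order.push(r.label)
console.log(order)
```

In this model the two blocking questions drain before the earlier settle, and the two priority-0 entries keep their publish order.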

32 changes: 26 additions & 6 deletions src/mcp/tools/coordination.ts
@@ -14,7 +14,7 @@ import type {
Settled,
Agent as SuperviseAgent,
} from '../../runtime'
import { createEventBus } from '../../runtime/supervise/event-bus'
import { type BusRecord, type BusStats, createEventBus } from '../../runtime/supervise/event-bus'
import type { McpToolDescriptor } from '../server'

/** A worker the driver has drained via `await_next`. */
@@ -101,6 +101,11 @@ export interface CoordinationTools {
stopReason(): string | undefined
settled(): ReadonlyArray<SettledWorker>
questions(): ReadonlyArray<QuestionRecord>
/** The full ordered log of every bus event (settled / question / finding) — the observability
* audit + replay trail. Each record carries seq, timestamp, and priority. */
history(): ReadonlyArray<BusRecord<CoordinationEvent>>
/** Bus throughput counters (published / pulled / by-kind) for live dashboards. */
stats(): BusStats
}

const idArg = { type: 'string', description: 'The workerId returned by spawn_worker.' } as const
@@ -114,11 +119,20 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin
const questions: QuestionRecord[] = []
const questionPolicy = opts.questionPolicy ?? 'auto'

// The one child→parent pipe. `onEvent` (back-compat) becomes a pass-through subscriber, so every
// event kind — question, settled, finding — reaches it, and the driver pulls queued findings /
// questions via `await_event`.
// The one child→parent pipe. `onEvent` (back-compat) becomes a pass-through subscriber receiving
// the bare event, so every kind — question, settled, finding — reaches it immediately, and the
// driver pulls queued findings / questions via `await_event`.
const bus = createEventBus<CoordinationEvent>()
if (opts.onEvent) bus.subscribe(opts.onEvent)
if (opts.onEvent) {
const cb = opts.onEvent
bus.subscribe((rec) => cb(rec.event))
}

// Urgency → bus priority: a blocking question is bumped ahead of queued settles/findings so the
// driver sees it FIRST when it drains the inbox (and pass-through already delivered it the instant
// it was raised). Non-blocking messages share priority 0 and resolve FIFO.
const urgencyPriority = (u: QuestionUrgency): number =>
u === 'blocks-run' ? 20 : u === 'blocks-step' ? 10 : 0

const str = (v: unknown, field: string): string => {
if (typeof v !== 'string' || v.length === 0)
@@ -245,7 +259,11 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin
question: QuestionRecord
added: boolean
}): Promise<QuestionRecord> => {
if (record.added) await bus.publish({ type: 'question', question: record.question })
if (record.added)
await bus.publish(
{ type: 'question', question: record.question },
{ priority: urgencyPriority(record.question.urgency) },
)
return record.question
}
const decideQuestion = (questionId: string, decision: QuestionDecision): QuestionRecord => {
@@ -530,6 +548,8 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin

return {
tools,
history: () => bus.history(),
stats: () => bus.stats(),
isStopped: () => stopped,
stopReason: () => reason,
settled: () => ledger,
9 changes: 8 additions & 1 deletion src/runtime/index.ts
@@ -328,7 +328,14 @@ export {
} from './supervise/driver-executor'
// The child→parent message bus: the one typed pipe carrying settled outputs, questions, and
// analyst findings up to the driver (pass-through + queued lanes, transport-agnostic).
export { type BusEvent, createEventBus, type EventBus } from './supervise/event-bus'
export {
type BusEvent,
type BusRecord,
type BusStats,
createEventBus,
type EventBus,
type PublishOptions,
} from './supervise/event-bus'
// The production `DriverChat`: adapt the router's tool-calling to the seam a
// `coordinationDriverAgent` drives. The one turnkey piece a consumer needs to run the driver
// brain in-process — tests script a mock `DriverChat`, production passes `routerDriverChat(cfg)`.
7 changes: 7 additions & 0 deletions src/runtime/supervise/coordination-mcp.ts
@@ -17,6 +17,7 @@ import { createMcpServer } from '../../mcp/server'
import {
type AnalystRegistry,
type CoordinationEvent,
type CoordinationTools,
createCoordinationTools,
type MakeWorkerAgent,
type QuestionPolicy,
@@ -30,6 +31,10 @@ export interface CoordinationMcpHandle {
/** The coordination tools' settled-worker ledger (for the driver's finalize). */
settled(): ReadonlyArray<{ status: string; score?: number; valid?: boolean; outRef?: string }>
isStopped(): boolean
/** The full ordered bus-event log — observability audit + replay trail. */
history: CoordinationTools['history']
/** Bus throughput counters for live dashboards. */
stats: CoordinationTools['stats']
close(): Promise<void>
}

@@ -112,6 +117,8 @@ export async function serveCoordinationMcp(opts: {
port,
settled: () => coord.settled(),
isStopped: () => coord.isStopped(),
history: () => coord.history(),
stats: () => coord.stats(),
close: () =>
new Promise<void>((resolve) => {
server.close(() => resolve())
119 changes: 98 additions & 21 deletions src/runtime/supervise/event-bus.ts
@@ -3,13 +3,18 @@
*
* The child→parent message bus: the ONE pipe carrying every message a worker, sub-driver, or
* analyst sends up to the driver — settled outputs, questions, and trace-analyst findings. It
* unifies three channels that were ad-hoc before (the settled-worker cursor, the ask-parent
* question channel, and analyst results) into a single typed primitive with two lanes:
* unifies channels that were ad-hoc before (the settled-worker cursor, the ask-parent question
* channel, and analyst results) into a single typed primitive with two lanes:
*
* - PASS-THROUGH (`subscribe`): every published event reaches subscribers immediately — the
* express lane for online steering and live observation (a UI, a hook, the parent's box).
* - STANDBY (`pull`): events also queue so the driver consumes them on its own cadence (and a
* blocking question parks the worker until its answer arrives) — the accumulative lane.
* - STANDBY (`pull`): events also queue so the driver consumes them on its own cadence. The queue
* is PRIORITY-ordered: a higher-`priority` event (a blocking question) is bumped ahead of
* queued settles/findings so the driver sees it first; ties resolve FIFO by publish order.
*
* Observability is first-class (A++): every event is stamped with a monotonic `seq` and wall-clock
* `at`, the full ordered `history()` is retained as an audit/replay trail, and `stats()` exposes
* published/pulled counts by kind. Subscribers receive the stamped record, not a bare event.
*
* The interface is transport-agnostic on purpose. Same box → this in-process queue. Cross box →
* the SAME publish/pull/subscribe surface backed by a durable mailbox on the parent's box (children
@@ -22,39 +27,111 @@ export interface BusEvent {
readonly type: string
}

/** A published event stamped for ordering and observability. `seq` is the monotonic publish index;
* `priority` drives pull order (higher = bumped ahead); `at` is the wall-clock publish time (ms). */
export interface BusRecord<E extends BusEvent> {
readonly seq: number
readonly at: number
readonly priority: number
readonly event: E
}

export interface PublishOptions {
/** Higher = pulled ahead of lower-priority queued events (default 0). A blocking question sets
* this so it bumps to the front of the driver's inbox. */
readonly priority?: number
}

export interface BusStats {
readonly published: number
readonly pulled: number
/** Count published per event `type`. */
readonly byKind: Readonly<Record<string, number>>
}

export interface EventBus<E extends BusEvent> {
/** Push an event: queue it for `pull`, then deliver to every `subscribe` handler in order. */
publish(event: E): Promise<void>
/** Remove and return the oldest QUEUED event whose type is in `kinds` (any type if omitted);
* `undefined` when nothing matches. The pass-through copy already went to subscribers. */
/** Stamp + queue the event, then deliver the stamped record to every subscriber in order.
* Returns the stamped record. */
publish(event: E, opts?: PublishOptions): Promise<BusRecord<E>>
/** Remove and return the highest-priority QUEUED event whose type is in `kinds` (any if omitted),
* ties broken FIFO by `seq`; `undefined` when nothing matches. */
pull(kinds?: ReadonlyArray<E['type']>): E | undefined
/** Register a pass-through handler; it receives every event published after registration. */
subscribe(handler: (event: E) => void | Promise<void>): void
/** Like `pull` but non-destructive — inspect the next event without consuming it. */
peek(kinds?: ReadonlyArray<E['type']>): E | undefined
/** Register a pass-through handler; it receives the stamped record of every event published after
* registration. Returns an unsubscribe fn. */
subscribe(handler: (record: BusRecord<E>) => void | Promise<void>): () => void
/** Count of queued, not-yet-pulled events (filtered by `kinds` when given). */
pending(kinds?: ReadonlyArray<E['type']>): number
/** The full ordered log of every event ever published (the audit/replay trail). */
history(): ReadonlyArray<BusRecord<E>>
/** Throughput counters for observability dashboards. */
stats(): BusStats
}

export function createEventBus<E extends BusEvent>(): EventBus<E> {
const queue: E[] = []
const subscribers: Array<(event: E) => void | Promise<void>> = []
const matches = (e: E, kinds?: ReadonlyArray<E['type']>) => !kinds || kinds.includes(e.type)
export function createEventBus<E extends BusEvent>(now: () => number = Date.now): EventBus<E> {
const queue: BusRecord<E>[] = []
const log: BusRecord<E>[] = []
const subscribers: Array<(record: BusRecord<E>) => void | Promise<void>> = []
const byKind: Record<string, number> = {}
let seq = 0
let pulled = 0

const matches = (r: BusRecord<E>, kinds?: ReadonlyArray<E['type']>) =>
!kinds || kinds.includes(r.event.type)

// Index of the highest-priority matching record; ties resolve to the earliest `seq` (the queue is
// maintained in publish order, so the first scan hit at the max priority is the oldest).
const bestIndex = (kinds?: ReadonlyArray<E['type']>): number => {
let best = -1
let bestPriority = Number.NEGATIVE_INFINITY
for (let i = 0; i < queue.length; i++) {
const r = queue[i]
if (!r || !matches(r, kinds)) continue
if (r.priority > bestPriority) {
best = i
bestPriority = r.priority
}
}
return best
}

return {
async publish(event) {
queue.push(event)
async publish(event, opts) {
const record: BusRecord<E> = { seq: seq++, at: now(), priority: opts?.priority ?? 0, event }
queue.push(record)
log.push(record)
byKind[event.type] = (byKind[event.type] ?? 0) + 1
// Sequential, not Promise.all: a subscriber that steers off this event must observe a
// consistent order, and a throwing subscriber must not silently drop siblings' delivery.
for (const handler of subscribers) await handler(event)
for (const handler of subscribers) await handler(record)
return record
},
pull(kinds) {
const index = queue.findIndex((e) => matches(e, kinds))
if (index < 0) return undefined
return queue.splice(index, 1)[0]
const i = bestIndex(kinds)
if (i < 0) return undefined
pulled++
return queue.splice(i, 1)[0]?.event
},
peek(kinds) {
const i = bestIndex(kinds)
return i < 0 ? undefined : queue[i]?.event
},
subscribe(handler) {
subscribers.push(handler)
return () => {
const i = subscribers.indexOf(handler)
if (i >= 0) subscribers.splice(i, 1)
}
},
pending(kinds) {
return kinds ? queue.filter((e) => matches(e, kinds)).length : queue.length
return kinds ? queue.filter((r) => matches(r, kinds)).length : queue.length
},
history() {
return log
},
stats() {
return { published: seq, pulled, byKind: { ...byKind } }
},
}
}
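
The diff above changes `subscribe` to hand handlers the stamped record instead of the bare event, returns an unsubscribe function, and lets `createEventBus` take a clock. The sketch below illustrates those three points under stated assumptions. `miniBus`, `Stamped`, and `legacyHandler` are hypothetical names, and delivery is synchronous here for brevity, whereas the source awaits each handler in turn. It is a simplified model, not the `event-bus.ts` implementation.

```typescript
// Standalone sketch (hypothetical miniBus): stamped records, a bare-event
// adapter for a legacy handler, an unsubscribe handle, and an injected clock.
interface Ev {
  readonly type: string
}

interface Stamped<E extends Ev> {
  readonly seq: number
  readonly at: number
  readonly priority: number
  readonly event: E
}

type Handler<E extends Ev> = (record: Stamped<E>) => void

function miniBus<E extends Ev>(now: () => number = Date.now) {
  const log: Stamped<E>[] = []
  const subscribers: Handler<E>[] = []
  let seq = 0
  return {
    // Synchronous here; the source version is async and awaits handlers in order.
    publish(event: E, priority = 0): Stamped<E> {
      const record: Stamped<E> = { seq: seq++, at: now(), priority, event }
      log.push(record)
      for (const handler of subscribers) handler(record)
      return record
    },
    subscribe(handler: Handler<E>): () => void {
      subscribers.push(handler)
      return () => {
        const i = subscribers.indexOf(handler)
        if (i >= 0) subscribers.splice(i, 1)
      }
    },
    history: (): ReadonlyArray<Stamped<E>> => log,
  }
}

// Deterministic clock: each publish gets the next tick.
let tick = 1000
const bus = miniBus<{ type: 'question' | 'settled' }>(() => tick++)

// A legacy handler that expects the bare event, wrapped the same way the
// coordination tools wrap `opts.onEvent`: (rec) => cb(rec.event).
const bareSeen: string[] = []
const legacyHandler = (e: Ev): void => {
  bareSeen.push(e.type)
}
const unsubscribe = bus.subscribe((rec) => legacyHandler(rec.event))

bus.publish({ type: 'settled' })
unsubscribe() // later publishes are logged but no longer delivered to the handler
bus.publish({ type: 'question' }, 20)

console.log(bareSeen, bus.history().map((r) => [r.seq, r.at, r.priority]))
```

Injecting the clock keeps the `at` stamps reproducible in tests. The history keeps growing after the handler unsubscribes because the log is independent of delivery.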
37 changes: 37 additions & 0 deletions tests/loops/coordination.test.ts
@@ -227,6 +227,43 @@ describe('coordination tools', () => {
)
})

it('await_event bumps a blocking question ahead of a non-blocking one (urgency→priority)', async () => {
const { scope } = mockScope()
const tb = createCoordinationTools({
scope,
blobs,
makeWorkerAgent,
perWorker: { maxIterations: 1, maxTokens: 10 },
})
// A low-urgency question is raised first...
await tool(tb, 'ask_parent').handler({
from: 'w-a',
level: 'worker',
question: 'nice-to-know?',
reason: 'minor',
urgency: 'continue-without',
})
// ...then a blocking one. It arrives later but must be pulled FIRST.
await tool(tb, 'ask_parent').handler({
from: 'w-b',
level: 'driver',
question: 'which API version?',
reason: 'blocks the run',
urgency: 'blocks-run',
})
expect(await tool(tb, 'await_event').handler({ kinds: ['question'] })).toMatchObject({
type: 'question',
question: { question: 'which API version?', urgency: 'blocks-run' },
})
expect(await tool(tb, 'await_event').handler({ kinds: ['question'] })).toMatchObject({
type: 'question',
question: { question: 'nice-to-know?' },
})
// The history audit trail recorded both, in publish order, with the bumped priority stamped.
expect(tb.history().map((r) => r.priority)).toEqual([0, 20])
expect(tb.stats()).toMatchObject({ published: 2, pulled: 2, byKind: { question: 2 } })
})

it('analyze-on-settle auto-runs lenses and await_event surfaces settled + finding', async () => {
const { scope } = mockScope()
const settlements = [