From 47e995c59a7a1082ebbca1660d7978b74eb96eac Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 14:41:39 -0600 Subject: [PATCH 1/4] feat(supervise): meter the driver's own inference against the conserved pool + A++ spend observability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The driver's chat-LLM tokens — the largest single consumer in an agentic loop — were invisible: routerDriverChat discarded the router's usage, and driver-executor summed only spawned children's spend. So equal-k under-counted any driver arm and maxTurns=0 had no real inference bound. This meters them, end to end, with a clean spend breakdown. - BudgetPool.observe(spend) / observedTotal(): a direct free→committed debit (no reserve/ reconcile ticket) for the drivers' own inference. Preserves total ≡ free + reserved + committed; free may go negative on overspend (honest exhaustion the in-loop guard reads). - Scope.meter(spend, detail): observes the spend AND emits an agent.turn trace event (turn index, tool calls, per-turn tokens/cost) — the live A++ view. All scopes share ONE pool, so root and nested driver inference both land in observedTotal. - DriverTurn gains usage/costUsd; routerDriverChat forwards the router's real usage; the driver meters each turn (when usage is present — a scripted/offline turn meters nothing, so equal-k stays exact in tests). This debit makes maxTurns=0 genuinely pool-bounded: a thinking driver drains the pool → poolStarved halts it (proven by a never-stopping-driver test). - SupervisedResult.winner gains spentBreakdown { driverInference, childWork } and spentTotal now includes inference (spentTotalFromJournal + observedTotal). driverInference + childWork === spentTotal. - Driver turns are NOT charged to the conserved iteration channel (turnSpend.iterations = 0) — maxIterations budgets child rounds, not driver turns; counting them would conflate the two and skew an equal-k iteration count. 
Turn count stays observable via the agent.turn events. - trajectoryReport gains extraRootSpend: a coordination-driver equal-k arm passes result.spentBreakdown.driverInference so report.total (→ equalKOnCost) matches spentTotal — the journal ledger and the pool ledger agree (closes a latent cross-run equal-k divergence). Adversarially reviewed: conservation SOUND and each token counted exactly once at root AND across nested depth (proven by arithmetic + executed probes). Full suite 1002 pass; lint/typecheck/build green. Built in an isolated worktree. --- src/runtime/personify/trajectory.ts | 16 + src/runtime/personify/wave-types.ts | 8 + src/runtime/supervise/budget.ts | 46 +++ src/runtime/supervise/coordination-driver.ts | 30 +- src/runtime/supervise/router-driver-chat.ts | 4 + src/runtime/supervise/scope.ts | 23 ++ src/runtime/supervise/supervisor.ts | 26 +- src/runtime/supervise/types.ts | 13 + tests/loops/driver-inference-metering.test.ts | 312 ++++++++++++++++++ tests/loops/router-driver-chat.test.ts | 19 ++ 10 files changed, 495 insertions(+), 2 deletions(-) create mode 100644 tests/loops/driver-inference-metering.test.ts diff --git a/src/runtime/personify/trajectory.ts b/src/runtime/personify/trajectory.ts index 88a28fc7..23e9ab7c 100644 --- a/src/runtime/personify/trajectory.ts +++ b/src/runtime/personify/trajectory.ts @@ -101,6 +101,22 @@ export async function trajectoryReport( if (ev.parent === undefined) continue requireNode(nodes, ev.parent, root).children.push(ev.id) } + // Fold the drivers' own metered inference (un-journaled — not a spawned child) onto the root + // node BEFORE roll-up, so `total` matches `SupervisedResult.spentTotal` and the equal-k gate + // counts the driver's tokens. Omitted ⇒ pure journal cost (the fanout/combinator arm case). 
+ if (options.extraRootSpend) { + const r = requireNode(nodes, root, root) + const e = options.extraRootSpend + r.ownSpend = { + iterations: r.ownSpend.iterations + e.iterations, + tokens: { + input: r.ownSpend.tokens.input + e.tokens.input, + output: r.ownSpend.tokens.output + e.tokens.output, + }, + usd: r.ownSpend.usd + e.usd, + ms: r.ownSpend.ms + e.ms, + } + } const rolledUp = rollUpSpend(nodes, root) if (options.withOutputs) { diff --git a/src/runtime/personify/wave-types.ts b/src/runtime/personify/wave-types.ts index 2bcbe7d9..d9351bab 100644 --- a/src/runtime/personify/wave-types.ts +++ b/src/runtime/personify/wave-types.ts @@ -541,6 +541,14 @@ export interface TrajectoryReport { export interface TrajectoryReportOptions { /** Rehydrate each `done` node's `output` from the blob store. Off by default (cost-only report). */ readonly withOutputs?: boolean + /** + * Spend to add to the ROOT node before roll-up — the drivers' OWN inference that `Scope.meter` + * debited against the conserved pool but never journaled (it is not a spawned child). A + * coordination-driver equal-k arm passes `result.spentBreakdown?.driverInference` here so + * `report.total` (→ `equalKOnCost`) matches `SupervisedResult.spentTotal` — the two cost ledgers + * agree. Omit for a fanout/combinator arm (those never meter; the journal sum is the whole cost). + */ + readonly extraRootSpend?: Spend } /** `trajectoryReport(...)` — the tree+cost reconstructor. Async (reads journal + optionally blobs). */ diff --git a/src/runtime/supervise/budget.ts b/src/runtime/supervise/budget.ts index 4293e869..eb2c69e3 100644 --- a/src/runtime/supervise/budget.ts +++ b/src/runtime/supervise/budget.ts @@ -66,6 +66,19 @@ export interface BudgetPool { spendFrom(events: AsyncIterable | UsageEvent[]): Promise /** The current readout, reflecting all outstanding reservations. 
*/ readout(): BudgetReadout + /** + * Record OBSERVED spend that did NOT go through reserve/reconcile — the driver's OWN inference + * (its chat turns), which is real compute but not a spawned child. A direct `free → committed` + * debit, so `total ≡ free + reserved + committed` is preserved: equal-k counts the driver's + * tokens and the in-loop budget guard (`readout().tokensLeft`) sees them. `free` may go negative + * when a run overspends — that is honest (the readout then signals exhaustion). It never throws: + * the spend already happened, so accounting records reality; the in-loop guard prevents MORE. + */ + observe(spend: Spend): void + /** Running total of all `observe`d spend (the drivers' own inference across the whole tree — + * every nested scope shares this ONE pool). Added to `spentTotal` so the reported number + * includes the driver's tokens, separable from spawned-child work. */ + observedTotal(): Spend /** Fail loud if any reservation is still open — the conserved-pool leak detector. Called at the * supervisor's join barrier: once every child has settled, no ticket may remain (a leaked * reservation would silently break `total ≡ free + reserved + committed`). */ @@ -135,6 +148,10 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu const absoluteDeadlineMs = root.deadlineMs !== undefined ? now() + root.deadlineMs : 0 + // Observed (non-reserved) spend — the drivers' own inference. Tracked separately so it can be + // reported as a distinct line (`observedTotal`) while also debiting the shared conserved pool. + const observed: Spend = { iterations: 0, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 } + let nextTicketId = 0 const open = new Set() @@ -219,6 +236,33 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu } } + function observe(spend: Spend): void { + const tokens = totalTokens(spend.tokens) + // Direct free → committed debit (no reservation ticket). 
`free` may go negative on overspend — + // that is honest; the readout then reports exhaustion and the in-loop guard halts the driver. + freeTokens -= tokens + committedTokens += tokens + freeIterations -= spend.iterations + committedIterations += spend.iterations + committedUsd += spend.usd + if (usdCapped) freeUsd -= spend.usd + // Track it as its own line for the spend breakdown. + observed.iterations += spend.iterations + observed.tokens.input += spend.tokens.input + observed.tokens.output += spend.tokens.output + observed.usd += spend.usd + observed.ms += spend.ms + } + + function observedTotal(): Spend { + return { + iterations: observed.iterations, + tokens: { input: observed.tokens.input, output: observed.tokens.output }, + usd: observed.usd, + ms: observed.ms, + } + } + function readout(): BudgetReadout { return { tokensLeft: freeTokens, @@ -241,6 +285,8 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu reconcile, spendFrom: foldUsage, readout, + observe, + observedTotal, assertNoOpenTickets, } } diff --git a/src/runtime/supervise/coordination-driver.ts b/src/runtime/supervise/coordination-driver.ts index 6d3bbf7b..ab6ffd5d 100644 --- a/src/runtime/supervise/coordination-driver.ts +++ b/src/runtime/supervise/coordination-driver.ts @@ -27,7 +27,7 @@ import { ValidationError } from '../../errors' import type { McpToolDescriptor } from '../../mcp/server' import { createCoordinationTools, type MakeWorkerAgent } from '../../mcp/tools/coordination' -import type { Agent, Budget, ResultBlobStore, Scope } from './types' +import type { Agent, Budget, ResultBlobStore, Scope, Spend } from './types' /** One tool call the driver LLM asks for this turn. */ export interface DriverToolCall { @@ -50,6 +50,12 @@ export interface DriverTurn { readonly toolCalls?: ReadonlyArray /** The driver's natural-language output — the answer when there are no tool calls. 
*/ readonly content?: string + /** The driver LLM's OWN token usage for THIS turn — metered against the conserved pool so the + * driver's inference counts toward equal-k AND the in-loop budget guard. Omit for a scripted/ + * mock turn (no real inference); production `routerDriverChat` forwards it from the router. */ + readonly usage?: { readonly input: number; readonly output: number } + /** The turn's inference cost (usd), when the provider priced it. */ + readonly costUsd?: number } /** The injected driver-LLM seam: one turn over the conversation + the coordination tool specs. */ @@ -153,6 +159,28 @@ export function coordinationDriverAgent(opts: CoordinationDriverOptions): Agent< if (poolStarved(scope, opts.perWorker) || deadlinePassed(scope, now)) break const res = await opts.chat.next({ system, messages, tools: toolSpecs }) const calls = res.toolCalls ?? [] + // Meter the driver's OWN inference for this turn — the largest single token consumer in an + // agentic loop, and the one the conserved pool never saw. Only when the turn carried real + // usage: a scripted/mock turn meters nothing, so offline equal-k stays exact. This debit is + // what makes maxTurns=0 genuinely bounded — a thinking driver drains the pool → poolStarved. + if (res.usage || res.costUsd !== undefined) { + const turnSpend: Spend = { + // iterations:0 — the conserved iteration channel (`maxIterations`) budgets CHILD rounds, + // not driver turns; counting turns there would conflate the two AND make a driver arm's + // iteration count diverge from a blind arm's. The driver is bounded by maxTurns + the + // token/usd pool; its turn COUNT stays observable via the per-turn `agent.turn` events. + iterations: 0, + tokens: { input: res.usage?.input ?? 0, output: res.usage?.output ?? 0 }, + usd: res.costUsd ?? 
0, + ms: 0, + } + scope.meter(turnSpend, { + kind: 'driver-inference', + driver: opts.name, + turn, + toolCalls: calls.map((c) => c.name), + }) + } if (calls.length === 0) { // The driver named no tool call — it is finished. Its deliverable is the best DELIVERED // child (the completion-oracle), NOT its own prose: a driver cannot self-declare done diff --git a/src/runtime/supervise/router-driver-chat.ts b/src/runtime/supervise/router-driver-chat.ts index 6aab0f5b..6bde0271 100644 --- a/src/runtime/supervise/router-driver-chat.ts +++ b/src/runtime/supervise/router-driver-chat.ts @@ -24,6 +24,10 @@ export function routerDriverChat(c: RouterConfig, opts: { temperature?: number } const r = await routerChatWithTools(c, oa, oaTools, { temperature, toolChoice: 'auto' }) return { ...(r.content ? { content: r.content } : {}), + // Forward the router's REAL usage + cost so the driver meters its own inference against the + // conserved pool (the integrity hole: these were dropped on the floor before). + ...(r.usage ? { usage: r.usage } : {}), + ...(typeof r.costUsd === 'number' ? { costUsd: r.costUsd } : {}), toolCalls: r.toolCalls.map((tc) => ({ id: tc.id, name: tc.name, diff --git a/src/runtime/supervise/scope.ts b/src/runtime/supervise/scope.ts index 4beb9d30..d5128bd7 100644 --- a/src/runtime/supervise/scope.ts +++ b/src/runtime/supervise/scope.ts @@ -179,6 +179,7 @@ export function createScope(args: ScopeArgs): Scope { // journal's per-tree uniqueness guard (which is scoped to the cursor namespace). let spawnOrdinal = 0 let cursorSeq = 0 + let meterSeq = 0 const now = args.now ?? Date.now function spawn( @@ -359,11 +360,33 @@ export function createScope(args: ScopeArgs): Scope { return true } + function meter(spend: Spend, detail?: Record): void { + // Debit the driver's own inference against the shared conserved pool (free → committed), so + // equal-k counts it and `budget.tokensLeft` reflects it for the in-loop guard. 
+ args.pool.observe(spend) + // Emit it as an `agent.turn` event so the trace/topology view sees per-turn driver inference + // (the same stream `spawn`/`next` feed — one observable tree). + notifyRuntimeHookEvent( + args.hooks, + { + id: `${args.parentId}:meter:${meterSeq++}`, + runId: args.root, + target: 'agent.turn', + phase: 'after', + timestamp: now(), + parentId: args.parentId, + payload: { spend, ...(detail ?? {}) }, + }, + { signal: args.signal }, + ) + } + return { spawn, next, send, signal: args.signal, + meter, get view(): TreeView { return makeTreeView(args.parentId, children) }, diff --git a/src/runtime/supervise/supervisor.ts b/src/runtime/supervise/supervisor.ts index 81680d79..e8a296c7 100644 --- a/src/runtime/supervise/supervisor.ts +++ b/src/runtime/supervise/supervisor.ts @@ -174,12 +174,20 @@ export function createSupervisor(): Supervisor { // driver already selected. const outRef = contentAddress(out) await opts.blobs.put(outRef, out) + // `spentTotal` = the spawned children's reconciled spend (the journal sum) PLUS the drivers' + // OWN inference (metered via `Scope.meter` → `pool.observe`, the run-wide observed total). + // The breakdown keeps the two separable — the A++ view of where the tokens went. + const childWork = await spentTotalFromJournal(journal, opts.runId) + const driverInference = pool.observedTotal() return { kind: 'winner', out, outRef, tree, - spentTotal: await spentTotalFromJournal(journal, opts.runId), + spentTotal: addSpend(childWork, driverInference), + ...(isNonEmptySpend(driverInference) + ? { spentBreakdown: { driverInference, childWork } } + : {}), } } return { @@ -428,3 +436,19 @@ async function spentTotalFromJournal(journal: SpawnJournal, root: string): Promi } return total } + +/** Sum two conserved-spend tallies per channel — the child-work journal sum + the drivers' own + * metered inference, so `spentTotal` is the true cost of the run. 
*/ +function addSpend(a: Spend, b: Spend): Spend { + return { + iterations: a.iterations + b.iterations, + tokens: { input: a.tokens.input + b.tokens.input, output: a.tokens.output + b.tokens.output }, + usd: a.usd + b.usd, + ms: a.ms + b.ms, + } +} + +/** True when any driver metered inference this run (so the winner carries a `spentBreakdown`). */ +function isNonEmptySpend(s: Spend): boolean { + return s.iterations > 0 || s.tokens.input > 0 || s.tokens.output > 0 || s.usd > 0 +} diff --git a/src/runtime/supervise/types.ts b/src/runtime/supervise/types.ts index 0c103e96..a4ad26b8 100644 --- a/src/runtime/supervise/types.ts +++ b/src/runtime/supervise/types.ts @@ -298,6 +298,15 @@ export interface Scope { * it to break promptly (the conserved pool + driver-stop are the other bounds). A nested * scope carries its own signal, chained off its driver child's abort. */ readonly signal: AbortSignal + /** + * Meter the driver's OWN compute against the conserved pool — its inference turns, which are + * real tokens/usd but not a spawned child (no reserve/reconcile). A direct `free → committed` + * debit, so equal-k counts the driver's tokens AND the in-loop budget guard (`budget.tokensLeft`) + * halts a driver that thinks the pool dry. `detail` rides an `agent.turn` trace event for live + * observability (turn index, tool calls, per-turn spend). The supervisor folds the run-wide + * observed total into `spentTotal`. A leaf never calls this; a driver meters each chat turn. + */ + meter(spend: Spend, detail?: Record): void /** The live tree — reads the in-memory nursery, not the journal. */ readonly view: TreeView /** Conserved-pool readouts (post-reservation). */ @@ -427,6 +436,10 @@ export type SupervisedResult = verdict?: DefaultVerdict tree: TreeView spentTotal: Spend + /** Where `spentTotal` went: `driverInference` = the drivers' own chat turns (metered via + * `Scope.meter`); `childWork` = every spawned child's reconciled spend (the journal sum). 
+ * `driverInference + childWork === spentTotal`. Present whenever any driver metered. */ + spentBreakdown?: { driverInference: Spend; childWork: Spend } } | { kind: 'no-winner' diff --git a/tests/loops/driver-inference-metering.test.ts b/tests/loops/driver-inference-metering.test.ts new file mode 100644 index 00000000..94de9dee --- /dev/null +++ b/tests/loops/driver-inference-metering.test.ts @@ -0,0 +1,312 @@ +import type { AgentProfile } from '@tangle-network/sandbox' +import { describe, expect, it } from 'vitest' +import { InMemoryResultBlobStore, InMemorySpawnJournal } from '../../src/durable/spawn-journal' +import { trajectoryReport } from '../../src/runtime/personify/trajectory' +import { createBudgetPool } from '../../src/runtime/supervise/budget' +import { + type CoordinationDriverOptions, + coordinationDriverAgent, + type DriverChat, + type DriverMessage, + type DriverTurn, +} from '../../src/runtime/supervise/coordination-driver' +import { createExecutorRegistry } from '../../src/runtime/supervise/runtime' +import { createSupervisor } from '../../src/runtime/supervise/supervisor' +import type { + Agent, + AgentSpec, + Budget, + Executor, + ExecutorResult, + UsageEvent, +} from '../../src/runtime/supervise/types' +import type { RuntimeHookEvent } from '../../src/runtime-hooks' + +// ── A worker leaf with a known, fixed spend (no network/LLM) ───────────────────── +function workerLeaf( + name: string, + tokens: { input: number; output: number }, +): Agent { + const executor: Executor = { + runtime: 'router', + execute() { + return (async function* (): AsyncGenerator { + yield { kind: 'iteration' } + yield { kind: 'tokens', input: tokens.input, output: tokens.output } + })() + }, + teardown: () => Promise.resolve({ destroyed: true }), + resultArtifact(): ExecutorResult { + return { + outRef: `w:${name}`, + out: { worker: name }, + verdict: { valid: true, score: 1 }, + spent: { iterations: 1, tokens: { ...tokens }, usd: 0, ms: 0 }, + } + }, + } + const spec: 
AgentSpec = { profile: { name } as AgentProfile, harness: null, executor } + return { name, act: async () => ({ worker: name }), executorSpec: spec } as Agent< + unknown, + unknown + > & { + executorSpec: AgentSpec + } +} + +// ── A scripted driver-LLM that reports per-turn usage (the production shape) ────── +function meteredChat(turns: DriverTurn[]): DriverChat { + let i = 0 + return { + next: async () => { + const t = turns[Math.min(i, turns.length - 1)] ?? {} + i += 1 + return t + }, + } +} + +const perWorker: Budget = { maxIterations: 4, maxTokens: 1000 } + +describe("driver inference metering — the driver's own tokens count against the conserved pool", () => { + it('folds driver inference into spentTotal and exposes the driver-vs-child breakdown', async () => { + const blobs = new InMemoryResultBlobStore() + const journal = new InMemorySpawnJournal() + const worker = workerLeaf('w', { input: 10, output: 5 }) + + // 3 driver turns, each with REAL usage: spawn → await → stop. + const chat = meteredChat([ + { + toolCalls: [{ name: 'spawn_worker', arguments: { profile: {}, task: 'go' } }], + usage: { input: 100, output: 50 }, + costUsd: 0.01, + }, + { + toolCalls: [{ name: 'await_next', arguments: {} }], + usage: { input: 80, output: 40 }, + costUsd: 0.008, + }, + { content: 'delivered', usage: { input: 30, output: 10 }, costUsd: 0.002 }, + ]) + const opts: CoordinationDriverOptions = { + name: 'root', + chat, + blobs, + makeWorkerAgent: () => worker, + perWorker, + systemPrompt: 'drive', + maxTurns: 8, + } + const result = await createSupervisor().run( + coordinationDriverAgent(opts), + 'task', + { + budget: { maxIterations: 100, maxTokens: 100_000, maxUsd: 10 }, + runId: 'meter', + journal, + blobs, + executors: createExecutorRegistry(), + maxDepth: 2, + now: () => 0, + }, + ) + + expect(result.kind).toBe('winner') + if (result.kind !== 'winner') return + + // childWork = the worker's reconciled spend; driverInference = the 3 metered turns. 
+ expect(result.spentBreakdown).toBeDefined() + expect(result.spentBreakdown?.childWork.tokens).toEqual({ input: 10, output: 5 }) + // Driver TOKENS + usd are metered; driver turns are NOT charged to the iteration channel. + expect(result.spentBreakdown?.driverInference.tokens).toEqual({ input: 210, output: 100 }) + expect(result.spentBreakdown?.driverInference.usd).toBeCloseTo(0.02, 6) + expect(result.spentBreakdown?.driverInference.iterations).toBe(0) + // spentTotal = child + driver — the driver's tokens are no longer invisible. + expect(result.spentTotal.tokens).toEqual({ input: 220, output: 105 }) + expect(result.spentTotal.usd).toBeCloseTo(0.02, 6) + expect(result.spentTotal.iterations).toBe(1) // the worker's 1 iteration; driver turns aren't charged here + }) + + it('maxTurns=0 is bounded by inference: a never-stopping driver halts when its OWN tokens drain the pool', async () => { + const blobs = new InMemoryResultBlobStore() + const journal = new InMemorySpawnJournal() + const seen: DriverMessage[][] = [] + // A driver that NEVER stops — only the pool bound can halt it. Each turn spends 200 tokens of + // its OWN inference on a no-spawn tool (list_questions reserves nothing), so only the metered + // inference drains the pool. 
+ let n = 0 + const chat: DriverChat = { + next: async (input) => { + seen.push([...input.messages]) + n += 1 + return { + toolCalls: [{ name: 'list_questions', arguments: {} }], + usage: { input: 150, output: 50 }, + } + }, + } + const opts: CoordinationDriverOptions = { + name: 'root', + chat, + blobs, + makeWorkerAgent: () => workerLeaf('w', { input: 1, output: 1 }), + perWorker: { maxIterations: 4, maxTokens: 500 }, + systemPrompt: 'drive', + maxTurns: 0, // unlimited turn count — the pool is the only bound + } + const result = await createSupervisor().run( + coordinationDriverAgent(opts), + 'never-ending', + { + budget: { maxIterations: 100, maxTokens: 1000 }, // raw capacity is 5 turns of 200-token inference; the perWorker guard (500) halts sooner + runId: 'meter-bound', + journal, + blobs, + executors: createExecutorRegistry(), + maxDepth: 2, + now: () => 0, + }, + ) + + // free: 1000 → 800 → 600 → 400; the guard breaks at the top of turn 3 (free 400 < perWorker 500), + // so the never-stopping driver halts at 3 turns — NOT the 2000 tripwire. Metering made maxTurns=0 + // genuinely bounded by the conserved pool. 
+ expect(seen.length).toBe(3) + expect(result.kind).toBe('no-winner') + expect(n).toBe(3) + }) + + it('emits an agent.turn observability event per metered driver turn (the live A++ view)', async () => { + const blobs = new InMemoryResultBlobStore() + const journal = new InMemorySpawnJournal() + const turnEvents: RuntimeHookEvent[] = [] + const chat = meteredChat([ + { + toolCalls: [{ name: 'spawn_worker', arguments: { profile: {}, task: 'go' } }], + usage: { input: 100, output: 50 }, + costUsd: 0.01, + }, + { toolCalls: [{ name: 'await_next', arguments: {} }], usage: { input: 80, output: 40 } }, + { content: 'done', usage: { input: 30, output: 10 } }, + ]) + const opts: CoordinationDriverOptions = { + name: 'root', + chat, + blobs, + makeWorkerAgent: () => workerLeaf('w', { input: 10, output: 5 }), + perWorker, + systemPrompt: 'drive', + maxTurns: 8, + } + await createSupervisor().run(coordinationDriverAgent(opts), 'task', { + budget: { maxIterations: 100, maxTokens: 100_000, maxUsd: 10 }, + runId: 'meter-obs', + journal, + blobs, + executors: createExecutorRegistry(), + maxDepth: 2, + now: () => 0, + hooks: { + onEvent: (e) => { + if (e.target === 'agent.turn') turnEvents.push(e) + }, + }, + }) + + // One agent.turn event per metered turn, carrying the driver-inference payload (turn index, + // tool calls, spend) — what a topology/cost viewer renders live. 
+ expect(turnEvents.length).toBe(3) + const first = turnEvents[0]!.payload as { + kind: string + driver: string + turn: number + toolCalls: string[] + spend: { tokens: { input: number } } + } + expect(first.kind).toBe('driver-inference') + expect(first.driver).toBe('root') + expect(first.turn).toBe(0) + expect(first.toolCalls).toEqual(['spawn_worker']) + expect(first.spend.tokens.input).toBe(100) + }) +}) + +describe('equal-k ledger reconciliation — trajectoryReport.extraRootSpend folds in driver inference', () => { + it('without extraRootSpend the journal total is child-work only; with it, total matches spentTotal', async () => { + const journal = new InMemorySpawnJournal() + const blobs = new InMemoryResultBlobStore() + const at = new Date(0).toISOString() + await journal.beginTree('arm', at) + // A coordination-driver arm tree: root 'arm' + one worker that spends 10/5 tokens. The driver's + // own inference is NOT a journaled node (it was metered via Scope.meter → pool.observe). + await journal.appendEvent('arm', { + kind: 'spawned', + id: 'arm', + label: 'root', + budget: { maxIterations: 1, maxTokens: 1 }, + runtime: 'router', + seq: 0, + at, + }) + await journal.appendEvent('arm', { + kind: 'spawned', + id: 'arm:s0', + parent: 'arm', + label: 'w', + budget: { maxIterations: 1, maxTokens: 1 }, + runtime: 'router', + seq: 1, + at, + }) + await journal.appendEvent('arm', { + kind: 'settled', + id: 'arm:s0', + status: 'done', + outRef: 'blob:w', + spent: { iterations: 1, tokens: { input: 10, output: 5 }, usd: 0, ms: 0 }, + seq: 0, + at, + }) + + // Default: the journal sum is child-work only — the latent divergence vs SupervisedResult.spentTotal. + const childOnly = await trajectoryReport(journal, blobs, 'arm') + expect(childOnly.total.tokens).toEqual({ input: 10, output: 5 }) + + // Pass the run's driverInference (from result.spentBreakdown) → total now equals spentTotal, + // so equalKOnCost credits the driver arm for its OWN inference. The ledgers agree. 
+ const driverInference = { iterations: 0, tokens: { input: 210, output: 100 }, usd: 0.02, ms: 0 } + const reconciled = await trajectoryReport(journal, blobs, 'arm', { + extraRootSpend: driverInference, + }) + expect(reconciled.total.tokens).toEqual({ input: 220, output: 105 }) + expect(reconciled.total.usd).toBeCloseTo(0.02, 6) + }) +}) + +describe('budget pool — observe() debits the conserved pool, observedTotal() tracks driver inference', () => { + it('moves free → committed (invariant preserved), accumulates observedTotal, and drives tokensLeft negative on overspend', () => { + const pool = createBudgetPool({ maxIterations: 10, maxTokens: 1000, maxUsd: 5 }, () => 0) + expect(pool.readout().tokensLeft).toBe(1000) + + pool.observe({ iterations: 1, tokens: { input: 100, output: 50 }, usd: 0.5, ms: 0 }) + expect(pool.readout().tokensLeft).toBe(850) // 1000 - 150 + expect(pool.observedTotal()).toEqual({ + iterations: 1, + tokens: { input: 100, output: 50 }, + usd: 0.5, + ms: 0, + }) + + // A second observe accumulates and can overshoot — free goes negative, an honest exhaustion signal. + pool.observe({ iterations: 1, tokens: { input: 800, output: 200 }, usd: 1, ms: 0 }) + expect(pool.readout().tokensLeft).toBe(-150) // 850 - 1000 + expect(pool.observedTotal()).toEqual({ + iterations: 2, + tokens: { input: 900, output: 250 }, + usd: 1.5, + ms: 0, + }) + // observe never opens a ticket — the leak detector stays clean. + expect(() => pool.assertNoOpenTickets()).not.toThrow() + }) +}) diff --git a/tests/loops/router-driver-chat.test.ts b/tests/loops/router-driver-chat.test.ts index 2ac433e7..d34ef60d 100644 --- a/tests/loops/router-driver-chat.test.ts +++ b/tests/loops/router-driver-chat.test.ts @@ -115,4 +115,23 @@ describe('routerDriverChat — the production DriverChat seam over the router to const [, , , opts] = routerMock.mock.calls[0]! 
expect(opts).toEqual({ temperature: 0.1, toolChoice: 'auto' }) }) + + it('forwards the router usage + costUsd so the driver can meter its inference', async () => { + routerMock.mockResolvedValue({ + content: 'x', + toolCalls: [], + usage: { input: 120, output: 45 }, + costUsd: 0.013, + }) + const turn = await routerDriverChat(cfg).next({ system: 'S', messages: [], tools: [] }) + expect(turn.usage).toEqual({ input: 120, output: 45 }) + expect(turn.costUsd).toBe(0.013) + }) + + it('omits usage/costUsd when the router reports none (a scripted/offline turn meters nothing)', async () => { + routerMock.mockResolvedValue({ content: 'x', toolCalls: [] }) + const turn = await routerDriverChat(cfg).next({ system: 'S', messages: [], tools: [] }) + expect(turn.usage).toBeUndefined() + expect(turn.costUsd).toBeUndefined() + }) }) From b9adc546d076133f7538c85afc39b2e01e7a9d52 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 15:43:21 -0600 Subject: [PATCH 2/4] fix(supervise): bound the driver on the usd channel too + review nits Address the PR reviewer's findings on the inference-metering PR: - MEDIUM: poolStarved ignored the usd channel. Now that meter() debits usd via observe(), a usd-capped pool with a large token ceiling (e.g. maxUsd:1, maxTokens:10M) could let the driver overspend usd up to the 2000-turn tripwire. poolStarved now breaks on usd exhaustion too; BudgetReadout gains `usdCapped` so the guard distinguishes a real usdLeft<=0 from an uncapped pool. New test: maxTurns=0 halts on the usd ceiling with the token ceiling untouched. - isNonEmptySpend now checks the ms channel (consistent with addSpend) so the spentBreakdown gate matches the total on every channel. - Tests: assert all three agent.turn events (not just the first); meteredChat returns a STOP turn past the script instead of silently repeating the last; cover the real costUsd:0 forwarding path. Full suite 1004 pass; lint/typecheck/build green. 
--- src/runtime/supervise/budget.ts | 6 +- src/runtime/supervise/coordination-driver.ts | 14 ++-- src/runtime/supervise/supervisor.ts | 6 +- src/runtime/supervise/types.ts | 1 + tests/loops/driver-inference-metering.test.ts | 65 ++++++++++++++++++- tests/loops/router-driver-chat.test.ts | 12 ++++ 6 files changed, 96 insertions(+), 8 deletions(-) diff --git a/src/runtime/supervise/budget.ts b/src/runtime/supervise/budget.ts index eb2c69e3..416998f1 100644 --- a/src/runtime/supervise/budget.ts +++ b/src/runtime/supervise/budget.ts @@ -37,10 +37,13 @@ export interface ReservationTicket { /** Post-reservation pool readout — the shape `Scope.budget` exposes. `tokensLeft`, * `usdLeft`, and `reservedTokens` reflect committed-but-unsettled reservations; - * `deadlineMs` is the ABSOLUTE wall-clock deadline (0 when the root set none). */ + * `deadlineMs` is the ABSOLUTE wall-clock deadline (0 when the root set none). + * `usdCapped` distinguishes a real `usdLeft <= 0` exhaustion from an uncapped pool (which always + * reads `usdLeft: 0`) — the in-loop guard needs it to bound a usd-capped driver. */ export type BudgetReadout = Readonly<{ tokensLeft: number usdLeft: number + usdCapped: boolean deadlineMs: number reservedTokens: number }> @@ -267,6 +270,7 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu return { tokensLeft: freeTokens, usdLeft: usdCapped ? freeUsd : 0, + usdCapped, deadlineMs: absoluteDeadlineMs, reservedTokens, } diff --git a/src/runtime/supervise/coordination-driver.ts b/src/runtime/supervise/coordination-driver.ts index ab6ffd5d..7111897d 100644 --- a/src/runtime/supervise/coordination-driver.ts +++ b/src/runtime/supervise/coordination-driver.ts @@ -96,12 +96,18 @@ export interface CoordinationDriverOptions { * are the real bounds; no healthy run approaches this. */ const runawayTripwireTurns = 2000 -/** Spawn-progress is impossible: the pool can't afford another worker AND nothing is in flight - * to await. 
A long-horizon driver bounded by the conserved pool stops here instead of spinning - * (the in-loop budget guard the turn cap alone never provided). */ +/** Spawn-progress is impossible: the pool can't afford another worker AND nothing is in flight to + * await. A long-horizon driver bounded by the conserved pool stops here instead of spinning (the + * in-loop budget guard the turn cap alone never provided). Checks BOTH conserved channels: tokens + * (can't afford a worker) and usd (a usd-capped pool whose ceiling the driver's own metered + * inference has drained — `meter` debits usd, so without this a huge-token/small-usd pool would + * overspend usd up to the turn tripwire). */ function poolStarved(scope: Scope, perWorker: Budget): boolean { const b = scope.budget - return b.tokensLeft < perWorker.maxTokens && b.reservedTokens <= 0 + if (b.reservedTokens > 0) return false // a child is in flight — await it, don't finalize early + const tokenStarved = b.tokensLeft < perWorker.maxTokens + const usdStarved = b.usdCapped && b.usdLeft <= 0 + return tokenStarved || usdStarved } /** The absolute wall-clock deadline (when the root set one) has passed. */ diff --git a/src/runtime/supervise/supervisor.ts b/src/runtime/supervise/supervisor.ts index e8a296c7..b9de826d 100644 --- a/src/runtime/supervise/supervisor.ts +++ b/src/runtime/supervise/supervisor.ts @@ -448,7 +448,9 @@ function addSpend(a: Spend, b: Spend): Spend { } } -/** True when any driver metered inference this run (so the winner carries a `spentBreakdown`). */ +/** True when any driver metered inference this run (so the winner carries a `spentBreakdown`). + * Checks every channel `addSpend` sums — including `ms` — so the gate stays consistent with the + * total even though the coordination driver currently stamps `ms: 0`. 
*/ function isNonEmptySpend(s: Spend): boolean { - return s.iterations > 0 || s.tokens.input > 0 || s.tokens.output > 0 || s.usd > 0 + return s.iterations > 0 || s.tokens.input > 0 || s.tokens.output > 0 || s.usd > 0 || s.ms > 0 } diff --git a/src/runtime/supervise/types.ts b/src/runtime/supervise/types.ts index a4ad26b8..db94c771 100644 --- a/src/runtime/supervise/types.ts +++ b/src/runtime/supervise/types.ts @@ -313,6 +313,7 @@ export interface Scope { readonly budget: Readonly<{ tokensLeft: number usdLeft: number + usdCapped: boolean deadlineMs: number reservedTokens: number }> diff --git a/tests/loops/driver-inference-metering.test.ts b/tests/loops/driver-inference-metering.test.ts index 94de9dee..22e6f803 100644 --- a/tests/loops/driver-inference-metering.test.ts +++ b/tests/loops/driver-inference-metering.test.ts @@ -59,7 +59,9 @@ function meteredChat(turns: DriverTurn[]): DriverChat { let i = 0 return { next: async () => { - const t = turns[Math.min(i, turns.length - 1)] ?? {} + // Past the script → a no-tool STOP turn (never silently repeat the last turn, which would + // loop forever if that turn carried tool calls). + const t = turns[i] ?? { content: 'stop' } i += 1 return t }, @@ -228,6 +230,67 @@ describe("driver inference metering — the driver's own tokens count against th expect(first.turn).toBe(0) expect(first.toolCalls).toEqual(['spawn_worker']) expect(first.spend.tokens.input).toBe(100) + + // ALL three events carry the right per-turn detail (turn index increments; the stop turn's + // toolCalls are empty) — a typo in the detail spread would otherwise slip past. 
+ const at = (i: number) => + turnEvents[i]!.payload as { + turn: number + toolCalls: string[] + spend: { tokens: { input: number } } + } + expect(at(1).turn).toBe(1) + expect(at(1).toolCalls).toEqual(['await_next']) + expect(at(1).spend.tokens.input).toBe(80) + expect(at(2).turn).toBe(2) + expect(at(2).toolCalls).toEqual([]) // the stop turn named no tool + expect(at(2).spend.tokens.input).toBe(30) + }) + + it('maxTurns=0 is bounded by usd too: a usd-capped pool halts the driver when its inference drains the usd ceiling', async () => { + const blobs = new InMemoryResultBlobStore() + const journal = new InMemorySpawnJournal() + let n = 0 + // A never-stopping driver with a HUGE token ceiling but a small usd cap: only the usd channel + // can bound it. Each turn costs $0.04 (and few tokens), so tokensLeft never trips poolStarved. + const chat: DriverChat = { + next: async () => { + n += 1 + return { + toolCalls: [{ name: 'list_questions', arguments: {} }], + usage: { input: 5, output: 5 }, + costUsd: 0.04, + } + }, + } + const opts: CoordinationDriverOptions = { + name: 'root', + chat, + blobs, + makeWorkerAgent: () => workerLeaf('w', { input: 1, output: 1 }), + perWorker: { maxIterations: 4, maxTokens: 100 }, + systemPrompt: 'drive', + maxTurns: 0, + } + const result = await createSupervisor().run( + coordinationDriverAgent(opts), + 'usd-bound', + { + budget: { maxIterations: 1000, maxTokens: 10_000_000, maxUsd: 0.1 }, // ~2-3 turns of $0.04 fit + runId: 'meter-usd-bound', + journal, + blobs, + executors: createExecutorRegistry(), + maxDepth: 2, + now: () => 0, + }, + ) + + // usdLeft: 0.1 → 0.06 → 0.02 → -0.02; poolStarved's usd arm breaks at the top of turn 3 + // (usdLeft -0.02 <= 0). The driver halts on USD — NOT the 2000-turn tripwire, NOT the token + // ceiling (10M tokens untouched). This is the MEDIUM fix: maxTurns=0 is usd-bounded too. 
+ expect(n).toBe(3) + expect(result.kind).toBe('no-winner') }) }) diff --git a/tests/loops/router-driver-chat.test.ts b/tests/loops/router-driver-chat.test.ts index d34ef60d..a06a246e 100644 --- a/tests/loops/router-driver-chat.test.ts +++ b/tests/loops/router-driver-chat.test.ts @@ -134,4 +134,16 @@ describe('routerDriverChat — the production DriverChat seam over the router to expect(turn.usage).toBeUndefined() expect(turn.costUsd).toBeUndefined() }) + + it('forwards a real costUsd of 0 (a priced model on a free/zero-cost turn — not dropped)', async () => { + routerMock.mockResolvedValue({ + content: 'x', + toolCalls: [], + usage: { input: 10, output: 0 }, + costUsd: 0, + }) + const turn = await routerDriverChat(cfg).next({ system: 'S', messages: [], tools: [] }) + expect(turn.usage).toEqual({ input: 10, output: 0 }) + expect(turn.costUsd).toBe(0) // typeof === 'number' guard forwards 0, doesn't drop it as absent + }) }) From 54bbddea177e6e6715e55d08ce86b6a9272856c2 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 17:18:19 -0600 Subject: [PATCH 3/4] feat(supervise): unify cost accounting on ONE journal ledger (metered event = the twin of observe) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Driver inference lived only in the pool (observe) while child work lived in BOTH the pool (reconcile) and the journal (settled). So two cost ledgers could disagree: the cross-run equal-k gate reads the journal and silently under-counted any coordination-driver arm (the manual extraRootSpend bridge was the symptom). Close it at the root: give the driver's inference its missing journal twin. - New SpawnEvent kind `metered` — a driver's own inference spend, the journal TWIN of the pool's `observe` exactly as `settled` is the twin of `reconcile`. Scope.meter journals it (and is now async/awaited, cost-critical). 
A driver re-homes its nested subtree's inference up to its parent as one metered event (ExecutorResult.metered → finalizeSettlement), mirroring how settled spend rolls child work up — so summing ANY sub-tree root yields its true driver-inference cost. - ONE ledger = the journal: spentFromJournal sums settled (childWork) + metered (driverInference); trajectoryReport folds metered onto node ownSpend. So spentTotal == trajectoryReport.total == the equal-k gate's number by construction, at any depth. Removed the pool→supervisor bridge (observedTotal, now dead) and trajectoryReport's extraRootSpend (now automatic). - Journal integrity preserved: metered is exempt from the cursor-uniqueness guard, skipped by replay (not a settlement), and folded in a separate additive pass by materialize/trajectory (order-independent). observe stays as the live pool debit for the in-loop guard. Adversarially reviewed SOUND on both journal/replay integrity AND conservation: each token counted exactly once at root AND across nested depth (traced root→mid→sub→worker), nested inference reaches the runId tree once via the re-homed copy, no replay/seq corruption. New nested-driver test proves re-homing (spentTotal 280/160 across 2 levels). Full suite 1005 pass; lint/typecheck/build green. 
--- src/durable/spawn-journal.ts | 28 +++- src/runtime/personify/trajectory.ts | 40 +++--- src/runtime/personify/wave-types.ts | 8 -- src/runtime/strategy.ts | 4 +- src/runtime/supervise/budget.ts | 28 +--- src/runtime/supervise/coordination-driver.ts | 2 +- src/runtime/supervise/driver-executor.ts | 51 +++++-- src/runtime/supervise/scope.ts | 42 +++++- src/runtime/supervise/supervisor.ts | 44 +++--- src/runtime/supervise/types.ts | 28 +++- tests/loops/driver-inference-metering.test.ts | 133 ++++++++++++++---- 11 files changed, 287 insertions(+), 121 deletions(-) diff --git a/src/durable/spawn-journal.ts b/src/durable/spawn-journal.ts index 5ba5c5b6..b70b5e56 100644 --- a/src/durable/spawn-journal.ts +++ b/src/durable/spawn-journal.ts @@ -274,8 +274,10 @@ type SpawnJournalRecord = * ordinal legitimately equals a later `settled` cursor seq and is not a collision. */ function assertSeqUnique(root: NodeId, events: SpawnEvent[], ev: SpawnEvent): void { - if (ev.kind === 'spawned') return - if (events.some((e) => e.kind !== 'spawned' && e.seq === ev.seq)) { + // `spawned` (ordinal namespace) and `metered` (informational spend, no settlement order) live + // outside the cursor-uniqueness namespace replay relies on. 
+ if (ev.kind === 'spawned' || ev.kind === 'metered') return + if (events.some((e) => e.kind !== 'spawned' && e.kind !== 'metered' && e.seq === ev.seq)) { throw new Error( `spawn journal corrupted: duplicate cursor seq ${ev.seq} in tree '${root}'; ` + 'the cursor order replay relies on is not unique', @@ -313,6 +315,7 @@ export async function replaySpawnTree( const settled: Settled[] = [] for (const ev of ordered) { if (ev.kind === 'spawned') continue + if (ev.kind === 'metered') continue // a spend record, not a settlement — irrelevant to replay if (ev.kind === 'cancelled') { settled.push({ kind: 'down', @@ -386,7 +389,9 @@ export function materializeTreeView(events: SpawnEvent[]): TreeView { const spawns = events .filter((ev): ev is Extract => ev.kind === 'spawned') .sort((a, b) => a.seq - b.seq) - const settlements = events.filter((ev) => ev.kind !== 'spawned').sort((a, b) => a.seq - b.seq) + const settlements = events + .filter((ev) => ev.kind !== 'spawned' && ev.kind !== 'metered') + .sort((a, b) => a.seq - b.seq) for (const ev of spawns) { if (ev.parent === undefined && root === undefined) root = ev.id nodes.set(ev.id, { @@ -410,6 +415,13 @@ export function materializeTreeView(events: SpawnEvent[]): TreeView { node.status = 'cancelled' } } + // Driver inference: a separate pass so it accumulates ONTO the settled child-work base (no + // dependence on metered-vs-settled seq order) without touching node status. + for (const ev of events) { + if (ev.kind !== 'metered') continue + const node = requireNode(nodes, ev.id) + node.spent = addJournalSpend(node.spent, ev.spend) + } const snapshots = [...nodes.values()].map(freezeSnapshot) return { root: root ?? snapshots[0]?.id ?? '', @@ -433,6 +445,16 @@ function zeroSpend(): Spend { return { iterations: 0, tokens: zeroTokenUsage(), usd: 0, ms: 0 } } +/** Add a `metered` spend record onto a node's accumulated spend (per channel). 
*/ +function addJournalSpend(a: Spend, b: Spend): Spend { + return { + iterations: a.iterations + b.iterations, + tokens: { input: a.tokens.input + b.tokens.input, output: a.tokens.output + b.tokens.output }, + usd: a.usd + b.usd, + ms: a.ms + b.ms, + } +} + function requireNode(nodes: Map, id: NodeId): MutableSnapshot { const node = nodes.get(id) if (!node) { diff --git a/src/runtime/personify/trajectory.ts b/src/runtime/personify/trajectory.ts index 23e9ab7c..6de467eb 100644 --- a/src/runtime/personify/trajectory.ts +++ b/src/runtime/personify/trajectory.ts @@ -64,7 +64,10 @@ export async function trajectoryReport( // them. The two seq namespaces overlap, so create every node from its `spawned` event // first, then apply settlements/cancellations — mirrors `materializeTreeView`. const spawns = events.filter(isSpawned).sort(bySeq) - const closes = events.filter((ev) => ev.kind !== 'spawned').sort(bySeq) + // `metered` events (driver inference) are folded onto each node in a separate pass below, so + // they accumulate ONTO the settled child-work base regardless of seq order; closes are the + // settlements/cancellations that set node status. + const closes = events.filter((ev) => ev.kind !== 'spawned' && ev.kind !== 'metered').sort(bySeq) const nodes = new Map() for (const ev of spawns) { @@ -89,6 +92,15 @@ export async function trajectoryReport( node.verdict = ev.verdict node.outRef = ev.outRef } + // Driver inference: add each `metered` event onto its node's ownSpend. Because a driver re-homes + // its nested subtree's inference up the tree, this single tree's events already carry the whole + // sub-tree's driver cost — so `total` matches `SupervisedResult.spentTotal` directly, with no + // caller plumbing. 
+ for (const ev of events) { + if (ev.kind !== 'metered') continue + const node = requireNode(nodes, ev.id, root) + node.ownSpend = addNodeSpend(node.ownSpend, ev.spend) + } if (!nodes.has(root)) { throw new Error( @@ -101,22 +113,6 @@ export async function trajectoryReport( if (ev.parent === undefined) continue requireNode(nodes, ev.parent, root).children.push(ev.id) } - // Fold the drivers' own metered inference (un-journaled — not a spawned child) onto the root - // node BEFORE roll-up, so `total` matches `SupervisedResult.spentTotal` and the equal-k gate - // counts the driver's tokens. Omitted ⇒ pure journal cost (the fanout/combinator arm case). - if (options.extraRootSpend) { - const r = requireNode(nodes, root, root) - const e = options.extraRootSpend - r.ownSpend = { - iterations: r.ownSpend.iterations + e.iterations, - tokens: { - input: r.ownSpend.tokens.input + e.tokens.input, - output: r.ownSpend.tokens.output + e.tokens.output, - }, - usd: r.ownSpend.usd + e.usd, - ms: r.ownSpend.ms + e.ms, - } - } const rolledUp = rollUpSpend(nodes, root) if (options.withOutputs) { @@ -272,6 +268,16 @@ function zeroSpend(): Spend { return { iterations: 0, tokens: zeroTokenUsage(), usd: 0, ms: 0 } } +/** Add a `metered` event's spend onto a node's accumulated ownSpend (per channel). 
*/ +function addNodeSpend(a: Spend, b: Spend): Spend { + return { + iterations: a.iterations + b.iterations, + tokens: { input: a.tokens.input + b.tokens.input, output: a.tokens.output + b.tokens.output }, + usd: a.usd + b.usd, + ms: a.ms + b.ms, + } +} + function cloneSpend(spend: Spend): Spend { return { iterations: spend.iterations, diff --git a/src/runtime/personify/wave-types.ts b/src/runtime/personify/wave-types.ts index d9351bab..2bcbe7d9 100644 --- a/src/runtime/personify/wave-types.ts +++ b/src/runtime/personify/wave-types.ts @@ -541,14 +541,6 @@ export interface TrajectoryReport { export interface TrajectoryReportOptions { /** Rehydrate each `done` node's `output` from the blob store. Off by default (cost-only report). */ readonly withOutputs?: boolean - /** - * Spend to add to the ROOT node before roll-up — the drivers' OWN inference that `Scope.meter` - * debited against the conserved pool but never journaled (it is not a spawned child). A - * coordination-driver equal-k arm passes `result.spentBreakdown?.driverInference` here so - * `report.total` (→ `equalKOnCost`) matches `SupervisedResult.spentTotal` — the two cost ledgers - * agree. Omit for a fanout/combinator arm (those never meter; the journal sum is the whole cost). - */ - readonly extraRootSpend?: Spend } /** `trajectoryReport(...)` — the tree+cost reconstructor. Async (reads journal + optionally blobs). */ diff --git a/src/runtime/strategy.ts b/src/runtime/strategy.ts index 4519172c..93130038 100644 --- a/src/runtime/strategy.ts +++ b/src/runtime/strategy.ts @@ -1003,8 +1003,8 @@ export async function runAgentic(opts: RunAgenticOptions): Promise return { ...core, diff --git a/src/runtime/supervise/budget.ts b/src/runtime/supervise/budget.ts index 416998f1..1467b718 100644 --- a/src/runtime/supervise/budget.ts +++ b/src/runtime/supervise/budget.ts @@ -76,12 +76,10 @@ export interface BudgetPool { * tokens and the in-loop budget guard (`readout().tokensLeft`) sees them. 
`free` may go negative * when a run overspends — that is honest (the readout then signals exhaustion). It never throws: * the spend already happened, so accounting records reality; the in-loop guard prevents MORE. + * The DURABLE record is the journal's `metered` event (written by `Scope.meter`); this debit + * only makes the live `readout()` reflect driver inference for the in-loop guard. */ observe(spend: Spend): void - /** Running total of all `observe`d spend (the drivers' own inference across the whole tree — - * every nested scope shares this ONE pool). Added to `spentTotal` so the reported number - * includes the driver's tokens, separable from spawned-child work. */ - observedTotal(): Spend /** Fail loud if any reservation is still open — the conserved-pool leak detector. Called at the * supervisor's join barrier: once every child has settled, no ticket may remain (a leaked * reservation would silently break `total ≡ free + reserved + committed`). */ @@ -151,10 +149,6 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu const absoluteDeadlineMs = root.deadlineMs !== undefined ? now() + root.deadlineMs : 0 - // Observed (non-reserved) spend — the drivers' own inference. Tracked separately so it can be - // reported as a distinct line (`observedTotal`) while also debiting the shared conserved pool. - const observed: Spend = { iterations: 0, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 } - let nextTicketId = 0 const open = new Set() @@ -243,27 +237,14 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu const tokens = totalTokens(spend.tokens) // Direct free → committed debit (no reservation ticket). `free` may go negative on overspend — // that is honest; the readout then reports exhaustion and the in-loop guard halts the driver. 
+ // The DURABLE record of this spend is the journal's `metered` event (the twin written by + // `Scope.meter`); this debit exists only to make the live `readout()` reflect driver inference. freeTokens -= tokens committedTokens += tokens freeIterations -= spend.iterations committedIterations += spend.iterations committedUsd += spend.usd if (usdCapped) freeUsd -= spend.usd - // Track it as its own line for the spend breakdown. - observed.iterations += spend.iterations - observed.tokens.input += spend.tokens.input - observed.tokens.output += spend.tokens.output - observed.usd += spend.usd - observed.ms += spend.ms - } - - function observedTotal(): Spend { - return { - iterations: observed.iterations, - tokens: { input: observed.tokens.input, output: observed.tokens.output }, - usd: observed.usd, - ms: observed.ms, - } } function readout(): BudgetReadout { @@ -290,7 +271,6 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu spendFrom: foldUsage, readout, observe, - observedTotal, assertNoOpenTickets, } } diff --git a/src/runtime/supervise/coordination-driver.ts b/src/runtime/supervise/coordination-driver.ts index 7111897d..8a5453a0 100644 --- a/src/runtime/supervise/coordination-driver.ts +++ b/src/runtime/supervise/coordination-driver.ts @@ -180,7 +180,7 @@ export function coordinationDriverAgent(opts: CoordinationDriverOptions): Agent< usd: res.costUsd ?? 0, ms: 0, } - scope.meter(turnSpend, { + await scope.meter(turnSpend, { kind: 'driver-inference', driver: opts.name, turn, diff --git a/src/runtime/supervise/driver-executor.ts b/src/runtime/supervise/driver-executor.ts index 5a798069..06a32c56 100644 --- a/src/runtime/supervise/driver-executor.ts +++ b/src/runtime/supervise/driver-executor.ts @@ -148,10 +148,15 @@ export const driverExecutorFactory: ExecutorFactory = (spec, ctx) => { // `scope.next()`; a thrown `act` propagates so the PARENT scope types it into a down. 
const out = await driver.act(task, nestedScope) - // Read the nested tree's settled events ONCE — the same evidence the supervisor's - // `spentTotal` reads — and roll up both the conserved spend AND the delivery verdict. - const settled = await loadSettled(journal, nestedRoot) + // Read the nested tree's events ONCE. Two roll-ups, kept separate so the conserved invariant + // is not double-charged: + // - `spent` = settled child WORK → reconciled against THIS driver's reservation (as before). + // - `metered` = the nested subtree's driver INFERENCE → re-homed by the parent scope as a + // `metered` event, NOT reconciled (already pool-debited live via `observe`). + const events = await loadTreeEvents(journal, nestedRoot) + const settled = events.filter(isSettled) const spent = sumSpend(settled) + const metered = sumMetered(events) // Completion-oracle propagation: a driver "delivered" iff at least one of its DIRECT // children settled `valid` (the child its keep-best finalize returns). Deriving the // driver child's verdict this way composes delivery UP the recursion — a sub-driver is @@ -163,6 +168,7 @@ export const driverExecutorFactory: ExecutorFactory = (spec, ctx) => { out, spent, ...(verdict ? { verdict } : {}), + ...(isNonZeroSpend(metered) ? { metered } : {}), } return artifact }, @@ -222,25 +228,24 @@ function nextNestOrdinal(journal: SpawnJournal): number { return c.n++ } -/** The nested tree's `settled` events — the one evidence list the spend AND verdict roll-ups - * both read off the same journal the supervisor sums. */ -async function loadSettled( - journal: SpawnJournal, - nestedRoot: string, -): Promise[]> { +/** The nested tree's full event list — the one evidence the spend, verdict, AND driver-inference + * roll-ups read off the same journal the supervisor sums. 
*/ +async function loadTreeEvents(journal: SpawnJournal, nestedRoot: string): Promise { const events = await journal.loadTree(nestedRoot) if (events === undefined) { throw new ValidationError( `driverExecutor: nested tree '${nestedRoot}' missing from the journal after run (corrupted log)`, ) } - return events.filter( - (ev): ev is Extract => ev.kind === 'settled', - ) + return events +} + +function isSettled(ev: SpawnEvent): ev is Extract { + return ev.kind === 'settled' } /** Sum the conserved spend over the nested tree's settled events — the honest per-channel - * roll-up of the whole sub-tree. */ + * roll-up of the whole sub-tree's child WORK. */ function sumSpend(settled: ReadonlyArray<{ spent: Spend }>): Spend { const total: Spend = { iterations: 0, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 } for (const ev of settled) { @@ -253,6 +258,26 @@ function sumSpend(settled: ReadonlyArray<{ spent: Spend }>): Spend { return total } +/** Sum the nested tree's `metered` events — the sub-tree's whole driver INFERENCE (this driver's + * own turns + any sub-driver inference already re-homed into this tree). Re-homed up to the parent + * as one `metered` event; never reconciled (already pool-debited live via `observe`). 
*/ +function sumMetered(events: ReadonlyArray): Spend { + const total: Spend = { iterations: 0, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 } + for (const ev of events) { + if (ev.kind !== 'metered') continue + total.iterations += ev.spend.iterations + total.tokens.input += ev.spend.tokens.input + total.tokens.output += ev.spend.tokens.output + total.usd += ev.spend.usd + total.ms += ev.spend.ms + } + return total +} + +function isNonZeroSpend(s: Spend): boolean { + return s.iterations > 0 || s.tokens.input > 0 || s.tokens.output > 0 || s.usd > 0 || s.ms > 0 +} + /** Derive the driver child's delivery verdict from its DIRECT children's settlements: * `valid` iff any direct child settled `done` AND `valid` (the keep-best finalize's pick); * `score` = the best delivered score. Returns `undefined` when no child settled at all (the diff --git a/src/runtime/supervise/scope.ts b/src/runtime/supervise/scope.ts index d5128bd7..5615ef3b 100644 --- a/src/runtime/supervise/scope.ts +++ b/src/runtime/supervise/scope.ts @@ -111,7 +111,16 @@ interface LiveChild { /** A child's terminal settlement before the cursor stamps the monotonic `seq`. */ type PreSeqSettled = - | { kind: 'done'; out: unknown; outRef: string; verdict?: DefaultVerdict; spent: Spend } + | { + kind: 'done' + out: unknown + outRef: string + verdict?: DefaultVerdict + spent: Spend + /** A driver child's OWN-inference subtree total (from `ExecutorResult.metered`) — journaled + * as a `metered` event for this node, NOT reconciled (already debited live via `observe`). 
*/ + metered?: Spend + } | { kind: 'down'; reason: string; infra: boolean; restartCount: number } /** @@ -360,16 +369,28 @@ export function createScope(args: ScopeArgs): Scope { return true } - function meter(spend: Spend, detail?: Record): void { + async function meter(spend: Spend, detail?: Record): Promise { + const seq = meterSeq++ // Debit the driver's own inference against the shared conserved pool (free → committed), so - // equal-k counts it and `budget.tokensLeft` reflects it for the in-loop guard. + // equal-k counts it live and `budget.tokensLeft` reflects it for the in-loop guard. args.pool.observe(spend) + // Journal it as a `metered` event — the durable TWIN of the pool debit (as `settled` is the + // twin of `reconcile`), so every journal-based cost reader sums driver inference automatically. + // Awaited like the settled append (cost-critical), so it has landed before the supervisor's + // join-barrier cost roll-up. + await args.journal.appendEvent(args.root, { + kind: 'metered', + id: args.parentId, + spend, + seq, + at: new Date(now()).toISOString(), + }) // Emit it as an `agent.turn` event so the trace/topology view sees per-turn driver inference // (the same stream `spawn`/`next` feed — one observable tree). notifyRuntimeHookEvent( args.hooks, { - id: `${args.parentId}:meter:${meterSeq++}`, + id: `${args.parentId}:meter:${seq}`, runId: args.root, target: 'agent.turn', phase: 'after', @@ -467,6 +488,18 @@ async function finalizeSettlement( seq, at: new Date(now()).toISOString(), }) + // Re-home a driver child's OWN-inference subtree total up to THIS (parent) tree as a `metered` + // event for the child node — mirroring how `settled.spent` rolls child WORK up. So summing any + // sub-tree root yields its true driver-inference cost, NOT reconciled (already pool-debited). 
+ if (settlement.metered) { + await args.journal.appendEvent(args.root, { + kind: 'metered', + id: child.id, + spend: settlement.metered, + seq, + at: new Date(now()).toISOString(), + }) + } notifyRuntimeHookEvent( args.hooks, { @@ -566,6 +599,7 @@ async function runChild( outRef, ...(artifact.verdict ? { verdict: artifact.verdict } : {}), spent: live.spent, + ...(artifact.metered ? { metered: artifact.metered } : {}), } } catch (err) { // Reconcile the (likely partial) spend so the reservation is refunded even on a throw. diff --git a/src/runtime/supervise/supervisor.ts b/src/runtime/supervise/supervisor.ts index b9de826d..2c7d06b5 100644 --- a/src/runtime/supervise/supervisor.ts +++ b/src/runtime/supervise/supervisor.ts @@ -28,9 +28,9 @@ * no-winner; it does not restart anything. * * Selection lives in the driver, not here (selector≠judge): `act` returns the synthesized - * winner `Out`. The supervisor content-addresses that `Out` for its replay `outRef`, - * reads `spentTotal` off the conserved pool, and wraps it as a typed `winner` — it does - * not re-rank children behind the driver's back. + * winner `Out`. The supervisor content-addresses that `Out` for its replay `outRef`, reads + * `spentTotal` off the journal (`settled` child work + `metered` driver inference), and wraps + * it as a typed `winner` — it does not re-rank children behind the driver's back. */ import { contentAddress } from '../../durable/spawn-journal' @@ -174,11 +174,10 @@ export function createSupervisor(): Supervisor { // driver already selected. const outRef = contentAddress(out) await opts.blobs.put(outRef, out) - // `spentTotal` = the spawned children's reconciled spend (the journal sum) PLUS the drivers' - // OWN inference (metered via `Scope.meter` → `pool.observe`, the run-wide observed total). - // The breakdown keeps the two separable — the A++ view of where the tokens went. 
- const childWork = await spentTotalFromJournal(journal, opts.runId) - const driverInference = pool.observedTotal() + // ONE ledger: the journal. `settled` events carry spawned-child WORK; `metered` events carry + // the drivers' OWN inference (the twin of `pool.observe`). `spentTotal` is their sum and the + // breakdown keeps the two separable — the A++ view of where the tokens went. No pool bridge. + const { childWork, driverInference } = await spentFromJournal(journal, opts.runId) return { kind: 'winner', out, @@ -418,23 +417,34 @@ function poolExhausted(pool: BudgetPool, opts: SupervisorOpts): boolean { * loud if the tree was never journaled (the supervisor always `beginTree`s, so a missing * tree is a corrupted journal, not a normal path). */ -async function spentTotalFromJournal(journal: SpawnJournal, root: string): Promise { +async function spentFromJournal( + journal: SpawnJournal, + root: string, +): Promise<{ childWork: Spend; driverInference: Spend }> { const events = await journal.loadTree(root) if (events === undefined) { throw new RuntimeRunStateError( `supervisor: spawn tree '${root}' is missing from the journal after run (corrupted log)`, ) } - const total: Spend = { iterations: 0, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 } + const childWork: Spend = { iterations: 0, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 } + const driverInference: Spend = { iterations: 0, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 } for (const ev of events) { - if (ev.kind !== 'settled') continue - total.iterations += ev.spent.iterations - total.tokens.input += ev.spent.tokens.input - total.tokens.output += ev.spent.tokens.output - total.usd += ev.spent.usd - total.ms += ev.spent.ms + // `settled` = spawned-child work (reconciled); `metered` = driver inference (re-homed up the + // tree, so this single root-tree pass already includes every nested driver's inference). 
+ if (ev.kind === 'settled') accumulate(childWork, ev.spent) + else if (ev.kind === 'metered') accumulate(driverInference, ev.spend) } - return total + return { childWork, driverInference } +} + +/** Add `b` into `a` in place, per channel. */ +function accumulate(a: Spend, b: Spend): void { + a.iterations += b.iterations + a.tokens.input += b.tokens.input + a.tokens.output += b.tokens.output + a.usd += b.usd + a.ms += b.ms } /** Sum two conserved-spend tallies per channel — the child-work journal sum + the drivers' own diff --git a/src/runtime/supervise/types.ts b/src/runtime/supervise/types.ts index db94c771..06f079ff 100644 --- a/src/runtime/supervise/types.ts +++ b/src/runtime/supervise/types.ts @@ -110,6 +110,11 @@ export interface ExecutorResult { out: Out verdict?: DefaultVerdict spent: Spend + /** A driver-executor's OWN-inference subtree total, rolled up from its nested tree's `metered` + * events. The parent scope journals it as a `metered` event for this node — NOT reconciled (it + * was already debited live via the pool's `observe`), so it never trips the reservation clamp. + * Leaf executors omit it. */ + metered?: Spend } /** @@ -303,10 +308,13 @@ export interface Scope { * real tokens/usd but not a spawned child (no reserve/reconcile). A direct `free → committed` * debit, so equal-k counts the driver's tokens AND the in-loop budget guard (`budget.tokensLeft`) * halts a driver that thinks the pool dry. `detail` rides an `agent.turn` trace event for live - * observability (turn index, tool calls, cumulative spend). The supervisor folds the run-wide - * observed total into `spentTotal`. A leaf never calls this; a driver meters each chat turn. + * observability (turn index, tool calls, cumulative spend). It also journals a `metered` event — + * the durable twin of the pool debit (as `settled` is the twin of `reconcile`) — so every + * journal-based cost reader (`spentFromJournal`, `trajectoryReport`) sums driver inference + * automatically. 
A leaf never calls this; a driver meters each chat turn and awaits it (the + * metered event is cost-critical, so it lands before the join-barrier roll-up). */ - meter(spend: Spend, detail?: Record): void + meter(spend: Spend, detail?: Record): Promise /** The live tree — reads the in-memory nursery, not the journal. */ readonly view: TreeView /** Conserved-pool readouts (post-reservation). */ @@ -370,6 +378,20 @@ export type SpawnEvent = at: string } | { kind: 'cancelled'; id: NodeId; reason: string; seq: number; at: string } + | { + /** A driver's OWN inference spend, journaled separately from spawned-child work — the journal + * TWIN of `BudgetPool.observe`, exactly as `settled` is the twin of `reconcile`. So every + * journal-based cost reader sums it automatically — the journal is the single cost ledger. + * It carries spend only and is NOT a settlement: replay + `materializeTreeView` skip it for + * structure, and its `seq` lives outside the cursor-uniqueness namespace. A + * driver re-homes its nested subtree's metered total up to its parent (like settled spend), + * so summing any sub-tree root yields that sub-tree's true driver-inference cost. */ + kind: 'metered' + id: NodeId + spend: Spend + seq: number + at: string + } /** * The spawn-tree event source (mirrors `ConversationJournal`'s begin/append/load shape). 
diff --git a/tests/loops/driver-inference-metering.test.ts b/tests/loops/driver-inference-metering.test.ts index 22e6f803..6b3e16af 100644 --- a/tests/loops/driver-inference-metering.test.ts +++ b/tests/loops/driver-inference-metering.test.ts @@ -10,6 +10,7 @@ import { type DriverMessage, type DriverTurn, } from '../../src/runtime/supervise/coordination-driver' +import { driverChild, withDriverExecutor } from '../../src/runtime/supervise/driver-executor' import { createExecutorRegistry } from '../../src/runtime/supervise/runtime' import { createSupervisor } from '../../src/runtime/supervise/supervisor' import type { @@ -129,6 +130,83 @@ describe("driver inference metering — the driver's own tokens count against th expect(result.spentTotal.iterations).toBe(1) // the worker's 1 iteration; driver turns aren't charged here }) + it('re-homes a NESTED sub-driver inference up the tree: spentTotal counts every level (no silent undercount at depth)', async () => { + const blobs = new InMemoryResultBlobStore() + const journal = new InMemorySpawnJournal() + const worker = workerLeaf('leaf', { input: 10, output: 5 }) + + // root driver → mid sub-driver → worker leaf. The recursive resolver: a 'driver' profile becomes + // a driverChild wrapping another coordinationDriverAgent; a 'worker' profile becomes the leaf. + type P = { kind: 'driver'; name: string; turns: DriverTurn[] } | { kind: 'worker' } + const driverOf = (name: string, chat: DriverChat): CoordinationDriverOptions => ({ + name, + chat, + blobs, + makeWorkerAgent: makeAgent, + perWorker, + systemPrompt: 'drive', + maxTurns: 8, + }) + function makeAgent(raw: unknown): Agent { + const p = raw as P + if (p?.kind === 'driver') { + return driverChild( + p.name, + coordinationDriverAgent(driverOf(p.name, meteredChat(p.turns))), + journal, + ) + } + return worker + } + + // mid sub-driver inference = 60/40 + 30/20 + 10/5 = 100/65. 
+ const midProfile: P = { + kind: 'driver', + name: 'mid', + turns: [ + { + toolCalls: [ + { name: 'spawn_worker', arguments: { profile: { kind: 'worker' }, task: 'sub' } }, + ], + usage: { input: 60, output: 40 }, + }, + { toolCalls: [{ name: 'await_next', arguments: {} }], usage: { input: 30, output: 20 } }, + { content: 'mid done', usage: { input: 10, output: 5 } }, + ], + } + // root driver inference = 100/50 + 50/30 + 20/10 = 170/90. + const rootChat = meteredChat([ + { + toolCalls: [{ name: 'spawn_worker', arguments: { profile: midProfile, task: 'go' } }], + usage: { input: 100, output: 50 }, + }, + { toolCalls: [{ name: 'await_next', arguments: {} }], usage: { input: 50, output: 30 } }, + { content: 'root done', usage: { input: 20, output: 10 } }, + ]) + + const result = await createSupervisor().run( + coordinationDriverAgent(driverOf('root', rootChat)), + 'task', + { + budget: { maxIterations: 100, maxTokens: 100_000 }, + runId: 'nested', + journal, + blobs, + executors: withDriverExecutor(createExecutorRegistry()), + maxDepth: 4, + now: () => 0, + }, + ) + + expect(result.kind).toBe('winner') + if (result.kind !== 'winner') return + // childWork = the worker (10/5). driverInference = root (170/90) + mid (100/65) = 270/155 — + // the mid sub-driver's inference re-homed up to the root tree. spentTotal = 280/160. 
+ expect(result.spentBreakdown?.childWork.tokens).toEqual({ input: 10, output: 5 }) + expect(result.spentBreakdown?.driverInference.tokens).toEqual({ input: 270, output: 155 }) + expect(result.spentTotal.tokens).toEqual({ input: 280, output: 160 }) + }) + it('maxTurns=0 is bounded by inference: a never-stopping driver halts when its OWN tokens drain the pool', async () => { const blobs = new InMemoryResultBlobStore() const journal = new InMemorySpawnJournal() @@ -294,8 +372,8 @@ describe("driver inference metering — the driver's own tokens count against th }) }) -describe('equal-k ledger reconciliation — trajectoryReport.extraRootSpend folds in driver inference', () => { - it('without extraRootSpend the journal total is child-work only; with it, total matches spentTotal', async () => { +describe('equal-k ledger — trajectoryReport sums driver inference from the journal automatically', () => { + it('trajectoryReport.total includes driver inference from the journal automatically, matching spentTotal', async () => { const journal = new InMemorySpawnJournal() const blobs = new InMemoryResultBlobStore() const at = new Date(0).toISOString() @@ -330,45 +408,42 @@ describe('equal-k ledger reconciliation — trajectoryReport.extraRootSpend fold seq: 0, at, }) - - // Default: the journal sum is child-work only — the latent divergence vs SupervisedResult.spentTotal. - const childOnly = await trajectoryReport(journal, blobs, 'arm') - expect(childOnly.total.tokens).toEqual({ input: 10, output: 5 }) - - // Pass the run's driverInference (from result.spentBreakdown) → total now equals spentTotal, - // so equalKOnCost credits the driver arm for its OWN inference. The ledgers agree. 
- const driverInference = { iterations: 0, tokens: { input: 210, output: 100 }, usd: 0.02, ms: 0 } - const reconciled = await trajectoryReport(journal, blobs, 'arm', { - extraRootSpend: driverInference, + // The driver's OWN inference is a `metered` event on the root node — exactly what Scope.meter + // journals each turn. ONE ledger: trajectoryReport reads it automatically, no caller plumbing. + await journal.appendEvent('arm', { + kind: 'metered', + id: 'arm', + spend: { iterations: 0, tokens: { input: 210, output: 100 }, usd: 0.02, ms: 0 }, + seq: 0, + at, }) - expect(reconciled.total.tokens).toEqual({ input: 220, output: 105 }) - expect(reconciled.total.usd).toBeCloseTo(0.02, 6) + + // trajectoryReport.total (→ equalKOnCost) now includes the driver inference by construction — + // childWork (10/5) + driverInference (210/100) = 220/105 — matching SupervisedResult.spentTotal. + const report = await trajectoryReport(journal, blobs, 'arm') + expect(report.total.tokens).toEqual({ input: 220, output: 105 }) + expect(report.total.usd).toBeCloseTo(0.02, 6) + // The root node's ownSpend carries the inference; the worker node carries the child work. 
+ const rootNode = report.nodes.find((n) => n.id === 'arm') + expect(rootNode?.ownSpend.tokens).toEqual({ input: 210, output: 100 }) }) }) -describe('budget pool — observe() debits the conserved pool, observedTotal() tracks driver inference', () => { - it('moves free → committed (invariant preserved), accumulates observedTotal, and drives tokensLeft negative on overspend', () => { +describe('budget pool — observe() debits the conserved pool for the live in-loop guard', () => { + it('moves free → committed (invariant preserved) and drives the readout negative on overspend', () => { const pool = createBudgetPool({ maxIterations: 10, maxTokens: 1000, maxUsd: 5 }, () => 0) expect(pool.readout().tokensLeft).toBe(1000) + expect(pool.readout().usdLeft).toBe(5) pool.observe({ iterations: 1, tokens: { input: 100, output: 50 }, usd: 0.5, ms: 0 }) expect(pool.readout().tokensLeft).toBe(850) // 1000 - 150 - expect(pool.observedTotal()).toEqual({ - iterations: 1, - tokens: { input: 100, output: 50 }, - usd: 0.5, - ms: 0, - }) + expect(pool.readout().usdLeft).toBe(4.5) - // A second observe accumulates and can overshoot — free goes negative, an honest exhaustion signal. + // A second observe overshoots — free goes negative, the honest exhaustion signal poolStarved reads. pool.observe({ iterations: 1, tokens: { input: 800, output: 200 }, usd: 1, ms: 0 }) expect(pool.readout().tokensLeft).toBe(-150) // 850 - 1000 - expect(pool.observedTotal()).toEqual({ - iterations: 2, - tokens: { input: 900, output: 250 }, - usd: 1.5, - ms: 0, - }) + expect(pool.readout().usdLeft).toBe(3.5) + expect(pool.readout().usdCapped).toBe(true) // observe never opens a ticket — the leak detector stays clean. 
expect(() => pool.assertNoOpenTickets()).not.toThrow() }) From c73059a6aefcdff91159ed169bb951a46e98c2ef Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 17:47:39 -0600 Subject: [PATCH 4/4] fix(supervise): re-home a crashed sub-driver's inference on the down path + close review nits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address the PR review of the single-ledger unification: - MEDIUM (real drift): a NESTED sub-driver that crashed mid-run kept its metered inference in the pool (observe) but never re-homed it to the parent journal — the down/abort settle path didn't carry metered. So spentTotal/trajectory silently under-counted a crashed sub-driver: the exact pool↔journal drift the unification was meant to kill. Fix: `Executor.metered()` (replacing `ExecutorResult.metered`) caches the nested-tree inference on BOTH the success AND crash paths; the scope re-homes it on every settle exit (done, aborted, crash). New test proves a crashed sub-driver's partial inference (40/20) lands in the journal. - MEDIUM (test gap): add a direct `materializeTreeView` test for the metered fold (the resume-path twin of trajectoryReport), incl. the seq-collision case. - Bench: `bench/src/atom-humaneval.mts` had a local `routerDriverChat` copy that dropped usage/costUsd — so its driver arms never metered. Swap to the shared export; delete the dead local copy + helpers. Now the one real-router consumer actually meters inference. - Nits: assert usd flows through the nested re-home; fix the stale tripwire comment (driver inference IS metered now — the tripwire only catches a no-usage chat seam); JSDoc. Full suite 1007 pass; lint/typecheck/build green. 
--- bench/src/atom-humaneval.mts | 56 +------ src/durable/spawn-journal.ts | 3 +- src/runtime/supervise/coordination-driver.ts | 9 +- src/runtime/supervise/driver-executor.ts | 83 +++++++--- src/runtime/supervise/scope.ts | 40 ++++- src/runtime/supervise/types.ts | 14 +- tests/loops/driver-inference-metering.test.ts | 150 +++++++++++++++++- 7 files changed, 253 insertions(+), 102 deletions(-) diff --git a/bench/src/atom-humaneval.mts b/bench/src/atom-humaneval.mts index 64c43f5f..f0e6ec06 100644 --- a/bench/src/atom-humaneval.mts +++ b/bench/src/atom-humaneval.mts @@ -25,16 +25,14 @@ import { coordinationDriverAgent, createExecutorRegistry, createSupervisor, - type DriverChat, - type DriverMessage, type Executor, type ExecutorResult, gateOnDeliverable, InMemoryResultBlobStore, InMemorySpawnJournal, type RouterConfig, - routerChatWithTools, routerChatWithUsage, + routerDriverChat, } from '../../src/runtime/index' import { createReplayRecorder, renderReplayHtml } from '../../src/topology/replay' import { basePrompt, extractCode, type HumanEvalTask, loadHumanEval, runChecker } from './benchmarks/humaneval' @@ -58,56 +56,8 @@ const cfg: RouterConfig = { } const driverCfg: RouterConfig = { ...cfg, model: process.env.DRIVER_MODEL ?? cfg.model } -// ── The real driver-LLM brain: routerChatWithTools adapted to the DriverChat seam ──────────── -function routerDriverChat(c: RouterConfig): DriverChat { - return { - next: async ({ system, messages, tools }) => { - const oa: Array> = [ - { role: 'system', content: system }, - ...messages.map(toOpenAI), - ] - const oaTools = tools.map((t) => ({ - type: 'function' as const, - function: { name: t.name, description: t.description, parameters: t.parameters }, - })) - const r = await routerChatWithTools(c, oa, oaTools, { temperature: 0.4, toolChoice: 'auto' }) - return { - ...(r.content ? 
{ content: r.content } : {}), - toolCalls: r.toolCalls.map((tc) => ({ - id: tc.id, - name: tc.name, - arguments: safeParse(tc.arguments), - })), - } - }, - } -} - -function toOpenAI(m: DriverMessage): Record { - if (m.role === 'assistant' && m.toolCalls?.length) { - return { - role: 'assistant', - content: m.content ?? '', - tool_calls: m.toolCalls.map((tc) => ({ - id: tc.id ?? tc.name, - type: 'function', - function: { name: tc.name, arguments: JSON.stringify(tc.arguments) }, - })), - } - } - if (m.role === 'tool') { - return { role: 'tool', tool_call_id: m.toolCallId ?? m.name ?? 'call', content: m.content } - } - return { role: m.role, content: m.content } -} - -function safeParse(s: string): Record { - try { - return JSON.parse(s) as Record - } catch { - return {} - } -} +// The driver-LLM brain uses the SHARED `routerDriverChat` (imported from the library) — its local +// copy did not forward usage/costUsd, so this bench's driver arms never metered their inference. // ── A gated router worker: one router call → candidate code, settled valid ⟺ the tests pass ── function humanEvalWorker(task: HumanEvalTask, label: string): Agent { diff --git a/src/durable/spawn-journal.ts b/src/durable/spawn-journal.ts index b70b5e56..ec78bbaf 100644 --- a/src/durable/spawn-journal.ts +++ b/src/durable/spawn-journal.ts @@ -376,7 +376,8 @@ function replayHandle(id: NodeId, label: string, status: NodeStatus) { /** * Materialize the live tree (`TreeView`) from a journaled event list for resume. Folds - * `spawned`/`settled`/`cancelled` into a per-node snapshot in `seq` order so the + * `spawned`/`settled`/`cancelled` into a per-node snapshot in `seq` order, then adds each + * `metered` event's driver-inference spend onto its node in a separate additive pass — so the * resumed view matches what `scope.view` showed at the recorded cursor position. 
*/ export function materializeTreeView(events: SpawnEvent[]): TreeView { diff --git a/src/runtime/supervise/coordination-driver.ts b/src/runtime/supervise/coordination-driver.ts index 8a5453a0..d6841881 100644 --- a/src/runtime/supervise/coordination-driver.ts +++ b/src/runtime/supervise/coordination-driver.ts @@ -90,10 +90,11 @@ export interface CoordinationDriverOptions { readonly now?: () => number } -/** maxTurns=0 anti-runaway tripwire: a finite ceiling that only catches a DEGENERATE driver - * looping on a no-spawn tool (the driver's own inference tokens are not yet metered against - * the conserved pool, so they alone cannot drain it). The conserved pool + deadline + abort - * are the real bounds; no healthy run approaches this. */ +/** maxTurns=0 anti-runaway tripwire: a finite ceiling for the ONE case the conserved pool can't + * bound — a driver whose chat seam reports NO usage (so `scope.meter`/`pool.observe` is never + * called and its turns don't drain the pool). With a usage-reporting seam, driver inference now + * meters into the pool and `poolStarved` halts it; the pool + deadline + abort are the real bounds + * and no healthy run approaches this. */ const runawayTripwireTurns = 2000 /** Spawn-progress is impossible: the pool can't afford another worker AND nothing is in flight to diff --git a/src/runtime/supervise/driver-executor.ts b/src/runtime/supervise/driver-executor.ts index 06a32c56..8b9bd812 100644 --- a/src/runtime/supervise/driver-executor.ts +++ b/src/runtime/supervise/driver-executor.ts @@ -133,6 +133,10 @@ export const driverExecutorFactory: ExecutorFactory = (spec, ctx) => { const seam = readNestedScopeSeam(ctx) let artifact: ExecutorResult | undefined + // The nested subtree's driver INFERENCE, cached so the parent re-homes it on settle. 
Computed + // on BOTH the success AND crash paths (metered events are durable in the nested tree regardless), + // so a sub-driver that crashes mid-run still re-homes its partial inference — pool + journal agree. + let meteredSpend: Spend | undefined return { runtime: driverRuntime, @@ -144,33 +148,43 @@ export const driverExecutorFactory: ExecutorFactory = (spec, ctx) => { const nestedScope: Scope = seam.mount(nestedRoot, signal) - // Run the driver. Its `act` spawns children into the nested scope and reacts via - // `scope.next()`; a thrown `act` propagates so the PARENT scope types it into a down. - const out = await driver.act(task, nestedScope) + try { + // Run the driver. Its `act` spawns children into the nested scope and reacts via + // `scope.next()`; a thrown `act` propagates so the PARENT scope types it into a down. + const out = await driver.act(task, nestedScope) - // Read the nested tree's events ONCE. Two roll-ups, kept separate so the conserved invariant - // is not double-charged: - // - `spent` = settled child WORK → reconciled against THIS driver's reservation (as before). - // - `metered` = the nested subtree's driver INFERENCE → re-homed by the parent scope as a - // `metered` event, NOT reconciled (already pool-debited live via `observe`). - const events = await loadTreeEvents(journal, nestedRoot) - const settled = events.filter(isSettled) - const spent = sumSpend(settled) - const metered = sumMetered(events) - // Completion-oracle propagation: a driver "delivered" iff at least one of its DIRECT - // children settled `valid` (the child its keep-best finalize returns). Deriving the - // driver child's verdict this way composes delivery UP the recursion — a sub-driver is - // `valid` only when it itself selected a delivered child — so a node never settles - // "done = delivered" on a sub-tree that delivered nothing (Foreman's 0/18 lesson). 
- const verdict = deriveDeliveryVerdict(settled) - artifact = { - outRef: `${driverRuntime}:${nestedRoot}`, - out, - spent, - ...(verdict ? { verdict } : {}), - ...(isNonZeroSpend(metered) ? { metered } : {}), + // Read the nested tree's events ONCE. Two roll-ups, kept separate so the conserved invariant + // is not double-charged: + // - `spent` = settled child WORK → reconciled against THIS driver's reservation (as before). + // - `metered` = the nested subtree's driver INFERENCE → re-homed by the parent scope as a + // `metered` event, NOT reconciled (already pool-debited live via `observe`). + const events = await loadTreeEvents(journal, nestedRoot) + const settled = events.filter(isSettled) + meteredSpend = nonZeroOrUndef(sumMetered(events)) + // Completion-oracle propagation: a driver "delivered" iff at least one of its DIRECT + // children settled `valid` (the child its keep-best finalize returns). Deriving the + // driver child's verdict this way composes delivery UP the recursion — a sub-driver is + // `valid` only when it itself selected a delivered child — so a node never settles + // "done = delivered" on a sub-tree that delivered nothing (Foreman's 0/18 lesson). + const verdict = deriveDeliveryVerdict(settled) + artifact = { + outRef: `${driverRuntime}:${nestedRoot}`, + out, + spent: sumSpend(settled), + ...(verdict ? { verdict } : {}), + } + return artifact + } catch (err) { + // Crash mid-run: the nested tree still holds the durable `metered` events the sub-driver + // already wrote (pool already debited them). Cache them so the parent's down-path re-home + // lands the partial inference and the two ledgers stay in agreement. A missing tree must + // not mask the original error. 
+ meteredSpend = await safeSumMetered(journal, nestedRoot) + throw err } - return artifact + }, + metered(): Spend | undefined { + return meteredSpend }, teardown(): Promise<{ destroyed: boolean }> { // The nested scope's live children are torn down by the driver's own `act` discipline @@ -278,6 +292,25 @@ function isNonZeroSpend(s: Spend): boolean { return s.iterations > 0 || s.tokens.input > 0 || s.tokens.output > 0 || s.usd > 0 || s.ms > 0 } +/** A spend, or `undefined` when it is all-zero — so `metered()` returns undefined for a driver + * whose sub-tree did no inference (and the parent journals no empty `metered` event). */ +function nonZeroOrUndef(s: Spend): Spend | undefined { + return isNonZeroSpend(s) ? s : undefined +} + +/** Sum the nested tree's metered events, tolerating a missing tree (a crash before `beginTree` + * landed) — never throw here, or it would mask the original `act` error on the crash path. */ +async function safeSumMetered( + journal: SpawnJournal, + nestedRoot: string, +): Promise { + try { + return nonZeroOrUndef(sumMetered(await loadTreeEvents(journal, nestedRoot))) + } catch { + return undefined + } +} + /** Derive the driver child's delivery verdict from its DIRECT children's settlements: * `valid` iff any direct child settled `done` AND `valid` (the keep-best finalize's pick); * `score` = the best delivered score. Returns `undefined` when no child settled at all (the diff --git a/src/runtime/supervise/scope.ts b/src/runtime/supervise/scope.ts index 5615ef3b..02914c5d 100644 --- a/src/runtime/supervise/scope.ts +++ b/src/runtime/supervise/scope.ts @@ -117,11 +117,19 @@ type PreSeqSettled = outRef: string verdict?: DefaultVerdict spent: Spend - /** A driver child's OWN-inference subtree total (from `ExecutorResult.metered`) — journaled - * as a `metered` event for this node, NOT reconciled (already debited live via `observe`). 
*/ + /** A driver child's OWN-inference subtree total (from `Executor.metered()`) — journaled as a + * `metered` event for this node, NOT reconciled (already debited live via `observe`). */ + metered?: Spend + } + | { + kind: 'down' + reason: string + infra: boolean + restartCount: number + /** A CRASHED driver child's partial OWN-inference subtree total — re-homed on the down path + * too, so the journal matches the pool (which already debited it via `observe`). */ metered?: Spend } - | { kind: 'down'; reason: string; infra: boolean; restartCount: number } /** * The recursion seam key. A `Scope` seeds a value of this on each child's @@ -445,6 +453,17 @@ async function finalizeSettlement( seq, at: new Date(now()).toISOString(), }) + // Re-home a crashed driver child's partial inference too (the pool already debited it via + // `observe`) — so spentTotal/trajectory never undercount a sub-driver that died mid-run. + if (settlement.metered) { + await args.journal.appendEvent(args.root, { + kind: 'metered', + id: child.id, + spend: settlement.metered, + seq, + at: new Date(now()).toISOString(), + }) + } notifyRuntimeHookEvent( args.hooks, { @@ -579,9 +598,13 @@ async function runChild( reconcileOnce(terminal.spent) } + // A driver child's OWN-inference subtree total — re-homed by the parent on EVERY settle exit + // (done, aborted, crash) so the journal always matches what the pool already debited. + const ownMetered = executor.metered?.() + if (childAbort.signal.aborted) { await teardownSafe(executor, opts.shutdown ?? 'brutalKill') - return downRecord('aborted before settle', true) + return downRecord('aborted before settle', true, ownMetered) } // The durable record is keyed by the canonical content address of the output — the @@ -599,14 +622,15 @@ async function runChild( outRef, ...(artifact.verdict ? { verdict: artifact.verdict } : {}), spent: live.spent, - ...(artifact.metered ? { metered: artifact.metered } : {}), + ...(ownMetered ? 
{ metered: ownMetered } : {}), } } catch (err) { // Reconcile the (likely partial) spend so the reservation is refunded even on a throw. reconcileOnce(live.spent) await teardownSafe(executor, 'brutalKill') const aborted = childAbort.signal.aborted || isAbortError(err) - return downRecord(errMessage(err), aborted || isInfraError(err)) + // A crashed driver child still re-homes the partial inference it durably metered. + return downRecord(errMessage(err), aborted || isInfraError(err), executor.metered?.()) } } @@ -725,8 +749,8 @@ async function teardownSafe( } } -function downRecord(reason: string, infra: boolean): PreSeqSettled { - return { kind: 'down', reason, infra, restartCount: 0 } +function downRecord(reason: string, infra: boolean, metered?: Spend): PreSeqSettled { + return { kind: 'down', reason, infra, restartCount: 0, ...(metered ? { metered } : {}) } } function zeroSpend(): Spend { diff --git a/src/runtime/supervise/types.ts b/src/runtime/supervise/types.ts index 06f079ff..ef37e6a1 100644 --- a/src/runtime/supervise/types.ts +++ b/src/runtime/supervise/types.ts @@ -102,6 +102,15 @@ export interface Executor { * driver branched on, its verdict, and the conserved spend. Read once, after settle. */ resultArtifact(): { outRef: string; out: Out; verdict?: DefaultVerdict; spent: Spend } + /** + * A driver-executor's OWN-inference subtree total (rolled up from its nested tree's `metered` + * events) — the parent scope journals it as a `metered` event for this node on settle, on BOTH + * the done AND the down/crash paths, so a crashed sub-driver's partial inference still re-homes + * (the pool already debited it via `observe`; the journal must match). NOT reconciled, so it never + * trips the reservation clamp. Read on settle, valid after `execute` resolves OR throws. Leaf + * executors omit it (returns `undefined`). + */ + metered?(): Spend | undefined } /** Terminal artifact of a one-shot `Executor.execute`. 
*/ @@ -110,11 +119,6 @@ export interface ExecutorResult { out: Out verdict?: DefaultVerdict spent: Spend - /** A driver-executor's OWN-inference subtree total, rolled up from its nested tree's `metered` - * events. The parent scope journals it as a `metered` event for this node — NOT reconciled (it - * was already debited live via the pool's `observe`), so it never trips the reservation clamp. - * Leaf executors omit it. */ - metered?: Spend } /** diff --git a/tests/loops/driver-inference-metering.test.ts b/tests/loops/driver-inference-metering.test.ts index 6b3e16af..728a759a 100644 --- a/tests/loops/driver-inference-metering.test.ts +++ b/tests/loops/driver-inference-metering.test.ts @@ -1,6 +1,10 @@ import type { AgentProfile } from '@tangle-network/sandbox' import { describe, expect, it } from 'vitest' -import { InMemoryResultBlobStore, InMemorySpawnJournal } from '../../src/durable/spawn-journal' +import { + InMemoryResultBlobStore, + InMemorySpawnJournal, + materializeTreeView, +} from '../../src/durable/spawn-journal' import { trajectoryReport } from '../../src/runtime/personify/trajectory' import { createBudgetPool } from '../../src/runtime/supervise/budget' import { @@ -159,7 +163,7 @@ describe("driver inference metering — the driver's own tokens count against th return worker } - // mid sub-driver inference = 60/40 + 30/20 + 10/5 = 100/65. + // mid sub-driver inference = 60/40 + 30/20 + 10/5 = 100/65 tokens, $0.05 (re-homed up). const midProfile: P = { kind: 'driver', name: 'mid', @@ -169,16 +173,18 @@ describe("driver inference metering — the driver's own tokens count against th { name: 'spawn_worker', arguments: { profile: { kind: 'worker' }, task: 'sub' } }, ], usage: { input: 60, output: 40 }, + costUsd: 0.05, }, { toolCalls: [{ name: 'await_next', arguments: {} }], usage: { input: 30, output: 20 } }, { content: 'mid done', usage: { input: 10, output: 5 } }, ], } - // root driver inference = 100/50 + 50/30 + 20/10 = 170/90. 
+ // root driver inference = 100/50 + 50/30 + 20/10 = 170/90 tokens, $0.02. const rootChat = meteredChat([ { toolCalls: [{ name: 'spawn_worker', arguments: { profile: midProfile, task: 'go' } }], usage: { input: 100, output: 50 }, + costUsd: 0.02, }, { toolCalls: [{ name: 'await_next', arguments: {} }], usage: { input: 50, output: 30 } }, { content: 'root done', usage: { input: 20, output: 10 } }, @@ -188,7 +194,7 @@ describe("driver inference metering — the driver's own tokens count against th coordinationDriverAgent(driverOf('root', rootChat)), 'task', { - budget: { maxIterations: 100, maxTokens: 100_000 }, + budget: { maxIterations: 100, maxTokens: 100_000, maxUsd: 10 }, runId: 'nested', journal, blobs, @@ -200,13 +206,97 @@ describe("driver inference metering — the driver's own tokens count against th expect(result.kind).toBe('winner') if (result.kind !== 'winner') return - // childWork = the worker (10/5). driverInference = root (170/90) + mid (100/65) = 270/155 — - // the mid sub-driver's inference re-homed up to the root tree. spentTotal = 280/160. + // childWork = the worker (10/5). driverInference = root (170/90, $0.02) + mid (100/65, $0.05) — + // the mid sub-driver's inference re-homed up to the root tree, BOTH tokens AND usd. spentTotal = 280/160. 
+ expect(result.spentBreakdown?.driverInference.usd).toBeCloseTo(0.07, 6) expect(result.spentBreakdown?.childWork.tokens).toEqual({ input: 10, output: 5 }) expect(result.spentBreakdown?.driverInference.tokens).toEqual({ input: 270, output: 155 }) expect(result.spentTotal.tokens).toEqual({ input: 280, output: 160 }) }) + it('re-homes a CRASHED sub-driver partial inference on the down path (pool and journal stay in agreement)', async () => { + const blobs = new InMemoryResultBlobStore() + const journal = new InMemorySpawnJournal() + + // A sub-driver that meters turn 0 (40/20) then CRASHES (chat throws) on turn 1 — the crash + // settles it `down`, which must STILL re-home the partial inference it durably metered. + const makeAgent = (raw: unknown): Agent => { + const p = raw as { kind?: string } + if (p?.kind === 'driver') { + let t = 0 + const crashingChat: DriverChat = { + next: async () => { + t += 1 + if (t === 1) + return { + toolCalls: [{ name: 'list_questions', arguments: {} }], + usage: { input: 40, output: 20 }, + } + throw new Error('sub-driver network crash') + }, + } + return driverChild( + 'mid', + coordinationDriverAgent({ + name: 'mid', + chat: crashingChat, + blobs, + makeWorkerAgent: makeAgent, + perWorker, + systemPrompt: 'drive', + maxTurns: 8, + }), + journal, + ) + } + return workerLeaf('w', { input: 1, output: 1 }) + } + const rootChat = meteredChat([ + { + toolCalls: [ + { name: 'spawn_worker', arguments: { profile: { kind: 'driver' }, task: 'go' } }, + ], + usage: { input: 100, output: 50 }, + }, + { toolCalls: [{ name: 'await_next', arguments: {} }] }, // drains the sub-driver's down settlement + { content: 'root done' }, // no usage → root inference = 100/50 + ]) + + await createSupervisor().run( + coordinationDriverAgent({ + name: 'root', + chat: rootChat, + blobs, + makeWorkerAgent: makeAgent, + perWorker, + systemPrompt: 'drive', + maxTurns: 8, + }), + 'task', + { + budget: { maxIterations: 100, maxTokens: 100_000 }, + runId: 'crash', + 
journal, + blobs, + executors: withDriverExecutor(createExecutorRegistry()), + maxDepth: 4, + now: () => 0, + }, + ) + + // The crashed sub-driver (node `crash:s0`) settled `down`, yet its partial inference (40/20) was + // re-homed into the root tree as a `metered` event — so the journal matches what the pool debited. + const events = (await journal.loadTree('crash')) ?? [] + const subMetered = events.filter( + (e): e is Extract<(typeof events)[number], { kind: 'metered' }> => + e.kind === 'metered' && e.id === 'crash:s0', + ) + expect(subMetered.length).toBe(1) + expect(subMetered[0]!.spend.tokens).toEqual({ input: 40, output: 20 }) + // Before the down-path re-home, this event would be absent → the sub-driver's inference would + // live in the pool but not the journal (a silent undercount). Now it's present. + }) + it('maxTurns=0 is bounded by inference: a never-stopping driver halts when its OWN tokens drain the pool', async () => { const blobs = new InMemoryResultBlobStore() const journal = new InMemorySpawnJournal() @@ -427,6 +517,54 @@ describe('equal-k ledger — trajectoryReport sums driver inference from the jou const rootNode = report.nodes.find((n) => n.id === 'arm') expect(rootNode?.ownSpend.tokens).toEqual({ input: 210, output: 100 }) }) + + it('materializeTreeView folds metered driver inference onto node snapshots (resume fidelity)', () => { + const at = new Date(0).toISOString() + const view = materializeTreeView([ + { + kind: 'spawned', + id: 'r', + label: 'root', + budget: { maxIterations: 1, maxTokens: 1 }, + runtime: 'inline', + seq: 0, + at, + }, + { + kind: 'spawned', + id: 'r:s0', + parent: 'r', + label: 'w', + budget: { maxIterations: 1, maxTokens: 1 }, + runtime: 'router', + seq: 1, + at, + }, + { + kind: 'settled', + id: 'r:s0', + status: 'done', + outRef: 'x', + spent: { iterations: 1, tokens: { input: 10, output: 5 }, usd: 0, ms: 0 }, + seq: 0, + at, + }, + // metered seq 0 deliberately collides with the settled seq 0 — the separate 
additive pass + // makes it order-independent, and a resumed tree must reflect the driver's inference. + { + kind: 'metered', + id: 'r', + spend: { iterations: 0, tokens: { input: 70, output: 30 }, usd: 0.01, ms: 0 }, + seq: 0, + at, + }, + ]) + const root = view.nodes.find((n) => n.id === 'r') + expect(root?.spent.tokens).toEqual({ input: 70, output: 30 }) // metered folded onto the root node + expect(root?.spent.usd).toBeCloseTo(0.01, 6) + const worker = view.nodes.find((n) => n.id === 'r:s0') + expect(worker?.spent.tokens).toEqual({ input: 10, output: 5 }) // settled child work intact + }) }) describe('budget pool — observe() debits the conserved pool for the live in-loop guard', () => {