Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 3 additions & 53 deletions bench/src/atom-humaneval.mts
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@ import {
coordinationDriverAgent,
createExecutorRegistry,
createSupervisor,
type DriverChat,
type DriverMessage,
type Executor,
type ExecutorResult,
gateOnDeliverable,
InMemoryResultBlobStore,
InMemorySpawnJournal,
type RouterConfig,
routerChatWithTools,
routerChatWithUsage,
routerDriverChat,
} from '../../src/runtime/index'
import { createReplayRecorder, renderReplayHtml } from '../../src/topology/replay'
import { basePrompt, extractCode, type HumanEvalTask, loadHumanEval, runChecker } from './benchmarks/humaneval'
Expand All @@ -58,56 +56,8 @@ const cfg: RouterConfig = {
}
const driverCfg: RouterConfig = { ...cfg, model: process.env.DRIVER_MODEL ?? cfg.model }

// ── The real driver-LLM brain: routerChatWithTools adapted to the DriverChat seam ────────────
/**
 * Adapt `routerChatWithTools` to the `DriverChat` seam.
 *
 * Each `next` call sends the system prompt followed by the converted conversation, offers every
 * coordination tool as an OpenAI-style `function` tool, and asks the router with
 * `temperature: 0.4` / `toolChoice: 'auto'`.
 *
 * @param c Router configuration passed through to `routerChatWithTools` unchanged.
 * @returns A `DriverChat` whose turn carries the parsed tool calls, plus `content` only when
 *   the router returned non-empty content.
 */
function routerDriverChat(c: RouterConfig): DriverChat {
  return {
    next: async ({ system, messages, tools }) => {
      // Wire-format conversation: the system prompt always leads.
      const conversation: Array<Record<string, unknown>> = [{ role: 'system', content: system }]
      for (const message of messages) conversation.push(toOpenAI(message))

      const functionTools = tools.map(({ name, description, parameters }) => ({
        type: 'function' as const,
        function: { name, description, parameters },
      }))

      const reply = await routerChatWithTools(c, conversation, functionTools, {
        temperature: 0.4,
        toolChoice: 'auto',
      })

      // Tool-call arguments arrive as a JSON string; `safeParse` turns them into an object.
      const toolCalls = reply.toolCalls.map(({ id, name, arguments: rawArguments }) => ({
        id,
        name,
        arguments: safeParse(rawArguments),
      }))
      return reply.content ? { content: reply.content, toolCalls } : { toolCalls }
    },
  }
}

/**
 * Convert one driver message into an OpenAI-style chat message record.
 *
 * - An assistant message with at least one tool call gains a `tool_calls` array; each call's id
 *   falls back to its name, its arguments are JSON-stringified, and a missing `content`
 *   becomes `''`.
 * - A tool message gets a `tool_call_id` taken from `toolCallId`, then `name`, then `'call'`.
 * - Anything else (including an assistant message with no tool calls) passes through as
 *   `{ role, content }`.
 */
function toOpenAI(m: DriverMessage): Record<string, unknown> {
  switch (m.role) {
    case 'assistant': {
      if (!m.toolCalls?.length) break
      const toolCalls = m.toolCalls.map((call) => ({
        id: call.id ?? call.name,
        type: 'function',
        function: { name: call.name, arguments: JSON.stringify(call.arguments) },
      }))
      return { role: 'assistant', content: m.content ?? '', tool_calls: toolCalls }
    }
    case 'tool': {
      const toolCallId = m.toolCallId ?? m.name ?? 'call'
      return { role: 'tool', tool_call_id: toolCallId, content: m.content }
    }
    default:
      break
  }
  return { role: m.role, content: m.content }
}

/**
 * Parse a tool call's JSON-encoded arguments into a plain object, never throwing.
 *
 * @param s The raw argument string.
 * @returns The parsed object, or `{}` when `s` is not valid JSON or when it decodes to anything
 *   other than a non-null, non-array object.
 */
function safeParse(s: string): Record<string, unknown> {
  let parsed: unknown
  try {
    parsed = JSON.parse(s)
  } catch {
    return {}
  }
  // `JSON.parse` also accepts `null`, arrays and primitives; none of those is an argument
  // record, so treat them like malformed input instead of casting them to the return type.
  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) return {}
  return parsed as Record<string, unknown>
}
// The driver-LLM brain uses the SHARED `routerDriverChat` (imported from the library) — its local
// copy did not forward usage/costUsd, so this bench's driver arms never metered their inference.

// ── A gated router worker: one router call → candidate code, settled valid ⟺ the tests pass ──
function humanEvalWorker(task: HumanEvalTask, label: string): Agent<unknown, unknown> {
Expand Down
31 changes: 27 additions & 4 deletions src/durable/spawn-journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,10 @@ type SpawnJournalRecord =
* ordinal legitimately equals a later `settled` cursor seq and is not a collision.
*/
function assertSeqUnique(root: NodeId, events: SpawnEvent[], ev: SpawnEvent): void {
if (ev.kind === 'spawned') return
if (events.some((e) => e.kind !== 'spawned' && e.seq === ev.seq)) {
// `spawned` (ordinal namespace) and `metered` (informational spend, no settlement order) live
// outside the cursor-uniqueness namespace replay relies on.
if (ev.kind === 'spawned' || ev.kind === 'metered') return
if (events.some((e) => e.kind !== 'spawned' && e.kind !== 'metered' && e.seq === ev.seq)) {
throw new Error(
`spawn journal corrupted: duplicate cursor seq ${ev.seq} in tree '${root}'; ` +
'the cursor order replay relies on is not unique',
Expand Down Expand Up @@ -313,6 +315,7 @@ export async function replaySpawnTree(
const settled: Settled<unknown>[] = []
for (const ev of ordered) {
if (ev.kind === 'spawned') continue
if (ev.kind === 'metered') continue // a spend record, not a settlement — irrelevant to replay
if (ev.kind === 'cancelled') {
settled.push({
kind: 'down',
Expand Down Expand Up @@ -373,7 +376,8 @@ function replayHandle(id: NodeId, label: string, status: NodeStatus) {

/**
* Materialize the live tree (`TreeView`) from a journaled event list for resume. Folds
* `spawned`/`settled`/`cancelled` into a per-node snapshot in `seq` order, then adds each
* `metered` event's driver-inference spend onto its node in a separate additive pass — so the
* resumed view matches what `scope.view` showed at the recorded cursor position.
*/
export function materializeTreeView(events: SpawnEvent[]): TreeView {
Expand All @@ -386,7 +390,9 @@ export function materializeTreeView(events: SpawnEvent[]): TreeView {
const spawns = events
.filter((ev): ev is Extract<SpawnEvent, { kind: 'spawned' }> => ev.kind === 'spawned')
.sort((a, b) => a.seq - b.seq)
const settlements = events.filter((ev) => ev.kind !== 'spawned').sort((a, b) => a.seq - b.seq)
const settlements = events
.filter((ev) => ev.kind !== 'spawned' && ev.kind !== 'metered')
.sort((a, b) => a.seq - b.seq)
for (const ev of spawns) {
if (ev.parent === undefined && root === undefined) root = ev.id
nodes.set(ev.id, {
Expand All @@ -410,6 +416,13 @@ export function materializeTreeView(events: SpawnEvent[]): TreeView {
node.status = 'cancelled'
}
}
// Driver inference: a separate pass so it accumulates ONTO the settled child-work base (no
// dependence on metered-vs-settled seq order) without touching node status.
for (const ev of events) {
if (ev.kind !== 'metered') continue
const node = requireNode(nodes, ev.id)
node.spent = addJournalSpend(node.spent, ev.spend)
}
const snapshots = [...nodes.values()].map(freezeSnapshot)
return {
root: root ?? snapshots[0]?.id ?? '',
Expand All @@ -433,6 +446,16 @@ function zeroSpend(): Spend {
return { iterations: 0, tokens: zeroTokenUsage(), usd: 0, ms: 0 }
}

/**
 * Sum two spend records channel by channel (iterations, input/output tokens, usd, ms) into a
 * new `Spend`; neither argument is modified. Used to fold a `metered` record onto a node's
 * accumulated spend.
 */
function addJournalSpend(a: Spend, b: Spend): Spend {
  const { tokens: accumulated } = a
  const { tokens: added } = b
  const tokens = {
    input: accumulated.input + added.input,
    output: accumulated.output + added.output,
  }
  return {
    iterations: a.iterations + b.iterations,
    tokens,
    usd: a.usd + b.usd,
    ms: a.ms + b.ms,
  }
}

function requireNode(nodes: Map<NodeId, MutableSnapshot>, id: NodeId): MutableSnapshot {
const node = nodes.get(id)
if (!node) {
Expand Down
24 changes: 23 additions & 1 deletion src/runtime/personify/trajectory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ export async function trajectoryReport(
// them. The two seq namespaces overlap, so create every node from its `spawned` event
// first, then apply settlements/cancellations — mirrors `materializeTreeView`.
const spawns = events.filter(isSpawned).sort(bySeq)
const closes = events.filter((ev) => ev.kind !== 'spawned').sort(bySeq)
// `metered` events (driver inference) are folded onto each node in a separate pass below, so
// they accumulate ONTO the settled child-work base regardless of seq order; closes are the
// settlements/cancellations that set node status.
const closes = events.filter((ev) => ev.kind !== 'spawned' && ev.kind !== 'metered').sort(bySeq)

const nodes = new Map<NodeId, MutableNode>()
for (const ev of spawns) {
Expand All @@ -89,6 +92,15 @@ export async function trajectoryReport(
node.verdict = ev.verdict
node.outRef = ev.outRef
}
// Driver inference: add each `metered` event onto its node's ownSpend. Because a driver re-homes
// its nested subtree's inference up the tree, this single tree's events already carry the whole
// sub-tree's driver cost — so `total` matches `SupervisedResult.spentTotal` directly, with no
// caller plumbing.
for (const ev of events) {
if (ev.kind !== 'metered') continue
const node = requireNode(nodes, ev.id, root)
node.ownSpend = addNodeSpend(node.ownSpend, ev.spend)
}

if (!nodes.has(root)) {
throw new Error(
Expand Down Expand Up @@ -256,6 +268,16 @@ function zeroSpend(): Spend {
return { iterations: 0, tokens: zeroTokenUsage(), usd: 0, ms: 0 }
}

/**
 * Fold a `metered` event's spend into a node's accumulated `ownSpend`.
 *
 * @param a The spend accumulated so far.
 * @param b The spend to add.
 * @returns A new `Spend` whose iterations, input/output tokens, usd and ms are the per-channel
 *   sums; neither argument is modified.
 */
function addNodeSpend(a: Spend, b: Spend): Spend {
  const iterations = a.iterations + b.iterations
  const input = a.tokens.input + b.tokens.input
  const output = a.tokens.output + b.tokens.output
  const usd = a.usd + b.usd
  const ms = a.ms + b.ms
  return { iterations, tokens: { input, output }, usd, ms }
}

function cloneSpend(spend: Spend): Spend {
return {
iterations: spend.iterations,
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1003,8 +1003,8 @@ export async function runAgentic(opts: RunAgenticOptions): Promise<AgenticRunRes
: `no-winner: ${result.reason}`
throw new Error(`runAgentic(${strategy.name}) produced no result — ${reason}`)
}
// Drivers deliver the strategy outcome; the cost vector is stamped here from `result.spentTotal`
// (the journal aggregate: settled child work + metered driver inference) + wall clock.
const core = result.out.deliverable as Omit<AgenticRunResult, 'usd' | 'ms' | 'tokens'>
return {
...core,
Expand Down
32 changes: 31 additions & 1 deletion src/runtime/supervise/budget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ export interface ReservationTicket {

/** Post-reservation pool readout — the shape `Scope.budget` exposes. `tokensLeft`,
* `usdLeft`, and `reservedTokens` reflect committed-but-unsettled reservations;
* `deadlineMs` is the ABSOLUTE wall-clock deadline (0 when the root set none).
* `usdCapped` distinguishes a real `usdLeft <= 0` exhaustion from an uncapped pool (which always
* reads `usdLeft: 0`) — the in-loop guard needs it to bound a usd-capped driver. */
export type BudgetReadout = Readonly<{
// Free tokens left in the pool; may be negative once observed spend exceeds the budget.
tokensLeft: number
// Free usd left when the pool is usd-capped; always 0 for an uncapped pool.
usdLeft: number
// True when the pool is usd-capped — the only case in which `usdLeft <= 0` means exhaustion.
usdCapped: boolean
// Absolute wall-clock deadline; 0 when the root set none.
deadlineMs: number
// Tokens currently held by reservations.
reservedTokens: number
}>
Expand All @@ -66,6 +69,17 @@ export interface BudgetPool {
spendFrom(events: AsyncIterable<UsageEvent> | UsageEvent[]): Promise<Spend>
/** The current readout, reflecting all outstanding reservations. */
readout(): BudgetReadout
/**
* Record OBSERVED spend that did NOT go through reserve/reconcile — the driver's OWN inference
* (its chat turns), which is real compute but not a spawned child. A direct `free → committed`
* debit, so `total ≡ free + reserved + committed` is preserved: equal-k counts the driver's
* tokens and the in-loop budget guard (`readout().tokensLeft`) sees them. `free` may go negative
* when a run overspends — that is honest (the readout then signals exhaustion). It never throws:
* the spend already happened, so accounting records reality; the in-loop guard prevents MORE.
* The DURABLE record is the journal's `metered` event (written by `Scope.meter`); this debit
* only makes the live `readout()` reflect driver inference for the in-loop guard.
*/
observe(spend: Spend): void
/** Fail loud if any reservation is still open — the conserved-pool leak detector. Called at the
* supervisor's join barrier: once every child has settled, no ticket may remain (a leaked
* reservation would silently break `total ≡ free + reserved + committed`). */
Expand Down Expand Up @@ -219,10 +233,25 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu
}
}

/**
 * Record spend that bypassed reserve/reconcile as a direct free → committed debit.
 *
 * No reservation ticket is involved and nothing is clamped, so the free counters may go
 * negative when a run overspends. The usd free balance is only tracked for a usd-capped pool;
 * committed usd is always accumulated.
 *
 * @param spend The observed spend; its `ms` channel is not used here.
 */
function observe(spend: Spend): void {
  const { iterations, usd } = spend
  const tokenCount = totalTokens(spend.tokens)

  // Token channel.
  committedTokens += tokenCount
  freeTokens -= tokenCount

  // Iteration channel.
  committedIterations += iterations
  freeIterations -= iterations

  // Usd channel: an uncapped pool keeps no free-usd balance to debit.
  committedUsd += usd
  if (usdCapped) {
    freeUsd -= usd
  }
}

function readout(): BudgetReadout {
return {
tokensLeft: freeTokens,
usdLeft: usdCapped ? freeUsd : 0,
usdCapped,
deadlineMs: absoluteDeadlineMs,
reservedTokens,
}
Expand All @@ -241,6 +270,7 @@ export function createBudgetPool(root: Budget, now: () => number = Date.now): Bu
reconcile,
spendFrom: foldUsage,
readout,
observe,
assertNoOpenTickets,
}
}
53 changes: 44 additions & 9 deletions src/runtime/supervise/coordination-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import { ValidationError } from '../../errors'
import type { McpToolDescriptor } from '../../mcp/server'
import { createCoordinationTools, type MakeWorkerAgent } from '../../mcp/tools/coordination'
import type { Agent, Budget, ResultBlobStore, Scope } from './types'
import type { Agent, Budget, ResultBlobStore, Scope, Spend } from './types'

/** One tool call the driver LLM asks for this turn. */
export interface DriverToolCall {
Expand All @@ -50,6 +50,12 @@ export interface DriverTurn {
readonly toolCalls?: ReadonlyArray<DriverToolCall>
/** The driver's natural-language output — the answer when there are no tool calls. */
readonly content?: string
/** The driver LLM's OWN token usage for THIS turn — metered against the conserved pool so the
* driver's inference counts toward equal-k AND the in-loop budget guard. Omit for a scripted/
* mock turn (no real inference); production `routerDriverChat` forwards it from the router. */
readonly usage?: { readonly input: number; readonly output: number }
/** The turn's inference cost (usd), when the provider priced it. */
readonly costUsd?: number
}

/** The injected driver-LLM seam: one turn over the conversation + the coordination tool specs. */
Expand Down Expand Up @@ -84,18 +90,25 @@ export interface CoordinationDriverOptions {
readonly now?: () => number
}

/** maxTurns=0 anti-runaway tripwire: a finite ceiling for the ONE case the conserved pool can't
 * bound — a driver whose chat seam reports NO usage (so `scope.meter`/`pool.observe` is never
 * called and its turns don't drain the pool). With a usage-reporting seam, driver inference now
 * meters into the pool and `poolStarved` halts it; the pool + deadline + abort are the real bounds
 * and no healthy run approaches this. */
const runawayTripwireTurns = 2000

/**
 * Spawn-progress is impossible: nothing is in flight to await AND the pool can't fund more work.
 * A long-horizon driver bounded by the conserved pool stops here instead of spinning (the
 * in-loop budget guard the turn cap alone never provided).
 *
 * Both conserved channels are checked:
 * - tokens — fewer tokens are left than one worker needs (`perWorker.maxTokens`);
 * - usd — the pool is usd-capped and its ceiling is used up (`usdLeft <= 0`). An uncapped pool
 *   always reads `usdLeft: 0`, so `usdCapped` must gate this check.
 *
 * @param scope The driver's scope; only `scope.budget` is read.
 * @param perWorker The per-worker budget; only `maxTokens` is read.
 * @returns `true` when no reservation is outstanding and either channel is exhausted.
 */
function poolStarved(scope: Scope<unknown>, perWorker: Budget): boolean {
  const b = scope.budget
  // A child is in flight — await it, don't finalize early.
  if (b.reservedTokens > 0) return false
  const tokenStarved = b.tokensLeft < perWorker.maxTokens
  const usdStarved = b.usdCapped && b.usdLeft <= 0
  return tokenStarved || usdStarved
}

/** The absolute wall-clock deadline (when the root set one) has passed. */
Expand Down Expand Up @@ -153,6 +166,28 @@ export function coordinationDriverAgent(opts: CoordinationDriverOptions): Agent<
if (poolStarved(scope, opts.perWorker) || deadlinePassed(scope, now)) break
const res = await opts.chat.next({ system, messages, tools: toolSpecs })
const calls = res.toolCalls ?? []
// Meter the driver's OWN inference for this turn — the largest single token consumer in an
// agentic loop, and the one the conserved pool never saw. Only when the turn carried real
// usage: a scripted/mock turn meters nothing, so offline equal-k stays exact. This debit is
// what makes maxTurns=0 genuinely bounded — a thinking driver drains the pool → poolStarved.
if (res.usage || res.costUsd !== undefined) {
const turnSpend: Spend = {
// iterations:0 — the conserved iteration channel (`maxIterations`) budgets CHILD rounds,
// not driver turns; counting turns there would conflate the two AND make a driver arm's
// iteration count diverge from a blind arm's. The driver is bounded by maxTurns + the
// token/usd pool; its turn COUNT stays observable via the per-turn `agent.turn` events.
iterations: 0,
tokens: { input: res.usage?.input ?? 0, output: res.usage?.output ?? 0 },
usd: res.costUsd ?? 0,
ms: 0,
}
await scope.meter(turnSpend, {
kind: 'driver-inference',
driver: opts.name,
turn,
toolCalls: calls.map((c) => c.name),
})
}
if (calls.length === 0) {
// The driver named no tool call — it is finished. Its deliverable is the best DELIVERED
// child (the completion-oracle), NOT its own prose: a driver cannot self-declare done
Expand Down
Loading
Loading