diff --git a/src/runtime/index.ts b/src/runtime/index.ts index 81ff7943..f2515ede 100644 --- a/src/runtime/index.ts +++ b/src/runtime/index.ts @@ -104,6 +104,8 @@ export { buildSteerContext, type CreateScopeAnalystOptions, createScopeAnalyst, + type RegistryAnalyzeProjection, + registryScopeAnalyst, } from './personify/analyst' export { fanout, diff --git a/src/runtime/personify/analyst.ts b/src/runtime/personify/analyst.ts index 9e00d146..b0816494 100644 --- a/src/runtime/personify/analyst.ts +++ b/src/runtime/personify/analyst.ts @@ -20,7 +20,8 @@ * throws; there is no silent empty-findings path that would let a combinator steer on nothing. */ -import type { AnalystFinding } from '@tangle-network/agent-eval' +import type { AnalystFinding, AnalystRunInputs } from '@tangle-network/agent-eval' +import type { AnalystRegistryLike } from '../../analyst-loop/types' import { AnalystError, PlannerError } from '../../errors' import type { Agent, Budget, DefaultVerdict, NodeId, Scope, Settled } from '../supervise/types' import { stringifySafe } from '../util' @@ -172,6 +173,49 @@ function readAnalystFindings(settled: Settled>): ReadonlyArray } +// ── The panel-of-analysts adapter — N analyst KINDS merged into one ScopeAnalyst ─────── + +/** + * Project a `ScopeAnalyzeInput` into the `AnalystRegistry.run` arguments. The registry runs over a + * `runId` + `AnalystRunInputs` (a trace store / run record / artifact dir), NOT in-memory scope + * settlements — so the CALLER owns the projection from the combinator's drained children to the + * registry's inputs (e.g. the trace store the run already wrote). This adapter never invents that + * bridge; it only runs the projected inputs and firewalls the merged findings. + */ +export interface RegistryAnalyzeProjection { + readonly runId: string + readonly inputs: AnalystRunInputs + /** Optional `run` opts (e.g. `priorFindings`) forwarded verbatim to the registry. */ + readonly opts?: Parameters[2] +} + +/** + * A `ScopeAnalyst` backed by an `AnalystRegistry` — the panel-of-analysts seam. The registry merges + * N analyst KINDS into one `AnalystRunResult.findings`; `analyze` runs it over the caller-projected + * `{ runId, inputs }` and pipes the merged findings through the SAME `assertTraceDerivedFindings` + * firewall `createScopeAnalyst` uses (single-sourced selector≠judge). Distinct from `panel()` + * (judges-vs-one-artifact) — this is analysts-over-a-trace, the diagnosis side of the wire. + * + * Fail loud: a registry that throws propagates; a judge-derived finding aborts via the firewall. + * The projection is the caller's (`buildInputs`) — if the scope settlements do not cleanly map to + * the registry's `AnalystRunInputs`, that is a caller-side contract gap, surfaced there, not papered + * over with a fabricated input here. + */ +export function registryScopeAnalyst( + registry: AnalystRegistryLike, + buildInputs: (input: ScopeAnalyzeInput) => RegistryAnalyzeProjection, +): ScopeAnalyst { + return { + async analyze(input: ScopeAnalyzeInput): Promise> { + const projection = buildInputs(input) + const result = await registry.run(projection.runId, projection.inputs, projection.opts) + const findings = result.findings + assertTraceDerivedFindings(findings) + return findings + }, + } +} + // ── The single firewalled steer surface every combinator funnels through ────────────── /** diff --git a/src/runtime/personify/combinators.ts b/src/runtime/personify/combinators.ts index 78e121c2..8104c45e 100644 --- a/src/runtime/personify/combinators.ts +++ b/src/runtime/personify/combinators.ts @@ -162,9 +162,10 @@ export function fanout( * the deployable stop. The conserved pool IS the loop bound: once `spawn` fails closed the loop * stops. A loop that exhausted the pool without `until` ever satisfying is a concrete blocker. * - * Findings are threaded through the `SteerContext` firewall in the analyst seam (`analyst.ts`); - * absent a wired analyst on this surface the firewall stays dormant and `until` is consulted with - * an empty findings array — never a fabricated finding (fail-loud honesty over a silent default). + * When `ctx.analyst` is set, each round runs it over the children settled so far and steers + * `until` on the resulting trace-derived findings (the analyst spawns into THIS scope, so its + * compute is conserved-pooled — equal-k holds by construction). Absent an analyst the findings + * argument is the empty array — never a fabricated finding (fail-loud honesty over a silent default). */ export function loopUntil( seed: State, @@ -175,6 +176,7 @@ export function loopUntil( async act(task, scope): Promise> { let state: LoopUntilState = { round: 0, value: seed } const blockers: string[] = [] + const settledSoFar: Settled>[] = [] for (;;) { const label = spec.label ? spec.label(state.round) : `step:${state.round}` const child = ctx.spawnChild(label, ctx.persona.root) @@ -188,8 +190,14 @@ export function loopUntil( } const settled = await drainOne(scope, label) if (settled.kind === 'down') blockers.push(blockerFromDown(settled)) + settledSoFar.push(settled) state = spec.fold(state, settled) - const reached = spec.until(state, []) + // Wired analyst ⇒ steer `until` on trace-derived findings; absent ⇒ the dormant empty + // default (the analyst spawns into THIS scope, so its compute is conserved-pooled). + const findings = ctx.analyst + ? await ctx.analyst.analyze({ task, settledSoFar, nodeId: scope.view.root }) + : [] + const reached = spec.until(state, findings) if (reached) return reached state = { round: state.round + 1, value: state.value } } @@ -316,9 +324,14 @@ export function verify( * never a child's raw `verdict` — and the default gate (`flatWidenGate`) never widens, so the R2 * firewall stays dormant. Terminal selection is `spec.synthesize` over every settled lineage. * - * No analyst is wired on this frozen surface, so `decide` is consulted with an empty findings - * array; a flat gate ignores it. A non-flat gate that wants findings reads them through the - * `SteerContext` firewall the analyst seam owns — never fabricated here. + * When `ctx.analyst` is set, `decide` is consulted with that round's trace-derived findings; + * absent an analyst the findings argument is the empty array a flat gate ignores. The analyst + * spawns into THIS scope (conserved-pooled, so equal-k holds). Streaming caveat: a wired analyst + * drains its own child off the SHARED cursor by id-match, so on a NON-flat gate (which spawns + * widen children that are live concurrently) the analyst can consume a sibling's settlement before + * the widen loop sees it. The shipped default (`flatWidenGate`) never widens, so no widen child is + * ever live when the analyst runs and the wire is exact; a non-flat gate must drive the analyst on + * a scope whose siblings are quiesced, or read findings without the shared-cursor drain. */ export function widen(spec: WidenSpec): CombinatorShape { return (ctx: ShapeContext): Agent> => ({ @@ -342,7 +355,16 @@ export function widen(spec: WidenSpec): CombinatorShape< let widenIndex = 0 for (let s = await scope.next(); s !== null; s = await scope.next()) { gathered.push(s) - const decision: WidenDecision = spec.gate.decide(s, [], scope.budget) + // Wired analyst ⇒ steer the gate on trace-derived findings; absent ⇒ the dormant empty + // default the flat gate ignores. The analyst spawns into THIS scope (conserved-pooled). + const findings = ctx.analyst + ? await ctx.analyst.analyze({ + task: _task, + settledSoFar: gathered, + nodeId: scope.view.root, + }) + : [] + const decision: WidenDecision = spec.gate.decide(s, findings, scope.budget) if (decision.kind !== 'widen') continue const label = `widen:${widenIndex}` widenIndex += 1 diff --git a/src/runtime/personify/persona.ts b/src/runtime/personify/persona.ts index 3250a189..37fae1e4 100644 --- a/src/runtime/personify/persona.ts +++ b/src/runtime/personify/persona.ts @@ -43,6 +43,7 @@ import type { ShapeBudget, ShapeContext, } from './types' +import type { ScopeAnalyst } from './wave-types' // ── definePersona ───────────────────────────────────────────────────────────── @@ -80,10 +81,15 @@ export function definePersona(input: DefinePersonaInput): Person * profile. The shape never touches the registry — resolution stays single-sourced in the * scope/registry the supervisor owns. */ -export function createShapeContext(persona: Persona, budget: ShapeBudget): ShapeContext { +export function createShapeContext( + persona: Persona, + budget: ShapeBudget, + analyst?: ScopeAnalyst, +): ShapeContext { return { persona, budget, + ...(analyst ? { analyst } : {}), spawnChild(name, spec): Agent> { // The wrapped agent is SPAWNED, not run — the resolved Executor drives it. `act` // is never invoked by the keystone for a spawned child; it throws if mis-used as a @@ -124,7 +130,7 @@ export async function runPersonified( const { persona } = options const shape = resolveShape(options.shape) const shapeBudget = resolveShapeBudget(options.budget, options.shapeBudget) - const ctx = createShapeContext(persona, shapeBudget) + const ctx = createShapeContext(persona, shapeBudget, options.analyst) const rootAgent = shape(ctx) const executors = personaRegistry(persona) diff --git a/src/runtime/personify/types.ts b/src/runtime/personify/types.ts index 0fa2dbdc..3c5da63d 100644 --- a/src/runtime/personify/types.ts +++ b/src/runtime/personify/types.ts @@ -36,6 +36,7 @@ import type { SpawnJournal, SupervisedResult, } from '../supervise/types' +import type { ScopeAnalyst } from './wave-types' // ── The deliverable contract every shape synthesizes into ────────────────────── @@ -174,6 +175,9 @@ export interface ShapeContext { /** Derive a child `AgentSpec` from the persona's root spec with an overridden profile — * the seam a shape uses to give a worker a narrower role/prompt than the root persona. */ childSpec(profile: AgentProfile, harness?: BackendType | null): AgentSpec + /** The scope analyst (selector≠judge firewall) the combinator steers from. Absent ⇒ the + * dormant default (empty findings → gates read deliverables/state only). */ + readonly analyst?: ScopeAnalyst } /** @@ -234,6 +238,9 @@ export interface RunPersonifiedOptions { readonly handle?: RootHandle> readonly now?: () => number readonly signal?: AbortSignal + /** Optional scope analyst threaded into the shape's ShapeContext so loopUntil/widen steer + * on trace-derived findings instead of the dormant empty default. */ + readonly analyst?: ScopeAnalyst } /** The composed run signature. */ diff --git a/tests/loops/rsi-wave.test.ts b/tests/loops/rsi-wave.test.ts index 70a1c3be..114b1296 100644 --- a/tests/loops/rsi-wave.test.ts +++ b/tests/loops/rsi-wave.test.ts @@ -1,16 +1,30 @@ import type { AnalystFinding } from '@tangle-network/agent-eval' import type { AgentProfile } from '@tangle-network/sandbox' import { describe, expect, it } from 'vitest' +import type { AnalystRegistryLike } from '../../src/analyst-loop/types' import { contentAddress, InMemoryResultBlobStore, InMemorySpawnJournal, } from '../../src/durable/spawn-journal' import { AnalystError, PlannerError, ValidationError } from '../../src/errors' -import { buildSteerContext, createScopeAnalyst } from '../../src/runtime/personify/analyst' +import { + buildSteerContext, + createScopeAnalyst, + registryScopeAnalyst, +} from '../../src/runtime/personify/analyst' +import { flatWidenGate, loopUntil, widen } from '../../src/runtime/personify/combinators' import { InMemoryCorpus, renderCorpusToInstructions } from '../../src/runtime/personify/corpus' +import { createShapeContext } from '../../src/runtime/personify/persona' import { equalKOnCost, trajectoryReport } from '../../src/runtime/personify/trajectory' -import type { CorpusRecord, Outcome } from '../../src/runtime/personify/wave-types' +import type { Persona, ShapeBudget, ShapeContext } from '../../src/runtime/personify/types' +import type { + CorpusRecord, + Outcome, + ScopeAnalyst, + ScopeWidenGate, + WidenDecision, +} from '../../src/runtime/personify/wave-types' import { createBudgetPool, spendFromUsageEvents } from '../../src/runtime/supervise/budget' import { createExecutorRegistry } from '../../src/runtime/supervise/runtime' import { createScope } from '../../src/runtime/supervise/scope' @@ -260,6 +274,232 @@ describe('analyst-on-scope (G1) firewall', () => { }) }) +// ════════════════════════════════════════════════════════════════════════════════════ +// 4b. Analyst→steer WIRE — the combinator gates steer on a ScopeAnalyst's findings, not [] +// ════════════════════════════════════════════════════════════════════════════════════ + +/** A ShapeContext whose `spawnChild` yields a worker leaf carrying a fixed `done` Outcome (so the + * offline scope settles it via the BYO executor). Optionally threads a `ScopeAnalyst` so a + * combinator's gate sees its findings — the seam this wave connects. */ +function workerCtx(analyst?: ScopeAnalyst): ShapeContext { + const root: AgentSpec = { profile: { name: 'worker' } as AgentProfile, harness: null } + const persona = { name: 'p', root } as Persona + const budget: ShapeBudget = { perChild: { maxIterations: 1, maxTokens: 1000 }, fanout: 1 } + return { + persona, + budget, + ...(analyst ? { analyst } : {}), + spawnChild(name): Agent> { + const outcome: Outcome = { kind: 'done', deliverable: name } + return leafAgent(name, outcome) as Agent> + }, + childSpec(profile): AgentSpec { + return { profile, harness: null } + }, + } +} + +/** A spy `ScopeAnalyst` — records every `analyze` call and returns a fixed finding list. */ +function spyAnalyst(findings: ReadonlyArray): { + analyst: ScopeAnalyst + calls: Array<{ settledCount: number; nodeId: string }> +} { + const calls: Array<{ settledCount: number; nodeId: string }> = [] + return { + calls, + analyst: { + analyze: async (input) => { + calls.push({ settledCount: input.settledSoFar.length, nodeId: input.nodeId }) + return findings + }, + }, + } +} + +describe('analyst→steer wire (combinator gates read findings, not [])', () => { + it('createShapeContext threads an analyst onto the ShapeContext (absent when omitted)', () => { + const root: AgentSpec = { profile: { name: 'w' } as AgentProfile, harness: null } + const persona = { name: 'p', root } as Persona + const budget: ShapeBudget = { perChild: { maxIterations: 1, maxTokens: 10 }, fanout: 1 } + const { analyst } = spyAnalyst([traceFinding]) + expect(createShapeContext(persona, budget).analyst).toBeUndefined() + expect(createShapeContext(persona, budget, analyst).analyst).toBe(analyst) + }) + + it('loopUntil with ctx.analyst CALLS analyze and passes its findings to `until`', async () => { + const { scope } = await beginScope() + const { analyst, calls } = spyAnalyst([traceFinding]) + const seen: Array> = [] + const shape = loopUntil(0, { + step: () => ({}), + fold: (prior) => ({ round: prior.round, value: prior.value + 1 }), + until: (_state, findings) => { + seen.push(findings) + // Stop as soon as the analyst's trace finding arrives — proving the wire reached the gate. + return findings.some((f) => f.finding_id === 'trace-1') + ? { kind: 'done', deliverable: 'stopped-on-finding' } + : null + }, + }) + const out = await shape(workerCtx(analyst)).act('goal', scope as Scope>) + expect(out).toEqual({ kind: 'done', deliverable: 'stopped-on-finding' }) + // The analyst was consulted, and the first round's `until` saw the analyst's finding (not []). + expect(calls).toHaveLength(1) + expect(seen[0]).toEqual([traceFinding]) + expect(calls[0]?.nodeId).toBe('run') + }) + + it('loopUntil WITHOUT an analyst consults `until` with the dormant empty default', async () => { + const { scope } = await beginScope() + const seen: Array> = [] + let rounds = 0 + const shape = loopUntil(0, { + step: () => ({}), + fold: (prior) => ({ round: prior.round, value: prior.value + 1 }), + until: (_state, findings) => { + seen.push(findings) + rounds += 1 + // No analyst ⇒ findings is always []; stop after one round on the state, not a finding. + return rounds >= 1 ? { kind: 'done', deliverable: 'state-stop' } : null + }, + }) + const out = await shape(workerCtx()).act('goal', scope as Scope>) + expect(out).toEqual({ kind: 'done', deliverable: 'state-stop' }) + expect(seen[0]).toEqual([]) + }) + + it('loopUntil drives the REAL createScopeAnalyst into the SAME scope (equal-k preserved)', async () => { + // The analyst child is spawned into the combinator's own scope, so its tokens are charged to + // the one conserved pool — equal-k holds by construction. Drive the real seam end to end. + const { scope, journal } = await beginScope() + const realAnalyst = createScopeAnalyst(scope as Scope>, { + analyst: leafAgent('analyst', [traceFinding]) as Agent< + unknown, + ReadonlyArray + >, + buildTask: () => ({}), + budget: { maxIterations: 1, maxTokens: 100 }, + }) + const shape = loopUntil(0, { + step: () => ({}), + fold: (prior) => ({ round: prior.round, value: prior.value + 1 }), + until: (_state, findings) => + findings.length > 0 ? { kind: 'done', deliverable: 'done' } : null, + }) + const out = await shape(workerCtx(realAnalyst)).act('goal', scope as Scope>) + expect(out).toEqual({ kind: 'done', deliverable: 'done' }) + // The journal recorded BOTH the worker child and the analyst child under the one run tree — + // the analyst spawned as a sibling in the same scope (its compute is conserved-pooled). + const tree = await journal.loadTree('run') + const spawnedLabels = (tree ?? []) + .filter((e): e is Extract => e.kind === 'spawned') + .map((e) => e.label) + expect(spawnedLabels).toContain('analyst') + expect(spawnedLabels.some((l) => l.startsWith('step:'))).toBe(true) + }) + + it('widen with ctx.analyst feeds findings to the gate; the flat default ignores []', async () => { + const { scope } = await beginScope() + const { analyst, calls } = spyAnalyst([traceFinding]) + const gateFindings: Array> = [] + const recordingGate: ScopeWidenGate = { + decide(_settled, findings): WidenDecision { + gateFindings.push(findings) + return { kind: 'stop' } + }, + } + const shape = widen({ + seeds: [0], + seedTask: () => ({}), + gate: recordingGate, + widenTask: () => ({}), + synthesize: (gathered) => + gathered.length > 0 + ? { kind: 'done', deliverable: 'widened' } + : { kind: 'blocked', blockers: ['none'] }, + }) + await shape(workerCtx(analyst)).act('goal', scope as Scope>) + // The gate saw the analyst's finding for the one settled seed — not the hardcoded []. + expect(calls.length).toBeGreaterThanOrEqual(1) + expect(gateFindings[0]).toEqual([traceFinding]) + + // And the shipped default (flatWidenGate) is consulted with [] when no analyst is wired. + const { scope: scope2 } = await beginScope('run2') + const flatSeen: Array> = [] + const flat = flatWidenGate() + const flatProbe: ScopeWidenGate = { + decide(settled, findings, budget) { + flatSeen.push(findings) + return flat.decide(settled, findings, budget) + }, + } + const shape2 = widen({ + seeds: [0], + seedTask: () => ({}), + gate: flatProbe, + widenTask: () => ({}), + synthesize: () => ({ kind: 'done', deliverable: 'flat' }), + }) + await shape2(workerCtx()).act('goal', scope2 as Scope>) + expect(flatSeen[0]).toEqual([]) + }) +}) + +// ════════════════════════════════════════════════════════════════════════════════════ +// 4c. registryScopeAnalyst — N analyst kinds merged into one ScopeAnalyst, behind the firewall +// ════════════════════════════════════════════════════════════════════════════════════ + +/** A mock AnalystRegistry whose `run` returns a fixed merged findings list (the panel-of-analysts + * merge the real registry performs across N kinds). Records the projected inputs it received. */ +function mockRegistry(findings: ReadonlyArray): { + registry: AnalystRegistryLike + runs: Array<{ runId: string }> +} { + const runs: Array<{ runId: string }> = [] + return { + runs, + registry: { + list: () => [{ id: 'k1' }, { id: 'k2' }], + run: async (runId) => { + runs.push({ runId }) + return { + run_id: runId, + correlation_id: 'corr', + started_at: '2026-01-01T00:00:00Z', + ended_at: '2026-01-01T00:00:01Z', + findings: [...findings], + per_analyst: [], + total_cost_usd: 0, + } + }, + }, + } +} + +describe('registryScopeAnalyst (panel-of-analysts adapter)', () => { + it('runs the registry, projects inputs via buildInputs, and returns the merged findings', async () => { + const { registry, runs } = mockRegistry([traceFinding]) + const analyst = registryScopeAnalyst(registry, (input) => ({ + runId: `proj-${input.nodeId}`, + inputs: {}, + })) + const findings = await analyst.analyze({ task: 'g', settledSoFar: [], nodeId: 'run' }) + expect(findings).toEqual([traceFinding]) + // The caller-supplied projection drove `run` — the adapter never invents the runId/inputs. + expect(runs).toEqual([{ runId: 'proj-run' }]) + }) + + it('pipes the merged findings through the SAME firewall (judge-derived ABORTS)', async () => { + const { registry } = mockRegistry([judgeFinding]) + const analyst = registryScopeAnalyst(registry, () => ({ runId: 'r', inputs: {} })) + // A merged finding citing judge/verdict/score `metric` evidence aborts via + // assertTraceDerivedFindings — single-sourced with createScopeAnalyst (selector ≠ judge). + await expect( + analyst.analyze({ task: 'g', settledSoFar: [], nodeId: 'run' }), + ).rejects.toBeInstanceOf(PlannerError) + }) +}) + // ════════════════════════════════════════════════════════════════════════════════════ // 5. Trajectory trace + cost ledger — roll-up over a multi-node tree, equalKOnCost confound // ════════════════════════════════════════════════════════════════════════════════════