diff --git a/src/lib/programs/orchestrator/__tests__/queue-tools.test.ts b/src/lib/programs/orchestrator/__tests__/queue-tools.test.ts new file mode 100644 index 00000000..318825d2 --- /dev/null +++ b/src/lib/programs/orchestrator/__tests__/queue-tools.test.ts @@ -0,0 +1,127 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { QueueStore } from '@lib/programs/orchestrator/queue'; +import { + applyComplete, + applyEnqueue, + applyReadHandoffs, + checkEnqueueGuards, + type OrchestratorToolsContext, +} from '@lib/programs/orchestrator/queue-tools'; + +function tmpDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-tools-test-')); +} + +const VALID = ['install', 'init', 'capture']; + +describe('checkEnqueueGuards', () => { + let dir: string; + let store: QueueStore; + let ctx: OrchestratorToolsContext; + + beforeEach(() => { + dir = tmpDir(); + store = new QueueStore(dir, 'run-1'); + ctx = { store, validTypes: VALID }; + }); + + afterEach(() => fs.rmSync(dir, { recursive: true, force: true })); + + it('rejects an unknown type', () => { + const r = checkEnqueueGuards(ctx, { type: 'nope', reason: 'x' }); + expect(r).toMatchObject({ ok: false, guard: 'unknown-type' }); + }); + + it('rejects an unknown dependency', () => { + const r = checkEnqueueGuards(ctx, { + type: 'init', + dependsOn: ['ghost'], + reason: 'x', + }); + expect(r).toMatchObject({ ok: false, guard: 'unknown-dep' }); + }); + + it('trips dedup on the same type and inputs', () => { + store.enqueue({ type: 'install', inputs: { pkg: 'posthog-js' } }); + const r = checkEnqueueGuards(ctx, { + type: 'install', + inputs: { pkg: 'posthog-js' }, + reason: 'x', + }); + expect(r).toMatchObject({ ok: false, guard: 'dedup' }); + }); + + it('allows a valid enqueue', () => { + const r = checkEnqueueGuards(ctx, { type: 'init', reason: 'x' }); + expect(r).toEqual({ ok: true }); + }); +}); + +describe('apply functions', () => { + let dir: string; + let store: QueueStore; + let ctx: OrchestratorToolsContext; + + beforeEach(() => { + dir = tmpDir(); + store = new QueueStore(dir, 'run-1'); + ctx = { store, validTypes: VALID }; + }); + + afterEach(() => fs.rmSync(dir, { recursive: true, force: true })); + + it('attributes a seed enqueue to the orchestrator', () => { + const r = applyEnqueue(ctx, { type: 'install', reason: 'seed' }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.task.enqueuedBy).toBe('orchestrator'); + }); + + it('attributes a follow-up enqueue to the running task', () => { + const parent = store.enqueue({ type: 'init' }); + ctx.currentTaskId = parent.id; + const r = applyEnqueue(ctx, { type: 'capture', reason: 'follow-up' }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.task.enqueuedBy).toBe(parent.id); + }); + + it('complete_task fails when no task is running', () => { + const r = applyComplete(ctx, { + status: 'done', + handoff: { goals: 'g', did: 'd', forNextAgent: 'n' }, + }); + expect(r.ok).toBe(false); + }); + + it('complete_task marks the running task done and stores the handoff', () => { + const t = store.enqueue({ type: 'install' }); + ctx.currentTaskId = t.id; + store.start(t.id); + const r = applyComplete(ctx, { + status: 'done', + handoff: { goals: 'g', did: 'added sdk', forNextAgent: 'env next' }, + }); + expect(r.ok).toBe(true); + expect(store.get(t.id)?.status).toBe('done'); + expect(store.readHandoff(t.id)?.did).toBe('added sdk'); + }); + + it('read_handoffs returns a dependency handoff for the running task', () => { + const dep = store.enqueue({ type: 'install' }); + store.start(dep.id); + store.complete(dep.id, { + goals: 'g', + did: 'installed', + forNextAgent: 'now init', + }); + const t = store.enqueue({ type: 'init', dependsOn: [dep.id] }); + ctx.currentTaskId = t.id; + + const handoffs = applyReadHandoffs(ctx, {}); + expect(handoffs).toHaveLength(1); + expect(handoffs[0].did).toBe('installed'); + }); +}); diff --git a/src/lib/programs/orchestrator/queue-tools.ts b/src/lib/programs/orchestrator/queue-tools.ts new file mode 100644 index 00000000..624d9b09 --- /dev/null +++ b/src/lib/programs/orchestrator/queue-tools.ts @@ -0,0 +1,253 @@ +/** + * Orchestrator MCP tools, registered into the existing `wizard-tools` server when + * a queue is present. They let the orchestrator agent and task agents grow the + * queue, report completion with a structured handoff, and read prior handoffs. + * + * The guard logic and the apply functions are plain, exported, and unit-tested. + * `buildOrchestratorTools` wraps them in the SDK `tool()` shape. + */ +import { z } from 'zod'; +import { analytics } from '../../../utils/analytics'; +import type { QueueStore, QueuedTask, TaskHandoff } from './queue'; + +export interface OrchestratorToolsContext { + store: QueueStore; + /** Task types the registry knows about. enqueue_task rejects anything else. */ + validTypes: readonly string[]; + /** + * The id of the task this tool server is bound to. Each task agent gets its + * own wizard-tools server, so attribution holds when independent tasks run + * in parallel. Absent for the seed, which is not a task. + */ + currentTaskId?: string; +} + +export interface EnqueueArgs { + type: string; + inputs?: Record; + dependsOn?: string[]; + model?: string; + reason: string; +} + +export type GuardResult = + | { ok: true } + | { ok: false; guard: string; message: string }; + +function stableStringify(value: unknown): string { + if (value === null || typeof value !== 'object') return JSON.stringify(value); + if (Array.isArray(value)) return `[${value.map(stableStringify).join(',')}]`; + const entries = Object.entries(value as Record).sort( + ([a], [b]) => a.localeCompare(b), + ); + return `{${entries + .map(([k, v]) => `${JSON.stringify(k)}:${stableStringify(v)}`) + .join(',')}}`; +} + +function dedupKey(type: string, inputs: Record): string { + return `${type}::${stableStringify(inputs)}`; +} + +/** + * Validate an enqueue. Structural checks only — a real type, real dependencies, + * and not a literal duplicate. How much runs, and in what shape, is the task + * graph's business, not a knob's. + */ +export function checkEnqueueGuards( + ctx: OrchestratorToolsContext, + args: EnqueueArgs, +): GuardResult { + const tasks = ctx.store.list(); + + if (!ctx.validTypes.includes(args.type)) { + return { + ok: false, + guard: 'unknown-type', + message: `Unknown task type "${ + args.type + }". Valid types: ${ctx.validTypes.join(', ')}.`, + }; + } + + for (const dep of args.dependsOn ?? []) { + if (!ctx.store.get(dep)) { + return { + ok: false, + guard: 'unknown-dep', + message: `Dependency "${dep}" is not a known task id.`, + }; + } + } + + const key = dedupKey(args.type, args.inputs ?? {}); + if ( + tasks.some( + (t) => t.status !== 'failed' && dedupKey(t.type, t.inputs) === key, + ) + ) { + return { + ok: false, + guard: 'dedup', + message: `A "${args.type}" task with these inputs already exists.`, + }; + } + + return { ok: true }; +} + +export type EnqueueResult = + | { ok: true; task: QueuedTask } + | { ok: false; guard: string; message: string }; + +export function applyEnqueue( + ctx: OrchestratorToolsContext, + args: EnqueueArgs, +): EnqueueResult { + const guard = checkEnqueueGuards(ctx, args); + if (!guard.ok) return guard; + + const task = ctx.store.enqueue({ + type: args.type, + inputs: args.inputs ?? {}, + dependsOn: args.dependsOn ?? [], + model: args.model, + enqueuedBy: ctx.currentTaskId ?? 'orchestrator', + }); + return { ok: true, task }; +} + +export type CompleteResult = { ok: true } | { ok: false; message: string }; + +export function applyComplete( + ctx: OrchestratorToolsContext, + args: { status: 'done' | 'failed' | 'skipped'; handoff: TaskHandoff }, +): CompleteResult { + const id = ctx.currentTaskId; + if (!id) { + return { + ok: false, + message: 'complete_task can only be called by a running task agent.', + }; + } + if (args.status === 'failed') { + ctx.store.fail( + id, + { type: 'self-reported', message: args.handoff.forNextAgent }, + args.handoff, + ); + } else if (args.status === 'skipped') { + ctx.store.skip(id, args.handoff); + } else { + ctx.store.complete(id, args.handoff); + } + return { ok: true }; +} + +export function applyReadHandoffs( + ctx: OrchestratorToolsContext, + args: { type?: string; taskId?: string }, +): TaskHandoff[] { + if (args.taskId) { + const h = ctx.store.readHandoff(args.taskId); + return h ? [h] : []; + } + if (args.type) { + return ctx.store.readHandoffsByType(args.type); + } + // No filter: every handoff of a dependency of the current task. + const currentId = ctx.currentTaskId; + const current = currentId ? ctx.store.get(currentId) : undefined; + if (!current) return []; + return current.dependsOn + .map((depId) => ctx.store.readHandoff(depId)) + .filter((h): h is TaskHandoff => h !== null); +} + +const HANDOFF_SHAPE = { + goals: z.string().describe('What this task was asked to achieve.'), + did: z.string().describe('What you actually did.'), + forNextAgent: z.string().describe('What the next agent should know.'), + filesTouched: z.array(z.string()).optional(), +}; + +type SdkTool = ( + name: string, + description: string, + // The SDK accepts a plain object of zod fields as the schema. + schema: Record, + handler: (args: never) => unknown, +) => unknown; + +function textResult(text: string, isError = false) { + return { isError, content: [{ type: 'text' as const, text }] }; +} + +/** + * Build the orchestrator tools in the SDK `tool()` shape. Called from + * createWizardToolsServer only when a queue context is present. + */ +export function buildOrchestratorTools( + tool: SdkTool, + ctx: OrchestratorToolsContext, +): unknown[] { + const enqueueTask = tool( + 'enqueue_task', + 'Add a task to the orchestrator queue. Use it to seed work and to enqueue follow-up work you discover. Keep tasks small and discrete.', + { + type: z + .string() + .describe(`The task type. One of: ${ctx.validTypes.join(', ')}.`), + inputs: z.record(z.unknown()).optional(), + dependsOn: z + .array(z.string()) + .optional() + .describe('Task ids that must be done before this task runs.'), + model: z.string().optional(), + reason: z.string().describe('One line on why this task is needed.'), + }, + ((args: EnqueueArgs) => { + const res = applyEnqueue(ctx, args); + if (!res.ok) { + analytics.wizardCapture('orchestrator guard tripped', { + guard: res.guard, + type: args.type, + }); + return textResult(res.message, true); + } + return textResult(JSON.stringify({ id: res.task.id })); + }) as (args: never) => unknown, + ); + + const completeTask = tool( + 'complete_task', + "Report the outcome of your task. Always call this exactly once when you finish, with a structured handoff for the next agent. Use status 'skipped' when the task does not apply to this project and you cannot do it (say why in the handoff) — not 'done'.", + { + status: z.enum(['done', 'failed', 'skipped']), + handoff: z.object(HANDOFF_SHAPE), + }, + ((args: { + status: 'done' | 'failed' | 'skipped'; + handoff: TaskHandoff; + }) => { + const res = applyComplete(ctx, args); + if (!res.ok) return textResult(res.message, true); + return textResult('ok'); + }) as (args: never) => unknown, + ); + + const readHandoffs = tool( + 'read_handoffs', + 'Read structured handoffs from earlier tasks. With no argument, returns the handoffs of your dependencies.', + { + type: z.string().optional(), + taskId: z.string().optional(), + }, + ((args: { type?: string; taskId?: string }) => { + const handoffs = applyReadHandoffs(ctx, args); + return textResult(JSON.stringify(handoffs, null, 2)); + }) as (args: never) => unknown, + ); + + return [enqueueTask, completeTask, readHandoffs]; +} diff --git a/src/lib/wizard-tools.ts b/src/lib/wizard-tools.ts index aae21ec5..173039b1 100644 --- a/src/lib/wizard-tools.ts +++ b/src/lib/wizard-tools.ts @@ -26,6 +26,10 @@ import { } from './programs/audit/types'; import type { WizardAskBridge } from './wizard-ask-bridge'; import { createSecretVault, type SecretVault } from './secret-vault'; +import { + buildOrchestratorTools, + type OrchestratorToolsContext, +} from './programs/orchestrator/queue-tools'; // --------------------------------------------------------------------------- // SDK dynamic import (ESM module loaded once, cached) @@ -203,6 +207,14 @@ export interface WizardToolsOptions { * (e.g. in unit tests), a fresh vault is created internally. */ secretVault?: SecretVault; + + /** + * Orchestrator queue context. Present only when the `wizard-orchestrator` + * flag routes the run to the orchestrator; when set, the orchestrator tools + * (enqueue_task, complete_task, read_handoffs) are registered. Absent on the + * linear path. + */ + orchestrator?: OrchestratorToolsContext; } /** Default per-run cap on wizard_ask calls when no override is provided. */ @@ -488,6 +500,7 @@ export async function createWizardToolsServer(options: WizardToolsOptions) { askBridge, askMaxQuestions = DEFAULT_ASK_MAX_QUESTIONS, secretVault = createSecretVault(), + orchestrator, } = options; const sdk = await getSDKModule(); const { tool, createSdkMcpServer } = sdk; @@ -1087,6 +1100,10 @@ export async function createWizardToolsServer(options: WizardToolsOptions) { // -- Assemble server ------------------------------------------------------ + const orchestratorTools = orchestrator + ? buildOrchestratorTools(tool, orchestrator) + : []; + return createSdkMcpServer({ name: SERVER_NAME, version: '1.0.0', @@ -1100,6 +1117,7 @@ export async function createWizardToolsServer(options: WizardToolsOptions) { auditAddChecks, auditResolveChecks, wizardAsk, + ...orchestratorTools, ], }); } @@ -1119,6 +1137,9 @@ export const WIZARD_TOOL_NAMES = { auditAddChecks: `mcp__${SERVER_NAME}__audit_add_checks`, auditResolveChecks: `mcp__${SERVER_NAME}__audit_resolve_checks`, wizardAsk: `mcp__${SERVER_NAME}__wizard_ask`, + enqueueTask: `mcp__${SERVER_NAME}__enqueue_task`, + completeTask: `mcp__${SERVER_NAME}__complete_task`, + readHandoffs: `mcp__${SERVER_NAME}__read_handoffs`, } as const; // ---------------------------------------------------------------------------