Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions src/lib/programs/orchestrator/__tests__/queue-tools.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import { QueueStore } from '@lib/programs/orchestrator/queue';
import {
applyComplete,
applyEnqueue,
applyReadHandoffs,
checkEnqueueGuards,
type OrchestratorToolsContext,
} from '@lib/programs/orchestrator/queue-tools';

function tmpDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-tools-test-'));
}

const VALID = ['install', 'init', 'capture'];

describe('checkEnqueueGuards', () => {
let dir: string;
let store: QueueStore;
let ctx: OrchestratorToolsContext;

beforeEach(() => {
dir = tmpDir();
store = new QueueStore(dir, 'run-1');
ctx = { store, validTypes: VALID };
});

afterEach(() => fs.rmSync(dir, { recursive: true, force: true }));

it('rejects an unknown type', () => {
const r = checkEnqueueGuards(ctx, { type: 'nope', reason: 'x' });
expect(r).toMatchObject({ ok: false, guard: 'unknown-type' });
});

it('rejects an unknown dependency', () => {
const r = checkEnqueueGuards(ctx, {
type: 'init',
dependsOn: ['ghost'],
reason: 'x',
});
expect(r).toMatchObject({ ok: false, guard: 'unknown-dep' });
});

it('trips dedup on the same type and inputs', () => {
store.enqueue({ type: 'install', inputs: { pkg: 'posthog-js' } });
const r = checkEnqueueGuards(ctx, {
type: 'install',
inputs: { pkg: 'posthog-js' },
reason: 'x',
});
expect(r).toMatchObject({ ok: false, guard: 'dedup' });
});

it('allows a valid enqueue', () => {
const r = checkEnqueueGuards(ctx, { type: 'init', reason: 'x' });
expect(r).toEqual({ ok: true });
});
});

describe('apply functions', () => {
let dir: string;
let store: QueueStore;
let ctx: OrchestratorToolsContext;

beforeEach(() => {
dir = tmpDir();
store = new QueueStore(dir, 'run-1');
ctx = { store, validTypes: VALID };
});

afterEach(() => fs.rmSync(dir, { recursive: true, force: true }));

it('attributes a seed enqueue to the orchestrator', () => {
const r = applyEnqueue(ctx, { type: 'install', reason: 'seed' });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.task.enqueuedBy).toBe('orchestrator');
});

it('attributes a follow-up enqueue to the running task', () => {
const parent = store.enqueue({ type: 'init' });
ctx.currentTaskId = parent.id;
const r = applyEnqueue(ctx, { type: 'capture', reason: 'follow-up' });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.task.enqueuedBy).toBe(parent.id);
});

it('complete_task fails when no task is running', () => {
const r = applyComplete(ctx, {
status: 'done',
handoff: { goals: 'g', did: 'd', forNextAgent: 'n' },
});
expect(r.ok).toBe(false);
});

it('complete_task marks the running task done and stores the handoff', () => {
const t = store.enqueue({ type: 'install' });
ctx.currentTaskId = t.id;
store.start(t.id);
const r = applyComplete(ctx, {
status: 'done',
handoff: { goals: 'g', did: 'added sdk', forNextAgent: 'env next' },
});
expect(r.ok).toBe(true);
expect(store.get(t.id)?.status).toBe('done');
expect(store.readHandoff(t.id)?.did).toBe('added sdk');
});

it('read_handoffs returns a dependency handoff for the running task', () => {
const dep = store.enqueue({ type: 'install' });
store.start(dep.id);
store.complete(dep.id, {
goals: 'g',
did: 'installed',
forNextAgent: 'now init',
});
const t = store.enqueue({ type: 'init', dependsOn: [dep.id] });
ctx.currentTaskId = t.id;

const handoffs = applyReadHandoffs(ctx, {});
expect(handoffs).toHaveLength(1);
expect(handoffs[0].did).toBe('installed');
});
});
253 changes: 253 additions & 0 deletions src/lib/programs/orchestrator/queue-tools.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/**
* Orchestrator MCP tools, registered into the existing `wizard-tools` server when
* a queue is present. They let the orchestrator agent and task agents grow the
* queue, report completion with a structured handoff, and read prior handoffs.
*
* The guard logic and the apply functions are plain, exported, and unit-tested.
* `buildOrchestratorTools` wraps them in the SDK `tool()` shape.
*/
import { z } from 'zod';
import { analytics } from '../../../utils/analytics';
import type { QueueStore, QueuedTask, TaskHandoff } from './queue';

export interface OrchestratorToolsContext {
store: QueueStore;
/** Task types the registry knows about. enqueue_task rejects anything else. */
validTypes: readonly string[];
/**
* The id of the task this tool server is bound to. Each task agent gets its
* own wizard-tools server, so attribution holds when independent tasks run
* in parallel. Absent for the seed, which is not a task.
*/
currentTaskId?: string;
}

export interface EnqueueArgs {
type: string;
inputs?: Record<string, unknown>;
dependsOn?: string[];
model?: string;
reason: string;
}

export type GuardResult =
| { ok: true }
| { ok: false; guard: string; message: string };

function stableStringify(value: unknown): string {
if (value === null || typeof value !== 'object') return JSON.stringify(value);
if (Array.isArray(value)) return `[${value.map(stableStringify).join(',')}]`;
const entries = Object.entries(value as Record<string, unknown>).sort(
([a], [b]) => a.localeCompare(b),
);
return `{${entries
.map(([k, v]) => `${JSON.stringify(k)}:${stableStringify(v)}`)
.join(',')}}`;
}

function dedupKey(type: string, inputs: Record<string, unknown>): string {
return `${type}::${stableStringify(inputs)}`;
}

/**
* Validate an enqueue. Structural checks only — a real type, real dependencies,
* and not a literal duplicate. How much runs, and in what shape, is the task
* graph's business, not a knob's.
*/
export function checkEnqueueGuards(
ctx: OrchestratorToolsContext,
args: EnqueueArgs,
): GuardResult {
const tasks = ctx.store.list();

if (!ctx.validTypes.includes(args.type)) {
return {
ok: false,
guard: 'unknown-type',
message: `Unknown task type "${
args.type
}". Valid types: ${ctx.validTypes.join(', ')}.`,
};
}

for (const dep of args.dependsOn ?? []) {
if (!ctx.store.get(dep)) {
return {
ok: false,
guard: 'unknown-dep',
message: `Dependency "${dep}" is not a known task id.`,
};
}
}

const key = dedupKey(args.type, args.inputs ?? {});
if (
tasks.some(
(t) => t.status !== 'failed' && dedupKey(t.type, t.inputs) === key,
)
) {
return {
ok: false,
guard: 'dedup',
message: `A "${args.type}" task with these inputs already exists.`,
};
}

return { ok: true };
}

export type EnqueueResult =
| { ok: true; task: QueuedTask }
| { ok: false; guard: string; message: string };

export function applyEnqueue(
ctx: OrchestratorToolsContext,
args: EnqueueArgs,
): EnqueueResult {
const guard = checkEnqueueGuards(ctx, args);
if (!guard.ok) return guard;

const task = ctx.store.enqueue({
type: args.type,
inputs: args.inputs ?? {},
dependsOn: args.dependsOn ?? [],
model: args.model,
enqueuedBy: ctx.currentTaskId ?? 'orchestrator',
});
return { ok: true, task };
}

export type CompleteResult = { ok: true } | { ok: false; message: string };

export function applyComplete(
ctx: OrchestratorToolsContext,
args: { status: 'done' | 'failed' | 'skipped'; handoff: TaskHandoff },
): CompleteResult {
const id = ctx.currentTaskId;
if (!id) {
return {
ok: false,
message: 'complete_task can only be called by a running task agent.',
};
}
if (args.status === 'failed') {
ctx.store.fail(
id,
{ type: 'self-reported', message: args.handoff.forNextAgent },
args.handoff,
);
} else if (args.status === 'skipped') {
ctx.store.skip(id, args.handoff);
} else {
ctx.store.complete(id, args.handoff);
}
return { ok: true };
}

export function applyReadHandoffs(
ctx: OrchestratorToolsContext,
args: { type?: string; taskId?: string },
): TaskHandoff[] {
if (args.taskId) {
const h = ctx.store.readHandoff(args.taskId);
return h ? [h] : [];
}
if (args.type) {
return ctx.store.readHandoffsByType(args.type);
}
// No filter: every handoff of a dependency of the current task.
const currentId = ctx.currentTaskId;
const current = currentId ? ctx.store.get(currentId) : undefined;
if (!current) return [];
return current.dependsOn
.map((depId) => ctx.store.readHandoff(depId))
.filter((h): h is TaskHandoff => h !== null);
}

const HANDOFF_SHAPE = {
goals: z.string().describe('What this task was asked to achieve.'),
did: z.string().describe('What you actually did.'),
forNextAgent: z.string().describe('What the next agent should know.'),
filesTouched: z.array(z.string()).optional(),
};

type SdkTool = (
name: string,
description: string,
// The SDK accepts a plain object of zod fields as the schema.
schema: Record<string, z.ZodTypeAny>,
handler: (args: never) => unknown,
) => unknown;

function textResult(text: string, isError = false) {
return { isError, content: [{ type: 'text' as const, text }] };
}

/**
* Build the orchestrator tools in the SDK `tool()` shape. Called from
* createWizardToolsServer only when a queue context is present.
*/
export function buildOrchestratorTools(
tool: SdkTool,
ctx: OrchestratorToolsContext,
): unknown[] {
const enqueueTask = tool(
'enqueue_task',
'Add a task to the orchestrator queue. Use it to seed work and to enqueue follow-up work you discover. Keep tasks small and discrete.',
{
type: z
.string()
.describe(`The task type. One of: ${ctx.validTypes.join(', ')}.`),
inputs: z.record(z.unknown()).optional(),
dependsOn: z
.array(z.string())
.optional()
.describe('Task ids that must be done before this task runs.'),
model: z.string().optional(),
reason: z.string().describe('One line on why this task is needed.'),
},
((args: EnqueueArgs) => {
const res = applyEnqueue(ctx, args);
if (!res.ok) {
analytics.wizardCapture('orchestrator guard tripped', {
guard: res.guard,
type: args.type,
});
return textResult(res.message, true);
}
return textResult(JSON.stringify({ id: res.task.id }));
}) as (args: never) => unknown,
);

const completeTask = tool(
'complete_task',
"Report the outcome of your task. Always call this exactly once when you finish, with a structured handoff for the next agent. Use status 'skipped' when the task does not apply to this project and you cannot do it (say why in the handoff) — not 'done'.",
{
status: z.enum(['done', 'failed', 'skipped']),
handoff: z.object(HANDOFF_SHAPE),
},
((args: {
status: 'done' | 'failed' | 'skipped';
handoff: TaskHandoff;
}) => {
const res = applyComplete(ctx, args);
if (!res.ok) return textResult(res.message, true);
return textResult('ok');
}) as (args: never) => unknown,
);

const readHandoffs = tool(
'read_handoffs',
'Read structured handoffs from earlier tasks. With no argument, returns the handoffs of your dependencies.',
{
type: z.string().optional(),
taskId: z.string().optional(),
},
((args: { type?: string; taskId?: string }) => {
const handoffs = applyReadHandoffs(ctx, args);
return textResult(JSON.stringify(handoffs, null, 2));
}) as (args: never) => unknown,
);

return [enqueueTask, completeTask, readHandoffs];
}
Loading
Loading