diff --git a/src/lib/agent/agent-interface.ts b/src/lib/agent/agent-interface.ts index 35c45d03..3870074d 100644 --- a/src/lib/agent/agent-interface.ts +++ b/src/lib/agent/agent-interface.ts @@ -265,6 +265,12 @@ export type AgentConfig = { getPendingQuestion?: () => | import('@lib/wizard-session').PendingQuestion | null; + /** + * Orchestrator queue context. Present only when the `wizard-orchestrator` + * flag routes the run here; threaded into wizard-tools so the orchestrator + * tools register. + */ + orchestrator?: import('@lib/programs/orchestrator/queue-tools').OrchestratorToolsContext; }; /** @@ -286,6 +292,7 @@ export type StopHookResult = export function createStopHook( featureQueue: readonly AdditionalFeature[], signals?: AgentOutputSignals, + requestRemark = true, ): (input: { stop_hook_active: boolean }) => StopHookResult { let featureIndex = 0; let remarkRequested = false; @@ -313,8 +320,9 @@ export function createStopHook( return { decision: 'block', reason: prompt }; } - // Phase 2: collect remark (once) - if (!remarkRequested) { + // Phase 2: collect remark (once). Skipped when the caller opts out — the + // orchestrator suppresses it per task so it does not fire on every agent. + if (requestRemark && !remarkRequested) { remarkRequested = true; logToFile('Stop hook: requesting reflection'); return { @@ -655,8 +663,6 @@ export async function initializeAgent( logToFile('Agent initialization starting'); logToFile('Install directory:', options.installDir); - getUI().log.step('Initializing Claude agent...'); - try { // Configure LLM gateway environment variables (inherited by SDK subprocess) const gatewayUrl = getLlmGatewayUrlFromHost(config.posthogApiHost); @@ -708,6 +714,7 @@ export async function initializeAgent( skillsBaseUrl: config.skillsBaseUrl, askBridge: config.askBridge, askMaxQuestions: config.askMaxQuestions, + orchestrator: config.orchestrator, }); mcpServers['wizard-tools'] = wizardToolsServer; @@ -747,8 +754,6 @@ export async function initializeAgent( }); } - getUI().log.step(`Verbose logs: ${getLogFilePath()}`); - getUI().log.success("Agent initialized. Let's get cooking!"); return agentRunConfig; } catch (error) { getUI().log.error( @@ -794,6 +799,8 @@ export async function runAgent( errorMessage?: string; additionalFeatureQueue?: readonly AdditionalFeature[]; abortCases?: readonly AbortCaseMatcher[]; + /** Request the end-of-run reflection remark. Defaults to true. */ + requestRemark?: boolean; }, middleware?: { onMessage(message: any): void; @@ -1052,7 +1059,11 @@ export async function runAgent( Stop: [ { hooks: [ - createStopHook(config?.additionalFeatureQueue ?? [], signals), + createStopHook( + config?.additionalFeatureQueue ?? [], + signals, + config?.requestRemark ?? true, + ), ], timeout: 30, }, @@ -1100,6 +1111,7 @@ export async function runAgent( signals, receivedSuccessResult, tasks, + isOrchestratorEnabled(agentConfig.wizardFlags ?? {}), ); // [ABORT] detection: the skill emits "[ABORT] " when it @@ -1433,6 +1445,9 @@ function handleSDKMessage( signals: AgentOutputSignals, receivedSuccessResult = false, tasks?: Map, + // The orchestrator owns the TUI task panel (it renders its queue). Suppress the + // agent's own TaskCreate/TaskUpdate rendering so it does not clobber the queue. + suppressTaskRender = false, ): void { // Map preserves insertion order (the order the agent created the tasks). // Within that, group by status: completed first, then in_progress, then @@ -1444,7 +1459,7 @@ function handleSDKMessage( }; const rank = (status: string): number => STATUS_RANK[status] ?? 2; const syncTasks = (): void => { - if (!tasks) return; + if (!tasks || suppressTaskRender) return; const sorted = Array.from(tasks.values()).sort( (a, b) => rank(a.status) - rank(b.status), ); diff --git a/src/lib/agent/agent-runner.ts b/src/lib/agent/agent-runner.ts index a6169460..da561704 100644 --- a/src/lib/agent/agent-runner.ts +++ b/src/lib/agent/agent-runner.ts @@ -31,10 +31,12 @@ import { AgentErrorType, AgentSignals, buildWizardMetadata, + isOrchestratorEnabled, checkAllSettingsConflicts, backupAndFixClaudeSettings, restoreClaudeSettings, } from './agent-interface'; +import { runOrchestrator } from '../programs/orchestrator/orchestrator-runner'; import { getCloudUrlFromRegion } from '@utils/urls'; import { evaluateWizardReadiness, @@ -43,7 +45,12 @@ import { getBlockingServiceKeys, SERVICE_LABELS, } from '@lib/health-checks/readiness'; -import { enableDebugLogs, initLogFile, logToFile } from '@utils/debug'; +import { + enableDebugLogs, + getLogFilePath, + initLogFile, + logToFile, +} from '@utils/debug'; import { createBenchmarkPipeline } from '@lib/middleware/benchmark'; import { wizardAbort, WizardError, registerCleanup } from '@utils/wizard-abort'; import { formatScanReport, writeScanReport } from '@lib/yara-hooks'; @@ -200,6 +207,11 @@ export async function runProgram( ): Promise { const boot = await bootstrapProgram(session, config, programConfig); + if (isOrchestratorEnabled(boot.wizardFlags)) { + getUI().log.info('Task-queue orchestrator enabled.'); + return runOrchestrator(session, programConfig, boot); + } + return runLinearProgram(session, config, programConfig, boot); } @@ -412,6 +424,7 @@ async function runLinearProgram( showQuestion: (q) => getUI().requestQuestion(q), }); + getUI().log.step('Initializing Claude agent...'); const agent = await initializeAgent( { workingDirectory: session.installDir, @@ -433,6 +446,8 @@ async function runLinearProgram( }, sessionToOptions(session), ); + getUI().log.step(`Verbose logs: ${getLogFilePath()}`); + getUI().log.success("Agent initialized. Let's get cooking!"); const middleware = session.benchmark ? createBenchmarkPipeline(spinner, sessionToOptions(session)) diff --git a/src/lib/programs/orchestrator/__tests__/agent-prompt-loader.test.ts b/src/lib/programs/orchestrator/__tests__/agent-prompt-loader.test.ts new file mode 100644 index 00000000..64a4bdab --- /dev/null +++ b/src/lib/programs/orchestrator/__tests__/agent-prompt-loader.test.ts @@ -0,0 +1,205 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { + agentRunTools, + buildRegistry, + parseAgentPrompt, + resolveTask, + type AgentPrompt, + type AgentRegistry, +} from '../agent-prompt-loader'; +import { QueueStore } from '../queue'; + +function tmpDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'agent-loader-test-')); +} + +function registryOf(prompts: AgentPrompt[]): AgentRegistry { + return buildRegistry( + prompts.map((p) => ({ ...p, flow: 'test-flow' })), + 'test-flow', + ); +} + +describe('parseAgentPrompt', () => { + const sample = `--- +type: instrument-events +model: claude-sonnet-4-6 # cheapest model that succeeds +skills: [instrument-events] +allowedTools: [Read, Edit, Grep, Glob, Bash] +disallowedTools: [enqueue_task] +dependsOn: [init] +--- + +## Goal +Add at least one capture call. +`; + + it('parses frontmatter scalars and inline arrays', () => { + const p = parseAgentPrompt(sample, 'fallback'); + expect(p.type).toBe('instrument-events'); + expect(p.model).toBe('claude-sonnet-4-6'); + expect(p.skills).toEqual(['instrument-events']); + expect(p.allowedTools).toEqual(['Read', 'Edit', 'Grep', 'Glob', 'Bash']); + expect(p.disallowedTools).toEqual(['enqueue_task']); + expect(p.dependsOn).toEqual(['init']); + }); + + it('strips inline comments and keeps the body', () => { + const p = parseAgentPrompt(sample, 'fallback'); + expect(p.model).not.toContain('#'); + expect(p.body).toContain('## Goal'); + expect(p.body).not.toContain('---'); + }); + + it('falls back to the menu id when type is omitted', () => { + const p = parseAgentPrompt('---\nmodel: x\n---\nbody', 'install'); + expect(p.type).toBe('install'); + }); + + it('parses the flow from frontmatter', () => { + const p = parseAgentPrompt('---\nflow: audit\n---\nx', 'fix-events'); + expect(p.flow).toBe('audit'); + }); + + it('marks the seed from frontmatter; everything else is a task', () => { + expect(parseAgentPrompt('---\nseed: true\n---\nplan', 'planner').seed).toBe( + true, + ); + expect(parseAgentPrompt('---\nmodel: x\n---\nbody', 'install').seed).toBe( + false, + ); + }); + + it('defaults missing array fields to empty and model to undefined', () => { + const p = parseAgentPrompt('no frontmatter at all', 'stub'); + expect(p.model).toBeUndefined(); + expect(p.skills).toEqual([]); + expect(p.dependsOn).toEqual([]); + expect(p.body).toBe('no frontmatter at all'); + }); +}); + +describe('agentRunTools', () => { + it('MCP-qualifies orchestrator tools and passes native tools through', () => { + const p = parseAgentPrompt( + '---\nallowedTools: [Read, read_handoffs]\ndisallowedTools: [enqueue_task, complete_task, Bash]\n---\nx', + 't', + ); + const { allowedTools, disallowedTools } = agentRunTools(p); + expect(allowedTools).toEqual([ + 'Read', + 'mcp__posthog-wizard__read_handoffs', + ]); + expect(disallowedTools).toEqual([ + 'mcp__posthog-wizard__enqueue_task', + 'mcp__posthog-wizard__complete_task', + 'Bash', + ]); + }); +}); + +describe('buildRegistry', () => { + const prompt = (over: Partial): AgentPrompt => ({ + type: 'x', + seed: false, + skills: [], + allowedTools: [], + disallowedTools: [], + dependsOn: [], + body: 'b', + ...over, + }); + + it('scopes to one flow and keeps the seed out of the task types', () => { + const registry = buildRegistry( + [ + prompt({ type: 'plan-audit', flow: 'audit', seed: true }), + prompt({ type: 'fix-events', flow: 'audit' }), + prompt({ type: 'install', flow: 'posthog-integration' }), + prompt({ type: 'example' }), + ], + 'audit', + ); + expect(registry.types).toEqual(['fix-events']); + expect(registry.seed?.type).toBe('plan-audit'); + expect(registry.get('install')).toBeUndefined(); + // A flowless prompt (e.g. the documentation example) joins no registry. + expect(registry.get('example')).toBeUndefined(); + }); +}); + +describe('resolveTask', () => { + let dir: string; + let store: QueueStore; + + beforeEach(() => { + dir = tmpDir(); + store = new QueueStore(dir, 'run-1'); + }); + + afterEach(() => { + fs.rmSync(dir, { recursive: true, force: true }); + }); + + const prompt: AgentPrompt = { + type: 'capture', + seed: false, + model: 'claude-haiku-4-5-20251001', + skills: ['instrument-events'], + allowedTools: ['Read', 'Edit'], + disallowedTools: ['enqueue_task'], + dependsOn: ['plan-capture'], + body: '## Goal\nInstrument the planned events.', + }; + + it('throws when no prompt is registered for the type', () => { + const registry = registryOf([]); + const task = { type: 'capture', dependsOn: [] } as never; + expect(() => resolveTask(registry, task, store)).toThrow(/capture/); + }); + + it('resolves model, tools, and skills from the prompt', () => { + const registry = registryOf([prompt]); + const task = store.enqueue({ type: 'capture' }); + const resolved = resolveTask(registry, task, store); + expect(resolved.model).toBe('claude-haiku-4-5-20251001'); + expect(resolved.skills).toEqual(['instrument-events']); + expect(resolved.disallowedTools).toEqual([ + 'mcp__posthog-wizard__enqueue_task', + ]); + }); + + it('prefers the enqueue model override over the prompt model', () => { + const registry = registryOf([prompt]); + const task = store.enqueue({ type: 'capture', model: 'override-x' }); + expect(resolveTask(registry, task, store).model).toBe('override-x'); + }); + + it("appends upstream dependencies' handoffs as context", () => { + const registry = registryOf([prompt]); + const dep = store.enqueue({ type: 'plan-capture' }); + store.complete(dep.id, { + goals: 'decide events', + did: 'picked signup and purchase', + forNextAgent: 'instrument those two', + }); + const task = store.enqueue({ + type: 'capture', + dependsOn: [dep.id], + }); + const resolved = resolveTask(registry, task, store); + expect(resolved.prompt).toContain('Context from previous steps'); + expect(resolved.prompt).toContain('picked signup and purchase'); + expect(resolved.prompt).toContain('instrument those two'); + }); + + it('omits the context section when there are no handoffs', () => { + const registry = registryOf([prompt]); + const task = store.enqueue({ type: 'capture' }); + expect(resolveTask(registry, task, store).prompt).not.toContain( + 'Context from previous steps', + ); + }); +}); diff --git a/src/lib/programs/orchestrator/agent-prompt-loader.ts b/src/lib/programs/orchestrator/agent-prompt-loader.ts new file mode 100644 index 00000000..ee351db8 --- /dev/null +++ b/src/lib/programs/orchestrator/agent-prompt-loader.ts @@ -0,0 +1,310 @@ +/** + * Agent-prompt loader + registry. + * + * Agent prompts are the WHAT of a task: a markdown file per type, served from + * context-mill as the `agents` content type (parallel to skills). The frontmatter + * carries the artifacts the executor needs — model, the mini-skills to load (the + * HOW), the tools the task may use, and its dependencies — and the body is the + * instruction the agent reads. + * + * The registry is fetched once at startup and scoped to one flow — agents + * declare `flow` and (for the planner) `seed: true` in frontmatter, so each + * program (integration, audit, migration, ...) ships its own agent set and the + * loader stays generic. Every prompt is downloaded and parsed up front, so + * resolving a task to its run config is synchronous and adds no mid-drain + * network latency. The registry's type list also drives `enqueue_task` + * validation. + */ +import type { QueueStore, QueuedTask } from './queue'; +import type { ResolvedTask } from './executor'; + +/** + * The basics the client injects around every agent-prompt body. The `/agents/` + * files carry intent only (goal, success criteria); the wizard owns the I/O + * contract — who the agent is, how it reports, how it surfaces progress — so the + * authored prompts never restate it. + */ +export interface OrchestratorPromptContext { + projectId: number; + projectApiKey: string; + host: string; + /** Path to the framework's reference implementation (EXAMPLE.md), if available. */ + examplePath?: string; + /** Path to the framework's rules (COMMANDMENTS.md), if available. */ + commandmentsPath?: string; +} + +function projectContext(ctx: OrchestratorPromptContext): string { + return `You have access to the PostHog MCP server and the wizard tools. + +Project context: +- PostHog Project ID: ${ctx.projectId} +- PostHog public token: ${ctx.projectApiKey} +- PostHog Host: ${ctx.host}`; +} + +/** Points the agent at the framework's reference integration to learn patterns from. */ +function exampleReference(ctx: OrchestratorPromptContext): string | null { + if (!ctx.examplePath) return null; + return `A reference PostHog integration for this framework is at \`${ctx.examplePath}\`. It shows the target implementation pattern. Reference its patterns and conventions, adapting them to this codebase.`; +} + +/** The framework's rules ship with the reference skill; every task follows them. */ +function commandmentsReference(ctx: OrchestratorPromptContext): string | null { + if (!ctx.commandmentsPath) return null; + return `Framework rules for this integration are at \`${ctx.commandmentsPath}\`. Read them before you edit and follow them.`; +} + +const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only within this project's own directory; nothing outside it is part of your task. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`; + +const SEED_BASICS = `You are the orchestrator. Plan the work and seed the queue with enqueue_task — each call returns an id you can pass as a dependency to a later task. Give each task a short label for the UI — the action in a few words, not file names, class names, or other specifics. You are not a task yourself: do not call complete_task and do not edit the project.`; + +/** A task agent's full prompt: injected basics, then the authored intent. */ +export function assembleTaskPrompt( + ctx: OrchestratorPromptContext, + body: string, +): string { + return [ + projectContext(ctx), + exampleReference(ctx), + commandmentsReference(ctx), + TASK_BASICS, + body, + ] + .filter(Boolean) + .join('\n\n'); +} + +/** The seed agent's full prompt: injected basics, then the authored intent. */ +export function assembleSeedPrompt( + ctx: OrchestratorPromptContext, + body: string, +): string { + return [projectContext(ctx), SEED_BASICS, body].join('\n\n'); +} + +/** Used when neither the enqueue call nor the prompt frontmatter names a model. */ +const DEFAULT_TASK_MODEL = 'claude-sonnet-4-6'; + +/** Orchestrator tools are MCP tools under the `posthog-wizard` server. Frontmatter + * names them short (e.g. `enqueue_task`); the SDK gates on the full name. */ +const ORCHESTRATOR_TOOL_PREFIX = 'mcp__posthog-wizard__'; +const ORCHESTRATOR_TOOLS = new Set([ + 'enqueue_task', + 'complete_task', + 'read_handoffs', +]); + +/** A parsed agent prompt. The frontmatter fields plus the markdown body. */ +export interface AgentPrompt { + type: string; + /** Human-readable title for the TUI; falls back to `type` when absent. */ + label?: string; + /** The flow this agent belongs to (the program id, e.g. \`posthog-integration\`). */ + flow?: string; + /** Marks the flow's planner: it seeds the queue and is not an enqueueable task. */ + seed: boolean; + model?: string; + skills: string[]; + allowedTools: string[]; + disallowedTools: string[]; + dependsOn: string[]; + body: string; +} + +export interface AgentRegistry { + /** The flow's enqueueable task types — every prompt except the seed. */ + readonly types: string[]; + /** The flow's planner, the one prompt marked `seed: true` in its frontmatter. */ + readonly seed?: AgentPrompt; + get(type: string): AgentPrompt | undefined; +} + +/** The registry for one flow's prompts. Pure; the loader feeds it the fetched set. */ +export function buildRegistry( + prompts: readonly AgentPrompt[], + flow: string, +): AgentRegistry { + const inFlow = prompts.filter((p) => p.flow === flow); + const byType = new Map(inFlow.map((p) => [p.type, p])); + return { + types: inFlow.filter((p) => !p.seed).map((p) => p.type), + seed: inFlow.find((p) => p.seed), + get: (type) => byType.get(type), + }; +} + +interface AgentMenu { + agents: { id: string; downloadUrl: string }[]; +} + +/** A native tool passes through; an orchestrator tool gets its MCP-qualified name. */ +function expandToolName(name: string): string { + return ORCHESTRATOR_TOOLS.has(name) + ? `${ORCHESTRATOR_TOOL_PREFIX}${name}` + : name; +} + +/** A prompt's allow/disallow lists with orchestrator tool names MCP-qualified. */ +export function agentRunTools(prompt: AgentPrompt): { + allowedTools: string[]; + disallowedTools: string[]; +} { + return { + allowedTools: prompt.allowedTools.map(expandToolName), + disallowedTools: prompt.disallowedTools.map(expandToolName), + }; +} + +function toStringArray(value: unknown): string[] { + if (!Array.isArray(value)) return []; + return value.filter((v): v is string => typeof v === 'string'); +} + +/** + * Parse the leading `---` frontmatter block and the markdown body. The + * frontmatter is a small, known schema (scalars and inline `[a, b]` arrays), so + * a tiny parser covers it without a YAML dependency. Inline `# comments` after a + * value are stripped. `fallbackType` is the menu id, used when the body omits + * `type:`. + */ +export function parseAgentPrompt( + text: string, + fallbackType: string, +): AgentPrompt { + const match = text.match(/^---\r?\n([\s\S]*?)\r?\n---\r?\n?([\s\S]*)$/); + const frontmatter = match ? match[1] : ''; + const body = (match ? match[2] : text).trim(); + + const fields: Record = {}; + for (const rawLine of frontmatter.split(/\r?\n/)) { + const line = rawLine.replace(/\s+#.*$/, '').trim(); + if (!line || line.startsWith('#')) continue; + const kv = line.match(/^([\w-]+):\s*(.*)$/); + if (!kv) continue; + const [, key, raw] = kv; + if (raw.startsWith('[') && raw.endsWith(']')) { + fields[key] = raw + .slice(1, -1) + .split(',') + .map((s) => s.trim().replace(/^['"]|['"]$/g, '')) + .filter(Boolean); + } else { + fields[key] = raw.replace(/^['"]|['"]$/g, ''); + } + } + + const model = typeof fields.model === 'string' ? fields.model : undefined; + return { + type: typeof fields.type === 'string' ? fields.type : fallbackType, + label: typeof fields.label === 'string' ? fields.label : undefined, + flow: typeof fields.flow === 'string' ? fields.flow : undefined, + seed: fields.seed === 'true', + model, + skills: toStringArray(fields.skills), + allowedTools: toStringArray(fields.allowedTools), + disallowedTools: toStringArray(fields.disallowedTools), + dependsOn: toStringArray(fields.dependsOn), + body, + }; +} + +async function fetchText(url: string): Promise { + const res = await fetch(url); + if (!res.ok) { + throw new Error(`Fetch ${url} failed: ${res.status} ${res.statusText}`); + } + return res.text(); +} + +/** + * Fetch the agent menu and every agent prompt it lists, parse them, and build + * the registry for one flow. Throws if the menu cannot be fetched — the + * orchestrator cannot run without its prompts. + */ +export async function loadAgentRegistry( + skillsBaseUrl: string, + flow: string, +): Promise { + const menuRaw = await fetchText(`${skillsBaseUrl}/agent-menu.json`); + const menu = JSON.parse(menuRaw) as AgentMenu; + + const prompts = await Promise.all( + (menu.agents ?? []).map(async (entry) => { + const text = await fetchText(entry.downloadUrl); + return parseAgentPrompt(text, entry.id); + }), + ); + + return buildRegistry(prompts, flow); +} + +/** + * Render a task's own inputs into a section, so a fanned-out task (e.g. one + * `capture` per event) sees the specific thing it owns. Empty when there are none. + */ +function renderInputs(task: QueuedTask): string { + const entries = Object.entries(task.inputs ?? {}); + if (entries.length === 0) return ''; + const lines = entries.map(([k, v]) => `- ${k}: ${formatInputValue(v)}`); + return `## Your task input\n\n${lines.join('\n')}`; +} + +function formatInputValue(value: unknown): string { + if (typeof value === 'string') return value; + return JSON.stringify(value); +} + +/** + * Render the handoffs of a task's completed dependencies into a context section, + * so a fresh agent sees what the upstream steps did. Empty when there are none. + */ +function renderHandoffContext(task: QueuedTask, store: QueueStore): string { + const lines: string[] = []; + for (const depId of task.dependsOn) { + const dep = store.get(depId); + const handoff = store.readHandoff(depId); + if (!dep || !handoff) continue; + lines.push(`### ${dep.type}`); + lines.push(`- did: ${handoff.did}`); + lines.push(`- for you: ${handoff.forNextAgent}`); + if (handoff.filesTouched?.length) { + lines.push(`- files: ${handoff.filesTouched.join(', ')}`); + } + lines.push(''); + } + if (lines.length === 0) return ''; + return `## Context from previous steps\n\n${lines.join('\n')}`.trim(); +} + +/** + * Resolve a queued task to its run config: the prompt body (with upstream + * handoffs appended), the model, and the tool lists with orchestrator tool names + * MCP-qualified. The model precedence is enqueue override, then prompt, then + * default. Throws if no prompt is registered for the task's type. + */ +export function resolveTask( + registry: AgentRegistry, + task: QueuedTask, + store: QueueStore, +): ResolvedTask { + const prompt = registry.get(task.type); + if (!prompt) { + throw new Error(`No agent prompt registered for task type "${task.type}"`); + } + + const body = [ + renderInputs(task), + prompt.body, + renderHandoffContext(task, store), + ] + .filter(Boolean) + .join('\n\n'); + + return { + model: task.model ?? prompt.model ?? DEFAULT_TASK_MODEL, + ...agentRunTools(prompt), + prompt: body, + skills: prompt.skills, + }; +} diff --git a/src/lib/programs/orchestrator/orchestrator-runner.ts b/src/lib/programs/orchestrator/orchestrator-runner.ts new file mode 100644 index 00000000..5565ee73 --- /dev/null +++ b/src/lib/programs/orchestrator/orchestrator-runner.ts @@ -0,0 +1,296 @@ +/** + * Experimental task-queue orchestrator runner. + * + * Branches from the linear runner when the `wizard-orchestrator` flag is on. An + * orchestrator agent inspects the repo and seeds an in-memory task queue; an + * executor drains it, running one fresh agent per task. + * + * Both the WHAT (agent prompts: model, goal, success criteria, tools) and the + * HOW (mini-skills) are markdown served from context-mill — the seed and every + * task resolve to a prompt fetched at startup into the registry. The wizard side + * stays product-ignorant: it is the queue, the executor, and the loader. + */ +import { randomUUID } from 'crypto'; +import { existsSync } from 'fs'; +import * as path from 'path'; +import { + initializeAgent, + runAgent, + type AgentConfig, +} from '../../agent/agent-interface'; +import { OutroKind, type WizardSession } from '../../wizard-session'; +import { detectNodePackageManagers } from '../../detection/package-manager'; +import { installSkillById } from '../../wizard-tools'; +import { getUI } from '../../../ui'; +import { analytics } from '../../../utils/analytics'; +import { logToFile } from '../../../utils/debug'; +import type { ProgramConfig } from '../program-step'; +import type { BootstrapResult } from '../../agent/agent-runner'; +import type { WizardRunOptions } from '../../../utils/types'; +import { QueueStore, QUEUE_DIR_NAME, type TaskStatus } from './queue'; +import { drainQueue, type RunTask } from './executor'; +import { + agentRunTools, + assembleSeedPrompt, + assembleTaskPrompt, + loadAgentRegistry, + resolveTask, + type OrchestratorPromptContext, +} from './agent-prompt-loader'; + +function toTodoStatus(status: TaskStatus): string { + switch (status) { + case 'in_progress': + return 'in_progress'; + case 'done': + case 'failed': + return 'completed'; + case 'skipped': + return 'skipped'; + default: + return 'pending'; + } +} + +function sessionRunOptions(session: WizardSession): WizardRunOptions { + return { + installDir: session.installDir, + debug: session.debug, + default: false, + signup: session.signup, + localMcp: session.localMcp, + ci: session.ci, + benchmark: session.benchmark, + projectId: session.projectId, + apiKey: session.apiKey, + yaraReport: session.yaraReport, + }; +} + +export async function runOrchestrator( + session: WizardSession, + programConfig: ProgramConfig, + boot: BootstrapResult, +): Promise { + const runId = randomUUID(); + const store = new QueueStore(session.installDir, runId); + + const options = sessionRunOptions(session); + + // The WHAT (agent prompts) is served from context-mill. Fetch the registry + // once up front: its types drive enqueue validation, and resolving a task to + // its run config is then synchronous, with no mid-drain network latency. + const registry = await loadAgentRegistry( + boot.skillsBaseUrl, + programConfig.id, + ); + const seedPrompt = registry.seed; + if (!seedPrompt) { + throw new Error( + `No seed agent prompt (frontmatter \`seed: true\`) for flow "${programConfig.id}" is available from ${boot.skillsBaseUrl}.`, + ); + } + + // Give task agents the framework's finished reference integration to match, + // the same EXAMPLE.md the linear flow uses. Install it under the run dir rather + // than .claude/skills so its "do everything" workflow is not auto-loaded as a + // skill — only the example file is read, when the agent's prompt points at it. + let examplePath: string | undefined; + let commandmentsPath: string | undefined; + if (session.skillId) { + const ref = await installSkillById( + session.skillId, + session.installDir, + boot.skillsBaseUrl, + path.join(QUEUE_DIR_NAME, 'reference'), + ); + if (ref.kind === 'ok') { + const example = path.join(ref.path, 'references', 'EXAMPLE.md'); + if (existsSync(path.join(session.installDir, example))) { + examplePath = example; + } + const commandments = path.join(ref.path, 'references', 'COMMANDMENTS.md'); + if (existsSync(path.join(session.installDir, commandments))) { + commandmentsPath = commandments; + } + } else { + logToFile(`[orchestrator] reference example unavailable: ${ref.kind}`); + } + } + + // The client injects the basics (project context + the I/O contract) around + // every authored agent-prompt body. + const promptContext: OrchestratorPromptContext = { + projectId: boot.projectId, + projectApiKey: boot.projectApiKey, + host: boot.host, + examplePath, + commandmentsPath, + }; + + logToFile( + `[orchestrator] START program=${programConfig.id} dir=${session.installDir} run=${runId}`, + ); + analytics.wizardCapture('orchestrator started', { + program_id: programConfig.id, + }); + getUI().startRun(); + + // Label precedence: what the orchestrator set at enqueue, then the agent + // prompt's default, then the bare type. + const labelFor = (t: { type: string; label?: string }) => + t.label ?? registry.get(t.type)?.label ?? t.type; + const renderQueue = () => + getUI().syncTodos( + store.list().map((t) => ({ + content: labelFor(t), + status: toTodoStatus(t.status), + activeForm: labelFor(t), + })), + ); + + // Each agent gets its own config so its wizard-tools server is bound to the + // task it runs — independent tasks run in parallel, and attribution of + // complete_task / enqueue_task must hold per agent. The seed is not a task, + // so its context has no task id. + const agentConfigFor = (currentTaskId?: string): AgentConfig => ({ + workingDirectory: session.installDir, + posthogMcpUrl: boot.mcpUrl, + posthogApiKey: boot.accessToken, + posthogApiHost: boot.host, + detectPackageManager: detectNodePackageManagers, + skillsBaseUrl: boot.skillsBaseUrl, + wizardFlags: boot.wizardFlags, + // Tag agent events as orchestrator so telemetry segments from the baseline. + wizardMetadata: { ...boot.wizardMetadata, VARIANT: 'orchestrator' }, + integrationLabel: programConfig.id, + orchestrator: { + store, + validTypes: registry.types, + currentTaskId, + }, + }); + + const spinner = getUI().spinner(); + + // 1. Seed the queue with the orchestrator agent. It is itself an agent prompt + // (the WHAT), so its model and tools come from its frontmatter. The seed + // plans the graph, it is not a task. + const seedAgent = await initializeAgent(agentConfigFor(), options); + const seedResult = await runAgent( + { + ...seedAgent, + model: seedPrompt.model ?? seedAgent.model, + ...agentRunTools(seedPrompt), + }, + assembleSeedPrompt(promptContext, seedPrompt.body), + options, + spinner, + { + spinnerMessage: 'Planning the integration...', + successMessage: 'Planned the integration', + additionalFeatureQueue: [], + requestRemark: false, + }, + ); + if (seedResult.error) { + logToFile( + `[orchestrator] seed error: ${seedResult.error} ${ + seedResult.message ?? '' + }`, + ); + } + analytics.wizardCapture('orchestrator seeded', { + task_count: store.list().length, + types: store.list().map((t) => t.type), + }); + renderQueue(); + + // 2. Drain the queue, one fresh agent per task; independent tasks run in + // parallel, the seed's graph being the only schedule. Each task resolves to + // its agent prompt (the WHAT) and the mini-skills it needs (the HOW), then + // runs on its own model and tools. + const runTask: RunTask = async (task) => { + renderQueue(); + try { + const resolved = resolveTask(registry, task, store); + const agent = await initializeAgent(agentConfigFor(task.id), options); + for (const skillId of resolved.skills) { + const result = await installSkillById( + skillId, + session.installDir, + boot.skillsBaseUrl, + ); + if (result.kind !== 'ok') { + logToFile( + `[orchestrator] skill install failed type=${task.type} skill=${skillId} ${result.kind}`, + ); + } + } + await runAgent( + { + ...agent, + model: resolved.model, + allowedTools: resolved.allowedTools, + disallowedTools: resolved.disallowedTools, + }, + assembleTaskPrompt(promptContext, resolved.prompt), + options, + spinner, + // Empty messages suppress the per-task spinner lines (the spinner renders + // only when a message is set); the queue panel shows progress. Errors + // still surface — runAgent stops the spinner with its own error text. + // No per-task remark — the reflection would fire on every task. + { + spinnerMessage: '', + successMessage: '', + additionalFeatureQueue: [], + requestRemark: false, + }, + ); + } finally { + renderQueue(); + } + }; + await drainQueue(store, runTask); + + renderQueue(); + + const summary = store.summary(); + logToFile( + `[orchestrator] DONE done=${summary.done} failed=${summary.failed} total=${summary.total}`, + ); + analytics.wizardCapture('orchestrator run finished', { + tasks_total: summary.total, + tasks_done: summary.done, + tasks_failed: summary.failed, + }); + + // The build step flags any unresolved conflict in its handoff; surface the + // one-liner here and point the user at the report for the detail. + const buildTask = store.list().find((t) => t.type === 'build'); + const conflict = buildTask + ? store.readHandoff(buildTask.id)?.conflict + : undefined; + + // Prefer the report the run wrote; fall back to the raw queue if it is missing. + const reportPath = path.join(session.installDir, 'posthog-setup-report.md'); + const reportFile = existsSync(reportPath) + ? 'posthog-setup-report.md' + : store.queuePath; + + const message = conflict + ? 'PostHog set up, with one conflict to review.' + : `PostHog set up: ${summary.done}/${summary.total} steps completed.`; + getUI().setOutroData({ + kind: OutroKind.Success, + message, + body: conflict + ? `⚠ Build conflict: ${conflict}\nFull details are in the report.` + : undefined, + reportFile, + docsUrl: 'https://posthog.com/docs/ai-engineering/ai-wizard', + }); + getUI().outro(message); + await analytics.shutdown('success'); +} diff --git a/src/lib/programs/orchestrator/queue-tools.ts b/src/lib/programs/orchestrator/queue-tools.ts index 624d9b09..3d2af397 100644 --- a/src/lib/programs/orchestrator/queue-tools.ts +++ b/src/lib/programs/orchestrator/queue-tools.ts @@ -24,6 +24,7 @@ export interface OrchestratorToolsContext { export interface EnqueueArgs { type: string; + label?: string; inputs?: Record; dependsOn?: string[]; model?: string; @@ -109,6 +110,7 @@ export function applyEnqueue( const task = ctx.store.enqueue({ type: args.type, + label: args.label, inputs: args.inputs ?? {}, dependsOn: args.dependsOn ?? [], model: args.model, @@ -169,6 +171,12 @@ const HANDOFF_SHAPE = { did: z.string().describe('What you actually did.'), forNextAgent: z.string().describe('What the next agent should know.'), filesTouched: z.array(z.string()).optional(), + conflict: z + .string() + .optional() + .describe( + 'A one-line summary of any conflict you could not cleanly resolve (e.g. a dependency or build conflict). Put full detail in your work; this line is surfaced to the user.', + ), }; type SdkTool = ( @@ -198,6 +206,12 @@ export function buildOrchestratorTools( type: z .string() .describe(`The task type. One of: ${ctx.validTypes.join(', ')}.`), + label: z + .string() + .optional() + .describe( + 'A short label for the UI — the action in a few words (e.g. "Add the PostHog SDK", "Initialize PostHog"). Leave out file names, class names, and other specifics.', + ), inputs: z.record(z.unknown()).optional(), dependsOn: z .array(z.string()) diff --git a/src/lib/programs/orchestrator/queue.ts b/src/lib/programs/orchestrator/queue.ts index 0ac9cb46..37bc6e88 100644 --- a/src/lib/programs/orchestrator/queue.ts +++ b/src/lib/programs/orchestrator/queue.ts @@ -25,6 +25,8 @@ export type TaskStatus = export interface QueuedTask { id: string; type: string; + /** Human-readable label for the TUI, set by the enqueuing agent. */ + label?: string; status: TaskStatus; dependsOn: string[]; inputs: Record; @@ -53,10 +55,13 @@ export interface TaskHandoff { did: string; forNextAgent: string; filesTouched?: string[]; + /** A one-line summary of any unresolved conflict, surfaced in the outro. */ + conflict?: string; } export interface EnqueueInput { type: string; + label?: string; inputs?: Record; dependsOn?: string[]; model?: string; @@ -147,6 +152,7 @@ export class QueueStore { const task: QueuedTask = { id: randomUUID(), type: input.type, + label: input.label, status: 'pending', dependsOn: input.dependsOn ?? [], inputs: input.inputs ?? {}, diff --git a/src/lib/task-stream/task-stream-push.ts b/src/lib/task-stream/task-stream-push.ts index cecd9ff8..02815419 100644 --- a/src/lib/task-stream/task-stream-push.ts +++ b/src/lib/task-stream/task-stream-push.ts @@ -37,6 +37,8 @@ const STATUS_MAP: Record = { [TaskStatus.Pending]: StreamTaskStatus.Pending, [TaskStatus.InProgress]: StreamTaskStatus.InProgress, [TaskStatus.Completed]: StreamTaskStatus.Completed, + // The stream has no skipped state; skipped is terminal, so report it resolved. + [TaskStatus.Skipped]: StreamTaskStatus.Completed, }; function buildTasks(items: TaskItem[]): StreamTask[] { diff --git a/src/ui/logging-ui.ts b/src/ui/logging-ui.ts index 84a8d2e9..6cf3cc52 100644 --- a/src/ui/logging-ui.ts +++ b/src/ui/logging-ui.ts @@ -218,20 +218,22 @@ export class LoggingUI implements WizardUI { // the session. } + private lastTodoLine = ''; + syncTodos( todos: Array<{ content: string; status: string; activeForm?: string }>, ): void { const completed = todos.filter( (t) => t.status === TaskStatus.Completed, ).length; - const inProgress = todos.find((t) => t.status === TaskStatus.InProgress); - if (inProgress) { - console.log( - `◌ [${completed}/${todos.length}] ${ - inProgress.activeForm || inProgress.content - }`, - ); - } + const active = todos.filter((t) => t.status === TaskStatus.InProgress); + if (active.length === 0) return; + const labels = active.map((t) => t.activeForm || t.content).join(' · '); + const line = `◌ [${completed}/${todos.length}] ${labels}`; + // The queue re-renders on every transition; print only what changed. + if (line === this.lastTodoLine) return; + this.lastTodoLine = line; + console.log(line); } setEventPlan(_events: Array<{ name: string; description: string }>): void { diff --git a/src/ui/tui/primitives/ProgressList.tsx b/src/ui/tui/primitives/ProgressList.tsx index b72156c8..3c84c8ee 100644 --- a/src/ui/tui/primitives/ProgressList.tsx +++ b/src/ui/tui/primitives/ProgressList.tsx @@ -11,7 +11,7 @@ import { LoadingBox } from './LoadingBox.js'; export interface ProgressItem { label: string; activeForm?: string; - status: 'pending' | 'in_progress' | 'completed'; + status: 'pending' | 'in_progress' | 'completed' | 'skipped'; } interface ProgressListProps { @@ -20,7 +20,9 @@ interface ProgressListProps { } export const ProgressList = ({ items, title }: ProgressListProps) => { - const completed = items.filter((t) => t.status === 'completed').length; + const resolved = items.filter( + (t) => t.status === 'completed' || t.status === 'skipped', + ).length; const total = items.length; return ( @@ -33,6 +35,7 @@ export const ProgressList = ({ items, title }: ProgressListProps) => { )} {items.length === 0 && } {items.map((item, i) => { + const skipped = item.status === 'skipped'; const icon = item.status === 'completed' ? Icons.squareFilled @@ -45,15 +48,22 @@ export const ProgressList = ({ items, title }: ProgressListProps) => { : item.status === 'in_progress' ? Colors.primary : Colors.muted; - const label = - item.status === 'in_progress' && item.activeForm - ? item.activeForm - : item.label; + const label = skipped + ? `${item.label} (skipped)` + : item.status === 'in_progress' && item.activeForm + ? item.activeForm + : item.label; return ( {icon} - {label} + + {' '} + {label} + ); })} @@ -61,8 +71,8 @@ export const ProgressList = ({ items, title }: ProgressListProps) => { - {completed < total - ? `Progress: ${completed}/${total} completed` + {resolved < total + ? `Progress: ${resolved}/${total} completed` : 'Cleaning up...'} diff --git a/src/ui/wizard-ui.ts b/src/ui/wizard-ui.ts index 429b06b7..7d47b96b 100644 --- a/src/ui/wizard-ui.ts +++ b/src/ui/wizard-ui.ts @@ -21,6 +21,7 @@ export enum TaskStatus { Pending = 'pending', InProgress = 'in_progress', Completed = 'completed', + Skipped = 'skipped', } export function isTaskStatus(value: string): value is TaskStatus {