diff --git a/src/lib/agent/agent-interface.ts b/src/lib/agent/agent-interface.ts index 4e0b0c5c..990a4576 100644 --- a/src/lib/agent/agent-interface.ts +++ b/src/lib/agent/agent-interface.ts @@ -683,6 +683,11 @@ export async function runAgent( abortCases?: readonly AbortCaseMatcher[]; /** Request the end-of-run reflection remark. Defaults to true. */ requestRemark?: boolean; + /** + * Extra properties attached to this run's `agent completed` / `agent + * aborted` events (e.g. the orchestrator's task type and id). + */ + analyticsProperties?: Record; }, middleware?: { onMessage(message: any): void; @@ -761,9 +766,27 @@ export async function runAgent( analytics.capture(WIZARD_REMARK_EVENT_NAME, { remark }); } + // Token usage comes from the SDK result message and is per agent run — + // for the orchestrator that means per task, the secondary cost to watch. + const usage = lastResultMessage?.usage as + | { + input_tokens?: number; + output_tokens?: number; + cache_creation_input_tokens?: number; + cache_read_input_tokens?: number; + } + | undefined; analytics.wizardCapture('agent completed', { duration_ms: durationMs, duration_seconds: durationSeconds, + model: agentConfig.model, + num_turns: lastResultMessage?.num_turns, + total_cost_usd: lastResultMessage?.total_cost_usd, + input_tokens: usage?.input_tokens, + output_tokens: usage?.output_tokens, + cache_creation_input_tokens: usage?.cache_creation_input_tokens, + cache_read_input_tokens: usage?.cache_read_input_tokens, + ...config?.analyticsProperties, }); try { middleware?.finalize(lastResultMessage, durationMs); @@ -1177,6 +1200,8 @@ export async function runAgent( analytics.wizardCapture('agent aborted', { duration_ms: durationMs, duration_seconds: Math.round(durationMs / 1000), + model: agentConfig.model, + ...config?.analyticsProperties, }); } } diff --git a/src/lib/agent/agent-runner.ts b/src/lib/agent/agent-runner.ts index a08320ed..2d693058 100644 --- a/src/lib/agent/agent-runner.ts +++ 
b/src/lib/agent/agent-runner.ts @@ -368,6 +368,9 @@ async function bootstrapProgram( // fork decision reads the flags. const wizardFlags = await analytics.getAllFlagsForWizard(); const wizardMetadata = buildWizardMetadata(wizardFlags); + // Tag every wizard event with the variant so runs segment in PostHog; the + // orchestrator arm overwrites this with its own variant when it forks. + analytics.setTag('variant', wizardMetadata.VARIANT); const mcpUrl = session.localMcp ? 'http://localhost:8787/mcp' diff --git a/src/lib/programs/orchestrator/__tests__/agent-prompt-loader.test.ts b/src/lib/programs/orchestrator/__tests__/agent-prompt-loader.test.ts index 8252e791..22ce11af 100644 --- a/src/lib/programs/orchestrator/__tests__/agent-prompt-loader.test.ts +++ b/src/lib/programs/orchestrator/__tests__/agent-prompt-loader.test.ts @@ -7,6 +7,7 @@ import { buildRegistry, parseAgentPrompt, resolveTask, + taskModel, type AgentPrompt, type AgentRegistry, type OrchestratorPromptContext, @@ -206,6 +207,23 @@ describe('resolveTask', () => { }); }); +describe('taskModel', () => { + const prompt = parseAgentPrompt( + '---\nmodel: prompt-model\n---\nx', + 'capture', + ); + + it('prefers the enqueue override, then the prompt, then the default', () => { + const registry = registryOf([prompt]); + const task = { type: 'capture' }; + expect(taskModel(registry, { ...task, model: 'override' } as never)).toBe( + 'override', + ); + expect(taskModel(registry, task as never)).toBe('prompt-model'); + expect(taskModel(registryOf([]), task as never)).toBe('claude-sonnet-4-6'); + }); +}); + describe('assembleTaskPrompt', () => { const ctx: OrchestratorPromptContext = { projectId: 1, diff --git a/src/lib/programs/orchestrator/__tests__/queue.test.ts b/src/lib/programs/orchestrator/__tests__/queue.test.ts index 4a18dee2..8ead49e7 100644 --- a/src/lib/programs/orchestrator/__tests__/queue.test.ts +++ b/src/lib/programs/orchestrator/__tests__/queue.test.ts @@ -7,6 +7,10 @@ import { type TaskHandoff, } 
from '@lib/programs/orchestrator/queue'; +jest.mock('@utils/analytics', () => ({ + analytics: { captureException: jest.fn(), wizardCapture: jest.fn() }, +})); + function tmpDir(): string { return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-test-')); } @@ -132,4 +136,40 @@ describe('QueueStore', () => { expect(file.tasks[0].status).toBe('done'); expect(file.tasks[0].handoff?.did).toBe('d'); }); + + it('notifies the transition listener with post-transition task state', () => { + const seen: Array<{ event: string; status: string; attempts: number }> = []; + const listened = new QueueStore(dir, 'run-2', { + onTransition: (event, task) => + seen.push({ event, status: task.status, attempts: task.attempts }), + }); + + const t = listened.enqueue({ type: 'install' }); + listened.start(t.id); + listened.fail(t.id, { type: 'API_ERROR', message: 'boom' }); + listened.requeue(t.id); + listened.start(t.id); + listened.complete(t.id); + + expect(seen).toEqual([ + { event: 'enqueue', status: 'pending', attempts: 0 }, + { event: 'start', status: 'in_progress', attempts: 1 }, + { event: 'fail', status: 'failed', attempts: 1 }, + { event: 'requeue', status: 'pending', attempts: 1 }, + { event: 'start', status: 'in_progress', attempts: 2 }, + { event: 'complete', status: 'done', attempts: 2 }, + ]); + }); + + it('a throwing listener does not break transitions', () => { + const listened = new QueueStore(dir, 'run-3', { + onTransition: () => { + throw new Error('listener boom'); + }, + }); + const t = listened.enqueue({ type: 'install' }); + listened.start(t.id); + listened.complete(t.id); + expect(listened.get(t.id)?.status).toBe('done'); + }); }); diff --git a/src/lib/programs/orchestrator/agent-prompt-loader.ts b/src/lib/programs/orchestrator/agent-prompt-loader.ts index 902adaee..3212a2c3 100644 --- a/src/lib/programs/orchestrator/agent-prompt-loader.ts +++ b/src/lib/programs/orchestrator/agent-prompt-loader.ts @@ -49,6 +49,16 @@ function exampleReference(ctx: 
OrchestratorPromptContext): string | null { return `A reference PostHog integration for this framework is at \`${ctx.examplePath}\`. It shows the target implementation pattern. Reference its patterns and conventions, adapting them to this codebase.`; } +/** The framework's rules ship with the reference skill; every task follows them. */ +function commandmentsReference(ctx: OrchestratorPromptContext): string | null { + if (!ctx.commandmentsPath) return null; + return `Framework rules for this integration are at \`${ctx.commandmentsPath}\`. Read them before you edit and follow them.`; +} + +const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only within this project's own directory; nothing outside it is part of your task. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`; + +const SEED_BASICS = `You are the orchestrator. Plan the work and seed the queue with enqueue_task — each call returns an id you can pass as a dependency to a later task. Give each task a short label for the UI — the action in a few words, not file names, class names, or other specifics. You are not a task yourself: do not call complete_task and do not edit the project.`; + /** * Points the agent at its installed task instructions (the HOW). 
They live under * the wizard's run dir, not `.claude/skills/`, so the SDK does not auto-load @@ -60,16 +70,6 @@ function skillReference(paths: readonly string[]): string | null { return `Your task instructions are at ${list}. Read them before you start and follow them. They are wizard scaffolding, not part of the project.`; } -/** The framework's rules ship with the reference skill; every task follows them. */ -function commandmentsReference(ctx: OrchestratorPromptContext): string | null { - if (!ctx.commandmentsPath) return null; - return `Framework rules for this integration are at \`${ctx.commandmentsPath}\`. Read them before you edit and follow them.`; -} - -const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only within this project's own directory; nothing outside it is part of your task. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`; - -const SEED_BASICS = `You are the orchestrator. Plan the work and seed the queue with enqueue_task — each call returns an id you can pass as a dependency to a later task. Give each task a short label for the UI — the action in a few words, not file names, class names, or other specifics. You are not a task yourself: do not call complete_task and do not edit the project.`; - /** A task agent's full prompt: injected basics, then the authored intent. 
*/ export function assembleTaskPrompt( ctx: OrchestratorPromptContext, @@ -315,9 +315,14 @@ export function resolveTask( .join('\n\n'); return { - model: task.model ?? prompt.model ?? DEFAULT_TASK_MODEL, + model: taskModel(registry, task), ...agentRunTools(prompt), prompt: body, skills: prompt.skills, }; } + +/** The model a task runs on: enqueue override, then prompt frontmatter, then default. */ +export function taskModel(registry: AgentRegistry, task: QueuedTask): string { + return task.model ?? registry.get(task.type)?.model ?? DEFAULT_TASK_MODEL; +} diff --git a/src/lib/programs/orchestrator/orchestrator-runner.ts b/src/lib/programs/orchestrator/orchestrator-runner.ts index 40777fd8..44c73f7e 100644 --- a/src/lib/programs/orchestrator/orchestrator-runner.ts +++ b/src/lib/programs/orchestrator/orchestrator-runner.ts @@ -27,7 +27,12 @@ import { logToFile } from '../../../utils/debug'; import type { ProgramConfig } from '../program-step'; import type { BootstrapResult } from '../../agent/agent-runner'; import type { WizardRunOptions } from '../../../utils/types'; -import { QueueStore, QUEUE_DIR_NAME, type TaskStatus } from './queue'; +import { + QueueStore, + QUEUE_DIR_NAME, + type QueuedTask, + type TaskStatus, +} from './queue'; import { drainQueue, type RunTask } from './executor'; import { agentRunTools, @@ -35,6 +40,7 @@ import { assembleTaskPrompt, loadAgentRegistry, resolveTask, + taskModel, type OrchestratorPromptContext, } from './agent-prompt-loader'; @@ -73,7 +79,6 @@ export async function runOrchestrator( boot: BootstrapResult, ): Promise { const runId = randomUUID(); - const store = new QueueStore(session.installDir, runId); const options = sessionRunOptions(session); @@ -91,6 +96,74 @@ export async function runOrchestrator( ); } + // Every wizard event from here on carries the variant, so orchestrator runs + // segment cleanly from the linear baseline. 
+ analytics.setTag('variant', 'orchestrator'); + + // Responsiveness is the headline metric of the dark launch: time to first + // visible progress, and no single step dominating wall-clock. Track it from + // queue transitions, with the resolved model so cheap work is attributable + // to cheap models. + const runStartMs = Date.now(); + let firstStartMs: number | undefined; + let lastStartMs: number | undefined; + const durationMs = (t: QueuedTask) => + t.startedAt && t.finishedAt + ? Date.parse(t.finishedAt) - Date.parse(t.startedAt) + : undefined; + + const store = new QueueStore(session.installDir, runId, { + onTransition: (event, task) => { + const base = { + type: task.type, + model: taskModel(registry, task), + attempts: task.attempts, + }; + switch (event) { + case 'enqueue': + analytics.wizardCapture('orchestrator task enqueued', { + type: task.type, + enqueued_by: task.enqueuedBy, + dynamic: task.enqueuedBy !== 'orchestrator', + }); + break; + case 'start': { + const now = Date.now(); + analytics.wizardCapture('orchestrator task started', { + ...base, + ms_since_run_start: now - runStartMs, + gap_since_prev_start_ms: + lastStartMs === undefined ? undefined : now - lastStartMs, + }); + firstStartMs ??= now; + lastStartMs = now; + break; + } + case 'complete': + analytics.wizardCapture('orchestrator task completed', { + ...base, + duration_ms: durationMs(task), + }); + break; + case 'skip': + analytics.wizardCapture('orchestrator task skipped', { + ...base, + duration_ms: durationMs(task), + }); + break; + case 'fail': + analytics.wizardCapture('orchestrator task failed', { + ...base, + duration_ms: durationMs(task), + error: task.error?.type, + }); + break; + case 'requeue': + break; + } + }, + }); + // Give task agents the framework's finished reference integration to match, // the same EXAMPLE.md the linear flow uses. 
Install it under the run dir rather // than .claude/skills so its "do everything" workflow is not auto-loaded as a @@ -191,6 +264,7 @@ export async function runOrchestrator( successMessage: 'Planned the integration', additionalFeatureQueue: [], requestRemark: false, + analyticsProperties: { task_type: 'seed' }, }, ); if (seedResult.error) { @@ -211,6 +285,7 @@ export async function runOrchestrator( // its agent prompt (the WHAT) and the mini-skills it needs (the HOW), then // runs on its own model and tools. const taskSkillsRoot = path.join(QUEUE_DIR_NAME, 'skills'); + let remarkRequested = false; const runTask: RunTask = async (task) => { renderQueue(); try { @@ -236,6 +311,17 @@ export async function runOrchestrator( ); } } + // Request the run-end reflection at most once: on the first task to start + // with nothing else pending or in progress; latched, so a retry skips it. + const isLastTask = !store + .list() + .some( + (t) => + t.id !== task.id && + (t.status === 'pending' || t.status === 'in_progress'), + ); + const requestRemark = isLastTask && !remarkRequested; + if (requestRemark) remarkRequested = true; await runAgent( { ...agent, @@ -249,12 +335,12 @@ export async function runOrchestrator( // Empty messages suppress the per-task spinner lines (the spinner renders // only when a message is set); the queue panel shows progress. Errors // still surface — runAgent stops the spinner with its own error text. - // No per-task remark — the reflection would fire on every task. { spinnerMessage: '', successMessage: '', additionalFeatureQueue: [], - requestRemark: false, + requestRemark, + analyticsProperties: { task_type: task.type, task_id: task.id }, }, ); } finally { @@ -281,6 +367,10 @@ export async function runOrchestrator( tasks_total: summary.total, tasks_done: summary.done, tasks_failed: summary.failed, + tasks_skipped: summary.skipped, + total_duration_ms: Date.now() - runStartMs, + time_to_first_task_ms: + firstStartMs === undefined ? 
undefined : firstStartMs - runStartMs, }); // The build step flags any unresolved conflict in its handoff; surface the diff --git a/src/lib/programs/orchestrator/queue.ts b/src/lib/programs/orchestrator/queue.ts index 37bc6e88..b7cfa39e 100644 --- a/src/lib/programs/orchestrator/queue.ts +++ b/src/lib/programs/orchestrator/queue.ts @@ -14,6 +14,7 @@ import * as fs from 'fs'; import * as path from 'path'; import { randomUUID } from 'crypto'; import { writeJsonAtomic } from '../../../utils/atomic-ledger'; +import { analytics } from '../../../utils/analytics'; export type TaskStatus = | 'pending' @@ -72,17 +73,40 @@ export interface EnqueueInput { export const QUEUE_DIR_NAME = '.posthog-wizard'; const DEFAULT_MAX_ATTEMPTS = 2; +/** Every queue transition, in the order it is reflected. */ +export type TransitionEvent = + | 'enqueue' + | 'start' + | 'complete' + | 'skip' + | 'fail' + | 'requeue'; + +export interface QueueStoreOptions { + /** + * Called on every transition with the task's post-transition state. The + * runner uses it for telemetry; the store only reports listener errors to + * analytics (they cannot break a transition) and emits no events itself. 
+ */ + onTransition?: (event: TransitionEvent, task: QueuedTask) => void; +} + function nowIso(): string { return new Date().toISOString(); } export class QueueStore { private tasks: QueuedTask[] = []; + private readonly onTransition?: ( + event: TransitionEvent, + task: QueuedTask, + ) => void; readonly runId: string; readonly queuePath: string; - constructor(installDir: string, runId: string) { + constructor(installDir: string, runId: string, opts?: QueueStoreOptions) { + this.onTransition = opts?.onTransition; this.runId = runId; const dir = path.join(installDir, QUEUE_DIR_NAME); this.queuePath = path.join(dir, 'queue.json'); @@ -164,6 +188,7 @@ export class QueueStore { }; this.tasks.push(task); this.reflect(); + this.notify('enqueue', task); return task; } @@ -173,6 +198,7 @@ export class QueueStore { t.startedAt = nowIso(); t.attempts += 1; this.reflect(); + this.notify('start', t); return t; } @@ -202,6 +228,7 @@ export class QueueStore { t.startedAt = undefined; t.finishedAt = undefined; this.reflect(); + this.notify('requeue', t); return t; } @@ -217,6 +244,10 @@ export class QueueStore { t.status = status; t.finishedAt = nowIso(); this.reflect(); + this.notify( + status === 'done' ? 'complete' : status === 'skipped' ? 'skip' : 'fail', + t, + ); return t; } @@ -229,6 +260,18 @@ export class QueueStore { writeJsonAtomic(this.queuePath, file); } + private notify(event: TransitionEvent, task: QueuedTask): void { + try { + this.onTransition?.(event, task); + } catch (error) { + // A listener must never break a transition, but its failure is a bug. + analytics.captureException( + error instanceof Error ? error : new Error(String(error)), + { step: 'orchestrator_queue_listener', event }, + ); + } + } + private require(id: string): QueuedTask { const t = this.get(id); if (!t) throw new Error(`No task ${id} in the queue`);