Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/lib/agent/agent-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,11 @@ export async function runAgent(
abortCases?: readonly AbortCaseMatcher[];
/** Request the end-of-run reflection remark. Defaults to true. */
requestRemark?: boolean;
/**
* Extra properties attached to this run's `agent completed` / `agent
* aborted` events (e.g. the orchestrator's task type and id).
*/
analyticsProperties?: Record<string, unknown>;
},
middleware?: {
onMessage(message: any): void;
Expand Down Expand Up @@ -878,9 +883,27 @@ export async function runAgent(
analytics.capture(WIZARD_REMARK_EVENT_NAME, { remark });
}

// Token usage comes from the SDK result message and is per agent run —
// for the orchestrator that means per task, the secondary cost to watch.
const usage = lastResultMessage?.usage as
| {
input_tokens?: number;
output_tokens?: number;
cache_creation_input_tokens?: number;
cache_read_input_tokens?: number;
}
| undefined;
analytics.wizardCapture('agent completed', {
duration_ms: durationMs,
duration_seconds: durationSeconds,
model: agentConfig.model,
num_turns: lastResultMessage?.num_turns,
total_cost_usd: lastResultMessage?.total_cost_usd,
input_tokens: usage?.input_tokens,
output_tokens: usage?.output_tokens,
cache_creation_input_tokens: usage?.cache_creation_input_tokens,
cache_read_input_tokens: usage?.cache_read_input_tokens,
...config?.analyticsProperties,
});
try {
middleware?.finalize(lastResultMessage, durationMs);
Expand Down Expand Up @@ -1278,6 +1301,8 @@ export async function runAgent(
analytics.wizardCapture('agent aborted', {
duration_ms: durationMs,
duration_seconds: Math.round(durationMs / 1000),
model: agentConfig.model,
...config?.analyticsProperties,
});
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/lib/agent/agent-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ async function bootstrapProgram(
// fork decision reads the flags.
const wizardFlags = await analytics.getAllFlagsForWizard();
const wizardMetadata = buildWizardMetadata(wizardFlags);
// Tag every wizard event with the variant so runs segment in PostHog; the
// orchestrator arm overwrites this with its own variant when it forks.
analytics.setTag('variant', wizardMetadata.VARIANT);

const mcpUrl = session.localMcp
? 'http://localhost:8787/mcp'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
buildRegistry,
parseAgentPrompt,
resolveTask,
taskModel,
type AgentPrompt,
type AgentRegistry,
type OrchestratorPromptContext,
Expand Down Expand Up @@ -206,6 +207,23 @@ describe('resolveTask', () => {
});
});

// Model resolution precedence for a task: the enqueue-time override wins,
// then the model from the prompt's frontmatter, then the built-in default.
describe('taskModel', () => {
  const capturePrompt = parseAgentPrompt(
    '---\nmodel: prompt-model\n---\nx',
    'capture',
  );

  it('prefers the enqueue override, then the prompt, then the default', () => {
    const withPrompt = registryOf([capturePrompt]);
    const bareTask = { type: 'capture' };
    const overriddenTask = { ...bareTask, model: 'override' };

    // 1. An explicit model on the queued task beats everything else.
    const fromOverride = taskModel(withPrompt, overriddenTask as never);
    expect(fromOverride).toBe('override');

    // 2. Without an override, the registered prompt's model is used.
    const fromPrompt = taskModel(withPrompt, bareTask as never);
    expect(fromPrompt).toBe('prompt-model');

    // 3. With no registered prompt either, the default model applies.
    const fromDefault = taskModel(registryOf([]), bareTask as never);
    expect(fromDefault).toBe('claude-sonnet-4-6');
  });
});

describe('assembleTaskPrompt', () => {
const ctx: OrchestratorPromptContext = {
projectId: 1,
Expand Down
40 changes: 40 additions & 0 deletions src/lib/programs/orchestrator/__tests__/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import {
type TaskHandoff,
} from '@lib/programs/orchestrator/queue';

// Replace the '@utils/analytics' module with a stub for this test file: its
// `analytics` object exposes only captureException and wizardCapture, both as
// jest mock functions.
jest.mock('@utils/analytics', () => ({
analytics: { captureException: jest.fn(), wizardCapture: jest.fn() },
}));

/**
 * Creates a new, uniquely named directory under the OS temp directory.
 *
 * @returns The path of the created directory, whose name starts with `queue-test-`.
 */
function tmpDir(): string {
  const prefix = path.join(os.tmpdir(), 'queue-test-');
  const created = fs.mkdtempSync(prefix);
  return created;
}
Expand Down Expand Up @@ -132,4 +136,40 @@ describe('QueueStore', () => {
expect(file.tasks[0].status).toBe('done');
expect(file.tasks[0].handoff?.did).toBe('d');
});

it('notifies the transition listener with post-transition task state', () => {
  type Observation = { event: string; status: string; attempts: number };
  const observations: Observation[] = [];

  // The listener snapshots the task's status and attempt count as it is
  // handed to the callback for each transition.
  const observedStore = new QueueStore(dir, 'run-2', {
    onTransition: (event, task) =>
      observations.push({
        event,
        status: task.status,
        attempts: task.attempts,
      }),
  });

  // One lifecycle with a retry: enqueue, start, fail, requeue, start, complete.
  const { id } = observedStore.enqueue({ type: 'install' });
  observedStore.start(id);
  observedStore.fail(id, { type: 'API_ERROR', message: 'boom' });
  observedStore.requeue(id);
  observedStore.start(id);
  observedStore.complete(id);

  const expected: Observation[] = [
    { event: 'enqueue', status: 'pending', attempts: 0 },
    { event: 'start', status: 'in_progress', attempts: 1 },
    { event: 'fail', status: 'failed', attempts: 1 },
    { event: 'requeue', status: 'pending', attempts: 1 },
    { event: 'start', status: 'in_progress', attempts: 2 },
    { event: 'complete', status: 'done', attempts: 2 },
  ];
  expect(observations).toEqual(expected);
});

it('a throwing listener does not break transitions', () => {
  // A listener that fails on every transition it is notified about.
  const explode = (): never => {
    throw new Error('listener boom');
  };
  const guardedStore = new QueueStore(dir, 'run-3', { onTransition: explode });

  const { id } = guardedStore.enqueue({ type: 'install' });
  guardedStore.start(id);
  guardedStore.complete(id);

  // The task still reaches its final state despite the listener errors.
  const finalStatus = guardedStore.get(id)?.status;
  expect(finalStatus).toBe('done');
});
});
27 changes: 16 additions & 11 deletions src/lib/programs/orchestrator/agent-prompt-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ function exampleReference(ctx: OrchestratorPromptContext): string | null {
return `A reference PostHog integration for this framework is at \`${ctx.examplePath}\`. It shows the target implementation pattern. Reference its patterns and conventions, adapting them to this codebase.`;
}

/**
 * Builds the sentence that points a task agent at the framework rules file
 * (the rules ship with the reference skill; every task follows them).
 *
 * @param ctx - Prompt context; only `commandmentsPath` is read.
 * @returns The instruction sentence, or `null` when no rules path is set.
 */
function commandmentsReference(ctx: OrchestratorPromptContext): string | null {
  const rulesPath = ctx.commandmentsPath;
  return rulesPath
    ? `Framework rules for this integration are at \`${rulesPath}\`. Read them before you edit and follow them.`
    : null;
}

/**
 * Standing instructions for a task agent: do only its own task, report exactly
 * once via complete_task with a structured handoff, trust context from earlier
 * steps, read before editing, stay inside the project directory, and report
 * `skipped` when the task does not apply.
 * NOTE(review): presumably the "injected basics" prepended by
 * assembleTaskPrompt — confirm against its body.
 */
const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only within this project's own directory; nothing outside it is part of your task. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`;

/**
 * Standing instructions for the seeding (orchestrator) agent: plan the work,
 * seed the queue with enqueue_task using returned ids as dependencies, give
 * each task a short UI label, and never call complete_task or edit the project.
 * NOTE(review): presumably consumed by assembleSeedPrompt — confirm.
 */
const SEED_BASICS = `You are the orchestrator. Plan the work and seed the queue with enqueue_task — each call returns an id you can pass as a dependency to a later task. Give each task a short label for the UI — the action in a few words, not file names, class names, or other specifics. You are not a task yourself: do not call complete_task and do not edit the project.`;

/**
* Points the agent at its installed task instructions (the HOW). They live under
* the wizard's run dir, not `.claude/skills/`, so the SDK does not auto-load
Expand All @@ -60,16 +70,6 @@ function skillReference(paths: readonly string[]): string | null {
return `Your task instructions are at ${list}. Read them before you start and follow them. They are wizard scaffolding, not part of the project.`;
}

/**
 * Produces the pointer to the framework rules file for a task prompt. The
 * rules ship with the reference skill and apply to every task.
 *
 * @param ctx - Prompt context; only `commandmentsPath` is read.
 * @returns The instruction sentence, or `null` when no rules path is set.
 */
function commandmentsReference(ctx: OrchestratorPromptContext): string | null {
  const { commandmentsPath } = ctx;
  if (commandmentsPath) {
    return `Framework rules for this integration are at \`${commandmentsPath}\`. Read them before you edit and follow them.`;
  }
  return null;
}

/**
 * Baseline prompt text for a task agent. It tells the agent to do only its own
 * task, report exactly once through complete_task with a structured handoff,
 * trust context from previous steps, read files before editing them, work only
 * within the project directory, and use status `skipped` when the task does
 * not apply.
 */
const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only within this project's own directory; nothing outside it is part of your task. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`;

/**
 * Baseline prompt text for the orchestrator (seeding) agent. It tells the
 * agent to plan and enqueue tasks with enqueue_task, chain them through the
 * returned ids, label each task briefly for the UI, and not to call
 * complete_task or edit the project itself.
 */
const SEED_BASICS = `You are the orchestrator. Plan the work and seed the queue with enqueue_task — each call returns an id you can pass as a dependency to a later task. Give each task a short label for the UI — the action in a few words, not file names, class names, or other specifics. You are not a task yourself: do not call complete_task and do not edit the project.`;

/** A task agent's full prompt: injected basics, then the authored intent. */
export function assembleTaskPrompt(
ctx: OrchestratorPromptContext,
Expand Down Expand Up @@ -315,9 +315,14 @@ export function resolveTask(
.join('\n\n');

return {
model: task.model ?? prompt.model ?? DEFAULT_TASK_MODEL,
model: taskModel(registry, task),
...agentRunTools(prompt),
prompt: body,
skills: prompt.skills,
};
}

/**
 * Resolves the model a task runs on.
 *
 * Precedence: the model set on the queued task itself (the enqueue override),
 * then the model of the prompt registered for the task's type, then
 * `DEFAULT_TASK_MODEL`.
 *
 * @param registry - Agent prompts, looked up by task type.
 * @param task - The queued task being resolved.
 * @returns The model identifier to run the task with.
 */
export function taskModel(registry: AgentRegistry, task: QueuedTask): string {
  if (task.model != null) return task.model;
  const promptModel = registry.get(task.type)?.model;
  return promptModel ?? DEFAULT_TASK_MODEL;
}
98 changes: 94 additions & 4 deletions src/lib/programs/orchestrator/orchestrator-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@ import { logToFile } from '../../../utils/debug';
import type { ProgramConfig } from '../program-step';
import type { BootstrapResult } from '../../agent/agent-runner';
import type { WizardRunOptions } from '../../../utils/types';
import { QueueStore, QUEUE_DIR_NAME, type TaskStatus } from './queue';
import {
QueueStore,
QUEUE_DIR_NAME,
type QueuedTask,
type TaskStatus,
} from './queue';
import { drainQueue, type RunTask } from './executor';
import {
agentRunTools,
assembleSeedPrompt,
assembleTaskPrompt,
loadAgentRegistry,
resolveTask,
taskModel,
type OrchestratorPromptContext,
} from './agent-prompt-loader';

Expand Down Expand Up @@ -73,7 +79,6 @@ export async function runOrchestrator(
boot: BootstrapResult,
): Promise<void> {
const runId = randomUUID();
const store = new QueueStore(session.installDir, runId);

const options = sessionRunOptions(session);

Expand All @@ -91,6 +96,74 @@ export async function runOrchestrator(
);
}

// Every wizard event from here on carries the variant, so orchestrator runs
// segment cleanly from the linear baseline.
analytics.setTag('variant', 'orchestrator');

// Responsiveness is the headline metric of the dark launch: time to first
// visible progress, and no single step dominating wall-clock. Track it from
// queue transitions, with the resolved model so cheap work is attributable
// to cheap models.
const runStartMs = Date.now();
let firstStartMs: number | undefined;
let lastStartMs: number | undefined;
// Elapsed milliseconds between a task's startedAt and finishedAt timestamps;
// undefined while either timestamp is missing.
const durationMs = (t: QueuedTask) => {
  if (!t.startedAt || !t.finishedAt) return undefined;
  const startedMs = Date.parse(t.startedAt);
  const finishedMs = Date.parse(t.finishedAt);
  return finishedMs - startedMs;
};

// The queue store for this run. Its transition listener mirrors queue events
// into wizard analytics: enqueued, started, completed, skipped and failed each
// emit one event; requeue emits nothing.
const store = new QueueStore(session.installDir, runId, {
  onTransition: (event, task) => {
    // Fields shared by every per-task event except `enqueued`.
    const base = {
      type: task.type,
      model: taskModel(registry, task),
      attempts: task.attempts,
    };
    // Payload for transitions that end a run of the task: base + elapsed time.
    const finished = () => ({ ...base, duration_ms: durationMs(task) });

    if (event === 'enqueue') {
      analytics.wizardCapture('orchestrator task enqueued', {
        type: task.type,
        enqueued_by: task.enqueuedBy,
        dynamic: task.enqueuedBy !== 'orchestrator',
      });
      return;
    }

    if (event === 'start') {
      const now = Date.now();
      // The gap is measured against the previous start, so compute it before
      // the trackers below are advanced.
      const gapSincePrevStart =
        lastStartMs === undefined ? undefined : now - lastStartMs;
      analytics.wizardCapture('orchestrator task started', {
        ...base,
        ms_since_run_start: now - runStartMs,
        gap_since_prev_start_ms: gapSincePrevStart,
      });
      // Only the first start of the run sets firstStartMs.
      firstStartMs ??= now;
      lastStartMs = now;
      return;
    }

    if (event === 'complete') {
      analytics.wizardCapture('orchestrator task completed', finished());
      return;
    }

    if (event === 'skip') {
      analytics.wizardCapture('orchestrator task skipped', finished());
      return;
    }

    if (event === 'fail') {
      analytics.wizardCapture('orchestrator task failed', {
        ...finished(),
        error: task.error?.type,
      });
      return;
    }

    // 'requeue' intentionally emits no event.
  },
});

// Give task agents the framework's finished reference integration to match,
// the same EXAMPLE.md the linear flow uses. Install it under the run dir rather
// than .claude/skills so its "do everything" workflow is not auto-loaded as a
Expand Down Expand Up @@ -191,6 +264,7 @@ export async function runOrchestrator(
successMessage: 'Planned the integration',
additionalFeatureQueue: [],
requestRemark: false,
analyticsProperties: { task_type: 'seed' },
},
);
if (seedResult.error) {
Expand All @@ -211,6 +285,7 @@ export async function runOrchestrator(
// its agent prompt (the WHAT) and the mini-skills it needs (the HOW), then
// runs on its own model and tools.
const taskSkillsRoot = path.join(QUEUE_DIR_NAME, 'skills');
let remarkRequested = false;
const runTask: RunTask = async (task) => {
renderQueue();
try {
Expand All @@ -236,6 +311,17 @@ export async function runOrchestrator(
);
}
}
// The run-end reflection fires once, on the task that is last in the
// queue when it starts — nothing else pending or running alongside it.
const isLastTask = !store
.list()
.some(
(t) =>
t.id !== task.id &&
(t.status === 'pending' || t.status === 'in_progress'),
);
const requestRemark = isLastTask && !remarkRequested;
if (requestRemark) remarkRequested = true;
await runAgent(
{
...agent,
Expand All @@ -249,12 +335,12 @@ export async function runOrchestrator(
// Empty messages suppress the per-task spinner lines (the spinner renders
// only when a message is set); the queue panel shows progress. Errors
// still surface — runAgent stops the spinner with its own error text.
// No per-task remark — the reflection would fire on every task.
{
spinnerMessage: '',
successMessage: '',
additionalFeatureQueue: [],
requestRemark: false,
requestRemark,
analyticsProperties: { task_type: task.type, task_id: task.id },
},
);
} finally {
Expand All @@ -281,6 +367,10 @@ export async function runOrchestrator(
tasks_total: summary.total,
tasks_done: summary.done,
tasks_failed: summary.failed,
tasks_skipped: summary.skipped,
total_duration_ms: Date.now() - runStartMs,
time_to_first_task_ms:
firstStartMs === undefined ? undefined : firstStartMs - runStartMs,
});

// The build step flags any unresolved conflict in its handoff; surface the
Expand Down
Loading
Loading