diff --git a/src/lib/programs/orchestrator/__tests__/queue.test.ts b/src/lib/programs/orchestrator/__tests__/queue.test.ts new file mode 100644 index 00000000..4a18dee2 --- /dev/null +++ b/src/lib/programs/orchestrator/__tests__/queue.test.ts @@ -0,0 +1,135 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { + QueueStore, + type QueueFile, + type TaskHandoff, +} from '@lib/programs/orchestrator/queue'; + +function tmpDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-test-')); +} + +describe('QueueStore', () => { + let dir: string; + let q: QueueStore; + + beforeEach(() => { + dir = tmpDir(); + q = new QueueStore(dir, 'run-1'); + }); + + afterEach(() => { + fs.rmSync(dir, { recursive: true, force: true }); + }); + + it('enqueues a pending task with defaults', () => { + const t = q.enqueue({ type: 'install' }); + expect(t.status).toBe('pending'); + expect(t.attempts).toBe(0); + expect(t.maxAttempts).toBe(2); + expect(t.enqueuedBy).toBe('orchestrator'); + expect(t.dependsOn).toEqual([]); + expect(q.list()).toHaveLength(1); + }); + + it('only marks a task runnable once its dependencies are done', () => { + const a = q.enqueue({ type: 'install' }); + const b = q.enqueue({ type: 'init', dependsOn: [a.id] }); + + expect(q.nextRunnable().map((t) => t.id)).toEqual([a.id]); + + q.start(a.id); + q.complete(a.id); + expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]); + }); + + it('returns every runnable task; the graph alone decides parallelism', () => { + const a = q.enqueue({ type: 'install' }); + const b = q.enqueue({ type: 'init' }); + q.enqueue({ type: 'capture', dependsOn: [a.id, b.id] }); + + // Both independent tasks are runnable at once; the dependent one is not. + expect( + q + .nextRunnable() + .map((t) => t.id) + .sort(), + ).toEqual([a.id, b.id].sort()); + + q.start(a.id); + // An in-progress task is no longer offered. + expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]); + }); + + it('treats a skipped dependency as satisfied', () => { + const a = q.enqueue({ type: 'install' }); + const b = q.enqueue({ type: 'init', dependsOn: [a.id] }); + + q.start(a.id); + q.skip(a.id); + expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]); + }); + + it('start increments attempts and supports within-run retry while attempts remain', () => { + const t = q.enqueue({ type: 'install', maxAttempts: 2 }); + q.start(t.id); + expect(q.get(t.id)?.attempts).toBe(1); + + q.fail(t.id, { type: 'API_ERROR', message: 'boom' }); + expect(q.get(t.id)?.status).toBe('failed'); + + // Retry: attempts (1) < maxAttempts (2), so requeue and run again. + q.requeue(t.id); + expect(q.get(t.id)?.status).toBe('pending'); + q.start(t.id); + expect(q.get(t.id)?.attempts).toBe(2); + }); + + it('completing a task records and reads back a structured handoff', () => { + const t = q.enqueue({ type: 'install' }); + const handoff: TaskHandoff = { + goals: 'install the sdk', + did: 'added posthog-js', + forNextAgent: 'env vars not set yet', + filesTouched: ['package.json'], + }; + q.start(t.id); + q.complete(t.id, handoff); + + expect(q.get(t.id)?.status).toBe('done'); + expect(q.readHandoff(t.id)).toEqual(handoff); + expect(q.readHandoffsByType('install')).toEqual([handoff]); + }); + + it('is drained when a pending task is blocked by a failed dependency', () => { + const a = q.enqueue({ type: 'install' }); + q.enqueue({ type: 'init', dependsOn: [a.id] }); + + expect(q.isDrained()).toBe(false); + q.start(a.id); + q.fail(a.id, { type: 'API_ERROR', message: 'boom' }); + + // init can never run now, and nothing is in progress. + expect(q.nextRunnable()).toHaveLength(0); + expect(q.isDrained()).toBe(true); + }); + + it('reflects every transition to queue.json, handoffs included', () => { + const a = q.enqueue({ type: 'install' }); + q.start(a.id); + q.complete(a.id, { + goals: 'g', + did: 'd', + forNextAgent: 'n', + }); + + const file = JSON.parse(fs.readFileSync(q.queuePath, 'utf8')) as QueueFile; + expect(file.version).toBe(1); + expect(file.runId).toBe('run-1'); + expect(file.tasks).toHaveLength(1); + expect(file.tasks[0].status).toBe('done'); + expect(file.tasks[0].handoff?.did).toBe('d'); + }); +}); diff --git a/src/lib/programs/orchestrator/queue.ts b/src/lib/programs/orchestrator/queue.ts new file mode 100644 index 00000000..0ac9cb46 --- /dev/null +++ b/src/lib/programs/orchestrator/queue.ts @@ -0,0 +1,231 @@ +/** + * The orchestrator task queue. + * + * In memory, synchronous, single-owner: one Node process drives the run, so + * there is no locking. The queue imposes no execution policy — `nextRunnable` + * returns every pending task whose dependencies are satisfied, and how many of + * those run at once is decided by the task graph, not the queue. + * + * Every transition rewrites `/.posthog-wizard/queue.json`, a small + * file holding the whole queue, handoffs included. Today it is the run's + * log and the report's source; later it is the resume point. + */ +import * as fs from 'fs'; +import * as path from 'path'; +import { randomUUID } from 'crypto'; +import { writeJsonAtomic } from '../../../utils/atomic-ledger'; + +export type TaskStatus = + | 'pending' + | 'in_progress' + | 'done' + | 'skipped' + | 'failed'; + +export interface QueuedTask { + id: string; + type: string; + status: TaskStatus; + dependsOn: string[]; + inputs: Record; + model?: string; + attempts: number; + maxAttempts: number; + /** The structured handoff the task reported on completion. */ + handoff?: TaskHandoff; + /** 'orchestrator' for seeded tasks, or the id of the task that enqueued this one. */ + enqueuedBy: string; + createdAt: string; + startedAt?: string; + finishedAt?: string; + error?: { type: string; message: string }; +} + +export interface QueueFile { + version: 1; + runId: string; + tasks: QueuedTask[]; +} + +/** The structured handoff a task leaves for the next agent. */ +export interface TaskHandoff { + goals: string; + did: string; + forNextAgent: string; + filesTouched?: string[]; +} + +export interface EnqueueInput { + type: string; + inputs?: Record; + dependsOn?: string[]; + model?: string; + maxAttempts?: number; + enqueuedBy?: string; +} + +export const QUEUE_DIR_NAME = '.posthog-wizard'; +const DEFAULT_MAX_ATTEMPTS = 2; + +function nowIso(): string { + return new Date().toISOString(); +} + +export class QueueStore { + private tasks: QueuedTask[] = []; + + readonly runId: string; + readonly queuePath: string; + + constructor(installDir: string, runId: string) { + this.runId = runId; + const dir = path.join(installDir, QUEUE_DIR_NAME); + this.queuePath = path.join(dir, 'queue.json'); + fs.mkdirSync(dir, { recursive: true }); + } + + // ── Reads ─────────────────────────────────────────────────────────── + + list(): readonly QueuedTask[] { + return this.tasks; + } + + get(id: string): QueuedTask | undefined { + return this.tasks.find((t) => t.id === id); + } + + /** + * Every pending task whose dependencies are all satisfied (`done` or + * `skipped`). A skipped dependency does not block downstream work. + */ + nextRunnable(): QueuedTask[] { + const doneIds = new Set( + this.tasks + .filter((t) => t.status === 'done' || t.status === 'skipped') + .map((t) => t.id), + ); + return this.tasks.filter( + (t) => t.status === 'pending' && t.dependsOn.every((d) => doneIds.has(d)), + ); + } + + /** + * True when no task is in progress and none can be started. Either everything + * is terminal, or the only pending tasks are blocked by a failed dependency. + */ + isDrained(): boolean { + if (this.tasks.some((t) => t.status === 'in_progress')) return false; + return this.nextRunnable().length === 0; + } + + summary(): Record & { total: number } { + const counts: Record = { + pending: 0, + in_progress: 0, + done: 0, + skipped: 0, + failed: 0, + }; + for (const t of this.tasks) counts[t.status] += 1; + return { ...counts, total: this.tasks.length }; + } + + readHandoff(id: string): TaskHandoff | null { + return this.get(id)?.handoff ?? null; + } + + /** Handoffs of completed tasks of a given type, oldest first. */ + readHandoffsByType(type: string): TaskHandoff[] { + return this.tasks + .filter((t) => t.type === type && t.handoff) + .map((t) => t.handoff as TaskHandoff); + } + + // ── Transitions (each one reflected to queue.json) ────────────────── + + enqueue(input: EnqueueInput): QueuedTask { + const task: QueuedTask = { + id: randomUUID(), + type: input.type, + status: 'pending', + dependsOn: input.dependsOn ?? [], + inputs: input.inputs ?? {}, + model: input.model, + attempts: 0, + maxAttempts: input.maxAttempts ?? DEFAULT_MAX_ATTEMPTS, + enqueuedBy: input.enqueuedBy ?? 'orchestrator', + createdAt: nowIso(), + }; + this.tasks.push(task); + this.reflect(); + return task; + } + + start(id: string): QueuedTask { + const t = this.require(id); + t.status = 'in_progress'; + t.startedAt = nowIso(); + t.attempts += 1; + this.reflect(); + return t; + } + + complete(id: string, handoff?: TaskHandoff): QueuedTask { + return this.finish(id, 'done', handoff); + } + + /** Terminal: the agent could not do the task. Not done, not failed. */ + skip(id: string, handoff?: TaskHandoff): QueuedTask { + return this.finish(id, 'skipped', handoff); + } + + fail( + id: string, + error: { type: string; message: string }, + handoff?: TaskHandoff, + ): QueuedTask { + const t = this.require(id); + t.error = error; + return this.finish(id, 'failed', handoff); + } + + /** Put a failed/in-progress task back to pending for a retry within the run. */ + requeue(id: string): QueuedTask { + const t = this.require(id); + t.status = 'pending'; + t.startedAt = undefined; + t.finishedAt = undefined; + this.reflect(); + return t; + } + + // ── Internals ─────────────────────────────────────────────────────── + + private finish( + id: string, + status: 'done' | 'skipped' | 'failed', + handoff?: TaskHandoff, + ): QueuedTask { + const t = this.require(id); + if (handoff) t.handoff = handoff; + t.status = status; + t.finishedAt = nowIso(); + this.reflect(); + return t; + } + + private reflect(): void { + const file: QueueFile = { + version: 1, + runId: this.runId, + tasks: this.tasks, + }; + writeJsonAtomic(this.queuePath, file); + } + + private require(id: string): QueuedTask { + const t = this.get(id); + if (!t) throw new Error(`No task ${id} in the queue`); + return t; + } +} diff --git a/src/lib/wizard-tools.ts b/src/lib/wizard-tools.ts index ab8824b6..aae21ec5 100644 --- a/src/lib/wizard-tools.ts +++ b/src/lib/wizard-tools.ts @@ -16,6 +16,7 @@ import { z } from 'zod'; import { logToFile } from '@utils/debug'; import { analytics } from '@utils/analytics'; import { skillTmpPath } from '@utils/paths'; +import { writeJsonAtomic, makeMutex } from '@utils/atomic-ledger'; import type { PackageManagerDetector } from './detection/package-manager'; import { AUDIT_CHECKS_FILE, @@ -368,14 +369,9 @@ const auditUpdateSchema = z.object({ details: z.string().optional(), }); -/** - * Atomically write JSON: write to .tmp then rename. The rename is what bumps - * the file's mtime, which is what the UI's file watcher polls on. - */ +/** Atomically write the audit ledger. Thin typed wrapper over writeJsonAtomic. */ function writeLedgerAtomic(targetPath: string, checks: AuditCheck[]): void { - const tmpPath = `${targetPath}.tmp`; - fs.writeFileSync(tmpPath, JSON.stringify(checks, null, 2), 'utf8'); - fs.renameSync(tmpPath, targetPath); + writeJsonAtomic(targetPath, checks); } /** @@ -474,19 +470,6 @@ function appendAuditChecksToLedger( return { ok: true, added: additions.length }; } -/** - * Single async mutex shared by audit tools — guarantees a read-modify-write - * cycle on the ledger is atomic across concurrent tool calls (e.g. future subagents). - */ -function makeMutex() { - let chain: Promise = Promise.resolve(); - return async function run(fn: () => Promise | T): Promise { - const next = chain.then(() => fn()); - chain = next.catch(() => undefined); - return next; - }; -} - // --------------------------------------------------------------------------- // Server factory // --------------------------------------------------------------------------- diff --git a/src/utils/atomic-ledger.ts b/src/utils/atomic-ledger.ts new file mode 100644 index 00000000..0ae8c832 --- /dev/null +++ b/src/utils/atomic-ledger.ts @@ -0,0 +1,29 @@ +/** + * Small shared primitives for on-disk ledgers: an atomic JSON writer and a + * single-chain async mutex. Used by the audit tools and by the orchestrator + * queue. Lifted here so both share one implementation. + */ +import * as fs from 'fs'; + +/** + * Atomically write JSON: write to a `.tmp` file then rename over the target. The + * rename bumps the file's mtime in one step, which is what a file watcher polls. + */ +export function writeJsonAtomic(targetPath: string, data: unknown): void { + const tmpPath = `${targetPath}.tmp`; + fs.writeFileSync(tmpPath, JSON.stringify(data, null, 2), 'utf8'); + fs.renameSync(tmpPath, targetPath); +} + +/** + * A single async mutex. Serializes read-modify-write cycles so concurrent callers + * (parallel task agents, audit tool calls) never interleave a mutation. + */ +export function makeMutex() { + let chain: Promise = Promise.resolve(); + return async function run(fn: () => Promise | T): Promise { + const next = chain.then(() => fn()); + chain = next.catch(() => undefined); + return next; + }; +}