feat(orchestrator): in-memory queue + disk persistence (QueueStore)#607
Conversation
🧙 Wizard CIRun the Wizard CI and test your changes against wizard-workbench example apps by replying with a GitHub comment using one of the following commands: Test all apps:
Test all apps in a directory:
Test an individual app:
Show more apps
Results will be posted here when complete. |
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
4438709 to
7112748
Compare
f8d51ae to
48e0be9
Compare
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
7112748 to
f6620f8
Compare
48e0be9 to
0b7c4fd
Compare
| enqueue(input: EnqueueInput): QueuedTask { | ||
| const task: QueuedTask = { | ||
| id: randomUUID(), | ||
| type: input.type, | ||
| status: 'pending', | ||
| dependsOn: input.dependsOn ?? [], | ||
| inputs: input.inputs ?? {}, | ||
| model: input.model, | ||
| attempts: 0, | ||
| maxAttempts: input.maxAttempts ?? DEFAULT_MAX_ATTEMPTS, | ||
| enqueuedBy: input.enqueuedBy ?? 'orchestrator', | ||
| createdAt: nowIso(), | ||
| }; | ||
| this.tasks.push(task); | ||
| this.reflect(); | ||
| return task; | ||
| } |
There was a problem hiding this comment.
Missing dependency validation allows non-existent task IDs to be added as dependencies, causing tasks to be permanently blocked.
If input.dependsOn contains task IDs that don't exist in the queue, the new task will never become runnable because nextRunnable() checks if all dependencies are in the doneIds set. Non-existent IDs will never be in that set.
This creates a silent deadlock where tasks appear pending but can never run, and isDrained() will return true even though work is incomplete.
enqueue(input: EnqueueInput): QueuedTask {
// Validate dependencies exist
if (input.dependsOn) {
const existingIds = new Set(this.tasks.map(t => t.id));
const invalid = input.dependsOn.filter(id => !existingIds.has(id));
if (invalid.length > 0) {
throw new Error(`Invalid dependencies: ${invalid.join(', ')}`);
}
}
const task: QueuedTask = {
// ... rest of implementation
};
}| enqueue(input: EnqueueInput): QueuedTask { | |
| const task: QueuedTask = { | |
| id: randomUUID(), | |
| type: input.type, | |
| status: 'pending', | |
| dependsOn: input.dependsOn ?? [], | |
| inputs: input.inputs ?? {}, | |
| model: input.model, | |
| attempts: 0, | |
| maxAttempts: input.maxAttempts ?? DEFAULT_MAX_ATTEMPTS, | |
| enqueuedBy: input.enqueuedBy ?? 'orchestrator', | |
| createdAt: nowIso(), | |
| }; | |
| this.tasks.push(task); | |
| this.reflect(); | |
| return task; | |
| } | |
| enqueue(input: EnqueueInput): QueuedTask { | |
| if (input.dependsOn && input.dependsOn.length > 0) { | |
| const existingIds = new Set(this.tasks.map(t => t.id)); | |
| const invalid = input.dependsOn.filter(id => !existingIds.has(id)); | |
| if (invalid.length > 0) { | |
| throw new Error(`Invalid dependencies: ${invalid.join(', ')}`); | |
| } | |
| } | |
| const task: QueuedTask = { | |
| id: randomUUID(), | |
| type: input.type, | |
| status: 'pending', | |
| dependsOn: input.dependsOn ?? [], | |
| inputs: input.inputs ?? {}, | |
| model: input.model, | |
| attempts: 0, | |
| maxAttempts: input.maxAttempts ?? DEFAULT_MAX_ATTEMPTS, | |
| enqueuedBy: input.enqueuedBy ?? 'orchestrator', | |
| createdAt: nowIso(), | |
| }; | |
| this.tasks.push(task); | |
| this.reflect(); | |
| return task; | |
| } | |
Spotted by Graphite
Is this helpful? React 👍 or 👎 to let us know.
|
|
||
| export type TaskStatus = | ||
| | 'pending' | ||
| | 'in_progress' |
| function nowIso(): string { | ||
| return new Date().toISOString(); | ||
| } |
| id: string; | ||
| type: string; | ||
| status: TaskStatus; | ||
| dependsOn: string[]; |
There was a problem hiding this comment.
are we preventing circular deps? dependsOn forming a cycle (A→B→A) or pointing at a non-existent id
| .filter((t) => t.status === 'done' || t.status === 'skipped') | ||
| .map((t) => t.id), |
There was a problem hiding this comment.
what about failed? does the queue just stop?
| /** The structured handoff the task reported on completion. */ | ||
| handoff?: TaskHandoff; | ||
| /** 'orchestrator' for seeded tasks, or the id of the task that enqueued this one. */ | ||
| enqueuedBy: string; |
There was a problem hiding this comment.
enqueuedBy is free-form and tasks can enqueue tasks. Without a depth or count bound, a misbehaving task type can grow the queue unboundedly

In-memory task queue for the orchestrator. Synchronous and single-owner, no locking. Tasks carry type, dependencies, attempts, and a structured handoff for the next agent. Every transition rewrites
.posthog-wizard/queue.json, the run's log today and the resume point later.nextRunnablereturns every pending task whose dependencies are satisfied. Parallelism is decided by the task graph, not by a queue knob.