From 55ee3c0a43a32f193180eb6cb2279a26344ed123 Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 13:45:32 +0800 Subject: [PATCH 01/12] feat: add automation run context seam --- .../opencode/src/automation/run-context.ts | 47 +++++++++++++++++++ packages/opencode/src/permission/index.ts | 11 +++-- packages/opencode/src/session/prompt.ts | 28 ++++++++++- 3 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 packages/opencode/src/automation/run-context.ts diff --git a/packages/opencode/src/automation/run-context.ts b/packages/opencode/src/automation/run-context.ts new file mode 100644 index 000000000..9d10bd55e --- /dev/null +++ b/packages/opencode/src/automation/run-context.ts @@ -0,0 +1,47 @@ +import { Context, Effect } from "effect" +import type { Permission } from "@/permission" +import type { PermissionID } from "@/permission/schema" + +export type AutomationRunAttendance = "attended" | "unattended" + +export type AutomationRunBlocker = + | { kind: "permission"; requestID: PermissionID } + | { kind: "question"; callID: string } + +export interface AutomationRunContext { + readonly attendance: AutomationRunAttendance + readonly stepCap?: number + readonly block: (blocker: AutomationRunBlocker) => Effect.Effect + readonly clear: () => Effect.Effect +} + +export class AutomationStepCapError extends Error { + readonly _tag = "AutomationStepCapError" + constructor(readonly stepCap: number) { + super(`Automation run exceeded the hard step cap (${stepCap}).`) + this.name = "AutomationStepCapError" + } +} + +export const AutomationRunContextService: Context.Reference = Context.Reference< + AutomationRunContext | undefined +>("@opencode/AutomationRunContext", { + defaultValue: () => undefined, +}) + +export const AutomationRunContext = { + service: AutomationRunContextService, + current: AutomationRunContextService, + attended(input: Pick): AutomationRunContext { + return { attendance: "attended", ...input } + }, + unattended(input: Pick): AutomationRunContext { + return { attendance: "unattended", ...input } + }, + permissionOnPending( + context: AutomationRunContext | undefined, + ): ((request: Permission.Request) => Effect.Effect) | undefined { + if (!context) return undefined + return (request) => context.block({ kind: "permission", requestID: request.id }) + }, +} diff --git a/packages/opencode/src/permission/index.ts b/packages/opencode/src/permission/index.ts index 41a99e829..570becd2e 100644 --- a/packages/opencode/src/permission/index.ts +++ b/packages/opencode/src/permission/index.ts @@ -117,8 +117,12 @@ export namespace Permission { message: z.string().optional(), }) + export type AskOptions = z.infer & { + onPending?: (request: Request) => Effect.Effect + } + export interface Interface { - readonly ask: (input: z.infer) => Effect.Effect + readonly ask: (input: AskOptions) => Effect.Effect readonly reply: (input: z.infer) => Effect.Effect readonly clearSession: ( sessionID: SessionID, @@ -171,9 +175,9 @@ export namespace Permission { }), ) - const ask = Effect.fn("Permission.ask")(function* (input: z.infer) { + const ask = Effect.fn("Permission.ask")(function* (input: AskOptions) { const { approved, pending } = yield* InstanceState.get(state) - const { ruleset, ...request } = input + const { ruleset, onPending, ...request } = input let needsAsk = false const denied: Array<{ pattern: string; rule: Rule }> = [] @@ -222,6 +226,7 @@ export namespace Permission { const deferred = yield* Deferred.make() pending.set(id, { info, deferred }) + if (onPending) yield* onPending(info) yield* bus.publish(Event.Asked, info) return yield* Effect.ensuring( Deferred.await(deferred), diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 5a07dae8a..bfc0a68dd 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -62,6 +62,7 @@ import { attachWith, makeRuntime } from "@/effect/run-service" import { Instance } from "@/project/instance" import { MemoryFile } from "@/memory/memory" import { MemoryService } from "@/memory/service" +import { AutomationRunContext, AutomationStepCapError } from "@/automation/run-context" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -676,6 +677,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const run = yield* runner() const promptOps = yield* ops() const effectContext = yield* Effect.context() + const automation = yield* AutomationRunContext.current const runInSessionContext = (effect: Effect.Effect): Effect.Effect => Effect.gen(function* () { const session = yield* sessions.get(input.session.id) @@ -739,13 +741,20 @@ NOTE: At any point in time through this workflow you should feel free to ask the sessionID: input.session.id, tool: { messageID: input.processor.message.id, callID: options.toolCallId }, ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []), + onPending: AutomationRunContext.permissionOnPending(automation), }) - .pipe(Effect.orDie), + .pipe(Effect.ensuring(automation ? automation.clear() : Effect.void), Effect.orDie), externalResult: ({ inputSnapshot, decoder }) => Effect.gen(function* () { + if (automation?.attendance === "unattended") { + return yield* Effect.fail(new ExternalResult.Error({ reason: "aborted" })) + } const sessionID = input.session.id const messageID = input.processor.message.id const callID = options.toolCallId + if (automation) { + yield* automation.block({ kind: "question", callID }) + } const deferred = yield* ExternalResult.register({ sessionID, messageID, @@ -814,6 +823,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the return result as Tool.ExternalResultOutcome } finally { if (signal) signal.removeEventListener("abort", abortHandler) + if (automation) yield* automation.clear() } }), }) @@ -1939,6 +1949,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const runLoop: (sessionID: SessionID) => Effect.Effect = Effect.fn("SessionPrompt.run")( function* (sessionID: SessionID) { const ctx = yield* InstanceState.context + const automation = yield* AutomationRunContext.current const slog = elog.with({ sessionID }) let structured: unknown | undefined let step = 0 @@ -2054,6 +2065,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the throw error } const maxSteps = agent.steps ?? Infinity + const automationHardStepCap = automation?.stepCap ?? 50 + if (automation && step > automationHardStepCap) { + throw new AutomationStepCapError(automationHardStepCap) + } const isLastStep = step >= maxSteps msgs = yield* insertReminders({ messages: msgs, agent, session }) const diagnostics = SessionDiagnostics.consumeReminders({ messages: msgs, parentID: lastUser.id }) @@ -2734,6 +2749,17 @@ export async function prompt(input: PromptInput) { return runPromise((svc) => svc.prompt(PromptInput.parse(input))) } +export async function promptWithAutomationContext( + input: PromptInput, + context: import("@/automation/run-context").AutomationRunContext, +) { + return runPromise((svc) => + svc + .prompt(PromptInput.parse(input)) + .pipe(Effect.provideService(AutomationRunContext.service, context)), + ) +} + export async function resolvePromptParts(template: string) { return runPromise((svc) => svc.resolvePromptParts(z.string().parse(template))) } From 41ee735cdf787e34d7bfe6e45211140934311c24 Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 13:45:44 +0800 Subject: [PATCH 02/12] feat: execute automation run now --- packages/opencode/src/automation/index.ts | 137 +++++++++++++++++- packages/opencode/src/automation/runner.ts | 39 +++++ .../src/server/instance/automation.ts | 5 +- 3 files changed, 179 insertions(+), 2 deletions(-) create mode 100644 packages/opencode/src/automation/runner.ts diff --git a/packages/opencode/src/automation/index.ts b/packages/opencode/src/automation/index.ts index 7e027763f..dc110d4b9 100644 --- a/packages/opencode/src/automation/index.ts +++ b/packages/opencode/src/automation/index.ts @@ -7,6 +7,7 @@ import { ProjectID } from "@/project/schema" import { PermissionID } from "@/permission/schema" import { SessionID } from "@/session/schema" import { NotFoundError } from "@/storage/db" +import type { AutomationRunAttendance, AutomationRunBlocker } from "./run-context" export const AutomationID = { Definition: { @@ -232,8 +233,16 @@ export namespace Automation { type State = { definitions: Map runs: Map + activeRuns: Set } - const state = Instance.state(() => ({ definitions: new Map(), runs: new Map() })) + const state = Instance.state(() => ({ definitions: new Map(), runs: new Map(), activeRuns: new Set() })) + + export type RunExecutor = (input: { + definition: Definition + run: Run + attendance: AutomationRunAttendance + signal: AbortSignal + }) => Promise<{ sessionID: SessionID; result: string | null; cost?: number | null }> const COMMON_CREATE_FIELDS = new Set([ "kind", @@ -460,6 +469,66 @@ export namespace Automation { return { id: previous.id, deleted: true, revision: previous.revision + 1 } } + function replaceRun(run: Run) { + const current = state().runs.get(run.automationID) ?? [] + state().runs.set( + run.automationID, + current.map((item) => (item.id === run.id ? run : item)), + ) + return run + } + + function reviseRun(run: Run, patch: Record): Run { + const next = { + ...run, + ...patch, + revision: run.revision + 1, + } + for (const [key, value] of Object.entries(next)) { + if (value === undefined) delete (next as Record)[key] + } + return replaceRun(Run.parse(next)) + } + + export function markRunStarted(run: Run, sessionID: SessionID, options?: { now?: number }): Run { + return reviseRun(run, { + state: "running", + sessionID, + startedAt: options?.now ?? Date.now(), + completedAt: null, + result: null, + error: null, + }) + } + + function setDefinitionAutomationSession(definition: Definition, sessionID: SessionID) { + if (definition.automationSessionID === sessionID) return definition + const next = Definition.parse({ + ...definition, + automationSessionID: sessionID, + revision: definition.revision + 1, + updatedAt: Date.now(), + }) + state().definitions.set(definition.id, next) + return next + } + + export function markRunBlocked(run: Run, blocker: AutomationRunBlocker): Run { + if (run.state !== "running" && run.state !== "awaiting_input") return run + return reviseRun(run, { + state: "awaiting_input", + blocker, + }) + } + + export function clearRunBlocker(run: Run): Run { + if (run.state !== "awaiting_input") return run + return reviseRun(run, { + state: "running", + blocker: undefined, + }) + } + export function runNow(id: string, options?: { now?: number }): Run { const definition = get(id) const current = state().runs.get(id) ?? [] @@ -481,6 +550,72 @@ export namespace Automation { return run } + export function runNowExecuting( + id: string, + options: { executor: RunExecutor; attendance?: AutomationRunAttendance; now?: number }, + ): Run { + const initial = runNow(id, { now: options.now }) + void executeRun(initial, options.executor, options.attendance ?? "attended") + return initial + } + + async function executeRun(initial: Run, executor: RunExecutor, attendance: AutomationRunAttendance) { + const data = state() + if (data.activeRuns.has(initial.automationID)) { + const stopped = reviseRun(initial, { + state: "stopped", + completedAt: Date.now(), + stopReason: "previous_run_awaiting_input", + }) + await publishRunUpdated(stopped) + return + } + data.activeRuns.add(initial.automationID) + const controller = new AbortController() + let current = initial + try { + const definition = get(initial.automationID) + const prepared = await executor({ definition, run: initial, attendance, signal: controller.signal }) + const latest = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? initial + const running = latest.state === "scheduled" ? markRunStarted(latest, prepared.sessionID) : latest + current = running + await publishRunUpdated(running) + if (definition.context === "continue") setDefinitionAutomationSession(definition, prepared.sessionID) + const succeeded = reviseRun(running, { + state: "succeeded", + completedAt: Date.now(), + result: prepared.result, + error: null, + cost: prepared.cost ?? null, + }) + current = succeeded + await publishRunUpdated(succeeded) + } catch (error) { + current = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? current + if (current.state === "scheduled") { + const stopped = reviseRun(current, { + state: "stopped", + completedAt: Date.now(), + stopReason: "cancelled", + }) + await publishRunUpdated(stopped) + return + } + const isStepCap = error instanceof globalThis.Error && error.name === "AutomationStepCapError" + const failed = reviseRun(current, { + state: "failed", + completedAt: Date.now(), + error: { + code: isStepCap ? "step_cap" : "execution_failed", + message: error instanceof globalThis.Error ? error.message : String(error), + }, + }) + await publishRunUpdated(failed) + } finally { + data.activeRuns.delete(initial.automationID) + } + } + export function runs(input: { automationID: string; limit?: number; cursor?: string }) { get(input.automationID) const limit = Math.min(Math.max(input.limit ?? 50, 1), 100) diff --git a/packages/opencode/src/automation/runner.ts b/packages/opencode/src/automation/runner.ts new file mode 100644 index 000000000..4ab26319f --- /dev/null +++ b/packages/opencode/src/automation/runner.ts @@ -0,0 +1,39 @@ +import { Effect } from "effect" +import { Automation } from "." +import { Session } from "@/session" +import { SessionPrompt } from "@/session/prompt" +import { AutomationRunContext } from "./run-context" + +export const sessionPromptExecutor: Automation.RunExecutor = async ({ definition, run, attendance, signal }) => { + const sessionID = + definition.context === "continue" && definition.automationSessionID + ? definition.automationSessionID + : (await Session.create({ title: `Automation: ${definition.title}` })).id + let currentRun = Automation.markRunStarted(run, sessionID) + await Automation.publishRunUpdated(currentRun) + const context = AutomationRunContext.attended({ + stepCap: 50, + block: (blocker) => + Effect.sync(() => { + currentRun = Automation.markRunBlocked(currentRun, blocker) + }).pipe(Effect.flatMap(() => Effect.promise(() => Automation.publishRunUpdated(currentRun)))), + clear: () => + Effect.sync(() => { + currentRun = Automation.clearRunBlocker(currentRun) + }).pipe(Effect.flatMap(() => Effect.promise(() => Automation.publishRunUpdated(currentRun)))), + }) + const scoped = attendance === "attended" ? context : AutomationRunContext.unattended(context) + const message = await SessionPrompt.promptWithAutomationContext( + { + sessionID, + parts: [{ type: "text", text: definition.prompt }], + }, + scoped, + ) + signal.throwIfAborted() + return { + sessionID, + result: message.parts.find((part) => part.type === "text")?.text ?? null, + cost: message.info.role === "assistant" ? message.info.cost : null, + } +} diff --git a/packages/opencode/src/server/instance/automation.ts b/packages/opencode/src/server/instance/automation.ts index 12b6ceb91..73c11544e 100644 --- a/packages/opencode/src/server/instance/automation.ts +++ b/packages/opencode/src/server/instance/automation.ts @@ -3,6 +3,7 @@ import type { Context } from "hono" import { describeRoute, resolver, validator } from "hono-openapi" import z from "zod" import { Automation, AutomationID, ValidationError } from "@/automation" +import { sessionPromptExecutor } from "@/automation/runner" import { errors } from "../error" function validationError(error: ValidationError) { @@ -241,7 +242,9 @@ export const AutomationRoutes = (): Hono => }), validator("param", z.object({ automationID: AutomationID.Definition.zod })), async (c) => { - const run = Automation.runNow(c.req.valid("param").automationID) + const run = Automation.runNowExecuting(c.req.valid("param").automationID, { + executor: sessionPromptExecutor, + }) await Automation.publishRunUpdated(run) return c.json(run) }, From 57a93fa56ebcd762d7cc27964a70bba40b97a54f Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 13:45:59 +0800 Subject: [PATCH 03/12] test: cover automation run now execution --- .../test/server/automation-routes.test.ts | 2 +- .../test/server/automation-runner.test.ts | 128 ++++++++++++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 packages/opencode/test/server/automation-runner.test.ts diff --git a/packages/opencode/test/server/automation-routes.test.ts b/packages/opencode/test/server/automation-routes.test.ts index 775b54a8c..c5af72285 100644 --- a/packages/opencode/test/server/automation-routes.test.ts +++ b/packages/opencode/test/server/automation-routes.test.ts @@ -471,7 +471,7 @@ describe("automation routes", () => { expect(spec.components?.schemas).toHaveProperty("AutomationValidationError") }) - test("runNow is a contract stub before PR2 execution", async () => { + test("runNow returns the queued run before background execution updates it", async () => { await withAutomationApp(async ({ app, projectID }) => { const created = await json(app, "/automation", { method: "POST", diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts new file mode 100644 index 000000000..136f41682 --- /dev/null +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -0,0 +1,128 @@ +import { afterEach, describe, expect, test } from "bun:test" +import { Automation } from "../../src/automation" +import { Instance } from "../../src/project/instance" +import { ProjectID } from "../../src/project/schema" +import { SessionID } from "../../src/session/schema" +import { AutomationStepCapError } from "../../src/automation/run-context" +import { tmpdir } from "../fixture/fixture" + +afterEach(async () => { + await Instance.disposeAll() +}) + +async function withAutomation(fn: (projectID: ProjectID) => Promise) { + await using tmp = await tmpdir({ git: true }) + return Instance.provide({ + directory: tmp.path, + fn: () => fn(Instance.project.id), + }) +} + +function input(projectID: ProjectID): Automation.CreateInput { + return { + kind: "recurring", + title: "Repo brief", + prompt: "Summarize repo changes.", + context: "fresh", + where: { projectID }, + timezone: "Asia/Shanghai", + rhythm: { kind: "interval", everyMs: 60_000 }, + stop: { kind: "count", count: 3 }, + } +} + +async function waitForRun(automationID: string, state: Automation.Run["state"]) { + const deadline = Date.now() + 2_000 + while (Date.now() < deadline) { + const run = Automation.runs({ automationID }).items.find((item) => item.state === state) + if (run?.state === state) return run + await Bun.sleep(10) + } + throw new Error(`Timed out waiting for ${state}`) +} + +describe("automation runNow execution", () => { + test("executes a run and records the terminal result", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + const sessionID = SessionID.descending() + + const initial = Automation.runNowExecuting(definition.id, { + executor: async () => ({ sessionID, result: "done", cost: 0 }), + }) + expect(initial.state).toBe("scheduled") + + const completed = await waitForRun(definition.id, "succeeded") + expect(completed).toMatchObject({ + state: "succeeded", + sessionID, + result: "done", + error: null, + }) + expect(completed.revision).toBeGreaterThan(initial.revision) + }) + }) + + test("keeps one active writer per automation", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + let release!: () => void + const held = new Promise((resolve) => { + release = resolve + }) + + Automation.runNowExecuting(definition.id, { + executor: async () => { + await held + return { sessionID: SessionID.descending(), result: "first", cost: 0 } + }, + }) + Automation.runNowExecuting(definition.id, { + executor: async () => ({ sessionID: SessionID.descending(), result: "second", cost: 0 }), + }) + + const stopped = await waitForRun(definition.id, "stopped") + if (stopped.state !== "stopped") throw new Error("expected stopped run") + expect(stopped.stopReason).toBe("previous_run_awaiting_input") + release() + const succeeded = await waitForRun(definition.id, "succeeded") + expect(succeeded.result).toBe("first") + }) + }) + + test("records and clears blocker state on the run ledger", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + const run = Automation.runNow(definition.id) + const started = Automation.markRunStarted(run, SessionID.descending(), { now: run.triggeredAt }) + const blocked = Automation.markRunBlocked(started, { kind: "question", callID: "call_1" }) + const cleared = Automation.clearRunBlocker(blocked) + + expect(blocked).toMatchObject({ + state: "awaiting_input", + blocker: { kind: "question", callID: "call_1" }, + }) + expect(cleared.state).toBe("running") + expect(cleared).not.toHaveProperty("blocker") + }) + }) + + test("records hard step-cap failures with the frozen stop code", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + + Automation.runNowExecuting(definition.id, { + executor: async ({ run }) => { + Automation.markRunStarted(run, SessionID.descending(), { now: run.triggeredAt }) + throw new AutomationStepCapError(50) + }, + }) + + const failed = await waitForRun(definition.id, "failed") + expect(failed.error).toEqual({ + code: "step_cap", + message: "Automation run exceeded the hard step cap (50).", + }) + }) + }) +}) From 11508387adec0623230c5e9faf5f69c1d384e024 Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 14:22:51 +0800 Subject: [PATCH 04/12] fix: address automation run review feedback --- packages/opencode/src/automation/index.ts | 8 +- .../opencode/src/automation/run-context.ts | 4 +- packages/opencode/src/automation/runner.ts | 23 +++-- .../src/server/instance/automation.ts | 2 +- .../test/server/automation-runner.test.ts | 86 ++++++++++++++++++- 5 files changed, 108 insertions(+), 15 deletions(-) diff --git a/packages/opencode/src/automation/index.ts b/packages/opencode/src/automation/index.ts index dc110d4b9..f8c7e2c79 100644 --- a/packages/opencode/src/automation/index.ts +++ b/packages/opencode/src/automation/index.ts @@ -484,6 +484,8 @@ export namespace Automation { ...patch, revision: run.revision + 1, } + if (next.state !== "awaiting_input") delete (next as Record).blocker + if (next.state !== "stopped") delete (next as Record).stopReason for (const [key, value] of Object.entries(next)) { if (value === undefined) delete (next as Record)[key] } @@ -580,7 +582,11 @@ export namespace Automation { const running = latest.state === "scheduled" ? markRunStarted(latest, prepared.sessionID) : latest current = running await publishRunUpdated(running) - if (definition.context === "continue") setDefinitionAutomationSession(definition, prepared.sessionID) + const latestDefinition = get(initial.automationID) + if (latestDefinition.context === "continue") { + const updatedDefinition = setDefinitionAutomationSession(latestDefinition, prepared.sessionID) + if (updatedDefinition !== latestDefinition) await publishDefinitionUpdated(updatedDefinition) + } const succeeded = reviseRun(running, { state: "succeeded", completedAt: Date.now(), diff --git a/packages/opencode/src/automation/run-context.ts b/packages/opencode/src/automation/run-context.ts index 9d10bd55e..636d0647a 100644 --- a/packages/opencode/src/automation/run-context.ts +++ b/packages/opencode/src/automation/run-context.ts @@ -33,10 +33,10 @@ export const AutomationRunContext = { service: AutomationRunContextService, current: AutomationRunContextService, attended(input: Pick): AutomationRunContext { - return { attendance: "attended", ...input } + return { ...input, attendance: "attended" } }, unattended(input: Pick): AutomationRunContext { - return { attendance: "unattended", ...input } + return { ...input, attendance: "unattended" } }, permissionOnPending( context: AutomationRunContext | undefined, diff --git a/packages/opencode/src/automation/runner.ts b/packages/opencode/src/automation/runner.ts index 4ab26319f..1e1243cf7 100644 --- a/packages/opencode/src/automation/runner.ts +++ b/packages/opencode/src/automation/runner.ts @@ -2,7 +2,7 @@ import { Effect } from "effect" import { Automation } from "." import { Session } from "@/session" import { SessionPrompt } from "@/session/prompt" -import { AutomationRunContext } from "./run-context" +import { AutomationRunContext, type AutomationRunBlocker } from "./run-context" export const sessionPromptExecutor: Automation.RunExecutor = async ({ definition, run, attendance, signal }) => { const sessionID = @@ -11,18 +11,23 @@ export const sessionPromptExecutor: Automation.RunExecutor = async ({ definition : (await Session.create({ title: `Automation: ${definition.title}` })).id let currentRun = Automation.markRunStarted(run, sessionID) await Automation.publishRunUpdated(currentRun) - const context = AutomationRunContext.attended({ + const handlers = { stepCap: 50, - block: (blocker) => - Effect.sync(() => { + block: (blocker: AutomationRunBlocker) => + Effect.gen(function* () { + const previous = currentRun currentRun = Automation.markRunBlocked(currentRun, blocker) - }).pipe(Effect.flatMap(() => Effect.promise(() => Automation.publishRunUpdated(currentRun)))), + if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun)) + }), clear: () => - Effect.sync(() => { + Effect.gen(function* () { + const previous = currentRun currentRun = Automation.clearRunBlocker(currentRun) - }).pipe(Effect.flatMap(() => Effect.promise(() => Automation.publishRunUpdated(currentRun)))), - }) - const scoped = attendance === "attended" ? context : AutomationRunContext.unattended(context) + if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun)) + }), + } + const scoped = + attendance === "attended" ? AutomationRunContext.attended(handlers) : AutomationRunContext.unattended(handlers) const message = await SessionPrompt.promptWithAutomationContext( { sessionID, diff --git a/packages/opencode/src/server/instance/automation.ts b/packages/opencode/src/server/instance/automation.ts index 73c11544e..08e25e526 100644 --- a/packages/opencode/src/server/instance/automation.ts +++ b/packages/opencode/src/server/instance/automation.ts @@ -230,7 +230,7 @@ export const AutomationRoutes = (): Hono => "/:automationID/run", describeRoute({ summary: "Run automation now", - description: "Create a scheduled automation run record. Execution lands in a later PR.", + description: "Create a scheduled automation run record and start execution in the background.", operationId: "automation.runNow", responses: { 200: { diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index 136f41682..0b5f1bbd6 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -1,9 +1,11 @@ import { afterEach, describe, expect, test } from "bun:test" +import { Effect } from "effect" import { Automation } from "../../src/automation" +import { Bus } from "../../src/bus" import { Instance } from "../../src/project/instance" import { ProjectID } from "../../src/project/schema" import { SessionID } from "../../src/session/schema" -import { AutomationStepCapError } from "../../src/automation/run-context" +import { AutomationRunContext, AutomationStepCapError } from "../../src/automation/run-context" import { tmpdir } from "../fixture/fixture" afterEach(async () => { @@ -18,7 +20,7 @@ async function withAutomation(fn: (projectID: ProjectID) => Promise) { }) } -function input(projectID: ProjectID): Automation.CreateInput { +function input(projectID: ProjectID, overrides: Partial> = {}): Automation.CreateInput { return { kind: "recurring", title: "Repo brief", @@ -28,6 +30,7 @@ function input(projectID: ProjectID): Automation.CreateInput { timezone: "Asia/Shanghai", rhythm: { kind: "interval", everyMs: 60_000 }, stop: { kind: "count", count: 3 }, + ...overrides, } } @@ -104,9 +107,88 @@ describe("automation runNow execution", () => { }) expect(cleared.state).toBe("running") expect(cleared).not.toHaveProperty("blocker") + expect(Automation.clearRunBlocker(cleared)).toBe(cleared) }) }) + test("drops state-specific fields when a run transitions out of that state", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + + Automation.runNowExecuting(definition.id, { + executor: async ({ run }) => { + const started = Automation.markRunStarted(run, SessionID.descending(), { now: run.triggeredAt }) + Automation.markRunBlocked(started, { kind: "question", callID: "call_1" }) + throw new Error("boom") + }, + }) + + const failed = await waitForRun(definition.id, "failed") + expect(failed).not.toHaveProperty("blocker") + + let release!: () => void + const held = new Promise((resolve) => { + release = resolve + }) + Automation.runNowExecuting(definition.id, { + executor: async () => { + await held + return { sessionID: SessionID.descending(), result: "first", cost: 0 } + }, + }) + Automation.runNowExecuting(definition.id, { + executor: async () => ({ sessionID: SessionID.descending(), result: "second", cost: 0 }), + }) + const stopped = await waitForRun(definition.id, "stopped") + if (stopped.completedAt === null) throw new Error("expected stopped run to have completedAt") + const restarted = Automation.markRunStarted(stopped, SessionID.descending(), { now: stopped.completedAt }) + expect(restarted).not.toHaveProperty("stopReason") + release() + }) + }) + + test("publishes continue-session definition updates from the latest definition", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID, { context: "continue" })) + const sessionID = SessionID.descending() + const definitionEvents: Automation.Definition[] = [] + const unsubscribe = Bus.subscribe(Automation.Event.DefinitionUpdated, (event) => { + definitionEvents.push(event.properties) + }) + + Automation.runNowExecuting(definition.id, { + executor: async () => { + Automation.update(definition.id, { title: "Updated repo brief" }) + return { sessionID, result: "done", cost: 0 } + }, + }) + + await waitForRun(definition.id, "succeeded") + unsubscribe() + const updated = Automation.get(definition.id) + expect(updated.title).toBe("Updated repo brief") + expect(updated.automationSessionID).toBe(sessionID) + expect(definitionEvents.at(-1)).toMatchObject({ + id: definition.id, + title: "Updated repo brief", + automationSessionID: sessionID, + }) + }) + }) + + test("unattended context construction overrides any existing attendance tag", async () => { + const handlers = { + stepCap: 50, + block: () => Effect.void, + clear: () => Effect.void, + } + const attended = AutomationRunContext.attended(handlers) + const unattended = AutomationRunContext.unattended(attended) + + expect(attended.attendance).toBe("attended") + expect(unattended.attendance).toBe("unattended") + }) + test("records hard step-cap failures with the frozen stop code", async () => { await withAutomation(async (projectID) => { const definition = Automation.create(input(projectID)) From d5d233bb70045f8e474968a20b715df0940fef4b Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 15:20:51 +0800 Subject: [PATCH 05/12] fix: guard automation run writer boundaries --- packages/opencode/src/automation/index.ts | 21 ++++--- .../src/server/instance/automation.ts | 2 +- .../test/server/automation-runner.test.ts | 56 ++++++++++++++++--- 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/packages/opencode/src/automation/index.ts b/packages/opencode/src/automation/index.ts index f8c7e2c79..4dc6b24c7 100644 --- a/packages/opencode/src/automation/index.ts +++ b/packages/opencode/src/automation/index.ts @@ -233,9 +233,9 @@ export namespace Automation { type State = { definitions: Map runs: Map - activeRuns: Set + activeWriters: Set } - const state = Instance.state(() => ({ definitions: new Map(), runs: new Map(), activeRuns: new Set() })) + const state = Instance.state(() => ({ definitions: new Map(), runs: new Map(), activeWriters: new Set() })) export type RunExecutor = (input: { definition: Definition @@ -425,6 +425,10 @@ export namespace Automation { return definition } + function getOptional(id: string): Definition | undefined { + return state().definitions.get(id) + } + function isSameValue(left: unknown, right: unknown): boolean { if (Object.is(left, right)) return true if (Array.isArray(left) || Array.isArray(right)) { @@ -563,7 +567,9 @@ export namespace Automation { async function executeRun(initial: Run, executor: RunExecutor, attendance: AutomationRunAttendance) { const data = state() - if (data.activeRuns.has(initial.automationID)) { + const definition = get(initial.automationID) + const writerKey = definition.where.worktree ?? definition.where.projectID + if (data.activeWriters.has(writerKey)) { const stopped = reviseRun(initial, { state: "stopped", completedAt: Date.now(), @@ -572,18 +578,17 @@ export namespace Automation { await publishRunUpdated(stopped) return } - data.activeRuns.add(initial.automationID) + data.activeWriters.add(writerKey) const controller = new AbortController() let current = initial try { - const definition = get(initial.automationID) const prepared = await executor({ definition, run: initial, attendance, signal: controller.signal }) const latest = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? initial const running = latest.state === "scheduled" ? markRunStarted(latest, prepared.sessionID) : latest current = running await publishRunUpdated(running) - const latestDefinition = get(initial.automationID) - if (latestDefinition.context === "continue") { + const latestDefinition = getOptional(initial.automationID) + if (latestDefinition?.context === "continue") { const updatedDefinition = setDefinitionAutomationSession(latestDefinition, prepared.sessionID) if (updatedDefinition !== latestDefinition) await publishDefinitionUpdated(updatedDefinition) } @@ -618,7 +623,7 @@ export namespace Automation { }) await publishRunUpdated(failed) } finally { - data.activeRuns.delete(initial.automationID) + data.activeWriters.delete(writerKey) } } diff --git a/packages/opencode/src/server/instance/automation.ts b/packages/opencode/src/server/instance/automation.ts index 08e25e526..426a52bfd 100644 --- a/packages/opencode/src/server/instance/automation.ts +++ b/packages/opencode/src/server/instance/automation.ts @@ -230,7 +230,7 @@ export const AutomationRoutes = (): Hono => "/:automationID/run", describeRoute({ summary: "Run automation now", - description: "Create a scheduled automation run record and start execution in the background.", + description: "Create a queued automation run, start execution in the background, and return the queued run immediately.", operationId: "automation.runNow", responses: { 200: { diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index 0b5f1bbd6..10c73f12f 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -66,29 +66,36 @@ describe("automation runNow execution", () => { }) }) - test("keeps one active writer per automation", async () => { + test("keeps one active writer per project", async () => { await withAutomation(async (projectID) => { - const definition = Automation.create(input(projectID)) + const first = Automation.create(input(projectID, { title: "First automation" })) + const second = Automation.create(input(projectID, { title: "Second automation" })) let release!: () => void const held = new Promise((resolve) => { release = resolve }) + let entered = 0 - Automation.runNowExecuting(definition.id, { + Automation.runNowExecuting(first.id, { executor: async () => { + entered++ await held return { sessionID: SessionID.descending(), result: "first", cost: 0 } }, }) - Automation.runNowExecuting(definition.id, { - executor: async () => ({ sessionID: SessionID.descending(), result: "second", cost: 0 }), + Automation.runNowExecuting(second.id, { + executor: async () => { + entered++ + return { sessionID: SessionID.descending(), result: "second", cost: 0 } + }, }) - const stopped = await waitForRun(definition.id, "stopped") + const stopped = await waitForRun(second.id, "stopped") if (stopped.state !== "stopped") throw new Error("expected stopped run") expect(stopped.stopReason).toBe("previous_run_awaiting_input") + expect(entered).toBe(1) release() - const succeeded = await waitForRun(definition.id, "succeeded") + const succeeded = await waitForRun(first.id, "succeeded") expect(succeeded.result).toBe("first") }) }) @@ -158,7 +165,7 @@ describe("automation runNow execution", () => { Automation.runNowExecuting(definition.id, { executor: async () => { - Automation.update(definition.id, { title: "Updated repo brief" }) + Automation.update(definition.id, { title: "Updated repo brief", prompt: "Use the latest prompt." }) return { sessionID, result: "done", cost: 0 } }, }) @@ -167,15 +174,48 @@ describe("automation runNow execution", () => { unsubscribe() const updated = Automation.get(definition.id) expect(updated.title).toBe("Updated repo brief") + expect(updated.prompt).toBe("Use the latest prompt.") expect(updated.automationSessionID).toBe(sessionID) expect(definitionEvents.at(-1)).toMatchObject({ id: definition.id, title: "Updated repo brief", + prompt: "Use the latest prompt.", automationSessionID: sessionID, }) }) }) + test("does not revive a continue automation deleted during execution", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID, { context: "continue" })) + const definitionEvents: Automation.Definition[] = [] + const terminalRun = new Promise((resolve) => { + const unsubscribe = Bus.subscribe(Automation.Event.RunUpdated, (event) => { + const run = event.properties + if (run.automationID !== definition.id) return + if (run.state !== "succeeded" && run.state !== "failed" && run.state !== "stopped") return + unsubscribe() + resolve(run) + }) + }) + const unsubscribeDefinition = Bus.subscribe(Automation.Event.DefinitionUpdated, (event) => { + definitionEvents.push(event.properties) + }) + + Automation.runNowExecuting(definition.id, { + executor: async () => { + Automation.remove(definition.id) + return { sessionID: SessionID.descending(), result: "done", cost: 0 } + }, + }) + + await terminalRun + unsubscribeDefinition() + expect(() => Automation.get(definition.id)).toThrow() + expect(definitionEvents).toHaveLength(0) + }) + }) + test("unattended context construction overrides any existing attendance tag", async () => { const handlers = { stepCap: 50, From b66d426de3e30e7211b33c70da16448f3bbd0cdb Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 15:56:59 +0800 Subject: [PATCH 06/12] fix: cancel active automation on delete --- packages/opencode/src/automation/index.ts | 49 ++++++++++++++++-- .../src/server/instance/automation.ts | 7 +-- .../test/server/automation-runner.test.ts | 50 +++++++++++++++---- 3 files changed, 88 insertions(+), 18 deletions(-) diff --git a/packages/opencode/src/automation/index.ts b/packages/opencode/src/automation/index.ts index 4dc6b24c7..b82794961 100644 --- a/packages/opencode/src/automation/index.ts +++ b/packages/opencode/src/automation/index.ts @@ -234,8 +234,14 @@ export namespace Automation { definitions: Map runs: Map activeWriters: Set + activeRuns: Map } - const state = Instance.state(() => ({ definitions: new Map(), runs: new Map(), activeWriters: new Set() })) + const state = Instance.state(() => ({ + definitions: new Map(), + runs: new Map(), + activeWriters: new Set(), + activeRuns: new Map(), + })) export type RunExecutor = (input: { definition: Definition @@ -466,11 +472,12 @@ export namespace Automation { return next } - export function remove(id: string): Tombstone { + export function remove(id: string): { tombstone: Tombstone; stoppedRun?: Run } { const previous = get(id) + const stoppedRun = stopActiveRun(id) state().definitions.delete(id) - state().runs.delete(id) - return { id: previous.id, deleted: true, revision: previous.revision + 1 } + if (!stoppedRun) state().runs.delete(id) + return { tombstone: { id: previous.id, deleted: true, revision: previous.revision + 1 }, stoppedRun } } function replaceRun(run: Run) { @@ -496,6 +503,27 @@ export namespace Automation { return replaceRun(Run.parse(next)) } + function stopRun(run: Run, stopReason: Extract["stopReason"]): Run { + if (run.state === "stopped" || run.state === "succeeded" || run.state === "failed") return run + return reviseRun(run, { + state: "stopped", + completedAt: Date.now(), + result: null, + error: null, + stopReason, + }) + } + + function stopActiveRun(automationID: string) { + const active = state().activeRuns.get(automationID) + if (!active) return undefined + active.controller.abort() + const current = state().runs.get(automationID)?.find((run) => ( + run.state === "scheduled" || run.state === "running" || run.state === "awaiting_input" + )) + return current ? stopRun(current, "cancelled") : undefined + } + export function markRunStarted(run: Run, sessionID: SessionID, options?: { now?: number }): Run { return reviseRun(run, { state: "running", @@ -580,10 +608,17 @@ export namespace Automation { } data.activeWriters.add(writerKey) const controller = new AbortController() + data.activeRuns.set(initial.automationID, { writerKey, controller }) let current = initial try { const prepared = await executor({ definition, run: initial, attendance, signal: controller.signal }) const latest = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? initial + if (controller.signal.aborted) { + const stopped = stopRun(latest, "cancelled") + current = stopped + if (stopped !== latest) await publishRunUpdated(stopped) + return + } const running = latest.state === "scheduled" ? markRunStarted(latest, prepared.sessionID) : latest current = running await publishRunUpdated(running) @@ -603,6 +638,11 @@ export namespace Automation { await publishRunUpdated(succeeded) } catch (error) { current = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? current + if (controller.signal.aborted) { + const stopped = stopRun(current, "cancelled") + if (stopped !== current) await publishRunUpdated(stopped) + return + } if (current.state === "scheduled") { const stopped = reviseRun(current, { state: "stopped", @@ -623,6 +663,7 @@ export namespace Automation { }) await publishRunUpdated(failed) } finally { + data.activeRuns.delete(initial.automationID) data.activeWriters.delete(writerKey) } } diff --git a/packages/opencode/src/server/instance/automation.ts b/packages/opencode/src/server/instance/automation.ts index 426a52bfd..c12882863 100644 --- a/packages/opencode/src/server/instance/automation.ts +++ b/packages/opencode/src/server/instance/automation.ts @@ -221,9 +221,10 @@ export const AutomationRoutes = (): Hono => }), validator("param", z.object({ automationID: AutomationID.Definition.zod })), async (c) => { - const tombstone = Automation.remove(c.req.valid("param").automationID) - await Automation.publishDefinitionDeleted(tombstone) - return c.json(tombstone) + const removed = Automation.remove(c.req.valid("param").automationID) + if (removed.stoppedRun) await Automation.publishRunUpdated(removed.stoppedRun) + await Automation.publishDefinitionDeleted(removed.tombstone) + return c.json(removed.tombstone) }, ) .post( diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index 10c73f12f..924596779 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -189,33 +189,61 @@ describe("automation runNow execution", () => { await withAutomation(async (projectID) => { const definition = Automation.create(input(projectID, { context: "continue" })) const definitionEvents: Automation.Definition[] = [] - const terminalRun = new Promise((resolve) => { - const unsubscribe = Bus.subscribe(Automation.Event.RunUpdated, (event) => { - const run = event.properties - if (run.automationID !== definition.id) return - if (run.state !== "succeeded" && run.state !== "failed" && run.state !== "stopped") return - unsubscribe() - resolve(run) - }) - }) const unsubscribeDefinition = Bus.subscribe(Automation.Event.DefinitionUpdated, (event) => { definitionEvents.push(event.properties) }) + let removed!: ReturnType Automation.runNowExecuting(definition.id, { executor: async () => { - Automation.remove(definition.id) + removed = Automation.remove(definition.id) return { sessionID: SessionID.descending(), result: "done", cost: 0 } }, }) - await terminalRun + await Bun.sleep(20) unsubscribeDefinition() + expect(removed.stoppedRun).toMatchObject({ state: "stopped", stopReason: "cancelled" }) expect(() => Automation.get(definition.id)).toThrow() expect(definitionEvents).toHaveLength(0) }) }) + test("aborts an active run when its automation is deleted", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + const sessionID = SessionID.descending() + let sawAbort = false + const started = Promise.withResolvers() + const release = Promise.withResolvers() + + Automation.runNowExecuting(definition.id, { + executor: async ({ run, signal }) => { + Automation.markRunStarted(run, sessionID, { now: run.triggeredAt }) + signal.addEventListener("abort", () => { + sawAbort = true + release.resolve() + }) + started.resolve() + await release.promise + return { sessionID, result: "should not succeed", cost: 0 } + }, + }) + + await started.promise + const removed = Automation.remove(definition.id) + + expect(sawAbort).toBe(true) + expect(removed.stoppedRun).toMatchObject({ + state: "stopped", + sessionID, + stopReason: "cancelled", + }) + await Bun.sleep(20) + expect(removed.stoppedRun).not.toMatchObject({ state: "succeeded" }) + }) + }) + test("unattended context construction overrides any existing attendance tag", async () => { const handlers = { stepCap: 50, From cc736506269ad0c4fdd94a7e7b29b1cc5f990bf2 Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 16:02:16 +0800 Subject: [PATCH 07/12] test: keep automation temp directory alive --- packages/opencode/test/server/automation-runner.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index 924596779..b10a87240 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -14,7 +14,7 @@ afterEach(async () => { async function withAutomation(fn: (projectID: ProjectID) => Promise) { await using tmp = await tmpdir({ git: true }) - return Instance.provide({ + return await Instance.provide({ directory: tmp.path, fn: () => fn(Instance.project.id), }) From eda2ac434bc814ba39307f016c75d0ac9f1dcc44 Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 16:34:17 +0800 Subject: [PATCH 08/12] fix: cancel automation session prompts --- packages/opencode/src/automation/runner.ts | 69 +++++----- .../test/server/automation-runner.test.ts | 122 ++++++++++++++++++ 2 files changed, 162 insertions(+), 29 deletions(-) diff --git a/packages/opencode/src/automation/runner.ts b/packages/opencode/src/automation/runner.ts index 1e1243cf7..a7cee0309 100644 --- a/packages/opencode/src/automation/runner.ts +++ b/packages/opencode/src/automation/runner.ts @@ -5,40 +5,51 @@ import { SessionPrompt } from "@/session/prompt" import { AutomationRunContext, type AutomationRunBlocker } from "./run-context" export const sessionPromptExecutor: Automation.RunExecutor = async ({ definition, run, attendance, signal }) => { + signal.throwIfAborted() const sessionID = definition.context === "continue" && definition.automationSessionID ? definition.automationSessionID : (await Session.create({ title: `Automation: ${definition.title}` })).id - let currentRun = Automation.markRunStarted(run, sessionID) - await Automation.publishRunUpdated(currentRun) - const handlers = { - stepCap: 50, - block: (blocker: AutomationRunBlocker) => - Effect.gen(function* () { - const previous = currentRun - currentRun = Automation.markRunBlocked(currentRun, blocker) - if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun)) - }), - clear: () => - Effect.gen(function* () { - const previous = currentRun - currentRun = Automation.clearRunBlocker(currentRun) - if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun)) - }), + const cancelPrompt = () => { + void SessionPrompt.cancel(sessionID, { source: "automation.cancel" }).catch(() => undefined) } - const scoped = - attendance === "attended" ? AutomationRunContext.attended(handlers) : AutomationRunContext.unattended(handlers) - const message = await SessionPrompt.promptWithAutomationContext( - { + if (signal.aborted) cancelPrompt() + else signal.addEventListener("abort", cancelPrompt, { once: true }) + try { + signal.throwIfAborted() + let currentRun = Automation.markRunStarted(run, sessionID) + await Automation.publishRunUpdated(currentRun) + const handlers = { + stepCap: 50, + block: (blocker: AutomationRunBlocker) => + Effect.gen(function* () { + const previous = currentRun + currentRun = Automation.markRunBlocked(currentRun, blocker) + if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun)) + }), + clear: () => + Effect.gen(function* () { + const previous = currentRun + currentRun = Automation.clearRunBlocker(currentRun) + if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun)) + }), + } + const scoped = + attendance === "attended" ? AutomationRunContext.attended(handlers) : AutomationRunContext.unattended(handlers) + const message = await SessionPrompt.promptWithAutomationContext( + { + sessionID, + parts: [{ type: "text", text: definition.prompt }], + }, + scoped, + ) + signal.throwIfAborted() + return { sessionID, - parts: [{ type: "text", text: definition.prompt }], - }, - scoped, - ) - signal.throwIfAborted() - return { - sessionID, - result: message.parts.find((part) => part.type === "text")?.text ?? null, - cost: message.info.role === "assistant" ? message.info.cost : null, + result: message.parts.find((part) => part.type === "text")?.text ?? null, + cost: message.info.role === "assistant" ? message.info.cost : null, + } + } finally { + signal.removeEventListener("abort", cancelPrompt) } } diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index b10a87240..9df9ae0e7 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -1,9 +1,12 @@ +import path from "path" import { afterEach, describe, expect, test } from "bun:test" import { Effect } from "effect" import { Automation } from "../../src/automation" +import { sessionPromptExecutor } from "../../src/automation/runner" import { Bus } from "../../src/bus" import { Instance } from "../../src/project/instance" import { ProjectID } from "../../src/project/schema" +import { Session } from "../../src/session" import { SessionID } from "../../src/session/schema" import { AutomationRunContext, AutomationStepCapError } from "../../src/automation/run-context" import { tmpdir } from "../fixture/fixture" @@ -44,6 +47,63 @@ async function waitForRun(automationID: string, state: Automation.Run["state"]) throw new Error(`Timed out waiting for ${state}`) } +function defer() { + let resolve!: (value: T | PromiseLike) => void + const promise = new Promise((done) => { + resolve = done + }) + return { promise, resolve } +} + +function hangingChat(ready: () => void) { + const encoder = new TextEncoder() + let timer: ReturnType | undefined + const first = `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: { role: "assistant" } }], + })}\n\n` + const rest = + [ + `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: { content: "late" } }], + })}`, + `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: {}, finish_reason: "stop" }], + })}`, + "data: [DONE]", + ].join("\n\n") + "\n\n" + + return new ReadableStream({ + start(ctrl) { + ctrl.enqueue(encoder.encode(first)) + ready() + timer = setTimeout(() => { + ctrl.enqueue(encoder.encode(rest)) + ctrl.close() + }, 10_000) + }, + cancel() { + if (timer) clearTimeout(timer) + }, + }) +} + +async function waitForAbortedAssistant(sessionID: SessionID) { + const deadline = Date.now() + 1_000 + while (Date.now() < deadline) { + const messages = await Session.messages({ sessionID }) + const assistant = messages.findLast((message) => message.info.role === "assistant") + if (assistant?.info.role === "assistant" && assistant.info.error?.name === "MessageAbortedError") return assistant + await Bun.sleep(10) + } + throw new Error("Timed out waiting for aborted assistant message") +} + describe("automation runNow execution", () => { test("executes a run and records the terminal result", async () => { await withAutomation(async (projectID) => { @@ -244,6 +304,68 @@ describe("automation runNow execution", () => { }) }) + test("deleting an active automation cancels the real session prompt", async () => { + const ready = defer() + const server = Bun.serve({ + port: 0, + fetch(req) { + const url = new URL(req.url) + if (!url.pathname.endsWith("/chat/completions")) return new Response("not found", { status: 404 }) + return new Response(hangingChat(() => ready.resolve()), { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }) + }, + }) + + try { + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["alibaba"], + provider: { + alibaba: { + options: { + apiKey: "test-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + agent: { + build: { + model: "alibaba/qwen-plus", + }, + }, + }), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const definition = Automation.create(input(Instance.project.id, { title: "Cancel real prompt" })) + + Automation.runNowExecuting(definition.id, { executor: sessionPromptExecutor }) + await ready.promise + + const removed = Automation.remove(definition.id) + const stoppedRun = removed.stoppedRun + expect(stoppedRun).toMatchObject({ state: "stopped", stopReason: "cancelled" }) + if (!stoppedRun?.sessionID) throw new Error("expected stopped run to keep its sessionID") + + await waitForAbortedAssistant(stoppedRun.sessionID) + }, + }) + } finally { + void server.stop(true) + } + }) + test("unattended context construction overrides any existing attendance tag", async () => { const handlers = { stepCap: 50, From decdbb22103e1a0625867419d9b690c5612bb0f2 Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 17:22:42 +0800 Subject: [PATCH 09/12] fix: honor automation abort before prompt start --- packages/opencode/src/automation/runner.ts | 1 + packages/opencode/src/session/prompt.ts | 65 +++++++++------- .../test/server/automation-runner.test.ts | 75 +++++++++++++++++++ 3 files changed, 116 insertions(+), 25 deletions(-) diff --git a/packages/opencode/src/automation/runner.ts b/packages/opencode/src/automation/runner.ts index a7cee0309..1c86f7ee8 100644 --- a/packages/opencode/src/automation/runner.ts +++ b/packages/opencode/src/automation/runner.ts @@ -42,6 +42,7 @@ export const sessionPromptExecutor: Automation.RunExecutor = async ({ definition parts: [{ type: "text", text: definition.prompt }], }, scoped, + { abortSignal: signal }, ) signal.throwIfAborted() return { diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index bfc0a68dd..25ff152a4 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -87,6 +87,10 @@ type TitleGenerationProgress = { completedAt?: number } +type PromptRuntimeOptions = { + abortSignal?: AbortSignal +} + export function titleGenerationStateAtAbort( progress: TitleGenerationProgress | undefined, abortRecordedAt: number, @@ -266,8 +270,8 @@ function modelCanReadMedia(model: Provider.Model, kind: MediaInputKind) { export interface Interface { readonly cancel: (sessionID: SessionID, options?: { source?: string }) => Effect.Effect - readonly prompt: (input: PromptInput) => Effect.Effect - readonly loop: (input: z.infer) => Effect.Effect + readonly prompt: (input: PromptInput, options?: PromptRuntimeOptions) => Effect.Effect + readonly loop: (input: z.infer, options?: PromptRuntimeOptions) => Effect.Effect readonly shell: (input: ShellInput) => Effect.Effect readonly command: (input: CommandInput) => Effect.Effect readonly resolvePromptParts: (template: string) => Effect.Effect @@ -304,6 +308,9 @@ export const layer = Layer.effect( const runner = Effect.fn("SessionPrompt.runner")(function* () { return yield* EffectBridge.make() }) + const throwIfAborted = Effect.fn("SessionPrompt.throwIfAborted")(function* (options?: PromptRuntimeOptions) { + options?.abortSignal?.throwIfAborted() + }) // Tracks subagent sessions whose runner.onInterrupt fired during the most recent prompt run. // Reset at the start of each prompt() call; written when loop()'s onInterrupt arg fires; read // by AgentTool.execute via AgentPromptOps.wasInterrupted to deterministically distinguish @@ -1837,27 +1844,29 @@ NOTE: At any point in time through this workflow you should feel free to ask the return { info, parts } }, Effect.scoped) - const prompt: (input: PromptInput) => Effect.Effect = Effect.fn("SessionPrompt.prompt")( - function* (input: PromptInput) { - interruptedSessions.delete(input.sessionID) - const session = yield* sessions.get(input.sessionID) - yield* revert.cleanup(session) - const message = yield* createUserMessage(input) - yield* sessions.touch(input.sessionID) - - const permissions: Permission.Ruleset = [] - for (const [t, enabled] of Object.entries(input.tools ?? {})) { - permissions.push({ permission: t, action: enabled ? "allow" : "deny", pattern: "*" }) - } - if (permissions.length > 0) { - session.permission = permissions - yield* sessions.setPermission({ sessionID: session.id, permission: permissions }) - } + const prompt: (input: PromptInput, options?: PromptRuntimeOptions) => Effect.Effect = Effect.fn( + "SessionPrompt.prompt", + )(function* (input: PromptInput, options?: PromptRuntimeOptions) { + yield* throwIfAborted(options) + interruptedSessions.delete(input.sessionID) + const session = yield* sessions.get(input.sessionID) + yield* revert.cleanup(session) + const message = yield* createUserMessage(input) + yield* sessions.touch(input.sessionID) + + const permissions: Permission.Ruleset = [] + for (const [t, enabled] of Object.entries(input.tools ?? {})) { + permissions.push({ permission: t, action: enabled ? "allow" : "deny", pattern: "*" }) + } + if (permissions.length > 0) { + session.permission = permissions + yield* sessions.setPermission({ sessionID: session.id, permission: permissions }) + } - if (input.noReply === true) return message - return yield* loop({ sessionID: input.sessionID, traceMessageID: message.info.id }) - }, - ) + yield* throwIfAborted(options) + if (input.noReply === true) return message + return yield* loop({ sessionID: input.sessionID, traceMessageID: message.info.id }, options) + }) const appendRunLifecycleEvent = Effect.fn("SessionPrompt.appendRunLifecycleEvent")(function* ( sessionID: SessionID, @@ -2269,9 +2278,12 @@ NOTE: At any point in time through this workflow you should feel free to ask the }, ) - const loop: (input: z.infer) => Effect.Effect = Effect.fn( + const loop: ( + input: z.infer, + options?: PromptRuntimeOptions, + ) => Effect.Effect = Effect.fn( "SessionPrompt.loop", - )(function* (input: z.infer) { + )(function* (input: z.infer, options?: PromptRuntimeOptions) { const onInterrupt = (meta?: { source?: string reason?: string @@ -2409,6 +2421,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the return assistant }) const work = Effect.gen(function* () { + yield* throwIfAborted(options) // Two reasons busy goes first. (1) The compaction part event must // not race ahead of `session.status: busy` — the divider's // "no summary + not working" branch would otherwise flash the @@ -2469,6 +2482,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the appendRunLifecycleEvent(input.sessionID, input.traceMessageID!, event), } : undefined + yield* throwIfAborted(options) return yield* state.ensureRunning(input.sessionID, onInterrupt, work, { rejectIfBusy: input.prelude !== undefined, runLifecycle, @@ -2752,10 +2766,11 @@ export async function prompt(input: PromptInput) { export async function promptWithAutomationContext( input: PromptInput, context: import("@/automation/run-context").AutomationRunContext, + options?: PromptRuntimeOptions, ) { return runPromise((svc) => svc - .prompt(PromptInput.parse(input)) + .prompt(PromptInput.parse(input), options) .pipe(Effect.provideService(AutomationRunContext.service, context)), ) } diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index 9df9ae0e7..082ecc8de 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -366,6 +366,81 @@ describe("automation runNow execution", () => { } }) + test("deleting after run start but before prompt runner is busy does not call the provider", async () => { + let providerCalls = 0 + const server = Bun.serve({ + port: 0, + fetch(req) { + const url = new URL(req.url) + if (!url.pathname.endsWith("/chat/completions")) return new Response("not found", { status: 404 }) + providerCalls++ + return new Response(hangingChat(() => undefined), { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }) + }, + }) + + try { + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["alibaba"], + provider: { + alibaba: { + options: { + apiKey: "test-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + agent: { + build: { + model: "alibaba/qwen-plus", + }, + }, + }), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const definition = Automation.create(input(Instance.project.id, { title: "Cancel before runner busy" })) + const removed = Promise.withResolvers>() + const unsubscribe = Bus.subscribe(Automation.Event.RunUpdated, (event) => { + if (event.properties.automationID !== definition.id || event.properties.state !== "running") return + removed.resolve(Automation.remove(definition.id)) + }) + + Automation.runNowExecuting(definition.id, { executor: sessionPromptExecutor }) + const result = await Promise.race([ + removed.promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error("timed out waiting for running run")), 1_000), + ), + ]) + unsubscribe() + + expect(result.stoppedRun).toMatchObject({ state: "stopped", stopReason: "cancelled" }) + await Bun.sleep(50) + expect(providerCalls).toBe(0) + if (result.stoppedRun?.sessionID) { + const messages = await Session.messages({ sessionID: result.stoppedRun.sessionID }) + expect(messages.some((message) => message.info.role === "assistant")).toBe(false) + } + }, + }) + } finally { + void server.stop(true) + } + }) + test("unattended context construction overrides any existing attendance tag", async () => { const handlers = { stepCap: 50, From 39f7cee1e50fd21377afc6b96574e48904369e17 Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 17:43:04 +0800 Subject: [PATCH 10/12] test: strengthen automation delete coverage --- packages/opencode/src/server/instance/automation.ts | 3 ++- .../opencode/test/server/automation-routes.test.ts | 10 ++++++++++ .../opencode/test/server/automation-runner.test.ts | 7 ++++++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/packages/opencode/src/server/instance/automation.ts b/packages/opencode/src/server/instance/automation.ts index c12882863..07a66786b 100644 --- a/packages/opencode/src/server/instance/automation.ts +++ b/packages/opencode/src/server/instance/automation.ts @@ -209,7 +209,8 @@ export const AutomationRoutes = (): Hono => "/:automationID", describeRoute({ summary: "Delete automation", - description: "Delete an automation definition and return a tombstone.", + description: + "Delete an automation definition and return a tombstone. If a run is active, stop it and publish the stopped run before publishing the tombstone.", operationId: "automation.delete", responses: { 200: { diff --git a/packages/opencode/test/server/automation-routes.test.ts b/packages/opencode/test/server/automation-routes.test.ts index c5af72285..33cf91ee0 100644 --- a/packages/opencode/test/server/automation-routes.test.ts +++ b/packages/opencode/test/server/automation-routes.test.ts @@ -471,6 +471,16 @@ describe("automation routes", () => { expect(spec.components?.schemas).toHaveProperty("AutomationValidationError") }) + test("openapi describes delete active-run stop side effect", async () => { + const { Server } = await import("../../src/server/server") + const spec = await Server.openapi() + const paths = spec.paths as Record + const description = paths["/automation/{automationID}"].delete.description + + expect(description).toContain("If a run is active") + expect(description).toContain("publish the stopped run") + }) + test("runNow returns the queued run before background execution updates it", async () => { await withAutomationApp(async ({ app, projectID }) => { const created = await json(app, "/automation", { diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index 082ecc8de..ba27e769a 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -276,6 +276,10 @@ describe("automation runNow execution", () => { let sawAbort = false const started = Promise.withResolvers() const release = Promise.withResolvers() + const runEvents: Automation.Run[] = [] + const unsubscribeRun = Bus.subscribe(Automation.Event.RunUpdated, (event) => { + if (event.properties.automationID === definition.id) runEvents.push(event.properties) + }) Automation.runNowExecuting(definition.id, { executor: async ({ run, signal }) => { @@ -300,7 +304,8 @@ describe("automation runNow execution", () => { stopReason: "cancelled", }) await Bun.sleep(20) - expect(removed.stoppedRun).not.toMatchObject({ state: "succeeded" }) + unsubscribeRun() + expect(runEvents.some((event) => event.state === "succeeded")).toBe(false) }) }) From 722a6f6e9c912b11c81975c78319d720919c15e0 Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 17:58:51 +0800 Subject: [PATCH 11/12] fix: avoid duplicate automation running events --- packages/opencode/src/automation/index.ts | 2 +- .../test/server/automation-runner.test.ts | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/packages/opencode/src/automation/index.ts b/packages/opencode/src/automation/index.ts index b82794961..4b70ebc15 100644 --- a/packages/opencode/src/automation/index.ts +++ b/packages/opencode/src/automation/index.ts @@ -621,7 +621,7 @@ export namespace Automation { } const running = latest.state === "scheduled" ? markRunStarted(latest, prepared.sessionID) : latest current = running - await publishRunUpdated(running) + if (running !== latest) await publishRunUpdated(running) const latestDefinition = getOptional(initial.automationID) if (latestDefinition?.context === "continue") { const updatedDefinition = setDefinitionAutomationSession(latestDefinition, prepared.sessionID) diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index ba27e769a..710e07142 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -126,6 +126,29 @@ describe("automation runNow execution", () => { }) }) + test("does not publish duplicate running events when the executor already started the run", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + const sessionID = SessionID.descending() + const runEvents: Automation.Run[] = [] + const unsubscribeRun = Bus.subscribe(Automation.Event.RunUpdated, (event) => { + if (event.properties.automationID === definition.id) runEvents.push(event.properties) + }) + + Automation.runNowExecuting(definition.id, { + executor: async ({ run }) => { + const started = Automation.markRunStarted(run, sessionID, { now: run.triggeredAt }) + await Automation.publishRunUpdated(started) + return { sessionID, result: "done", cost: 0 } + }, + }) + + await waitForRun(definition.id, "succeeded") + unsubscribeRun() + expect(runEvents.map((event) => event.state)).toEqual(["running", "succeeded"]) + }) + }) + test("keeps one active writer per project", async () => { await withAutomation(async (projectID) => { const first = Automation.create(input(projectID, { title: "First automation" })) From 67dc4f0caa933beaeb2d4ddee2a434d1e6ed9c4b Mon Sep 17 00:00:00 2001 From: Yuhan Lei Date: Fri, 29 May 2026 18:59:03 +0800 Subject: [PATCH 12/12] test: clean up automation run listener --- .../test/server/automation-runner.test.ts | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts index 710e07142..e356c1a1e 100644 --- a/packages/opencode/test/server/automation-runner.test.ts +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -447,13 +447,17 @@ describe("automation runNow execution", () => { }) Automation.runNowExecuting(definition.id, { executor: sessionPromptExecutor }) - const result = await Promise.race([ - removed.promise, - new Promise((_, reject) => - setTimeout(() => reject(new Error("timed out waiting for running run")), 1_000), - ), - ]) - unsubscribe() + let result: ReturnType + try { + result = await Promise.race([ + removed.promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error("timed out waiting for running run")), 1_000), + ), + ]) + } finally { + unsubscribe() + } expect(result.stoppedRun).toMatchObject({ state: "stopped", stopReason: "cancelled" }) await Bun.sleep(50)