diff --git a/packages/opencode/src/automation/index.ts b/packages/opencode/src/automation/index.ts index 7e027763f..4b70ebc15 100644 --- a/packages/opencode/src/automation/index.ts +++ b/packages/opencode/src/automation/index.ts @@ -7,6 +7,7 @@ import { ProjectID } from "@/project/schema" import { PermissionID } from "@/permission/schema" import { SessionID } from "@/session/schema" import { NotFoundError } from "@/storage/db" +import type { AutomationRunAttendance, AutomationRunBlocker } from "./run-context" export const AutomationID = { Definition: { @@ -232,8 +233,22 @@ export namespace Automation { type State = { definitions: Map runs: Map + activeWriters: Set + activeRuns: Map } - const state = Instance.state(() => ({ definitions: new Map(), runs: new Map() })) + const state = Instance.state(() => ({ + definitions: new Map(), + runs: new Map(), + activeWriters: new Set(), + activeRuns: new Map(), + })) + + export type RunExecutor = (input: { + definition: Definition + run: Run + attendance: AutomationRunAttendance + signal: AbortSignal + }) => Promise<{ sessionID: SessionID; result: string | null; cost?: number | null }> const COMMON_CREATE_FIELDS = new Set([ "kind", @@ -416,6 +431,10 @@ export namespace Automation { return definition } + function getOptional(id: string): Definition | undefined { + return state().definitions.get(id) + } + function isSameValue(left: unknown, right: unknown): boolean { if (Object.is(left, right)) return true if (Array.isArray(left) || Array.isArray(right)) { @@ -453,11 +472,95 @@ export namespace Automation { return next } - export function remove(id: string): Tombstone { + export function remove(id: string): { tombstone: Tombstone; stoppedRun?: Run } { const previous = get(id) + const stoppedRun = stopActiveRun(id) state().definitions.delete(id) - state().runs.delete(id) - return { id: previous.id, deleted: true, revision: previous.revision + 1 } + if (!stoppedRun) state().runs.delete(id) + return { tombstone: { id: previous.id, deleted: true, revision: previous.revision + 1 }, stoppedRun } + } + + function replaceRun(run: Run) { + const current = state().runs.get(run.automationID) ?? [] + state().runs.set( + run.automationID, + current.map((item) => (item.id === run.id ? run : item)), + ) + return run + } + + function reviseRun(run: Run, patch: Record): Run { + const next = { + ...run, + ...patch, + revision: run.revision + 1, + } + if (next.state !== "awaiting_input") delete (next as Record).blocker + if (next.state !== "stopped") delete (next as Record).stopReason + for (const [key, value] of Object.entries(next)) { + if (value === undefined) delete (next as Record)[key] + } + return replaceRun(Run.parse(next)) + } + + function stopRun(run: Run, stopReason: Extract["stopReason"]): Run { + if (run.state === "stopped" || run.state === "succeeded" || run.state === "failed") return run + return reviseRun(run, { + state: "stopped", + completedAt: Date.now(), + result: null, + error: null, + stopReason, + }) + } + + function stopActiveRun(automationID: string) { + const active = state().activeRuns.get(automationID) + if (!active) return undefined + active.controller.abort() + const current = state().runs.get(automationID)?.find((run) => ( + run.state === "scheduled" || run.state === "running" || run.state === "awaiting_input" + )) + return current ? stopRun(current, "cancelled") : undefined + } + + export function markRunStarted(run: Run, sessionID: SessionID, options?: { now?: number }): Run { + return reviseRun(run, { + state: "running", + sessionID, + startedAt: options?.now ?? Date.now(), + completedAt: null, + result: null, + error: null, + }) + } + + function setDefinitionAutomationSession(definition: Definition, sessionID: SessionID) { + if (definition.automationSessionID === sessionID) return definition + const next = Definition.parse({ + ...definition, + automationSessionID: sessionID, + revision: definition.revision + 1, + updatedAt: Date.now(), + }) + state().definitions.set(definition.id, next) + return next + } + + export function markRunBlocked(run: Run, blocker: AutomationRunBlocker): Run { + if (run.state !== "running" && run.state !== "awaiting_input") return run + return reviseRun(run, { + state: "awaiting_input", + blocker, + }) + } + + export function clearRunBlocker(run: Run): Run { + if (run.state !== "awaiting_input") return run + return reviseRun(run, { + state: "running", + blocker: undefined, + }) } export function runNow(id: string, options?: { now?: number }): Run { @@ -481,6 +584,90 @@ export namespace Automation { return run } + export function runNowExecuting( + id: string, + options: { executor: RunExecutor; attendance?: AutomationRunAttendance; now?: number }, + ): Run { + const initial = runNow(id, { now: options.now }) + void executeRun(initial, options.executor, options.attendance ?? "attended") + return initial + } + + async function executeRun(initial: Run, executor: RunExecutor, attendance: AutomationRunAttendance) { + const data = state() + const definition = get(initial.automationID) + const writerKey = definition.where.worktree ?? definition.where.projectID + if (data.activeWriters.has(writerKey)) { + const stopped = reviseRun(initial, { + state: "stopped", + completedAt: Date.now(), + stopReason: "previous_run_awaiting_input", + }) + await publishRunUpdated(stopped) + return + } + data.activeWriters.add(writerKey) + const controller = new AbortController() + data.activeRuns.set(initial.automationID, { writerKey, controller }) + let current = initial + try { + const prepared = await executor({ definition, run: initial, attendance, signal: controller.signal }) + const latest = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? initial + if (controller.signal.aborted) { + const stopped = stopRun(latest, "cancelled") + current = stopped + if (stopped !== latest) await publishRunUpdated(stopped) + return + } + const running = latest.state === "scheduled" ? markRunStarted(latest, prepared.sessionID) : latest + current = running + if (running !== latest) await publishRunUpdated(running) + const latestDefinition = getOptional(initial.automationID) + if (latestDefinition?.context === "continue") { + const updatedDefinition = setDefinitionAutomationSession(latestDefinition, prepared.sessionID) + if (updatedDefinition !== latestDefinition) await publishDefinitionUpdated(updatedDefinition) + } + const succeeded = reviseRun(running, { + state: "succeeded", + completedAt: Date.now(), + result: prepared.result, + error: null, + cost: prepared.cost ?? null, + }) + current = succeeded + await publishRunUpdated(succeeded) + } catch (error) { + current = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? current + if (controller.signal.aborted) { + const stopped = stopRun(current, "cancelled") + if (stopped !== current) await publishRunUpdated(stopped) + return + } + if (current.state === "scheduled") { + const stopped = reviseRun(current, { + state: "stopped", + completedAt: Date.now(), + stopReason: "cancelled", + }) + await publishRunUpdated(stopped) + return + } + const isStepCap = error instanceof globalThis.Error && error.name === "AutomationStepCapError" + const failed = reviseRun(current, { + state: "failed", + completedAt: Date.now(), + error: { + code: isStepCap ? "step_cap" : "execution_failed", + message: error instanceof globalThis.Error ? error.message : String(error), + }, + }) + await publishRunUpdated(failed) + } finally { + data.activeRuns.delete(initial.automationID) + data.activeWriters.delete(writerKey) + } + } + export function runs(input: { automationID: string; limit?: number; cursor?: string }) { get(input.automationID) const limit = Math.min(Math.max(input.limit ?? 50, 1), 100) diff --git a/packages/opencode/src/automation/run-context.ts b/packages/opencode/src/automation/run-context.ts new file mode 100644 index 000000000..636d0647a --- /dev/null +++ b/packages/opencode/src/automation/run-context.ts @@ -0,0 +1,47 @@ +import { Context, Effect } from "effect" +import type { Permission } from "@/permission" +import type { PermissionID } from "@/permission/schema" + +export type AutomationRunAttendance = "attended" | "unattended" + +export type AutomationRunBlocker = + | { kind: "permission"; requestID: PermissionID } + | { kind: "question"; callID: string } + +export interface AutomationRunContext { + readonly attendance: AutomationRunAttendance + readonly stepCap?: number + readonly block: (blocker: AutomationRunBlocker) => Effect.Effect + readonly clear: () => Effect.Effect +} + +export class AutomationStepCapError extends Error { + readonly _tag = "AutomationStepCapError" + constructor(readonly stepCap: number) { + super(`Automation run exceeded the hard step cap (${stepCap}).`) + this.name = "AutomationStepCapError" + } +} + +export const AutomationRunContextService: Context.Reference = Context.Reference< + AutomationRunContext | undefined +>("@opencode/AutomationRunContext", { + defaultValue: () => undefined, +}) + +export const AutomationRunContext = { + service: AutomationRunContextService, + current: AutomationRunContextService, + attended(input: Pick): AutomationRunContext { + return { ...input, attendance: "attended" } + }, + unattended(input: Pick): AutomationRunContext { + return { ...input, attendance: "unattended" } + }, + permissionOnPending( + context: AutomationRunContext | undefined, + ): ((request: Permission.Request) => Effect.Effect) | undefined { + if (!context) return undefined + return (request) => context.block({ kind: "permission", requestID: request.id }) + }, +} diff --git a/packages/opencode/src/automation/runner.ts b/packages/opencode/src/automation/runner.ts new file mode 100644 index 000000000..1c86f7ee8 --- /dev/null +++ b/packages/opencode/src/automation/runner.ts @@ -0,0 +1,56 @@ +import { Effect } from "effect" +import { Automation } from "." +import { Session } from "@/session" +import { SessionPrompt } from "@/session/prompt" +import { AutomationRunContext, type AutomationRunBlocker } from "./run-context" + +export const sessionPromptExecutor: Automation.RunExecutor = async ({ definition, run, attendance, signal }) => { + signal.throwIfAborted() + const sessionID = + definition.context === "continue" && definition.automationSessionID + ? definition.automationSessionID + : (await Session.create({ title: `Automation: ${definition.title}` })).id + const cancelPrompt = () => { + void SessionPrompt.cancel(sessionID, { source: "automation.cancel" }).catch(() => undefined) + } + if (signal.aborted) cancelPrompt() + else signal.addEventListener("abort", cancelPrompt, { once: true }) + try { + signal.throwIfAborted() + let currentRun = Automation.markRunStarted(run, sessionID) + await Automation.publishRunUpdated(currentRun) + const handlers = { + stepCap: 50, + block: (blocker: AutomationRunBlocker) => + Effect.gen(function* () { + const previous = currentRun + currentRun = Automation.markRunBlocked(currentRun, blocker) + if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun)) + }), + clear: () => + Effect.gen(function* () { + const previous = currentRun + currentRun = Automation.clearRunBlocker(currentRun) + if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun)) + }), + } + const scoped = + attendance === "attended" ? AutomationRunContext.attended(handlers) : AutomationRunContext.unattended(handlers) + const message = await SessionPrompt.promptWithAutomationContext( + { + sessionID, + parts: [{ type: "text", text: definition.prompt }], + }, + scoped, + { abortSignal: signal }, + ) + signal.throwIfAborted() + return { + sessionID, + result: message.parts.find((part) => part.type === "text")?.text ?? null, + cost: message.info.role === "assistant" ? message.info.cost : null, + } + } finally { + signal.removeEventListener("abort", cancelPrompt) + } +} diff --git a/packages/opencode/src/permission/index.ts b/packages/opencode/src/permission/index.ts index 41a99e829..570becd2e 100644 --- a/packages/opencode/src/permission/index.ts +++ b/packages/opencode/src/permission/index.ts @@ -117,8 +117,12 @@ export namespace Permission { message: z.string().optional(), }) + export type AskOptions = z.infer & { + onPending?: (request: Request) => Effect.Effect + } + export interface Interface { - readonly ask: (input: z.infer) => Effect.Effect + readonly ask: (input: AskOptions) => Effect.Effect readonly reply: (input: z.infer) => Effect.Effect readonly clearSession: ( sessionID: SessionID, @@ -171,9 +175,9 @@ export namespace Permission { }), ) - const ask = Effect.fn("Permission.ask")(function* (input: z.infer) { + const ask = Effect.fn("Permission.ask")(function* (input: AskOptions) { const { approved, pending } = yield* InstanceState.get(state) - const { ruleset, ...request } = input + const { ruleset, onPending, ...request } = input let needsAsk = false const denied: Array<{ pattern: string; rule: Rule }> = [] @@ -222,6 +226,7 @@ export namespace Permission { const deferred = yield* Deferred.make() pending.set(id, { info, deferred }) + if (onPending) yield* onPending(info) yield* bus.publish(Event.Asked, info) return yield* Effect.ensuring( Deferred.await(deferred), diff --git a/packages/opencode/src/server/instance/automation.ts b/packages/opencode/src/server/instance/automation.ts index 12b6ceb91..07a66786b 100644 --- a/packages/opencode/src/server/instance/automation.ts +++ b/packages/opencode/src/server/instance/automation.ts @@ -3,6 +3,7 @@ import type { Context } from "hono" import { describeRoute, resolver, validator } from "hono-openapi" import z from "zod" import { Automation, AutomationID, ValidationError } from "@/automation" +import { sessionPromptExecutor } from "@/automation/runner" import { errors } from "../error" function validationError(error: ValidationError) { @@ -208,7 +209,8 @@ export const AutomationRoutes = (): Hono => "/:automationID", describeRoute({ summary: "Delete automation", - description: "Delete an automation definition and return a tombstone.", + description: + "Delete an automation definition and return a tombstone. If a run is active, stop it and publish the stopped run before publishing the tombstone.", operationId: "automation.delete", responses: { 200: { @@ -220,16 +222,17 @@ export const AutomationRoutes = (): Hono => }), validator("param", z.object({ automationID: AutomationID.Definition.zod })), async (c) => { - const tombstone = Automation.remove(c.req.valid("param").automationID) - await Automation.publishDefinitionDeleted(tombstone) - return c.json(tombstone) + const removed = Automation.remove(c.req.valid("param").automationID) + if (removed.stoppedRun) await Automation.publishRunUpdated(removed.stoppedRun) + await Automation.publishDefinitionDeleted(removed.tombstone) + return c.json(removed.tombstone) }, ) .post( "/:automationID/run", describeRoute({ summary: "Run automation now", - description: "Create a scheduled automation run record. Execution lands in a later PR.", + description: "Create a queued automation run, start execution in the background, and return the queued run immediately.", operationId: "automation.runNow", responses: { 200: { @@ -241,7 +244,9 @@ export const AutomationRoutes = (): Hono => }), validator("param", z.object({ automationID: AutomationID.Definition.zod })), async (c) => { - const run = Automation.runNow(c.req.valid("param").automationID) + const run = Automation.runNowExecuting(c.req.valid("param").automationID, { + executor: sessionPromptExecutor, + }) await Automation.publishRunUpdated(run) return c.json(run) }, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 5a07dae8a..25ff152a4 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -62,6 +62,7 @@ import { attachWith, makeRuntime } from "@/effect/run-service" import { Instance } from "@/project/instance" import { MemoryFile } from "@/memory/memory" import { MemoryService } from "@/memory/service" +import { AutomationRunContext, AutomationStepCapError } from "@/automation/run-context" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -86,6 +87,10 @@ type TitleGenerationProgress = { completedAt?: number } +type PromptRuntimeOptions = { + abortSignal?: AbortSignal +} + export function titleGenerationStateAtAbort( progress: TitleGenerationProgress | undefined, abortRecordedAt: number, @@ -265,8 +270,8 @@ function modelCanReadMedia(model: Provider.Model, kind: MediaInputKind) { export interface Interface { readonly cancel: (sessionID: SessionID, options?: { source?: string }) => Effect.Effect - readonly prompt: (input: PromptInput) => Effect.Effect - readonly loop: (input: z.infer) => Effect.Effect + readonly prompt: (input: PromptInput, options?: PromptRuntimeOptions) => Effect.Effect + readonly loop: (input: z.infer, options?: PromptRuntimeOptions) => Effect.Effect readonly shell: (input: ShellInput) => Effect.Effect readonly command: (input: CommandInput) => Effect.Effect readonly resolvePromptParts: (template: string) => Effect.Effect @@ -303,6 +308,9 @@ export const layer = Layer.effect( const runner = Effect.fn("SessionPrompt.runner")(function* () { return yield* EffectBridge.make() }) + const throwIfAborted = Effect.fn("SessionPrompt.throwIfAborted")(function* (options?: PromptRuntimeOptions) { + options?.abortSignal?.throwIfAborted() + }) // Tracks subagent sessions whose runner.onInterrupt fired during the most recent prompt run. // Reset at the start of each prompt() call; written when loop()'s onInterrupt arg fires; read // by AgentTool.execute via AgentPromptOps.wasInterrupted to deterministically distinguish @@ -676,6 +684,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const run = yield* runner() const promptOps = yield* ops() const effectContext = yield* Effect.context() + const automation = yield* AutomationRunContext.current const runInSessionContext = (effect: Effect.Effect): Effect.Effect => Effect.gen(function* () { const session = yield* sessions.get(input.session.id) @@ -739,13 +748,20 @@ NOTE: At any point in time through this workflow you should feel free to ask the sessionID: input.session.id, tool: { messageID: input.processor.message.id, callID: options.toolCallId }, ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []), + onPending: AutomationRunContext.permissionOnPending(automation), }) - .pipe(Effect.orDie), + .pipe(Effect.ensuring(automation ? automation.clear() : Effect.void), Effect.orDie), externalResult: ({ inputSnapshot, decoder }) => Effect.gen(function* () { + if (automation?.attendance === "unattended") { + return yield* Effect.fail(new ExternalResult.Error({ reason: "aborted" })) + } const sessionID = input.session.id const messageID = input.processor.message.id const callID = options.toolCallId + if (automation) { + yield* automation.block({ kind: "question", callID }) + } const deferred = yield* ExternalResult.register({ sessionID, messageID, @@ -814,6 +830,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the return result as Tool.ExternalResultOutcome } finally { if (signal) signal.removeEventListener("abort", abortHandler) + if (automation) yield* automation.clear() } }), }) @@ -1827,27 +1844,29 @@ NOTE: At any point in time through this workflow you should feel free to ask the return { info, parts } }, Effect.scoped) - const prompt: (input: PromptInput) => Effect.Effect = Effect.fn("SessionPrompt.prompt")( - function* (input: PromptInput) { - interruptedSessions.delete(input.sessionID) - const session = yield* sessions.get(input.sessionID) - yield* revert.cleanup(session) - const message = yield* createUserMessage(input) - yield* sessions.touch(input.sessionID) - - const permissions: Permission.Ruleset = [] - for (const [t, enabled] of Object.entries(input.tools ?? {})) { - permissions.push({ permission: t, action: enabled ? "allow" : "deny", pattern: "*" }) - } - if (permissions.length > 0) { - session.permission = permissions - yield* sessions.setPermission({ sessionID: session.id, permission: permissions }) - } + const prompt: (input: PromptInput, options?: PromptRuntimeOptions) => Effect.Effect = Effect.fn( + "SessionPrompt.prompt", + )(function* (input: PromptInput, options?: PromptRuntimeOptions) { + yield* throwIfAborted(options) + interruptedSessions.delete(input.sessionID) + const session = yield* sessions.get(input.sessionID) + yield* revert.cleanup(session) + const message = yield* createUserMessage(input) + yield* sessions.touch(input.sessionID) + + const permissions: Permission.Ruleset = [] + for (const [t, enabled] of Object.entries(input.tools ?? {})) { + permissions.push({ permission: t, action: enabled ? "allow" : "deny", pattern: "*" }) + } + if (permissions.length > 0) { + session.permission = permissions + yield* sessions.setPermission({ sessionID: session.id, permission: permissions }) + } - if (input.noReply === true) return message - return yield* loop({ sessionID: input.sessionID, traceMessageID: message.info.id }) - }, - ) + yield* throwIfAborted(options) + if (input.noReply === true) return message + return yield* loop({ sessionID: input.sessionID, traceMessageID: message.info.id }, options) + }) const appendRunLifecycleEvent = Effect.fn("SessionPrompt.appendRunLifecycleEvent")(function* ( sessionID: SessionID, @@ -1939,6 +1958,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const runLoop: (sessionID: SessionID) => Effect.Effect = Effect.fn("SessionPrompt.run")( function* (sessionID: SessionID) { const ctx = yield* InstanceState.context + const automation = yield* AutomationRunContext.current const slog = elog.with({ sessionID }) let structured: unknown | undefined let step = 0 @@ -2054,6 +2074,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the throw error } const maxSteps = agent.steps ?? Infinity + const automationHardStepCap = automation?.stepCap ?? 50 + if (automation && step > automationHardStepCap) { + throw new AutomationStepCapError(automationHardStepCap) + } const isLastStep = step >= maxSteps msgs = yield* insertReminders({ messages: msgs, agent, session }) const diagnostics = SessionDiagnostics.consumeReminders({ messages: msgs, parentID: lastUser.id }) @@ -2254,9 +2278,12 @@ NOTE: At any point in time through this workflow you should feel free to ask the }, ) - const loop: (input: z.infer) => Effect.Effect = Effect.fn( + const loop: ( + input: z.infer, + options?: PromptRuntimeOptions, + ) => Effect.Effect = Effect.fn( "SessionPrompt.loop", - )(function* (input: z.infer) { + )(function* (input: z.infer, options?: PromptRuntimeOptions) { const onInterrupt = (meta?: { source?: string reason?: string @@ -2394,6 +2421,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the return assistant }) const work = Effect.gen(function* () { + yield* throwIfAborted(options) // Two reasons busy goes first. (1) The compaction part event must // not race ahead of `session.status: busy` — the divider's // "no summary + not working" branch would otherwise flash the @@ -2454,6 +2482,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the appendRunLifecycleEvent(input.sessionID, input.traceMessageID!, event), } : undefined + yield* throwIfAborted(options) return yield* state.ensureRunning(input.sessionID, onInterrupt, work, { rejectIfBusy: input.prelude !== undefined, runLifecycle, @@ -2734,6 +2763,18 @@ export async function prompt(input: PromptInput) { return runPromise((svc) => svc.prompt(PromptInput.parse(input))) } +export async function promptWithAutomationContext( + input: PromptInput, + context: import("@/automation/run-context").AutomationRunContext, + options?: PromptRuntimeOptions, +) { + return runPromise((svc) => + svc + .prompt(PromptInput.parse(input), options) + .pipe(Effect.provideService(AutomationRunContext.service, context)), + ) +} + export async function resolvePromptParts(template: string) { return runPromise((svc) => svc.resolvePromptParts(z.string().parse(template))) } diff --git a/packages/opencode/test/server/automation-routes.test.ts b/packages/opencode/test/server/automation-routes.test.ts index 775b54a8c..33cf91ee0 100644 --- a/packages/opencode/test/server/automation-routes.test.ts +++ b/packages/opencode/test/server/automation-routes.test.ts @@ -471,7 +471,17 @@ describe("automation routes", () => { expect(spec.components?.schemas).toHaveProperty("AutomationValidationError") }) - test("runNow is a contract stub before PR2 execution", async () => { + test("openapi describes delete active-run stop side effect", async () => { + const { Server } = await import("../../src/server/server") + const spec = await Server.openapi() + const paths = spec.paths as Record + const description = paths["/automation/{automationID}"].delete.description + + expect(description).toContain("If a run is active") + expect(description).toContain("publish the stopped run") + }) + + test("runNow returns the queued run before background execution updates it", async () => { await withAutomationApp(async ({ app, projectID }) => { const created = await json(app, "/automation", { method: "POST", diff --git a/packages/opencode/test/server/automation-runner.test.ts b/packages/opencode/test/server/automation-runner.test.ts new file mode 100644 index 000000000..e356c1a1e --- /dev/null +++ b/packages/opencode/test/server/automation-runner.test.ts @@ -0,0 +1,507 @@ +import path from "path" +import { afterEach, describe, expect, test } from "bun:test" +import { Effect } from "effect" +import { Automation } from "../../src/automation" +import { sessionPromptExecutor } from "../../src/automation/runner" +import { Bus } from "../../src/bus" +import { Instance } from "../../src/project/instance" +import { ProjectID } from "../../src/project/schema" +import { Session } from "../../src/session" +import { SessionID } from "../../src/session/schema" +import { AutomationRunContext, AutomationStepCapError } from "../../src/automation/run-context" +import { tmpdir } from "../fixture/fixture" + +afterEach(async () => { + await Instance.disposeAll() +}) + +async function withAutomation(fn: (projectID: ProjectID) => Promise) { + await using tmp = await tmpdir({ git: true }) + return await Instance.provide({ + directory: tmp.path, + fn: () => fn(Instance.project.id), + }) +} + +function input(projectID: ProjectID, overrides: Partial> = {}): Automation.CreateInput { + return { + kind: "recurring", + title: "Repo brief", + prompt: "Summarize repo changes.", + context: "fresh", + where: { projectID }, + timezone: "Asia/Shanghai", + rhythm: { kind: "interval", everyMs: 60_000 }, + stop: { kind: "count", count: 3 }, + ...overrides, + } +} + +async function waitForRun(automationID: string, state: Automation.Run["state"]) { + const deadline = Date.now() + 2_000 + while (Date.now() < deadline) { + const run = Automation.runs({ automationID }).items.find((item) => item.state === state) + if (run?.state === state) return run + await Bun.sleep(10) + } + throw new Error(`Timed out waiting for ${state}`) +} + +function defer() { + let resolve!: (value: T | PromiseLike) => void + const promise = new Promise((done) => { + resolve = done + }) + return { promise, resolve } +} + +function hangingChat(ready: () => void) { + const encoder = new TextEncoder() + let timer: ReturnType | undefined + const first = `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: { role: "assistant" } }], + })}\n\n` + const rest = + [ + `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: { content: "late" } }], + })}`, + `data: ${JSON.stringify({ + id: "chatcmpl-1", + object: "chat.completion.chunk", + choices: [{ delta: {}, finish_reason: "stop" }], + })}`, + "data: [DONE]", + ].join("\n\n") + "\n\n" + + return new ReadableStream({ + start(ctrl) { + ctrl.enqueue(encoder.encode(first)) + ready() + timer = setTimeout(() => { + ctrl.enqueue(encoder.encode(rest)) + ctrl.close() + }, 10_000) + }, + cancel() { + if (timer) clearTimeout(timer) + }, + }) +} + +async function waitForAbortedAssistant(sessionID: SessionID) { + const deadline = Date.now() + 1_000 + while (Date.now() < deadline) { + const messages = await Session.messages({ sessionID }) + const assistant = messages.findLast((message) => message.info.role === "assistant") + if (assistant?.info.role === "assistant" && assistant.info.error?.name === "MessageAbortedError") return assistant + await Bun.sleep(10) + } + throw new Error("Timed out waiting for aborted assistant message") +} + +describe("automation runNow execution", () => { + test("executes a run and records the terminal result", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + const sessionID = SessionID.descending() + + const initial = Automation.runNowExecuting(definition.id, { + executor: async () => ({ sessionID, result: "done", cost: 0 }), + }) + expect(initial.state).toBe("scheduled") + + const completed = await waitForRun(definition.id, "succeeded") + expect(completed).toMatchObject({ + state: "succeeded", + sessionID, + result: "done", + error: null, + }) + expect(completed.revision).toBeGreaterThan(initial.revision) + }) + }) + + test("does not publish duplicate running events when the executor already started the run", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + const sessionID = SessionID.descending() + const runEvents: Automation.Run[] = [] + const unsubscribeRun = Bus.subscribe(Automation.Event.RunUpdated, (event) => { + if (event.properties.automationID === definition.id) runEvents.push(event.properties) + }) + + Automation.runNowExecuting(definition.id, { + executor: async ({ run }) => { + const started = Automation.markRunStarted(run, sessionID, { now: run.triggeredAt }) + await Automation.publishRunUpdated(started) + return { sessionID, result: "done", cost: 0 } + }, + }) + + await waitForRun(definition.id, "succeeded") + unsubscribeRun() + expect(runEvents.map((event) => event.state)).toEqual(["running", "succeeded"]) + }) + }) + + test("keeps one active writer per project", async () => { + await withAutomation(async (projectID) => { + const first = Automation.create(input(projectID, { title: "First automation" })) + const second = Automation.create(input(projectID, { title: "Second automation" })) + let release!: () => void + const held = new Promise((resolve) => { + release = resolve + }) + let entered = 0 + + Automation.runNowExecuting(first.id, { + executor: async () => { + entered++ + await held + return { sessionID: SessionID.descending(), result: "first", cost: 0 } + }, + }) + Automation.runNowExecuting(second.id, { + executor: async () => { + entered++ + return { sessionID: SessionID.descending(), result: "second", cost: 0 } + }, + }) + + const stopped = await waitForRun(second.id, "stopped") + if (stopped.state !== "stopped") throw new Error("expected stopped run") + expect(stopped.stopReason).toBe("previous_run_awaiting_input") + expect(entered).toBe(1) + release() + const succeeded = await waitForRun(first.id, "succeeded") + expect(succeeded.result).toBe("first") + }) + }) + + test("records and clears blocker state on the run ledger", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + const run = Automation.runNow(definition.id) + const started = Automation.markRunStarted(run, SessionID.descending(), { now: run.triggeredAt }) + const blocked = Automation.markRunBlocked(started, { kind: "question", callID: "call_1" }) + const cleared = Automation.clearRunBlocker(blocked) + + expect(blocked).toMatchObject({ + state: "awaiting_input", + blocker: { kind: "question", callID: "call_1" }, + }) + expect(cleared.state).toBe("running") + expect(cleared).not.toHaveProperty("blocker") + expect(Automation.clearRunBlocker(cleared)).toBe(cleared) + }) + }) + + test("drops state-specific fields when a run transitions out of that state", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + + Automation.runNowExecuting(definition.id, { + executor: async ({ run }) => { + const started = Automation.markRunStarted(run, SessionID.descending(), { now: run.triggeredAt }) + Automation.markRunBlocked(started, { kind: "question", callID: "call_1" }) + throw new Error("boom") + }, + }) + + const failed = await waitForRun(definition.id, "failed") + expect(failed).not.toHaveProperty("blocker") + + let release!: () => void + const held = new Promise((resolve) => { + release = resolve + }) + Automation.runNowExecuting(definition.id, { + executor: async () => { + await held + return { sessionID: SessionID.descending(), result: "first", cost: 0 } + }, + }) + Automation.runNowExecuting(definition.id, { + executor: async () => ({ sessionID: SessionID.descending(), result: "second", cost: 0 }), + }) + const stopped = await waitForRun(definition.id, "stopped") + if (stopped.completedAt === null) throw new Error("expected stopped run to have completedAt") + const restarted = Automation.markRunStarted(stopped, SessionID.descending(), { now: stopped.completedAt }) + expect(restarted).not.toHaveProperty("stopReason") + release() + }) + }) + + test("publishes continue-session definition updates from the latest definition", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID, { context: "continue" })) + const sessionID = SessionID.descending() + const definitionEvents: Automation.Definition[] = [] + const unsubscribe = Bus.subscribe(Automation.Event.DefinitionUpdated, (event) => { + definitionEvents.push(event.properties) + }) + + Automation.runNowExecuting(definition.id, { + executor: async () => { + Automation.update(definition.id, { title: "Updated repo brief", prompt: "Use the latest prompt." }) + return { sessionID, result: "done", cost: 0 } + }, + }) + + await waitForRun(definition.id, "succeeded") + unsubscribe() + const updated = Automation.get(definition.id) + expect(updated.title).toBe("Updated repo brief") + expect(updated.prompt).toBe("Use the latest prompt.") + expect(updated.automationSessionID).toBe(sessionID) + expect(definitionEvents.at(-1)).toMatchObject({ + id: definition.id, + title: "Updated repo brief", + prompt: "Use the latest prompt.", + automationSessionID: sessionID, + }) + }) + }) + + test("does not revive a continue automation deleted during execution", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID, { context: "continue" })) + const definitionEvents: Automation.Definition[] = [] + const unsubscribeDefinition = Bus.subscribe(Automation.Event.DefinitionUpdated, (event) => { + definitionEvents.push(event.properties) + }) + let removed!: ReturnType + + Automation.runNowExecuting(definition.id, { + executor: async () => { + removed = Automation.remove(definition.id) + return { sessionID: SessionID.descending(), result: "done", cost: 0 } + }, + }) + + await Bun.sleep(20) + unsubscribeDefinition() + expect(removed.stoppedRun).toMatchObject({ state: "stopped", stopReason: "cancelled" }) + expect(() => Automation.get(definition.id)).toThrow() + expect(definitionEvents).toHaveLength(0) + }) + }) + + test("aborts an active run when its automation is deleted", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + const sessionID = SessionID.descending() + let sawAbort = false + const started = Promise.withResolvers() + const release = Promise.withResolvers() + const runEvents: Automation.Run[] = [] + const unsubscribeRun = Bus.subscribe(Automation.Event.RunUpdated, (event) => { + if (event.properties.automationID === definition.id) runEvents.push(event.properties) + }) + + Automation.runNowExecuting(definition.id, { + executor: async ({ run, signal }) => { + Automation.markRunStarted(run, sessionID, { now: run.triggeredAt }) + signal.addEventListener("abort", () => { + sawAbort = true + release.resolve() + }) + started.resolve() + await release.promise + return { sessionID, result: "should not succeed", cost: 0 } + }, + }) + + await started.promise + const removed = Automation.remove(definition.id) + + expect(sawAbort).toBe(true) + expect(removed.stoppedRun).toMatchObject({ + state: "stopped", + sessionID, + stopReason: "cancelled", + }) + await Bun.sleep(20) + unsubscribeRun() + expect(runEvents.some((event) => event.state === "succeeded")).toBe(false) + }) + }) + + test("deleting an active automation cancels the real session prompt", async () => { + const ready = defer() + const server = Bun.serve({ + port: 0, + fetch(req) { + const url = new URL(req.url) + if (!url.pathname.endsWith("/chat/completions")) return new Response("not found", { status: 404 }) + return new Response(hangingChat(() => ready.resolve()), { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }) + }, + }) + + try { + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["alibaba"], + provider: { + alibaba: { + options: { + apiKey: "test-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + agent: { + build: { + model: "alibaba/qwen-plus", + }, + }, + }), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const definition = Automation.create(input(Instance.project.id, { title: "Cancel real prompt" })) + + Automation.runNowExecuting(definition.id, { executor: sessionPromptExecutor }) + await ready.promise + + const removed = Automation.remove(definition.id) + const stoppedRun = removed.stoppedRun + expect(stoppedRun).toMatchObject({ state: "stopped", stopReason: "cancelled" }) + if (!stoppedRun?.sessionID) throw new Error("expected stopped run to keep its sessionID") + + await waitForAbortedAssistant(stoppedRun.sessionID) + }, + }) + } finally { + void server.stop(true) + } + }) + + test("deleting after run start but before prompt runner is busy does not call the provider", async () => { + let providerCalls = 0 + const server = Bun.serve({ + port: 0, + fetch(req) { + const url = new URL(req.url) + if (!url.pathname.endsWith("/chat/completions")) return new Response("not found", { status: 404 }) + providerCalls++ + return new Response(hangingChat(() => undefined), { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }) + }, + }) + + try { + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["alibaba"], + provider: { + alibaba: { + options: { + apiKey: "test-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + agent: { + build: { + model: "alibaba/qwen-plus", + }, + }, + }), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const definition = Automation.create(input(Instance.project.id, { title: "Cancel before runner busy" })) + const removed = Promise.withResolvers>() + const unsubscribe = Bus.subscribe(Automation.Event.RunUpdated, (event) => { + if (event.properties.automationID !== definition.id || event.properties.state !== "running") return + removed.resolve(Automation.remove(definition.id)) + }) + + Automation.runNowExecuting(definition.id, { executor: sessionPromptExecutor }) + let result: ReturnType + try { + result = await Promise.race([ + removed.promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error("timed out waiting for running run")), 1_000), + ), + ]) + } finally { + unsubscribe() + } + + expect(result.stoppedRun).toMatchObject({ state: "stopped", stopReason: "cancelled" }) + await Bun.sleep(50) + expect(providerCalls).toBe(0) + if (result.stoppedRun?.sessionID) { + const messages = await Session.messages({ sessionID: result.stoppedRun.sessionID }) + expect(messages.some((message) => message.info.role === "assistant")).toBe(false) + } + }, + }) + } finally { + void server.stop(true) + } + }) + + test("unattended context construction overrides any existing attendance tag", async () => { + const handlers = { + stepCap: 50, + block: () => Effect.void, + clear: () => Effect.void, + } + const attended = AutomationRunContext.attended(handlers) + const unattended = AutomationRunContext.unattended(attended) + + expect(attended.attendance).toBe("attended") + expect(unattended.attendance).toBe("unattended") + }) + + test("records hard step-cap failures with the frozen stop code", async () => { + await withAutomation(async (projectID) => { + const definition = Automation.create(input(projectID)) + + Automation.runNowExecuting(definition.id, { + executor: async ({ run }) => { + Automation.markRunStarted(run, SessionID.descending(), { now: run.triggeredAt }) + throw new AutomationStepCapError(50) + }, + }) + + const failed = await waitForRun(definition.id, "failed") + expect(failed.error).toEqual({ + code: "step_cap", + message: "Automation run exceeded the hard step cap (50).", + }) + }) + }) +})