Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 191 additions & 4 deletions packages/opencode/src/automation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ProjectID } from "@/project/schema"
import { PermissionID } from "@/permission/schema"
import { SessionID } from "@/session/schema"
import { NotFoundError } from "@/storage/db"
import type { AutomationRunAttendance, AutomationRunBlocker } from "./run-context"

export const AutomationID = {
Definition: {
Expand Down Expand Up @@ -232,8 +233,22 @@ export namespace Automation {
type State = {
definitions: Map<string, Definition>
runs: Map<string, Run[]>
activeWriters: Set<string>
activeRuns: Map<string, { writerKey: string; controller: AbortController }>
}
const state = Instance.state<State>(() => ({ definitions: new Map(), runs: new Map() }))
const state = Instance.state<State>(() => ({
definitions: new Map(),
runs: new Map(),
activeWriters: new Set(),
activeRuns: new Map(),
}))

export type RunExecutor = (input: {
definition: Definition
run: Run
attendance: AutomationRunAttendance
signal: AbortSignal
}) => Promise<{ sessionID: SessionID; result: string | null; cost?: number | null }>

const COMMON_CREATE_FIELDS = new Set([
"kind",
Expand Down Expand Up @@ -416,6 +431,10 @@ export namespace Automation {
return definition
}

function getOptional(id: string): Definition | undefined {
return state().definitions.get(id)
}

function isSameValue(left: unknown, right: unknown): boolean {
if (Object.is(left, right)) return true
if (Array.isArray(left) || Array.isArray(right)) {
Expand Down Expand Up @@ -453,11 +472,95 @@ export namespace Automation {
return next
}

export function remove(id: string): Tombstone {
export function remove(id: string): { tombstone: Tombstone; stoppedRun?: Run } {
const previous = get(id)
const stoppedRun = stopActiveRun(id)
state().definitions.delete(id)
state().runs.delete(id)
return { id: previous.id, deleted: true, revision: previous.revision + 1 }
if (!stoppedRun) state().runs.delete(id)
return { tombstone: { id: previous.id, deleted: true, revision: previous.revision + 1 }, stoppedRun }
}

function replaceRun(run: Run) {
const current = state().runs.get(run.automationID) ?? []
state().runs.set(
run.automationID,
current.map((item) => (item.id === run.id ? run : item)),
)
return run
}

function reviseRun(run: Run, patch: Record<string, unknown>): Run {
const next = {
...run,
...patch,
revision: run.revision + 1,
}
if (next.state !== "awaiting_input") delete (next as Record<string, unknown>).blocker
if (next.state !== "stopped") delete (next as Record<string, unknown>).stopReason
for (const [key, value] of Object.entries(next)) {
if (value === undefined) delete (next as Record<string, unknown>)[key]
}
return replaceRun(Run.parse(next))
}
Comment thread
Astro-Han marked this conversation as resolved.

function stopRun(run: Run, stopReason: Extract<Run, { state: "stopped" }>["stopReason"]): Run {
if (run.state === "stopped" || run.state === "succeeded" || run.state === "failed") return run
return reviseRun(run, {
state: "stopped",
completedAt: Date.now(),
result: null,
error: null,
stopReason,
})
}

function stopActiveRun(automationID: string) {
const active = state().activeRuns.get(automationID)
if (!active) return undefined
active.controller.abort()
const current = state().runs.get(automationID)?.find((run) => (
run.state === "scheduled" || run.state === "running" || run.state === "awaiting_input"
))
return current ? stopRun(current, "cancelled") : undefined
}

export function markRunStarted(run: Run, sessionID: SessionID, options?: { now?: number }): Run {
return reviseRun(run, {
state: "running",
sessionID,
startedAt: options?.now ?? Date.now(),
completedAt: null,
result: null,
error: null,
})
}

function setDefinitionAutomationSession(definition: Definition, sessionID: SessionID) {
if (definition.automationSessionID === sessionID) return definition
const next = Definition.parse({
...definition,
automationSessionID: sessionID,
revision: definition.revision + 1,
updatedAt: Date.now(),
})
state().definitions.set(definition.id, next)
return next
}

export function markRunBlocked(run: Run, blocker: AutomationRunBlocker): Run {
if (run.state !== "running" && run.state !== "awaiting_input") return run
return reviseRun(run, {
state: "awaiting_input",
blocker,
})
}

export function clearRunBlocker(run: Run): Run {
if (run.state !== "awaiting_input") return run
return reviseRun(run, {
state: "running",
blocker: undefined,
})
}

export function runNow(id: string, options?: { now?: number }): Run {
Expand All @@ -481,6 +584,90 @@ export namespace Automation {
return run
}

export function runNowExecuting(
id: string,
options: { executor: RunExecutor; attendance?: AutomationRunAttendance; now?: number },
): Run {
const initial = runNow(id, { now: options.now })
void executeRun(initial, options.executor, options.attendance ?? "attended")
return initial
}

async function executeRun(initial: Run, executor: RunExecutor, attendance: AutomationRunAttendance) {
const data = state()
const definition = get(initial.automationID)
const writerKey = definition.where.worktree ?? definition.where.projectID
if (data.activeWriters.has(writerKey)) {
const stopped = reviseRun(initial, {
state: "stopped",
completedAt: Date.now(),
stopReason: "previous_run_awaiting_input",
})
await publishRunUpdated(stopped)
return
}
data.activeWriters.add(writerKey)
const controller = new AbortController()
data.activeRuns.set(initial.automationID, { writerKey, controller })
let current = initial
try {
const prepared = await executor({ definition, run: initial, attendance, signal: controller.signal })
const latest = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? initial
if (controller.signal.aborted) {
const stopped = stopRun(latest, "cancelled")
current = stopped
if (stopped !== latest) await publishRunUpdated(stopped)
return
}
const running = latest.state === "scheduled" ? markRunStarted(latest, prepared.sessionID) : latest
current = running
if (running !== latest) await publishRunUpdated(running)
const latestDefinition = getOptional(initial.automationID)
if (latestDefinition?.context === "continue") {
const updatedDefinition = setDefinitionAutomationSession(latestDefinition, prepared.sessionID)
if (updatedDefinition !== latestDefinition) await publishDefinitionUpdated(updatedDefinition)
}
const succeeded = reviseRun(running, {
state: "succeeded",
completedAt: Date.now(),
result: prepared.result,
error: null,
cost: prepared.cost ?? null,
})
current = succeeded
await publishRunUpdated(succeeded)
} catch (error) {
current = state().runs.get(initial.automationID)?.find((item) => item.id === initial.id) ?? current
if (controller.signal.aborted) {
const stopped = stopRun(current, "cancelled")
if (stopped !== current) await publishRunUpdated(stopped)
return
}
if (current.state === "scheduled") {
const stopped = reviseRun(current, {
state: "stopped",
completedAt: Date.now(),
stopReason: "cancelled",
})
await publishRunUpdated(stopped)
return
}
const isStepCap = error instanceof globalThis.Error && error.name === "AutomationStepCapError"
const failed = reviseRun(current, {
state: "failed",
completedAt: Date.now(),
error: {
code: isStepCap ? "step_cap" : "execution_failed",
message: error instanceof globalThis.Error ? error.message : String(error),
},
})
await publishRunUpdated(failed)
} finally {
data.activeRuns.delete(initial.automationID)
data.activeWriters.delete(writerKey)
}
}

export function runs(input: { automationID: string; limit?: number; cursor?: string }) {
get(input.automationID)
const limit = Math.min(Math.max(input.limit ?? 50, 1), 100)
Expand Down
47 changes: 47 additions & 0 deletions packages/opencode/src/automation/run-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Context, Effect } from "effect"
import type { Permission } from "@/permission"
import type { PermissionID } from "@/permission/schema"

export type AutomationRunAttendance = "attended" | "unattended"

export type AutomationRunBlocker =
| { kind: "permission"; requestID: PermissionID }
| { kind: "question"; callID: string }

export interface AutomationRunContext {
readonly attendance: AutomationRunAttendance
readonly stepCap?: number
readonly block: (blocker: AutomationRunBlocker) => Effect.Effect<void>
readonly clear: () => Effect.Effect<void>
}

export class AutomationStepCapError extends Error {
readonly _tag = "AutomationStepCapError"
constructor(readonly stepCap: number) {
super(`Automation run exceeded the hard step cap (${stepCap}).`)
this.name = "AutomationStepCapError"
}
}

export const AutomationRunContextService: Context.Reference<AutomationRunContext | undefined> = Context.Reference<
AutomationRunContext | undefined
>("@opencode/AutomationRunContext", {
defaultValue: () => undefined,
})

export const AutomationRunContext = {
service: AutomationRunContextService,
current: AutomationRunContextService,
attended(input: Pick<AutomationRunContext, "block" | "clear" | "stepCap">): AutomationRunContext {
return { ...input, attendance: "attended" }
},
unattended(input: Pick<AutomationRunContext, "block" | "clear" | "stepCap">): AutomationRunContext {
return { ...input, attendance: "unattended" }
},
Comment thread
Astro-Han marked this conversation as resolved.
permissionOnPending(
context: AutomationRunContext | undefined,
): ((request: Permission.Request) => Effect.Effect<void>) | undefined {
if (!context) return undefined
return (request) => context.block({ kind: "permission", requestID: request.id })
},
}
56 changes: 56 additions & 0 deletions packages/opencode/src/automation/runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Effect } from "effect"
import { Automation } from "."
import { Session } from "@/session"
import { SessionPrompt } from "@/session/prompt"
import { AutomationRunContext, type AutomationRunBlocker } from "./run-context"

export const sessionPromptExecutor: Automation.RunExecutor = async ({ definition, run, attendance, signal }) => {
signal.throwIfAborted()
const sessionID =
definition.context === "continue" && definition.automationSessionID
? definition.automationSessionID
: (await Session.create({ title: `Automation: ${definition.title}` })).id
const cancelPrompt = () => {
void SessionPrompt.cancel(sessionID, { source: "automation.cancel" }).catch(() => undefined)
}
if (signal.aborted) cancelPrompt()
else signal.addEventListener("abort", cancelPrompt, { once: true })
try {
signal.throwIfAborted()
let currentRun = Automation.markRunStarted(run, sessionID)
await Automation.publishRunUpdated(currentRun)
const handlers = {
stepCap: 50,
block: (blocker: AutomationRunBlocker) =>
Effect.gen(function* () {
const previous = currentRun
currentRun = Automation.markRunBlocked(currentRun, blocker)
if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun))
}),
clear: () =>
Effect.gen(function* () {
const previous = currentRun
currentRun = Automation.clearRunBlocker(currentRun)
if (currentRun !== previous) yield* Effect.promise(() => Automation.publishRunUpdated(currentRun))
}),
}
const scoped =
attendance === "attended" ? AutomationRunContext.attended(handlers) : AutomationRunContext.unattended(handlers)
const message = await SessionPrompt.promptWithAutomationContext(
{
sessionID,
parts: [{ type: "text", text: definition.prompt }],
},
scoped,
{ abortSignal: signal },
)
signal.throwIfAborted()
return {
sessionID,
result: message.parts.find((part) => part.type === "text")?.text ?? null,
cost: message.info.role === "assistant" ? message.info.cost : null,
}
} finally {
signal.removeEventListener("abort", cancelPrompt)
}
}
11 changes: 8 additions & 3 deletions packages/opencode/src/permission/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ export namespace Permission {
message: z.string().optional(),
})

export type AskOptions = z.infer<typeof AskInput> & {
onPending?: (request: Request) => Effect.Effect<void>
}

export interface Interface {
readonly ask: (input: z.infer<typeof AskInput>) => Effect.Effect<void, Error>
readonly ask: (input: AskOptions) => Effect.Effect<void, Error>
readonly reply: (input: z.infer<typeof ReplyInput>) => Effect.Effect<void>
readonly clearSession: (
sessionID: SessionID,
Expand Down Expand Up @@ -171,9 +175,9 @@ export namespace Permission {
}),
)

const ask = Effect.fn("Permission.ask")(function* (input: z.infer<typeof AskInput>) {
const ask = Effect.fn("Permission.ask")(function* (input: AskOptions) {
const { approved, pending } = yield* InstanceState.get(state)
const { ruleset, ...request } = input
const { ruleset, onPending, ...request } = input
let needsAsk = false
const denied: Array<{ pattern: string; rule: Rule }> = []

Expand Down Expand Up @@ -222,6 +226,7 @@ export namespace Permission {

const deferred = yield* Deferred.make<void, RejectedError | CorrectedError>()
pending.set(id, { info, deferred })
if (onPending) yield* onPending(info)
yield* bus.publish(Event.Asked, info)
return yield* Effect.ensuring(
Deferred.await(deferred),
Expand Down
Loading
Loading