From 690769856c317d9132e5c2e000bac10e5a9140c6 Mon Sep 17 00:00:00 2001 From: Sweets Sweetman Date: Sun, 24 May 2026 22:30:27 -0500 Subject: [PATCH 1/5] fix(chat): clear active_stream_id when chat workflow finishes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit handleChatWorkflowStream sets chats.active_stream_id when it starts the workflow but never clears it after the run completes. The chat UI's recovery probe (GET /api/sessions/{sessionId}/chats) reports isStreaming based on active_stream_id !== null, so a stale non-null value tells the open-agents stream-recovery hook to call chat.resumeStream() forever — which keeps the AI SDK chat.status stuck in `submitted`/`streaming`. The send button never returns and the user can't submit a follow-up. Adds clearChatActiveStreamWhenWorkflowFinishes(): awaits run.returnValue (swallowing failure / cancel so we clear in every terminal state) and CAS-clears active_stream_id from the run id back to null. Scheduled via `after(...)` from the handler so it runs past the response without blocking the stream. Uses CAS rather than an unconditional UPDATE so that if a newer run claimed the slot in the meantime (user kicked off another message before the old run finished cleaning up), the newer run's id isn't clobbered. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- ...atActiveStreamWhenWorkflowFinishes.test.ts | 89 +++++++++++++++++++ ...earChatActiveStreamWhenWorkflowFinishes.ts | 50 +++++++++++ lib/chat/handleChatWorkflowStream.ts | 10 ++- 3 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 lib/chat/__tests__/clearChatActiveStreamWhenWorkflowFinishes.test.ts create mode 100644 lib/chat/clearChatActiveStreamWhenWorkflowFinishes.ts diff --git a/lib/chat/__tests__/clearChatActiveStreamWhenWorkflowFinishes.test.ts b/lib/chat/__tests__/clearChatActiveStreamWhenWorkflowFinishes.test.ts new file mode 100644 index 000000000..4ec16d526 --- /dev/null +++ b/lib/chat/__tests__/clearChatActiveStreamWhenWorkflowFinishes.test.ts @@ -0,0 +1,89 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { clearChatActiveStreamWhenWorkflowFinishes } from "@/lib/chat/clearChatActiveStreamWhenWorkflowFinishes"; +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; + +vi.mock("@/lib/chat/compareAndSetChatActiveStreamId", () => ({ + compareAndSetChatActiveStreamId: vi.fn(), +})); + +beforeEach(() => vi.clearAllMocks()); + +function buildRunStub({ runId, returnValue }: { runId: string; returnValue: Promise }) { + return { runId, returnValue } as unknown as { + runId: string; + returnValue: Promise; + }; +} + +describe("clearChatActiveStreamWhenWorkflowFinishes", () => { + it("CAS-clears active_stream_id back to null when the workflow resolves", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: true, + claimed: true, + }); + const run = buildRunStub({ + runId: "wrun_ok", + returnValue: Promise.resolve(undefined), + }); + + await clearChatActiveStreamWhenWorkflowFinishes({ chatId: "chat-1", run }); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledWith("chat-1", "wrun_ok", null); + }); + + it("still clears when the workflow rejects (cancelled / failed run)", async () => { + 
vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: true, + claimed: true, + }); + const run = buildRunStub({ + runId: "wrun_fail", + returnValue: Promise.reject(new Error("boom")), + }); + + await clearChatActiveStreamWhenWorkflowFinishes({ chatId: "chat-1", run }); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledWith("chat-1", "wrun_fail", null); + }); + + it("waits for the workflow to finish before issuing the CAS", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: true, + claimed: true, + }); + let resolveRun: (v: unknown) => void = () => undefined; + const returnValue = new Promise(res => { + resolveRun = res; + }); + const run = buildRunStub({ runId: "wrun_slow", returnValue }); + + const promise = clearChatActiveStreamWhenWorkflowFinishes({ + chatId: "chat-1", + run, + }); + + // Give the microtask queue a chance — CAS should NOT have fired yet. + await Promise.resolve(); + await Promise.resolve(); + expect(compareAndSetChatActiveStreamId).not.toHaveBeenCalled(); + + resolveRun(undefined); + await promise; + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(1); + }); + + it("does not throw when the CAS fails (race lost or DB error)", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: true, + claimed: false, + }); + const run = buildRunStub({ + runId: "wrun_lostrace", + returnValue: Promise.resolve(undefined), + }); + + await expect( + clearChatActiveStreamWhenWorkflowFinishes({ chatId: "chat-1", run }), + ).resolves.toBeUndefined(); + }); +}); diff --git a/lib/chat/clearChatActiveStreamWhenWorkflowFinishes.ts b/lib/chat/clearChatActiveStreamWhenWorkflowFinishes.ts new file mode 100644 index 000000000..75cf10fbc --- /dev/null +++ b/lib/chat/clearChatActiveStreamWhenWorkflowFinishes.ts @@ -0,0 +1,50 @@ +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; + +type RunLike = { + runId: string; + returnValue: 
Promise; +}; + +/** + * Wait for a `start(workflow, ...)` run to finish, then CAS-clear + * `chats.active_stream_id` back to null **only if** it still holds + * this run's id. + * + * Why: the chat UI uses `chats.active_stream_id !== null` as a proxy + * for "is this chat still streaming" (via + * `GET /api/sessions/{sessionId}/chats`). Without this cleanup, the + * field stays set after the workflow finishes, the recovery probe + * keeps reporting `isStreaming: true`, and the client's + * auto-resume keeps the AI SDK `chat.status` stuck in `submitted` / + * `streaming` — so the send button never returns and follow-up + * messages are blocked. + * + * Designed to be scheduled with `after(...)` from the handler so it + * runs past the response return without blocking the stream. The + * CAS guard (`expected = run.runId`) makes it safe to fire even if + * the user already kicked off another run for the same chat in the + * meantime — that newer run owns the slot and shouldn't be wiped. + * + * Workflow failures, cancellations, and successful completions all + * trigger the same clear. The `returnValue` promise rejects on + * cancel / failure, so we swallow the rejection and proceed to the + * CAS — same behavior as success. + */ +export async function clearChatActiveStreamWhenWorkflowFinishes(params: { + chatId: string; + run: RunLike; +}): Promise { + const { chatId, run } = params; + + await run.returnValue.catch(() => undefined); + + const cleared = await compareAndSetChatActiveStreamId(chatId, run.runId, null); + if (cleared.ok === false) { + console.error( + `[clearChatActiveStreamWhenWorkflowFinishes] CAS error chatId=${chatId} runId=${run.runId}: ${cleared.error}`, + ); + return; + } + // cleared.claimed === false means the race was lost — another run owns + // the slot now. Nothing to do. 
+} diff --git a/lib/chat/handleChatWorkflowStream.ts b/lib/chat/handleChatWorkflowStream.ts index 20f048a5c..b8b538246 100644 --- a/lib/chat/handleChatWorkflowStream.ts +++ b/lib/chat/handleChatWorkflowStream.ts @@ -1,6 +1,7 @@ -import { NextRequest, NextResponse } from "next/server"; +import { NextRequest, NextResponse, after } from "next/server"; import { createUIMessageStreamResponse, type UIMessageChunk } from "ai"; import { start, getRun } from "workflow/api"; +import { clearChatActiveStreamWhenWorkflowFinishes } from "@/lib/chat/clearChatActiveStreamWhenWorkflowFinishes"; import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; @@ -155,6 +156,13 @@ export async function handleChatWorkflowStream(request: NextRequest): Promise clearChatActiveStreamWhenWorkflowFinishes({ chatId: validated.chatId, run })); + return createUIMessageStreamResponse({ stream: run.getReadable(), headers: { ...getCorsHeaders(), "x-workflow-run-id": run.runId }, From 45de2765306aa24cb8f5a1e5d1e2faba444de367 Mon Sep 17 00:00:00 2001 From: Sweets Sweetman Date: Mon, 25 May 2026 10:21:15 -0500 Subject: [PATCH 2/5] test(chat): mock next/server.after in handleChatWorkflowStream tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `after()` only resolves in a real Next.js request scope. The handler test suite drives the route function directly with a mock NextRequest — no async-storage context is set up — so the call throws "after() was called outside a request scope" on every test that reaches the post-stream cleanup scheduling. Stub `after` via vi.importActual partial mock so other next/server exports (NextRequest, NextResponse) keep their real behavior. Also mock the scheduled function itself, which has its own dedicated unit test (clearChatActiveStreamWhenWorkflowFinishes.test.ts). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../__tests__/handleChatWorkflowStream.test.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/lib/chat/__tests__/handleChatWorkflowStream.test.ts b/lib/chat/__tests__/handleChatWorkflowStream.test.ts index 1af062c8b..1a92386e3 100644 --- a/lib/chat/__tests__/handleChatWorkflowStream.test.ts +++ b/lib/chat/__tests__/handleChatWorkflowStream.test.ts @@ -39,6 +39,22 @@ vi.mock("@/lib/networking/getCorsHeaders", () => ({ })); vi.mock("@/lib/uuid/generateUUID", () => ({ default: vi.fn(() => "deterministic-uuid") })); +// `after()` only runs inside a real Next.js request scope, but these tests +// drive the handler directly with a mock NextRequest — no async-storage +// context is set up. Stub it as a noop so the handler's post-stream +// scheduling doesn't throw the "outside a request scope" error. The +// scheduled cleanup itself is covered by +// clearChatActiveStreamWhenWorkflowFinishes.test.ts. +vi.mock("next/server", async () => { + const actual = + await vi.importActual("next/server"); + return { ...actual, after: vi.fn() }; +}); + +vi.mock("@/lib/chat/clearChatActiveStreamWhenWorkflowFinishes", () => ({ + clearChatActiveStreamWhenWorkflowFinishes: vi.fn(), +})); + // Stub sandbox connection + skill discovery so handler tests don't actually // try to talk to Vercel Sandbox / parse SKILL.md files. The handler treats // discovery failures as non-fatal (empty catalog), but we mock to keep tests fast. From f22ead8902ebe2b3034f798de60fd522ce594ed3 Mon Sep 17 00:00:00 2001 From: Sweets Sweetman Date: Mon, 25 May 2026 11:03:47 -0500 Subject: [PATCH 3/5] refactor(chat): clear active_stream_id from inside the workflow body MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the previous after()-callback approach with an in-workflow "use step" call so the cleanup fires immediately when the durable run finishes — no after()/run.returnValue polling lag. 
Observed lag with the after() approach: ~20–30s from stream end to DB clear, because Run.returnValue polls the workflow runtime for completion AND the after() callback contends with the function instance's remaining lifetime. UX impact: send button stuck for that window before reverting. In-workflow approach mirrors open-agents' chat workflow (`clearActiveStream` step). `runAgentWorkflow` reads its own workflowRunId via `getWorkflowMetadata()` and calls `clearChatActiveStream(chatId, runId)` in a `try/finally` so the CAS-clear fires on success, failure, AND cancellation. Other changes: - Helper renamed clearChatActiveStreamWhenWorkflowFinishes → clearChatActiveStream; signature collapses to (chatId, runId). - Added 3-attempt retry with 500ms backoff to match open-agents' resilience for transient Supabase failures. - Removed the next/server.after() import from handleChatWorkflowStream and the `after` / cleanup-helper mocks from its test file; cleanup is no longer the handler's job. - New runAgentWorkflow.test.ts asserts the step fires on both success and runAgentStep throw paths. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../__tests__/runAgentWorkflow.test.ts | 52 +++++++++++ app/lib/workflows/runAgentWorkflow.ts | 30 +++++-- .../__tests__/clearChatActiveStream.test.ts | 74 +++++++++++++++ ...atActiveStreamWhenWorkflowFinishes.test.ts | 89 ------------------- .../handleChatWorkflowStream.test.ts | 16 ---- lib/chat/clearChatActiveStream.ts | 65 ++++++++++++++ ...earChatActiveStreamWhenWorkflowFinishes.ts | 50 ----------- lib/chat/handleChatWorkflowStream.ts | 10 +-- 8 files changed, 214 insertions(+), 172 deletions(-) create mode 100644 app/lib/workflows/__tests__/runAgentWorkflow.test.ts create mode 100644 lib/chat/__tests__/clearChatActiveStream.test.ts delete mode 100644 lib/chat/__tests__/clearChatActiveStreamWhenWorkflowFinishes.test.ts create mode 100644 lib/chat/clearChatActiveStream.ts delete mode 100644 lib/chat/clearChatActiveStreamWhenWorkflowFinishes.ts diff --git a/app/lib/workflows/__tests__/runAgentWorkflow.test.ts b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts new file mode 100644 index 000000000..92f72594d --- /dev/null +++ b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts @@ -0,0 +1,52 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { runAgentWorkflow } from "@/app/lib/workflows/runAgentWorkflow"; +import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; +import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; + +vi.mock("@/app/lib/workflows/runAgentStep", () => ({ + runAgentStep: vi.fn(), +})); +vi.mock("@/lib/chat/clearChatActiveStream", () => ({ + clearChatActiveStream: vi.fn(), +})); +vi.mock("workflow", () => ({ + getWritable: vi.fn(() => new WritableStream()), + getWorkflowMetadata: vi.fn(() => ({ + workflowRunId: "wrun_from_metadata", + workflowName: "runAgentWorkflow", + workflowStartedAt: new Date(), + url: "https://example.invalid/workflow", + })), +})); + +beforeEach(() => vi.clearAllMocks()); + +const baseInput = { + messages: [{ id: "m1", 
role: "user", parts: [{ type: "text", text: "hi" }] } as never], + chatId: "chat-1", + sessionId: "session-1", + modelId: "anthropic/claude-haiku-4.5", + agentContext: { + sandbox: { state: { type: "vercel" }, workingDirectory: "/sandbox/mono" }, + } as never, +}; + +describe("runAgentWorkflow", () => { + it("clears active_stream_id after a successful run, using the workflow's own runId", async () => { + vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" }); + + await runAgentWorkflow(baseInput); + + expect(clearChatActiveStream).toHaveBeenCalledTimes(1); + expect(clearChatActiveStream).toHaveBeenCalledWith("chat-1", "wrun_from_metadata"); + }); + + it("clears active_stream_id even when runAgentStep throws (try/finally guarantee)", async () => { + vi.mocked(runAgentStep).mockRejectedValue(new Error("model exploded")); + + await expect(runAgentWorkflow(baseInput)).rejects.toThrow("model exploded"); + + expect(clearChatActiveStream).toHaveBeenCalledTimes(1); + expect(clearChatActiveStream).toHaveBeenCalledWith("chat-1", "wrun_from_metadata"); + }); +}); diff --git a/app/lib/workflows/runAgentWorkflow.ts b/app/lib/workflows/runAgentWorkflow.ts index 3a0965342..07dce5483 100644 --- a/app/lib/workflows/runAgentWorkflow.ts +++ b/app/lib/workflows/runAgentWorkflow.ts @@ -1,6 +1,7 @@ -import { getWritable } from "workflow"; +import { getWorkflowMetadata, getWritable } from "workflow"; import type { UIMessage, UIMessageChunk } from "ai"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; +import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; import type { DurableAgentContext } from "@/lib/agent/tools/AgentContext"; export type RunAgentWorkflowInput = { @@ -35,19 +36,32 @@ export type RunAgentWorkflowInput = { export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise { "use workflow"; + const { workflowRunId } = getWorkflowMetadata(); + console.log("[runAgentWorkflow] start", { chatId: input.chatId, 
sessionId: input.sessionId, modelId: input.modelId, + workflowRunId, }); const writable = getWritable(); - const result = await runAgentStep({ - messages: input.messages, - modelId: input.modelId, - writable, - agentContext: input.agentContext, - }); - console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason }); + try { + const result = await runAgentStep({ + messages: input.messages, + modelId: input.modelId, + writable, + agentContext: input.agentContext, + }); + console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason }); + } finally { + // Clear `chats.active_stream_id` (CAS-gated on this run's id) so the + // client's "is this chat still streaming?" probe flips back to false + // and the AI SDK can release `chat.status` from `submitted`. Runs + // inside the workflow body (vs. an after() callback in the request + // handler) so it fires immediately on the same workflow tick — no + // polling lag. Mirrors open-agents' chat workflow. + await clearChatActiveStream(input.chatId, workflowRunId); + } } diff --git a/lib/chat/__tests__/clearChatActiveStream.test.ts b/lib/chat/__tests__/clearChatActiveStream.test.ts new file mode 100644 index 000000000..03ad3f4e6 --- /dev/null +++ b/lib/chat/__tests__/clearChatActiveStream.test.ts @@ -0,0 +1,74 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; + +vi.mock("@/lib/chat/compareAndSetChatActiveStreamId", () => ({ + compareAndSetChatActiveStreamId: vi.fn(), +})); + +beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers({ shouldAdvanceTime: true }); +}); + +const CHAT_ID = "chat-1"; +const RUN_ID = "wrun_test"; + +describe("clearChatActiveStream", () => { + it("CAS-clears active_stream_id back to null on the happy path", async () => { + 
vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: true, + claimed: true, + }); + + await clearChatActiveStream(CHAT_ID, RUN_ID); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(1); + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledWith(CHAT_ID, RUN_ID, null); + }); + + it("returns without throwing when the race is lost (a newer run owns the slot)", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: true, + claimed: false, + }); + + await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined(); + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(1); + }); + + it("retries up to 3 times on transient DB errors and stops once a CAS succeeds", async () => { + vi.mocked(compareAndSetChatActiveStreamId) + .mockResolvedValueOnce({ ok: false, error: "transient 1" }) + .mockResolvedValueOnce({ ok: true, claimed: true }); + + await clearChatActiveStream(CHAT_ID, RUN_ID); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(2); + }); + + it("gives up after 3 failed CAS attempts and logs (does not throw)", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: false, + error: "persistent", + }); + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined(); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(3); + expect(consoleSpy).toHaveBeenCalled(); + consoleSpy.mockRestore(); + }); + + it("retries on thrown exceptions and gives up after 3 attempts", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockRejectedValue(new Error("boom")); + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined(); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(3); + expect(consoleSpy).toHaveBeenCalled(); + 
consoleSpy.mockRestore(); + }); +}); diff --git a/lib/chat/__tests__/clearChatActiveStreamWhenWorkflowFinishes.test.ts b/lib/chat/__tests__/clearChatActiveStreamWhenWorkflowFinishes.test.ts deleted file mode 100644 index 4ec16d526..000000000 --- a/lib/chat/__tests__/clearChatActiveStreamWhenWorkflowFinishes.test.ts +++ /dev/null @@ -1,89 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { clearChatActiveStreamWhenWorkflowFinishes } from "@/lib/chat/clearChatActiveStreamWhenWorkflowFinishes"; -import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; - -vi.mock("@/lib/chat/compareAndSetChatActiveStreamId", () => ({ - compareAndSetChatActiveStreamId: vi.fn(), -})); - -beforeEach(() => vi.clearAllMocks()); - -function buildRunStub({ runId, returnValue }: { runId: string; returnValue: Promise }) { - return { runId, returnValue } as unknown as { - runId: string; - returnValue: Promise; - }; -} - -describe("clearChatActiveStreamWhenWorkflowFinishes", () => { - it("CAS-clears active_stream_id back to null when the workflow resolves", async () => { - vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ - ok: true, - claimed: true, - }); - const run = buildRunStub({ - runId: "wrun_ok", - returnValue: Promise.resolve(undefined), - }); - - await clearChatActiveStreamWhenWorkflowFinishes({ chatId: "chat-1", run }); - - expect(compareAndSetChatActiveStreamId).toHaveBeenCalledWith("chat-1", "wrun_ok", null); - }); - - it("still clears when the workflow rejects (cancelled / failed run)", async () => { - vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ - ok: true, - claimed: true, - }); - const run = buildRunStub({ - runId: "wrun_fail", - returnValue: Promise.reject(new Error("boom")), - }); - - await clearChatActiveStreamWhenWorkflowFinishes({ chatId: "chat-1", run }); - - expect(compareAndSetChatActiveStreamId).toHaveBeenCalledWith("chat-1", "wrun_fail", null); - }); - - it("waits for the 
workflow to finish before issuing the CAS", async () => { - vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ - ok: true, - claimed: true, - }); - let resolveRun: (v: unknown) => void = () => undefined; - const returnValue = new Promise(res => { - resolveRun = res; - }); - const run = buildRunStub({ runId: "wrun_slow", returnValue }); - - const promise = clearChatActiveStreamWhenWorkflowFinishes({ - chatId: "chat-1", - run, - }); - - // Give the microtask queue a chance — CAS should NOT have fired yet. - await Promise.resolve(); - await Promise.resolve(); - expect(compareAndSetChatActiveStreamId).not.toHaveBeenCalled(); - - resolveRun(undefined); - await promise; - expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(1); - }); - - it("does not throw when the CAS fails (race lost or DB error)", async () => { - vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ - ok: true, - claimed: false, - }); - const run = buildRunStub({ - runId: "wrun_lostrace", - returnValue: Promise.resolve(undefined), - }); - - await expect( - clearChatActiveStreamWhenWorkflowFinishes({ chatId: "chat-1", run }), - ).resolves.toBeUndefined(); - }); -}); diff --git a/lib/chat/__tests__/handleChatWorkflowStream.test.ts b/lib/chat/__tests__/handleChatWorkflowStream.test.ts index 1a92386e3..1af062c8b 100644 --- a/lib/chat/__tests__/handleChatWorkflowStream.test.ts +++ b/lib/chat/__tests__/handleChatWorkflowStream.test.ts @@ -39,22 +39,6 @@ vi.mock("@/lib/networking/getCorsHeaders", () => ({ })); vi.mock("@/lib/uuid/generateUUID", () => ({ default: vi.fn(() => "deterministic-uuid") })); -// `after()` only runs inside a real Next.js request scope, but these tests -// drive the handler directly with a mock NextRequest — no async-storage -// context is set up. Stub it as a noop so the handler's post-stream -// scheduling doesn't throw the "outside a request scope" error. 
The -// scheduled cleanup itself is covered by -// clearChatActiveStreamWhenWorkflowFinishes.test.ts. -vi.mock("next/server", async () => { - const actual = - await vi.importActual("next/server"); - return { ...actual, after: vi.fn() }; -}); - -vi.mock("@/lib/chat/clearChatActiveStreamWhenWorkflowFinishes", () => ({ - clearChatActiveStreamWhenWorkflowFinishes: vi.fn(), -})); - // Stub sandbox connection + skill discovery so handler tests don't actually // try to talk to Vercel Sandbox / parse SKILL.md files. The handler treats // discovery failures as non-fatal (empty catalog), but we mock to keep tests fast. diff --git a/lib/chat/clearChatActiveStream.ts b/lib/chat/clearChatActiveStream.ts new file mode 100644 index 000000000..e7286d715 --- /dev/null +++ b/lib/chat/clearChatActiveStream.ts @@ -0,0 +1,65 @@ +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; + +const MAX_ATTEMPTS = 3; +const RETRY_DELAY_MS = 500; + +function delay(ms: number): Promise { + return new Promise(resolve => { + setTimeout(resolve, ms); + }); +} + +/** + * Vercel Workflow `"use step"` that CAS-clears `chats.active_stream_id` + * back to null **only if** it still holds this workflow run's id. + * + * Designed to be called from the end of `runAgentWorkflow`'s body so it + * fires the moment the durable run finishes — no `after()` / polling + * lag. Mirrors open-agents' `clearActiveStream` step in + * `app/workflows/chat-post-finish.ts`. + * + * Why CAS instead of unconditional UPDATE: if a newer run has already + * claimed the slot (e.g. the user submitted a follow-up while this + * run was draining cleanup), the newer run's id is preserved. + * + * Retries up to 3 times with a short delay so a transient Supabase + * failure here doesn't leave the chat permanently stuck as + * "isStreaming: true". 
Final-attempt failures are logged but never + * thrown — the workflow has already done its real work; we don't want + * a cleanup hiccup to mark the run as failed. + * + * @param chatId - Target chat row. + * @param workflowRunId - The current run's id (from + * `getWorkflowMetadata().workflowRunId`). + */ +export async function clearChatActiveStream(chatId: string, workflowRunId: string): Promise { + "use step"; + + for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) { + try { + const result = await compareAndSetChatActiveStreamId(chatId, workflowRunId, null); + if (result.ok === false) { + if (attempt === MAX_ATTEMPTS) { + console.error( + `[clearChatActiveStream] CAS error chatId=${chatId} runId=${workflowRunId}: ${result.error}`, + ); + return; + } + await delay(RETRY_DELAY_MS); + continue; + } + // result.ok === true. result.claimed === false means the race was lost + // (a newer run owns the slot) — nothing to do, just return. + return; + } catch (error) { + if (attempt === MAX_ATTEMPTS) { + console.error( + `[clearChatActiveStream] unhandled error chatId=${chatId} runId=${workflowRunId}:`, + error, + ); + return; + } + await delay(RETRY_DELAY_MS); + } + } +} diff --git a/lib/chat/clearChatActiveStreamWhenWorkflowFinishes.ts b/lib/chat/clearChatActiveStreamWhenWorkflowFinishes.ts deleted file mode 100644 index 75cf10fbc..000000000 --- a/lib/chat/clearChatActiveStreamWhenWorkflowFinishes.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; - -type RunLike = { - runId: string; - returnValue: Promise; -}; - -/** - * Wait for a `start(workflow, ...)` run to finish, then CAS-clear - * `chats.active_stream_id` back to null **only if** it still holds - * this run's id. - * - * Why: the chat UI uses `chats.active_stream_id !== null` as a proxy - * for "is this chat still streaming" (via - * `GET /api/sessions/{sessionId}/chats`). 
Without this cleanup, the - * field stays set after the workflow finishes, the recovery probe - * keeps reporting `isStreaming: true`, and the client's - * auto-resume keeps the AI SDK `chat.status` stuck in `submitted` / - * `streaming` — so the send button never returns and follow-up - * messages are blocked. - * - * Designed to be scheduled with `after(...)` from the handler so it - * runs past the response return without blocking the stream. The - * CAS guard (`expected = run.runId`) makes it safe to fire even if - * the user already kicked off another run for the same chat in the - * meantime — that newer run owns the slot and shouldn't be wiped. - * - * Workflow failures, cancellations, and successful completions all - * trigger the same clear. The `returnValue` promise rejects on - * cancel / failure, so we swallow the rejection and proceed to the - * CAS — same behavior as success. - */ -export async function clearChatActiveStreamWhenWorkflowFinishes(params: { - chatId: string; - run: RunLike; -}): Promise { - const { chatId, run } = params; - - await run.returnValue.catch(() => undefined); - - const cleared = await compareAndSetChatActiveStreamId(chatId, run.runId, null); - if (cleared.ok === false) { - console.error( - `[clearChatActiveStreamWhenWorkflowFinishes] CAS error chatId=${chatId} runId=${run.runId}: ${cleared.error}`, - ); - return; - } - // cleared.claimed === false means the race was lost — another run owns - // the slot now. Nothing to do. 
-} diff --git a/lib/chat/handleChatWorkflowStream.ts b/lib/chat/handleChatWorkflowStream.ts index b8b538246..20f048a5c 100644 --- a/lib/chat/handleChatWorkflowStream.ts +++ b/lib/chat/handleChatWorkflowStream.ts @@ -1,7 +1,6 @@ -import { NextRequest, NextResponse, after } from "next/server"; +import { NextRequest, NextResponse } from "next/server"; import { createUIMessageStreamResponse, type UIMessageChunk } from "ai"; import { start, getRun } from "workflow/api"; -import { clearChatActiveStreamWhenWorkflowFinishes } from "@/lib/chat/clearChatActiveStreamWhenWorkflowFinishes"; import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; @@ -156,13 +155,6 @@ export async function handleChatWorkflowStream(request: NextRequest): Promise clearChatActiveStreamWhenWorkflowFinishes({ chatId: validated.chatId, run })); - return createUIMessageStreamResponse({ stream: run.getReadable(), headers: { ...getCorsHeaders(), "x-workflow-run-id": run.runId }, From e1be18ab4425ac89416583392f14504aef460f33 Mon Sep 17 00:00:00 2001 From: Sweets Sweetman Date: Mon, 25 May 2026 11:17:18 -0500 Subject: [PATCH 4/5] chore(chat): match open-agents' 50ms retry delay for clearChatActiveStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-review nit: picked 500ms originally without checking the source. open-agents uses ACTIVE_STREAM_CLEAR_RETRY_DELAY_MS = 50, so a transient Supabase blip costs at most ~150ms across 3 attempts instead of ~1.5s — keeps the cleanup tail well under the human- perceptible threshold. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/chat/clearChatActiveStream.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/chat/clearChatActiveStream.ts b/lib/chat/clearChatActiveStream.ts index e7286d715..6d1acdf29 100644 --- a/lib/chat/clearChatActiveStream.ts +++ b/lib/chat/clearChatActiveStream.ts @@ -1,7 +1,11 @@ import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; const MAX_ATTEMPTS = 3; -const RETRY_DELAY_MS = 500; +// 50ms picked to match open-agents' `ACTIVE_STREAM_CLEAR_RETRY_DELAY_MS` +// — a transient Supabase blip costs at most ~150ms total before the +// third attempt, keeping the cleanup tail well under the human- +// perceptible ~250ms threshold. +const RETRY_DELAY_MS = 50; function delay(ms: number): Promise { return new Promise(resolve => { From cd50c3e509d9b06084dcac11bd034553472fe7ed Mon Sep 17 00:00:00 2001 From: Sweets Sweetman Date: Mon, 25 May 2026 11:22:28 -0500 Subject: [PATCH 5/5] refactor(time): extract reusable delay() helper for clearChatActiveStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-review feedback: the inline `delay` helper was duplicating logic that should live in a shared module. Audit: - No pre-existing `delay` in api (only ad-hoc inline `new Promise(r => setTimeout(r, ms))` in slack helpers and one chat-step retry — left untouched for scope). - `workflow.sleep()` exists but creates durable timer events in the workflow event log — it's the right tool for `"use workflow"` bodies, not for retry-loops inside `"use step"` functions that run as regular non-replayable code. - → Extract to `lib/time/delay.ts` alongside the existing time utilities (toDateValue, getLocalTime). Future DRY follow-ups can fold the slack inline-setTimeouts into this helper too. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/chat/clearChatActiveStream.ts | 7 +----- lib/time/__tests__/delay.test.ts | 40 +++++++++++++++++++++++++++++++ lib/time/delay.ts | 18 ++++++++++++++ 3 files changed, 59 insertions(+), 6 deletions(-) create mode 100644 lib/time/__tests__/delay.test.ts create mode 100644 lib/time/delay.ts diff --git a/lib/chat/clearChatActiveStream.ts b/lib/chat/clearChatActiveStream.ts index 6d1acdf29..d45fd1fe5 100644 --- a/lib/chat/clearChatActiveStream.ts +++ b/lib/chat/clearChatActiveStream.ts @@ -1,4 +1,5 @@ import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; +import { delay } from "@/lib/time/delay"; const MAX_ATTEMPTS = 3; // 50ms picked to match open-agents' `ACTIVE_STREAM_CLEAR_RETRY_DELAY_MS` @@ -7,12 +8,6 @@ const MAX_ATTEMPTS = 3; // perceptible ~250ms threshold. const RETRY_DELAY_MS = 50; -function delay(ms: number): Promise { - return new Promise(resolve => { - setTimeout(resolve, ms); - }); -} - /** * Vercel Workflow `"use step"` that CAS-clears `chats.active_stream_id` * back to null **only if** it still holds this workflow run's id. diff --git a/lib/time/__tests__/delay.test.ts b/lib/time/__tests__/delay.test.ts new file mode 100644 index 000000000..a4682b73e --- /dev/null +++ b/lib/time/__tests__/delay.test.ts @@ -0,0 +1,40 @@ +import { describe, it, expect, vi } from "vitest"; +import { delay } from "@/lib/time/delay"; + +describe("delay", () => { + it("resolves after the specified delay (within tolerance)", async () => { + const start = Date.now(); + await delay(50); + const elapsed = Date.now() - start; + // setTimeout fires "no earlier than" the requested duration; allow generous + // headroom for CI jitter rather than asserting an exact value. 
+ expect(elapsed).toBeGreaterThanOrEqual(45); + expect(elapsed).toBeLessThan(500); + }); + + it("resolves on the next tick when given 0", async () => { + let resolved = false; + const promise = delay(0).then(() => { + resolved = true; + }); + // Synchronously, the timer hasn't fired yet. + expect(resolved).toBe(false); + await promise; + expect(resolved).toBe(true); + }); + + it("uses fake timers correctly (callers can drive it deterministically)", async () => { + vi.useFakeTimers(); + let resolved = false; + const promise = delay(100).then(() => { + resolved = true; + }); + expect(resolved).toBe(false); + await vi.advanceTimersByTimeAsync(99); + expect(resolved).toBe(false); + await vi.advanceTimersByTimeAsync(1); + await promise; + expect(resolved).toBe(true); + vi.useRealTimers(); + }); +}); diff --git a/lib/time/delay.ts b/lib/time/delay.ts new file mode 100644 index 000000000..bd26c20f5 --- /dev/null +++ b/lib/time/delay.ts @@ -0,0 +1,18 @@ +/** + * Promise wrapper around `setTimeout` for use inside retry loops and + * other non-workflow-body waits. + * + * NOT a substitute for `workflow.sleep()` — that one creates durable + * timer events in the workflow event log and is the correct tool + * inside `"use workflow"` bodies. `delay()` is for ordinary async + * code (including `"use step"` functions, which run as regular + * non-replayable code). + * + * @param ms - Duration in milliseconds. Negative or 0 resolves on the + * next timer tick, i.e. a macrotask that runs after all pending + * microtasks (same behavior as `setTimeout(_, 0)`). + */ +export function delay(ms: number): Promise { + return new Promise(resolve => { + setTimeout(resolve, ms); + }); +}