
[Gastown] Agent Streaming Endpoint — WebSocket-based Live Agent Observation #348

Merged
jrf0110 merged 8 commits into 204-gt-prop-d from 343-agent-streaming on Feb 19, 2026

Conversation

Contributor

@jrf0110 jrf0110 commented Feb 19, 2026

Summary

Closes #343

Implements the missing streaming endpoint that lets the dashboard watch agents work in real time. Uses WebSockets (preferred over SSE on Cloudflare Workers/DOs) with event persistence in Rig DO SQLite for late-joining clients.

Architecture

Browser (WebSocket) → Worker route → Container DO (WS proxy) → Bun control server (WS)
                                                                     ↑
                                                          process-manager event fan-out
                                                                     ↑
                                                          kilo serve SSE consumer

Changes

Container (runs inside Cloudflare Container)

  • control-server.ts — GET /agents/:agentId/stream?ticket=<uuid> WebSocket upgrade endpoint. Validates one-time-use stream tickets, subscribes to the agent's event fan-out, sends events as JSON frames, and runs a 30s keepalive interval.
  • process-manager.ts — subscribeToAgent() / emitAgentEvent() fan-out mechanism. External listeners (WebSocket clients) receive the same filtered events the internal SSE consumer sees. Synthetic agent.exited events on completion, failure, or stop. Cleanup on agent stop.

Cloudflare Worker

  • gastown.worker.ts — New GET /api/towns/:townId/container/agents/:agentId/stream route + agent event CRUD routes.
  • town-container.handler.ts — handleContainerAgentStream proxies the WebSocket upgrade through the Container DO's fetch() method. The stream ticket response now returns ws:// / wss:// URLs.
  • rig-agent-events.handler.ts (new) — POST /api/rigs/:rigId/agent-events and GET /api/rigs/:rigId/agents/:agentId/events for event persistence.

Rig DO (event persistence)

  • agent-events.table.ts (new) — agent_events SQLite table with autoincrement id for ordered replay, indexed on (agent_id, id).
  • Rig.do.ts — appendAgentEvent() with a 2000-event-per-agent cap and auto-pruning. getAgentEvents() with an after_id cursor for catch-up queries.
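
The append/prune/cursor pattern can be sketched in memory; the real code stores rows in the Rig DO's SQLite table, so the bodies below (and the shared array standing in for the table) are illustrative only, while the function names mirror the PR description:

```typescript
// In-memory stand-in for the agent_events SQLite table.
interface AgentEvent { id: number; agentId: string; type: string; data: unknown; }

const MAX_EVENTS_PER_AGENT = 2000;
let nextId = 1; // stands in for SQLite's autoincrement id
const events: AgentEvent[] = [];

function appendAgentEvent(agentId: string, type: string, data: unknown): number {
  const id = nextId++;
  events.push({ id, agentId, type, data });
  // Auto-prune: keep only the newest MAX_EVENTS_PER_AGENT rows per agent.
  const mine = events.filter((e) => e.agentId === agentId);
  if (mine.length > MAX_EVENTS_PER_AGENT) {
    const cutoff = mine[mine.length - MAX_EVENTS_PER_AGENT].id;
    for (let i = events.length - 1; i >= 0; i--) {
      if (events[i].agentId === agentId && events[i].id < cutoff) events.splice(i, 1);
    }
  }
  return id;
}

// Catch-up query: everything for this agent strictly after the cursor,
// in insertion order (the autoincrement id gives ordered replay).
function getAgentEvents(agentId: string, afterId = 0): AgentEvent[] {
  return events.filter((e) => e.agentId === agentId && e.id > afterId);
}
```

A late-joining client passes the last id it has seen as after_id and receives only the events it missed.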

Frontend

  • AgentStream.tsx — Rewritten from EventSource to WebSocket. Reconnect with exponential backoff (refetches one-time-use ticket per attempt). Structured event display with type badges and data formatting. Clean termination on agent.exited events.
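
The reconnect backoff amounts to a capped doubling of the delay per attempt. A minimal sketch, with base and cap constants that are assumptions (the PR does not state them); the real AgentStream.tsx also refetches a fresh one-time ticket before each attempt:

```typescript
// Illustrative exponential backoff: 500ms, 1s, 2s, 4s, ... capped at 30s.
// Constants are assumed, not taken from the PR diff.
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}
```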

Acceptance Criteria

  • GET /agents/:agentId/stream?ticket=<uuid> endpoint on control server
  • Ticket validation and one-time consumption
  • WebSocket stream of agent events (tool calls, messages, completions, errors)
  • Periodic keepalive to prevent timeout
  • Clean disconnect handling (client closes, agent exits)
  • AgentStream.tsx component works end-to-end (connect → see live events)
  • Multiple concurrent viewers for the same agent work correctly (fan-out)
  • Event persistence in DO SQLite for late-joining clients

…vation (#343)

Add WebSocket streaming infrastructure so the dashboard can watch agents
work in real time. Uses Bun.serve WebSocket upgrade in the container,
proxied through the Container DO to the Cloudflare Worker.

Container (control-server.ts):
- GET /agents/:agentId/stream?ticket=<uuid> — WS upgrade endpoint
- Ticket validation and one-time consumption
- Bun WebSocket handlers with event fan-out from process-manager
- 30s keepalive interval

Process manager (process-manager.ts):
- subscribeToAgent/emitAgentEvent fan-out for external WS listeners
- Synthetic agent.exited events on completion, failure, or stop
- Listener cleanup on agent stop

Worker (gastown.worker.ts + town-container.handler.ts):
- GET /api/towns/:townId/container/agents/:agentId/stream route
- handleContainerAgentStream proxies WS upgrade through Container DO
- Stream ticket now returns ws:// / wss:// URL

Event persistence (Rig.do.ts + agent-events.table.ts):
- agent_events SQLite table with autoincrement id for ordered replay
- appendAgentEvent with 2000-event-per-agent cap and auto-pruning
- getAgentEvents with after_id cursor for catch-up queries
- HTTP routes: POST /api/rigs/:rigId/agent-events, GET .../events

Frontend (AgentStream.tsx):
- Rewritten from EventSource to WebSocket
- Reconnect with exponential backoff (refetches ticket per attempt)
- Structured event display with type badges and data formatting
- agent.exited detection for clean stream termination

kilo-code-bot bot commented Feb 19, 2026

Code Review Summary

Status: 11 Issues Found | Recommendation: Address before merge

Fix these issues in Kilo Cloud

Overview

Severity Count
CRITICAL 1
WARNING 8
SUGGESTION 2
Issue Details

CRITICAL

  • cloudflare-gastown/src/dos/TownContainer.do.ts — Stream ticket is never actually validated: handleStreamWebSocket receives the ticket param but never calls consumeStreamTicket() (which is exported from control-server.ts but never imported). Any string passes the null check, making the ticket system security theater.

WARNING

  • cloudflare-gastown/src/handlers/rig-agent-events.handler.ts (line 25) — Missing agent_id authorization check: getEnforcedAgentId(c) returns null when no agent JWT is present, so the if (enforced && ...) guard is skipped, allowing any CF Access user to append events for arbitrary agents.
  • cloudflare-gastown/src/gastown.worker.ts (line 265) — WebSocket upgrade bypasses Cloudflare Access middleware: the fetch() export intercepts WS upgrades before Hono (and its CF Access middleware) runs, so unauthenticated clients can open WebSocket connections.
  • cloudflare-gastown/container/src/process-manager.ts (line 260) — agentEventBuffers entry is never cleaned up when an agent exits normally, because scheduleEventBufferCleanup is not called on all exit paths.
  • cloudflare-gastown/container/src/control-server.ts — Keepalive publish has no subscribers: keepalive messages are sent, but no WebSocket endpoint exists in the container to receive them.
  • src/components/gastown/AgentStream.tsx — Reconnect race condition: potential duplicate events if the effect re-runs while a previous WebSocket is still closing.
  • src/components/gastown/MayorChat.tsx (line 78) — showStream never resets to true when a new agent session starts with a different agentId (the code on line 93 does attempt this, but the latch pattern may not cover all edge cases).
  • cloudflare-gastown/src/dos/TownContainer.do.ts — Backfill/poll race: duplicate events can be sent to the client because backfillEvents and pollEvents run concurrently without coordination.
  • cloudflare-gastown/src/gastown.worker.ts — GET /api/rigs/:rigId/agents/:agentId/events is placed before agentOnlyMiddleware, so any agent JWT for the rig can read any other agent's events.

SUGGESTION

  • cloudflare-gastown/src/db/tables/agent-events.table.ts — as Record<string, unknown> violates the project's coding-style rule against TypeScript's as operator.
  • cloudflare-gastown/src/handlers/rig-agent-events.handler.ts — Number(limit) can produce NaN for non-numeric query strings (though z.coerce.number() now handles this).
Files Reviewed (11 files)
  • cloudflare-gastown/container/src/control-server.ts — 1 issue
  • cloudflare-gastown/container/src/process-manager.ts — 1 issue
  • cloudflare-gastown/src/db/tables/agent-events.table.ts — 1 issue
  • cloudflare-gastown/src/dos/Rig.do.ts — 0 issues
  • cloudflare-gastown/src/dos/TownContainer.do.ts — 2 issues
  • cloudflare-gastown/src/gastown.worker.ts — 2 issues
  • cloudflare-gastown/src/handlers/rig-agent-events.handler.ts — 2 issues
  • cloudflare-gastown/src/handlers/town-container.handler.ts — 0 issues
  • src/components/gastown/AgentStream.tsx — 1 issue
  • src/components/gastown/MayorChat.tsx — 1 issue
  • src/routers/gastown-router.ts — 0 issues

…HTTP polling

The previous approach tried to proxy WebSocket upgrade requests through
the Container DO to the Bun control server inside the container. This
doesn't work because:

1. Hono wraps responses, breaking the 101 WebSocket upgrade relay
2. Bun's server.upgrade() is container-internal; the 101 response
   can't travel back through the CF Workers runtime
3. The frontend had a reconnect loop: onclose → refetch ticket →
   ticket changes → effect re-runs → new WS → fails → repeat

New architecture:
  Browser (WS) → Worker → TownContainerDO (WebSocketPair) ←poll→ Container HTTP

Container side:
- Removed: WS upgrade endpoint, Bun websocket handlers, subscribeToAgent
- Added: GET /agents/:agentId/events?after=N HTTP endpoint serving a
  ring buffer of 500 recent events per agent (buffered by process-manager)
- Simplified: control-server.ts back to Bun.serve + Hono (no WS)
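
The per-agent ring buffer behind the new events endpoint can be sketched as follows; the class and method names are illustrative, not taken from process-manager.ts:

```typescript
// Sketch of a 500-event ring buffer per agent, as the commit describes.
interface BufferedEvent { id: number; type: string; data: unknown; }

const RING_CAPACITY = 500;

class EventRingBuffer {
  private events: BufferedEvent[] = [];
  private nextId = 1;

  push(type: string, data: unknown): void {
    this.events.push({ id: this.nextId++, type, data });
    if (this.events.length > RING_CAPACITY) this.events.shift(); // drop oldest
  }

  // What GET /agents/:agentId/events?after=N would serve for this agent.
  after(cursor: number): BufferedEvent[] {
    return this.events.filter((e) => e.id > cursor);
  }
}
```

Because ids are monotonic, the DO's poller can track a cursor per client and ask only for events it has not yet relayed.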

TownContainerDO:
- Override fetch() to intercept WS upgrade requests for /agents/:id/stream
- Create WebSocketPair, return client end to browser (standard CF pattern)
- Poll container's /events endpoint every 500ms per active agent
- Relay new events to all connected WS clients with cursor tracking
- Clean up polling when all WS sessions disconnect

Worker:
- Bypass Hono for WS upgrade requests matching the stream path pattern
- Route directly to TownContainerDO.fetch() and return its Response
  (the 101 + webSocket pair) so the runtime handles the upgrade

Frontend (AgentStream.tsx):
- Removed the reconnect loop entirely — tickets are one-time-use,
  and the effect now runs exactly once per ticket fetch
- Simplified status display, no more cascading refetch cycles
…aming

Three fixes for agent streaming reliability:

1. Historical backfill on WebSocket connect: When a client connects,
   TownContainerDO immediately fetches all buffered events from the
   container (after=0) and sends them to the client before starting
   the live poll. This means even if the client connects after the
   agent finishes, they see everything that happened. The session
   cursor is advanced past the backfill to avoid duplicates.

2. Remove 404 race condition: The container's stream-ticket and events
   endpoints no longer require the agent to be registered. Tickets
   can be issued optimistically before the agent starts, and the
   events endpoint returns an empty array for unknown agents instead
   of 404. The DO poller handles the wait gracefully.

3. Wire mayor streaming: MayorChat now renders an AgentStream
   component when the mayor session is active/starting, using the
   session's agentId. Previously, the mayor's agentId was available
   via getMayorStatus but never connected to the streaming UI.
Three changes to diagnose and fix streaming issues:

1. Add console.log throughout TownContainerDO for production
   tracing: WebSocket accept, backfill fetch, poll relay, errors.
   This will show up in Cloudflare dashboard logs and help
   identify where the pipeline breaks.

2. Latch mayor agentId in MayorChat: previously, when the status
   poll returned 'idle' (agent finished), mayorAgentId became null
   and AgentStream unmounted — the user saw it 'pop up for a few
   seconds and disappear.' Now we latch the agentId once seen,
   keeping the stream open until the user explicitly closes it
   or a new session starts.

3. Don't poll immediately after ensurePolling — let the backfill
   complete first to avoid racing between backfill and poll.
Two likely root causes for WebSocket messages never arriving:

1. The worker was constructing a new Request object to pass to the DO:
     new Request('http://container/agents/.../stream', { headers })
   This loses internal CF runtime properties needed for WebSocket
   upgrade handling. The Cloudflare docs pattern is:
     return stub.fetch(request)
   with the original request. Now we pass the original browser
   request directly to the DO.

2. The DO's fetch override had a regex anchored to start:
     /^\/agents\/([^/]+)\/stream$/
   But the original request has the full worker path:
     /api/towns/:townId/container/agents/:agentId/stream
   Changed to:
     /\/agents\/([^/]+)\/stream$/
   which matches the agentId regardless of path prefix.

Also added logging to both worker and DO fetch handlers to trace
the exact flow in wrangler dev.
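
The regex change above can be demonstrated directly against the full worker path:

```typescript
// The two regexes from the fix, applied to the full worker path.
const anchored = /^\/agents\/([^/]+)\/stream$/;
const unanchored = /\/agents\/([^/]+)\/stream$/;

const fullPath = "/api/towns/t1/container/agents/agent-42/stream";

anchored.test(fullPath);        // false: the /api/towns/... prefix breaks ^
unanchored.exec(fullPath)?.[1]; // "agent-42": $ still rejects trailing segments
```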
Root cause: the stream-ticket handler derived the WebSocket URL from
c.req.url, but the request comes from the Next.js server (not the
browser), so the host resolved to the gastown worker's internal
address (gastown.kiloapps.io in production, wrong in local dev).
The browser connected to the wrong host and the WS upgrade never
reached the worker.

Fix: the gastown worker now returns just the stream path from the
stream-ticket endpoint. The tRPC router on the Next.js server
constructs the full ws:// URL using GASTOWN_SERVICE_URL, which
resolves correctly in both local dev (localhost:8787) and
production (gastown.kiloapps.io).

Also cleaned up verbose debug logging in TownContainerDO — kept
connect/disconnect/error logs, removed per-poll and per-backfill
step noise.
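
The URL construction in the fix amounts to joining the worker-returned stream path with the service base URL and swapping the scheme. A sketch under assumed names (buildStreamUrl and its parameters are not taken from the tRPC router):

```typescript
// Illustrative: combine GASTOWN_SERVICE_URL with the stream path returned
// by the stream-ticket endpoint, and map http -> ws, https -> wss.
function buildStreamUrl(serviceUrl: string, streamPath: string): string {
  const url = new URL(streamPath, serviceUrl);
  url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
  return url.toString();
}
```

With this shape, local dev resolves to ws://localhost:8787/... and production to wss://gastown.kiloapps.io/..., preserving the path and ticket query string.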
…acement

- Add agent_id auth check to handleAppendAgentEvent using getEnforcedAgentId
- Replace `as Record<string, unknown>` with .pipe(z.record()) in agent-events table
- Validate query params with z.coerce.number() instead of raw Number()
- Move GET /agents/:agentId/events before agentOnlyMiddleware so
  dashboard clients can query without an agent JWT
- Schedule event buffer cleanup after agent exit (5min TTL)
- Fix showStream reset: re-show when a new session starts (agentId changes)
@jrf0110 jrf0110 merged commit e75bd31 into 204-gt-prop-d Feb 19, 2026
1 check passed
@jrf0110 jrf0110 deleted the 343-agent-streaming branch February 19, 2026 05:56
