diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index d8a3058b8..fe42138a1 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -8,14 +8,12 @@ import { activeServerCount, getUptime, stopAll, + getAgentEvents, } from './process-manager'; import { startHeartbeat, stopHeartbeat } from './heartbeat'; import { StartAgentRequest, StopAgentRequest, SendMessageRequest } from './types'; import type { AgentStatusResponse, HealthResponse, StreamTicketResponse } from './types'; -// TODO: Add a WebSocket consumer endpoint (GET /agents/:agentId/stream) that -// validates and consumes tickets. Until then, tickets are only used as a -// placeholder for the upcoming streaming implementation. const MAX_TICKETS = 1000; const streamTickets = new Map(); @@ -96,12 +94,25 @@ app.get('/agents/:agentId/status', c => { return c.json(response); }); +// GET /agents/:agentId/events?after=N +// Returns buffered events for the agent, optionally after a given event id. +// Used by the TownContainerDO to poll for events and relay them to WebSocket clients. +// Does NOT 404 for unknown agents — returns an empty array so the poller +// can keep trying while the agent is starting up. +app.get('/agents/:agentId/events', c => { + const { agentId } = c.req.param(); + const afterParam = c.req.query('after'); + const afterId = afterParam ? parseInt(afterParam, 10) : 0; + const events = getAgentEvents(agentId, afterId); + return c.json({ events }); +}); + // POST /agents/:agentId/stream-ticket +// Issues a one-time-use stream ticket for the agent. Does NOT require +// the agent to be registered yet — tickets can be issued optimistically +// so the frontend can connect a WebSocket before the agent finishes starting. app.post('/agents/:agentId/stream-ticket', c => { const { agentId } = c.req.param(); - if (!getAgentStatus(agentId)) { - return c.json({ error: `Agent ${agentId} not found` }, 404); - } const ticket = crypto.randomUUID(); const expiresAt = Date.now() + 60_000; @@ -123,6 +134,18 @@ app.post('/agents/:agentId/stream-ticket', c => { return c.json(response); }); +/** + * Validate a stream ticket and return the associated agentId, consuming it. + * Returns null if the ticket is invalid or expired. + */ +export function consumeStreamTicket(ticket: string): string | null { + const entry = streamTickets.get(ticket); + if (!entry) return null; + streamTickets.delete(ticket); + if (entry.expiresAt < Date.now()) return null; + return entry.agentId; +} + // Catch-all app.notFound(c => c.json({ error: 'Not found' }, 404)); diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index 35450f561..6804127f1 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -6,7 +6,7 @@ * via HTTP, not stdin. */ -import type { ManagedAgent, StartAgentRequest, KiloSSEEventData } from './types'; +import type { ManagedAgent, StartAgentRequest, KiloSSEEventData, KiloSSEEvent } from './types'; import { ensureServer, registerSession, @@ -21,6 +21,58 @@ import { reportAgentCompleted } from './completion-reporter'; const agents = new Map(); const sseConsumers = new Map(); +// ── Event buffer for HTTP polling ───────────────────────────────────────── +// Each agent keeps a ring buffer of recent events. The DO polls +// GET /agents/:agentId/events?after=N to retrieve them. + +type BufferedEvent = { + id: number; + event: string; + data: KiloSSEEventData; + timestamp: string; +}; + +const MAX_BUFFERED_EVENTS = 500; +const agentEventBuffers = new Map(); +let nextEventId = 1; + +function bufferAgentEvent(agentId: string, event: KiloSSEEvent): void { + let buf = agentEventBuffers.get(agentId); + if (!buf) { + buf = []; + agentEventBuffers.set(agentId, buf); + } + buf.push({ + id: nextEventId++, + event: event.event, + data: event.data, + timestamp: new Date().toISOString(), + }); + // Trim to cap + if (buf.length > MAX_BUFFERED_EVENTS) { + buf.splice(0, buf.length - MAX_BUFFERED_EVENTS); + } +} + +/** + * Get buffered events for an agent, optionally after a given event id. + * Returns events ordered by id ascending. + */ +export function getAgentEvents(agentId: string, afterId = 0): BufferedEvent[] { + const buf = agentEventBuffers.get(agentId); + if (!buf) return []; + return buf.filter(e => e.id > afterId); +} + +// Clean up stale event buffers after the DO has had time to poll final events. +const EVENT_BUFFER_TTL_MS = 5 * 60 * 1000; // 5 minutes + +function scheduleEventBufferCleanup(agentId: string): void { + setTimeout(() => { + agentEventBuffers.delete(agentId); + }, EVENT_BUFFER_TTL_MS); +} + const startTime = Date.now(); export function getUptime(): number { @@ -93,10 +145,18 @@ export async function startAgent( } } + // Buffer for HTTP polling by the DO + bufferAgentEvent(request.agentId, evt); + // Detect completion if (isCompletionEvent(evt)) { agent.status = 'exited'; agent.exitReason = 'completed'; + bufferAgentEvent(request.agentId, { + event: 'agent.exited', + data: { type: 'agent.exited', properties: { reason: 'completed' } }, + }); + scheduleEventBufferCleanup(request.agentId); void reportAgentCompleted(agent, 'completed'); } }, @@ -107,6 +167,11 @@ export async function startAgent( if (agent.status === 'running') { agent.status = 'failed'; agent.exitReason = `SSE stream closed: ${reason}`; + bufferAgentEvent(request.agentId, { + event: 'agent.exited', + data: { type: 'agent.exited', properties: { reason: `stream closed: ${reason}` } }, + }); + scheduleEventBufferCleanup(request.agentId); void reportAgentCompleted(agent, 'failed', reason); } }, @@ -187,6 +252,13 @@ export async function stopAgent(agentId: string): Promise { agent.status = 'exited'; agent.exitReason = 'stopped'; + + // Buffer exit event for polling + bufferAgentEvent(agentId, { + event: 'agent.exited', + data: { type: 'agent.exited', properties: { reason: 'stopped' } }, + }); + scheduleEventBufferCleanup(agentId); } /** diff --git a/cloudflare-gastown/src/db/tables/agent-events.table.ts b/cloudflare-gastown/src/db/tables/agent-events.table.ts new file mode 100644 index 000000000..944673aeb --- /dev/null +++ b/cloudflare-gastown/src/db/tables/agent-events.table.ts @@ -0,0 +1,34 @@ +import { z } from 'zod'; +import { getTableFromZodSchema, getCreateTableQueryFromTable } from '../../util/table'; + +export const AgentEventRecord = z.object({ + id: z.number(), + agent_id: z.string(), + event_type: z.string(), + data: z + .string() + .transform(v => JSON.parse(v)) + .pipe(z.record(z.string(), z.unknown())), + created_at: z.string(), +}); + +export type AgentEventRecord = z.output; + +export const agentEvents = getTableFromZodSchema('agent_events', AgentEventRecord); + +export function createTableAgentEvents(): string { + return getCreateTableQueryFromTable(agentEvents, { + id: `integer primary key autoincrement`, + agent_id: `text not null`, + event_type: `text not null`, + data: `text not null default '{}'`, + created_at: `text not null`, + }); +} + +export function getIndexesAgentEvents(): string[] { + return [ + `CREATE INDEX IF NOT EXISTS idx_agent_events_agent_id ON ${agentEvents}(${agentEvents.columns.agent_id})`, + `CREATE INDEX IF NOT EXISTS idx_agent_events_agent_created ON ${agentEvents}(${agentEvents.columns.agent_id}, ${agentEvents.columns.id})`, + ]; +} diff --git a/cloudflare-gastown/src/dos/Rig.do.ts b/cloudflare-gastown/src/dos/Rig.do.ts index 915dbcac6..888438886 100644 --- a/cloudflare-gastown/src/dos/Rig.do.ts +++ b/cloudflare-gastown/src/dos/Rig.do.ts @@ -8,6 +8,12 @@ import { ReviewQueueRecord, } from '../db/tables/review-queue.table'; import { createTableMolecules } from '../db/tables/molecules.table'; +import { + createTableAgentEvents, + getIndexesAgentEvents, + agentEvents, + AgentEventRecord, +} from '../db/tables/agent-events.table'; import { getTownContainerStub } from './TownContainer.do'; import { query } from '../util/query.util'; import { signAgentJWT } from '../util/jwt.util'; @@ -104,6 +110,11 @@ export class RigDO extends DurableObject { query(this.sql, createTableReviewQueue(), []); query(this.sql, createTableMolecules(), []); + + query(this.sql, createTableAgentEvents(), []); + for (const idx of getIndexesAgentEvents()) { + query(this.sql, idx, []); + } } // ── Beads ────────────────────────────────────────────────────────────── @@ -451,6 +462,76 @@ export class RigDO extends DurableObject { return this.getBead(agent.current_hook_bead_id); } + // ── Agent Events (append-only log for streaming) ──────────────────────── + + /** Max events kept per agent. Older events are pruned on insert. */ + private static readonly MAX_EVENTS_PER_AGENT = 2000; + + /** + * Append an event to the agent's event log. Used by the container + * completion callback or the streaming proxy to persist events for + * late-joining clients. + */ + async appendAgentEvent(agentId: string, eventType: string, data: unknown): Promise { + await this.ensureInitialized(); + const timestamp = now(); + const dataJson = JSON.stringify(data ?? {}); + + query( + this.sql, + /* sql */ ` + INSERT INTO ${agentEvents} ( + ${agentEvents.columns.agent_id}, + ${agentEvents.columns.event_type}, + ${agentEvents.columns.data}, + ${agentEvents.columns.created_at} + ) VALUES (?, ?, ?, ?)`, + [agentId, eventType, dataJson, timestamp] + ); + + // Prune old events beyond the cap + query( + this.sql, + /* sql */ ` + DELETE FROM ${agentEvents} + WHERE ${agentEvents.agent_id} = ? + AND ${agentEvents.id} NOT IN ( + SELECT ${agentEvents.id} FROM ${agentEvents} + WHERE ${agentEvents.agent_id} = ? + ORDER BY ${agentEvents.id} DESC + LIMIT ? + )`, + [agentId, agentId, RigDO.MAX_EVENTS_PER_AGENT] + ); + } + + /** + * Get agent events, optionally after a given event id (for catch-up). + * Returns events ordered by id ascending. + */ + async getAgentEvents( + agentId: string, + afterId?: number, + limit = 200 + ): Promise { + await this.ensureInitialized(); + + const rows = query( + this.sql, + /* sql */ ` + SELECT ${agentEvents.id}, ${agentEvents.agent_id}, ${agentEvents.event_type}, + ${agentEvents.data}, ${agentEvents.created_at} + FROM ${agentEvents} + WHERE ${agentEvents.agent_id} = ? + AND (? IS NULL OR ${agentEvents.id} > ?) + ORDER BY ${agentEvents.id} ASC + LIMIT ?`, + [agentId, afterId ?? null, afterId ?? null, limit] + ); + + return AgentEventRecord.array().parse(rows); + } + // ── Mail ─────────────────────────────────────────────────────────────── async sendMail(input: SendMailInput): Promise { diff --git a/cloudflare-gastown/src/dos/TownContainer.do.ts b/cloudflare-gastown/src/dos/TownContainer.do.ts index 07a9ed8cf..21ba5f811 100644 --- a/cloudflare-gastown/src/dos/TownContainer.do.ts +++ b/cloudflare-gastown/src/dos/TownContainer.do.ts @@ -1,5 +1,13 @@ import { Container } from '@cloudflare/containers'; +const TC_LOG = '[TownContainer.do]'; + +/** + * Polling interval for relaying container events to WebSocket clients. + * Fast enough for near-real-time UX, slow enough to avoid hammering the container. + */ +const POLL_INTERVAL_MS = 500; + /** * TownContainer — a Cloudflare Container per town. * @@ -10,6 +18,10 @@ import { Container } from '@cloudflare/containers'; * * The DO side (this class) handles container lifecycle; the control * server inside the container handles process management. + * + * For agent streaming, this DO accepts WebSocket connections from the + * browser, polls the container's HTTP events endpoint, and relays + * events to connected clients. */ export class TownContainerDO extends Container { defaultPort = 8080; @@ -21,18 +33,203 @@ export class TownContainerDO extends Container { ? { GASTOWN_API_URL: this.env.GASTOWN_API_URL } : {}; + // Active WebSocket sessions: agentId -> set of { ws, lastEventId } + private wsSessions = new Map>(); + private pollTimer: ReturnType | null = null; + override onStart(): void { - console.log(`Town container started for DO id=${this.ctx.id.toString()}`); + console.log(`${TC_LOG} container started for DO id=${this.ctx.id.toString()}`); } override onStop({ exitCode, reason }: { exitCode: number; reason: string }): void { console.log( - `Town container stopped: exitCode=${exitCode} reason=${reason} id=${this.ctx.id.toString()}` + `${TC_LOG} container stopped: exitCode=${exitCode} reason=${reason} id=${this.ctx.id.toString()}` ); + this.stopPolling(); + for (const sessions of this.wsSessions.values()) { + for (const session of sessions) { + try { + session.ws.close(1001, 'Container stopped'); + } catch { + /* best effort */ + } + } + } + this.wsSessions.clear(); } override onError(error: unknown): void { - console.error('Town container error:', error, `id=${this.ctx.id.toString()}`); + console.error(`${TC_LOG} container error:`, error, `id=${this.ctx.id.toString()}`); + } + + /** + * Override fetch to intercept WebSocket upgrade requests for agent streaming. + * All other requests delegate to the base Container class (which proxies to the container). + */ + override async fetch(request: Request): Promise { + const url = new URL(request.url); + + // Match the agent stream path (works with both full worker path and + // short container-relative path) + const streamMatch = url.pathname.match(/\/agents\/([^/]+)\/stream$/); + + if (streamMatch && request.headers.get('Upgrade')?.toLowerCase() === 'websocket') { + return this.handleStreamWebSocket(streamMatch[1], url.searchParams.get('ticket')); + } + + return super.fetch(request); + } + + /** + * Handle a WebSocket upgrade request for agent streaming. + * Creates a WebSocketPair, starts polling the container for events, + * and relays them to the connected client. + */ + private handleStreamWebSocket(agentId: string, ticket: string | null): Response { + if (!ticket) { + return new Response(JSON.stringify({ error: 'Missing ticket' }), { + status: 400, + headers: { 'Content-Type': 'application/json' }, + }); + } + + const pair = new WebSocketPair(); + const [client, server] = Object.values(pair); + + server.accept(); + console.log(`${TC_LOG} WS connected: agent=${agentId}`); + + // Track this session + let sessions = this.wsSessions.get(agentId); + if (!sessions) { + sessions = new Set(); + this.wsSessions.set(agentId, sessions); + } + const session = { ws: server, lastEventId: 0 }; + sessions.add(session); + + // Start polling if not already running + this.ensurePolling(); + + // Send historical backfill asynchronously + void this.backfillEvents(agentId, server, session); + + // Handle client disconnect + server.addEventListener('close', event => { + console.log(`${TC_LOG} WS closed: agent=${agentId} code=${event.code}`); + sessions.delete(session); + if (sessions.size === 0) { + this.wsSessions.delete(agentId); + } + if (this.wsSessions.size === 0) { + this.stopPolling(); + } + }); + + server.addEventListener('error', event => { + console.error(`${TC_LOG} WS error: agent=${agentId}`, event); + }); + + return new Response(null, { status: 101, webSocket: client }); + } + + /** + * Send a historical backfill of all buffered events to a newly connected + * WebSocket client. Ensures late-joining clients see everything. + */ + private async backfillEvents( + agentId: string, + ws: WebSocket, + session: { ws: WebSocket; lastEventId: number } + ): Promise { + try { + // Send current agent status + const statusRes = await this.containerFetch(`http://container/agents/${agentId}/status`); + if (statusRes.ok) { + const status = (await statusRes.json()) as Record; + ws.send(JSON.stringify({ event: 'agent.status', data: status })); + } + + // Fetch and send all buffered events + const eventsRes = await this.containerFetch( + `http://container/agents/${agentId}/events?after=0` + ); + if (eventsRes.ok) { + const body = (await eventsRes.json()) as { + events: Array<{ id: number; event: string; data: unknown; timestamp: string }>; + }; + if (body.events && body.events.length > 0) { + for (const evt of body.events) { + try { + ws.send(JSON.stringify({ event: evt.event, data: evt.data })); + } catch { + return; // WS closed during backfill + } + } + // Advance cursor past the backfill + session.lastEventId = body.events[body.events.length - 1].id; + } + } + } catch (err) { + console.error(`${TC_LOG} backfill error: agent=${agentId}`, err); + } + } + + private ensurePolling(): void { + if (this.pollTimer) return; + this.pollTimer = setInterval(() => void this.pollEvents(), POLL_INTERVAL_MS); + } + + private stopPolling(): void { + if (this.pollTimer) { + clearInterval(this.pollTimer); + this.pollTimer = null; + } + } + + /** + * Poll the container for new events for each agent with active WS sessions. + * Relays new events to all connected clients. + */ + private async pollEvents(): Promise { + for (const [agentId, sessions] of this.wsSessions) { + if (sessions.size === 0) continue; + + // Find the minimum lastEventId across all sessions for this agent + let minLastId = Infinity; + for (const s of sessions) { + if (s.lastEventId < minLastId) minLastId = s.lastEventId; + } + if (minLastId === Infinity) minLastId = 0; + + try { + const res = await this.containerFetch( + `http://container/agents/${agentId}/events?after=${minLastId}` + ); + if (!res.ok) continue; + + const body = (await res.json()) as { + events: Array<{ id: number; event: string; data: unknown; timestamp: string }>; + }; + if (!body.events || body.events.length === 0) continue; + + for (const evt of body.events) { + const msg = JSON.stringify({ event: evt.event, data: evt.data }); + for (const session of sessions) { + if (evt.id > session.lastEventId) { + try { + session.ws.send(msg); + session.lastEventId = evt.id; + } catch { + // WS likely closed; cleaned up by close handler + } + } + } + } + } catch { + // Container may be starting up or unavailable; skip this cycle + } + } } } diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index 57143012e..23469f1e4 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -1,4 +1,5 @@ import { Hono } from 'hono'; +import { getTownContainerStub } from './dos/TownContainer.do'; import { resError } from './util/res.util'; import { dashboardHtml } from './ui/dashboard.ui'; import { withCloudflareAccess } from './middleware/cf-access.middleware'; @@ -32,6 +33,7 @@ import { handleDeleteAgent, } from './handlers/rig-agents.handler'; import { handleSendMail } from './handlers/rig-mail.handler'; +import { handleAppendAgentEvent, handleGetAgentEvents } from './handlers/rig-agent-events.handler'; import { handleSubmitToReviewQueue } from './handlers/rig-review-queue.handler'; import { handleCreateEscalation } from './handlers/rig-escalations.handler'; import { @@ -140,6 +142,10 @@ app.post('/api/rigs/:rigId/agents/get-or-create', c => handleGetOrCreateAgent(c, app.get('/api/rigs/:rigId/agents/:agentId', c => handleGetAgent(c, c.req.param())); app.delete('/api/rigs/:rigId/agents/:agentId', c => handleDeleteAgent(c, c.req.param())); +// Dashboard-accessible agent events (before agentOnlyMiddleware so the +// frontend can query events without an agent JWT) +app.get('/api/rigs/:rigId/agents/:agentId/events', c => handleGetAgentEvents(c, c.req.param())); + // Agent-scoped routes — agentOnlyMiddleware enforces JWT agentId match app.use('/api/rigs/:rigId/agents/:agentId/*', async (c, next) => c.env.ENVIRONMENT === 'development' ? next() : agentOnlyMiddleware(c, next) @@ -155,6 +161,10 @@ app.post('/api/rigs/:rigId/agents/:agentId/checkpoint', c => app.get('/api/rigs/:rigId/agents/:agentId/mail', c => handleCheckMail(c, c.req.param())); app.post('/api/rigs/:rigId/agents/:agentId/heartbeat', c => handleHeartbeat(c, c.req.param())); +// ── Agent Events ───────────────────────────────────────────────────────── + +app.post('/api/rigs/:rigId/agent-events', c => handleAppendAgentEvent(c, c.req.param())); + // ── Mail ──────────────────────────────────────────────────────────────── app.post('/api/rigs/:rigId/mail', c => handleSendMail(c, c.req.param())); @@ -199,6 +209,10 @@ app.get('/api/towns/:townId/container/agents/:agentId/status', c => app.post('/api/towns/:townId/container/agents/:agentId/stream-ticket', c => handleContainerStreamTicket(c, c.req.param()) ); +// Note: GET /api/towns/:townId/container/agents/:agentId/stream (WebSocket) +// is handled outside Hono in the default export's fetch handler, which +// routes the upgrade directly to TownContainerDO.fetch(). + app.get('/api/towns/:townId/container/health', c => handleContainerHealth(c, c.req.param())); // ── Mayor ──────────────────────────────────────────────────────────────── @@ -235,4 +249,34 @@ app.onError((err, c) => { return c.json(resError('Internal server error'), 500); }); -export default app; +// ── Export with WebSocket interception ─────────────────────────────────── +// WebSocket upgrade requests for agent streaming must bypass Hono and go +// directly to the TownContainerDO.fetch(). Hono cannot relay a 101 +// WebSocket response — the DO must return the WebSocketPair client end +// directly to the runtime. + +const WS_STREAM_PATTERN = /^\/api\/towns\/([^/]+)\/container\/agents\/([^/]+)\/stream$/; + +export default { + async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { + // Intercept WebSocket upgrade requests for agent streaming. + // Must bypass Hono — the DO returns a 101 + WebSocketPair that the + // runtime handles directly. + if (request.headers.get('Upgrade')?.toLowerCase() === 'websocket') { + const url = new URL(request.url); + const match = url.pathname.match(WS_STREAM_PATTERN); + if (match) { + const townId = match[1]; + const agentId = match[2]; + console.log(`[gastown-worker] WS upgrade: townId=${townId} agentId=${agentId}`); + // Pass the original request to the DO — the CF runtime needs the + // original Request object to handle the WebSocket upgrade. + const stub = getTownContainerStub(env, townId); + return stub.fetch(request); + } + } + + // All other requests go through Hono + return app.fetch(request, env, ctx); + }, +}; diff --git a/cloudflare-gastown/src/handlers/rig-agent-events.handler.ts b/cloudflare-gastown/src/handlers/rig-agent-events.handler.ts new file mode 100644 index 000000000..ddb0df6a4 --- /dev/null +++ b/cloudflare-gastown/src/handlers/rig-agent-events.handler.ts @@ -0,0 +1,67 @@ +import type { Context } from 'hono'; +import { z } from 'zod'; +import { getRigDOStub } from '../dos/Rig.do'; +import { resSuccess, resError } from '../util/res.util'; +import { parseJsonBody } from '../util/parse-json-body.util'; +import { getEnforcedAgentId } from '../middleware/auth.middleware'; +import type { GastownEnv } from '../gastown.worker'; + +const AppendEventBody = z.object({ + agent_id: z.string().min(1), + event_type: z.string().min(1), + data: z.unknown().default({}), +}); + +const GetEventsQuery = z.object({ + after_id: z.coerce.number().int().nonnegative().optional(), + limit: z.coerce.number().int().positive().max(1000).optional(), +}); + +/** + * Append an event to the agent's persistent event log. + * Called by the container (via completion-reporter or a streaming relay) + * to persist events so late-joining dashboard clients can catch up. + */ +export async function handleAppendAgentEvent(c: Context, params: { rigId: string }) { + const parsed = AppendEventBody.safeParse(await parseJsonBody(c)); + if (!parsed.success) { + return c.json(resError('Invalid request body'), 400); + } + + // Verify the caller's agent identity matches the agent_id in the body + const enforced = getEnforcedAgentId(c); + if (enforced && enforced !== parsed.data.agent_id) { + return c.json(resError('agent_id does not match authenticated agent'), 403); + } + + const rig = getRigDOStub(c.env, params.rigId); + await rig.appendAgentEvent(parsed.data.agent_id, parsed.data.event_type, parsed.data.data); + return c.json(resSuccess({ appended: true }), 201); +} + +/** + * Get agent events from the persistent log, optionally after a given event id. + * Used by the frontend to catch up on events that happened before the + * WebSocket connection was established. + */ +export async function handleGetAgentEvents( + c: Context, + params: { rigId: string; agentId: string } +) { + const queryParsed = GetEventsQuery.safeParse({ + after_id: c.req.query('after_id'), + limit: c.req.query('limit'), + }); + if (!queryParsed.success) { + return c.json(resError('Invalid query parameters'), 400); + } + + const rig = getRigDOStub(c.env, params.rigId); + const events = await rig.getAgentEvents( + params.agentId, + queryParsed.data.after_id, + queryParsed.data.limit + ); + + return c.json(resSuccess(events)); +} diff --git a/cloudflare-gastown/src/handlers/town-container.handler.ts b/cloudflare-gastown/src/handlers/town-container.handler.ts index 3e2320166..ef74dd47b 100644 --- a/cloudflare-gastown/src/handlers/town-container.handler.ts +++ b/cloudflare-gastown/src/handlers/town-container.handler.ts @@ -143,12 +143,12 @@ export async function handleContainerStreamTicket( return c.json(resError('Unexpected container response'), 502); } - // Construct the stream URL. The frontend connects to this URL via EventSource. - // Use the request's origin so it works in both dev and production. - const origin = new URL(c.req.url).origin; - const streamUrl = `${origin}/api/towns/${params.townId}/container/agents/${params.agentId}/stream`; + // Return just the path — the caller (tRPC router on the Next.js server) + // constructs the full WS URL using its known GASTOWN_SERVICE_URL, which + // resolves to the correct host in both local dev and production. + const streamPath = `/api/towns/${params.townId}/container/agents/${params.agentId}/stream`; - return c.json(resSuccess({ url: streamUrl, ticket: parsed.data.ticket }), 200); + return c.json(resSuccess({ url: streamPath, ticket: parsed.data.ticket }), 200); } /** diff --git a/src/components/gastown/AgentStream.tsx b/src/components/gastown/AgentStream.tsx index fbb178581..d0826cac5 100644 --- a/src/components/gastown/AgentStream.tsx +++ b/src/components/gastown/AgentStream.tsx @@ -1,6 +1,6 @@ 'use client'; -import { useEffect, useRef, useState } from 'react'; +import { useEffect, useRef, useState, useCallback } from 'react'; import { useQuery } from '@tanstack/react-query'; import { useTRPC } from '@/lib/trpc/utils'; import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; @@ -16,64 +16,102 @@ type AgentStreamProps = { type StreamEvent = { id: number; type: string; - data: string; + data: Record; timestamp: Date; }; +const MAX_EVENTS = 500; + export function AgentStream({ townId, agentId, onClose }: AgentStreamProps) { const trpc = useTRPC(); const [events, setEvents] = useState([]); const [connected, setConnected] = useState(false); - const [error, setError] = useState(null); - const eventSourceRef = useRef(null); + const [status, setStatus] = useState('Fetching ticket...'); + const wsRef = useRef(null); const scrollRef = useRef(null); const eventIdRef = useRef(0); + // Track whether the component is still mounted to avoid state updates after unmount + const mountedRef = useRef(true); const ticketQuery = useQuery(trpc.gastown.getAgentStreamUrl.queryOptions({ agentId, townId })); + const appendEvent = useCallback((type: string, data: Record) => { + if (!mountedRef.current) return; + setEvents(prev => [ + ...prev.slice(-(MAX_EVENTS - 1)), + { + id: eventIdRef.current++, + type, + data, + timestamp: new Date(), + }, + ]); + }, []); + + // Connect the WebSocket once we have a ticket. This effect runs exactly + // once per successful ticket fetch. Reconnection is NOT automatic — the + // user can refetch manually or we accept that the stream is done. useEffect(() => { - if (!ticketQuery.data?.url) return; + mountedRef.current = true; + const url = ticketQuery.data?.url; + const ticket = ticketQuery.data?.ticket; - // Reset state when switching agents or reconnecting - setEvents([]); - eventIdRef.current = 0; + if (!url || !ticket) return; - const url = new URL(ticketQuery.data.url); - if (ticketQuery.data.ticket) { - url.searchParams.set('ticket', ticketQuery.data.ticket); - } + setStatus('Connecting...'); + + const wsUrl = new URL(url); + wsUrl.searchParams.set('ticket', ticket); - const es = new EventSource(url.toString()); - eventSourceRef.current = es; + const ws = new WebSocket(wsUrl.toString()); + wsRef.current = ws; - es.onopen = () => { + ws.onopen = () => { + if (!mountedRef.current) return; setConnected(true); - setError(null); + setStatus('Connected'); }; - const MAX_EVENTS = 500; - es.onmessage = e => { - setEvents(prev => [ - ...prev.slice(-MAX_EVENTS + 1), - { - id: eventIdRef.current++, - type: 'message', - data: e.data, - timestamp: new Date(), - }, - ]); + ws.onmessage = e => { + try { + const msg = JSON.parse(e.data as string) as { + event: string; + data: Record; + }; + appendEvent(msg.event, msg.data); + + if (msg.event === 'agent.exited') { + if (!mountedRef.current) return; + setConnected(false); + setStatus('Agent exited'); + } + } catch { + // Non-JSON messages (e.g. keepalive) are ignored + } }; - es.onerror = () => { + ws.onclose = () => { + if (!mountedRef.current) return; setConnected(false); - setError('Stream disconnected'); + // Don't try to reconnect — the ticket is consumed. If the user + // wants to reconnect they can re-open the stream panel. + setStatus(prev => (prev === 'Agent exited' ? prev : 'Disconnected')); + }; + + ws.onerror = () => { + if (!mountedRef.current) return; + setStatus('Connection error'); }; return () => { - es.close(); - eventSourceRef.current = null; + mountedRef.current = false; + ws.onclose = null; + ws.onmessage = null; + ws.onerror = null; + ws.close(1000, 'Component unmount'); + wsRef.current = null; }; - }, [ticketQuery.data?.url, ticketQuery.data?.ticket]); + }, [ticketQuery.data?.url, ticketQuery.data?.ticket, appendEvent]); // Auto-scroll to bottom useEffect(() => { @@ -89,9 +127,7 @@ export function AgentStream({ townId, agentId, onClose }: AgentStreamProps) { Agent Stream
- - {connected ? 'Connected' : (error ?? 'Connecting...')} - + {status}
+ + + - {/* Message input */} -
- setMessage(e.target.value)} - placeholder="Send a message to the Mayor..." - disabled={sendMessage.isPending} - className="flex-1" - /> - -
- - + {/* Mayor agent stream — shows live events when the mayor is working */} + {mayorAgentId && showStream && ( + setShowStream(false)} /> + )} + ); } diff --git a/src/routers/gastown-router.ts b/src/routers/gastown-router.ts index b56381891..27efcab5d 100644 --- a/src/routers/gastown-router.ts +++ b/src/routers/gastown-router.ts @@ -5,6 +5,7 @@ import { z } from 'zod'; import * as gastown from '@/lib/gastown/gastown-client'; import { GastownApiError } from '@/lib/gastown/gastown-client'; import { generateApiToken, TOKEN_EXPIRY } from '@/lib/tokens'; +import { GASTOWN_SERVICE_URL } from '@/lib/config.server'; const LOG_PREFIX = '[gastown-router]'; @@ -246,7 +247,18 @@ export const gastownRouter = createTRPCRouter({ throw new TRPCError({ code: 'FORBIDDEN', message: 'Not your town' }); } - return withGastownError(() => gastown.getStreamTicket(input.townId, input.agentId)); + const ticket = await withGastownError(() => + gastown.getStreamTicket(input.townId, input.agentId) + ); + + // The gastown worker returns a relative path. Construct the full + // WebSocket URL using GASTOWN_SERVICE_URL so the browser connects + // directly to the gastown worker (not the Next.js server). + const baseUrl = new URL(GASTOWN_SERVICE_URL ?? 'http://localhost:8787'); + const wsProtocol = baseUrl.protocol === 'https:' ? 'wss:' : 'ws:'; + const fullUrl = `${wsProtocol}//${baseUrl.host}${ticket.url}`; + + return { ...ticket, url: fullUrl }; }), // ── Deletes ────────────────────────────────────────────────────────────