Skip to content
Merged
35 changes: 29 additions & 6 deletions cloudflare-gastown/container/src/control-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ import {
activeServerCount,
getUptime,
stopAll,
getAgentEvents,
} from './process-manager';
import { startHeartbeat, stopHeartbeat } from './heartbeat';
import { StartAgentRequest, StopAgentRequest, SendMessageRequest } from './types';
import type { AgentStatusResponse, HealthResponse, StreamTicketResponse } from './types';

// TODO: Add a WebSocket consumer endpoint (GET /agents/:agentId/stream) that
// validates and consumes tickets. Until then, tickets are only used as a
// placeholder for the upcoming streaming implementation.
const MAX_TICKETS = 1000;
const streamTickets = new Map<string, { agentId: string; expiresAt: number }>();

Expand Down Expand Up @@ -96,12 +94,25 @@ app.get('/agents/:agentId/status', c => {
return c.json(response);
});

// GET /agents/:agentId/events?after=N
// Returns buffered events for the agent, optionally after a given event id.
// Used by the TownContainerDO to poll for events and relay them to WebSocket clients.
// Does NOT 404 for unknown agents — returns an empty array so the poller
// can keep trying while the agent is starting up.
app.get('/agents/:agentId/events', c => {
const { agentId } = c.req.param();
const afterParam = c.req.query('after');
const afterId = afterParam ? parseInt(afterParam, 10) : 0;
const events = getAgentEvents(agentId, afterId);
return c.json({ events });
});

// POST /agents/:agentId/stream-ticket
// Issues a one-time-use stream ticket for the agent. Does NOT require
// the agent to be registered yet — tickets can be issued optimistically
// so the frontend can connect a WebSocket before the agent finishes starting.
app.post('/agents/:agentId/stream-ticket', c => {
const { agentId } = c.req.param();
if (!getAgentStatus(agentId)) {
return c.json({ error: `Agent ${agentId} not found` }, 404);
}

const ticket = crypto.randomUUID();
const expiresAt = Date.now() + 60_000;
Expand All @@ -123,6 +134,18 @@ app.post('/agents/:agentId/stream-ticket', c => {
return c.json(response);
});

/**
* Validate a stream ticket and return the associated agentId, consuming it.
* Returns null if the ticket is invalid or expired.
*/
export function consumeStreamTicket(ticket: string): string | null {
const entry = streamTickets.get(ticket);
if (!entry) return null;
streamTickets.delete(ticket);
if (entry.expiresAt < Date.now()) return null;
return entry.agentId;
}

// Catch-all
app.notFound(c => c.json({ error: 'Not found' }, 404));

Expand Down
74 changes: 73 additions & 1 deletion cloudflare-gastown/container/src/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* via HTTP, not stdin.
*/

import type { ManagedAgent, StartAgentRequest, KiloSSEEventData } from './types';
import type { ManagedAgent, StartAgentRequest, KiloSSEEventData, KiloSSEEvent } from './types';
import {
ensureServer,
registerSession,
Expand All @@ -21,6 +21,58 @@ import { reportAgentCompleted } from './completion-reporter';
const agents = new Map<string, ManagedAgent>();
const sseConsumers = new Map<string, SSEConsumer>();

// ── Event buffer for HTTP polling ─────────────────────────────────────────
// Each agent keeps a ring buffer of recent events. The DO polls
// GET /agents/:agentId/events?after=N to retrieve them.

type BufferedEvent = {
id: number;
event: string;
data: KiloSSEEventData;
timestamp: string;
};

const MAX_BUFFERED_EVENTS = 500;
const agentEventBuffers = new Map<string, BufferedEvent[]>();
let nextEventId = 1;

function bufferAgentEvent(agentId: string, event: KiloSSEEvent): void {
let buf = agentEventBuffers.get(agentId);
if (!buf) {
buf = [];
agentEventBuffers.set(agentId, buf);
}
buf.push({
id: nextEventId++,
event: event.event,
data: event.data,
timestamp: new Date().toISOString(),
});
// Trim to cap
if (buf.length > MAX_BUFFERED_EVENTS) {
buf.splice(0, buf.length - MAX_BUFFERED_EVENTS);
}
}

/**
* Get buffered events for an agent, optionally after a given event id.
* Returns events ordered by id ascending.
*/
export function getAgentEvents(agentId: string, afterId = 0): BufferedEvent[] {
const buf = agentEventBuffers.get(agentId);
if (!buf) return [];
return buf.filter(e => e.id > afterId);
}

// Clean up stale event buffers after the DO has had time to poll final events.
const EVENT_BUFFER_TTL_MS = 5 * 60 * 1000; // 5 minutes

function scheduleEventBufferCleanup(agentId: string): void {
setTimeout(() => {
agentEventBuffers.delete(agentId);
}, EVENT_BUFFER_TTL_MS);
}

const startTime = Date.now();

export function getUptime(): number {
Expand Down Expand Up @@ -93,10 +145,18 @@ export async function startAgent(
}
}

// Buffer for HTTP polling by the DO
bufferAgentEvent(request.agentId, evt);

// Detect completion
if (isCompletionEvent(evt)) {
agent.status = 'exited';
agent.exitReason = 'completed';
bufferAgentEvent(request.agentId, {
event: 'agent.exited',
data: { type: 'agent.exited', properties: { reason: 'completed' } },
});
scheduleEventBufferCleanup(request.agentId);
void reportAgentCompleted(agent, 'completed');
}
},
Expand All @@ -107,6 +167,11 @@ export async function startAgent(
if (agent.status === 'running') {
agent.status = 'failed';
agent.exitReason = `SSE stream closed: ${reason}`;
bufferAgentEvent(request.agentId, {
event: 'agent.exited',
data: { type: 'agent.exited', properties: { reason: `stream closed: ${reason}` } },
});
scheduleEventBufferCleanup(request.agentId);
void reportAgentCompleted(agent, 'failed', reason);
}
},
Expand Down Expand Up @@ -187,6 +252,13 @@ export async function stopAgent(agentId: string): Promise<void> {

agent.status = 'exited';
agent.exitReason = 'stopped';

// Buffer exit event for polling
bufferAgentEvent(agentId, {
event: 'agent.exited',
data: { type: 'agent.exited', properties: { reason: 'stopped' } },
});
scheduleEventBufferCleanup(agentId);
}

/**
Expand Down
34 changes: 34 additions & 0 deletions cloudflare-gastown/src/db/tables/agent-events.table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { z } from 'zod';
import { getTableFromZodSchema, getCreateTableQueryFromTable } from '../../util/table';

export const AgentEventRecord = z.object({
id: z.number(),
agent_id: z.string(),
event_type: z.string(),
data: z
.string()
.transform(v => JSON.parse(v))
.pipe(z.record(z.string(), z.unknown())),
created_at: z.string(),
});

export type AgentEventRecord = z.output<typeof AgentEventRecord>;

export const agentEvents = getTableFromZodSchema('agent_events', AgentEventRecord);

export function createTableAgentEvents(): string {
return getCreateTableQueryFromTable(agentEvents, {
id: `integer primary key autoincrement`,
agent_id: `text not null`,
event_type: `text not null`,
data: `text not null default '{}'`,
created_at: `text not null`,
});
}

export function getIndexesAgentEvents(): string[] {
return [
`CREATE INDEX IF NOT EXISTS idx_agent_events_agent_id ON ${agentEvents}(${agentEvents.columns.agent_id})`,
`CREATE INDEX IF NOT EXISTS idx_agent_events_agent_created ON ${agentEvents}(${agentEvents.columns.agent_id}, ${agentEvents.columns.id})`,
];
}
81 changes: 81 additions & 0 deletions cloudflare-gastown/src/dos/Rig.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import {
ReviewQueueRecord,
} from '../db/tables/review-queue.table';
import { createTableMolecules } from '../db/tables/molecules.table';
import {
createTableAgentEvents,
getIndexesAgentEvents,
agentEvents,
AgentEventRecord,
} from '../db/tables/agent-events.table';
import { getTownContainerStub } from './TownContainer.do';
import { query } from '../util/query.util';
import { signAgentJWT } from '../util/jwt.util';
Expand Down Expand Up @@ -104,6 +110,11 @@ export class RigDO extends DurableObject<Env> {

query(this.sql, createTableReviewQueue(), []);
query(this.sql, createTableMolecules(), []);

query(this.sql, createTableAgentEvents(), []);
for (const idx of getIndexesAgentEvents()) {
query(this.sql, idx, []);
}
}

// ── Beads ──────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -451,6 +462,76 @@ export class RigDO extends DurableObject<Env> {
return this.getBead(agent.current_hook_bead_id);
}

// ── Agent Events (append-only log for streaming) ────────────────────────

/** Max events kept per agent. Older events are pruned on insert. */
private static readonly MAX_EVENTS_PER_AGENT = 2000;

/**
* Append an event to the agent's event log. Used by the container
* completion callback or the streaming proxy to persist events for
* late-joining clients.
*/
async appendAgentEvent(agentId: string, eventType: string, data: unknown): Promise<void> {
await this.ensureInitialized();
const timestamp = now();
const dataJson = JSON.stringify(data ?? {});

query(
this.sql,
/* sql */ `
INSERT INTO ${agentEvents} (
${agentEvents.columns.agent_id},
${agentEvents.columns.event_type},
${agentEvents.columns.data},
${agentEvents.columns.created_at}
) VALUES (?, ?, ?, ?)`,
[agentId, eventType, dataJson, timestamp]
);

// Prune old events beyond the cap
query(
this.sql,
/* sql */ `
DELETE FROM ${agentEvents}
WHERE ${agentEvents.agent_id} = ?
AND ${agentEvents.id} NOT IN (
SELECT ${agentEvents.id} FROM ${agentEvents}
WHERE ${agentEvents.agent_id} = ?
ORDER BY ${agentEvents.id} DESC
LIMIT ?
)`,
[agentId, agentId, RigDO.MAX_EVENTS_PER_AGENT]
);
}

/**
* Get agent events, optionally after a given event id (for catch-up).
* Returns events ordered by id ascending.
*/
async getAgentEvents(
agentId: string,
afterId?: number,
limit = 200
): Promise<AgentEventRecord[]> {
await this.ensureInitialized();

const rows = query(
this.sql,
/* sql */ `
SELECT ${agentEvents.id}, ${agentEvents.agent_id}, ${agentEvents.event_type},
${agentEvents.data}, ${agentEvents.created_at}
FROM ${agentEvents}
WHERE ${agentEvents.agent_id} = ?
AND (? IS NULL OR ${agentEvents.id} > ?)
ORDER BY ${agentEvents.id} ASC
LIMIT ?`,
[agentId, afterId ?? null, afterId ?? null, limit]
);

return AgentEventRecord.array().parse(rows);
}

// ── Mail ───────────────────────────────────────────────────────────────

async sendMail(input: SendMailInput): Promise<void> {
Expand Down
Loading