diff --git a/docs/superpowers/plans/2026-04-25-events-on-agent-contract.md b/docs/superpowers/plans/2026-04-25-events-on-agent-contract.md new file mode 100644 index 000000000..3a4df6b53 --- /dev/null +++ b/docs/superpowers/plans/2026-04-25-events-on-agent-contract.md @@ -0,0 +1,523 @@ +# `events$` on `Agent` Contract Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace the optional, free-form `Agent.customEvents$` with a required, structured `events$: Observable`, where `AgentEvent` is a discriminated union of `AgentStateUpdateEvent | AgentCustomEvent`. + +**Architecture:** Single-file new type module (`agent-event.ts`) replaces `agent-custom-event.ts`. `Agent` field renamed and made required. LangGraph adapter rewires the existing `buildCustomEvents$` bridge to emit the new union and discriminate `state_update` from generic custom events. Mock + conformance helpers updated. Sole production consumer (`chat.component.ts`) gets a cleaner subscribe block. + +**Tech Stack:** Angular 21 (signals + RxJS), Vitest, Nx. + +**Spec:** `docs/superpowers/specs/2026-04-25-events-on-agent-contract-design.md` + +--- + +## File Structure + +### New / renamed + +- Renamed: `libs/chat/src/lib/agent/agent-custom-event.ts` → `agent-event.ts` (content fully rewritten) +- Renamed: `libs/chat/src/lib/agent/agent-custom-event.spec.ts` → `agent-event.spec.ts` (content updated) + +### Modified + +- `libs/chat/src/lib/agent/agent.ts` — `customEvents$?: Observable` → `events$: Observable` (required) +- `libs/chat/src/lib/agent/index.ts` — re-export `AgentEvent`, `AgentStateUpdateEvent`, `AgentCustomEvent` (structured); drop old free-form `AgentCustomEvent` +- `libs/chat/src/public-api.ts` — same export delta +- `libs/chat/src/lib/testing/mock-agent.ts` — `events$?: Observable` option (defaults to `EMPTY`); drop `customEvents$` option +- `libs/chat/src/lib/testing/mock-agent.spec.ts` — adjust tests +- `libs/chat/src/lib/testing/agent-conformance.ts` — unconditional `events$.subscribe` assertion +- `libs/chat/src/lib/compositions/chat/chat.component.ts` — subscribe to `agent.events$`, narrow on `event.type === 'state_update'` +- `libs/chat/src/lib/compositions/chat/chat.component.spec.ts` — any `customEvents$` references updated +- `libs/langgraph/src/lib/to-agent.ts` — emit `AgentEvent`, discriminate `state_update`, rename helper +- `libs/langgraph/src/lib/to-agent.spec.ts` — translation tests +- `libs/langgraph/src/lib/to-agent.conformance.spec.ts` — adjust if it touches custom events + +--- + +### Task 1: Replace `agent-custom-event.ts` with `agent-event.ts` and update Agent contract + +**Files:** +- Rename: `libs/chat/src/lib/agent/agent-custom-event.ts` → `agent-event.ts` +- Rename: `libs/chat/src/lib/agent/agent-custom-event.spec.ts` → `agent-event.spec.ts` +- Modify: `libs/chat/src/lib/agent/agent.ts` +- Modify: `libs/chat/src/lib/agent/index.ts` +- Modify: `libs/chat/src/public-api.ts` +- Modify: `libs/chat/src/lib/testing/mock-agent.ts` (+ spec) +- Modify: `libs/chat/src/lib/testing/agent-conformance.ts` +- Modify: `libs/chat/src/lib/compositions/chat/chat.component.ts` (+ spec) + +(All bundled because intermediate steps break the build; commit once at end of Task 1.) + +- [ ] **Step 1: `git mv` and rewrite the type file** + +```bash +cd libs/chat/src/lib/agent +git mv agent-custom-event.ts agent-event.ts +git mv agent-custom-event.spec.ts agent-event.spec.ts +cd ../../../../.. +``` + +Replace `libs/chat/src/lib/agent/agent-event.ts` content with: + +```ts +// SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 + +/** + * Render-state-store sync event. Adapters emit this when the runtime + * publishes a state-snapshot intended for the chat library's render store + * (used by generative UI and a2ui surfaces). + */ +export interface AgentStateUpdateEvent { + readonly type: 'state_update'; + readonly data: Record; +} + +/** + * Escape hatch for runtime-specific or user-defined events that do not + * (yet) have a well-known structured variant. `name` carries the runtime + * event name; `data` carries the payload verbatim. + */ +export interface AgentCustomEvent { + readonly type: 'custom'; + readonly name: string; + readonly data: unknown; +} + +/** + * Discriminated union of events flowing on `Agent.events$`. + * + * Invariant: state lives on signals (`messages`, `status`, `toolCalls`, + * `state`, `interrupt`, `subagents`, `history`); events on `events$` + * carry only things that are not derivable from signals. New variants + * are added purely additively when patterns prove necessary. + */ +export type AgentEvent = AgentStateUpdateEvent | AgentCustomEvent; +``` + +- [ ] **Step 2: Rewrite the spec file** + +`libs/chat/src/lib/agent/agent-event.spec.ts`: + +```ts +// SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 +import { describe, it, expect } from 'vitest'; +import type { + AgentEvent, + AgentStateUpdateEvent, + AgentCustomEvent, +} from './agent-event'; + +describe('AgentEvent', () => { + it('narrows AgentStateUpdateEvent by type discriminator', () => { + const e: AgentEvent = { type: 'state_update', data: { foo: 1 } }; + if (e.type === 'state_update') { + expect(e.data.foo).toBe(1); + } + }); + + it('narrows AgentCustomEvent by type discriminator', () => { + const e: AgentEvent = { type: 'custom', name: 'tick', data: 42 }; + if (e.type === 'custom') { + expect(e.name).toBe('tick'); + expect(e.data).toBe(42); + } + }); + + it('AgentStateUpdateEvent.data is Record-shaped', () => { + const e: AgentStateUpdateEvent = { type: 'state_update', data: {} }; + expect(typeof e.data).toBe('object'); + }); + + it('AgentCustomEvent.data is unknown', () => { + const e: AgentCustomEvent = { type: 'custom', name: 'x', data: null }; + expect(e.data).toBeNull(); + }); +}); +``` + +- [ ] **Step 3: Update `Agent` contract** + +In `libs/chat/src/lib/agent/agent.ts`: + +Replace this import: +```ts +import type { AgentCustomEvent } from './agent-custom-event'; +``` + +with: +```ts +import type { AgentEvent } from './agent-event'; +``` + +Inside the `Agent` interface, replace: +```ts + customEvents$?: Observable; +``` + +with: +```ts + events$: Observable; +``` + +Remove the `customEvents$ are optional` paragraph from the class docstring; replace with a one-liner noting the no-duplication invariant: + +```ts +/** + * ...existing top of comment... + * + * Invariant: state lives on signals; `events$` carries only things that + * are not derivable from signals. + */ +``` + +- [ ] **Step 4: Update `agent/index.ts`** + +Replace: +```ts +export type { AgentCustomEvent } from './agent-custom-event'; +``` + +with: +```ts +export type { + AgentEvent, + AgentStateUpdateEvent, + AgentCustomEvent, +} from './agent-event'; +``` + +- [ ] **Step 5: Update `public-api.ts`** + +In the `export type { ... } from './lib/agent'` block, replace `AgentCustomEvent` with `AgentEvent, AgentStateUpdateEvent, AgentCustomEvent`. (The new structured `AgentCustomEvent` reuses the symbol name; the old free-form one is gone.) + +- [ ] **Step 6: Update `mock-agent.ts`** + +In `libs/chat/src/lib/testing/mock-agent.ts`: + +Update imports: +```ts +import { EMPTY, type Observable } from 'rxjs'; +import type { + Agent, Message, AgentStatus, ToolCall, + AgentInterrupt, Subagent, AgentSubmitInput, AgentSubmitOptions, + AgentCheckpoint, + AgentEvent, +} from '../agent'; +``` +(Drop the standalone `import type { AgentCustomEvent } from '../agent/agent-custom-event';` line if present.) + +In `MockAgent` interface: replace `customEvents$?: Observable;` with no field — but the actual returned object has `events$` (required). The `MockAgent` interface should declare: +```ts + events$: Observable; +``` + +In `MockAgentOptions`: replace `customEvents$?: Observable;` with `events$?: Observable;`. + +In the function body, replace the customEvents$ spread with: +```ts + events$: opts.events$ ?? EMPTY, +``` +in the returned object (no longer conditional). + +- [ ] **Step 7: Update `mock-agent.spec.ts`** + +Find any references to `customEvents$` in tests; rename to `events$`. The shape of test data may need adjusting if specs construct `AgentCustomEvent`-shaped events; switch to the structured variants: +```ts +const evt: AgentEvent = { type: 'custom', name: 'foo', data: 1 }; +``` + +- [ ] **Step 8: Update `agent-conformance.ts`** + +Replace the conditional block: +```ts +it('if customEvents$ is present, it is an Observable-like with .subscribe', () => { + const agent = factory(); + if (agent.customEvents$ !== undefined) { + expect(typeof agent.customEvents$.subscribe).toBe('function'); + } else { + expect(agent.customEvents$).toBeUndefined(); + } +}); +``` + +with: +```ts +it('events$ is an Observable-like with .subscribe', () => { + const agent = factory(); + expect(typeof agent.events$.subscribe).toBe('function'); +}); +``` + +If `agent-conformance.spec.ts` (the meta-test of the conformance helper) references `customEvents$`, update analogously. + +- [ ] **Step 9: Update `chat.component.ts`** + +Find the existing constructor effect that subscribes to `agent.customEvents$` (lines ~272–301). Replace the entire effect with: + +```ts +constructor() { + // Route state_update events from the agent to the render state store + // so components bound to $state paths reactively update. + effect(() => { + if (this.eventsSubscribed) return; + let agent: ReturnType; + try { + agent = this.agent(); + } catch { + // Required input not yet available — skip; effect will retry. + return; + } + this.eventsSubscribed = true; + agent.events$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe((event) => { + if (event.type !== 'state_update') return; + const store = this.resolvedStore(); + if (!store) return; + store.update(event.data); + }); + }); + + // ...auto-scroll effect unchanged... +} +``` + +(Rename the private flag `customEventsSubscribed` → `eventsSubscribed` for consistency.) + +- [ ] **Step 10: Update `chat.component.spec.ts`** + +Find any test fixture that constructs a fake agent or supplies `customEvents$`; rename to `events$`. Adjust event payloads to the structured shape. + +- [ ] **Step 11: Run chat lint + test + build** + +```bash +npx nx run-many -t lint,test,build -p chat +``` + +Expected: PASS. (Pre-existing `vitest` peer-dep warning is OK.) + +If failures: +- "Property 'customEvents$' does not exist on type 'Agent'" — missed find/replace in a spec or component. +- "Type 'AgentCustomEvent' has no exported member 'X'" — old import path `./agent-custom-event`; switch to `./agent-event`. + +Fix and re-run before committing. + +- [ ] **Step 12: Commit** + +```bash +git add libs/chat/ +git commit -m "feat(chat): require events\$ on Agent contract with structured AgentEvent union + +Replaces optional customEvents\$: Observable with +required events\$: Observable. AgentEvent discriminates +state_update from generic custom events. Codifies invariant: state on +signals, events on events\$, no duplication. + +Co-Authored-By: Claude Opus 4.7 " +``` + +--- + +### Task 2: Update LangGraph adapter to emit `AgentEvent` + +**Files:** +- Modify: `libs/langgraph/src/lib/to-agent.ts` +- Modify: `libs/langgraph/src/lib/to-agent.spec.ts` +- Modify: `libs/langgraph/src/lib/to-agent.conformance.spec.ts` (only if it references `customEvents$`) + +- [ ] **Step 1: Write a failing test** + +Add to `libs/langgraph/src/lib/to-agent.spec.ts` inside the existing describe block: + +```ts +it('translates a state_update CustomStreamEvent into AgentStateUpdateEvent', async () => { + TestBed.runInInjectionContext(() => { + const customEvents = signal([]); + const ref = stubAgentRef({ customEvents } as any); + const chat = toAgent(ref); + + const received: any[] = []; + chat.events$.subscribe((e) => received.push(e)); + + customEvents.set([{ name: 'state_update', data: { count: 1 } }]); + TestBed.flushEffects(); + + expect(received).toEqual([{ type: 'state_update', data: { count: 1 } }]); + }); +}); + +it('wraps non-state_update CustomStreamEvent as AgentCustomEvent', async () => { + TestBed.runInInjectionContext(() => { + const customEvents = signal([]); + const ref = stubAgentRef({ customEvents } as any); + const chat = toAgent(ref); + + const received: any[] = []; + chat.events$.subscribe((e) => received.push(e)); + + customEvents.set([{ name: 'tick', data: 42 }]); + TestBed.flushEffects(); + + expect(received).toEqual([{ type: 'custom', name: 'tick', data: 42 }]); + }); +}); +``` + +- [ ] **Step 2: Run; expect FAIL** + +```bash +npx nx test langgraph --testNamePattern="state_update CustomStreamEvent|non-state_update CustomStreamEvent" +``` +Expected: FAIL (`events$ is not defined` or current implementation emits the old free-form shape). + +- [ ] **Step 3: Update `to-agent.ts`** + +In imports: +```ts +import type { + Agent, AgentWithHistory, AgentCheckpoint, AgentEvent, + Message, Role, ToolCall, ToolCallStatus, AgentStatus, + AgentInterrupt, Subagent, AgentSubmitInput, AgentSubmitOptions, +} from '@cacheplane/chat'; +``` +(Drop `AgentCustomEvent` from the import block; it's now subsumed by `AgentEvent`.) + +In `toAgent()`'s body, rename: +```ts +const events$ = buildEvents$(ref); +``` +(was: `customEvents$ = buildCustomEvents$(ref)`.) + +In the returned object, replace `customEvents$` with `events$`. + +Replace the helper function: + +```ts +function buildEvents$( + ref: AgentRef, +): Observable { + const subject = new Subject(); + let seen = 0; + effect(() => { + const all = ref.customEvents(); + if (all.length < seen) seen = 0; + for (let i = seen; i < all.length; i++) { + subject.next(toAgentEvent(all[i])); + } + seen = all.length; + }); + return subject.asObservable(); +} + +function toAgentEvent(e: CustomStreamEvent): AgentEvent { + if (e.name === 'state_update' && isRecord(e.data)) { + return { type: 'state_update', data: e.data }; + } + return { type: 'custom', name: e.name, data: e.data }; +} + +function isRecord(v: unknown): v is Record { + return typeof v === 'object' && v !== null && !Array.isArray(v); +} +``` + +(Remove the old `toCustomEvent` helper.) + +- [ ] **Step 4: Run; expect PASS** + +```bash +npx nx test langgraph --testNamePattern="state_update CustomStreamEvent|non-state_update CustomStreamEvent" +``` + +- [ ] **Step 5: Update `to-agent.conformance.spec.ts` if needed** + +Run: +```bash +rg "customEvents\\\$" libs/langgraph/src/lib/to-agent.conformance.spec.ts +``` + +If matches, update to `events$` and adjust the minimal-ref fixture to provide an Observable. The minimal stub returns `agent.events$` from `toAgent(ref)`, so it should already work — but re-verify by running the conformance suite. + +- [ ] **Step 6: Run full langgraph lint + test + build** + +```bash +npx nx run-many -t lint,test,build -p langgraph +``` + +Expected: PASS. + +- [ ] **Step 7: Commit** + +```bash +git add libs/langgraph/ +git commit -m "feat(langgraph): translate CustomStreamEvents into structured AgentEvent + +toAgent(ref) now emits events\$: Observable. Translates +state_update events into AgentStateUpdateEvent (with data: Record), +all others into the structured AgentCustomEvent escape hatch. + +Co-Authored-By: Claude Opus 4.7 " +``` + +--- + +### Task 3: Final verification, push, PR + +- [ ] **Step 1: Verify no stale references** + +```bash +rg "customEvents\\\$" libs/ cockpit/ +``` +Expected: zero hits. + +```bash +rg "from '\\./agent-custom-event'" libs/ +``` +Expected: zero hits. + +- [ ] **Step 2: Full lint/test/build** + +```bash +npx nx run-many -t lint,test,build -p chat,langgraph +npx nx affected -t build --base=origin/main +``` +Expected: all pass. + +- [ ] **Step 3: Push** + +```bash +git push -u origin feat/agent-events-contract +``` + +- [ ] **Step 4: Open PR** + +```bash +gh pr create --title "feat(chat): require events\$ on Agent contract with structured AgentEvent union" --body "$(cat <<'EOF' +## Summary +- Replaces optional `customEvents\$: Observable` (free-form) with required `events\$: Observable` (structured discriminated union). +- `AgentEvent = AgentStateUpdateEvent | AgentCustomEvent`. The `state_update` variant is type-narrowed; `custom` is the escape hatch carrying `{ name, data }`. +- LangGraph adapter translates `CustomStreamEvent` → `AgentEvent`, discriminating `state_update` when `data` is a Record. +- Codifies invariant: state on signals, events on events\$, no duplication. + +## Test Plan +- [x] \`nx run-many -t lint,test,build -p chat,langgraph\` passes +- [x] \`nx affected -t build\` passes +- [x] No residual \`customEvents\$\` references +- [ ] Cockpit demos still render (no consumers of \`customEvents\$\` outside chat.component.ts) + +## Design + plan +- Spec: \`docs/superpowers/specs/2026-04-25-events-on-agent-contract-design.md\` +- Plan: \`docs/superpowers/plans/2026-04-25-events-on-agent-contract.md\` + +🤖 Generated with [Claude Code](https://claude.com/claude-code) +EOF +)" +``` + +--- + +## Out of Scope + +- Backwards-compat alias `customEvents$` on `Agent`. +- Replay/snapshot semantics for late `events$` subscribers. +- Renaming `state_update` to a chat-library-specific name. +- Adding new well-known event types beyond `state_update` and `custom`. diff --git a/docs/superpowers/specs/2026-04-25-events-on-agent-contract-design.md b/docs/superpowers/specs/2026-04-25-events-on-agent-contract-design.md new file mode 100644 index 000000000..66e717783 --- /dev/null +++ b/docs/superpowers/specs/2026-04-25-events-on-agent-contract-design.md @@ -0,0 +1,219 @@ +# `events$` on `Agent` Contract Design + +## Goal + +Replace the optional, free-form `customEvents$?: Observable` on the `Agent` contract with a required, structured `events$: Observable` carrying a discriminated union. Codify the invariant: **state lives on signals, events live on `events$`, neither duplicates the other.** + +## Motivation + +The current `customEvents$` is the only event-shaped concern on the contract today. It is: + +- **Optional** — adapters may omit it, forcing every consumer to null-check before subscribing. +- **Free-form** — `{ type: string; [key: string]: unknown }` lets any field name flow through, but provides no type-narrowing for known event types like `state_update`. +- **Single example, no growth path** — the addition of more event-shaped concerns has been informally proposed (e.g., as part of broader course-correction discussions on AG-UI alignment). Without a structured union, each addition would either be a string-literal convention buried in handler code or a separate optional Observable on the contract. + +Codifying `events$` as required + structured does three things: + +1. **Removes optionality friction.** Every consumer can subscribe directly with no presence check. +2. **Makes well-known event types type-safe.** The current `chat.component.ts` handler does `if (event.type === 'state_update') { ... }` and trusts the payload shape; with the structured union, TypeScript narrows the variant. +3. **Establishes the duplication invariant.** State-bearing concerns (`messages`, `toolCalls`, `status`, `interrupt`, `subagents`, `state`, `history`) stay on signals. Events on `events$` carry only things that are not derivable from signals. + +## Architecture + +### Contract change + +```ts +// libs/chat/src/lib/agent/agent-event.ts (new file) +// SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 + +/** + * Render-state-store sync event. Adapters emit this when the runtime + * publishes a state-snapshot intended for the chat library's render store + * (used by generative UI and a2ui surfaces). + */ +export interface AgentStateUpdateEvent { + readonly type: 'state_update'; + readonly data: Record; +} + +/** + * Escape hatch for runtime-specific or user-defined events that do not + * (yet) have a well-known structured variant. `name` carries the runtime + * event name; `data` carries the payload verbatim. + */ +export interface AgentCustomEvent { + readonly type: 'custom'; + readonly name: string; + readonly data: unknown; +} + +export type AgentEvent = AgentStateUpdateEvent | AgentCustomEvent; +``` + +```ts +// libs/chat/src/lib/agent/agent.ts (delta) +import type { Observable } from 'rxjs'; +import type { AgentEvent } from './agent-event'; + +export interface Agent { + // ...all existing signals unchanged... + events$: Observable; // required; replaces customEvents$ + // ...submit, stop unchanged... +} +``` + +### Removed types + +- `AgentCustomEvent` (the previous free-form `{type: string, [k]: unknown}`) is **deleted**. The new `AgentCustomEvent` (structured `{type: 'custom', name, data}`) reuses the symbol — semantically the escape-hatch case is what the old one always was, but typed. + +The collision in name is intentional: the structured variant **is** the spiritual successor. Any consumer that imported `AgentCustomEvent` updates their usage from `event.type === 'foo'` (free) to `event.type === 'custom' && event.name === 'foo'` (structured). + +### File renames + +- `libs/chat/src/lib/agent/agent-custom-event.ts` → `agent-event.ts` +- `libs/chat/src/lib/agent/agent-custom-event.spec.ts` → `agent-event.spec.ts` + +### Required vs optional + +Required. Adapters that have no event source pass `EMPTY` from RxJS: + +```ts +import { EMPTY } from 'rxjs'; + +const agent: Agent = { + // ... + events$: EMPTY, + // ... +}; +``` + +This is preferable to optionality because: +- `EMPTY` is a one-line, free idiom for "this stream produces nothing". +- Consumers never write `if (agent.events$) ...`. +- Future event types added to the union benefit every adapter automatically; an adapter that wants to opt out of a specific event type just doesn't emit it. + +### Adapter behavior + +**LangGraph adapter (`libs/langgraph/src/lib/to-agent.ts`):** + +The existing `buildCustomEvents$(ref)` helper translates LangGraph's `CustomStreamEvent[]` signal into the new `AgentEvent` stream. The translation: + +```ts +function toAgentEvent(e: CustomStreamEvent): AgentEvent { + if (e.name === 'state_update' && isRecord(e.data)) { + return { type: 'state_update', data: e.data }; + } + return { type: 'custom', name: e.name, data: e.data }; +} + +function isRecord(v: unknown): v is Record { + return typeof v === 'object' && v !== null && !Array.isArray(v); +} +``` + +The bridge subject + cursor pattern (effect-driven, append-only-array → Observable) is preserved. Rename the helper to `buildEvents$` to match the new contract field name. + +**Mock helper (`libs/chat/src/lib/testing/mock-agent.ts`):** + +```ts +export interface MockAgentOptions { + // ...other options unchanged... + events$?: Observable; // optional input; defaults to EMPTY + // (drop) customEvents$ +} + +export function mockAgent(opts: MockAgentOptions = {}): MockAgent { + // ... + return { + // ...existing fields... + events$: opts.events$ ?? EMPTY, + // ... + }; +} +``` + +**Conformance helper (`libs/chat/src/lib/testing/agent-conformance.ts`):** + +Replace the conditional `if (agent.customEvents$ !== undefined) ...` block with an unconditional assertion: + +```ts +it('events$ is an Observable-like with .subscribe', () => { + const agent = factory(); + expect(typeof agent.events$.subscribe).toBe('function'); +}); +``` + +### Consumer migration + +Only one consumer in production: `libs/chat/src/lib/compositions/chat/chat.component.ts`. Today: + +```ts +const stream$ = agent.customEvents$; +if (!stream$) return; +stream$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe((event) => { + if (event.type !== 'state_update') return; + const data = event['data']; + if (!data || typeof data !== 'object') return; + // ...store.update(data as Record); +}); +``` + +After: + +```ts +agent.events$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe((event) => { + if (event.type !== 'state_update') return; + // event.data is Record — narrowed by the discriminator + const store = this.resolvedStore(); + if (!store) return; + store.update(event.data); +}); +``` + +Cleaner: no presence check, no untyped index access, no manual shape guard. + +### Public API + +`libs/chat/src/public-api.ts`: +- Remove: `AgentCustomEvent` (old free-form) export +- Add: `AgentEvent`, `AgentStateUpdateEvent`, `AgentCustomEvent` (new structured) exports + +```ts +export type { + // ...other types unchanged... + AgentEvent, + AgentStateUpdateEvent, + AgentCustomEvent, // now the structured escape-hatch variant +} from './lib/agent'; +``` + +## What's deliberately NOT in the union + +- **`tool_call_started/finished`** — already in `toolCalls: Signal`; the array length and per-call `status` reflect lifecycle. +- **`interrupt_raised/resolved`** — already in `interrupt?: Signal`; presence change reflects the lifecycle. +- **`run_started/finished/errored`** — already in `status: Signal` and `error: Signal`. +- **`message_streamed`** — already in `messages: Signal`; partial deltas are reflected as in-place message content updates. +- **`subagent_spawned`** — already in `subagents?: Signal>`. + +Adding events for any of these would violate the no-duplication invariant. If a consumer needs "happened-once" semantics for one of these (e.g., "fire a toast when a tool call finishes"), they derive it from the signal via an Angular `effect` that compares previous/current values. + +## When to add new structured event types + +Triggers: +- A second adapter (AG-UI) reveals an event pattern that isn't state-shaped (i.e., not a snapshot or current-value of any concern), and is used by enough consumers to deserve type narrowing. +- A chat-library convention emerges (similar to `state_update`) that crosses the runtime/library boundary. + +Each addition is purely additive to the `AgentEvent` union — existing adapters that don't emit the new variant remain conformant. + +## Out of Scope + +- Backwards-compat alias `customEvents$` on `Agent`. Migration breaks compilation; consumers update at the same time. +- Snapshot/replay semantics for late subscribers. State signals already carry current values; `events$` is delta-only. +- Renaming `state_update` to a more chat-library-specific name. The convention is established and runtime-agnostic. +- Adding non-event-shaped concerns (history, threads, etc.) to `events$`. +- Renaming the bridge helper beyond `buildCustomEvents$` → `buildEvents$`. + +## Risk + +- **Breaking change to `customEvents$` shape and name.** Any external code subscribing to `agent.customEvents$` won't compile. Mitigated by the fact that only one production consumer exists today (`chat.component.ts`); the rest are tests and conformance helpers. +- **`state_update` heuristic in LangGraph translation.** The `e.name === 'state_update'` branch is a string-literal convention — if a runtime emits `state_update` with non-`Record` data, we fall through to `custom`. Documented in the translator helper. diff --git a/libs/chat/src/lib/agent/agent-custom-event.spec.ts b/libs/chat/src/lib/agent/agent-custom-event.spec.ts deleted file mode 100644 index 6cff87791..000000000 --- a/libs/chat/src/lib/agent/agent-custom-event.spec.ts +++ /dev/null @@ -1,30 +0,0 @@ -// SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 -import type { AgentCustomEvent } from './agent-custom-event'; - -describe('AgentCustomEvent', () => { - it('accepts a minimal { type } event', () => { - const event: AgentCustomEvent = { type: 'state_update' }; - expect(event.type).toBe('state_update'); - }); - - it('accepts arbitrary additional fields via index signature', () => { - const event: AgentCustomEvent = { - type: 'a2ui.surface', - surfaceId: 'main', - payload: { foo: 'bar' }, - timestamp: 1234567890, - }; - expect(event['surfaceId']).toBe('main'); - expect(event['payload']).toEqual({ foo: 'bar' }); - }); - - it('allows AG-UI-shaped events to pass through without remapping', () => { - const agUiEvent: AgentCustomEvent = { - type: 'TEXT_MESSAGE_START', - messageId: 'msg-1', - role: 'assistant', - }; - expect(agUiEvent.type).toBe('TEXT_MESSAGE_START'); - expect(agUiEvent['messageId']).toBe('msg-1'); - }); -}); diff --git a/libs/chat/src/lib/agent/agent-custom-event.ts b/libs/chat/src/lib/agent/agent-custom-event.ts deleted file mode 100644 index 5632abada..000000000 --- a/libs/chat/src/lib/agent/agent-custom-event.ts +++ /dev/null @@ -1,17 +0,0 @@ -// SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 - -/** - * Runtime-neutral custom event shape flowing through `Agent.customEvents$`. - * - * The only required field is `type` — a string discriminator consumers switch - * on. All other fields pass through verbatim from the source runtime, which - * lets AG-UI, LangGraph, a2ui, and json-render emit their own event shapes - * without the core contract owning their union. - * - * Adapters are responsible for normalising their native shape to include a - * `type` field (e.g., `toAgent` aliases LangGraph's `name` to `type`). - */ -export interface AgentCustomEvent { - readonly type: string; - readonly [key: string]: unknown; -} diff --git a/libs/chat/src/lib/agent/agent-event.spec.ts b/libs/chat/src/lib/agent/agent-event.spec.ts new file mode 100644 index 000000000..d3fb8680c --- /dev/null +++ b/libs/chat/src/lib/agent/agent-event.spec.ts @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 +import { describe, it, expect } from 'vitest'; +import type { + AgentEvent, + AgentStateUpdateEvent, + AgentCustomEvent, +} from './agent-event'; + +describe('AgentEvent', () => { + it('narrows AgentStateUpdateEvent by type discriminator', () => { + const e: AgentEvent = { type: 'state_update', data: { foo: 1 } }; + if (e.type === 'state_update') { + expect(e.data.foo).toBe(1); + } + }); + + it('narrows AgentCustomEvent by type discriminator', () => { + const e: AgentEvent = { type: 'custom', name: 'tick', data: 42 }; + if (e.type === 'custom') { + expect(e.name).toBe('tick'); + expect(e.data).toBe(42); + } + }); + + it('AgentStateUpdateEvent.data is Record-shaped', () => { + const e: AgentStateUpdateEvent = { type: 'state_update', data: {} }; + expect(typeof e.data).toBe('object'); + }); + + it('AgentCustomEvent.data is unknown', () => { + const e: AgentCustomEvent = { type: 'custom', name: 'x', data: null }; + expect(e.data).toBeNull(); + }); +}); diff --git a/libs/chat/src/lib/agent/agent-event.ts b/libs/chat/src/lib/agent/agent-event.ts new file mode 100644 index 000000000..314c14d3f --- /dev/null +++ b/libs/chat/src/lib/agent/agent-event.ts @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 + +/** + * Render-state-store sync event. Adapters emit this when the runtime + * publishes a state-snapshot intended for the chat library's render store + * (used by generative UI and a2ui surfaces). + */ +export interface AgentStateUpdateEvent { + readonly type: 'state_update'; + readonly data: Record; +} + +/** + * Escape hatch for runtime-specific or user-defined events that do not + * (yet) have a well-known structured variant. `name` carries the runtime + * event name; `data` carries the payload verbatim. + */ +export interface AgentCustomEvent { + readonly type: 'custom'; + readonly name: string; + readonly data: unknown; +} + +/** + * Discriminated union of events flowing on `Agent.events$`. + * + * Invariant: state lives on signals (`messages`, `status`, `toolCalls`, + * `state`, `interrupt`, `subagents`, `history`); events on `events$` + * carry only things that are not derivable from signals. New variants + * are added purely additively when patterns prove necessary. + */ +export type AgentEvent = AgentStateUpdateEvent | AgentCustomEvent; diff --git a/libs/chat/src/lib/agent/agent.ts b/libs/chat/src/lib/agent/agent.ts index 4e32b49f5..11e4ecad4 100644 --- a/libs/chat/src/lib/agent/agent.ts +++ b/libs/chat/src/lib/agent/agent.ts @@ -6,7 +6,7 @@ import type { ToolCall } from './tool-call'; import type { AgentStatus } from './agent-status'; import type { AgentInterrupt } from './agent-interrupt'; import type { Subagent } from './subagent'; -import type { AgentCustomEvent } from './agent-custom-event'; +import type { AgentEvent } from './agent-event'; import type { AgentSubmitInput, AgentSubmitOptions } from './agent-submit'; /** @@ -15,9 +15,12 @@ import type { AgentSubmitInput, AgentSubmitOptions } from './agent-submit'; * Implementations are produced by runtime adapters (e.g. a LangGraph or * AG-UI adapter) or by user code for custom backends. * - * `interrupt`, `subagents`, and `customEvents$` are optional: runtimes that - * do not support these concepts should leave them undefined, and primitives - * that need them check presence and render a neutral fallback when absent. + * `interrupt` and `subagents` are optional: runtimes that do not support these + * concepts should leave them undefined, and primitives that need them check + * presence and render a neutral fallback when absent. + * + * Invariant: state lives on signals; `events$` carries only things that are + * not derivable from signals. */ export interface Agent { // Core state @@ -33,7 +36,9 @@ export interface Agent { stop: () => Promise; // Extended (optional; absent when runtime does not support) - interrupt?: Signal; - subagents?: Signal>; - customEvents$?: Observable; + interrupt?: Signal; + subagents?: Signal>; + + // Events stream (required; emit EMPTY if runtime produces no events) + events$: Observable; } diff --git a/libs/chat/src/lib/agent/index.ts b/libs/chat/src/lib/agent/index.ts index 6c6aebda6..b83e6742e 100644 --- a/libs/chat/src/lib/agent/index.ts +++ b/libs/chat/src/lib/agent/index.ts @@ -8,6 +8,10 @@ export type { AgentStatus } from './agent-status'; export type { AgentInterrupt } from './agent-interrupt'; export type { Subagent, SubagentStatus } from './subagent'; export type { AgentSubmitInput, AgentSubmitOptions } from './agent-submit'; -export type { AgentCustomEvent } from './agent-custom-event'; +export type { + AgentEvent, + AgentStateUpdateEvent, + AgentCustomEvent, +} from './agent-event'; export type { AgentCheckpoint } from './agent-checkpoint'; export type { AgentWithHistory } from './agent-with-history'; diff --git a/libs/chat/src/lib/compositions/chat/chat.component.spec.ts b/libs/chat/src/lib/compositions/chat/chat.component.spec.ts index 8e02dc44c..0a3eff660 100644 --- a/libs/chat/src/lib/compositions/chat/chat.component.spec.ts +++ b/libs/chat/src/lib/compositions/chat/chat.component.spec.ts @@ -10,7 +10,7 @@ import { messageContent } from '../shared/message-utils'; import { createContentClassifier, type ContentClassifier } from '../../streaming/content-classifier'; import { mockAgent } from '../../testing/mock-agent'; import { signalStateStore } from '@cacheplane/render'; -import type { AgentCustomEvent } from '../../agent/agent-custom-event'; +import type { AgentEvent } from '../../agent/agent-event'; describe('ChatComponent', () => { it('is defined as a class', () => { @@ -120,7 +120,7 @@ describe('ChatComponent — content classification', () => { }); }); -describe('ChatComponent — customEvents$ routing', () => { +describe('ChatComponent — events$ routing', () => { // Angular 21 zoneless mode (ZONELESS_ENABLED defaults to true) means // ComponentFixture.autoDetect cannot be disabled, making createComponent // + setInput impractical for required-input signal components. We test the @@ -128,14 +128,14 @@ describe('ChatComponent — customEvents$ routing', () => { // exactly the effect body in ChatComponent's constructor — the same pattern // used by other primitive specs in this library. These tests verify the // routing contract: state_update events update the store; other event types - // and non-object data payloads are silently ignored. + // are silently ignored. - it('routes state_update customEvents to the resolved render store', () => { + it('routes state_update events to the resolved render store', () => { TestBed.configureTestingModule({}); TestBed.runInInjectionContext(() => { - const events$ = new Subject(); + const events$ = new Subject(); const store = signalStateStore({}); - const agent = mockAgent({ customEvents$: events$.asObservable() }); + const agent = mockAgent({ events$: events$.asObservable() }); const destroyRef = inject(DestroyRef); // Re-implement the exact routing effect from ChatComponent's constructor @@ -147,13 +147,9 @@ describe('ChatComponent — customEvents$ routing', () => { effect(() => { if (subscribed) return; subscribed = true; - const stream$ = agentSig().customEvents$; - if (!stream$) return; - stream$.pipe(takeUntilDestroyed(destroyRef)).subscribe((event) => { + agentSig().events$.pipe(takeUntilDestroyed(destroyRef)).subscribe((event) => { if (event.type !== 'state_update') return; - const data = event['data']; - if (!data || typeof data !== 'object') return; - storeSig().update(data as Record); + storeSig().update(event.data); }); }); @@ -165,12 +161,12 @@ describe('ChatComponent — customEvents$ routing', () => { }); }); - it('ignores non-state_update events and events with non-object data', () => { + it('ignores non-state_update events', () => { TestBed.configureTestingModule({}); TestBed.runInInjectionContext(() => { - const events$ = new Subject(); + const events$ = new Subject(); const store = signalStateStore({ initial: true }); - const agent = mockAgent({ customEvents$: events$.asObservable() }); + const agent = mockAgent({ events$: events$.asObservable() }); const destroyRef = inject(DestroyRef); const agentSig = signal(agent); @@ -179,20 +175,15 @@ describe('ChatComponent — customEvents$ routing', () => { effect(() => { if (subscribed) return; subscribed = true; - const stream$ = agentSig().customEvents$; - if (!stream$) return; - stream$.pipe(takeUntilDestroyed(destroyRef)).subscribe((event) => { + agentSig().events$.pipe(takeUntilDestroyed(destroyRef)).subscribe((event) => { if (event.type !== 'state_update') return; - const data = event['data']; - if (!data || typeof data !== 'object') return; - storeSig().update(data as Record); + storeSig().update(event.data); }); }); // Flush pending effects so the subscription is established before emitting. TestBed.flushEffects(); - events$.next({ type: 'a2ui.surface', data: { surfaceId: 'main' } }); - events$.next({ type: 'state_update', data: 'not-an-object' }); + events$.next({ type: 'custom', name: 'a2ui.surface', data: { surfaceId: 'main' } }); expect(store.getSnapshot()).toEqual({ initial: true }); }); diff --git a/libs/chat/src/lib/compositions/chat/chat.component.ts b/libs/chat/src/lib/compositions/chat/chat.component.ts index 636acd1c1..9381f9e76 100644 --- a/libs/chat/src/lib/compositions/chat/chat.component.ts +++ b/libs/chat/src/lib/compositions/chat/chat.component.ts @@ -250,7 +250,7 @@ export class ChatComponent { }); private readonly destroyRef = inject(DestroyRef); - private customEventsSubscribed = false; + private eventsSubscribed = false; private readonly classifiers = new Map(); @@ -270,33 +270,23 @@ export class ChatComponent { private prevMessageCount = 0; constructor() { - // Route `state_update` custom events from the agent stream to the render - // state store so components bound to `$state` paths reactively update. - // customEvents$ is optional — runtimes without custom-event support leave - // it undefined and this wiring becomes a no-op after the first effect run. - // Guard with customEventsSubscribed so we subscribe at most once even if - // the effect re-runs due to other reactive reads. We only set the flag - // after successfully reading the required `agent` input, so it remains - // false until Angular has satisfied the required-input contract. + // Route state_update events from the agent to the render state store + // so components bound to $state paths reactively update. effect(() => { - if (this.customEventsSubscribed) return; + if (this.eventsSubscribed) return; let agent: ReturnType; try { agent = this.agent(); } catch { - // Required input not yet available — skip this run; effect will retry. + // Required input not yet available — skip; effect will retry. return; } - this.customEventsSubscribed = true; - const stream$ = agent.customEvents$; - if (!stream$) return; - stream$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe((event) => { + this.eventsSubscribed = true; + agent.events$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe((event) => { if (event.type !== 'state_update') return; - const data = event['data']; - if (!data || typeof data !== 'object') return; const store = this.resolvedStore(); if (!store) return; - store.update(data as Record); + store.update(event.data); }); }); diff --git a/libs/chat/src/lib/testing/agent-conformance.ts b/libs/chat/src/lib/testing/agent-conformance.ts index 53de84e34..c43f4b368 100644 --- a/libs/chat/src/lib/testing/agent-conformance.ts +++ b/libs/chat/src/lib/testing/agent-conformance.ts @@ -57,13 +57,9 @@ export function runAgentConformance( expect(result).toBeInstanceOf(Promise); }); - it('if customEvents$ is present, it is an Observable-like with .subscribe', () => { + it('events$ is an Observable-like with .subscribe', () => { const agent = factory(); - if (agent.customEvents$ !== undefined) { - expect(typeof agent.customEvents$.subscribe).toBe('function'); - } else { - expect(agent.customEvents$).toBeUndefined(); - } + expect(typeof agent.events$.subscribe).toBe('function'); }); }); } diff --git a/libs/chat/src/lib/testing/mock-agent.ts b/libs/chat/src/lib/testing/mock-agent.ts index 68e0f2ac5..2452c9128 100644 --- a/libs/chat/src/lib/testing/mock-agent.ts +++ b/libs/chat/src/lib/testing/mock-agent.ts @@ -1,6 +1,6 @@ // SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 import { signal, WritableSignal } from '@angular/core'; -import type { Observable } from 'rxjs'; +import { EMPTY, type Observable } from 'rxjs'; import type { Agent, Message, @@ -10,9 +10,9 @@ import type { Subagent, AgentSubmitInput, AgentSubmitOptions, + AgentEvent, AgentCheckpoint, } from '../agent'; -import type { AgentCustomEvent } from '../agent/agent-custom-event'; export interface MockAgent extends Agent { messages: WritableSignal; @@ -24,7 +24,7 @@ export interface MockAgent extends Agent { interrupt?: WritableSignal; subagents?: WritableSignal>; history?: WritableSignal; - customEvents$?: Observable; + events$: Observable; /** Captured calls to submit() in order. */ submitCalls: Array<{ input: AgentSubmitInput; opts?: AgentSubmitOptions }>; /** Count of stop() invocations. */ @@ -41,7 +41,7 @@ export interface MockAgentOptions { withInterrupt?: boolean; withSubagents?: boolean; history?: AgentCheckpoint[]; - customEvents$?: Observable; + events$?: Observable; } export function mockAgent(opts: MockAgentOptions = {}): MockAgent { @@ -67,10 +67,10 @@ export function mockAgent(opts: MockAgentOptions = {}): MockAgent { const agent: MockAgent = { messages, status, isLoading, error, toolCalls, state, - ...(interrupt ? { interrupt } : {}), - ...(subagents ? { subagents } : {}), - ...(history ? { history } : {}), - ...(opts.customEvents$ ? { customEvents$: opts.customEvents$ } : {}), + ...(interrupt ? { interrupt } : {}), + ...(subagents ? { subagents } : {}), + ...(history ? { history } : {}), + events$: opts.events$ ?? EMPTY, submit: async (input, submitOpts) => { submitCalls.push({ input, opts: submitOpts }); }, stop: async () => { stopCount++; }, submitCalls, diff --git a/libs/chat/src/public-api.ts b/libs/chat/src/public-api.ts index ac4df3bd2..b1fd2c6a4 100644 --- a/libs/chat/src/public-api.ts +++ b/libs/chat/src/public-api.ts @@ -19,6 +19,8 @@ export type { SubagentStatus, AgentSubmitInput, AgentSubmitOptions, + AgentEvent, + AgentStateUpdateEvent, AgentCustomEvent, AgentCheckpoint, } from './lib/agent'; diff --git a/libs/langgraph/src/lib/to-agent.spec.ts b/libs/langgraph/src/lib/to-agent.spec.ts index c636b56a5..ffacd9ba2 100644 --- a/libs/langgraph/src/lib/to-agent.spec.ts +++ b/libs/langgraph/src/lib/to-agent.spec.ts @@ -2,7 +2,7 @@ import { signal } from '@angular/core'; import { TestBed } from '@angular/core/testing'; import { HumanMessage, AIMessage } from '@langchain/core/messages'; -import type { Agent, AgentCustomEvent } from '@cacheplane/chat'; +import type { Agent, AgentEvent } from '@cacheplane/chat'; import type { AgentRef, CustomStreamEvent } from './agent.types'; import { ResourceStatus } from './agent.types'; import { toAgent } from './to-agent'; @@ -115,7 +115,39 @@ describe('toAgent (LangGraph adapter)', () => { }); }); - it('exposes customEvents$ that emits newly-appended events with type aliased from name', () => { + it('translates a state_update CustomStreamEvent into AgentStateUpdateEvent', () => { + TestBed.runInInjectionContext(() => { + const customEvents = signal([]); + const ref = stubAgentRef({ customEvents } as any); + const chat = toAgent(ref); + + const received: any[] = []; + chat.events$.subscribe((e) => received.push(e)); + + customEvents.set([{ name: 'state_update', data: { count: 1 } }]); + TestBed.flushEffects(); + + expect(received).toEqual([{ type: 'state_update', data: { count: 1 } }]); + }); + }); + + it('wraps non-state_update CustomStreamEvent as AgentCustomEvent', () => { + TestBed.runInInjectionContext(() => { + const customEvents = signal([]); + const ref = stubAgentRef({ customEvents } as any); + const chat = toAgent(ref); + + const received: any[] = []; + chat.events$.subscribe((e) => received.push(e)); + + customEvents.set([{ name: 'tick', data: 42 }]); + TestBed.flushEffects(); + + expect(received).toEqual([{ type: 'custom', name: 'tick', data: 42 }]); + }); + }); + + it('exposes events$ that emits newly-appended events as structured AgentEvent', () => { const customSig = signal([]); const ref = stubAgentRef({ customEvents: customSig }); @@ -124,8 +156,8 @@ describe('toAgent (LangGraph adapter)', () => { adapter = toAgent(ref); }); - const received: AgentCustomEvent[] = []; - adapter.customEvents$!.subscribe((e) => received.push(e)); + const received: AgentEvent[] = []; + adapter.events$.subscribe((e) => received.push(e)); customSig.set([{ name: 'state_update', data: { counter: 1 } }]); TestBed.flushEffects(); @@ -142,7 +174,7 @@ describe('toAgent (LangGraph adapter)', () => { expect(received).toEqual([ { type: 'state_update', data: { counter: 1 } }, - { type: 'a2ui.surface', data: { surfaceId: 'main' } }, + { type: 'custom', name: 'a2ui.surface', data: { surfaceId: 'main' } }, ]); }); }); diff --git a/libs/langgraph/src/lib/to-agent.ts b/libs/langgraph/src/lib/to-agent.ts index caf7d1047..b9c5d46cc 100644 --- a/libs/langgraph/src/lib/to-agent.ts +++ b/libs/langgraph/src/lib/to-agent.ts @@ -4,18 +4,9 @@ import { Subject, type Observable } from 'rxjs'; import type { BaseMessage } from '@langchain/core/messages'; import type { ToolCallWithResult, Interrupt } from '@langchain/langgraph-sdk'; import type { - AgentWithHistory, - AgentCheckpoint, - AgentCustomEvent, - Message, - Role, - AgentStatus, - ToolCall, - ToolCallStatus, - AgentInterrupt, - Subagent, - AgentSubmitInput, - AgentSubmitOptions, + AgentWithHistory, AgentCheckpoint, AgentEvent, + Message, Role, ToolCall, ToolCallStatus, AgentStatus, + AgentInterrupt, Subagent, AgentSubmitInput, AgentSubmitOptions, } from '@cacheplane/chat'; import type { AgentRef, CustomStreamEvent, SubagentStreamRef, ThreadState } from './agent.types'; import { ResourceStatus } from './agent.types'; @@ -56,7 +47,7 @@ export function toAgent(ref: AgentRef): AgentWithHistory { return out; }); - const customEvents$ = buildCustomEvents$(ref); + const events$ = buildEvents$(ref); const history = computed(() => ref.history().map(toCheckpoint), @@ -71,7 +62,7 @@ export function toAgent(ref: AgentRef): AgentWithHistory { state, interrupt, subagents, - customEvents$, + events$, history, submit: (input: AgentSubmitInput, opts?: AgentSubmitOptions) => ref.submit(buildSubmitPayload(input), opts ? { signal: opts.signal } as never : undefined), @@ -80,15 +71,15 @@ export function toAgent(ref: AgentRef): AgentWithHistory { } /** - * Build an Observable that bridges LangGraph's + * Build an Observable that bridges LangGraph's * `Signal` (append-only array) into a stream of newly * emitted events. Each effect firing compares against a cursor tracking the * previously-seen length and emits only the tail slice. */ -function buildCustomEvents$( +function buildEvents$( ref: AgentRef, -): Observable { - const subject = new Subject(); +): Observable { + const subject = new Subject(); let seen = 0; effect(() => { const all = ref.customEvents(); @@ -97,15 +88,18 @@ function buildCustomEvents$( seen = 0; } for (let i = seen; i < all.length; i++) { - subject.next(toCustomEvent(all[i])); + subject.next(toAgentEvent(all[i])); } seen = all.length; }); return subject.asObservable(); } -function toCustomEvent(e: CustomStreamEvent): AgentCustomEvent { - return { type: e.name, data: e.data }; +function toAgentEvent(e: CustomStreamEvent): AgentEvent { + if (e.name === 'state_update' && isRecord(e.data)) { + return { type: 'state_update', data: e.data }; + } + return { type: 'custom', name: e.name, data: e.data }; } function mapStatus(s: ResourceStatus): AgentStatus {