From a7ee7026183cb8b785cfa399fed743b832447f00 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Tue, 16 Jun 2026 14:19:10 -0600 Subject: [PATCH] feat(supervise): event bus priority bump + A++ observability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Upgrade the child→parent bus from FIFO to the real coordination core: - PRIORITY pull — a blocking question (urgency→priority: blocks-run=20, blocks-step=10) bumps ahead of queued settles/findings so the driver sees it first; ties FIFO by seq. Pass-through (subscribe/onEvent) still delivers immediately. - Observability A++ — every event stamped (seq/at/priority); history() is the full ordered audit/replay trail; stats() counts published/pulled/by-kind. Surfaced on CoordinationTools and the serveCoordinationMcp handle. - Generalized questions — ask_parent questions are bus events carrying urgency-derived priority; peek() + unsubscribe round out the primitive. bus 6 + coordination 10 tests; full suite 999 pass; typecheck/build/lint clean. --- CLAUDE.md | 2 +- src/mcp/tools/coordination.ts | 32 ++++-- src/runtime/index.ts | 9 +- src/runtime/supervise/coordination-mcp.ts | 7 ++ src/runtime/supervise/event-bus.ts | 119 ++++++++++++++++++---- tests/loops/coordination.test.ts | 37 +++++++ tests/loops/event-bus.test.ts | 77 +++++++++++--- 7 files changed, 238 insertions(+), 45 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index bc07feaa..f3bba77a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -62,7 +62,7 @@ Types that stay in THIS repo because they're runtime-shaped (coupled to a runnin - `run-loop.ts` — `runLoop`, the round-synchronous leaf kernel. Per round: `driver.plan()`→N tasks→one sandbox/iteration (bounded by `maxConcurrency`, round-robin `agentRuns`)→`streamPrompt`→`output.parse`→`validator.validate`→`driver.decide`. Owns iteration accounting, concurrency, abort, cost+token aggregation, trace emission, box teardown. Exports `defaultSelectWinner` (best-valid-score, ties→earliest) — the single-sourced selection the personify combinators reuse. - `supervise/` — the recursive execution atom (keystone): `Scope` + `Supervisor` over the open `Executor` port, spawn/settle on a **conserved budget pool** so equal-compute holds by construction; journal→replay/resume. `runtime.ts` also holds `createExecutor({backend})` — the ONE built-in executor (backend-as-data: `router`/`router-tools`/`bridge`/`cli`/`sandbox`; `router-tools` is the off-box tool-using agentic loop — chat→tool_calls→`executeToolCall`→repeat — over the router's tool-calling, no sandbox); the per-backend bodies are internal case-arms, BYO agents implement `Executor` directly. - `personify/` — the content-free generic combinators (`fanout`/`loopUntil`/`widen`/`panel`/`verify`/`pipeline`) + `definePersona`/`runPersonified` + the cross-run `Corpus` + `createScopeAnalyst` (the analyst-on-scope steer firewall). -- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. +- the **agent-driver** is the canonical "drive an agent" path: an `AgentProfile` driving another `AgentProfile` via the coordination toolbox (`createCoordinationTools`, `src/mcp/tools/coordination.ts`) over the `Scope`/`Supervisor`, plus `runAgentic`/`defineStrategy`/`runPersonified` (`strategy.ts`/`personify/persona.ts`) on the Supervisor. Child→parent messages ride ONE typed pipe — `createEventBus` (`supervise/event-bus.ts`): settled outputs, `ask_parent` questions, and analyst findings are all `CoordinationEvent` kinds, delivered pass-through (`subscribe`/`onEvent`, immediate) AND queued for the driver to pull (`await_event`, kind-filterable; `await_next` is the settled-only view). The pull queue is **priority-ordered** — a blocking question (urgency→priority: `blocks-run`=20/`blocks-step`=10) is bumped ahead of queued settles/findings; ties FIFO by `seq`. Observability is first-class: every event is stamped (`seq`/`at`/`priority`), the full `history()` is an audit/replay trail, `stats()` counts throughput (both surfaced on `CoordinationTools` and the MCP handle). `analyzeOnSettle` auto-fires trace analysts when a worker settles `done`, re-entering each result as a `finding` on the same bus (cost-governed opt-in; the firewall stays in the analyst registry). The in-process queue and a future cross-box durable mailbox share this one interface. `assertTraceDerivedFindings` (`personify/analyst.ts`) is the steer-firewall (selector≠judge). `types.ts` holds `Driver`/`AgentRunSpec`/`OutputAdapter`/`Validator`/`Iteration`/`LoopResult`/`SandboxClient` + the `LoopTraceEvent` union. `sandbox-run.ts` is `openSandboxRun` — the one run/stream/resume sandbox seam; `inline-sandbox-client.ts` is `inlineSandboxClient` — the one adapter presenting any non-box `Executor` as a `SandboxClient` for `runLoop`. `loop-dispatch.ts` adapts `runLoop`→agent-eval campaigns; `report-usage.ts` forwards token usage so the integrity guard sees a real backend. Two substrates coexist for the same "recursive agent decision" atom: the round-synchronous `runLoop` kernel (the leaf, what most sandbox benches drive today) and the reactive `Scope`/`Supervisor`+combinators (the canonical core — the agent-driver, `runAgentic`/`defineStrategy`/`runPersonified`). Prefer the latter for new recursive/keystone work. Both run over the one `Executor` port. diff --git a/src/mcp/tools/coordination.ts b/src/mcp/tools/coordination.ts index 4c66d63c..41239fbd 100644 --- a/src/mcp/tools/coordination.ts +++ b/src/mcp/tools/coordination.ts @@ -14,7 +14,7 @@ import type { Settled, Agent as SuperviseAgent, } from '../../runtime' -import { createEventBus } from '../../runtime/supervise/event-bus' +import { type BusRecord, type BusStats, createEventBus } from '../../runtime/supervise/event-bus' import type { McpToolDescriptor } from '../server' /** A worker the driver has drained via `await_next`. */ @@ -101,6 +101,11 @@ export interface CoordinationTools { stopReason(): string | undefined settled(): ReadonlyArray questions(): ReadonlyArray + /** The full ordered log of every bus event (settled / question / finding) — the observability + * audit + replay trail. Each record carries seq, timestamp, and priority. */ + history(): ReadonlyArray> + /** Bus throughput counters (published / pulled / by-kind) for live dashboards. */ + stats(): BusStats } const idArg = { type: 'string', description: 'The workerId returned by spawn_worker.' } as const @@ -114,11 +119,20 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin const questions: QuestionRecord[] = [] const questionPolicy = opts.questionPolicy ?? 'auto' - // The one child→parent pipe. `onEvent` (back-compat) becomes a pass-through subscriber, so every - // event kind — question, settled, finding — reaches it, and the driver pulls queued findings / - // questions via `await_event`. + // The one child→parent pipe. `onEvent` (back-compat) becomes a pass-through subscriber receiving + // the bare event, so every kind — question, settled, finding — reaches it immediately, and the + // driver pulls queued findings / questions via `await_event`. const bus = createEventBus() - if (opts.onEvent) bus.subscribe(opts.onEvent) + if (opts.onEvent) { + const cb = opts.onEvent + bus.subscribe((rec) => cb(rec.event)) + } + + // Urgency → bus priority: a blocking question is bumped ahead of queued settles/findings so the + // driver sees it FIRST when it drains the inbox (and pass-through already delivered it the instant + // it was raised). Non-blocking messages share priority 0 and resolve FIFO. + const urgencyPriority = (u: QuestionUrgency): number => + u === 'blocks-run' ? 20 : u === 'blocks-step' ? 10 : 0 const str = (v: unknown, field: string): string => { if (typeof v !== 'string' || v.length === 0) @@ -245,7 +259,11 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin question: QuestionRecord added: boolean }): Promise => { - if (record.added) await bus.publish({ type: 'question', question: record.question }) + if (record.added) + await bus.publish( + { type: 'question', question: record.question }, + { priority: urgencyPriority(record.question.urgency) }, + ) return record.question } const decideQuestion = (questionId: string, decision: QuestionDecision): QuestionRecord => { @@ -530,6 +548,8 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin return { tools, + history: () => bus.history(), + stats: () => bus.stats(), isStopped: () => stopped, stopReason: () => reason, settled: () => ledger, diff --git a/src/runtime/index.ts b/src/runtime/index.ts index 683b90c3..904692c8 100644 --- a/src/runtime/index.ts +++ b/src/runtime/index.ts @@ -328,7 +328,14 @@ export { } from './supervise/driver-executor' // The child→parent message bus: the one typed pipe carrying settled outputs, questions, and // analyst findings up to the driver (pass-through + queued lanes, transport-agnostic). -export { type BusEvent, createEventBus, type EventBus } from './supervise/event-bus' +export { + type BusEvent, + type BusRecord, + type BusStats, + createEventBus, + type EventBus, + type PublishOptions, +} from './supervise/event-bus' // The production `DriverChat`: adapt the router's tool-calling to the seam a // `coordinationDriverAgent` drives. The one turnkey piece a consumer needs to run the driver // brain in-process — tests script a mock `DriverChat`, production passes `routerDriverChat(cfg)`. diff --git a/src/runtime/supervise/coordination-mcp.ts b/src/runtime/supervise/coordination-mcp.ts index 1d6e517f..67296b85 100644 --- a/src/runtime/supervise/coordination-mcp.ts +++ b/src/runtime/supervise/coordination-mcp.ts @@ -17,6 +17,7 @@ import { createMcpServer } from '../../mcp/server' import { type AnalystRegistry, type CoordinationEvent, + type CoordinationTools, createCoordinationTools, type MakeWorkerAgent, type QuestionPolicy, @@ -30,6 +31,10 @@ export interface CoordinationMcpHandle { /** The coordination tools' settled-worker ledger (for the driver's finalize). */ settled(): ReadonlyArray<{ status: string; score?: number; valid?: boolean; outRef?: string }> isStopped(): boolean + /** The full ordered bus-event log — observability audit + replay trail. */ + history: CoordinationTools['history'] + /** Bus throughput counters for live dashboards. */ + stats: CoordinationTools['stats'] close(): Promise } @@ -112,6 +117,8 @@ export async function serveCoordinationMcp(opts: { port, settled: () => coord.settled(), isStopped: () => coord.isStopped(), + history: () => coord.history(), + stats: () => coord.stats(), close: () => new Promise((resolve) => { server.close(() => resolve()) diff --git a/src/runtime/supervise/event-bus.ts b/src/runtime/supervise/event-bus.ts index 4636e1e3..c5d06702 100644 --- a/src/runtime/supervise/event-bus.ts +++ b/src/runtime/supervise/event-bus.ts @@ -3,13 +3,18 @@ * * The child→parent message bus: the ONE pipe carrying every message a worker, sub-driver, or * analyst sends up to the driver — settled outputs, questions, and trace-analyst findings. It - * unifies three channels that were ad-hoc before (the settled-worker cursor, the ask-parent - * question channel, and analyst results) into a single typed primitive with two lanes: + * unifies channels that were ad-hoc before (the settled-worker cursor, the ask-parent question + * channel, and analyst results) into a single typed primitive with two lanes: * * - PASS-THROUGH (`subscribe`): every published event reaches subscribers immediately — the * express lane for online steering and live observation (a UI, a hook, the parent's box). - * - STANDBY (`pull`): events also queue so the driver consumes them on its own cadence (and a - * blocking question parks the worker until its answer arrives) — the accumulative lane. + * - STANDBY (`pull`): events also queue so the driver consumes them on its own cadence. The queue + * is PRIORITY-ordered: a higher-`priority` event (a blocking question) is bumped ahead of + * queued settles/findings so the driver sees it first; ties resolve FIFO by publish order. + * + * Observability is first-class (A++): every event is stamped with a monotonic `seq` and wall-clock + * `at`, the full ordered `history()` is retained as an audit/replay trail, and `stats()` exposes + * published/pulled counts by kind. Subscribers receive the stamped record, not a bare event. * * The interface is transport-agnostic on purpose. Same box → this in-process queue. Cross box → * the SAME publish/pull/subscribe surface backed by a durable mailbox on the parent's box (children @@ -22,39 +27,111 @@ export interface BusEvent { readonly type: string } +/** A published event stamped for ordering and observability. `seq` is the monotonic publish index; + * `priority` drives pull order (higher = bumped ahead); `at` is the wall-clock publish time (ms). */ +export interface BusRecord { + readonly seq: number + readonly at: number + readonly priority: number + readonly event: E +} + +export interface PublishOptions { + /** Higher = pulled ahead of lower-priority queued events (default 0). A blocking question sets + * this so it bumps to the front of the driver's inbox. */ + readonly priority?: number +} + +export interface BusStats { + readonly published: number + readonly pulled: number + /** Count published per event `type`. */ + readonly byKind: Readonly> +} + export interface EventBus { - /** Push an event: queue it for `pull`, then deliver to every `subscribe` handler in order. */ - publish(event: E): Promise - /** Remove and return the oldest QUEUED event whose type is in `kinds` (any type if omitted); - * `undefined` when nothing matches. The pass-through copy already went to subscribers. */ + /** Stamp + queue the event, then deliver the stamped record to every subscriber in order. + * Returns the stamped record. */ + publish(event: E, opts?: PublishOptions): Promise> + /** Remove and return the highest-priority QUEUED event whose type is in `kinds` (any if omitted), + * ties broken FIFO by `seq`; `undefined` when nothing matches. */ pull(kinds?: ReadonlyArray): E | undefined - /** Register a pass-through handler; it receives every event published after registration. */ - subscribe(handler: (event: E) => void | Promise): void + /** Like `pull` but non-destructive — inspect the next event without consuming it. */ + peek(kinds?: ReadonlyArray): E | undefined + /** Register a pass-through handler; it receives the stamped record of every event published after + * registration. Returns an unsubscribe fn. */ + subscribe(handler: (record: BusRecord) => void | Promise): () => void /** Count of queued, not-yet-pulled events (filtered by `kinds` when given). */ pending(kinds?: ReadonlyArray): number + /** The full ordered log of every event ever published (the audit/replay trail). */ + history(): ReadonlyArray> + /** Throughput counters for observability dashboards. */ + stats(): BusStats } -export function createEventBus(): EventBus { - const queue: E[] = [] - const subscribers: Array<(event: E) => void | Promise> = [] - const matches = (e: E, kinds?: ReadonlyArray) => !kinds || kinds.includes(e.type) +export function createEventBus(now: () => number = Date.now): EventBus { + const queue: BusRecord[] = [] + const log: BusRecord[] = [] + const subscribers: Array<(record: BusRecord) => void | Promise> = [] + const byKind: Record = {} + let seq = 0 + let pulled = 0 + + const matches = (r: BusRecord, kinds?: ReadonlyArray) => + !kinds || kinds.includes(r.event.type) + + // Index of the highest-priority matching record; ties resolve to the earliest `seq` (the queue is + // maintained in publish order, so the first scan hit at the max priority is the oldest). + const bestIndex = (kinds?: ReadonlyArray): number => { + let best = -1 + let bestPriority = Number.NEGATIVE_INFINITY + for (let i = 0; i < queue.length; i++) { + const r = queue[i] + if (!r || !matches(r, kinds)) continue + if (r.priority > bestPriority) { + best = i + bestPriority = r.priority + } + } + return best + } + return { - async publish(event) { - queue.push(event) + async publish(event, opts) { + const record: BusRecord = { seq: seq++, at: now(), priority: opts?.priority ?? 0, event } + queue.push(record) + log.push(record) + byKind[event.type] = (byKind[event.type] ?? 0) + 1 // Sequential, not Promise.all: a subscriber that steers off this event must observe a // consistent order, and a throwing subscriber must not silently drop siblings' delivery. - for (const handler of subscribers) await handler(event) + for (const handler of subscribers) await handler(record) + return record }, pull(kinds) { - const index = queue.findIndex((e) => matches(e, kinds)) - if (index < 0) return undefined - return queue.splice(index, 1)[0] + const i = bestIndex(kinds) + if (i < 0) return undefined + pulled++ + return queue.splice(i, 1)[0]?.event + }, + peek(kinds) { + const i = bestIndex(kinds) + return i < 0 ? undefined : queue[i]?.event }, subscribe(handler) { subscribers.push(handler) + return () => { + const i = subscribers.indexOf(handler) + if (i >= 0) subscribers.splice(i, 1) + } }, pending(kinds) { - return kinds ? queue.filter((e) => matches(e, kinds)).length : queue.length + return kinds ? queue.filter((r) => matches(r, kinds)).length : queue.length + }, + history() { + return log + }, + stats() { + return { published: seq, pulled, byKind: { ...byKind } } }, } } diff --git a/tests/loops/coordination.test.ts b/tests/loops/coordination.test.ts index c6a36383..96f0cea6 100644 --- a/tests/loops/coordination.test.ts +++ b/tests/loops/coordination.test.ts @@ -227,6 +227,43 @@ describe('coordination tools', () => { ) }) + it('await_event bumps a blocking question ahead of a non-blocking one (urgency→priority)', async () => { + const { scope } = mockScope() + const tb = createCoordinationTools({ + scope, + blobs, + makeWorkerAgent, + perWorker: { maxIterations: 1, maxTokens: 10 }, + }) + // A low-urgency question is raised first... + await tool(tb, 'ask_parent').handler({ + from: 'w-a', + level: 'worker', + question: 'nice-to-know?', + reason: 'minor', + urgency: 'continue-without', + }) + // ...then a blocking one. It arrives later but must be pulled FIRST. + await tool(tb, 'ask_parent').handler({ + from: 'w-b', + level: 'driver', + question: 'which API version?', + reason: 'blocks the run', + urgency: 'blocks-run', + }) + expect(await tool(tb, 'await_event').handler({ kinds: ['question'] })).toMatchObject({ + type: 'question', + question: { question: 'which API version?', urgency: 'blocks-run' }, + }) + expect(await tool(tb, 'await_event').handler({ kinds: ['question'] })).toMatchObject({ + type: 'question', + question: { question: 'nice-to-know?' }, + }) + // The history audit trail recorded both, in publish order, with the bumped priority stamped. + expect(tb.history().map((r) => r.priority)).toEqual([0, 20]) + expect(tb.stats()).toMatchObject({ published: 2, pulled: 2, byKind: { question: 2 } }) + }) + it('analyze-on-settle auto-runs lenses and await_event surfaces settled + finding', async () => { const { scope } = mockScope() const settlements = [ diff --git a/tests/loops/event-bus.test.ts b/tests/loops/event-bus.test.ts index 6b8f6323..b8b2226a 100644 --- a/tests/loops/event-bus.test.ts +++ b/tests/loops/event-bus.test.ts @@ -1,30 +1,36 @@ import { describe, expect, it } from 'vitest' -import { type BusEvent, createEventBus } from '../../src/runtime' +import { type BusEvent, type BusRecord, createEventBus } from '../../src/runtime' type E = | { type: 'settled'; id: string } | { type: 'finding'; claim: string } | { type: 'question'; q: string } +// A monotonic clock so timestamp assertions are deterministic. +const fakeClock = () => { + let t = 1000 + return () => t++ +} + describe('event bus', () => { - it('passes every event to subscribers and queues it for pull', async () => { - const bus = createEventBus() - const seen: E[] = [] - bus.subscribe((e) => { - seen.push(e) + it('passes the stamped record to subscribers and queues the event for pull', async () => { + const bus = createEventBus(fakeClock()) + const seen: BusRecord[] = [] + bus.subscribe((r) => { + seen.push(r) }) await bus.publish({ type: 'settled', id: 'w1' }) await bus.publish({ type: 'finding', claim: 'X missing' }) - // pass-through lane: subscriber saw both immediately + // pass-through lane: subscriber saw both immediately, stamped with seq + timestamp + priority expect(seen).toEqual([ - { type: 'settled', id: 'w1' }, - { type: 'finding', claim: 'X missing' }, + { seq: 0, at: 1000, priority: 0, event: { type: 'settled', id: 'w1' } }, + { seq: 1, at: 1001, priority: 0, event: { type: 'finding', claim: 'X missing' } }, ]) // standby lane: still queued for the driver to pull expect(bus.pending()).toBe(2) }) - it('pull is FIFO and kind-filtered, draining each event once', async () => { + it('pull is FIFO within a priority, and kind-filtered, draining each event once', async () => { const bus = createEventBus() await bus.publish({ type: 'settled', id: 'w1' }) await bus.publish({ type: 'finding', claim: 'a' }) @@ -38,14 +44,53 @@ describe('event bus', () => { expect(bus.pull()).toBeUndefined() }) - it('subscribers registered after a publish do not receive the earlier event', async () => { + it('pull bumps higher-priority events ahead of the queue (the blocking-question lane)', async () => { + const bus = createEventBus() + await bus.publish({ type: 'settled', id: 'w1' }) + await bus.publish({ type: 'settled', id: 'w2' }) + // a blocking question arrives last but jumps to the front + await bus.publish({ type: 'question', q: 'which API version?' }, { priority: 20 }) + await bus.publish({ type: 'finding', claim: 'late finding' }) + expect(bus.pull()).toEqual({ type: 'question', q: 'which API version?' }) + // then the rest drain FIFO at priority 0 + expect(bus.pull()).toEqual({ type: 'settled', id: 'w1' }) + expect(bus.pull()).toEqual({ type: 'settled', id: 'w2' }) + expect(bus.pull()).toEqual({ type: 'finding', claim: 'late finding' }) + }) + + it('peek is non-destructive and respects priority', async () => { + const bus = createEventBus() + await bus.publish({ type: 'settled', id: 'w1' }) + await bus.publish({ type: 'question', q: 'q' }, { priority: 10 }) + expect(bus.peek()).toEqual({ type: 'question', q: 'q' }) + expect(bus.pending()).toBe(2) // peek consumed nothing + expect(bus.pull()).toEqual({ type: 'question', q: 'q' }) + }) + + it('history is the full ordered audit trail; stats count throughput', async () => { + const bus = createEventBus(fakeClock()) + await bus.publish({ type: 'settled', id: 'w1' }) + await bus.publish({ type: 'finding', claim: 'a' }, { priority: 0 }) + await bus.publish({ type: 'question', q: 'q' }, { priority: 20 }) + bus.pull() // pulls the priority-20 question + expect(bus.history().map((r) => r.event.type)).toEqual(['settled', 'finding', 'question']) + expect(bus.history()[2]).toMatchObject({ seq: 2, priority: 20, at: 1002 }) + expect(bus.stats()).toEqual({ + published: 3, + pulled: 1, + byKind: { settled: 1, finding: 1, question: 1 }, + }) + }) + + it('unsubscribe stops delivery', async () => { const bus = createEventBus() - await bus.publish({ type: 'settled' }) - const late: BusEvent[] = [] - bus.subscribe((e) => { - late.push(e) + const seen: string[] = [] + const off = bus.subscribe((r) => { + seen.push(r.event.type) }) + await bus.publish({ type: 'settled' }) + off() await bus.publish({ type: 'finding' }) - expect(late).toEqual([{ type: 'finding' }]) + expect(seen).toEqual(['settled']) }) })