From 2b9223c61d6884d6da35df8434cb94ea0c219826 Mon Sep 17 00:00:00 2001 From: Genmin Date: Tue, 28 Apr 2026 17:37:09 -0700 Subject: [PATCH 1/2] fix: make in-memory event replay use stored stream ids --- examples/server/src/inMemoryEventStore.ts | 10 ++---- .../integration/test/taskResumability.test.ts | 31 +++++++++++++++++-- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/examples/server/src/inMemoryEventStore.ts b/examples/server/src/inMemoryEventStore.ts index 604b84d39c..c07d83e6a0 100644 --- a/examples/server/src/inMemoryEventStore.ts +++ b/examples/server/src/inMemoryEventStore.ts @@ -16,11 +16,10 @@ export class InMemoryEventStore implements EventStore { } /** - * Extracts the stream ID from an event ID + * Looks up the stream ID for an event ID */ private getStreamIdFromEventId(eventId: string): string { - const parts = eventId.split('_'); - return parts.length > 0 ? parts[0]! : ''; + return this.events.get(eventId)?.streamId ?? ''; } /** @@ -53,10 +52,7 @@ export class InMemoryEventStore implements EventStore { let foundLastEvent = false; - // Sort events by eventId for chronological ordering - const sortedEvents = [...this.events.entries()].toSorted((a, b) => a[0].localeCompare(b[0])); - - for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { + for (const [eventId, { streamId: eventStreamId, message }] of this.events) { // Only include events from the same stream if (eventStreamId !== streamId) { continue; diff --git a/test/integration/test/taskResumability.test.ts b/test/integration/test/taskResumability.test.ts index f7b4174d18..02940d21f3 100644 --- a/test/integration/test/taskResumability.test.ts +++ b/test/integration/test/taskResumability.test.ts @@ -26,12 +26,11 @@ class InMemoryEventStore implements EventStore { { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } ): Promise { if (!lastEventId || !this.events.has(lastEventId)) return ''; - const streamId = lastEventId.split('_')[0] ?? ''; + const streamId = this.events.get(lastEventId)?.streamId ?? ''; if (!streamId) return ''; let found = false; - const sorted = [...this.events.entries()].toSorted((a, b) => a[0].localeCompare(b[0])); - for (const [eventId, { streamId: sid, message }] of sorted) { + for (const [eventId, { streamId: sid, message }] of this.events) { if (sid !== streamId) continue; if (eventId === lastEventId) { found = true; @@ -141,6 +140,32 @@ describe('Zod v4', () => { server.close(); }); + it('replays events for stream IDs that contain underscores', async () => { + const firstMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'notifications/message', + params: { level: 'info', data: 'first' } + }; + const secondMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'notifications/message', + params: { level: 'info', data: 'second' } + }; + + const firstEventId = await eventStore.storeEvent('_GET_stream', firstMessage); + const secondEventId = await eventStore.storeEvent('_GET_stream', secondMessage); + const replayed: Array<{ eventId: string; message: JSONRPCMessage }> = []; + + const streamId = await eventStore.replayEventsAfter(firstEventId, { + send: async (eventId, message) => { + replayed.push({ eventId, message }); + } + }); + + expect(streamId).toBe('_GET_stream'); + expect(replayed).toEqual([{ eventId: secondEventId, message: secondMessage }]); + }); + it('should store session ID when client connects', async () => { // Create and connect a client const client = new Client({ From 17fb9ec18ccba6632d5ab5d0a83790553688b930 Mon Sep 17 00:00:00 2001 From: Genmin Date: Wed, 29 Apr 2026 07:27:44 -0700 Subject: [PATCH 2/2] chore: add event replay changeset --- .changeset/hot-panthers-matter.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/hot-panthers-matter.md diff --git a/.changeset/hot-panthers-matter.md b/.changeset/hot-panthers-matter.md new file mode 100644 index 0000000000..2b0fa1ec13 --- /dev/null +++ b/.changeset/hot-panthers-matter.md @@ -0,0 +1,6 @@ +--- +"@modelcontextprotocol/examples-server": patch +"@modelcontextprotocol/test-integration": patch +--- + +Fix in-memory event replay to preserve the stored event stream id when resuming task notifications.