Skip to content

Commit 52fb100

Browse files
fix: don't reconnect POST stream when response was already received
Per SEP-1699, the client should only reconnect when the server disconnects BEFORE sending a response (for long-running operations). The previous implementation would reconnect after ANY stream completion when a priming event was received, even when the response was already delivered. This fix tracks whether a response was received and skips reconnection if so. This prevents: 1. Unnecessary network round-trips 2. Race conditions with rapid stream cycling (which caused crashes in consumers like the inspector) Adds test case to verify the correct behavior.
1 parent b4c6090 commit 52fb100

File tree

2 files changed

+70
-4
lines changed

2 files changed

+70
-4
lines changed

src/client/streamableHttp.test.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,61 @@ describe('StreamableHTTPClientTransport', () => {
866866
expect(reconnectHeaders.get('last-event-id')).toBe('event-123');
867867
});
868868

869+
it('should NOT reconnect a POST stream when response was received', async () => {
870+
// ARRANGE
871+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
872+
reconnectionOptions: {
873+
initialReconnectionDelay: 10,
874+
maxRetries: 1,
875+
maxReconnectionDelay: 1000,
876+
reconnectionDelayGrowFactor: 1
877+
}
878+
});
879+
880+
// Create a stream that sends:
881+
// 1. Priming event with ID (enables potential reconnection)
882+
// 2. The actual response (should prevent reconnection)
883+
// 3. Then closes
884+
const streamWithResponse = new ReadableStream({
885+
start(controller) {
886+
// Priming event with ID
887+
controller.enqueue(new TextEncoder().encode('id: priming-123\ndata: \n\n'));
888+
// The actual response to the request
889+
controller.enqueue(
890+
new TextEncoder().encode('id: response-456\ndata: {"jsonrpc":"2.0","result":{"tools":[]},"id":"request-1"}\n\n')
891+
);
892+
// Stream closes normally
893+
controller.close();
894+
}
895+
});
896+
897+
const fetchMock = global.fetch as Mock;
898+
fetchMock.mockResolvedValueOnce({
899+
ok: true,
900+
status: 200,
901+
headers: new Headers({ 'content-type': 'text/event-stream' }),
902+
body: streamWithResponse
903+
});
904+
905+
const requestMessage: JSONRPCRequest = {
906+
jsonrpc: '2.0',
907+
method: 'tools/list',
908+
id: 'request-1',
909+
params: {}
910+
};
911+
912+
// ACT
913+
await transport.start();
914+
await transport.send(requestMessage);
915+
await vi.advanceTimersByTimeAsync(50);
916+
917+
// ASSERT
918+
// THE KEY ASSERTION: Fetch was called ONCE only - no reconnection!
919+
// The response was received, so no need to reconnect.
920+
expect(fetchMock).toHaveBeenCalledTimes(1);
921+
expect(fetchMock.mock.calls[0][1]?.method).toBe('POST');
922+
});
923+
869924
it('should not throw JSON parse error on priming events with empty data', async () => {
870925
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'));
871926

src/client/streamableHttp.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,9 @@ export class StreamableHTTPClientTransport implements Transport {
307307
// Track whether we've received a priming event (event with ID)
308308
// Per spec, server SHOULD send a priming event with ID before closing
309309
let hasPrimingEvent = false;
310+
// Track whether we've received a response - if so, no need to reconnect
311+
// Per SEP-1699, reconnection is for when server disconnects BEFORE sending response
312+
let receivedResponse = false;
310313
const processStream = async () => {
311314
// this is the closest we can get to trying to catch network errors
312315
// if something happens reader will throw
@@ -346,8 +349,12 @@ export class StreamableHTTPClientTransport implements Transport {
346349
if (!event.event || event.event === 'message') {
347350
try {
348351
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
349-
if (replayMessageId !== undefined && isJSONRPCResponse(message)) {
350-
message.id = replayMessageId;
352+
if (isJSONRPCResponse(message)) {
353+
// Mark that we received a response - no need to reconnect for this request
354+
receivedResponse = true;
355+
if (replayMessageId !== undefined) {
356+
message.id = replayMessageId;
357+
}
351358
}
352359
this.onmessage?.(message);
353360
} catch (error) {
@@ -359,8 +366,10 @@ export class StreamableHTTPClientTransport implements Transport {
359366
// Handle graceful server-side disconnect
360367
// Server may close connection after sending event ID and retry field
361368
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
369+
// BUT don't reconnect if we already received a response - the request is complete
362370
const canResume = isReconnectable || hasPrimingEvent;
363-
if (canResume && this._abortController && !this._abortController.signal.aborted) {
371+
const needsReconnect = canResume && !receivedResponse;
372+
if (needsReconnect && this._abortController && !this._abortController.signal.aborted) {
364373
this._scheduleReconnection(
365374
{
366375
resumptionToken: lastEventId,
@@ -376,8 +385,10 @@ export class StreamableHTTPClientTransport implements Transport {
376385

377386
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
378387
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
388+
// BUT don't reconnect if we already received a response - the request is complete
379389
const canResume = isReconnectable || hasPrimingEvent;
380-
if (canResume && this._abortController && !this._abortController.signal.aborted) {
390+
const needsReconnect = canResume && !receivedResponse;
391+
if (needsReconnect && this._abortController && !this._abortController.signal.aborted) {
381392
// Use the exponential backoff reconnection strategy
382393
try {
383394
this._scheduleReconnection(

0 commit comments

Comments
 (0)