Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions src/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,112 @@ describe('StreamableHTTPClientTransport', () => {
expect(reconnectHeaders.get('last-event-id')).toBe('event-123');
});

it('should NOT reconnect a POST stream when response was received', async () => {
// ARRANGE
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions: {
initialReconnectionDelay: 10,
maxRetries: 1,
maxReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1
}
});

// Create a stream that sends:
// 1. Priming event with ID (enables potential reconnection)
// 2. The actual response (should prevent reconnection)
// 3. Then closes
const streamWithResponse = new ReadableStream({
start(controller) {
// Priming event with ID
controller.enqueue(new TextEncoder().encode('id: priming-123\ndata: \n\n'));
// The actual response to the request
controller.enqueue(
new TextEncoder().encode('id: response-456\ndata: {"jsonrpc":"2.0","result":{"tools":[]},"id":"request-1"}\n\n')
);
// Stream closes normally
controller.close();
}
});

const fetchMock = global.fetch as Mock;
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream' }),
body: streamWithResponse
});

const requestMessage: JSONRPCRequest = {
jsonrpc: '2.0',
method: 'tools/list',
id: 'request-1',
params: {}
};

// ACT
await transport.start();
await transport.send(requestMessage);
await vi.advanceTimersByTimeAsync(50);

// ASSERT
// THE KEY ASSERTION: Fetch was called ONCE only - no reconnection!
// The response was received, so no need to reconnect.
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][1]?.method).toBe('POST');
});

it('should not attempt reconnection after close() is called', async () => {
// ARRANGE
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions: {
initialReconnectionDelay: 100,
maxRetries: 3,
maxReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1
}
});

// Stream with priming event + notification (no response) that closes
// This triggers reconnection scheduling
const streamWithPriming = new ReadableStream({
start(controller) {
controller.enqueue(
new TextEncoder().encode('id: event-123\ndata: {"jsonrpc":"2.0","method":"notifications/test","params":{}}\n\n')
);
controller.close();
}
});

const fetchMock = global.fetch as Mock;

// POST request returns streaming response
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream' }),
body: streamWithPriming
});

// ACT
await transport.start();
await transport.send({ jsonrpc: '2.0', method: 'test', id: '1', params: {} });

// Wait a tick to let stream processing complete and schedule reconnection
await vi.advanceTimersByTimeAsync(10);

// Now close() - reconnection timeout is pending (scheduled for 100ms)
await transport.close();

// Advance past reconnection delay
await vi.advanceTimersByTimeAsync(200);

// ASSERT
// Only 1 call: the initial POST. No reconnection attempts after close().
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][1]?.method).toBe('POST');
});

it('should not throw JSON parse error on priming events with empty data', async () => {
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'));

Expand Down
28 changes: 21 additions & 7 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export class StreamableHTTPClientTransport implements Transport {
private _hasCompletedAuthFlow = false; // Circuit breaker: detect auth success followed by immediate 401
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;

onclose?: () => void;
onerror?: (error: Error) => void;
Expand Down Expand Up @@ -287,7 +288,7 @@ export class StreamableHTTPClientTransport implements Transport {
const delay = this._getNextReconnectionDelay(attemptCount);

// Schedule the reconnection
setTimeout(() => {
this._reconnectionTimeout = setTimeout(() => {
// Use the last event ID to resume where we left off
this._startOrAuthSse(options).catch(error => {
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
Expand All @@ -307,6 +308,9 @@ export class StreamableHTTPClientTransport implements Transport {
// Track whether we've received a priming event (event with ID)
// Per spec, server SHOULD send a priming event with ID before closing
let hasPrimingEvent = false;
// Track whether we've received a response - if so, no need to reconnect
// Reconnection is for when server disconnects BEFORE sending response
let receivedResponse = false;
const processStream = async () => {
// this is the closest we can get to trying to catch network errors
// if something happens reader will throw
Expand Down Expand Up @@ -346,8 +350,12 @@ export class StreamableHTTPClientTransport implements Transport {
if (!event.event || event.event === 'message') {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
if (replayMessageId !== undefined && isJSONRPCResponse(message)) {
message.id = replayMessageId;
if (isJSONRPCResponse(message)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wasn't sure if this would get triggered by notifications etc. but those would be a different schema, so shouldn't.

// Mark that we received a response - no need to reconnect for this request
receivedResponse = true;
if (replayMessageId !== undefined) {
message.id = replayMessageId;
}
}
this.onmessage?.(message);
} catch (error) {
Expand All @@ -359,8 +367,10 @@ export class StreamableHTTPClientTransport implements Transport {
// Handle graceful server-side disconnect
// Server may close connection after sending event ID and retry field
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
// BUT don't reconnect if we already received a response - the request is complete
const canResume = isReconnectable || hasPrimingEvent;
if (canResume && this._abortController && !this._abortController.signal.aborted) {
const needsReconnect = canResume && !receivedResponse;
if (needsReconnect && this._abortController && !this._abortController.signal.aborted) {
this._scheduleReconnection(
{
resumptionToken: lastEventId,
Expand All @@ -376,8 +386,10 @@ export class StreamableHTTPClientTransport implements Transport {

// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
// BUT don't reconnect if we already received a response - the request is complete
const canResume = isReconnectable || hasPrimingEvent;
if (canResume && this._abortController && !this._abortController.signal.aborted) {
const needsReconnect = canResume && !receivedResponse;
if (needsReconnect && this._abortController && !this._abortController.signal.aborted) {
// Use the exponential backoff reconnection strategy
try {
this._scheduleReconnection(
Expand Down Expand Up @@ -428,9 +440,11 @@ export class StreamableHTTPClientTransport implements Transport {
}

async close(): Promise<void> {
// Abort any pending requests
if (this._reconnectionTimeout) {
clearTimeout(this._reconnectionTimeout);
this._reconnectionTimeout = undefined;
}
this._abortController?.abort();

this.onclose?.();
}

Expand Down
Loading