Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export class StreamableHTTPClientTransport implements Transport {
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
private _sseStreamOpened = false; // Track if SSE stream was successfully opened

onclose?: () => void;
onerror?: (error: Error) => void;
Expand Down Expand Up @@ -247,6 +248,7 @@ export class StreamableHTTPClientTransport implements Transport {
});
}

this._sseStreamOpened = true;
this._handleSseStream(response.body, options, true);
} catch (error) {
this.onerror?.(error as Error);
Expand Down Expand Up @@ -486,10 +488,19 @@ export class StreamableHTTPClientTransport implements Transport {

// Handle session ID received during initialization
const sessionId = response.headers.get('mcp-session-id');
const hadSessionId = this._sessionId !== undefined;
if (sessionId) {
this._sessionId = sessionId;
}

// If we just received a session ID for the first time and SSE stream is not open,
// try to open it now. This handles the case where the initial SSE connection
// during start() was rejected because the server wasn't initialized yet.
// See: https://github.com/modelcontextprotocol/typescript-sdk/issues/1167
if (sessionId && !hadSessionId && !this._sseStreamOpened) {
this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this.onerror?.(error));
}

if (!response.ok) {
const text = await response.text?.().catch(() => null);

Expand Down
34 changes: 34 additions & 0 deletions packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

// Send a second message that should include the session ID
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: true,
Expand Down Expand Up @@ -140,7 +151,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

expect(transport.sessionId).toBe('test-session-id');

// Now terminate the session
Expand Down Expand Up @@ -180,8 +203,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

// Now terminate the session, but server responds with 405
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
Expand Down
Loading
Loading