Skip to content

Commit 06c2af7

Browse files
fix: improve HTTP streaming connection stability (#88)
* fix: improve HTTP streaming connection stability * dead `await`
1 parent eb34bed commit 06c2af7

File tree

2 files changed

+155
-62
lines changed

2 files changed

+155
-62
lines changed

src/FastMCP.test.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ const runWithTestServer = async ({
7878
);
7979

8080
const session = await new Promise<FastMCPSession>((resolve) => {
81-
server.on("connect", (event) => {
81+
server.on("connect", async (event) => {
82+
// Wait for session to be fully ready before resolving
83+
await event.session.waitForReady();
8284
resolve(event.session);
8385
});
8486

@@ -2138,11 +2140,16 @@ test("HTTP Stream: calls a tool", { timeout: 20000 }, async () => {
21382140
new URL(`http://localhost:${port}/stream`),
21392141
);
21402142

2141-
// Connect client to server
2142-
await client.connect(transport);
2143+
// Connect client to server and wait for session to be ready
2144+
const sessionPromise = new Promise<FastMCPSession>((resolve) => {
2145+
server.on("connect", async (event) => {
2146+
await event.session.waitForReady();
2147+
resolve(event.session);
2148+
});
2149+
});
21432150

2144-
// Wait a bit to ensure connection is established
2145-
await delay(1000);
2151+
await client.connect(transport);
2152+
await sessionPromise;
21462153

21472154
// Call tool
21482155
const result = await client.callTool({

src/FastMCP.ts

Lines changed: 143 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type FastMCPEvents<T extends FastMCPSessionAuth> = {
4444

4545
type FastMCPSessionEvents = {
4646
error: (event: { error: Error }) => void;
47+
ready: () => void;
4748
rootsChanged: (event: { roots: Root[] }) => void;
4849
};
4950

@@ -628,6 +629,9 @@ export class FastMCPSession<
628629
public get clientCapabilities(): ClientCapabilities | null {
629630
return this.#clientCapabilities ?? null;
630631
}
632+
public get isReady(): boolean {
633+
return this.#connectionState === "ready";
634+
}
631635
public get loggingLevel(): LoggingLevel {
632636
return this.#loggingLevel;
633637
}
@@ -640,6 +644,7 @@ export class FastMCPSession<
640644
#auth: T | undefined;
641645
#capabilities: ServerCapabilities = {};
642646
#clientCapabilities?: ClientCapabilities;
647+
#connectionState: "closed" | "connecting" | "error" | "ready" = "connecting";
643648
#loggingLevel: LoggingLevel = "info";
644649
#pingConfig?: ServerOptions<T>["ping"];
645650
#pingInterval: null | ReturnType<typeof setInterval> = null;
@@ -739,6 +744,8 @@ export class FastMCPSession<
739744
}
740745

741746
public async close() {
747+
this.#connectionState = "closed";
748+
742749
if (this.#pingInterval) {
743750
clearInterval(this.#pingInterval);
744751
}
@@ -755,72 +762,90 @@ export class FastMCPSession<
755762
throw new UnexpectedStateError("Server is already connected");
756763
}
757764

758-
await this.#server.connect(transport);
765+
this.#connectionState = "connecting";
759766

760-
let attempt = 0;
767+
try {
768+
await this.#server.connect(transport);
761769

762-
while (attempt++ < 10) {
763-
const capabilities = await this.#server.getClientCapabilities();
770+
let attempt = 0;
764771

765-
if (capabilities) {
766-
this.#clientCapabilities = capabilities;
772+
while (attempt++ < 10) {
773+
const capabilities = this.#server.getClientCapabilities();
767774

768-
break;
769-
}
775+
if (capabilities) {
776+
this.#clientCapabilities = capabilities;
770777

771-
await delay(100);
772-
}
778+
break;
779+
}
773780

774-
if (!this.#clientCapabilities) {
775-
console.warn("[FastMCP warning] could not infer client capabilities");
776-
}
781+
await delay(100);
782+
}
777783

778-
if (
779-
this.#clientCapabilities?.roots?.listChanged &&
780-
typeof this.#server.listRoots === "function"
781-
) {
782-
try {
783-
const roots = await this.#server.listRoots();
784-
this.#roots = roots.roots;
785-
} catch (e) {
786-
if (e instanceof McpError && e.code === ErrorCode.MethodNotFound) {
787-
console.debug(
788-
"[FastMCP debug] listRoots method not supported by client",
789-
);
790-
} else {
791-
console.error(
792-
`[FastMCP error] received error listing roots.\n\n${e instanceof Error ? e.stack : JSON.stringify(e)}`,
793-
);
784+
if (!this.#clientCapabilities) {
785+
console.warn("[FastMCP warning] could not infer client capabilities");
786+
}
787+
788+
if (
789+
this.#clientCapabilities?.roots?.listChanged &&
790+
typeof this.#server.listRoots === "function"
791+
) {
792+
try {
793+
const roots = await this.#server.listRoots();
794+
this.#roots = roots.roots;
795+
} catch (e) {
796+
if (e instanceof McpError && e.code === ErrorCode.MethodNotFound) {
797+
console.debug(
798+
"[FastMCP debug] listRoots method not supported by client",
799+
);
800+
} else {
801+
console.error(
802+
`[FastMCP error] received error listing roots.\n\n${e instanceof Error ? e.stack : JSON.stringify(e)}`,
803+
);
804+
}
794805
}
795806
}
796-
}
797807

798-
if (this.#clientCapabilities) {
799-
const pingConfig = this.#getPingConfig(transport);
808+
if (this.#clientCapabilities) {
809+
const pingConfig = this.#getPingConfig(transport);
800810

801-
if (pingConfig.enabled) {
802-
this.#pingInterval = setInterval(async () => {
803-
try {
804-
await this.#server.ping();
805-
} catch {
806-
// The reason we are not emitting an error here is because some clients
807-
// seem to not respond to the ping request, and we don't want to crash the server,
808-
// e.g., https://github.com/punkpeye/fastmcp/issues/38.
809-
const logLevel = pingConfig.logLevel;
810-
if (logLevel === "debug") {
811-
console.debug("[FastMCP debug] server ping failed");
812-
} else if (logLevel === "warning") {
813-
console.warn(
814-
"[FastMCP warning] server is not responding to ping",
815-
);
816-
} else if (logLevel === "error") {
817-
console.error("[FastMCP error] server is not responding to ping");
818-
} else {
819-
console.info("[FastMCP info] server ping failed");
811+
if (pingConfig.enabled) {
812+
this.#pingInterval = setInterval(async () => {
813+
try {
814+
await this.#server.ping();
815+
} catch {
816+
// The reason we are not emitting an error here is because some clients
817+
// seem to not respond to the ping request, and we don't want to crash the server,
818+
// e.g., https://github.com/punkpeye/fastmcp/issues/38.
819+
const logLevel = pingConfig.logLevel;
820+
821+
if (logLevel === "debug") {
822+
console.debug("[FastMCP debug] server ping failed");
823+
} else if (logLevel === "warning") {
824+
console.warn(
825+
"[FastMCP warning] server is not responding to ping",
826+
);
827+
} else if (logLevel === "error") {
828+
console.error(
829+
"[FastMCP error] server is not responding to ping",
830+
);
831+
} else {
832+
console.info("[FastMCP info] server ping failed");
833+
}
820834
}
821-
}
822-
}, pingConfig.intervalMs);
835+
}, pingConfig.intervalMs);
836+
}
823837
}
838+
839+
// Mark connection as ready and emit event
840+
this.#connectionState = "ready";
841+
this.emit("ready");
842+
} catch (error) {
843+
this.#connectionState = "error";
844+
const errorEvent = {
845+
error: error instanceof Error ? error : new Error(String(error)),
846+
};
847+
this.emit("error", errorEvent);
848+
throw error;
824849
}
825850
}
826851

@@ -830,6 +855,41 @@ export class FastMCPSession<
830855
return this.#server.createMessage(message);
831856
}
832857

858+
public waitForReady(): Promise<void> {
859+
if (this.isReady) {
860+
return Promise.resolve();
861+
}
862+
863+
if (
864+
this.#connectionState === "error" ||
865+
this.#connectionState === "closed"
866+
) {
867+
return Promise.reject(
868+
new Error(`Connection is in ${this.#connectionState} state`),
869+
);
870+
}
871+
872+
return new Promise((resolve, reject) => {
873+
const timeout = setTimeout(() => {
874+
reject(
875+
new Error(
876+
"Connection timeout: Session failed to become ready within 5 seconds",
877+
),
878+
);
879+
}, 5000);
880+
881+
this.once("ready", () => {
882+
clearTimeout(timeout);
883+
resolve();
884+
});
885+
886+
this.once("error", (event) => {
887+
clearTimeout(timeout);
888+
reject(event.error);
889+
});
890+
});
891+
}
892+
833893
#getPingConfig(transport: Transport): {
834894
enabled: boolean;
835895
intervalMs: number;
@@ -1658,12 +1718,10 @@ export class FastMCP<
16581718

16591719
if (enabled) {
16601720
const path = healthConfig.path ?? "/health";
1721+
const url = new URL(req.url || "", "http://localhost");
16611722

16621723
try {
1663-
if (
1664-
req.method === "GET" &&
1665-
new URL(req.url || "", "http://localhost").pathname === path
1666-
) {
1724+
if (req.method === "GET" && url.pathname === path) {
16671725
res
16681726
.writeHead(healthConfig.status ?? 200, {
16691727
"Content-Type": "text/plain",
@@ -1672,6 +1730,34 @@ export class FastMCP<
16721730

16731731
return;
16741732
}
1733+
1734+
// Enhanced readiness check endpoint
1735+
if (req.method === "GET" && url.pathname === "/ready") {
1736+
const readySessions = this.#sessions.filter(
1737+
(s) => s.isReady,
1738+
).length;
1739+
const totalSessions = this.#sessions.length;
1740+
const allReady =
1741+
readySessions === totalSessions && totalSessions > 0;
1742+
1743+
const response = {
1744+
ready: readySessions,
1745+
status: allReady
1746+
? "ready"
1747+
: totalSessions === 0
1748+
? "no_sessions"
1749+
: "initializing",
1750+
total: totalSessions,
1751+
};
1752+
1753+
res
1754+
.writeHead(allReady ? 200 : 503, {
1755+
"Content-Type": "application/json",
1756+
})
1757+
.end(JSON.stringify(response));
1758+
1759+
return;
1760+
}
16751761
} catch (error) {
16761762
console.error("[FastMCP error] health endpoint error", error);
16771763
}

0 commit comments

Comments
 (0)