Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions packages/hosts/cloudflare/src/mcp/agents-sse-max-age.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,30 @@ const waitFor = async (predicate: () => boolean): Promise<void> => {
expect(predicate()).toBe(true);
};

const drainResponse = (response: Response): Promise<void> => {
return Effect.runPromise(
const drainResponse = async (response: Response): Promise<string> => {
const decoder = new TextDecoder();
let body = "";

await Effect.runPromise(
Effect.ignore(
Effect.tryPromise({
try: () =>
response.body?.pipeTo(
new WritableStream<Uint8Array>({
write: () => {},
close: () => {
body += decoder.decode();
},
write: (chunk) => {
body += decoder.decode(chunk, { stream: true });
},
}),
) ?? Promise.resolve(),
catch: () => undefined,
}),
),
);

return body;
};

const installStallingTransformStream = () => {
Expand Down Expand Up @@ -233,14 +243,19 @@ const rotationLogs = (logs: ReadonlyArray<string>): ReadonlyArray<RotationLog> =

describe("agents SSE max-age rotation", () => {
let errorLogs: string[] = [];
let infoLogs: string[] = [];

beforeEach(() => {
errorLogs = [];
infoLogs = [];
vi.useFakeTimers();
vi.setSystemTime(0);
vi.spyOn(console, "error").mockImplementation((line) => {
errorLogs.push(String(line));
});
vi.spyOn(console, "log").mockImplementation((line) => {
infoLogs.push(String(line));
});
});

afterEach(() => {
Expand All @@ -254,17 +269,18 @@ describe("agents SSE max-age rotation", () => {
const drained = drainResponse(response);

await vi.advanceTimersByTimeAsync(MAX_SSE_AGE_MS + KEEPALIVE_INTERVAL_MS);
await waitFor(() => ws.closeCode === 1013);
await waitFor(() => ws.closeCode === 1000);

expect(ws.closeReason).toBe("SSE client not draining");
const [rotationLog] = rotationLogs(errorLogs);
expect(ws.closeReason).toBe("sse_max_age_rotation");
const [rotationLog] = rotationLogs(infoLogs);
expect(rotationLog?.event).toBe("sse_max_age_close");
expect(rotationLog?.ageMs).toBeGreaterThan(MAX_SSE_AGE_MS);
expect(rotationLog?.ageMs).toBeLessThanOrEqual(MAX_SSE_AGE_MS + KEEPALIVE_INTERVAL_MS);
expect(rotationLog?.pendingBytes).toBeGreaterThanOrEqual(0);
expect(errorLogs).toEqual([]);
expect(vi.getTimerCount()).toBe(0);

await drained;
await expect(drained).resolves.toContain(": max-age rotation, reconnect\n\n");
});

it("does not rotate an in-flight POST response past max age", async () => {
Expand All @@ -277,7 +293,7 @@ describe("agents SSE max-age rotation", () => {

expect(ws.closeCode).toBeUndefined();
expect(ws.closeReason).toBeUndefined();
expect(rotationLogs(errorLogs)).toEqual([]);
expect(rotationLogs(infoLogs)).toEqual([]);

emitAgentEvent(
ws,
Expand All @@ -299,7 +315,7 @@ describe("agents SSE max-age rotation", () => {
await flushMicrotasks();

expect(ws.closeCode).toBeUndefined();
expect(rotationLogs(errorLogs)).toEqual([]);
expect(rotationLogs(infoLogs)).toEqual([]);

emitClose(ws);
await drained;
Expand All @@ -325,7 +341,7 @@ describe("agents SSE max-age rotation", () => {
expect(ws.closeCode).toBe(1013);
expect(ws.closeReason).toBe("SSE client not draining");
expect(transform.abortReason()).toBeInstanceOf(Error);
expect(rotationLogs(errorLogs)).toEqual([]);
expect(rotationLogs(infoLogs)).toEqual([]);
expect(vi.getTimerCount()).toBe(0);
});

Expand All @@ -338,7 +354,7 @@ describe("agents SSE max-age rotation", () => {
await drained;

expect(ws.closeCode).toBeUndefined();
expect(rotationLogs(errorLogs)).toEqual([]);
expect(rotationLogs(infoLogs)).toEqual([]);
expect(vi.getTimerCount()).toBe(0);
});
});
72 changes: 55 additions & 17 deletions patches/agents@0.17.3.patch
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/dist/mcp/index.d.ts b/dist/mcp/index.d.ts
index c8fad44..439da01 100644
index c8fad448e8797b89690a99d93490d1363851b225..439da01e789713d217778e8bdebfa1a9d52a71e2 100644
--- a/dist/mcp/index.d.ts
+++ b/dist/mcp/index.d.ts
@@ -29,6 +29,7 @@ import {
Expand All @@ -19,7 +19,7 @@ index c8fad44..439da01 100644
McpAgent,
type McpAuthContext,
diff --git a/dist/mcp/index.js b/dist/mcp/index.js
index 1edcf0c..707f1cf 100644
index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576cedf13d8f 100644
--- a/dist/mcp/index.js
+++ b/dist/mcp/index.js
@@ -28,13 +28,14 @@ import { WebStandardStreamableHTTPServerTransport } from "@modelcontextprotocol/
Expand Down Expand Up @@ -178,7 +178,7 @@ index 1edcf0c..707f1cf 100644
props: ctx.props,
jurisdiction: options.jurisdiction
});
@@ -306,27 +344,66 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
@@ -306,27 +344,86 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
if (!ws) {
await writer.close();
return new Response("Failed to establish WS to DO", { status: 500 });
Expand All @@ -187,7 +187,7 @@ index 1edcf0c..707f1cf 100644
- ws.addEventListener("message", (event) => {
+ }
+ ws.accept();
+ const __closeSse = () => {
+ const __abortSse = () => {
+ __sseClosed = true;
+ try {
+ clearInterval(keepAlive);
Expand All @@ -197,22 +197,42 @@ index 1edcf0c..707f1cf 100644
+ } catch {}
+ writer.abort(new Error("SSE client not draining")).catch(() => {});
+ };
+ const __closeSseGracefully = () => {
+ __sseClosed = true;
+ try {
+ clearInterval(keepAlive);
+ } catch {}
+ try {
+ const finalFrame = encoder.encode(": max-age rotation, reconnect\n\n");
+ __writeChain = __writeChain.then(() => writer.write(finalFrame)).catch(() => {}).then(() => writer.close()).catch(() => {
+ writer.abort().catch(() => {});
+ });
+ } catch {
+ __writeChain = writer.close().catch(() => {
+ writer.abort().catch(() => {});
+ });
+ }
+ try {
+ ws.close(1000, "sse_max_age_rotation");
+ } catch {}
+ return __writeChain;
+ };
+ const __forwardSse = (frame) => {
+ if (__sseClosed) return __writeChain;
+ const ageMs = Date.now() - __openedAt;
+ if (ageMs > MAX_SSE_AGE_MS) {
+ console.error(JSON.stringify({
+ console.log(JSON.stringify({
+ event: "sse_max_age_close",
+ sessionId,
+ variant: "streamable-get",
+ ageMs,
+ pendingBytes: __pendingBytes
+ }));
+ __closeSse();
+ __closeSseGracefully();
+ return Promise.resolve();
+ }
+ if (__pendingBytes + frame.byteLength > MAX_PENDING_SSE_BYTES) {
+ __closeSse();
+ __abortSse();
+ return Promise.resolve();
+ }
+ __pendingBytes += frame.byteLength;
Expand Down Expand Up @@ -260,7 +280,7 @@ index 1edcf0c..707f1cf 100644
return new Response(readable, {
headers: {
"Cache-Control": "no-cache",
@@ -389,10 +466,16 @@ const createLegacySseHandler = (basePath, namespace, options = {}) => {
@@ -389,10 +486,16 @@ const createLegacySseHandler = (basePath, namespace, options = {}) => {
const url = new URL(request.url);
if (request.method === "GET" && basePattern.test(url)) {
const sessionId = url.searchParams.get("sessionId") || namespace.newUniqueId().toString();
Expand All @@ -281,7 +301,7 @@ index 1edcf0c..707f1cf 100644
endpointUrl.pathname = encodeURI(`${basePath}/message`);
endpointUrl.searchParams.set("sessionId", sessionId);
const endpointMessage = `event: endpoint\ndata: ${endpointUrl.pathname + endpointUrl.search + endpointUrl.hash}\n\n`;
@@ -414,35 +497,74 @@ const createLegacySseHandler = (basePath, namespace, options = {}) => {
@@ -414,35 +517,94 @@ const createLegacySseHandler = (basePath, namespace, options = {}) => {
console.error("Failed to establish WebSocket connection");
await writer.close();
return new Response("Failed to establish WebSocket connection", { status: 500 });
Expand All @@ -290,7 +310,7 @@ index 1edcf0c..707f1cf 100644
- ws.addEventListener("message", (event) => {
+ }
+ ws.accept();
+ const __closeSse = () => {
+ const __abortSse = () => {
+ __sseClosed = true;
+ try {
+ clearInterval(keepAlive);
Expand All @@ -300,22 +320,42 @@ index 1edcf0c..707f1cf 100644
+ } catch {}
+ writer.abort(new Error("SSE client not draining")).catch(() => {});
+ };
+ const __closeSseGracefully = () => {
+ __sseClosed = true;
+ try {
+ clearInterval(keepAlive);
+ } catch {}
+ try {
+ const finalFrame = encoder.encode(": max-age rotation, reconnect\n\n");
+ __writeChain = __writeChain.then(() => writer.write(finalFrame)).catch(() => {}).then(() => writer.close()).catch(() => {
+ writer.abort().catch(() => {});
+ });
+ } catch {
+ __writeChain = writer.close().catch(() => {
+ writer.abort().catch(() => {});
+ });
+ }
+ try {
+ ws.close(1000, "sse_max_age_rotation");
+ } catch {}
+ return __writeChain;
+ };
+ const __forwardSse = (frame) => {
+ if (__sseClosed) return __writeChain;
+ const ageMs = Date.now() - __openedAt;
+ if (ageMs > MAX_SSE_AGE_MS) {
+ console.error(JSON.stringify({
+ console.log(JSON.stringify({
+ event: "sse_max_age_close",
+ sessionId,
+ variant: "legacy-sse",
+ ageMs,
+ pendingBytes: __pendingBytes
+ }));
+ __closeSse();
+ __closeSseGracefully();
+ return Promise.resolve();
+ }
+ if (__pendingBytes + frame.byteLength > MAX_PENDING_SSE_BYTES) {
+ __closeSse();
+ __abortSse();
+ return Promise.resolve();
+ }
+ __pendingBytes += frame.byteLength;
Expand Down Expand Up @@ -374,13 +414,11 @@ index 1edcf0c..707f1cf 100644
console.error("Error closing SSE connection:", error);
}
}
@@ -1698,6 +1820,6 @@ var McpAgent = class McpAgent extends Agent {
@@ -1698,6 +1860,6 @@ var McpAgent = class McpAgent extends Agent {
};
McpAgent.STREAM_REQS_KEY_PREFIX = "__mcp_stream_reqs__:";
//#endregion
-export { DurableObjectEventStore, ElicitRequestSchema, MCP_SERVER_ID_MAX_LENGTH, McpAgent, RPCClientTransport, RPCServerTransport, RPC_DO_PREFIX, SSEEdgeClientTransport, StreamableHTTPEdgeClientTransport, WorkerTransport, createMcpHandler, experimental_createMcpHandler, getMcpAuthContext, normalizeServerId };
+export { DurableObjectEventStore, ElicitRequestSchema, MAX_SSE_AGE_MS, MCP_SERVER_ID_MAX_LENGTH, McpAgent, RPCClientTransport, RPCServerTransport, RPC_DO_PREFIX, SSEEdgeClientTransport, StreamableHTTPEdgeClientTransport, WorkerTransport, createMcpHandler, experimental_createMcpHandler, getMcpAuthContext, normalizeServerId };

-//# sourceMappingURL=index.js.map
\ No newline at end of file
+//# sourceMappingURL=index.js.map
//# sourceMappingURL=index.js.map
Loading