Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/hosts/cloudflare/src/mcp/agents-sse-max-age.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "@effect/vitest"
import { MAX_SSE_AGE_MS, McpAgent } from "agents/mcp";
import { Effect, Option, Schema } from "effect";

import { SESSION_TIMEOUT_MS } from "./session-alarm-policy";

const KEEPALIVE_INTERVAL_MS = 25_000;
const MAX_PENDING_SSE_BYTES = 8 * 1024 * 1024;

Expand Down Expand Up @@ -264,6 +266,11 @@ describe("agents SSE max-age rotation", () => {
vi.useRealTimers();
});

it("keeps the default max age well above the session idle timeout", () => {
expect(MAX_SSE_AGE_MS).toBe(30 * 60 * 1000);
expect(MAX_SSE_AGE_MS).toBeGreaterThanOrEqual(6 * SESSION_TIMEOUT_MS);
});

it("closes a healthy draining SSE connection within one keepalive tick after max age", async () => {
const { response, ws } = await openSse();
const drained = drainResponse(response);
Expand Down
23 changes: 13 additions & 10 deletions patches/agents@0.17.3.patch
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ index c8fad448e8797b89690a99d93490d1363851b225..439da01e789713d217778e8bdebfa1a9
xt as MCPClientOAuthCallbackConfig,
zt as ElicitResult
} from "../agent-tool-types-CNyE1iz_.js";
+declare const MAX_SSE_AGE_MS = 300000;
+declare const MAX_SSE_AGE_MS = 1800000;
export {
type ClearableEventStore,
type CreateMcpHandlerOptions,
Expand All @@ -22,11 +22,14 @@ diff --git a/dist/mcp/index.js b/dist/mcp/index.js
index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576cedf13d8f 100644
--- a/dist/mcp/index.js
+++ b/dist/mcp/index.js
@@ -28,13 +28,14 @@ import { WebStandardStreamableHTTPServerTransport } from "@modelcontextprotocol/
@@ -28,13 +28,17 @@ import { WebStandardStreamableHTTPServerTransport } from "@modelcontextprotocol/
const KEEPALIVE_INTERVAL_MS = 25e3;
/** SSE comment frame the parser drops before any event dispatch. */
const KEEPALIVE_FRAME = ": keepalive\n\n";
+const MAX_SSE_AGE_MS = 300_000;
+// Max age is a stalled-client memory backstop, not session idleness. Keep it
+// well above Executor's 5 minute session idle timeout so active clients rarely
+// rotate.
+const MAX_SSE_AGE_MS = 30 * 60 * 1000;
/**
* Start an SSE keepalive on `writer`. Returns a `clearInterval` handle
* that the stream cleanup must invoke when the stream closes.
Expand All @@ -39,7 +42,7 @@ index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576c
}, KEEPALIVE_INTERVAL_MS);
return handle;
}
@@ -180,10 +181,15 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
@@ -180,10 +184,15 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
});
return new Response(body, { status: 404 });
}
Expand All @@ -59,7 +62,7 @@ index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576c
request.headers.forEach((value, key) => {
existingHeaders[key] = value;
});
@@ -206,45 +212,71 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
@@ -206,45 +215,71 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
jsonrpc: "2.0"
});
return new Response(body, { status: 500 });
Expand Down Expand Up @@ -157,7 +160,7 @@ index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576c
}
onClose().catch(console.error);
});
@@ -279,10 +311,16 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
@@ -279,10 +314,16 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
id: null,
jsonrpc: "2.0"
}), { status: 400 });
Expand All @@ -178,7 +181,7 @@ index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576c
props: ctx.props,
jurisdiction: options.jurisdiction
});
@@ -306,27 +344,86 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
@@ -306,27 +347,86 @@ const createStreamingHttpHandler = (basePath, namespace, options = {}) => {
if (!ws) {
await writer.close();
return new Response("Failed to establish WS to DO", { status: 500 });
Expand Down Expand Up @@ -280,7 +283,7 @@ index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576c
return new Response(readable, {
headers: {
"Cache-Control": "no-cache",
@@ -389,10 +486,16 @@ const createLegacySseHandler = (basePath, namespace, options = {}) => {
@@ -389,10 +489,16 @@ const createLegacySseHandler = (basePath, namespace, options = {}) => {
const url = new URL(request.url);
if (request.method === "GET" && basePattern.test(url)) {
const sessionId = url.searchParams.get("sessionId") || namespace.newUniqueId().toString();
Expand All @@ -301,7 +304,7 @@ index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576c
endpointUrl.pathname = encodeURI(`${basePath}/message`);
endpointUrl.searchParams.set("sessionId", sessionId);
const endpointMessage = `event: endpoint\ndata: ${endpointUrl.pathname + endpointUrl.search + endpointUrl.hash}\n\n`;
@@ -414,35 +517,94 @@ const createLegacySseHandler = (basePath, namespace, options = {}) => {
@@ -414,35 +520,94 @@ const createLegacySseHandler = (basePath, namespace, options = {}) => {
console.error("Failed to establish WebSocket connection");
await writer.close();
return new Response("Failed to establish WebSocket connection", { status: 500 });
Expand Down Expand Up @@ -414,7 +417,7 @@ index 1edcf0c8c9e67aa211ae515e7672cdf79912101e..0ba4f5bff8e6a968aa18c609952e576c
console.error("Error closing SSE connection:", error);
}
}
@@ -1698,6 +1860,6 @@ var McpAgent = class McpAgent extends Agent {
@@ -1698,6 +1863,6 @@ var McpAgent = class McpAgent extends Agent {
};
McpAgent.STREAM_REQS_KEY_PREFIX = "__mcp_stream_reqs__:";
//#endregion
Expand Down
Loading