Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ jobs:
storyboard-v3-reference-seller:
name: AdCP storyboard runner — v3 reference seller (translator)
runs-on: ubuntu-latest
env:
# Intentionally non-secret: gates loopback-only debug counters
# inside this CI job.
ADCP_DEBUG_TOKEN: storyboard-debug-token
# Required as of @adcp/sdk@6.7.0 (sales-guaranteed mock-server
# canonicalized; closes #449). Storyboard run + traffic-counter
# assertions gate every PR's translator-pattern conformance.
Expand Down Expand Up @@ -668,7 +672,8 @@ jobs:
# are empty/zero, the seller served stub data without translating
# to upstream — the façade-mode failure this job exists to catch
# (see #410).
curl -sf http://127.0.0.1:3001/_debug/traffic > traffic.json
curl -sf -H "X-Debug-Token: ${ADCP_DEBUG_TOKEN}" \
http://127.0.0.1:3001/_debug/traffic > traffic.json
python -c "
import json, sys
with open('traffic.json') as f:
Expand Down
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@

## [6.3.0-beta.7](https://github.com/adcontextprotocol/adcp-client-python/compare/v6.3.0-beta.6...v6.3.0-beta.7) (2026-05-30)

### Features

* **client:** add explicit MCP Streamable HTTP session termination helper ([#903](https://github.com/adcontextprotocol/adcp-client-python/issues/903))
* **server:** expose opt-in MCP session debug snapshots ([#903](https://github.com/adcontextprotocol/adcp-client-python/issues/903))

### Breaking changes

* **server:** `enable_debug_endpoints=True` now fails closed unless callers also pass `debug_validate_request=` or explicitly opt into unauthenticated local/test access with `debug_public=True`.

### Security

* **server:** debug endpoints are no longer exposed unauthenticated by default; use a validator such as an `X-Debug-Token` check for network-reachable deployments.

### Bug Fixes

Expand Down
40 changes: 40 additions & 0 deletions docs/handler-authoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,13 @@ workflow, and closes the client session when the workflow ends. Do not create a
fresh MCP client for every AdCP operation (`get_products`, `create_media_buy`,
`sync_creatives`, etc.) unless you also close each session promptly.

SDK clients can explicitly terminate the current stateful MCP session by sending
DELETE with the current `Mcp-Session-Id`:

```python
await client.close_mcp_session()
```

The SDK defaults `session_idle_timeout=1800.0`, so abandoned sessions are reaped
after 30 minutes. Public or service-to-service sellers that may see one-shot
callers should tune this lower:
Expand Down Expand Up @@ -1429,6 +1436,39 @@ stats = get_mcp_session_stats(mcp).as_dict()
# stats["active_sessions"], stats["session_age_seconds"], etc.
```

To expose session metrics through the opt-in debug surface, wire a session
source when enabling debug endpoints:

```python
import hmac

serve(
handler,
enable_debug_endpoints=True,
session_count_source=lambda: {
"active_sessions": current_session_count(),
},
debug_validate_request=lambda headers: hmac.compare_digest(
headers.get("x-debug-token", ""),
"secret",
),
)
```

`GET /_debug/sessions` returns the current snapshot. `GET /_debug/traffic` is
still available when you also pass `debug_traffic_source`. For local-only
storyboard runners, pass `debug_public=True` instead of
`debug_validate_request`; do not use public debug routes on network-reachable
deployments.

If your caller is a one-shot container that cannot retain `Mcp-Session-Id`
across operations, use `stateless_http=True` instead of creating abandoned
stateful sessions:

```python
serve(handler, stateless_http=True)
```

## Testing

The integration test pattern in `tests/test_mcp_middleware_composition.py`
Expand Down
13 changes: 12 additions & 1 deletion examples/v3_reference_seller/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from __future__ import annotations

import asyncio
import hmac
import logging
import os
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -347,6 +348,7 @@ def main() -> None:
)
logger.info("Mock-mode upstream: %s (api_key=%s...)", upstream_url, upstream_api_key[:4])
logger.info("Audit sink wired: %s. Tenant router cache: 256 hosts.", type(audit_sink).__name__)
debug_token = os.environ.get("ADCP_DEBUG_TOKEN")

serve(
platform=platform,
Expand Down Expand Up @@ -378,7 +380,16 @@ def main() -> None:
# by simply omitting the kwarg.
validation=ValidationHookConfig(requests="strict", responses="strict"),
mock_ad_server=mock_ad_server,
enable_debug_endpoints=True,
enable_debug_endpoints=debug_token is not None,
debug_validate_request=(
(
lambda headers, token=debug_token: hmac.compare_digest(
headers.get("x-debug-token", ""), token
)
)
if debug_token is not None
else None
),
# Auto-emit binds to the supervisor: when a webhook-signing PEM
# is wired via the ADCP_WEBHOOK_SIGNING_KEY_PATH env var, the
# supervisor signs every auto-emitted completion webhook per
Expand Down
15 changes: 15 additions & 0 deletions src/adcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3859,6 +3859,21 @@ async def close(self) -> None:
logger.debug(f"Closing adapter for agent {self.agent_config.id}")
await self.adapter.close()

async def close_mcp_session(self, session_id: str | None = None) -> None:
"""Explicitly terminate a stateful MCP Streamable HTTP session.

This sends ``DELETE`` to the configured MCP endpoint with the
``Mcp-Session-Id`` header. When ``session_id`` is omitted, the
SDK-managed current session is closed. It is only valid for MCP
agents using ``mcp_transport="streamable_http"``.
"""
if not isinstance(self.adapter, MCPAdapter):
raise TypeError(
"close_mcp_session is only supported for MCP clients; "
f"got {self.agent_config.protocol}"
)
await self.adapter.close_mcp_session(session_id)

async def __aenter__(self) -> ADCPClient:
"""Async context manager entry."""
return self
Expand Down
1 change: 1 addition & 0 deletions src/adcp/decisioning/mock_ad_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async def sync_creatives(self, req, ctx):
serve(
MyPlatform(mock_ad_server=InMemoryMockAdServer()),
enable_debug_endpoints=True,
debug_public=True, # local/storyboard only
)
"""

Expand Down
149 changes: 123 additions & 26 deletions src/adcp/protocols/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
try:
from mcp import ClientSession as _ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import MCP_SESSION_ID, streamablehttp_client
from mcp.shared._httpx_utils import create_mcp_http_client

MCP_AVAILABLE = True
except ImportError:
Expand Down Expand Up @@ -201,6 +202,8 @@ def __init__(self, *args: Any, **kwargs: Any):
)
self._session: Any = None
self._exit_stack: Any = None
self._connected_url: str | None = None
self._get_session_id: Callable[[], str | None] | None = None
# True when the session was injected by ADCPClient.from_mcp_client().
# Caller owns the lifecycle — close() is a no-op on injected adapters.
self._session_is_injected: bool = False
Expand All @@ -214,6 +217,42 @@ def _inject_session(self, session: ClientSession) -> None:
self._session = session
self._session_is_injected = True

def _http_headers(self) -> dict[str, str]:
"""Return transport headers for MCP HTTP requests."""
headers: dict[str, str] = {}
if self.agent_config.auth_token:
# Support custom auth headers and types
if self.agent_config.auth_type == "bearer":
headers[self.agent_config.auth_header] = f"Bearer {self.agent_config.auth_token}"
else:
headers[self.agent_config.auth_header] = self.agent_config.auth_token

if self.agent_config.extra_headers:
headers.update(self.agent_config.extra_headers)
return headers

def _urls_to_try(self) -> list[str]:
"""Return MCP endpoint candidates, preserving the configured URL first."""
uri = self.agent_config.agent_uri
base = uri.rstrip("/")
urls_to_try = [uri]
if base.endswith("/mcp"):
# User pointed at the MCP endpoint; also try the other slash form.
urls_to_try.append(f"{base}/" if not uri.endswith("/") else base)
else:
urls_to_try.extend([f"{base}/mcp", f"{base}/mcp/"])
return urls_to_try

def _streamable_http_client_factory(self) -> Callable[..., httpx.AsyncClient]:
"""Return the HTTP client factory used for streamable-http requests."""
if self.signing_request_hook is not None:
return _make_signing_http_factory(self.signing_request_hook)
return create_mcp_http_client

def current_mcp_session_id(self) -> str | None:
"""Return the current SDK-managed MCP Streamable HTTP session id."""
return self._get_session_id() if self._get_session_id is not None else None

async def _cleanup_failed_connection(self, context: str) -> None:
"""
Clean up resources after a failed connection attempt.
Expand All @@ -228,6 +267,8 @@ async def _cleanup_failed_connection(self, context: str) -> None:
old_stack = self._exit_stack
self._exit_stack = None
self._session = None
self._connected_url = None
self._get_session_id = None
try:
await old_stack.aclose()
except BaseException as cleanup_error:
Expand Down Expand Up @@ -322,31 +363,11 @@ async def _get_session(self) -> ClientSession:
if parsed.scheme in ("http", "https"):
self._exit_stack = AsyncExitStack()

# Create SSE client with authentication header
headers = {}
if self.agent_config.auth_token:
# Support custom auth headers and types
if self.agent_config.auth_type == "bearer":
headers[self.agent_config.auth_header] = (
f"Bearer {self.agent_config.auth_token}"
)
else:
headers[self.agent_config.auth_header] = self.agent_config.auth_token

if self.agent_config.extra_headers:
headers.update(self.agent_config.extra_headers)

# Try the user's exact URL first, then the alternate slash form, then
# /mcp discovery paths. MCP servers disagree on whether their endpoint
# is at /mcp or /mcp/ — try both rather than silently normalizing.
uri = self.agent_config.agent_uri
base = uri.rstrip("/")
urls_to_try = [uri]
if base.endswith("/mcp"):
# User pointed at the MCP endpoint; also try the other slash form.
urls_to_try.append(f"{base}/" if not uri.endswith("/") else base)
else:
urls_to_try.extend([f"{base}/mcp", f"{base}/mcp/"])
headers = self._http_headers()
urls_to_try = self._urls_to_try()

# RFC 9421 auto-signing: if ADCPClient installed a signing request
# hook, wire it into streamable_http via a custom httpx client
Expand All @@ -355,8 +376,8 @@ async def _get_session(self) -> ClientSession:
streamable_http_extra: dict[str, Any] = {}
if self.signing_request_hook is not None:
if self.agent_config.mcp_transport == "streamable_http":
streamable_http_extra["httpx_client_factory"] = _make_signing_http_factory(
self.signing_request_hook
streamable_http_extra["httpx_client_factory"] = (
self._streamable_http_client_factory()
)
else:
logger.warning(
Expand All @@ -369,10 +390,11 @@ async def _get_session(self) -> ClientSession:
last_error = None
for url in urls_to_try:
try:
get_session_id: Callable[[], str | None] | None = None
# Choose transport based on configuration
if self.agent_config.mcp_transport == "streamable_http":
# Use streamable HTTP transport (newer, bidirectional)
read, write, _get_session_id = await self._exit_stack.enter_async_context(
read, write, get_session_id = await self._exit_stack.enter_async_context(
streamablehttp_client(
url,
headers=headers,
Expand All @@ -392,6 +414,9 @@ async def _get_session(self) -> ClientSession:

# Initialize the session
await self._session.initialize()
self._connected_url = url
if self.agent_config.mcp_transport == "streamable_http":
self._get_session_id = get_session_id

logger.info(
f"Connected to MCP agent {self.agent_config.id} at {url} "
Expand Down Expand Up @@ -832,6 +857,78 @@ async def close(self) -> None:
return # caller owns lifecycle; never close an injected session
await self._cleanup_failed_connection("during close")

async def close_mcp_session(self, session_id: str | None = None) -> None:
"""Terminate a stateful Streamable HTTP MCP session by id."""
if self._session_is_injected:
raise RuntimeError(
"close_mcp_session is unavailable for from_mcp_client() sessions; "
"the caller owns the injected transport lifecycle."
)
if self.agent_config.mcp_transport != "streamable_http":
raise TypeError(
"close_mcp_session is only supported for MCP streamable_http transport; "
f"got {self.agent_config.mcp_transport!r}."
)
if session_id is None:
session_id = self.current_mcp_session_id()
if session_id is None:
raise ValueError(
"No active MCP session id is available; pass session_id explicitly "
"or call after this client has initialized a Streamable HTTP session."
)
if not session_id or any(ch in session_id for ch in ("\r", "\n", "\x00")):
raise ValueError("session_id must be a non-empty MCP session id header value")
if not HTTPX_AVAILABLE:
raise ImportError("httpx is required to close MCP Streamable HTTP sessions")

headers = self._http_headers()
headers[MCP_SESSION_ID] = session_id
timeout = _httpx.Timeout(self.agent_config.timeout)
httpx_client_factory = self._streamable_http_client_factory()
urls_to_try = (
[self._connected_url] if self._connected_url is not None else self._urls_to_try()
)

last_error: BaseException | None = None
for url in urls_to_try:
try:
async with httpx_client_factory(headers=headers, timeout=timeout) as client:
response = await client.delete(url)
if response.is_redirect:
location = response.headers.get("location")
suffix = f" to {location}" if location else ""
raise HTTPStatusError(
f"Unexpected redirect while closing MCP session{suffix}",
request=response.request,
response=response,
)
response.raise_for_status()

current_session_id = self._get_session_id() if self._get_session_id else None
if current_session_id == session_id:
await self._cleanup_failed_connection("after explicit MCP session close")
return
except _HTTP_STATUS_ERROR_TYPES as exc:
last_error = exc
# Keep fallback behavior symmetrical with session initialization:
# a 404/405 on one candidate usually means "try the slash variant".
exc_response = getattr(exc, "response", None)
status_code = getattr(exc_response, "status_code", None)
if status_code in (404, 405) and url != urls_to_try[-1]:
continue
break
except Exception as exc:
last_error = exc
if url != urls_to_try[-1]:
continue
break

raise ADCPConnectionError(
f"Failed to close MCP session {session_id!r}: {last_error}",
agent_id=self.agent_config.id,
agent_uri=self.agent_config.agent_uri,
) from last_error

# ========================================================================
# V3 Protocol Methods - Protocol Discovery
# ========================================================================
Expand Down
Loading
Loading