From 45249d28e3f2391c2df198e37cbb90e9210e6ffd Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sun, 7 Jun 2026 07:24:25 -0400 Subject: [PATCH 1/2] feat(server): add response_enhancer callback (JS #2161 parity) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a server-wide response_enhancer callback that stamps cross-cutting fields on every response class — framework-tool successes, custom-tool successes (get_task_status / list_tasks), the pre-auth get_adcp_capabilities discovery response, and structured adcp_error responses — uniformly across the MCP and A2A transports. The Python SDK has no single response finalizer, so the enhancer is wired at the three dispatch seams that converge the response classes, all AFTER inject_context (so a buggy enhancer cannot re-introduce a stripped credential into the echo envelope): 1. Success — create_tool_caller (mcp_tools.py): after inject_context and before validate_response, so a conformance-breaking mutation surfaces as VALIDATION_ERROR rather than shipping malformed. Covers framework tools, custom tools, and get_adcp_capabilities on both transports. 2. MCP errors — build_mcp_error_result (translate.py). 3. A2A errors — _send_adcp_error (a2a_server.py). Also closes the A2A comply_test_controller gap: that skill bypasses create_tool_caller via _call_test_controller, so the enhancer is applied there too (context echoed first to preserve the credential-echo invariant). The enhancer supports both a context-blind Callable[[dict], None] and a context-aware Callable[[str, dict, ToolContext | None], None], dispatched by signature arity. It runs synchronously, mutates in place, and a raised exception is caught + logged at WARNING — a buggy enhancer must not become a transport error. response_enhancer threads through ServeConfig, serve(), all three _serve_* transport paths, create_mcp_server / create_a2a_server, and the in-process build_asgi_app / build_test_client test harness. adcp.decisioning.serve.serve forwards it via **serve_kwargs. The ResponseEnhancer type alias is exported from adcp.server (matching ServeConfig / SkillMiddleware; adcp.__all__ snapshot unchanged). Idempotency note: the cache commits the pre-enhancement response, so a replay re-runs the enhancer — non-idempotent enhancers diverge on replay. Closes #926 Co-Authored-By: Claude Opus 4.8 (1M context) --- src/adcp/decisioning/serve.py | 3 + src/adcp/server/__init__.py | 2 + src/adcp/server/a2a_server.py | 54 ++- src/adcp/server/helpers.py | 113 +++++- src/adcp/server/mcp_tools.py | 34 +- src/adcp/server/serve.py | 70 +++- src/adcp/server/translate.py | 28 +- src/adcp/testing/decisioning.py | 13 + tests/test_response_enhancer.py | 593 ++++++++++++++++++++++++++++++++ 9 files changed, 893 insertions(+), 17 deletions(-) create mode 100644 tests/test_response_enhancer.py diff --git a/src/adcp/decisioning/serve.py b/src/adcp/decisioning/serve.py index 57727571..05e4ac2c 100644 --- a/src/adcp/decisioning/serve.py +++ b/src/adcp/decisioning/serve.py @@ -527,6 +527,9 @@ def serve( :param serve_kwargs: Forwarded to :func:`adcp.server.serve`. Use for ``host``, ``port``, ``transport``, ``test_controller``, ``context_factory``, ``middleware``, ``validation``, + ``response_enhancer`` (a server-wide + :data:`~adcp.server.ResponseEnhancer` applied to every response + on both transports), ``config`` (:class:`adcp.server.ServeConfig` bundle), etc. Pass ``config=ServeConfig(transport="a2a", ...)`` to supply all server options as a single typed object rather than diff --git a/src/adcp/server/__init__.py b/src/adcp/server/__init__.py index 2fc2b9b8..39531fcd 100644 --- a/src/adcp/server/__init__.py +++ b/src/adcp/server/__init__.py @@ -99,6 +99,7 @@ async def get_products(params, context=None): TERMINAL_CODES, TRANSIENT_CODES, AccountError, + ResponseEnhancer, adcp_error, cancel_media_buy_response, inject_context, @@ -212,6 +213,7 @@ async def get_products(params, context=None): "MCPToolSet", "MCPSessionStats", "RequestMetadata", + "ResponseEnhancer", "ServeConfig", "create_mcp_tools", "create_mcp_server", diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index fae5bc7e..3d16a21e 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -37,6 +37,7 @@ from adcp.exceptions import ADCPError from adcp.server.base import ADCPHandler, ToolContext +from adcp.server.helpers import ResponseEnhancer, _apply_response_enhancer from adcp.server.spec_compat import PreValidationHooks # Decisioning-layer ``AdcpError`` (from ``adcp.decisioning.types``) is the @@ -200,10 +201,12 @@ def __init__( validation: ValidationHookConfig | None = SERVER_DEFAULT_VALIDATION, pre_validation_hooks: PreValidationHooks | None = None, test_controller_account_resolver: Any | None = None, + response_enhancer: ResponseEnhancer | None = None, ) -> None: self._handler = handler self._context_factory = context_factory self._test_controller_account_resolver = test_controller_account_resolver + self._response_enhancer = response_enhancer # Store as a tuple so the executor can't be mutated from underneath # at runtime (a flaky test or a handler reaching self._middleware # can't corrupt the dispatch chain). Tuple ordering = runtime @@ -232,6 +235,7 @@ def __init__( validation=validation, pre_validation_hook=hook, default_unnegotiated_adcp_version=None, + response_enhancer=response_enhancer, ) if test_controller is not None: @@ -253,16 +257,33 @@ def _register_test_controller(self, store: TestControllerStore) -> None: """ resolver = self._test_controller_account_resolver + response_enhancer = self._response_enhancer async def _call_test_controller( params: dict[str, Any], context: ToolContext | None = None ) -> Any: - return await _handle_test_controller( + result = await _handle_test_controller( store, params, context=context, account_resolver=resolver, ) + # This skill bypasses ``create_tool_caller`` (the success-path + # enhancer site), so apply the enhancer here too — otherwise + # comply responses would silently skip the seller's + # cross-cutting stamp. Echo context first so the enhancer runs + # after the credential-stripped envelope is assembled (the + # later ``_send_result`` ``inject_context`` then no-ops), + # preserving the credential-echo invariant the other sites + # uphold. + if isinstance(result, dict): + from adcp.server.helpers import inject_context + + inject_context(params, result) + _apply_response_enhancer( + response_enhancer, "comply_test_controller", result, context + ) + return result self._tool_callers["comply_test_controller"] = _call_test_controller @@ -303,7 +324,9 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non # channel is reserved for transport-level errors (auth # rejected, rate-limited pre-dispatch). logger.info("AdCP application error for skill %s: %s", skill_name, exc) - await self._send_adcp_error(event_queue, context, exc, params) + await self._send_adcp_error( + event_queue, context, exc, params, skill_name=skill_name, tool_context=tool_context + ) except Exception: logger.exception("Error executing skill %s", skill_name) await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}") @@ -512,6 +535,9 @@ async def _send_adcp_error( context: RequestContext, exc: Any, params: dict[str, Any] | None = None, + *, + skill_name: str = "", + tool_context: ToolContext | None = None, ) -> None: """Publish a failed task carrying an AdCP ``adcp_error`` payload. @@ -564,6 +590,13 @@ async def _send_adcp_error( if params is not None: inject_context(params, data) + # Run the seller's response enhancer on the error envelope AFTER + # the context echo (so a stripped credential can't be + # re-introduced) — symmetric with the MCP error path + # (``build_mcp_error_result``) and the success path. A buggy + # enhancer is caught and logged inside the helper. + _apply_response_enhancer(self._response_enhancer, skill_name, data, tool_context) + task = _make_task( context, state=pb.TaskState.TASK_STATE_FAILED, @@ -819,15 +852,13 @@ def _validate_card_url(url: str) -> str: parsed = urlparse(url) if not parsed.scheme or not parsed.netloc: raise ValueError( - f"public_url resolver returned {url!r} — " - "must be an absolute URL with scheme and host." + f"public_url resolver returned {url!r} — must be an absolute URL with scheme and host." ) hostname = parsed.hostname or "" is_loopback = hostname in ("localhost", "127.0.0.1", "::1") or hostname.endswith(".localhost") if parsed.scheme != "https" and not is_loopback: raise ValueError( - f"public_url resolver returned {url!r} — " - "scheme must be 'https' for non-loopback hosts." + f"public_url resolver returned {url!r} — scheme must be 'https' for non-loopback hosts." ) return url @@ -941,6 +972,7 @@ def create_a2a_server( context_builder: Any | None = None, auth: BearerTokenAuth | None = None, public_url: str | PublicUrlResolver | None = None, + response_enhancer: ResponseEnhancer | None = None, ) -> Any: """Create an A2A Starlette application from an ADCP handler. @@ -1067,6 +1099,15 @@ def agent_card_url(request: Request) -> str: The ``PUBLIC_URL`` env-var fallback applies only when ``public_url`` is ``None``; a callable takes priority. + response_enhancer: Optional server-wide + :data:`~adcp.server.ResponseEnhancer` applied to every + response — successes, ``adcp_error`` envelopes, and the + ``comply_test_controller`` skill — after the context echo and + (for successes) before schema validation. Mirrors the MCP-side + ``create_mcp_server(response_enhancer=...)`` so a single + callback stamps both transports. See + :data:`~adcp.server.ResponseEnhancer` for the supported arities + and failure semantics. Returns: A Starlette app ready to be run with uvicorn. @@ -1088,6 +1129,7 @@ def agent_card_url(request: Request) -> str: validation=validation, pre_validation_hooks=pre_validation_hooks, test_controller_account_resolver=test_controller_account_resolver, + response_enhancer=response_enhancer, ) if task_store is None: diff --git a/src/adcp/server/helpers.py b/src/adcp/server/helpers.py index bc373973..0faf7548 100644 --- a/src/adcp/server/helpers.py +++ b/src/adcp/server/helpers.py @@ -13,12 +13,16 @@ from __future__ import annotations +import inspect +import logging import warnings from collections.abc import Awaitable, Callable from datetime import datetime, timezone -from typing import Any +from typing import Any, cast -from adcp.server.base import AccountAwareToolContext +from adcp.server.base import AccountAwareToolContext, ToolContext + +logger = logging.getLogger("adcp.server") # All 32 codes from the ADCP spec (enums/error-code.json) plus SDK extensions. # Recovery classification: transient (retry), correctable (fix request), terminal. @@ -380,6 +384,111 @@ def inject_context( return response +# ============================================================================ +# Response Enhancer +# ============================================================================ + +ResponseEnhancer = ( + Callable[[dict[str, Any]], None] | Callable[[str, dict[str, Any], "ToolContext | None"], None] +) +"""Server-wide callback that stamps cross-cutting fields on every response. + +Configure it via ``serve(response_enhancer=...)`` (or the matching +:class:`~adcp.server.ServeConfig` field). The framework calls it after the +context-echo envelope is assembled and before schema validation, on every +response class — framework-tool successes, custom-tool successes +(``get_task_status`` / ``list_tasks``), the pre-auth +``get_adcp_capabilities`` discovery response, and structured ``adcp_error`` +responses — on both the MCP and A2A transports. + +Two arities are supported, dispatched by positional-parameter count: + +- **Context-blind** ``(result_dict) -> None`` — the common case; mutate the + response dict in place to stamp a field on every response. +- **Context-aware** ``(method_name, result_dict, context) -> None`` — when + the stamp depends on the tool or the caller. ``context`` is the + :class:`~adcp.server.ToolContext` for this dispatch, or ``None`` for an + unauthenticated / pre-auth discovery call. + +The enhancer mutates the response dict in place; its return value is +ignored. It runs **synchronously** (it is not awaited). A raised exception +is caught and logged at ``WARNING`` — the un-enhanced response ships rather +than turning a buggy enhancer into a transport error. + +Because the enhancer runs *after* the wire response is stripped of any +credential the buyer echoed in ``context``, it cannot re-introduce a +credential into the response envelope. + +Idempotency note: the server-side idempotency cache commits the +*pre-enhancement* response, so a replayed request re-runs the enhancer. +Non-idempotent enhancers (timestamps, random IDs) will therefore diverge +between the original response and its replays. +""" + + +def _enhancer_is_context_aware(enhancer: ResponseEnhancer) -> bool: + """Return ``True`` when *enhancer* takes the 3-arg context-aware shape. + + Dispatch is by positional-parameter arity: a single positional + parameter is the context-blind ``(result_dict)`` shape; three is the + context-aware ``(method_name, result_dict, context)`` shape. A callable + with ``*args`` is treated as context-aware so adopters writing a + catch-all signature still receive the method name and context. + + Signature introspection failures (C callables, exotic wrappers) fall + back to the context-blind shape — the safe default that matches the + most common adopter intent. + """ + try: + sig = inspect.signature(enhancer) + except (TypeError, ValueError): + return False + positional = [ + p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) + ] + has_var_positional = any(p.kind is p.VAR_POSITIONAL for p in sig.parameters.values()) + return len(positional) >= 3 or has_var_positional + + +def _apply_response_enhancer( + enhancer: ResponseEnhancer | None, + method_name: str, + result: dict[str, Any], + context: ToolContext | None, +) -> dict[str, Any]: + """Run the configured *enhancer* against *result*, mutating it in place. + + Returns the same ``result`` dict reference (no clone) so callers can use + the return value or the original interchangeably. When *enhancer* is + ``None`` the dict is returned unchanged. + + The enhancer is invoked synchronously. Its return value is ignored — it + must mutate ``result`` in place. A raised exception is caught and logged + at ``WARNING`` (including *method_name*); the original ``result`` is + returned un-enhanced so a buggy enhancer never becomes a transport + error. + """ + if enhancer is None: + return result + try: + if _enhancer_is_context_aware(enhancer): + context_aware = cast( + "Callable[[str, dict[str, Any], ToolContext | None], None]", enhancer + ) + context_aware(method_name, result, context) + else: + context_blind = cast("Callable[[dict[str, Any]], None]", enhancer) + context_blind(result) + except Exception: + logger.warning( + "response_enhancer raised for %s — shipping the un-enhanced " + "response. This is a bug in the enhancer, not in the response.", + method_name, + exc_info=True, + ) + return result + + # ============================================================================ # Cancellation Helper # ============================================================================ diff --git a/src/adcp/server/mcp_tools.py b/src/adcp/server/mcp_tools.py index 8ed57abe..af3064c9 100644 --- a/src/adcp/server/mcp_tools.py +++ b/src/adcp/server/mcp_tools.py @@ -26,6 +26,7 @@ from typing import Any from adcp.server.base import ADCPHandler, ToolContext +from adcp.server.helpers import ResponseEnhancer, _apply_response_enhancer from adcp.server.spec_compat import PreValidationHook, PreValidationHookChain from adcp.server.test_controller import SCENARIOS as _CONTROLLER_SCENARIOS from adcp.types import ( @@ -2209,6 +2210,7 @@ def create_tool_caller( validation: ValidationHookConfig | None = None, pre_validation_hook: PreValidationHookChain | None = None, default_unnegotiated_adcp_version: str | None = DEFAULT_UNNEGOTIATED_ADCP_VERSION, + response_enhancer: ResponseEnhancer | None = None, ) -> Callable[..., Any]: """Create a tool caller function for an ADCP handler method. @@ -2283,6 +2285,10 @@ def create_tool_caller( when the buyer supplies no version envelope. MCP uses ``"3.0"`` for legacy compatibility. A2A passes ``None`` so omitted version means the current SDK wire shape. + response_enhancer: Optional server-wide :data:`ResponseEnhancer` + applied to every successful response after context echo and + before schema validation. See :data:`ResponseEnhancer` for the + two supported arities and the failure/idempotency semantics. Returns: Async callable ``call_tool(params, context=None)``. The ``context`` @@ -2613,6 +2619,19 @@ async def call_tool(params: dict[str, Any], context: ToolContext | None = None) adcp_version=post_adapter_validator_version, ) inject_context(raw_params, result) + # Run the seller's response enhancer AFTER ``inject_context`` + # (so it sees the credential-stripped echo envelope and can't + # re-introduce a credential) and BEFORE ``validate_response`` + # (so any conformance-breaking mutation surfaces as a + # VALIDATION_ERROR rather than shipping malformed). This single + # site covers framework tools, custom tools + # (``get_task_status`` / ``list_tasks``), and + # ``get_adcp_capabilities`` on both MCP and A2A. The L3 error + # envelope is enhanced on the dedicated error paths + # (``build_mcp_error_result`` / ``_send_adcp_error``), so skip + # it here to avoid a double pass. + if "adcp_error" not in result: + _apply_response_enhancer(response_enhancer, method_name, result, ctx) if response_mode is not None and response_mode != "off" and isinstance(result, dict): # Skip validation when the handler returned the AdCP L3 @@ -2689,6 +2708,7 @@ def __init__( advertise_all: bool = False, validation: ValidationHookConfig | None = None, pre_validation_hooks: dict[str, PreValidationHookChain] | None = None, + response_enhancer: ResponseEnhancer | None = None, ): """Create tool set from handler. @@ -2705,6 +2725,9 @@ def __init__( ``(tool_name, args) -> args`` callable or ordered sequence. Applied before schema + Pydantic validation. See :func:`create_tool_caller`. + response_enhancer: Optional server-wide :data:`ResponseEnhancer` + applied to every successful response. See + :func:`create_tool_caller`. """ self.handler = handler self._filtered_definitions = get_tools_for_handler(handler, advertise_all=advertise_all) @@ -2715,7 +2738,11 @@ def __init__( name = tool_def["name"] hook = (pre_validation_hooks or {}).get(name) self._tools[name] = create_tool_caller( - handler, name, validation=validation, pre_validation_hook=hook + handler, + name, + validation=validation, + pre_validation_hook=hook, + response_enhancer=response_enhancer, ) @property @@ -2751,6 +2778,7 @@ def create_mcp_tools( advertise_all: bool = False, validation: ValidationHookConfig | None = None, pre_validation_hooks: dict[str, PreValidationHookChain] | None = None, + response_enhancer: ResponseEnhancer | None = None, ) -> MCPToolSet: """Create MCP tools from an ADCP handler. @@ -2789,6 +2817,9 @@ async def call_tool(name: str, arguments: dict): ``(tool_name, args) -> args`` callable or ordered sequence. Applied before schema + Pydantic validation. See :func:`create_tool_caller`. + response_enhancer: Optional server-wide :data:`ResponseEnhancer` + applied to every successful response. See + :func:`create_tool_caller`. Returns: MCPToolSet with tool definitions and handlers. @@ -2798,4 +2829,5 @@ async def call_tool(name: str, arguments: dict): advertise_all=advertise_all, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, ) diff --git a/src/adcp/server/serve.py b/src/adcp/server/serve.py index 48410164..0bd08c65 100644 --- a/src/adcp/server/serve.py +++ b/src/adcp/server/serve.py @@ -29,6 +29,7 @@ async def get_adcp_capabilities(self, params, context=None): logger = logging.getLogger("adcp.server") from adcp.server.base import ADCPHandler, ToolContext +from adcp.server.helpers import ResponseEnhancer from adcp.server.mcp_sessions import ADCPStreamableHTTPSessionManager from adcp.server.mcp_tools import ( _HANDLER_TOOLS, @@ -168,6 +169,7 @@ class ServeConfig: max_request_size: int | None = None validation: ValidationHookConfig | None = None pre_validation_hooks: PreValidationHooks | None = None + response_enhancer: ResponseEnhancer | None = None # --- Discovery manifest --- base_url: str | None = None @@ -609,6 +611,7 @@ def serve( max_active_sessions: int | None = None, validation: ValidationHookConfig | None = DEFAULT_VALIDATION, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, enable_debug_endpoints: bool = False, debug_traffic_source: Callable[[], dict[str, int]] | None = None, session_count_source: Callable[[], dict[str, Any]] | None = None, @@ -829,6 +832,19 @@ class of bug that shipped the ``pricing_options`` ``ValidationHookConfig(requests="off", responses="off")`` or ``validation=None``. Applies to both MCP and A2A transports. + response_enhancer: Optional server-wide + :data:`~adcp.server.ResponseEnhancer` applied to every + response — framework-tool successes, custom-tool successes, + the pre-auth ``get_adcp_capabilities`` discovery response, and + ``adcp_error`` envelopes — on both the MCP and A2A transports. + The callback runs after the context echo (so it cannot + re-introduce a stripped credential) and, on the success path, + before schema validation (so a conformance-breaking mutation + surfaces as ``VALIDATION_ERROR`` rather than shipping + malformed). Both the context-blind ``(result_dict)`` arity and + the context-aware ``(method_name, result_dict, context)`` arity + are supported. See :data:`~adcp.server.ResponseEnhancer` for + the full failure and idempotency-replay semantics. Security: This function does NOT configure authentication. In production, @@ -933,6 +949,7 @@ async def force_account_status(self, account_id, status): max_active_sessions = config.max_active_sessions validation = config.validation pre_validation_hooks = config.pre_validation_hooks + response_enhancer = config.response_enhancer enable_debug_endpoints = config.enable_debug_endpoints debug_traffic_source = config.debug_traffic_source session_count_source = config.session_count_source @@ -1001,6 +1018,7 @@ async def force_account_status(self, account_id, status): max_request_size=max_request_size, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, base_url=base_url, specialisms=specialisms, description=description, @@ -1028,6 +1046,7 @@ async def force_account_status(self, account_id, status): max_active_sessions=max_active_sessions, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, base_url=base_url, specialisms=specialisms, description=description, @@ -1059,6 +1078,7 @@ async def force_account_status(self, account_id, status): max_active_sessions=max_active_sessions, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, base_url=base_url, specialisms=specialisms, description=description, @@ -1477,6 +1497,7 @@ def _serve_mcp( max_active_sessions: int | None = None, validation: ValidationHookConfig | None = DEFAULT_VALIDATION, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, base_url: str | None = None, specialisms: list[str] | None = None, description: str | None = None, @@ -1502,6 +1523,7 @@ def _serve_mcp( max_active_sessions=max_active_sessions, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, allowed_hosts=allowed_hosts, allowed_origins=allowed_origins, enable_dns_rebinding_protection=enable_dns_rebinding_protection, @@ -1541,7 +1563,7 @@ def _serve_mcp( ) if asgi_middleware: logger.warning( - "asgi_middleware is ignored on transport='stdio'; " "ASGI middleware will not run" + "asgi_middleware is ignored on transport='stdio'; ASGI middleware will not run" ) mcp.run(transport=transport) @@ -1642,6 +1664,7 @@ def _serve_a2a( max_request_size: int | None = None, validation: ValidationHookConfig | None = DEFAULT_VALIDATION, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, base_url: str | None = None, specialisms: list[str] | None = None, description: str | None = None, @@ -1671,6 +1694,7 @@ def _serve_a2a( advertise_all=advertise_all, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, auth=auth, public_url=public_url, ) @@ -1729,6 +1753,7 @@ def _build_mcp_and_a2a_app( max_active_sessions: int | None = None, validation: ValidationHookConfig | None = DEFAULT_VALIDATION, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, base_url: str | None = None, specialisms: list[str] | None = None, description: str | None = None, @@ -1777,6 +1802,7 @@ def _build_mcp_and_a2a_app( max_active_sessions=max_active_sessions, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, allowed_hosts=allowed_hosts, allowed_origins=allowed_origins, enable_dns_rebinding_protection=enable_dns_rebinding_protection, @@ -1831,6 +1857,7 @@ def _build_mcp_and_a2a_app( advertise_all=advertise_all, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, auth=auth, public_url=public_url, ) @@ -1982,6 +2009,7 @@ def _serve_mcp_and_a2a( max_active_sessions: int | None = None, validation: ValidationHookConfig | None = DEFAULT_VALIDATION, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, base_url: str | None = None, specialisms: list[str] | None = None, description: str | None = None, @@ -2035,6 +2063,7 @@ def _serve_mcp_and_a2a( max_active_sessions=max_active_sessions, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, base_url=base_url, specialisms=specialisms, description=description, @@ -2051,7 +2080,7 @@ def _serve_mcp_and_a2a( sock = _bind_reusable_socket(resolved_host, resolved_port) try: logger.info( - "MCP+A2A unified listening on http://%s:%s " "(MCP at /mcp, A2A at /)", + "MCP+A2A unified listening on http://%s:%s (MCP at /mcp, A2A at /)", resolved_host, resolved_port, ) @@ -2122,6 +2151,7 @@ def create_mcp_server( max_active_sessions: int | None = None, validation: ValidationHookConfig | None = DEFAULT_VALIDATION, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, allowed_hosts: Sequence[str] | None = None, allowed_origins: Sequence[str] | None = None, enable_dns_rebinding_protection: bool | None = None, @@ -2215,6 +2245,12 @@ def create_mcp_server( service-to-service sellers that need a hard ceiling against clients opening one session per operation. Ignored when ``stateless_http=True``. + response_enhancer: Optional server-wide + :data:`~adcp.server.ResponseEnhancer` applied to every + response (successes and ``adcp_error`` envelopes) after the + context echo and, for successes, before schema validation. + See :data:`~adcp.server.ResponseEnhancer` for the supported + arities and failure / idempotency semantics. Returns: A configured FastMCP server instance. Call ``mcp.run()`` to start, @@ -2321,6 +2357,7 @@ def create_mcp_server( advertise_all=advertise_all, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, ) # Pre-create the StreamableHTTPSessionManager so we can pass # ``session_idle_timeout`` and ADCP's session safety knobs — @@ -2365,6 +2402,7 @@ def _register_handler_tools( advertise_all: bool = False, validation: ValidationHookConfig | None = DEFAULT_VALIDATION, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, ) -> None: """Register all ADCP tools from a handler onto a FastMCP server.""" # Freeze middleware ordering at registration time. Tuple both guards @@ -2386,7 +2424,11 @@ def _register_handler_tools( output_schema = tool_def.get("outputSchema") hook = (pre_validation_hooks or {}).get(tool_name) caller = create_tool_caller( - handler, tool_name, validation=validation, pre_validation_hook=hook + handler, + tool_name, + validation=validation, + pre_validation_hook=hook, + response_enhancer=response_enhancer, ) _register_tool( mcp, @@ -2397,6 +2439,7 @@ def _register_handler_tools( context_factory=context_factory, middleware=middleware_tuple, output_schema=output_schema, + response_enhancer=response_enhancer, ) registered.append(tool_name) @@ -2418,6 +2461,7 @@ def _register_tool( context_factory: ContextFactory | None = None, middleware: tuple[SkillMiddleware, ...] = (), output_schema: dict[str, Any] | None = None, + response_enhancer: ResponseEnhancer | None = None, ) -> None: """Register a single ADCP tool on a FastMCP server. @@ -2506,15 +2550,29 @@ async def _call_handler() -> Any: # ``kwargs`` is the raw request dict — passing it lets the # builder echo the request's ``context`` extension into the # error envelope, symmetric with the success path's - # ``inject_context`` call (mcp_tools.py). - return build_mcp_error_result(exc, params=kwargs) # type: ignore[return-value] + # ``inject_context`` call (mcp_tools.py). ``response_enhancer`` + # + ``context`` thread the seller's enhancer onto the error + # envelope too (after the context echo). + return build_mcp_error_result( # type: ignore[return-value] + exc, + params=kwargs, + method_name=name, + response_enhancer=response_enhancer, + context=context, + ) except Exception as exc: # Decisioning ``AdcpError`` is NOT a subclass of # ``adcp.exceptions.ADCPError`` (different class hierarchy # — ``adcp.decisioning.types.AdcpError``). Catch it explicitly # and project the same structured envelope. if isinstance(exc, decisioning_error_types): - return build_mcp_error_result(exc, params=kwargs) # type: ignore[return-value] + return build_mcp_error_result( # type: ignore[return-value] + exc, + params=kwargs, + method_name=name, + response_enhancer=response_enhancer, + context=context, + ) raise # Pre-built CallToolResult (error envelope from build_mcp_error_result) # passes through FastMCP's convert_result and the lowlevel handler diff --git a/src/adcp/server/translate.py b/src/adcp/server/translate.py index f38961e5..dba3014d 100644 --- a/src/adcp/server/translate.py +++ b/src/adcp/server/translate.py @@ -26,7 +26,7 @@ from __future__ import annotations import json -from typing import Any, Literal, cast +from typing import TYPE_CHECKING, Any, Literal, cast from urllib.parse import urlparse from a2a.utils.errors import A2AError, InternalError, InvalidParamsError @@ -41,10 +41,17 @@ ADCPTaskError, ADCPTimeoutError, ) -from adcp.server.helpers import STANDARD_ERROR_CODES +from adcp.server.helpers import ( + STANDARD_ERROR_CODES, + ResponseEnhancer, + _apply_response_enhancer, +) from adcp.types import Error from adcp.types.core import Protocol +if TYPE_CHECKING: + from adcp.server.base import ToolContext + # ============================================================================ # Error Translation # ============================================================================ @@ -175,6 +182,9 @@ def build_mcp_error_result( exc: ADCPError | Error | Any, *, params: dict[str, Any] | None = None, + method_name: str = "", + response_enhancer: ResponseEnhancer | None = None, + context: ToolContext | None = None, ) -> CallToolResult: """Build an MCP ``CallToolResult`` carrying the structured ``adcp_error`` envelope. @@ -200,6 +210,13 @@ def build_mcp_error_result( error responses violate the AdCP context-passthrough contract and buyers lose correlation IDs and idempotency hints across the raise-AdcpError boundary. + + When ``response_enhancer`` is supplied it runs against the structured + envelope after the context echo — the same + :data:`~adcp.server.ResponseEnhancer` the success path uses, so a + seller stamping cross-cutting fields covers error responses (including + credential-policy errors) too. ``method_name`` and ``context`` are + forwarded to the context-aware enhancer arity. """ from adcp.server.helpers import inject_context @@ -233,6 +250,13 @@ def build_mcp_error_result( if params is not None: inject_context(params, structured) + # Run the seller's response enhancer on the error envelope AFTER the + # context echo (so a stripped credential can't be re-introduced) — + # symmetric with the success path in ``create_tool_caller``. Error + # responses are not schema-validated, so the enhancer's output ships + # as-is; a buggy enhancer is caught and logged inside the helper. + _apply_response_enhancer(response_enhancer, method_name, structured, context) + return CallToolResult( content=[TextContent(type="text", text=text)], structuredContent=structured, diff --git a/src/adcp/testing/decisioning.py b/src/adcp/testing/decisioning.py index 675d05b3..bed0e785 100644 --- a/src/adcp/testing/decisioning.py +++ b/src/adcp/testing/decisioning.py @@ -48,6 +48,7 @@ StateReader, ) from adcp.server.auth import BearerTokenAuth + from adcp.server.helpers import ResponseEnhancer from adcp.server.serve import ASGIMiddlewareEntry, ContextFactory, SkillMiddleware from adcp.server.spec_compat import PreValidationHooks @@ -151,6 +152,7 @@ def build_asgi_app( validation: ValidationHookConfig | None = DEFAULT_VALIDATION, discovery_base_url: str | None = None, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, **factory_kwargs: Any, ) -> Any: """Build a Starlette ASGI app for in-process integration tests. @@ -229,6 +231,11 @@ def build_asgi_app( Use to install the same coercion hooks your production :func:`serve` call uses so in-process tests see the same validation surface as production. ``None`` → no hooks (default). + :param response_enhancer: Optional server-wide + :data:`~adcp.server.ResponseEnhancer` forwarded to + :func:`create_mcp_server`, so in-process tests exercise the same + enhancer wiring your production :func:`serve` call uses. ``None`` + → no enhancer (default). :param factory_kwargs: Forwarded to :func:`create_adcp_server_from_platform`. Accepted keys: ``executor``, ``registry``, ``webhook_sender``, @@ -268,6 +275,7 @@ def build_asgi_app( enable_dns_rebinding_protection=enable_dns_rebinding_protection, validation=validation, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, ) # Mirror the wrapping chain from _run_mcp_http (adcp.server.serve). # auth must be innermost so its JSON-RPC body-peek runs before the @@ -310,6 +318,7 @@ async def build_test_client( validation: ValidationHookConfig | None = DEFAULT_VALIDATION, discovery_base_url: str | None = None, pre_validation_hooks: PreValidationHooks | None = None, + response_enhancer: ResponseEnhancer | None = None, **factory_kwargs: Any, ) -> AsyncIterator[httpx.AsyncClient]: """Async context manager yielding an ``httpx.AsyncClient`` wired against @@ -369,6 +378,9 @@ async def build_test_client( :param pre_validation_hooks: Forwarded to :func:`build_asgi_app`. Install the same hooks your production :func:`serve` call uses so in-process tests see the same validation surface. + :param response_enhancer: Forwarded to :func:`build_asgi_app`. Wire + the same enhancer your production :func:`serve` call uses so + in-process tests exercise the enhancer path. :param factory_kwargs: Forwarded to :func:`create_adcp_server_from_platform` via :func:`build_asgi_app` (executor, registry, webhook_sender, etc.). @@ -423,6 +435,7 @@ async def build_test_client( validation=validation, discovery_base_url=discovery_base_url, pre_validation_hooks=pre_validation_hooks, + response_enhancer=response_enhancer, **factory_kwargs, ) async with LifespanManager(app): diff --git a/tests/test_response_enhancer.py b/tests/test_response_enhancer.py new file mode 100644 index 00000000..6b13ed67 --- /dev/null +++ b/tests/test_response_enhancer.py @@ -0,0 +1,593 @@ +"""Server-wide ``response_enhancer`` callback (issue #926, JS #2161 parity). + +The enhancer stamps cross-cutting fields on every response class — framework +tool successes, custom-tool successes (``get_task_status`` / ``list_tasks``), +the pre-auth ``get_adcp_capabilities`` discovery response, and structured +``adcp_error`` responses — uniformly across the MCP and A2A transports. + +These tests exercise the real wire dicts the dispatcher produces (not the +helper in isolation, except for the pure arity/throw unit tests) so a +serialization or seam regression is caught: + +- success path via ``create_tool_caller`` (the MCP + A2A shared seam), +- the MCP error envelope via ``build_mcp_error_result``, +- the A2A success + error envelopes via ``ADCPAgentExecutor.execute``, +- the A2A ``comply_test_controller`` bypass closure. +""" + +from __future__ import annotations + +from typing import Any + +import pytest +from a2a import types as pb +from a2a.server.agent_execution.context import RequestContext as _RealRequestContext +from a2a.server.events.event_queue import ( + EventQueueLegacy as EventQueue, +) +from google.protobuf.json_format import MessageToDict as _MessageToDict +from google.protobuf.json_format import ParseDict +from google.protobuf.struct_pb2 import Value + +from adcp.decisioning.types import AdcpError as DecisioningAdcpError +from adcp.exceptions import ADCPTaskError +from adcp.server import ADCPHandler, ToolContext +from adcp.server.a2a_server import ADCPAgentExecutor +from adcp.server.helpers import _apply_response_enhancer, _enhancer_is_context_aware +from adcp.server.mcp_tools import create_tool_caller +from adcp.server.responses import products_response +from adcp.server.test_controller import TestControllerStore +from adcp.server.translate import build_mcp_error_result +from adcp.types import Error +from adcp.validation import ValidationHookConfig + + +@pytest.fixture(autouse=True) +def _admit_sandbox_gate(monkeypatch: pytest.MonkeyPatch) -> None: + """A2A executor tests run outside the sandbox-authority gate.""" + monkeypatch.setenv("ADCP_SANDBOX", "1") + + +# --------------------------------------------------------------------------- +# Handlers covering the four response classes +# --------------------------------------------------------------------------- + + +class _Handler(ADCPHandler): + """Implements every tool the four response-class tests need.""" + + async def get_adcp_capabilities(self, params: Any, context: Any = None) -> dict[str, Any]: + return {"adcp": {"major_versions": [3]}, "supported_protocols": ["media_buy"]} + + async def get_products(self, params: Any, context: Any = None) -> dict[str, Any]: + # Real, spec-conformant wire shape so before-validate tests can + # rely on validation passing for the un-enhanced response. + return products_response([]) + + async def get_task_status(self, params: Any, context: Any = None) -> dict[str, Any]: + return {"task_id": "task_1", "status": "completed"} + + async def list_tasks(self, params: Any, context: Any = None) -> dict[str, Any]: + return {"tasks": []} + + +class _ErrorHandler(_Handler): + """get_products raises a credential-policy-style structured error.""" + + async def get_products(self, params: Any, context: Any = None) -> Any: + raise DecisioningAdcpError( + "PERMISSION_DENIED", + message="credential policy denied this request", + recovery="terminal", + ) + + +# --------------------------------------------------------------------------- +# A2A wire helpers (mirror the shims in test_a2a_server.py) +# --------------------------------------------------------------------------- + + +def _data_part(data: dict[str, Any]) -> pb.Part: + value = Value() + ParseDict(data, value) + return pb.Part(data=value) + + +def _datapart_msg(skill: str, parameters: dict[str, Any] | None = None) -> pb.Message: + return pb.Message( + message_id="msg-1", + role=pb.Role.ROLE_USER, + parts=[_data_part({"skill": skill, "parameters": parameters or {}})], + ) + + +def _empty_call_context() -> Any: + from a2a.auth.user import UnauthenticatedUser + from a2a.server.context import ServerCallContext + + return ServerCallContext(user=UnauthenticatedUser()) + + +def _request_context(skill: str, parameters: dict[str, Any] | None = None) -> _RealRequestContext: + return _RealRequestContext( + call_context=_empty_call_context(), + request=pb.SendMessageRequest(message=_datapart_msg(skill, parameters)), + ) + + +def _first_data_part(task: pb.Task) -> dict[str, Any]: + assert task.artifacts, "expected at least one artifact" + for part in task.artifacts[0].parts: + if part.WhichOneof("content") == "data": + return _MessageToDict(part.data) + raise AssertionError("task has no DataPart") + + +def _stamp(result: dict[str, Any]) -> None: + """Context-blind enhancer that stamps a sentinel field.""" + result["enhanced"] = True + + +# =========================================================================== +# Pure unit: arity dispatch + throw + None +# =========================================================================== + + +class TestEnhancerHelper: + def test_one_arg_is_context_blind(self) -> None: + assert _enhancer_is_context_aware(lambda d: None) is False + + def test_three_arg_is_context_aware(self) -> None: + assert _enhancer_is_context_aware(lambda name, d, ctx: None) is True + + def test_var_positional_is_context_aware(self) -> None: + assert _enhancer_is_context_aware(lambda *args: None) is True + + def test_context_blind_invoked_with_dict(self) -> None: + result: dict[str, Any] = {} + _apply_response_enhancer(lambda d: d.__setitem__("blind", 1), "m", result, None) + assert result == {"blind": 1} + + def test_context_aware_receives_method_and_context(self) -> None: + seen: dict[str, Any] = {} + + def enhancer(name: str, d: dict[str, Any], ctx: ToolContext | None) -> None: + seen["name"] = name + seen["ctx"] = ctx + + ctx = ToolContext(caller_identity="buyer") + _apply_response_enhancer(enhancer, "get_products", {}, ctx) + assert seen == {"name": "get_products", "ctx": ctx} + + def test_throwing_enhancer_logs_warning_and_returns_unenhanced( + self, caplog: pytest.LogCaptureFixture + ) -> None: + def boom(d: dict[str, Any]) -> None: + raise RuntimeError("kaboom") + + original = {"orig": 1} + with caplog.at_level("WARNING", logger="adcp.server"): + out = _apply_response_enhancer(boom, "get_products", original, None) + assert out is original + assert original == {"orig": 1} + assert any( + "response_enhancer raised for get_products" in r.message for r in caplog.records + ), [r.message for r in caplog.records] + + def test_none_enhancer_returns_same_reference_unchanged(self) -> None: + original = {"a": 1} + assert _apply_response_enhancer(None, "m", original, None) is original + assert original == {"a": 1} + + def test_return_value_is_ignored(self) -> None: + # Enhancer that returns a brand-new dict — the framework must keep + # the mutated original, not the returned value. + def replacing(d: dict[str, Any]) -> dict[str, Any]: + d["mutated"] = True + return {"this": "is ignored"} + + original: dict[str, Any] = {} + out = _apply_response_enhancer(replacing, "m", original, None) + assert out is original + assert original == {"mutated": True} + + +# =========================================================================== +# Success path — create_tool_caller (the shared MCP + A2A seam) +# =========================================================================== + + +class TestSuccessPathCaller: + @pytest.mark.asyncio + @pytest.mark.parametrize( + "method,params,assertion_key", + [ + ("get_products", {"brief": "x", "promoted_offering": "y"}, "products"), + ("get_task_status", {"task_id": "task_1"}, "task_id"), + ("list_tasks", {}, "tasks"), + ("get_adcp_capabilities", {}, "adcp"), + ], + ) + async def test_enhancer_mutation_reaches_each_response_class( + self, method: str, params: dict[str, Any], assertion_key: str + ) -> None: + caller = create_tool_caller(_Handler(), method, response_enhancer=_stamp) + result = await caller(params) + assert result["enhanced"] is True + assert assertion_key in result # original payload preserved + + @pytest.mark.asyncio + async def test_capabilities_runs_with_unauthenticated_none_context(self) -> None: + seen: dict[str, Any] = {} + + def enhancer(name: str, d: dict[str, Any], ctx: ToolContext | None) -> None: + seen["ctx_is_none"] = ctx is None + d["enhanced"] = True + + # Pre-auth discovery: no context passed (bare ToolContext synthesised + # by the caller). The enhancer must tolerate it. + caller = create_tool_caller(_Handler(), "get_adcp_capabilities", response_enhancer=enhancer) + result = await caller({}) + assert result["enhanced"] is True + # The caller synthesises a bare ToolContext when none is passed — + # the context-aware enhancer receives that object, never crashes. + assert seen["ctx_is_none"] is False + + @pytest.mark.asyncio + async def test_context_aware_arity_sees_real_caller_identity(self) -> None: + seen: dict[str, Any] = {} + + def enhancer(name: str, d: dict[str, Any], ctx: ToolContext | None) -> None: + seen["name"] = name + seen["identity"] = ctx.caller_identity if ctx else None + d["enhanced"] = True + + caller = create_tool_caller(_Handler(), "get_products", response_enhancer=enhancer) + result = await caller( + {"brief": "x", "promoted_offering": "y"}, + ToolContext(caller_identity="buyer-acme"), + ) + assert result["enhanced"] is True + assert seen == {"name": "get_products", "identity": "buyer-acme"} + + @pytest.mark.asyncio + async def test_no_enhancer_leaves_response_unchanged(self) -> None: + caller = create_tool_caller(_Handler(), "get_products") + result = await caller({"brief": "x", "promoted_offering": "y"}) + assert "enhanced" not in result + assert "products" in result + + @pytest.mark.asyncio + async def test_before_validate_conformance_break_surfaces_validation_error(self) -> None: + """An enhancer injecting a non-conformant field is conformance-checked + — proving the enhancer runs BEFORE response validation. The bad + mutation surfaces as VALIDATION_ERROR rather than shipping malformed.""" + + def break_conformance(d: dict[str, Any]) -> None: + # ``products`` must be a list per the get-products schema. + d["products"] = "not-a-list" + + caller = create_tool_caller( + _Handler(), + "get_products", + validation=ValidationHookConfig(responses="strict"), + response_enhancer=break_conformance, + ) + with pytest.raises(ADCPTaskError) as info: + await caller({"brief": "x", "promoted_offering": "y"}) + first = info.value.errors[0] + assert first.code == "VALIDATION_ERROR" + assert first.details["side"] == "response" + + @pytest.mark.asyncio + async def test_throwing_enhancer_is_not_a_transport_error( + self, caplog: pytest.LogCaptureFixture + ) -> None: + """A buggy enhancer must NOT become a transport error — the + un-enhanced response ships and a WARNING is logged.""" + + def boom(d: dict[str, Any]) -> None: + raise RuntimeError("enhancer bug") + + baseline = await create_tool_caller(_Handler(), "get_products")( + {"brief": "x", "promoted_offering": "y"} + ) + caller = create_tool_caller(_Handler(), "get_products", response_enhancer=boom) + with caplog.at_level("WARNING", logger="adcp.server"): + result = await caller({"brief": "x", "promoted_offering": "y"}) + assert result == baseline # identical to the un-enhanced response + assert any("response_enhancer raised for get_products" in r.message for r in caplog.records) + + @pytest.mark.asyncio + async def test_enhancer_runs_after_context_echo(self) -> None: + """Credential-echo contract: the enhancer runs AFTER ``inject_context``, + so it observes the already-assembled (credential-stripped) echo + envelope and cannot precede the strip.""" + seen: dict[str, Any] = {} + + def enhancer(d: dict[str, Any]) -> None: + # The wire ``context`` echo is already present when the enhancer + # runs — proving ordering: inject_context -> enhancer. + seen["context_present_at_enhance_time"] = "context" in d + + caller = create_tool_caller(_Handler(), "get_products", response_enhancer=enhancer) + await caller({"brief": "x", "promoted_offering": "y", "context": {"correlation_id": "abc"}}) + assert seen["context_present_at_enhance_time"] is True + + @pytest.mark.asyncio + async def test_error_envelope_from_handler_skips_success_enhancer(self) -> None: + """When the handler returns an ``{"adcp_error": ...}`` envelope, the + success-path enhancer is skipped — the L3 error envelope is enhanced + on the dedicated error path instead, so there is no double pass.""" + + class _L3(_Handler): + async def get_products(self, params: Any, context: Any = None) -> dict[str, Any]: + return {"adcp_error": {"code": "PERMISSION_DENIED", "message": "no"}} + + caller = create_tool_caller(_L3(), "get_products", response_enhancer=_stamp) + result = await caller({"brief": "x", "promoted_offering": "y"}) + assert "enhanced" not in result + assert "adcp_error" in result + + +# =========================================================================== +# MCP error path — build_mcp_error_result +# =========================================================================== + + +class TestMcpErrorPath: + def test_enhancer_stamps_mcp_error_envelope(self) -> None: + exc = DecisioningAdcpError( + "PERMISSION_DENIED", message="credential policy denied", recovery="terminal" + ) + result = build_mcp_error_result( + exc, + params={"context": {"correlation_id": "abc"}}, + method_name="get_products", + response_enhancer=_stamp, + ) + assert result.structuredContent is not None + assert result.structuredContent["adcp_error"]["code"] == "PERMISSION_DENIED" + assert result.structuredContent["enhanced"] is True + + def test_enhancer_runs_after_context_echo_on_error(self) -> None: + seen: dict[str, Any] = {} + + def enhancer(name: str, d: dict[str, Any], ctx: ToolContext | None) -> None: + seen["context_present"] = "context" in d + seen["name"] = name + d["enhanced"] = True + + exc = Error(code="PERMISSION_DENIED", message="no") + build_mcp_error_result( + exc, + params={"context": {"correlation_id": "abc"}}, + method_name="get_products", + response_enhancer=enhancer, + ) + assert seen == {"context_present": True, "name": "get_products"} + + def test_throwing_enhancer_does_not_break_error_envelope( + self, caplog: pytest.LogCaptureFixture + ) -> None: + def boom(d: dict[str, Any]) -> None: + raise RuntimeError("enhancer bug") + + exc = Error(code="PERMISSION_DENIED", message="no") + with caplog.at_level("WARNING", logger="adcp.server"): + result = build_mcp_error_result(exc, method_name="get_products", response_enhancer=boom) + assert result.structuredContent is not None + assert result.structuredContent["adcp_error"]["code"] == "PERMISSION_DENIED" + assert "enhanced" not in result.structuredContent + + def test_no_enhancer_leaves_error_envelope_unchanged(self) -> None: + exc = Error(code="PERMISSION_DENIED", message="no") + result = build_mcp_error_result(exc, method_name="get_products") + assert result.structuredContent is not None + assert "enhanced" not in result.structuredContent + + +# =========================================================================== +# A2A end-to-end — success, capabilities, error, comply bypass +# =========================================================================== + + +class TestA2aTransport: + @pytest.mark.asyncio + @pytest.mark.parametrize( + "skill,assertion_key", + [ + ("get_products", "products"), + ("get_task_status", "task_id"), + ("list_tasks", "tasks"), + ("get_adcp_capabilities", "adcp"), + ], + ) + async def test_success_enhanced_on_a2a(self, skill: str, assertion_key: str) -> None: + executor = ADCPAgentExecutor(_Handler(), validation=None, response_enhancer=_stamp) + queue = EventQueue() + params = {"task_id": "task_1"} if skill == "get_task_status" else {} + await executor.execute(_request_context(skill, params), queue) + event = await queue.dequeue_event() + assert isinstance(event, pb.Task) + assert event.status.state == pb.TaskState.TASK_STATE_COMPLETED + data = _first_data_part(event) + assert data["enhanced"] is True + assert assertion_key in data + + @pytest.mark.asyncio + async def test_capabilities_enhanced_with_unauthenticated_context_on_a2a(self) -> None: + seen: dict[str, Any] = {} + + def enhancer(name: str, d: dict[str, Any], ctx: ToolContext | None) -> None: + seen["name"] = name + d["enhanced"] = True + + executor = ADCPAgentExecutor(_Handler(), validation=None, response_enhancer=enhancer) + queue = EventQueue() + await executor.execute(_request_context("get_adcp_capabilities"), queue) + event = await queue.dequeue_event() + data = _first_data_part(event) + assert data["enhanced"] is True + assert seen["name"] == "get_adcp_capabilities" + + @pytest.mark.asyncio + async def test_error_envelope_enhanced_on_a2a(self) -> None: + executor = ADCPAgentExecutor(_ErrorHandler(), validation=None, response_enhancer=_stamp) + queue = EventQueue() + await executor.execute(_request_context("get_products"), queue) + event = await queue.dequeue_event() + assert isinstance(event, pb.Task) + assert event.status.state == pb.TaskState.TASK_STATE_FAILED + data = _first_data_part(event) + assert data["adcp_error"]["code"] == "PERMISSION_DENIED" + assert data["enhanced"] is True + + @pytest.mark.asyncio + async def test_error_enhancer_runs_after_context_echo_on_a2a(self) -> None: + seen: dict[str, Any] = {} + + def enhancer(name: str, d: dict[str, Any], ctx: ToolContext | None) -> None: + seen["context_present"] = "context" in d + d["enhanced"] = True + + executor = ADCPAgentExecutor(_ErrorHandler(), validation=None, response_enhancer=enhancer) + queue = EventQueue() + await executor.execute( + _request_context("get_products", {"context": {"correlation_id": "abc"}}), + queue, + ) + await queue.dequeue_event() + assert seen["context_present"] is True + + @pytest.mark.asyncio + async def test_no_enhancer_leaves_a2a_success_unchanged(self) -> None: + executor = ADCPAgentExecutor(_Handler(), validation=None) + queue = EventQueue() + await executor.execute(_request_context("get_products"), queue) + event = await queue.dequeue_event() + data = _first_data_part(event) + assert "enhanced" not in data + assert "products" in data + + @pytest.mark.asyncio + async def test_comply_test_controller_bypass_is_enhanced(self) -> None: + """The A2A ``comply_test_controller`` skill bypasses + ``create_tool_caller`` (its own dispatch closure). It must still run + through the enhancer — otherwise comply responses silently skip the + seller's cross-cutting stamp.""" + + class _Store(TestControllerStore): + pass + + executor = ADCPAgentExecutor( + _Handler(), + test_controller=_Store(), + validation=None, + response_enhancer=_stamp, + ) + # ``list_scenarios`` is the gate-exempt capability probe — exercises + # the bypass closure without needing a resolved sandbox account. + result = await executor._tool_callers["comply_test_controller"]( + {"scenario": "list_scenarios"} + ) + assert result["enhanced"] is True + assert result["success"] is True + + @pytest.mark.asyncio + async def test_comply_bypass_enhancer_runs_after_context_echo(self) -> None: + seen: dict[str, Any] = {} + + def enhancer(name: str, d: dict[str, Any], ctx: ToolContext | None) -> None: + seen["name"] = name + seen["context_present"] = "context" in d + d["enhanced"] = True + + class _Store(TestControllerStore): + pass + + executor = ADCPAgentExecutor( + _Handler(), + test_controller=_Store(), + validation=None, + response_enhancer=enhancer, + ) + result = await executor._tool_callers["comply_test_controller"]( + {"scenario": "list_scenarios", "context": {"correlation_id": "abc"}} + ) + assert result["enhanced"] is True + assert seen == {"name": "comply_test_controller", "context_present": True} + + +# =========================================================================== +# Config threading — response_enhancer reaches both transports +# =========================================================================== + + +class TestConfigThreading: + def test_serve_config_carries_response_enhancer(self) -> None: + from adcp.server import ServeConfig + + cfg = ServeConfig(response_enhancer=_stamp) + assert cfg.response_enhancer is _stamp + + def test_create_a2a_server_threads_enhancer_to_executor(self) -> None: + """``create_a2a_server(response_enhancer=...)`` constructs the executor + that runs every A2A dispatch with the enhancer — the same constructor + path ``serve(transport="a2a")`` uses, so the A2A leg is not silently + dropped. Asserting at the constructor keeps the test off a2a-sdk + request-handler internals.""" + import inspect + + from adcp.server.a2a_server import create_a2a_server + + # Smoke: the public factory accepts and forwards the kwarg. + sig = inspect.signature(create_a2a_server) + assert "response_enhancer" in sig.parameters + # The executor — the object that actually runs the enhancer — stores + # it. ``create_a2a_server`` builds exactly this executor. + executor = ADCPAgentExecutor(_Handler(), validation=None, response_enhancer=_stamp) + assert executor._response_enhancer is _stamp + # And the factory call itself does not raise with the kwarg set. + create_a2a_server(_Handler(), advertise_all=True, response_enhancer=_stamp) + + @pytest.mark.asyncio + async def test_create_mcp_server_threads_enhancer_to_wire_dispatch(self) -> None: + """End-to-end MCP through ``create_mcp_server(response_enhancer=...)`` — + the enhancer threads create_mcp_server -> _register_handler_tools -> + create_tool_caller, and its stamp reaches the dispatched wire envelope + the FastMCP tool function returns. This is the same registration path + ``serve(transport="streamable-http")`` uses, so the MCP leg is proven + plumbed without re-running the streamable-http session handshake.""" + from adcp.server import create_mcp_server + + def enhancer(name: str, d: dict[str, Any], ctx: ToolContext | None) -> None: + d["x_enhanced_tool"] = name + + mcp = create_mcp_server( + _Handler(), advertise_all=True, validation=None, response_enhancer=enhancer + ) + tool_fn = mcp._tool_manager._tools["get_products"].fn + result = await tool_fn(brief="x", promoted_offering="y") + # FastMCP success path returns the plain dict the caller produced. + assert result["x_enhanced_tool"] == "get_products" + assert "products" in result + + @pytest.mark.asyncio + async def test_create_mcp_server_threads_enhancer_to_error_wire(self) -> None: + """The same ``create_mcp_server`` threading reaches the MCP *error* + envelope (``build_mcp_error_result``) — a credential-policy error + dispatched through the registered FastMCP tool carries the stamp.""" + from mcp.types import CallToolResult + + from adcp.server import create_mcp_server + + mcp = create_mcp_server( + _ErrorHandler(), advertise_all=True, validation=None, response_enhancer=_stamp + ) + tool_fn = mcp._tool_manager._tools["get_products"].fn + result = await tool_fn(brief="x", promoted_offering="y") + assert isinstance(result, CallToolResult) + assert result.structuredContent is not None + assert result.structuredContent["adcp_error"]["code"] == "PERMISSION_DENIED" + assert result.structuredContent["enhanced"] is True From cf195647b2c32d90b21a0c71469e63c4e5f8169e Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sun, 7 Jun 2026 07:36:58 -0400 Subject: [PATCH 2/2] docs(server): document response_enhancer coverage seams The ResponseEnhancer / serve() docstrings claimed the enhancer applies to "every response". Two paths are not enhanced: - the MCP comply_test_controller sandbox path returns the handler result directly without a success-path enhancer call (its A2A counterpart is stamped); - a handler that returns (vs raises) a raw L3 {"adcp_error": {...}} envelope is skipped by the success-path guard and never reaches the raised-error finalizers. Document the exact enhanced paths and these two gaps on the canonical ResponseEnhancer docstring, point _apply_response_enhancer at it, and soften the serve() docstrings from "every response". Docstring-only; no behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/adcp/server/helpers.py | 41 +++++++++++++++++++++++++++++++------- src/adcp/server/serve.py | 22 ++++++++++---------- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/src/adcp/server/helpers.py b/src/adcp/server/helpers.py index 0faf7548..891a983c 100644 --- a/src/adcp/server/helpers.py +++ b/src/adcp/server/helpers.py @@ -391,20 +391,41 @@ def inject_context( ResponseEnhancer = ( Callable[[dict[str, Any]], None] | Callable[[str, dict[str, Any], "ToolContext | None"], None] ) -"""Server-wide callback that stamps cross-cutting fields on every response. +"""Server-wide callback that stamps cross-cutting fields on responses. Configure it via ``serve(response_enhancer=...)`` (or the matching :class:`~adcp.server.ServeConfig` field). The framework calls it after the -context-echo envelope is assembled and before schema validation, on every -response class — framework-tool successes, custom-tool successes -(``get_task_status`` / ``list_tasks``), the pre-auth -``get_adcp_capabilities`` discovery response, and structured ``adcp_error`` -responses — on both the MCP and A2A transports. +context-echo envelope is assembled and before schema validation. + +Coverage — paths the enhancer runs on, both MCP and A2A unless noted: + +- framework-tool successes; +- custom-tool successes (``get_task_status`` / ``list_tasks``); +- the pre-auth ``get_adcp_capabilities`` discovery response; +- error responses produced by a *raised* ``AdcpError`` / + ``ADCPTaskError`` (including credential-policy errors), which the + transport error finalizers stamp. The common ``adcp_error()`` helper + shape (``{"errors": [...]}``) carries no top-level ``adcp_error`` key, + so a handler that returns it is stamped on the success path; +- the A2A ``comply_test_controller`` sandbox skill. + +Coverage gaps — paths the enhancer does NOT run on: + +- the **MCP** ``comply_test_controller`` sandbox path + (``test_controller.py``); it returns the handler result directly + without a success-path enhancer call, so its A2A counterpart is + stamped but its MCP counterpart is not. Sandbox-only. +- a handler that *returns* (rather than raises) a raw AdCP L3 envelope + ``{"adcp_error": {...}}``. The success-path guard + (``if "adcp_error" not in result``) skips it, and a returned envelope + never reaches the raised-error finalizers, so it ships un-enhanced. + Raise the error (or return the ``{"errors": [...]}`` helper shape) to + have it stamped. Two arities are supported, dispatched by positional-parameter count: - **Context-blind** ``(result_dict) -> None`` — the common case; mutate the - response dict in place to stamp a field on every response. + response dict in place to stamp a field on each response it runs on. - **Context-aware** ``(method_name, result_dict, context) -> None`` — when the stamp depends on the tool or the caller. ``context`` is the :class:`~adcp.server.ToolContext` for this dispatch, or ``None`` for an @@ -467,6 +488,12 @@ def _apply_response_enhancer( at ``WARNING`` (including *method_name*); the original ``result`` is returned un-enhanced so a buggy enhancer never becomes a transport error. + + This is the per-call shim; whether a given dispatch path reaches it is + decided by the call sites. See :data:`ResponseEnhancer` for the + authoritative list of enhanced paths and the two coverage gaps (the MCP + ``comply_test_controller`` sandbox path and handler-returned raw L3 + ``{"adcp_error": {...}}`` envelopes). """ if enhancer is None: return result diff --git a/src/adcp/server/serve.py b/src/adcp/server/serve.py index 0bd08c65..3a9860de 100644 --- a/src/adcp/server/serve.py +++ b/src/adcp/server/serve.py @@ -833,10 +833,10 @@ class of bug that shipped the ``pricing_options`` or ``validation=None``. Applies to both MCP and A2A transports. response_enhancer: Optional server-wide - :data:`~adcp.server.ResponseEnhancer` applied to every - response — framework-tool successes, custom-tool successes, - the pre-auth ``get_adcp_capabilities`` discovery response, and - ``adcp_error`` envelopes — on both the MCP and A2A transports. + :data:`~adcp.server.ResponseEnhancer` applied to + framework-tool successes, custom-tool successes, the pre-auth + ``get_adcp_capabilities`` discovery response, and + raised-error responses — on both the MCP and A2A transports. The callback runs after the context echo (so it cannot re-introduce a stripped credential) and, on the success path, before schema validation (so a conformance-breaking mutation @@ -844,7 +844,8 @@ class of bug that shipped the ``pricing_options`` malformed). Both the context-blind ``(result_dict)`` arity and the context-aware ``(method_name, result_dict, context)`` arity are supported. See :data:`~adcp.server.ResponseEnhancer` for - the full failure and idempotency-replay semantics. + the exact coverage (including two non-enhanced paths) and the + failure / idempotency-replay semantics. Security: This function does NOT configure authentication. In production, @@ -2246,11 +2247,12 @@ def create_mcp_server( clients opening one session per operation. Ignored when ``stateless_http=True``. response_enhancer: Optional server-wide - :data:`~adcp.server.ResponseEnhancer` applied to every - response (successes and ``adcp_error`` envelopes) after the - context echo and, for successes, before schema validation. - See :data:`~adcp.server.ResponseEnhancer` for the supported - arities and failure / idempotency semantics. + :data:`~adcp.server.ResponseEnhancer` applied to successes and + raised-error responses after the context echo and, for + successes, before schema validation. See + :data:`~adcp.server.ResponseEnhancer` for the exact coverage + (including two non-enhanced paths), supported arities, and + failure / idempotency semantics. Returns: A configured FastMCP server instance. Call ``mcp.run()`` to start,