diff --git a/AGENTS.md b/AGENTS.md index 7ffae6793..4700a675f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -70,6 +70,11 @@ a given Git URL into `.reference` and pull latest before using it. vocabulary the HTTP protocol plugins compose (core never imports it, keeping it carrier-agnostic). - `packages/core/storage-*`: storage adapters and test support. +- `packages/core/observability`: the shared structured-logging + OTLP layer + (`observabilityLayer`): JSON logs to stderr with trace correlation, plus + OTLP traces/logs/metrics export via `effect/unstable/observability` (works + on Bun and workerd). Every server app boots it; see RUNNING.md for the env + vars. - `packages/plugins/*`: protocol and provider plugins; their runtime, React, API, and testing helpers live with the owning plugin. - `packages/react`: shared React UI and atom/client integration. diff --git a/RUNNING.md b/RUNNING.md index f478b8636..88bf48c62 100644 --- a/RUNNING.md +++ b/RUNNING.md @@ -40,6 +40,42 @@ develop on its `main`, publish a bump, then bump the dependency here. The The e2e globalsetup files are the source of truth for "how do I boot a working instance of X" — read them before inventing a boot path. +## Observability: structured logs + OTel + +Every server app (local, host-selfhost, host-cloudflare, cloud) boots the +shared `@executor-js/observability` layer: structured JSON logging plus an +optional OTLP traces/logs/metrics pipeline. Config is env-driven and inert +when unset: + +- `OTEL_EXPORTER_OTLP_ENDPOINT` — OTLP/HTTP base URL (e.g. + `http://localhost:4318` or a Traceway/collector endpoint); + `/v1/{traces,logs,metrics}` are appended. Unset → no export, JSON logging + stays on. +- `OTEL_EXPORTER_OTLP_HEADERS` — standard `key=value,key2=value2` format + (auth tokens etc.). +- `LOG_LEVEL` — minimum level (`trace`/`debug`/`info`/`warn`/`error`), + default `info`. Applies to both the console lines and OTLP log records. + +Notes: + +- JSON log lines go to STDERR, never stdout — the MCP stdio transport owns + stdout as its JSON-RPC channel. Lines carry `trace_id`/`span_id` when a + span is active, so console logs correlate with exported traces. +- On the Workers apps the vars are bindings (wrangler vars / `.dev.vars`), + not `process.env`. Cloud keeps its existing Axiom trace pipeline + (`AXIOM_TOKEN` etc.) and Sentry correlation; the OTLP endpoint there adds + logs + metrics only. +- MCP protocol traffic is logged as structured events (`mcp.tool.start/end` + with outcome + duration, `mcp.session.*` lifecycle, `mcp.auth.outcome`, + elicitation at debug), and `mcp.tool.calls` / `mcp.tool.duration_ms` + metrics are emitted per tool call. `EXECUTOR_MCP_DEBUG=1` still enables + the verbose stderr debug hook. + +Quick local check: run an OTLP collector (or a stub printing posts to +`/v1/*`), then +`OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 LOG_LEVEL=debug bun run start` +in `apps/local` and drive an MCP `execute` call. + ## E2E: running, viewing, sharing `e2e/AGENTS.md` covers writing scenarios. Operationally: diff --git a/apps/cloud/package.json b/apps/cloud/package.json index 997356e93..c9eb6f82d 100644 --- a/apps/cloud/package.json +++ b/apps/cloud/package.json @@ -41,6 +41,7 @@ "@executor-js/execution": "workspace:*", "@executor-js/fumadb": "workspace:*", "@executor-js/host-mcp": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/plugin-google": "workspace:*", "@executor-js/plugin-graphql": "workspace:*", "@executor-js/plugin-mcp": "workspace:*", diff --git a/apps/cloud/src/app.ts b/apps/cloud/src/app.ts index 77afa9c7f..022cc9571 100644 --- a/apps/cloud/src/app.ts +++ b/apps/cloud/src/app.ts @@ -23,7 +23,7 @@ import { CloudHostConfig, CloudPluginsProvider, } from "./engine/execution-stack"; -import { WorkerTelemetryLive } from "./observability/telemetry"; +import { CloudObservabilityLive, WorkerTelemetryLive } from "./observability/telemetry"; // =========================================================================== // The Executor CLOUD app, as ONE `ExecutorApp.make` call. @@ -121,7 +121,12 @@ const { appLayer, toWebHandler, mcpExport } = ExecutorApp.make({ // platform. A boot-time WorkOS misconfig is unrecoverable -> `orDie`. boot: controlPlane.pipe( Layer.merge( - Layer.mergeAll(WorkerTelemetryLive, HttpServer.layerServices, AutumnService.Default), + Layer.mergeAll( + WorkerTelemetryLive, + CloudObservabilityLive, + HttpServer.layerServices, + AutumnService.Default, + ), ), // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: a boot-time WorkOS misconfiguration is unrecoverable Layer.orDie, diff --git a/apps/cloud/src/env-augment.d.ts b/apps/cloud/src/env-augment.d.ts index adb3e3b2d..088d6a6d8 100644 --- a/apps/cloud/src/env-augment.d.ts +++ b/apps/cloud/src/env-augment.d.ts @@ -10,6 +10,13 @@ declare global { AXIOM_DATASET?: string; AXIOM_TRACES_URL?: string; AXIOM_TRACES_SAMPLE_RATIO?: string; + /** OTLP/HTTP base endpoint for structured logs + metrics export (traces + * keep flowing through the Axiom pipeline above); unset disables it. */ + OTEL_EXPORTER_OTLP_ENDPOINT?: string; + /** OTLP export headers in the standard `key=value,key2=value2` format. */ + OTEL_EXPORTER_OTLP_HEADERS?: string; + /** Minimum structured-log level (trace/debug/info/warn/error/fatal). */ + LOG_LEVEL?: string; SENTRY_DSN?: string; SENTRY_OTEL_LOG_PAYLOAD?: string; SENTRY_OTEL_VERIFY?: string; diff --git a/apps/cloud/src/observability/telemetry.ts b/apps/cloud/src/observability/telemetry.ts index 7bd5baa58..84df093e7 100644 --- a/apps/cloud/src/observability/telemetry.ts +++ b/apps/cloud/src/observability/telemetry.ts @@ -45,6 +45,8 @@ import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from "@opentelemetry/semantic import { env } from "cloudflare:workers"; import { Effect, Layer } from "effect"; +import { observabilityLayer } from "@executor-js/observability"; + import { CountingSpanExporter, CountingSpanProcessor, @@ -139,3 +141,21 @@ const makeTelemetryLive = (): Layer.Layer => export const WorkerTelemetryLive: Layer.Layer = makeTelemetryLive(); export const DoTelemetryLive: Layer.Layer = makeTelemetryLive(); + +// Structured JSON stdout logging + OTLP logs/metrics export. Traces are +// deliberately NOT exported here (`traces: false`): they keep flowing through +// the Axiom WebTracerProvider pipeline above, which the non-Effect fetch +// paths (server.ts's raw `http.server` span) and Sentry correlation depend +// on. `Layer.unwrap(Effect.sync(...))` defers the `env` read to layer build +// time, matching `makeTelemetryLive`'s lazy gate. +export const CloudObservabilityLive: Layer.Layer = Layer.unwrap( + Effect.sync(() => + observabilityLayer({ + serviceName: SERVICE_NAME, + endpoint: env.OTEL_EXPORTER_OTLP_ENDPOINT, + headers: env.OTEL_EXPORTER_OTLP_HEADERS, + logLevel: env.LOG_LEVEL, + traces: false, + }), + ), +); diff --git a/apps/host-cloudflare/package.json b/apps/host-cloudflare/package.json index 159266b85..7920cd92a 100644 --- a/apps/host-cloudflare/package.json +++ b/apps/host-cloudflare/package.json @@ -23,6 +23,7 @@ "@executor-js/execution": "workspace:*", "@executor-js/fumadb": "workspace:*", "@executor-js/host-mcp": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/plugin-encrypted-secrets": "workspace:*", "@executor-js/plugin-google": "workspace:*", "@executor-js/plugin-graphql": "workspace:*", diff --git a/apps/host-cloudflare/src/app.ts b/apps/host-cloudflare/src/app.ts index 8c3eebf78..d0c62aa6d 100644 --- a/apps/host-cloudflare/src/app.ts +++ b/apps/host-cloudflare/src/app.ts @@ -1,4 +1,4 @@ -import { Effect } from "effect"; +import { Effect, Layer } from "effect"; import { HttpEffect, HttpRouter } from "effect/unstable/http"; import { dbProviderLayer, ExecutorApp, textFailureStrategy } from "@executor-js/api/server"; @@ -12,7 +12,7 @@ import { makeCloudflareHostConfig, makeCloudflarePluginsProvider, } from "./execution"; -import { ErrorCaptureLive } from "./observability"; +import { ErrorCaptureLive, makeCloudflareObservabilityLayer } from "./observability"; import { cloudflareAccountMiddleware } from "./account/account-provider"; import { makeCloudflareApprovalHandler } from "./mcp"; import { makeCloudflareMcpAgentHandler } from "./mcp/agent-handler"; @@ -73,7 +73,7 @@ export const makeCloudflareApp = async (env: CloudflareEnv) => { ], }, config: { mountPrefix: "/api", failure: textFailureStrategy }, - boot: identityLayer, + boot: Layer.merge(identityLayer, makeCloudflareObservabilityLayer(env)), }); return { appLayer, toWebHandler, mcpAgentHandler }; diff --git a/apps/host-cloudflare/src/config.ts b/apps/host-cloudflare/src/config.ts index bded06965..f7686fcbb 100644 --- a/apps/host-cloudflare/src/config.ts +++ b/apps/host-cloudflare/src/config.ts @@ -45,6 +45,12 @@ export interface CloudflareEnv { readonly EXECUTOR_SECRET_KEY?: string; readonly ALLOW_LOCAL_NETWORK?: string; readonly VITE_PUBLIC_SITE_URL?: string; + /** OTLP/HTTP base endpoint for traces/logs/metrics export; unset disables export. */ + readonly OTEL_EXPORTER_OTLP_ENDPOINT?: string; + /** OTLP export headers in the standard `key=value,key2=value2` format. */ + readonly OTEL_EXPORTER_OTLP_HEADERS?: string; + /** Minimum structured-log level (trace/debug/info/warn/error/fatal). */ + readonly LOG_LEVEL?: string; /** * Dev/single-user escape hatch: when "true", skip Cloudflare Access entirely * and treat every request as a fixed admin. For local `wrangler dev` and diff --git a/apps/host-cloudflare/src/observability.ts b/apps/host-cloudflare/src/observability.ts index 249eb5206..c23b74ad4 100644 --- a/apps/host-cloudflare/src/observability.ts +++ b/apps/host-cloudflare/src/observability.ts @@ -1,7 +1,27 @@ -// Cloudflare host `ErrorCapture` — the shared console implementation with a -// `cloudflare-` trace-id prefix. Worker stdout is routed to Logpush/the -// dashboard, so the squashed cause is grep-able by the opaque 500 traceId. +// Cloudflare host observability. +// +// `ErrorCaptureLive` — the shared console implementation with a `cloudflare-` +// trace-id prefix. Worker stdout is routed to Logpush/the dashboard, so the +// squashed cause is grep-able by the opaque 500 traceId. +// +// `makeCloudflareObservabilityLayer` — structured JSON stdout logging plus the +// OTLP traces/logs/metrics pipeline (fetch-based, workerd-safe), enabled by +// the `OTEL_EXPORTER_OTLP_ENDPOINT` binding. A Worker has no module-scope +// bindings, so the layer closes over the per-fetch `env` instead of process.env. + +import type { Layer } from "effect"; import { consoleErrorCapture } from "@executor-js/api/server"; +import { observabilityLayer } from "@executor-js/observability"; + +import type { CloudflareEnv } from "./config"; export const ErrorCaptureLive = consoleErrorCapture("cloudflare"); + +export const makeCloudflareObservabilityLayer = (env: CloudflareEnv): Layer.Layer => + observabilityLayer({ + serviceName: "executor-cloudflare", + endpoint: env.OTEL_EXPORTER_OTLP_ENDPOINT, + headers: env.OTEL_EXPORTER_OTLP_HEADERS, + logLevel: env.LOG_LEVEL, + }); diff --git a/apps/host-selfhost/package.json b/apps/host-selfhost/package.json index 1c73a8cc8..c5289f66b 100644 --- a/apps/host-selfhost/package.json +++ b/apps/host-selfhost/package.json @@ -26,6 +26,7 @@ "@executor-js/execution": "workspace:*", "@executor-js/fumadb": "workspace:*", "@executor-js/host-mcp": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/plugin-encrypted-secrets": "workspace:*", "@executor-js/plugin-google": "workspace:*", "@executor-js/plugin-graphql": "workspace:*", diff --git a/apps/host-selfhost/src/app.ts b/apps/host-selfhost/src/app.ts index c4a080f3b..8c9ce644c 100644 --- a/apps/host-selfhost/src/app.ts +++ b/apps/host-selfhost/src/app.ts @@ -20,7 +20,7 @@ import { } from "./execution"; import { makeSelfHostMcpSeams } from "./mcp"; import { selfHostPlugins } from "./plugins"; -import { ErrorCaptureLive } from "./observability"; +import { ErrorCaptureLive, SelfHostObservabilityLive } from "./observability"; import { oauthCallbackSignInRedirectLocation } from "./auth/oauth-callback-login"; // =========================================================================== @@ -126,8 +126,13 @@ export const makeSelfHostApp = async (options: MakeSelfHostAppOptions = {}) => { config: { mountPrefix: "/api", failure: textFailureStrategy }, // The boot-scoped context provideMerge'd under everything: the long-lived DB // handle (read by the DbProvider seam, Better Auth, and the MCP store) + the - // resolved identity (captured once by the execution middleware + MCP auth). - boot: Layer.merge(Layer.succeed(SelfHostDb)(dbHandle), identityLayer), + // resolved identity (captured once by the execution middleware + MCP auth) + // + structured logging / OTLP telemetry. + boot: Layer.mergeAll( + Layer.succeed(SelfHostDb)(dbHandle), + identityLayer, + SelfHostObservabilityLive, + ), }); return { diff --git a/apps/host-selfhost/src/observability.ts b/apps/host-selfhost/src/observability.ts index 065cf8ddb..d6d16c45c 100644 --- a/apps/host-selfhost/src/observability.ts +++ b/apps/host-selfhost/src/observability.ts @@ -1,11 +1,25 @@ // --------------------------------------------------------------------------- -// Self-host `ErrorCapture` — the shared console implementation with a -// `selfhost-` trace id prefix. Prints the squashed + pretty cause to stderr -// and returns a short correlation id that surfaces in the opaque 500 traceId, -// so operators can grep their logs. Cloud swaps in a Sentry-backed impl behind -// the same tag. +// Self-host observability. +// +// `ErrorCaptureLive` — the shared console implementation with a `selfhost-` +// trace id prefix. Prints the squashed + pretty cause to stderr and returns a +// short correlation id that surfaces in the opaque 500 traceId, so operators +// can grep their logs. Cloud swaps in a Sentry-backed impl behind the same tag. +// +// `SelfHostObservabilityLive` — structured JSON stdout logging plus the OTLP +// traces/logs/metrics pipeline, enabled by `OTEL_EXPORTER_OTLP_ENDPOINT`. // --------------------------------------------------------------------------- +import type { Layer } from "effect"; + import { consoleErrorCapture } from "@executor-js/api/server"; +import { observabilityLayer } from "@executor-js/observability"; export const ErrorCaptureLive = consoleErrorCapture("selfhost"); + +export const SelfHostObservabilityLive: Layer.Layer = observabilityLayer({ + serviceName: "executor-selfhost", + endpoint: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, + headers: process.env.OTEL_EXPORTER_OTLP_HEADERS, + logLevel: process.env.LOG_LEVEL, +}); diff --git a/apps/host-selfhost/src/serve.ts b/apps/host-selfhost/src/serve.ts index 1803e7bb0..ea4e99804 100644 --- a/apps/host-selfhost/src/serve.ts +++ b/apps/host-selfhost/src/serve.ts @@ -26,6 +26,8 @@ import { import { BunFileSystem, BunHttpServer, BunPath, BunRuntime } from "@effect/platform-bun"; import { Effect, Layer } from "effect"; +import { httpAccessLogger } from "@executor-js/observability"; + import { makeSelfHostApp } from "./app"; import { loadConfig } from "./config"; import type { BetterAuthHandle } from "./auth"; @@ -121,7 +123,11 @@ export const startServer = async (): Promise => { }).pipe(Layer.provide(BunFileSystem.layer), Layer.provide(BunPath.layer)); const ServerLive = HttpRouter.serve(Layer.mergeAll(AppLayer, AssetsLive, SpaLive), { - middleware: selfHostHttpMiddleware(betterAuth), + // The default Effect access logger is swapped for `httpAccessLogger` + // (composed message instead of the constant "Sent HTTP response"); it + // wraps outermost, exactly where `HttpRouter.serve` would put its own. + disableLogger: true, + middleware: (httpApp) => httpAccessLogger(selfHostHttpMiddleware(betterAuth)(httpApp)), }).pipe( Layer.provide( BunHttpServer.layer({ hostname: config.host, port: config.port, idleTimeout: 0 }), diff --git a/apps/local/package.json b/apps/local/package.json index 09a97e9d5..ae5039dee 100644 --- a/apps/local/package.json +++ b/apps/local/package.json @@ -29,6 +29,7 @@ "@executor-js/fumadb": "workspace:*", "@executor-js/host-mcp": "workspace:*", "@executor-js/integrations-registry": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/plugin-desktop-settings": "workspace:*", "@executor-js/plugin-example": "workspace:*", "@executor-js/plugin-file-secrets": "workspace:*", diff --git a/apps/local/src/app.ts b/apps/local/src/app.ts index a9915daf5..73d8b8927 100644 --- a/apps/local/src/app.ts +++ b/apps/local/src/app.ts @@ -12,7 +12,7 @@ import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs"; import { getExecutorBundle, type LocalExecutor } from "./executor"; import { makeLocalIdentityLayer } from "./identity"; -import { ErrorCaptureLive } from "./observability"; +import { ErrorCaptureLive, LocalObservabilityLive } from "./observability"; // =========================================================================== // The LOCAL Executor app, as ONE `ExecutorApp.make` call. @@ -118,8 +118,9 @@ export const makeLocalApiHandler = async (token: string): Promise {} @@ -63,6 +69,9 @@ const closeServerHandlers = async (handlers: ServerHandlers): Promise => { // after the surfaces are closed so server shutdown (and failed startup // cleanup via disposeServerHandlers) releases the owned data-dir lock. yield* ignoreDisposeFailure("disposeExecutor", () => disposeExecutor()); + // Flush buffered OTLP telemetry from the MCP surface's runtime last, so + // logs emitted during the disposals above still export. + yield* ignoreDisposeFailure("observability.dispose", () => disposeObservabilityRuntime()); }), ); }; diff --git a/apps/local/src/mcp.ts b/apps/local/src/mcp.ts index e74912998..3af8f7feb 100644 --- a/apps/local/src/mcp.ts +++ b/apps/local/src/mcp.ts @@ -27,6 +27,7 @@ import { } from "@executor-js/execution"; import { startIntegrationsRefresh } from "./integrations"; +import { disposeObservabilityRuntime, getObservabilityRuntime } from "./observability"; type AnyExecutionEngine = ExecutionEngine; @@ -193,8 +194,21 @@ export const createMcpRequestHandler = ( const engine = resourceConfig ? engineFromConfig(resourceConfig.config) : null; if (engine) sessionEngines.set(sid, engine); if (resourceConfig?.close) sessionClosers.set(sid, resourceConfig.close); + // SDK lifecycle callbacks run outside Effect; emit structured console + // lines so session lifecycle still reaches host logs. + console.info( + JSON.stringify({ + event: "mcp.session.created", + sessionId: sid, + resourceKind: resource.kind, + elicitationMode: readElicitationMode(request), + }), + ); + }, + onsessionclosed: (sid) => { + console.info(JSON.stringify({ event: "mcp.session.closed", sessionId: sid })); + void dispose(sid, { server: true }); }, - onsessionclosed: (sid) => void dispose(sid, { server: true }), }); transport.onclose = () => { @@ -206,7 +220,7 @@ export const createMcpRequestHandler = ( try { const elicitationMode = readElicitationMode(request); resourceConfig = await configForResource(resource); - created = await Effect.runPromise( + created = await getObservabilityRuntime().runPromise( createExecutorMcpServer({ ...resourceConfig.config, browserApprovalStore: approvals.store, @@ -231,7 +245,13 @@ export const createMcpRequestHandler = ( } return response; } catch (error) { - console.error("[mcp] handleRequest error:", formatBoundaryError(error)); + console.error( + JSON.stringify({ + event: "mcp.session.handle_request_error", + sessionId: transport.sessionId ?? null, + error: formatBoundaryError(error), + }), + ); if (!transport.sessionId) { await ignoreClose(() => transport.close()); const server = created; @@ -285,7 +305,7 @@ export const createMcpRequestHandler = ( export const runMcpStdioServer = async (config: ExecutorMcpServerConfig): Promise => { startIntegrationsRefresh(); - const server = await Effect.runPromise(createExecutorMcpServer(config)); + const server = await getObservabilityRuntime().runPromise(createExecutorMcpServer(config)); const transport = new StdioServerTransport(); const waitForExit = () => @@ -310,5 +330,6 @@ export const runMcpStdioServer = async (config: ExecutorMcpServerConfig): Promis } finally { await ignoreClose(() => transport.close()); await ignoreClose(() => server.close()); + await ignoreClose(disposeObservabilityRuntime); } }; diff --git a/apps/local/src/observability.ts b/apps/local/src/observability.ts index 6c13257ae..ab0a68b3d 100644 --- a/apps/local/src/observability.ts +++ b/apps/local/src/observability.ts @@ -1,10 +1,49 @@ // --------------------------------------------------------------------------- -// Local-app `ErrorCapture` — the shared console implementation with a `local-` +// Local-app observability. +// +// `ErrorCaptureLive` — the shared console implementation with a `local-` // trace id prefix. Prints the squashed cause + pretty-printed structured cause // to stderr and returns a short correlation id. Operators can grep for the id // in their terminal scrollback when a user reports an opaque 500 traceId. +// +// `LocalObservabilityLive` — structured JSON stdout logging plus the OTLP +// traces/logs/metrics pipeline (enabled by `OTEL_EXPORTER_OTLP_ENDPOINT`). +// It rides in the app's `boot` layer for the typed `/api`, and the in-process +// MCP surface (`mcp.ts`) runs its Effects through `observabilityRuntime` so +// tool-call logs, spans, and metrics flow through the SAME pipeline. The +// runtime is process-lifetime: disposing it flushes buffered OTLP exports. // --------------------------------------------------------------------------- +import { ManagedRuntime, type Layer } from "effect"; + import { consoleErrorCapture } from "@executor-js/api/server"; +import { observabilityLayer } from "@executor-js/observability"; export const ErrorCaptureLive = consoleErrorCapture("local"); + +export const LocalObservabilityLive: Layer.Layer = observabilityLayer({ + serviceName: "executor-local", + endpoint: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, + headers: process.env.OTEL_EXPORTER_OTLP_HEADERS, + logLevel: process.env.LOG_LEVEL, +}); + +// Built lazily on first use and re-creatable after dispose (mirrors +// `serverHandlersRuntime` in main.ts, whose handlers can be rebuilt after +// `disposeServerHandlers`). A ManagedRuntime separate from the boot layer +// means the MCP surface owns its own exporter lifetime (the boot layer's copy +// is scoped to the API handler). +let observabilityRuntime: ManagedRuntime.ManagedRuntime | null = null; + +export const getObservabilityRuntime = (): ManagedRuntime.ManagedRuntime => { + observabilityRuntime ??= ManagedRuntime.make(LocalObservabilityLive); + return observabilityRuntime; +}; + +/** Dispose the MCP surface's observability runtime, flushing pending OTLP exports. */ +export const disposeObservabilityRuntime = async (): Promise => { + const runtime = observabilityRuntime; + if (!runtime) return; + observabilityRuntime = null; + await runtime.dispose(); +}; diff --git a/bun.lock b/bun.lock index e1f7250b2..368487422 100644 --- a/bun.lock +++ b/bun.lock @@ -66,6 +66,7 @@ "@executor-js/execution": "workspace:*", "@executor-js/fumadb": "workspace:*", "@executor-js/host-mcp": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/plugin-google": "workspace:*", "@executor-js/plugin-graphql": "workspace:*", "@executor-js/plugin-mcp": "workspace:*", @@ -175,6 +176,7 @@ "@executor-js/execution": "workspace:*", "@executor-js/fumadb": "workspace:*", "@executor-js/host-mcp": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/plugin-encrypted-secrets": "workspace:*", "@executor-js/plugin-google": "workspace:*", "@executor-js/plugin-graphql": "workspace:*", @@ -224,6 +226,7 @@ "@executor-js/execution": "workspace:*", "@executor-js/fumadb": "workspace:*", "@executor-js/host-mcp": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/plugin-encrypted-secrets": "workspace:*", "@executor-js/plugin-google": "workspace:*", "@executor-js/plugin-graphql": "workspace:*", @@ -273,6 +276,7 @@ "@executor-js/fumadb": "workspace:*", "@executor-js/host-mcp": "workspace:*", "@executor-js/integrations-registry": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/plugin-desktop-settings": "workspace:*", "@executor-js/plugin-example": "workspace:*", "@executor-js/plugin-file-secrets": "workspace:*", @@ -456,6 +460,7 @@ "dependencies": { "@executor-js/execution": "workspace:*", "@executor-js/host-mcp": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/sdk": "workspace:*", "effect": "catalog:", }, @@ -575,6 +580,21 @@ "vitest": "catalog:", }, }, + "packages/core/observability": { + "name": "@executor-js/observability", + "version": "0.0.1", + "devDependencies": { + "@effect/vitest": "catalog:", + "@types/node": "catalog:", + "effect": "catalog:", + "tsup": "catalog:", + "typescript": "catalog:", + "vitest": "catalog:", + }, + "peerDependencies": { + "effect": "catalog:", + }, + }, "packages/core/sdk": { "name": "@executor-js/sdk", "version": "1.5.28", @@ -1215,9 +1235,9 @@ }, }, "patchedDependencies": { - "@1password/sdk-core@0.4.1-beta.1": "patches/@1password%2Fsdk-core@0.4.1-beta.1.patch", "@electric-sql/pglite-socket@0.1.4": "patches/@electric-sql%2Fpglite-socket@0.1.4.patch", "libsql@0.5.29": "patches/libsql@0.5.29.patch", + "@1password/sdk-core@0.4.1-beta.1": "patches/@1password%2Fsdk-core@0.4.1-beta.1.patch", "agents@0.17.3": "patches/agents@0.17.3.patch", "postgres@3.4.9": "patches/postgres@3.4.9.patch", }, @@ -1776,6 +1796,8 @@ "@executor-js/motel": ["@executor-js/motel@0.2.5-executor.1", "", { "dependencies": { "@effect/atom-react": "^4.0.0-beta.49", "@effect/opentelemetry": "^4.0.0-beta.49", "@effect/platform-bun": "^4.0.0-beta.50", "@opentelemetry/api": "^1.9.0", "@opentelemetry/exporter-logs-otlp-http": "^0.214.0", "@opentelemetry/exporter-trace-otlp-http": "^0.211.0", "@opentelemetry/sdk-logs": "^0.214.0", "@opentelemetry/sdk-node": "^0.214.0", "@opentelemetry/sdk-trace-base": "^2.5.0", "@opentelemetry/sdk-trace-node": "^2.6.1", "@opentui/core": "^0.1.99", "@opentui/react": "^0.1.99", "effect": "^4.0.0-beta.49", "react": "^19.2.5", "scheduler": "^0.27.0" }, "bin": { "motel": "src/motel.ts", "motel-mcp": "src/mcp.ts" } }, "sha512-fLGJ5FIIBYd+4L2OurpYFjWzvcCbACvmdNofh6THXQJE1Gku93EKURtgn27/XddyM9SKGST6/znTRcmwKiYdXw=="], + "@executor-js/observability": ["@executor-js/observability@workspace:packages/core/observability"], + "@executor-js/plugin-desktop-settings": ["@executor-js/plugin-desktop-settings@workspace:packages/plugins/desktop-settings"], "@executor-js/plugin-encrypted-secrets": ["@executor-js/plugin-encrypted-secrets@workspace:packages/plugins/encrypted-secrets"], diff --git a/packages/core/api/package.json b/packages/core/api/package.json index 2c8071678..f5af8f0bc 100644 --- a/packages/core/api/package.json +++ b/packages/core/api/package.json @@ -16,6 +16,7 @@ "dependencies": { "@executor-js/execution": "workspace:*", "@executor-js/host-mcp": "workspace:*", + "@executor-js/observability": "workspace:*", "@executor-js/sdk": "workspace:*", "effect": "catalog:" }, diff --git a/packages/core/api/src/server/host-foundation.ts b/packages/core/api/src/server/host-foundation.ts index 1b22b70c4..21e8a24b1 100644 --- a/packages/core/api/src/server/host-foundation.ts +++ b/packages/core/api/src/server/host-foundation.ts @@ -34,6 +34,7 @@ import { HttpApiBuilder } from "effect/unstable/httpapi"; import { HttpRouter, HttpServer } from "effect/unstable/http"; import { Layer } from "effect"; +import { httpAccessLogger } from "@executor-js/observability"; import type { AnyPlugin } from "@executor-js/sdk"; import { observabilityMiddleware, type ErrorCapture } from "../observability"; @@ -212,8 +213,13 @@ export const toApiHandler = ( appLayer: Layer.Layer, ): ApiHandler => { // `HttpServer.layerServices` supplies the synthetic HTTP platform so - // `toWebHandler` can run handlers without a listening socket. - const web = HttpRouter.toWebHandler(appLayer.pipe(Layer.provideMerge(HttpServer.layerServices))); + // `toWebHandler` can run handlers without a listening socket. The default + // Effect access logger is swapped for `httpAccessLogger` (composed message + // instead of the constant "Sent HTTP response"). + const web = HttpRouter.toWebHandler(appLayer.pipe(Layer.provideMerge(HttpServer.layerServices)), { + disableLogger: true, + middleware: httpAccessLogger, + }); // With every requirement provided the leftover `HR` is `never`, so `handler` // is the one-arg `(request) => Promise` form — but the loose // `R = any` input widens `HR` to `any` (a two-arg signature), so narrow back diff --git a/packages/core/observability/CHANGELOG.md b/packages/core/observability/CHANGELOG.md new file mode 100644 index 000000000..ddf7a230a --- /dev/null +++ b/packages/core/observability/CHANGELOG.md @@ -0,0 +1 @@ +# @executor-js/observability diff --git a/packages/core/observability/package.json b/packages/core/observability/package.json new file mode 100644 index 000000000..4297b38c2 --- /dev/null +++ b/packages/core/observability/package.json @@ -0,0 +1,49 @@ +{ + "name": "@executor-js/observability", + "version": "0.0.1", + "homepage": "https://github.com/UsefulSoftwareCo/executor/tree/main/packages/core/observability", + "bugs": { + "url": "https://github.com/UsefulSoftwareCo/executor/issues" + }, + "license": "MIT", + "repository": { + "type": "git", + "url": "git+https://github.com/UsefulSoftwareCo/executor.git", + "directory": "packages/core/observability" + }, + "files": [ + "dist" + ], + "type": "module", + "exports": { + ".": "./src/index.ts" + }, + "publishConfig": { + "access": "public", + "exports": { + ".": { + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + } + } + } + }, + "scripts": { + "build": "tsup && (tsc --declaration --emitDeclarationOnly --outDir dist --rootDir src || true)", + "typecheck": "tsgo --noEmit", + "test": "vitest run", + "typecheck:slow": "tsc --noEmit" + }, + "devDependencies": { + "@effect/vitest": "catalog:", + "@types/node": "catalog:", + "effect": "catalog:", + "tsup": "catalog:", + "typescript": "catalog:", + "vitest": "catalog:" + }, + "peerDependencies": { + "effect": "catalog:" + } +} diff --git a/packages/core/observability/src/index.test.ts b/packages/core/observability/src/index.test.ts new file mode 100644 index 000000000..f47a35930 --- /dev/null +++ b/packages/core/observability/src/index.test.ts @@ -0,0 +1,210 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect, Logger, Schema } from "effect"; +import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http"; + +import { + httpAccessLogger, + jsonSpanLogger, + observabilityLayer, + parseLogLevel, + parseOtlpHeaders, + structuredLoggerLayer, +} from "./index"; + +describe("parseOtlpHeaders", () => { + it("parses the standard key=value,key2=value2 format", () => { + expect(parseOtlpHeaders("authorization=Bearer abc,x-dataset=prod")).toEqual({ + authorization: "Bearer abc", + "x-dataset": "prod", + }); + }); + + it("trims whitespace around keys and values", () => { + expect(parseOtlpHeaders(" a = 1 , b = 2 ")).toEqual({ a: "1", b: "2" }); + }); + + it("keeps '=' inside values", () => { + expect(parseOtlpHeaders("authorization=Basic dXNlcj1wdw==")).toEqual({ + authorization: "Basic dXNlcj1wdw==", + }); + }); + + it("drops malformed pairs and tolerates blank input", () => { + expect(parseOtlpHeaders("novalue,=orphan,ok=1")).toEqual({ ok: "1" }); + expect(parseOtlpHeaders(undefined)).toEqual({}); + expect(parseOtlpHeaders(" ")).toEqual({}); + }); +}); + +describe("parseLogLevel", () => { + it("maps common level strings case-insensitively", () => { + expect(parseLogLevel("debug")).toBe("Debug"); + expect(parseLogLevel("WARN")).toBe("Warn"); + expect(parseLogLevel("warning")).toBe("Warn"); + }); + + it("returns undefined for unknown or absent values", () => { + expect(parseLogLevel("verbose")).toBeUndefined(); + expect(parseLogLevel(undefined)).toBeUndefined(); + }); +}); + +const decodeLogLine = Schema.decodeUnknownSync( + Schema.fromJsonString(Schema.Record(Schema.String, Schema.Unknown)), +); + +describe("jsonSpanLogger", () => { + const captureLines = async (program: Effect.Effect): Promise => { + const lines: string[] = []; + const original = console.error; + console.error = (line: unknown) => { + lines.push(String(line)); + }; + // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: test must restore the patched console.error on any outcome + try { + await Effect.runPromise( + program.pipe(Effect.provide(Logger.layer([jsonSpanLogger], { mergeWithExisting: false }))), + ); + } finally { + console.error = original; + } + return lines; + }; + + it("emits one JSON line with level, message, and annotations", async () => { + const lines = await captureLines( + Effect.logInfo("mcp.tool.end").pipe(Effect.annotateLogs({ "mcp.tool.name": "execute" })), + ); + expect(lines).toHaveLength(1); + const record = decodeLogLine(lines[0]!); + expect(record.level).toBe("INFO"); + expect(record.message).toBe("mcp.tool.end"); + expect(record.annotations).toEqual({ "mcp.tool.name": "execute" }); + expect(record.trace_id).toBeUndefined(); + }); + + it("includes trace_id/span_id when a span is active", async () => { + const lines = await captureLines(Effect.log("inside").pipe(Effect.withSpan("test.span"))); + const record = decodeLogLine(lines[0]!); + expect(typeof record.trace_id).toBe("string"); + expect(typeof record.span_id).toBe("string"); + }); +}); + +describe("httpAccessLogger", () => { + type CapturedLog = ReturnType<(typeof Logger.formatStructured)["log"]>; + + const runWithRequest = async ( + url: string, + app: Effect.Effect, + ): Promise => { + const records: CapturedLog[] = []; + const captureLogger = Logger.make((options) => { + records.push(Logger.formatStructured.log(options)); + }); + await Effect.runPromise( + httpAccessLogger(app).pipe( + Effect.provideService( + HttpServerRequest.HttpServerRequest, + HttpServerRequest.fromWeb(new Request(url)), + ), + Effect.provide(Logger.layer([captureLogger], { mergeWithExisting: false })), + Effect.ignore, + ), + ); + return records; + }; + + it("composes method, path, and status into the message", async () => { + const records = await runWithRequest( + "http://localhost/api/things?id=42#frag", + Effect.succeed(HttpServerResponse.empty({ status: 201 })), + ); + expect(records).toHaveLength(1); + expect(records[0]!.message).toMatch(/^GET \/api\/things 201 in \d+ms$/); + expect(records[0]!.annotations).toMatchObject({ + "http.method": "GET", + "http.url": "/api/things", + "http.status": 201, + }); + expect(typeof records[0]!.annotations["http.duration_ms"]).toBe("number"); + }); + + it("logs failures with the cause and no status", async () => { + const records = await runWithRequest( + "http://localhost/missing", + Effect.fail("route not found"), + ); + expect(records).toHaveLength(1); + expect(records[0]!.message).toMatch(/^GET \/missing failed in \d+ms$/); + expect(records[0]!.annotations["http.status"]).toBeUndefined(); + }); +}); + +describe("layers", () => { + it("structuredLoggerLayer applies the minimum log level", async () => { + const lines: string[] = []; + const original = console.error; + console.error = (line: unknown) => { + lines.push(String(line)); + }; + // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: test must restore the patched console.error on any outcome + try { + await Effect.runPromise( + Effect.gen(function* () { + yield* Effect.logDebug("hidden"); + yield* Effect.logWarning("shown"); + }).pipe(Effect.provide(structuredLoggerLayer({ logLevel: "warn" }))), + ); + } finally { + console.error = original; + } + expect(lines).toHaveLength(1); + expect(decodeLogLine(lines[0]!).message).toBe("shown"); + }); + + it("observabilityLayer without endpoint builds without network access", async () => { + const layer = observabilityLayer({ serviceName: "test", logLevel: "info" }); + await Effect.runPromise(Effect.provide(Effect.void, layer)); + }); + + it("observabilityLayer with endpoint builds and shuts down cleanly", async () => { + // No collector is listening; the exporters buffer in the background and + // must not fail layer construction or teardown. + const layer = observabilityLayer({ + serviceName: "test", + endpoint: "http://127.0.0.1:1", + headers: "a=1", + }); + await Effect.runPromise(Effect.provide(Effect.log("buffered"), layer)); + }); + + it("observabilityLayer keeps the minimum level visible alongside OTLP layers", async () => { + const lines: string[] = []; + const original = console.error; + console.error = (line: unknown) => { + lines.push(String(line)); + }; + // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: test must restore the patched console.error on any outcome + try { + await Effect.runPromise( + Effect.gen(function* () { + yield* Effect.logInfo("hidden"); + yield* Effect.logError("shown"); + }).pipe( + Effect.provide( + observabilityLayer({ + serviceName: "test", + endpoint: "http://127.0.0.1:1", + logLevel: "error", + }), + ), + ), + ); + } finally { + console.error = original; + } + const messages = lines.map((line) => decodeLogLine(line).message); + expect(messages).toEqual(["shown"]); + }); +}); diff --git a/packages/core/observability/src/index.ts b/packages/core/observability/src/index.ts new file mode 100644 index 000000000..49e28d065 --- /dev/null +++ b/packages/core/observability/src/index.ts @@ -0,0 +1,207 @@ +import { Duration, Effect, Exit, Layer, Logger, References } from "effect"; +import type * as LogLevel from "effect/LogLevel"; +import { FetchHttpClient, HttpMiddleware, HttpServerRequest } from "effect/unstable/http"; +import type { HttpClient } from "effect/unstable/http"; +import { + OtlpLogger, + OtlpMetrics, + OtlpSerialization, + OtlpTracer, +} from "effect/unstable/observability"; + +// --------------------------------------------------------------------------- +// Shared observability: structured JSON logging + OTLP traces/logs/metrics. +// +// One implementation for every server app. The OTLP exporters are the +// fetch-based `effect/unstable/observability` modules (no `async_hooks`), so +// the SAME layer works on Bun (local, self-host) and Cloudflare workerd +// (host-cloudflare, cloud). Everything is env-driven and fully inert when no +// OTLP endpoint is configured: structured stdout logging stays on, export +// layers collapse to `Layer.empty`. +// --------------------------------------------------------------------------- + +/** + * Parse the standard `OTEL_EXPORTER_OTLP_HEADERS` format + * (`key=value,key2=value2`) into a header record. Undefined/blank input and + * malformed pairs are dropped rather than failing boot. + */ +export const parseOtlpHeaders = (value: string | undefined): Record => { + if (!value?.trim()) return {}; + return Object.fromEntries( + value.split(",").flatMap((pair): ReadonlyArray => { + const eq = pair.indexOf("="); + if (eq < 1) return []; + return [[pair.slice(0, eq).trim(), pair.slice(eq + 1).trim()]]; + }), + ); +}; + +const LOG_LEVELS: Record = { + all: "All", + trace: "Trace", + debug: "Debug", + info: "Info", + warn: "Warn", + warning: "Warn", + error: "Error", + fatal: "Fatal", + none: "None", +}; + +/** Parse a `LOG_LEVEL`-style string; unknown/absent values return undefined. */ +export const parseLogLevel = (value: string | undefined): LogLevel.LogLevel | undefined => + value ? LOG_LEVELS[value.trim().toLowerCase()] : undefined; + +/** + * A structured JSON-lines console logger. Reuses Effect's + * `Logger.formatStructured` record (level, timestamp, message, annotations, + * cause, log spans, fiber id) and adds `trace_id`/`span_id` from the fiber's + * current tracer span, so console lines correlate with exported traces the + * same way OTLP log records do. + * + * Writes to STDERR (`console.error`), never stdout: the MCP stdio transport + * uses the process's stdout as its JSON-RPC channel, so a stdout log line + * from the same process corrupts the protocol stream. Cloudflare Workers and + * log shippers capture stderr the same as stdout. + */ +export const jsonSpanLogger: Logger.Logger = Logger.make((options) => { + const record: Record = { ...Logger.formatStructured.log(options) }; + const span = options.fiber.currentSpan; + if (span) { + record["trace_id"] = span.traceId; + record["span_id"] = span.spanId; + } + // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: log emission must tolerate non-serializable annotation values + try { + console.error(JSON.stringify(record)); + } catch { + console.error(record); + } +}); + +/** + * Replace the default pretty logger with {@link jsonSpanLogger} and apply the + * minimum log level (a `LOG_LEVEL`-style string; defaults to the runtime's + * `Info` when absent or unrecognized). + */ +export const structuredLoggerLayer = (options?: { + readonly logLevel?: string | undefined; +}): Layer.Layer => { + const logger = Logger.layer([jsonSpanLogger], { mergeWithExisting: false }); + const level = parseLogLevel(options?.logLevel); + return level === undefined + ? logger + : Layer.merge(logger, Layer.succeed(References.MinimumLogLevel)(level)); +}; + +// --------------------------------------------------------------------------- +// HTTP access log +// --------------------------------------------------------------------------- + +const stripSearchAndHash = (url: string): string => { + const query = url.indexOf("?"); + const hash = url.indexOf("#"); + const end = query === -1 ? hash : hash === -1 ? query : Math.min(query, hash); + return end === -1 ? url : url.slice(0, end); +}; + +/** + * Access-log middleware replacing Effect's built-in `HttpMiddleware.logger`, + * whose message is the constant string "Sent HTTP response" (request context + * lives only in annotations, so log UIs that render the message column show + * an uninformative constant). This emits a composed message + * (`GET /api/things 200 in 12ms`) while keeping the same `http.*` annotations + * for filtering, plus `http.duration_ms`. + * + * Apply via `HttpRouter.serve`/`toWebHandler` `middleware` together with + * `disableLogger: true`, or every request logs twice. + */ +export const httpAccessLogger = HttpMiddleware.make((httpApp) => + Effect.gen(function* () { + const request = yield* HttpServerRequest.HttpServerRequest; + const path = stripSearchAndHash(request.url); + const [duration, exit] = yield* Effect.timed(Effect.exit(httpApp)); + const durationMs = Math.round(Duration.toMillis(duration)); + if (Exit.isSuccess(exit)) { + yield* Effect.logInfo( + `${request.method} ${path} ${exit.value.status} in ${durationMs}ms`, + ).pipe( + Effect.annotateLogs({ + "http.method": request.method, + "http.url": path, + "http.status": exit.value.status, + "http.duration_ms": durationMs, + }), + ); + return exit.value; + } + // An exit failure here is an unhandled route error (e.g. RouteNotFound) or + // a defect; the platform's error renderer still owns the response. Info + // level, matching the built-in logger: not-found probes are routine noise. + yield* Effect.logInfo(`${request.method} ${path} failed in ${durationMs}ms`, exit.cause).pipe( + Effect.annotateLogs({ + "http.method": request.method, + "http.url": path, + "http.duration_ms": durationMs, + }), + ); + return yield* Effect.failCause(exit.cause); + }), +); + +export type ObservabilityConfig = { + /** OTLP resource service name, e.g. "executor-local". */ + readonly serviceName: string; + /** + * OTLP/HTTP base endpoint (`OTEL_EXPORTER_OTLP_ENDPOINT`), e.g. + * `http://localhost:4318`. `/v1/{traces,logs,metrics}` are appended. When + * absent, OTLP export is disabled entirely. + */ + readonly endpoint?: string | undefined; + /** `OTEL_EXPORTER_OTLP_HEADERS` string or an already-parsed header record. */ + readonly headers?: string | Record | undefined; + /** `LOG_LEVEL`-style minimum level for ALL logging (stdout and OTLP). */ + readonly logLevel?: string | undefined; + /** + * Export spans over OTLP (default true). Cloud passes `false`: its traces + * keep flowing through the existing Axiom `WebTracerProvider` pipeline, + * which non-Effect fetch paths and Sentry correlation depend on. + */ + readonly traces?: boolean | undefined; +}; + +/** + * The one layer an app boots: structured JSON stdout logging (always) plus + * OTLP logs + metrics + optional traces (only when `endpoint` is set). + * + * The OTLP logger is ADDED to the current logger set (`mergeWithExisting`), so + * stdout and OTLP both receive every log record; `Layer.provideMerge` keeps + * the stdout logger and minimum-level reference visible to the app while + * letting the OTLP layers build on top of them. + */ +export const observabilityLayer = (config: ObservabilityConfig): Layer.Layer => { + const logger = structuredLoggerLayer({ logLevel: config.logLevel }); + const endpoint = config.endpoint?.trim(); + if (!endpoint) return logger; + + const base = endpoint.replace(/\/+$/, ""); + const resource = { serviceName: config.serviceName }; + const headers = + typeof config.headers === "string" ? parseOtlpHeaders(config.headers) : (config.headers ?? {}); + + const signals: Array< + Layer.Layer + > = [ + OtlpLogger.layer({ url: `${base}/v1/logs`, resource, headers, mergeWithExisting: true }), + OtlpMetrics.layer({ url: `${base}/v1/metrics`, resource, headers }), + ]; + if (config.traces !== false) { + signals.push(OtlpTracer.layer({ url: `${base}/v1/traces`, resource, headers })); + } + + const otlp = Layer.mergeAll(signals[0]!, ...signals.slice(1)).pipe( + Layer.provide(OtlpSerialization.layerJson), + Layer.provide(FetchHttpClient.layer), + ); + return otlp.pipe(Layer.provideMerge(logger)); +}; diff --git a/packages/core/observability/tsconfig.json b/packages/core/observability/tsconfig.json new file mode 100644 index 000000000..e42a50daf --- /dev/null +++ b/packages/core/observability/tsconfig.json @@ -0,0 +1,25 @@ +{ + "compilerOptions": { + "target": "ESNext", + "module": "ESNext", + "moduleResolution": "bundler", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "outDir": "dist", + "rootDir": "src", + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "types": ["node"], + "plugins": [ + { + "name": "@effect/language-service", + "ignoreEffectSuggestionsInTscExitCode": true, + "ignoreEffectWarningsInTscExitCode": true, + "diagnosticSeverity": {} + } + ] + }, + "include": ["src"] +} diff --git a/packages/core/observability/tsup.config.ts b/packages/core/observability/tsup.config.ts new file mode 100644 index 000000000..7fc2eead2 --- /dev/null +++ b/packages/core/observability/tsup.config.ts @@ -0,0 +1,12 @@ +import { defineConfig } from "tsup"; + +export default defineConfig({ + entry: { + index: "src/index.ts", + }, + format: ["esm"], + dts: false, + sourcemap: true, + clean: true, + external: [/^@executor-js\//, /^effect/, /^@effect\//], +}); diff --git a/packages/core/observability/vitest.config.ts b/packages/core/observability/vitest.config.ts new file mode 100644 index 000000000..ae847ff6d --- /dev/null +++ b/packages/core/observability/vitest.config.ts @@ -0,0 +1,7 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["src/**/*.test.ts"], + }, +}); diff --git a/packages/hosts/cloudflare/src/mcp/agent-session-durable-object.ts b/packages/hosts/cloudflare/src/mcp/agent-session-durable-object.ts index 2f802a827..1c298e0ae 100644 --- a/packages/hosts/cloudflare/src/mcp/agent-session-durable-object.ts +++ b/packages/hosts/cloudflare/src/mcp/agent-session-durable-object.ts @@ -30,7 +30,6 @@ import { MAX_PAUSED_SESSION_IDLE_MS, SESSION_TIMEOUT_MS, decideSessionAlarm, - pausedLeaseExtensionLog, } from "./session-alarm-policy"; export type IncomingTraceHeaders = IncomingPropagationHeaders; @@ -404,13 +403,14 @@ export abstract class McpAgentSessionDOBase< readonly idleMs: number; readonly pausedExecutionCount: number; }): Promise { - console.info( - JSON.stringify({ - event: "mcp_session_idle_runtime_dispose", - sessionId: this.sessionId, - idleMs: input.idleMs, - pausedExecutionCount: input.pausedExecutionCount, - }), + await Effect.runPromise( + Effect.logInfo("mcp.session.idle_expire").pipe( + Effect.annotateLogs({ + "mcp.session.id": this.sessionId, + "mcp.session.idle_ms": input.idleMs, + "mcp.session.paused_execution_count": input.pausedExecutionCount, + }), + ), ); await Effect.runPromise(this.closeRuntime()); await Effect.runPromise( @@ -454,16 +454,11 @@ export abstract class McpAgentSessionDOBase< }): Effect.Effect { const self = this; return Effect.gen(function* () { - const first = Cause.prettyErrors(input.cause)[0]; - console.error( - JSON.stringify({ - event: "mcp_execution_owner_directory_error", - operation: input.operation, - executionId: input.executionId, - sessionId: self.sessionId, - exceptionType: first?.name ?? "Error", - exceptionMessage: first?.message ?? "unknown", - cause: Cause.pretty(input.cause), + yield* Effect.logError("mcp.execution_owner.directory_error", input.cause).pipe( + Effect.annotateLogs({ + "mcp.execution_owner.operation": input.operation, + "mcp.execution.id": input.executionId, + "mcp.session.id": self.sessionId, }), ); yield* Effect.annotateCurrentSpan({ @@ -480,16 +475,11 @@ export abstract class McpAgentSessionDOBase< }): Effect.Effect { const self = this; return Effect.gen(function* () { - const first = Cause.prettyErrors(input.cause)[0]; - console.error( - JSON.stringify({ - event: "mcp_model_resume_forward_error", - executionId: input.executionId, - sessionId: self.sessionId, - ownerSessionId: input.owner.sessionId, - exceptionType: first?.name ?? "Error", - exceptionMessage: first?.message ?? "unknown", - cause: Cause.pretty(input.cause), + yield* Effect.logError("mcp.execution_owner.forward_error", input.cause).pipe( + Effect.annotateLogs({ + "mcp.execution.id": input.executionId, + "mcp.session.id": self.sessionId, + "mcp.execution_owner.owner_session_id": input.owner.sessionId, }), ); yield* Effect.annotateCurrentSpan({ @@ -506,14 +496,13 @@ export abstract class McpAgentSessionDOBase< }): Effect.Effect { const self = this; return Effect.gen(function* () { - console.error( - JSON.stringify({ - event: "mcp_model_resume_forward_error", - reason: "timeout", - executionId: input.executionId, - sessionId: self.sessionId, - ownerSessionId: input.owner.sessionId, - timeoutMs: input.timeoutMs, + yield* Effect.logError("mcp.execution_owner.forward_error").pipe( + Effect.annotateLogs({ + "mcp.execution_owner.forward.error": "timeout", + "mcp.execution.id": input.executionId, + "mcp.session.id": self.sessionId, + "mcp.execution_owner.owner_session_id": input.owner.sessionId, + "mcp.execution_owner.forward.timeout_ms": input.timeoutMs, }), ); yield* Effect.annotateCurrentSpan({ @@ -623,13 +612,26 @@ export abstract class McpAgentSessionDOBase< self.server = mcpServer; self.engine = engine; self.initialized = true; + yield* Effect.logInfo("mcp.session.created").pipe( + Effect.annotateLogs({ + "mcp.session.id": self.sessionId, + "mcp.auth.organization_id": sessionMeta.organizationId, + "mcp.session.resource_kind": sessionMeta.resource.kind, + "mcp.elicitation.mode": sessionMeta.elicitationMode ?? "model", + }), + ); yield* Effect.promise(() => self.markActivity()).pipe( Effect.withSpan("McpSessionDO.markActivity"), ); }).pipe( Effect.tapCause((cause) => Effect.gen(function* () { - console.error("[mcp-session] init failed:", Cause.pretty(cause)); + yield* Effect.logError("mcp.session.init_failed", cause).pipe( + Effect.annotateLogs({ + "mcp.session.id": self.sessionId, + "mcp.auth.organization_id": props.session.organizationId, + }), + ); yield* self.captureCauseEffect(cause); yield* self.recordCauseOnSpan(cause); }), @@ -845,13 +847,13 @@ export abstract class McpAgentSessionDOBase< } if (decision.kind === "extend_paused_lease") { - console.info( - JSON.stringify( - pausedLeaseExtensionLog({ - sessionId: this.sessionId, - pausedExecutionCount, - idleMs, - leaseMs: decision.leaseMs, + await Effect.runPromise( + Effect.logInfo("mcp.session.alarm.extend_lease").pipe( + Effect.annotateLogs({ + "mcp.session.id": this.sessionId, + "mcp.session.paused_execution_count": pausedExecutionCount, + "mcp.session.idle_ms": idleMs, + "mcp.session.alarm.lease_ms": decision.leaseMs, }), ), ); @@ -1049,12 +1051,12 @@ export abstract class McpAgentSessionDOBase< }), Effect.tapCause((cause) => Effect.gen(function* () { - yield* Effect.sync(() => { - console.error( - "[mcp-session] pending approval lease start failed:", - Cause.pretty(cause), - ); - }); + yield* Effect.logError("mcp.approval.lease_start_failed", cause).pipe( + Effect.annotateLogs({ + "mcp.session.id": self.sessionId, + "mcp.execution.id": executionId, + }), + ); yield* self.captureCauseEffect(cause); }), ), @@ -1076,12 +1078,12 @@ export abstract class McpAgentSessionDOBase< this.expirePendingApproval(executionId).pipe( Effect.tapCause((cause) => Effect.gen(function* () { - yield* Effect.sync(() => { - console.error( - "[mcp-session] pending approval lease expiration failed:", - Cause.pretty(cause), - ); - }); + yield* Effect.logError("mcp.approval.lease_expiration_failed", cause).pipe( + Effect.annotateLogs({ + "mcp.session.id": self.sessionId, + "mcp.execution.id": executionId, + }), + ); yield* self.captureCauseEffect(cause); }), ), @@ -1171,9 +1173,12 @@ export abstract class McpAgentSessionDOBase< action: "decline", content: { reason: "approval_timeout" }, } satisfies ResumeResponse; - yield* Effect.sync(() => { - console.info(JSON.stringify({ event: "mcp_pending_approval_lease_expire", executionId })); - }); + yield* Effect.logInfo("mcp.approval.expire").pipe( + Effect.annotateLogs({ + "mcp.session.id": self.sessionId, + "mcp.execution.id": executionId, + }), + ); yield* self.recordApprovalResponse(executionId, response); if (self.engine && !self.approvalWaiters.has(executionId)) { yield* self.engine.resume(executionId, response).pipe(Effect.ignore); diff --git a/packages/hosts/cloudflare/src/mcp/session-alarm-policy.ts b/packages/hosts/cloudflare/src/mcp/session-alarm-policy.ts index 4b0d7547f..ee52cf126 100644 --- a/packages/hosts/cloudflare/src/mcp/session-alarm-policy.ts +++ b/packages/hosts/cloudflare/src/mcp/session-alarm-policy.ts @@ -42,16 +42,3 @@ export const decideSessionAlarm = (input: { } return { kind: "destroy_idle_session" }; }; - -export const pausedLeaseExtensionLog = (input: { - readonly sessionId: string; - readonly pausedExecutionCount: number; - readonly idleMs: number; - readonly leaseMs: number; -}): Record => ({ - event: "mcp_session_paused_lease_extension", - sessionId: input.sessionId, - pausedExecutionCount: input.pausedExecutionCount, - idleMs: input.idleMs, - leaseMs: input.leaseMs, -}); diff --git a/packages/hosts/mcp/src/envelope.ts b/packages/hosts/mcp/src/envelope.ts index aac2c9af7..33ad8ceec 100644 --- a/packages/hosts/mcp/src/envelope.ts +++ b/packages/hosts/mcp/src/envelope.ts @@ -203,6 +203,28 @@ const mcpDispatch = (resource: McpResource) => // resource; an auth-level Forbidden may not carry either. const outcome = yield* auth.authenticate(request); if (!Predicate.isTagged(outcome, "Authenticated")) { + // Unauthorized is expected background noise (unauthenticated probes, + // RFC 9728 discovery); Forbidden is an audit signal; Unavailable means + // the auth backend itself is degraded. + const [logAuthOutcome, authResult] = Match.value(outcome).pipe( + Match.tag( + "Unavailable", + () => [Effect.logWarning("mcp.auth.outcome"), "Unavailable"] as const, + ), + Match.tag("Forbidden", () => [Effect.logInfo("mcp.auth.outcome"), "Forbidden"] as const), + Match.tag( + "Unauthorized", + () => [Effect.logDebug("mcp.auth.outcome"), "Unauthorized"] as const, + ), + Match.exhaustive, + ); + yield* logAuthOutcome.pipe( + Effect.annotateLogs({ + "mcp.auth.result": authResult, + "mcp.session.id": sessionId ?? "", + "mcp.request.method": request.method, + }), + ); return HttpServerResponse.raw(renderAuthError(auth, request, outcome)); } const principal = outcome.principal; @@ -231,6 +253,21 @@ const mcpDispatch = (resource: McpResource) => sessionId, method: request.method, }); + if (!(result instanceof Response)) { + // not-found is normal for stale session retries; forbidden means a + // cross-bearer access attempt on a live session. + yield* ( + result === "forbidden" + ? Effect.logInfo("mcp.session.dispatch_result") + : Effect.logDebug("mcp.session.dispatch_result") + ).pipe( + Effect.annotateLogs({ + "mcp.session.dispatch_result": result, + "mcp.session.id": sessionId ?? "", + "mcp.request.method": request.method, + }), + ); + } return HttpServerResponse.raw( result instanceof Response ? result : renderDispatchError(result), ); diff --git a/packages/hosts/mcp/src/in-memory-session-store.ts b/packages/hosts/mcp/src/in-memory-session-store.ts index e2d233e8c..2ebf244e0 100644 --- a/packages/hosts/mcp/src/in-memory-session-store.ts +++ b/packages/hosts/mcp/src/in-memory-session-store.ts @@ -108,10 +108,6 @@ const ignoreClose = (close: (() => Promise) | undefined): Promise => ? Effect.runPromise(Effect.ignore(Effect.tryPromise({ try: close, catch: () => undefined }))) : Promise.resolve(); -const formatBoundaryError = (error: unknown): unknown => - // oxlint-disable-next-line executor/no-instanceof-error, executor/no-unknown-error-message -- boundary: log unknown MCP SDK/runtime failures - error instanceof Error ? (error.stack ?? error.message) : error; - // The store's error bodies are INNER responses (no CORS): the serving envelope // re-wraps the store `Response` with CORS before it leaves the origin, so the // canonical renderer is called with `cors: false` (content-type only). @@ -185,11 +181,13 @@ export const makeInMemoryMcpSessionStore = ( return Effect.promise(() => transport.handleRequest(request)).pipe( Effect.tap(() => Effect.sync(finish)), Effect.catchCause((cause) => - Effect.sync(() => { - console.error("[mcp] handleRequest error:", formatBoundaryError(cause)); - finish(); - return jsonRpcError(500, -32603, "Internal server error"); - }), + Effect.logError("mcp.session.handle_request_error", cause).pipe( + Effect.annotateLogs({ "mcp.session.id": transport.sessionId ?? "" }), + Effect.map(() => { + finish(); + return jsonRpcError(500, -32603, "Internal server error"); + }), + ), ), ); }; @@ -258,7 +256,12 @@ export const makeInMemoryMcpSessionStore = ( owners.set(sid, { principal, resource }); engines.set(sid, engine); }, - onsessionclosed: (sid) => void dispose(sid, { server: true }), + // SDK lifecycle callbacks run outside Effect; emit structured + // console lines so session teardown still reaches host logs. + onsessionclosed: (sid) => { + console.info(JSON.stringify({ event: "mcp.session.closed", sessionId: sid })); + void dispose(sid, { server: true }); + }, }); transport.onclose = () => { const sid = transport.sessionId; @@ -267,10 +270,21 @@ export const makeInMemoryMcpSessionStore = ( yield* Effect.promise(() => mcpServer.connect(transport)); // The session id is minted on the first (initialize) request, so we // drive `handleRequest` here; if no id results we close eagerly. - return yield* runHandleRequest(transport, request, () => { + const response = yield* runHandleRequest(transport, request, () => { void ignoreClose(() => transport.close()); void ignoreClose(() => mcpServer.close()); }); + if (createdSessionId !== null) { + yield* Effect.logInfo("mcp.session.created").pipe( + Effect.annotateLogs({ + "mcp.session.id": createdSessionId, + "mcp.session.resource_kind": resource.kind, + "mcp.auth.organization_id": principal.organizationId, + "mcp.elicitation.mode": readElicitationMode(request), + }), + ); + } + return response; }), ), // A build failure has nowhere typed to go in the envelope; render a 500. diff --git a/packages/hosts/mcp/src/tool-server.test.ts b/packages/hosts/mcp/src/tool-server.test.ts index d5de1ad65..0232e86c9 100644 --- a/packages/hosts/mcp/src/tool-server.test.ts +++ b/packages/hosts/mcp/src/tool-server.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "@effect/vitest"; -import { Data, Deferred, Effect } from "effect"; +import { Data, Deferred, Effect, Logger } from "effect"; import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js"; import { ElicitRequestSchema } from "@modelcontextprotocol/sdk/types.js"; @@ -1758,3 +1758,108 @@ describe("MCP host server — skills tool", () => { }); }); }); + +// --------------------------------------------------------------------------- +// Structured tool-call logging +// --------------------------------------------------------------------------- + +describe("MCP host server — structured tool-call logs", () => { + type CapturedLog = ReturnType<(typeof Logger.formatStructured)["log"]>; + + /** Build the server with a capturing logger in the captured Effect context, + * so SDK-callback tool calls (via `runToolEffect`) log into `records`. */ + const withLoggingClient = async ( + engine: ExecutionEngine, + records: CapturedLog[], + fn: (client: Client) => Promise, + ) => { + const captureLogger = Logger.make((options) => { + records.push(Logger.formatStructured.log(options)); + }); + const mcpServer = await Effect.runPromise( + createExecutorMcpServer({ engine }).pipe( + Effect.provide(Logger.layer([captureLogger], { mergeWithExisting: false })), + ), + ); + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + const client = new Client({ name: "test-client", version: "1.0.0" }, { capabilities: {} }); + await mcpServer.connect(serverTransport); + await client.connect(clientTransport); + // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: test helper must close MCP transports after async client assertions + try { + await fn(client); + } finally { + await clientTransport.close(); + await serverTransport.close(); + await mcpServer.close(); + } + }; + + it("emits mcp.tool.start and mcp.tool.end with outcome and duration", async () => { + const records: CapturedLog[] = []; + const engine = makeStubEngine({ + executeWithPause: () => Effect.succeed({ status: "completed", result: { result: "ok" } }), + }); + + await withLoggingClient(engine, records, async (client) => { + await client.callTool({ name: "execute", arguments: { code: "1+1" } }); + }); + + const start = records.find((record) => record.message === "mcp.tool.start"); + expect(start).toBeDefined(); + expect(start?.annotations).toMatchObject({ + "mcp.tool.name": "execute", + "mcp.execute.code_length": 3, + }); + + const end = records.find((record) => record.message === "mcp.tool.end"); + expect(end).toBeDefined(); + expect(end?.annotations).toMatchObject({ + "mcp.tool.name": "execute", + "mcp.tool.outcome": "success", + }); + expect(typeof end?.annotations["mcp.tool.duration_ms"]).toBe("number"); + }); + + it("classifies paused executions and logs the execution id", async () => { + const records: CapturedLog[] = []; + const engine = makeStubEngine({ + executeWithPause: () => + Effect.succeed( + makePausedResult( + "exec-123", + FormElicitation.make({ message: "confirm?", requestedSchema: {} }), + ), + ), + }); + + await withLoggingClient(engine, records, async (client) => { + await client.callTool({ name: "execute", arguments: { code: "pause()" } }); + }); + + const end = records.find((record) => record.message === "mcp.tool.end"); + expect(end?.annotations).toMatchObject({ + "mcp.tool.outcome": "paused", + "mcp.execution.id": "exec-123", + }); + }); + + it("logs an internal error with a correlation id when the engine defects", async () => { + const records: CapturedLog[] = []; + const engine = makeStubEngine({ + executeWithPause: () => Effect.die("boom"), + }); + + await withLoggingClient(engine, records, async (client) => { + const result = await client.callTool({ name: "execute", arguments: { code: "1+1" } }); + expect(result.isError).toBe(true); + }); + + const end = records.find((record) => record.message === "mcp.tool.end"); + expect(end?.annotations["mcp.tool.outcome"]).toBe("error"); + + const internal = records.find((record) => record.message === "mcp.tool.internal_error"); + expect(internal).toBeDefined(); + expect(typeof internal?.annotations["mcp.tool.correlation_id"]).toBe("string"); + }); +}); diff --git a/packages/hosts/mcp/src/tool-server.ts b/packages/hosts/mcp/src/tool-server.ts index 07daf0e23..54e492aa6 100644 --- a/packages/hosts/mcp/src/tool-server.ts +++ b/packages/hosts/mcp/src/tool-server.ts @@ -1,4 +1,4 @@ -import { Duration, Effect, Match, Option, Schema } from "effect"; +import { Duration, Effect, Exit, Match, Metric, Option, Schema } from "effect"; import * as Cause from "effect/Cause"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { ContentBlockSchema, type ContentBlock } from "@modelcontextprotocol/sdk/types.js"; @@ -262,8 +262,8 @@ const makeMcpElicitationHandler = Match.exhaustive, ); - return Effect.promise(async (): Promise => { - const requestTag = elicitationRequestTag(ctx.request); + const requestTag = elicitationRequestTag(ctx.request); + return Effect.gen(function* () { debugLog?.("elicitation.request", { requestTag, supportsUrl, @@ -272,35 +272,59 @@ const makeMcpElicitationHandler = url: elicitationRequestUrl(ctx.request), clientCapabilities: server.server.getClientCapabilities() ?? null, }); + yield* Effect.logDebug("mcp.tool.elicitation.request").pipe( + Effect.annotateLogs({ + "mcp.elicitation.tag": requestTag, + "mcp.elicitation.supports_url": supportsUrl, + "mcp.elicitation.has_schema": requestedSchemaIsNonEmpty(ctx.request), + }), + ); - // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: MCP SDK elicitInput is a Promise API; failures become a cancel response - try { - const response = await server.server.elicitInput( - params as Parameters[0], - ); - - debugLog?.("elicitation.response", { - requestTag, - action: response.action, - hasContent: - typeof response.content === "object" && - response.content !== null && - Object.keys(response.content).length > 0, - }); - - return { - action: response.action as typeof ElicitationResponse.Type.action, - content: response.content, - }; - } catch (err) { - const error = formatBoundaryError(err); - debugLog?.("elicitation.error", { - requestTag, - error, - clientCapabilities: server.server.getClientCapabilities() ?? null, - }); - return { action: "cancel" as const } as ElicitationResponse; - } + return yield* Effect.tryPromise({ + // The MCP SDK elicitInput is a Promise API; failures become a cancel response. + try: () => + server.server.elicitInput(params as Parameters[0]), + catch: formatBoundaryError, + }).pipe( + Effect.matchEffect({ + onSuccess: (response) => + Effect.gen(function* () { + debugLog?.("elicitation.response", { + requestTag, + action: response.action, + hasContent: + typeof response.content === "object" && + response.content !== null && + Object.keys(response.content).length > 0, + }); + yield* Effect.logDebug("mcp.tool.elicitation.response").pipe( + Effect.annotateLogs({ + "mcp.elicitation.tag": requestTag, + "mcp.elicitation.action": response.action, + }), + ); + return { + action: response.action as typeof ElicitationResponse.Type.action, + content: response.content, + }; + }), + onFailure: (error) => + Effect.gen(function* () { + debugLog?.("elicitation.error", { + requestTag, + error, + clientCapabilities: server.server.getClientCapabilities() ?? null, + }); + yield* Effect.logDebug("mcp.tool.elicitation.error").pipe( + Effect.annotateLogs({ + "mcp.elicitation.tag": requestTag, + "mcp.elicitation.error_name": error.name ?? "unknown", + }), + ); + return { action: "cancel" as const } as ElicitationResponse; + }), + }), + ); }); }; @@ -521,25 +545,79 @@ const formatResumeApprovalRequired = (input: { }, }); -const toMcpFailureResult = (cause: Cause.Cause): McpToolResult => { +const toMcpFailureResult = (cause: Cause.Cause): Effect.Effect => { const correlationId = newCorrelationId(); - // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: best-effort defect logging must tolerate non-serializable causes - try { - console.error( - `[executor:mcp] execute defect correlation_id=${correlationId}`, - Cause.pretty(cause), - ); - } catch { - /* ignore logger failures */ - } const text = `Internal tool error [${correlationId}]`; - return { - content: [{ type: "text", text: `Error: ${text}` }], - structuredContent: { status: "error", error: text }, - isError: true, - }; + return Effect.logError("mcp.tool.internal_error", cause).pipe( + Effect.annotateLogs({ "mcp.tool.correlation_id": correlationId }), + Effect.as({ + content: [{ type: "text" as const, text: `Error: ${text}` }], + structuredContent: { status: "error", error: text }, + isError: true, + } satisfies McpToolResult), + ); }; +// --------------------------------------------------------------------------- +// Tool-call telemetry: structured start/end logs plus call-count and duration +// metrics, keyed by tool and outcome. Payloads stay out of the logs — code is +// summarized as `mcp.execute.code_length` (the existing span convention). +// --------------------------------------------------------------------------- + +type McpToolOutcome = "success" | "paused" | "approval_required" | "error"; + +const classifyToolOutcome = (result: McpToolResult): McpToolOutcome => { + if (result.isError) return "error"; + const status = result.structuredContent?.status; + if (status === "waiting_for_interaction") return "paused"; + if (status === "user_approval_required") return "approval_required"; + return "success"; +}; + +const toolCallCounter = Metric.counter("mcp.tool.calls", { + description: "MCP tool invocations by tool and outcome", +}); + +const toolCallDurationMs = Metric.histogram("mcp.tool.duration_ms", { + description: "MCP tool call duration in milliseconds", + boundaries: [10, 50, 100, 250, 500, 1000, 2500, 5000, 10_000, 30_000, 60_000, 120_000, 300_000], +}); + +/** + * Wrap a tool-call effect with `mcp.tool.start` / `mcp.tool.end` structured + * logs and the call metrics. Defects also emit a paired `mcp.tool.end` with + * outcome `error` before re-propagating to `runToolEffect`'s `catchCause`. + */ +const instrumentToolCall = ( + tool: "execute" | "resume", + annotations: Record, + effect: Effect.Effect, +): Effect.Effect => + Effect.gen(function* () { + yield* Effect.logInfo("mcp.tool.start").pipe( + Effect.annotateLogs({ "mcp.tool.name": tool, ...annotations }), + ); + const [duration, exit] = yield* Effect.timed(Effect.exit(effect)); + const outcome = Exit.isSuccess(exit) ? classifyToolOutcome(exit.value) : "error"; + const durationMs = Math.round(Duration.toMillis(duration)); + const attributes = { "mcp.tool.name": tool, "mcp.tool.outcome": outcome }; + yield* Metric.update(Metric.withAttributes(toolCallCounter, attributes), 1); + yield* Metric.update(Metric.withAttributes(toolCallDurationMs, attributes), durationMs); + const executionId = Exit.isSuccess(exit) + ? exit.value.structuredContent?.executionId + : undefined; + yield* Effect.logInfo("mcp.tool.end").pipe( + Effect.annotateLogs({ + "mcp.tool.name": tool, + "mcp.tool.outcome": outcome, + "mcp.tool.duration_ms": durationMs, + ...annotations, + ...(typeof executionId === "string" ? { "mcp.execution.id": executionId } : {}), + }), + ); + return Exit.isSuccess(exit) ? exit.value : yield* Effect.failCause(exit.cause); + }); + const recoveryText = "To recover, run the execute tool again with the original code; if it pauses, a fresh executionId will be issued."; @@ -748,9 +826,7 @@ export const createExecutorMcpServer = ( }; const runToolEffect = (effect: Effect.Effect) => Effect.runPromiseWith(context)( - anchor(effect).pipe( - Effect.catchCause((cause) => Effect.succeed(toMcpFailureResult(cause))), - ), + anchor(effect).pipe(Effect.catchCause((cause) => toMcpFailureResult(cause))), ); const server = yield* Effect.sync( @@ -765,43 +841,53 @@ export const createExecutorMcpServer = ( ).pipe(Effect.withSpan("mcp.host.create_server")); const executeCode = (code: string): Effect.Effect => - Effect.gen(function* () { - debugLog("execute.call", { - elicitationMode: elicitationMode.mode, - elicitationSupport: getElicitationSupport(server), - clientCapabilities: server.server.getClientCapabilities() ?? null, - codeLength: code.length, - }); - if (elicitationMode.mode === "native") { - const result = yield* engine.execute(code, { - onElicitation: makeMcpElicitationHandler(server, debugLog), + instrumentToolCall( + "execute", + { "mcp.execute.code_length": code.length, "mcp.elicitation.mode": elicitationMode.mode }, + Effect.gen(function* () { + debugLog("execute.call", { + elicitationMode: elicitationMode.mode, + elicitationSupport: getElicitationSupport(server), + clientCapabilities: server.server.getClientCapabilities() ?? null, + codeLength: code.length, }); - return toMcpResult(result); - } - const outcome = yield* engine.executeWithPause(code); - debugLog("execute.paused_flow_result", { - status: outcome.status, - executionId: outcome.status === "paused" ? outcome.execution.id : undefined, - interactionKind: - outcome.status === "paused" - ? pausedInteractionKind(outcome.execution.elicitationContext.request) - : undefined, - }); - if (outcome.status === "paused") { - const deadline = pauseDeadline(); - yield* Effect.annotateCurrentSpan({ - "mcp.execute.paused": true, - "mcp.execute.paused_execution_id": outcome.execution.id, - "mcp.execute.pause_source": "execute", + if (elicitationMode.mode === "native") { + const result = yield* engine.execute(code, { + onElicitation: makeMcpElicitationHandler(server, debugLog), + }); + return toMcpResult(result); + } + const outcome = yield* engine.executeWithPause(code); + debugLog("execute.paused_flow_result", { + status: outcome.status, + executionId: outcome.status === "paused" ? outcome.execution.id : undefined, + interactionKind: + outcome.status === "paused" + ? pausedInteractionKind(outcome.execution.elicitationContext.request) + : undefined, }); - yield* onExecutionPaused(outcome.execution.id, deadline); - return elicitationMode.mode === "browser" - ? yield* requireUserResumeApproval(outcome.execution.id) - : toMcpPausedResult(formatPausedExecution(outcome.execution, { deadline })); - } - return toMcpResult(outcome.result); - }).pipe( + if (outcome.status === "paused") { + const deadline = pauseDeadline(); + yield* Effect.annotateCurrentSpan({ + "mcp.execute.paused": true, + "mcp.execute.paused_execution_id": outcome.execution.id, + "mcp.execute.pause_source": "execute", + }); + yield* onExecutionPaused(outcome.execution.id, deadline); + return elicitationMode.mode === "browser" + ? yield* requireUserResumeApproval(outcome.execution.id) + : toMcpPausedResult(formatPausedExecution(outcome.execution, { deadline })); + } + return toMcpResult(outcome.result); + }), + ).pipe( + // OTel SpanKind CONSUMER: a tool call is a remotely-initiated unit of + // work processed by this server. Per OTel semantics, observability + // backends treat consumer spans as task/job roots; a root INTERNAL + // span without HTTP attributes is typically not classifiable when no + // HTTP server span wraps the call (e.g. the stdio transport). Effect.withSpan("mcp.host.tool.execute", { + kind: "consumer", attributes: { "mcp.tool.name": "execute", "mcp.execute.code_length": code.length, @@ -814,41 +900,46 @@ export const createExecutorMcpServer = ( action: "accept" | "decline" | "cancel", content: Record | undefined, ): Effect.Effect => - Effect.gen(function* () { - debugLog("resume.call", { - executionId, - action, - hasContent: content !== undefined, - clientCapabilities: server.server.getClientCapabilities() ?? null, - }); - const outcome = yield* resumeWithLifecycle(executionId, { action, content }); - if (!outcome) { - debugLog("resume.missing_execution", { executionId }); - if (yield* localExecutionAlreadySettled(executionId)) { - return alreadySettledResult(executionId); + instrumentToolCall( + "resume", + { "mcp.execute.execution_id": executionId, "mcp.execute.resume.action": action }, + Effect.gen(function* () { + debugLog("resume.call", { + executionId, + action, + hasContent: content !== undefined, + clientCapabilities: server.server.getClientCapabilities() ?? null, + }); + const outcome = yield* resumeWithLifecycle(executionId, { action, content }); + if (!outcome) { + debugLog("resume.missing_execution", { executionId }); + if (yield* localExecutionAlreadySettled(executionId)) { + return alreadySettledResult(executionId); + } + const fallback = yield* resumeFallback(executionId, { action, content }); + if (fallback) { + debugLog("resume.fallback_result", { executionId, status: fallback.status }); + return fallbackOutcomeResult(executionId, fallback); + } + return missingExecutionResult(executionId); } - const fallback = yield* resumeFallback(executionId, { action, content }); - if (fallback) { - debugLog("resume.fallback_result", { executionId, status: fallback.status }); - return fallbackOutcomeResult(executionId, fallback); + debugLog("resume.result", { + executionId, + status: outcome.status, + nextExecutionId: outcome.status === "paused" ? outcome.execution.id : undefined, + interactionKind: + outcome.status === "paused" + ? pausedInteractionKind(outcome.execution.elicitationContext.request) + : undefined, + }); + if (outcome.status === "paused") { + return yield* formatPausedModelResult(outcome.execution, "resume"); } - return missingExecutionResult(executionId); - } - debugLog("resume.result", { - executionId, - status: outcome.status, - nextExecutionId: outcome.status === "paused" ? outcome.execution.id : undefined, - interactionKind: - outcome.status === "paused" - ? pausedInteractionKind(outcome.execution.elicitationContext.request) - : undefined, - }); - if (outcome.status === "paused") { - return yield* formatPausedModelResult(outcome.execution, "resume"); - } - return toMcpResult(outcome.result); - }).pipe( + return toMcpResult(outcome.result); + }), + ).pipe( Effect.withSpan("mcp.host.tool.resume", { + kind: "consumer", attributes: { "mcp.tool.name": "resume", "mcp.execute.resume.action": action, @@ -899,28 +990,33 @@ export const createExecutorMcpServer = ( }; const resumeAfterBrowserApproval = (executionId: string): Effect.Effect => - Effect.gen(function* () { - const response = yield* waitForBrowserApprovalResponse(executionId); - if (!response) return yield* requireUserResumeApproval(executionId); - - const outcome = yield* resumeWithLifecycle(executionId, response); - if (!outcome) { - return missingExecutionResult(executionId); - } - if (outcome.status === "paused") { - const deadline = pauseDeadline(); - yield* Effect.annotateCurrentSpan({ - "mcp.execute.paused": true, - "mcp.execute.paused_execution_id": outcome.execution.id, - "mcp.execute.pause_source": "browser_resume", - }); - yield* onExecutionPaused(outcome.execution.id, deadline); - } - return outcome.status === "completed" - ? toMcpResult(outcome.result) - : yield* requireUserResumeApproval(outcome.execution.id); - }).pipe( + instrumentToolCall( + "resume", + { "mcp.execute.execution_id": executionId, "mcp.elicitation.mode": elicitationMode.mode }, + Effect.gen(function* () { + const response = yield* waitForBrowserApprovalResponse(executionId); + if (!response) return yield* requireUserResumeApproval(executionId); + + const outcome = yield* resumeWithLifecycle(executionId, response); + if (!outcome) { + return missingExecutionResult(executionId); + } + if (outcome.status === "paused") { + const deadline = pauseDeadline(); + yield* Effect.annotateCurrentSpan({ + "mcp.execute.paused": true, + "mcp.execute.paused_execution_id": outcome.execution.id, + "mcp.execute.pause_source": "browser_resume", + }); + yield* onExecutionPaused(outcome.execution.id, deadline); + } + return outcome.status === "completed" + ? toMcpResult(outcome.result) + : yield* requireUserResumeApproval(outcome.execution.id); + }), + ).pipe( Effect.withSpan("mcp.host.tool.resume.browser_approval", { + kind: "consumer", attributes: { "mcp.tool.name": "resume", "mcp.execute.execution_id": executionId, @@ -1018,22 +1114,22 @@ export const createExecutorMcpServer = ( }), ); - yield* Effect.sync(() => { - console.error( - "[executor] MCP session mode", - JSON.stringify({ - ...capabilitySnapshot(server), - elicitationMode: elicitationMode.mode, - resumeEnabled: elicitationMode.mode !== "native", + yield* Effect.logInfo("mcp.session.server_created").pipe( + Effect.annotateLogs({ + "mcp.elicitation.mode": elicitationMode.mode, + "mcp.resume.enabled": elicitationMode.mode !== "native", + }), + Effect.tap(() => + Effect.sync(() => { + debugLog("tool.visibility", { + ...capabilitySnapshot(server), + elicitationMode: elicitationMode.mode, + resumeEnabled: elicitationMode.mode !== "native", + }); }), - ); - debugLog("tool.visibility", { - clientCapabilities: server.server.getClientCapabilities() ?? null, - elicitationSupport: getElicitationSupport(server), - elicitationMode: elicitationMode.mode, - resumeEnabled: elicitationMode.mode !== "native", - }); - }).pipe(Effect.withSpan("mcp.host.sync_tool_availability")); + ), + Effect.withSpan("mcp.host.sync_tool_availability"), + ); return server; }).pipe(Effect.withSpan("mcp.host.create_executor_server"));