diff --git a/apps/cloud/src/mcp/session-durable-object.ts b/apps/cloud/src/mcp/session-durable-object.ts index de2ef2a58..70d2f29ce 100644 --- a/apps/cloud/src/mcp/session-durable-object.ts +++ b/apps/cloud/src/mcp/session-durable-object.ts @@ -50,7 +50,7 @@ import { type DbServiceShape, } from "../db/db"; import { CloudExecutionStackLayer, makeExecutionStack } from "../engine/execution-stack"; -import { DoTelemetryLive } from "../observability/telemetry"; +import { DoTelemetryLive, flushTracerProvider } from "../observability/telemetry"; import { captureCause as reportCause } from "../observability"; // Re-export the shared types so existing cloud importers @@ -196,6 +196,9 @@ export class McpSessionDO extends McpSessionDOBase { ); // Build the description here so the postgres query it runs // (`executor.sources.list`) lands as a child of `McpSessionDO.createRuntime`. + // It also tags the span with this org's source/connector inventory (ids, + // kinds, plugin ids, connection counts) — see `buildExecuteDescription` — + // so a failing init names *what* it was resolving without re-listing. // host-mcp would otherwise call `Effect.runPromise(engine.getDescription)` // at its async MCP-SDK boundary and orphan the sub-span. const description = yield* buildExecuteDescription(executor); @@ -240,4 +243,13 @@ export class McpSessionDO extends McpSessionDOBase { protected override captureCause(cause: Cause.Cause): void { reportCause(cause); } + + // Force-export the DO isolate's buffered spans before the RPC settles, so a + // dying init/handleRequest still ships its own spans (and the exception + + // stack recorded on them) — not just the worker-side `mcp.do.*` span. The + // base wraps each entrypoint's outermost effect in an `ensuring` that awaits + // this after the span has ended and the SimpleSpanProcessor fired its export. 
+ protected override flushTelemetry(): Promise { + return flushTracerProvider(); + } } diff --git a/packages/core/execution/src/description.ts b/packages/core/execution/src/description.ts index 97ad689ff..4b2e5939a 100644 --- a/packages/core/execution/src/description.ts +++ b/packages/core/execution/src/description.ts @@ -25,6 +25,24 @@ export const buildExecuteDescription = (executor: Executor): Effect.Effect source.id) + .slice(0, 50) + .join(","), + "executor.source_kinds": [...new Set(sources.map((source) => source.kind))].join(","), + "executor.source_plugin_ids": [...new Set(sources.map((source) => source.pluginId))].join( + ",", + ), + "executor.connection_count": sources.reduce( + (total, source) => total + source.connectionIds.length, + 0, + ), + "executor.sources_with_connection": sources.filter( + (source) => source.connectionIds.length > 0, + ).length, }); return description; diff --git a/packages/hosts/cloudflare/src/mcp/session-durable-object.ts b/packages/hosts/cloudflare/src/mcp/session-durable-object.ts index 8f12c9ffb..943368ac6 100644 --- a/packages/hosts/cloudflare/src/mcp/session-durable-object.ts +++ b/packages/hosts/cloudflare/src/mcp/session-durable-object.ts @@ -225,6 +225,47 @@ export abstract class McpSessionDOBase< /** Optional error seam: report a fatal request cause (cloud → Sentry). */ protected captureCause(_cause: Cause.Cause): void {} + /** Optional flush seam: force-export buffered spans before the DO RPC + * settles. Default is a no-op; cloud overrides it with the tracer + * provider's `forceFlush`. Without this, a DO whose Effect dies tears the + * isolate down with the SimpleSpanProcessor's in-flight export `fetch` + * still pending — so the failing method's OWN spans (and the exception + + * stack the Effect tracer records on them) never reach the collector, and + * only the worker-side `mcp.do.*` span (which carries just the RPC-boundary + * message, no real stack) survives. 
Flushing in an `ensuring` finalizer — + * placed OUTSIDE the span so the span has already ended and the + * SimpleSpanProcessor has fired its export — lets the flush await that + * export before the RPC rejects. */ + protected flushTelemetry(): Promise { + return Promise.resolve(); + } + + /** Mirror an error cause onto the active span as top-level attributes. The + * Effect OTel tracer already records the cause as an `exception` span + * EVENT (`exception.stacktrace` = the full `Cause.pretty` rendering), but + * span events are awkward to query in Axiom and the worker-side spans never + * see them. Copying type/message/stack onto plain span attributes makes the + * failing frame a one-field APL lookup on the span row itself. */ + private recordCauseOnSpan(cause: Cause.Cause): Effect.Effect { + const errors = Cause.prettyErrors(cause); + // No `Error` reasons means a pure interruption (or empty cause) — nothing + // worth annotating, and we don't want to flag interrupts as failures. + if (errors.length === 0) return Effect.void; + const first = errors[0]; + return Effect.annotateCurrentSpan({ + "exception.type": first?.name ?? "Error", + "exception.message": first?.message ?? "unknown", + "exception.stacktrace": Cause.pretty(cause), + }); + } + + /** Best-effort flush of DO-side spans once the wrapped method (incl. its span) has + * ended, succeeded or died. A rejected flush is logged, not rethrown. OUTERMOST step. */ + private withSpanFlush(effect: Effect.Effect): Effect.Effect { + const report = (e: unknown) => console.error("[mcp-session] telemetry flush failed:", e); + return effect.pipe(Effect.ensuring(Effect.promise(() => this.flushTelemetry().catch(report)))); + } + + /** The session id — equal to this DO's id. 
*/ protected get sessionId(): string { return this.ctx.id.toString(); @@ -546,6 +587,7 @@ export abstract class McpSessionDOBase< (eff) => this.withTelemetry(eff, incoming), // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: Durable Object init method can only reject its Promise Effect.orDie, + (eff) => self.withSpanFlush(eff), ), ); } @@ -584,8 +626,11 @@ export abstract class McpSessionDOBase< ); }).pipe( Effect.tapCause((cause) => - Effect.sync(() => { - console.error("[mcp-session] init failed:", cause); + Effect.gen(function* () { + console.error("[mcp-session] init failed:", Cause.pretty(cause)); + // Annotate `McpSessionDO.init` (the active span here — `doInit` opens + // none of its own) so the surviving, flushed span names the frame. + yield* self.recordCauseOnSpan(cause); }), ), Effect.catchCause((cause) => @@ -637,6 +682,11 @@ export abstract class McpSessionDOBase< ), ); }).pipe( + // Cold-restore failures (`restoreRuntimeFromStorage` is `orDie`'d and is + // NOT under the handled `dispatchAuthorizedRequest` branch) die straight + // through here — annotate the handleRequest span with their stack before + // it ends so the flushed span names the frame. 
+ Effect.tapCause((cause) => self.recordCauseOnSpan(cause)), Effect.withSpan("McpSessionDO.handleRequest", { attributes: { "mcp.request.method": request.method, @@ -644,6 +694,7 @@ export abstract class McpSessionDOBase< }, }), (eff) => this.withTelemetry(eff, incoming), + (eff) => self.withSpanFlush(eff), ); return Effect.runPromise(program); } @@ -678,6 +729,7 @@ export abstract class McpSessionDOBase< (eff) => this.withTelemetry(eff, incoming), // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: DO RPC exposes Promise results Effect.orDie, + (eff) => self.withSpanFlush(eff), ), ); } @@ -752,6 +804,7 @@ export abstract class McpSessionDOBase< (eff) => this.withTelemetry(eff, incoming), // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: DO RPC exposes Promise results Effect.orDie, + (eff) => self.withSpanFlush(eff), ), ); } @@ -814,9 +867,10 @@ export abstract class McpSessionDOBase< return response; }).pipe( Effect.catchCause((cause) => - Effect.sync(() => { + Effect.gen(function* () { console.error("[mcp-session] handleRequest error:", Cause.pretty(cause)); self.captureCause(cause); + yield* self.recordCauseOnSpan(cause); return jsonRpcError(500, -32603, "Internal error"); }), ), @@ -827,6 +881,7 @@ export abstract class McpSessionDOBase< const program = Effect.promise(() => this.runAlarm()).pipe( Effect.withSpan("McpSessionDO.alarm"), (eff) => this.withTelemetry(eff), + (eff) => this.withSpanFlush(eff), ); return Effect.runPromise(program); } @@ -836,6 +891,7 @@ export abstract class McpSessionDOBase< Effect.promise(() => this.cleanup()).pipe( Effect.withSpan("McpSessionDO.clearSession"), (eff) => this.withTelemetry(eff, incoming), + (eff) => this.withSpanFlush(eff), ), ); }