Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion apps/cloud/src/mcp/session-durable-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import {
type DbServiceShape,
} from "../db/db";
import { CloudExecutionStackLayer, makeExecutionStack } from "../engine/execution-stack";
import { DoTelemetryLive } from "../observability/telemetry";
import { DoTelemetryLive, flushTracerProvider } from "../observability/telemetry";
import { captureCause as reportCause } from "../observability";

// Re-export the shared types so existing cloud importers
Expand Down Expand Up @@ -196,6 +196,9 @@ export class McpSessionDO extends McpSessionDOBase<CloudSessionDbHandle> {
);
// Build the description here so the postgres query it runs
// (`executor.sources.list`) lands as a child of `McpSessionDO.createRuntime`.
// It also tags the span with this org's source/connector inventory (ids,
// kinds, plugin ids, connection counts) — see `buildExecuteDescription` —
// so a failing init names *what* it was resolving without re-listing.
// host-mcp would otherwise call `Effect.runPromise(engine.getDescription)`
// at its async MCP-SDK boundary and orphan the sub-span.
const description = yield* buildExecuteDescription(executor);
Expand Down Expand Up @@ -240,4 +243,13 @@ export class McpSessionDO extends McpSessionDOBase<CloudSessionDbHandle> {
protected override captureCause(cause: Cause.Cause<unknown>): void {
reportCause(cause);
}

// Force-export the DO isolate's buffered spans before the RPC settles, so a
// dying init/handleRequest still ships its own spans (and the exception +
// stack recorded on them) — not just the worker-side `mcp.do.*` span. The
// base wraps each entrypoint's outermost effect in an `ensuring` that awaits
// this after the span has ended and the SimpleSpanProcessor fired its export.
protected override flushTelemetry(): Promise<void> {
return flushTracerProvider();
}
}
18 changes: 18 additions & 0 deletions packages/core/execution/src/description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@ export const buildExecuteDescription = (executor: Executor): Effect.Effect<strin
yield* Effect.annotateCurrentSpan({
"executor.source_count": sources.length,
"schema.kind": "execute",
// Source/connector inventory so a failing session build (which runs this
// during init) names *what* it was resolving: empty/OpenAPI-only scopes
// build cleanly, scopes with remote MCP connectors are the ones that fail.
"executor.source_ids": sources
.map((source) => source.id)
.slice(0, 50)
.join(","),
"executor.source_kinds": [...new Set(sources.map((source) => source.kind))].join(","),
"executor.source_plugin_ids": [...new Set(sources.map((source) => source.pluginId))].join(
",",
),
"executor.connection_count": sources.reduce(
(total, source) => total + source.connectionIds.length,
0,
),
"executor.sources_with_connection": sources.filter(
(source) => source.connectionIds.length > 0,
).length,
});

return description;
Expand Down
62 changes: 59 additions & 3 deletions packages/hosts/cloudflare/src/mcp/session-durable-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,47 @@ export abstract class McpSessionDOBase<
/** Optional error seam: report a fatal request cause (cloud → Sentry). */
protected captureCause(_cause: Cause.Cause<unknown>): void {}

/** Optional flush seam: force-export buffered spans before the DO RPC
* settles. Default is a no-op; cloud overrides it with the tracer
* provider's `forceFlush`. Without this, a DO whose Effect dies tears the
* isolate down with the SimpleSpanProcessor's in-flight export `fetch`
* still pending — so the failing method's OWN spans (and the exception +
* stack the Effect tracer records on them) never reach the collector, and
* only the worker-side `mcp.do.*` span (which carries just the RPC-boundary
* message, no real stack) survives. Flushing in an `ensuring` finalizer —
* placed OUTSIDE the span so the span has already ended and the
* SimpleSpanProcessor has fired its export — lets the flush await that
* export before the RPC rejects. */
protected flushTelemetry(): Promise<void> {
return Promise.resolve();
}

/** Mirror an error cause onto the active span as top-level attributes. The
* Effect OTel tracer already records the cause as an `exception` span
* EVENT (`exception.stacktrace` = the full `Cause.pretty` rendering), but
* span events are awkward to query in Axiom and the worker-side spans never
* see them. Copying type/message/stack onto plain span attributes makes the
* failing frame a one-field APL lookup on the span row itself. */
private recordCauseOnSpan(cause: Cause.Cause<unknown>): Effect.Effect<void> {
const errors = Cause.prettyErrors(cause);
// No `Error` reasons means a pure interruption (or empty cause) — nothing
// worth annotating, and we don't want to flag interrupts as failures.
if (errors.length === 0) return Effect.void;
const first = errors[0];
return Effect.annotateCurrentSpan({
"exception.type": first?.name ?? "Error",
"exception.message": first?.message ?? "unknown",
"exception.stacktrace": Cause.pretty(cause),
});
}

/** Flush DO-side spans once the wrapped method (incl. its span) has ended,
* whether it succeeded or died. Apply as the OUTERMOST pipe step. */
private withSpanFlush<A, E>(effect: Effect.Effect<A, E>): Effect.Effect<A, E> {
const self = this;
return effect.pipe(Effect.ensuring(Effect.promise(() => self.flushTelemetry())));
}

/** The session id — equal to this DO's id. */
protected get sessionId(): string {
return this.ctx.id.toString();
Expand Down Expand Up @@ -546,6 +587,7 @@ export abstract class McpSessionDOBase<
(eff) => this.withTelemetry(eff, incoming),
// oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: Durable Object init method can only reject its Promise
Effect.orDie,
(eff) => self.withSpanFlush(eff),
),
);
}
Expand Down Expand Up @@ -584,8 +626,11 @@ export abstract class McpSessionDOBase<
);
}).pipe(
Effect.tapCause((cause) =>
Effect.sync(() => {
console.error("[mcp-session] init failed:", cause);
Effect.gen(function* () {
console.error("[mcp-session] init failed:", Cause.pretty(cause));
// Annotate `McpSessionDO.init` (the active span here — `doInit` opens
// none of its own) so the surviving, flushed span names the frame.
yield* self.recordCauseOnSpan(cause);
}),
),
Effect.catchCause((cause) =>
Expand Down Expand Up @@ -637,13 +682,19 @@ export abstract class McpSessionDOBase<
),
);
}).pipe(
// Cold-restore failures (`restoreRuntimeFromStorage` is `orDie`'d and is
// NOT under the handled `dispatchAuthorizedRequest` branch) die straight
// through here — annotate the handleRequest span with their stack before
// it ends so the flushed span names the frame.
Effect.tapCause((cause) => self.recordCauseOnSpan(cause)),
Effect.withSpan("McpSessionDO.handleRequest", {
attributes: {
"mcp.request.method": request.method,
"mcp.request.session_id_present": !!request.headers.get("mcp-session-id"),
},
}),
(eff) => this.withTelemetry(eff, incoming),
(eff) => self.withSpanFlush(eff),
);
return Effect.runPromise(program);
}
Expand Down Expand Up @@ -678,6 +729,7 @@ export abstract class McpSessionDOBase<
(eff) => this.withTelemetry(eff, incoming),
// oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: DO RPC exposes Promise results
Effect.orDie,
(eff) => self.withSpanFlush(eff),
),
);
}
Expand Down Expand Up @@ -752,6 +804,7 @@ export abstract class McpSessionDOBase<
(eff) => this.withTelemetry(eff, incoming),
// oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: DO RPC exposes Promise results
Effect.orDie,
(eff) => self.withSpanFlush(eff),
),
);
}
Expand Down Expand Up @@ -814,9 +867,10 @@ export abstract class McpSessionDOBase<
return response;
}).pipe(
Effect.catchCause((cause) =>
Effect.sync(() => {
Effect.gen(function* () {
console.error("[mcp-session] handleRequest error:", Cause.pretty(cause));
self.captureCause(cause);
yield* self.recordCauseOnSpan(cause);
return jsonRpcError(500, -32603, "Internal error");
}),
),
Expand All @@ -827,6 +881,7 @@ export abstract class McpSessionDOBase<
const program = Effect.promise(() => this.runAlarm()).pipe(
Effect.withSpan("McpSessionDO.alarm"),
(eff) => this.withTelemetry(eff),
(eff) => this.withSpanFlush(eff),
);
return Effect.runPromise(program);
}
Expand All @@ -836,6 +891,7 @@ export abstract class McpSessionDOBase<
Effect.promise(() => this.cleanup()).pipe(
Effect.withSpan("McpSessionDO.clearSession"),
(eff) => this.withTelemetry(eff, incoming),
(eff) => this.withSpanFlush(eff),
),
);
}
Expand Down
Loading