Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
)
from .execute_tool_scope import ExecuteToolScope
from .execution_type import ExecutionType
from .exporters.enriched_span import EnrichedReadableSpan
from .exporters.enriching_span_processor import (
get_span_enricher,
register_span_enricher,
unregister_span_enricher,
)
from .inference_call_details import InferenceCallDetails
from .inference_operation_type import InferenceOperationType
from .inference_scope import InferenceScope
Expand All @@ -32,6 +38,11 @@
"is_configured",
"get_tracer",
"get_tracer_provider",
# Span enrichment
"register_span_enricher",
"unregister_span_enricher",
"get_span_enricher",
"EnrichedReadableSpan",
# Span processor
"SpanProcessor",
# Base scope class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_NAMESPACE, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter

from .exporters.agent365_exporter import _Agent365Exporter
from .exporters.agent365_exporter_options import Agent365ExporterOptions
from .exporters.enriching_span_processor import (
_EnrichingBatchSpanProcessor,
)
from .exporters.utils import is_agent365_exporter_enabled
from .trace_processor.span_processor import SpanProcessor

Expand Down Expand Up @@ -166,8 +169,9 @@ def _configure_internal(

# Add span processors

# Create BatchSpanProcessor with optimized settings
batch_processor = BatchSpanProcessor(exporter, **batch_processor_kwargs)
# Create _EnrichingBatchSpanProcessor with optimized settings
# This allows extensions to enrich spans before export
batch_processor = _EnrichingBatchSpanProcessor(exporter, **batch_processor_kwargs)
agent_processor = SpanProcessor()

tracer_provider.add_span_processor(batch_processor)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Enriched ReadableSpan wrapper for adding attributes to immutable spans."""

import json
from typing import Any

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.util import types


class EnrichedReadableSpan(ReadableSpan):
"""
Wrapper to add attributes to an immutable ReadableSpan.

Since ReadableSpan is immutable after a span ends, this wrapper allows
extensions to add additional attributes before export without modifying
the original span.
"""

def __init__(self, span: ReadableSpan, extra_attributes: dict):
"""
Initialize the enriched span wrapper.

Args:
span: The original ReadableSpan to wrap.
extra_attributes: Additional attributes to merge with the original.
"""
self._span = span
self._extra_attributes = extra_attributes

@property
def attributes(self) -> types.Attributes:
"""Return merged attributes from original span and extra attributes."""
original = dict(self._span.attributes or {})
original.update(self._extra_attributes)
return original

@property
def name(self):
"""Return the span name."""
return self._span.name

@property
def context(self):
"""Return the span context."""
return self._span.context

@property
def parent(self):
"""Return the parent span context."""
return self._span.parent

@property
def start_time(self):
"""Return the span start time."""
return self._span.start_time

@property
def end_time(self):
"""Return the span end time."""
return self._span.end_time

@property
def status(self):
"""Return the span status."""
return self._span.status

@property
def kind(self):
"""Return the span kind."""
return self._span.kind

@property
def events(self):
"""Return the span events."""
return self._span.events

@property
def links(self):
"""Return the span links."""
return self._span.links

@property
def resource(self):
"""Return the span resource."""
return self._span.resource

@property
def instrumentation_scope(self):
"""Return the instrumentation scope."""
return self._span.instrumentation_scope

def to_json(self, indent: int | None = 4) -> str:
"""
Convert span to JSON string with enriched attributes.

Args:
indent: JSON indentation level.

Returns:
JSON string representation of the span.
"""
# Build the JSON dict manually to include enriched attributes
return json.dumps(
{
"name": self.name,
"context": {
"trace_id": f"0x{self.context.trace_id:032x}",
"span_id": f"0x{self.context.span_id:016x}",
"trace_state": str(self.context.trace_state),
}
if self.context
else None,
"kind": str(self.kind),
"parent_id": f"0x{self.parent.span_id:016x}" if self.parent else None,
"start_time": self._format_time(self.start_time),
"end_time": self._format_time(self.end_time),
"status": {
"status_code": str(self.status.status_code),
"description": self.status.description,
}
if self.status
else None,
"attributes": dict(self.attributes) if self.attributes else None,
"events": [self._format_event(e) for e in self.events] if self.events else None,
"links": [self._format_link(lnk) for lnk in self.links] if self.links else None,
"resource": dict(self.resource.attributes) if self.resource else None,
},
indent=indent,
)

def _format_time(self, time_ns: int | None) -> str | None:
"""Format nanosecond timestamp to ISO string."""
if time_ns is None:
return None
from datetime import datetime, timezone

return datetime.fromtimestamp(time_ns / 1e9, tz=timezone.utc).isoformat()

def _format_event(self, event: Any) -> dict:
"""Format a span event."""
return {
"name": event.name,
"timestamp": self._format_time(event.timestamp),
"attributes": dict(event.attributes) if event.attributes else None,
}

def _format_link(self, link: Any) -> dict:
"""Format a span link."""
return {
"context": {
"trace_id": f"0x{link.context.trace_id:032x}",
"span_id": f"0x{link.context.span_id:016x}",
}
if link.context
else None,
"attributes": dict(link.attributes) if link.attributes else None,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Span enrichment support for the Agent365 exporter pipeline."""

import logging
import threading
from collections.abc import Callable

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import BatchSpanProcessor

logger = logging.getLogger(__name__)

# Single span enricher - only one platform instrumentor should be active at a time
_span_enricher: Callable[[ReadableSpan], ReadableSpan] | None = None
_enricher_lock = threading.Lock()


def register_span_enricher(enricher: Callable[[ReadableSpan], ReadableSpan]) -> None:
"""Register the span enricher for the active platform instrumentor.

Only one enricher can be registered at a time since auto-instrumentation
is platform-specific (Semantic Kernel, LangChain, or OpenAI Agents).

Args:
enricher: Function that takes a ReadableSpan and returns an enriched span.

Raises:
RuntimeError: If an enricher is already registered.
"""
global _span_enricher
with _enricher_lock:
if _span_enricher is not None:
raise RuntimeError(
"A span enricher is already registered. "
"Only one platform instrumentor can be active at a time."
)
_span_enricher = enricher
logger.debug("Span enricher registered: %s", enricher.__name__)


def unregister_span_enricher() -> None:
"""Unregister the current span enricher.

Called during uninstrumentation to clean up.
"""
global _span_enricher
with _enricher_lock:
if _span_enricher is not None:
logger.debug("Span enricher unregistered: %s", _span_enricher.__name__)
_span_enricher = None


def get_span_enricher() -> Callable[[ReadableSpan], ReadableSpan] | None:
"""Get the currently registered span enricher.

Returns:
The registered enricher function, or None if no enricher is registered.
"""
with _enricher_lock:
return _span_enricher


class _EnrichingBatchSpanProcessor(BatchSpanProcessor):
"""BatchSpanProcessor that applies the registered enricher before batching."""

def on_end(self, span: ReadableSpan) -> None:
"""Apply the span enricher and pass to parent for batching.

Args:
span: The span that has ended.
"""
enriched_span = span

enricher = get_span_enricher()
if enricher is not None:
try:
enriched_span = enricher(span)
except Exception:
logger.exception(
"Span enricher %s raised an exception, using original span",
enricher.__name__,
)

super().on_end(enriched_span)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Span enricher for Semantic Kernel."""

from microsoft_agents_a365.observability.core.constants import (
EXECUTE_TOOL_OPERATION_NAME,
GEN_AI_INPUT_MESSAGES_KEY,
GEN_AI_OUTPUT_MESSAGES_KEY,
GEN_AI_TOOL_ARGS_KEY,
GEN_AI_TOOL_CALL_RESULT_KEY,
INVOKE_AGENT_OPERATION_NAME,
)
from microsoft_agents_a365.observability.core.exporters.enriched_span import EnrichedReadableSpan
from opentelemetry.sdk.trace import ReadableSpan

from .utils import extract_content_as_string_list

# Semantic Kernel specific attribute keys
SK_TOOL_CALL_ARGUMENTS_KEY = "gen_ai.tool.call.arguments"
SK_TOOL_CALL_RESULT_KEY = "gen_ai.tool.call.result"


def enrich_semantic_kernel_span(span: ReadableSpan) -> ReadableSpan:
"""
Enricher function for Semantic Kernel spans.

Transforms SK-specific attributes to standard gen_ai attributes
before the span is exported. Enrichment is applied based on span type:
- invoke_agent spans: Extract only content from input/output messages
- execute_tool spans: Map tool arguments and results to standard keys

Args:
span: The ReadableSpan to enrich.

Returns:
The enriched span (wrapped if attributes were added), or the
original span if no enrichment was needed.
"""
extra_attributes = {}
attributes = span.attributes or {}

# Only extract content for invoke_agent spans
if span.name.startswith(INVOKE_AGENT_OPERATION_NAME):
# Transform SK-specific agent invocation attributes to standard gen_ai attributes
# Extract only the content from the full message objects
# Support both gen_ai.agent.invocation_input and gen_ai.input_messages as sources
input_messages = attributes.get("gen_ai.agent.invocation_input") or attributes.get(
GEN_AI_INPUT_MESSAGES_KEY
)
if input_messages:
extra_attributes[GEN_AI_INPUT_MESSAGES_KEY] = extract_content_as_string_list(
input_messages
)

output_messages = attributes.get("gen_ai.agent.invocation_output") or attributes.get(
GEN_AI_OUTPUT_MESSAGES_KEY
)
if output_messages:
extra_attributes[GEN_AI_OUTPUT_MESSAGES_KEY] = extract_content_as_string_list(
output_messages
)

# Map tool attributes for execute_tool spans
elif span.name.startswith(EXECUTE_TOOL_OPERATION_NAME):
if SK_TOOL_CALL_ARGUMENTS_KEY in attributes:
extra_attributes[GEN_AI_TOOL_ARGS_KEY] = attributes[SK_TOOL_CALL_ARGUMENTS_KEY]

if SK_TOOL_CALL_RESULT_KEY in attributes:
extra_attributes[GEN_AI_TOOL_CALL_RESULT_KEY] = attributes[SK_TOOL_CALL_RESULT_KEY]

if extra_attributes:
return EnrichedReadableSpan(span, extra_attributes)

return span
Loading
Loading