Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 69 additions & 55 deletions litellm/proxy/management_helpers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
## Helper utils for the management endpoints (keys/users/teams)
from datetime import datetime
from functools import wraps
from typing import List, Optional, Tuple
from typing import Any, Callable, List, Optional, Tuple

from fastapi import HTTPException, Request

Expand Down Expand Up @@ -435,6 +435,58 @@ async def send_management_endpoint_alert(
)


async def _emit_management_endpoint_otel_span(
    func: Callable,
    kwargs: dict,
    parent_otel_span: Any,
    start_time: datetime,
    end_time: datetime,
    result: Any = None,
    exception: Optional[Exception] = None,
) -> None:
    """Report a finished management-endpoint call on its parent OTEL span.

    Builds a ``ManagementEndpointLoggingPayload`` and passes it, together with
    ``parent_otel_span``, to the proxy's OTEL logger: the success hook when
    ``exception`` is ``None``, the failure hook otherwise. Returns without
    doing anything when the proxy has no OTEL logger.

    The route and request body are taken from ``kwargs["http_request"]`` when
    the handler received one. Handlers without that parameter (for example
    ``/key/generate``) are still reported, using ``func.__name__`` as the route
    and an empty request body, so their SERVER span is ended on both the
    success and the failure path rather than being left open.

    Args:
        func: The wrapped endpoint handler; only its ``__name__`` is used.
        kwargs: Keyword arguments the handler was called with.
        parent_otel_span: Span handed through to the OTEL hook.
        start_time: When the handler started.
        end_time: When the handler returned or raised.
        result: Handler return value; converted with ``dict()`` for the
            payload on success, ignored on failure.
        exception: The exception the handler raised, or ``None`` on success.
    """
    # Imported at call time rather than at module import
    # (presumably to avoid a circular import with proxy_server — confirm).
    from litellm.proxy.proxy_server import open_telemetry_logger

    otel_logger = open_telemetry_logger
    if otel_logger is None:
        return

    http_request: Optional[Request] = kwargs.get("http_request")
    if http_request is None:
        # No request object available: identify the call by handler name.
        route = func.__name__
        request_body: dict = {}
    else:
        route = http_request.url.path
        request_body = await _read_request_body(request=http_request)

    succeeded = exception is None

    # A response is only recorded for successful calls that returned something.
    response = None
    if succeeded and result is not None:
        response = dict(result)

    logging_payload = ManagementEndpointLoggingPayload(
        route=route,
        request_data=request_body,
        response=response,
        start_time=start_time,
        end_time=end_time,
        exception=exception,
    )

    if succeeded:
        hook = otel_logger.async_management_endpoint_success_hook
    else:
        hook = otel_logger.async_management_endpoint_failure_hook
    await hook(
        logging_payload=logging_payload,
        parent_otel_span=parent_otel_span,
    )


def management_endpoint_wrapper(func):
"""
This wrapper does the following:
Expand All @@ -446,13 +498,10 @@ def management_endpoint_wrapper(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = datetime.now()
_http_request: Optional[Request] = None
try:
result = await func(*args, **kwargs)
end_time = datetime.now()
try:
if kwargs is None:
kwargs = {}
user_api_key_dict: UserAPIKeyAuth = (
kwargs.get("user_api_key_dict") or UserAPIKeyAuth()
)
Expand All @@ -462,31 +511,16 @@ async def wrapper(*args, **kwargs):
user_api_key_dict=user_api_key_dict,
function_name=func.__name__,
)
_http_request = kwargs.get("http_request", None)
parent_otel_span = getattr(user_api_key_dict, "parent_otel_span", None)
if parent_otel_span is not None:
from litellm.proxy.proxy_server import open_telemetry_logger

if open_telemetry_logger is not None:
if _http_request:
_route = _http_request.url.path
_request_body: dict = await _read_request_body(
request=_http_request
)
_response = dict(result) if result is not None else None

logging_payload = ManagementEndpointLoggingPayload(
route=_route,
request_data=_request_body,
response=_response,
start_time=start_time,
end_time=end_time,
)

await open_telemetry_logger.async_management_endpoint_success_hook( # type: ignore
logging_payload=logging_payload,
parent_otel_span=parent_otel_span,
)
await _emit_management_endpoint_otel_span(
func=func,
kwargs=kwargs,
parent_otel_span=parent_otel_span,
start_time=start_time,
end_time=end_time,
result=result,
)

# Delete updated/deleted info from cache
_delete_api_key_from_cache(kwargs=kwargs)
Expand All @@ -502,39 +536,19 @@ async def wrapper(*args, **kwargs):
except Exception as e:
end_time = datetime.now()

if kwargs is None:
kwargs = {}
user_api_key_dict: UserAPIKeyAuth = (
kwargs.get("user_api_key_dict") or UserAPIKeyAuth()
)
parent_otel_span = getattr(user_api_key_dict, "parent_otel_span", None)
if parent_otel_span is not None:
from litellm.proxy.proxy_server import open_telemetry_logger

if open_telemetry_logger is not None:
_http_request = kwargs.get("http_request")
if _http_request:
_route = _http_request.url.path
_request_body: dict = await _read_request_body(
request=_http_request
)
else:
_route = func.__name__
_request_body = {}

logging_payload = ManagementEndpointLoggingPayload(
route=_route,
request_data=_request_body,
response=None,
start_time=start_time,
end_time=end_time,
exception=e,
)

await open_telemetry_logger.async_management_endpoint_failure_hook( # type: ignore
logging_payload=logging_payload,
parent_otel_span=parent_otel_span,
)
await _emit_management_endpoint_otel_span(
func=func,
kwargs=kwargs,
parent_otel_span=parent_otel_span,
start_time=start_time,
end_time=end_time,
exception=e,
)

raise e

Expand Down
7 changes: 7 additions & 0 deletions otel_verify/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Runtime artifacts produced by a verification run (incl. a live virtual key)
.team_id
.team_key
spans.jsonl
spans_admin.jsonl
proxy.log
__pycache__/
18 changes: 18 additions & 0 deletions otel_verify/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
model_list:
- model_name: gpt-mock
litellm_params:
model: openai/gpt-4o-mini
api_key: sk-fake-unused

litellm_settings:
callbacks: ["otel_file_exporter.otel_logger"]

guardrails:
- guardrail_name: "verify-guardrail"
litellm_params:
guardrail: verify_guardrail.VerifyGuardrail
mode: "pre_call"
default_on: true

general_settings:
master_key: sk-1234
36 changes: 36 additions & 0 deletions otel_verify/otel_file_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Custom OTEL logger that writes every finished span as one compact JSON line.

Registered as a litellm callback (``otel_file_exporter.otel_logger``). Because it
subclasses litellm's ``OpenTelemetry`` integration, its ``__init__`` registers
itself as ``proxy_server.open_telemetry_logger`` (first-registered-wins), so the
proxy's SERVER-span lifecycle flows through it exactly like ``callbacks: ["otel"]``
would — but spans land in a file we can parse instead of pretty-printed stdout.
"""

import os
import threading

from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

from litellm.integrations.opentelemetry import OpenTelemetry, OpenTelemetryConfig

# Destination file for exported spans; override with the OTEL_SPAN_FILE env var.
_SPAN_FILE = os.getenv("OTEL_SPAN_FILE", "/tmp/otel_spans.jsonl")
# Held by FileSpanExporter.export while it appends a batch of spans to the file.
_lock = threading.Lock()


class FileSpanExporter(SpanExporter):
    """Span exporter that appends each span to ``_SPAN_FILE`` as one JSON line."""

    def export(self, spans):
        """Append every span in ``spans`` to the span file and report success.

        The module-level lock is held for the whole batch, and the file is
        opened in append mode and closed again on every call.
        """
        with _lock:
            with open(_SPAN_FILE, "a") as sink:
                # indent=None keeps each span on a single line.
                sink.writelines(
                    span.to_json(indent=None) + "\n" for span in spans
                )
        return SpanExportResult.SUCCESS

    def shutdown(self):
        """Do nothing; no file handle is kept open between exports."""
        return None

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Report success immediately; ``timeout_millis`` is not used."""
        return True


# Module-level logger instance; config.yaml registers it as the callback
# ``otel_file_exporter.otel_logger``.
# SimpleSpanProcessor is used for non-string exporters, so spans flush on .end().
otel_logger = OpenTelemetry(config=OpenTelemetryConfig(exporter=FileSpanExporter()))
65 changes: 65 additions & 0 deletions otel_verify/run_matrix.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env bash
# OTEL fidelity verification matrix. Each case uses a deterministic traceparent
# so its spans can be looked up by trace_id. Prints "LABEL -> HTTP <code>".
set -u
# Same env overrides and defaults as setup.sh, so both scripts can be pointed
# at the same proxy: BASE (proxy URL) and MASTER_KEY (admin key).
BASE="${BASE:-http://localhost:4000}"
MASTER="${MASTER_KEY:-sk-1234}"
# Virtual key written by setup.sh. Fail fast when it is missing instead of
# running the whole matrix with an empty bearer token.
KEY_FILE="$(dirname "$0")/.team_key"
if [ ! -r "$KEY_FILE" ]; then
  echo "run_matrix.sh: $KEY_FILE not found or unreadable; run setup.sh first" >&2
  exit 1
fi
TEAM_KEY="$(cat "$KEY_FILE")"

# Build a traceparent header value from a 32-hex trace id ($1): the trace id is
# placed between a fixed "00-" prefix and a fixed "-0000000000000001-01" suffix.
tp() {
  printf '00-%s-0000000000000001-01\n' "$1"
}

# Issue one request tagged with a deterministic traceparent and print its status.
# Usage: run LABEL TRACE_ID CURL_ARGS...
#   LABEL      name printed in the first column
#   TRACE_ID   trace id turned into a traceparent header by tp()
#   CURL_ARGS  passed to curl unchanged (method, URL, headers, body)
# The response body is discarded; only the HTTP status code is reported.
run() {
  local label="$1"
  local tid="$2"
  shift 2
  local status
  status="$(curl --silent --output /dev/null --write-out '%{http_code}' \
    --header "traceparent: $(tp "$tid")" "$@")"
  printf "%-28s trace=%s -> HTTP %s\n" "$label" "$tid" "$status"
}

# Shared JSON content-type header, passed to curl with -H in every case below.
J="Content-Type: application/json"

# NOTE(review): each label appears to end in the HTTP status the case is
# expected to produce (e.g. T01_chat_200_team); compare it with the printed
# "HTTP <code>". Trace ids are the case number, zero-padded to 32 digits.

# ---- #28405 http.response.status_code + #28273 team attrs (success/failure) ----
# T01: team key, mocked successful completion.
run "T01_chat_200_team" 00000000000000000000000000000001 -X POST $BASE/v1/chat/completions \
-H "Authorization: Bearer $TEAM_KEY" -H "$J" \
-d '{"model":"gpt-mock","messages":[{"role":"user","content":"hello"}],"mock_response":"hi there"}'

# T02: request body is deliberately not valid JSON.
run "T02_chat_400_badjson" 00000000000000000000000000000002 -X POST $BASE/v1/chat/completions \
-H "Authorization: Bearer $TEAM_KEY" -H "$J" -d '{ this is not valid json'

# T03: bearer token is neither the master key nor the team key.
run "T03_chat_401_badkey" 00000000000000000000000000000003 -X POST $BASE/v1/chat/completions \
-H "Authorization: Bearer sk-does-not-exist" -H "$J" \
-d '{"model":"gpt-mock","messages":[{"role":"user","content":"hi"}],"mock_response":"x"}'

# T04: master key, model name other than the gpt-mock used by the other cases.
run "T04_chat_400_unknownmodel" 00000000000000000000000000000004 -X POST $BASE/v1/chat/completions \
-H "Authorization: Bearer $MASTER" -H "$J" \
-d '{"model":"definitely-not-a-real-model","messages":[{"role":"user","content":"hi"}]}'

# T05/T06: mock_response names a litellm error class — presumably makes the
# proxy raise that error (relies on allow_client_mock_response set on the key by
# setup.sh); confirm against the proxy's mock_response handling.
run "T05_chat_429_team" 00000000000000000000000000000005 -X POST $BASE/v1/chat/completions \
-H "Authorization: Bearer $TEAM_KEY" -H "$J" \
-d '{"model":"gpt-mock","messages":[{"role":"user","content":"hi"}],"mock_response":"litellm.RateLimitError"}'

run "T06_chat_500_team" 00000000000000000000000000000006 -X POST $BASE/v1/chat/completions \
-H "Authorization: Bearer $TEAM_KEY" -H "$J" \
-d '{"model":"gpt-mock","messages":[{"role":"user","content":"hi"}],"mock_response":"litellm.InternalServerError"}'

# T07: same success case as T01 but against the /v1/messages route.
run "T07_messages_200_team" 00000000000000000000000000000007 -X POST $BASE/v1/messages \
-H "Authorization: Bearer $TEAM_KEY" -H "$J" \
-d '{"model":"gpt-mock","max_tokens":50,"messages":[{"role":"user","content":"hi"}],"mock_response":"hi there"}'

# ---- #28405 admin-endpoint paths ----
# All three cases call /key/generate with the master key; only the body differs.
run "T08_admin_200_keygen" 00000000000000000000000000000008 -X POST $BASE/key/generate \
-H "Authorization: Bearer $MASTER" -H "$J" -d '{"models":["gpt-mock"]}'

run "T09_admin_500_keygen" 00000000000000000000000000000009 -X POST $BASE/key/generate \
-H "Authorization: Bearer $MASTER" -H "$J" -d '{"duration":"not-a-valid-duration"}'

run "T10_admin_422_keygen" 00000000000000000000000000000010 -X POST $BASE/key/generate \
-H "Authorization: Bearer $MASTER" -H "$J" -d '{"models":"should-be-a-list"}'

# ---- #28362 serialize guardrail_response + #28364 guardrail span on failure ----
# NOTE(review): "blockme" / "scanme" are presumably trigger words for the
# verify-guardrail configured in config.yaml — confirm in verify_guardrail.py.
run "T11_guardrail_block_400" 00000000000000000000000000000011 -X POST $BASE/v1/chat/completions \
-H "Authorization: Bearer $TEAM_KEY" -H "$J" \
-d '{"model":"gpt-mock","messages":[{"role":"user","content":"please blockme now"}]}'

run "T12_guardrail_allow_200" 00000000000000000000000000000012 -X POST $BASE/v1/chat/completions \
-H "Authorization: Bearer $TEAM_KEY" -H "$J" \
-d '{"model":"gpt-mock","messages":[{"role":"user","content":"please scanme now"}],"mock_response":"clean"}'
22 changes: 22 additions & 0 deletions otel_verify/setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Create a team + a virtual key (with allow_client_mock_response so the matrix
# can drive deterministic mock successes/errors). Writes .team_id / .team_key
# which run_matrix.sh and verify_spans.py read. Re-run after a fresh proxy/db.
#
# Env overrides: BASE (proxy URL), MASTER_KEY (admin key).
set -euo pipefail
BASE="${BASE:-http://localhost:4000}"
MASTER="${MASTER_KEY:-sk-1234}"
HERE="$(dirname "$0")"

# Print one top-level field of a JSON document.
# Usage: json_field FIELD JSON
# When the body is not JSON or lacks FIELD (e.g. an error response from the
# proxy), show the body on stderr instead of a bare Python traceback and return
# non-zero so the calling assignment aborts the script under `set -e`.
json_field() {
  local field="$1" body="$2"
  python3 -c 'import json,sys; print(json.load(sys.stdin)[sys.argv[1]])' "$field" \
    <<<"$body" 2>/dev/null || {
    echo "setup.sh: no '$field' in proxy response: $body" >&2
    return 1
  }
}

# -S keeps curl quiet on success but still reports transport errors (e.g. the
# proxy is not running); with plain -s the script would exit with no message.
team=$(curl -sS -X POST "$BASE/team/new" -H "Authorization: Bearer $MASTER" \
  -H "Content-Type: application/json" -d '{"team_alias":"otel-verify-team"}')
team_id=$(json_field team_id "$team")

key=$(curl -sS -X POST "$BASE/key/generate" -H "Authorization: Bearer $MASTER" \
  -H "Content-Type: application/json" \
  -d "{\"team_id\":\"$team_id\",\"models\":[\"gpt-mock\"],\"metadata\":{\"allow_client_mock_response\":true}}")
team_key=$(json_field key "$key")

# .team_key holds a live virtual key: create the output files owner-only
# (umask only affects files that do not exist yet).
umask 077
printf '%s' "$team_id" > "$HERE/.team_id"
printf '%s' "$team_key" > "$HERE/.team_key"
echo "team_id=$team_id"
echo "team_key=$team_key (saved to .team_key)"
Loading
Loading