fix(responses): use OpenAI SSEDecoder for Responses API streaming #28566
httpx aiter_lines() uses str.splitlines(), which splits on U+2028 inside JSON payloads and silently drops response.completed (no spend log). Use openai._streaming.SSEDecoder (bytes.splitlines before decode) instead. Co-authored-by: Cursor <cursoragent@cursor.com>
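The difference between the two `splitlines()` variants can be reproduced with the standard library alone. The snippet below is a minimal sketch, not code from this PR; the payload is invented for illustration, and `ensure_ascii=False` is used on the assumption that the provider sends U+2028 as a raw character (with the default `ensure_ascii=True`, `json.dumps` would escape it to ASCII and nothing would split).

```python
import json

# ensure_ascii=False keeps U+2028 as a raw character; the default
# (ensure_ascii=True) would escape it to the six ASCII characters \u2028.
payload = json.dumps({"instructions": "eligible\u2028promo"}, ensure_ascii=False)
line = f"data: {payload}"

# str.splitlines() treats U+2028 as a line boundary, so the JSON is cut in two.
str_parts = line.splitlines()

# bytes.splitlines() only splits on \n, \r and \r\n, so the UTF-8 encoded
# U+2028 (bytes E2 80 A8) stays inside a single line.
byte_parts = line.encode("utf-8").splitlines()

print(len(str_parts))   # 2
print(len(byte_parts))  # 1
```

Neither half of `str_parts` is valid JSON on its own, which is consistent with the event failing to parse and being dropped.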
Greptile Summary
This PR fixes a bug where U+2028 (LINE SEPARATOR) characters inside JSON payloads in Responses API SSE streams caused
Confidence Score: 5/5
Safe to merge: the change is narrowly scoped to the SSE decode path, the fix aligns with how the OpenAI SDK itself decodes server-sent events, and both the bug scenario and normal stream termination paths are covered by the updated tests. The new decoder correctly handles U+2028 at the byte level, the empty-string short-circuit in _process_chunk covers keep-alive/comment-only events that SSEDecoder may emit with an empty data field, and [DONE] marker handling is unaffected. Mock updates in the tests are structurally equivalent to the originals; they exercise the same code paths through the new interface rather than bypassing them.
No files require special attention.
| Filename | Overview |
|---|---|
| litellm/responses/streaming_iterator.py | Replaces httpx aiter_lines/iter_lines with OpenAI's SSEDecoder.aiter_bytes/iter_bytes to fix U+2028 line-split bug; removes now-redundant _strip_sse_data_from_chunk call and passes sse.data directly to _process_chunk. |
| tests/llm_responses_api_testing/test_base_responses_api_streaming_iterator.py | Adds regression test for U+2028 inside SSE JSON; updates existing stop-iteration and error-path tests to mock aiter_bytes/iter_bytes instead of aiter_lines/iter_lines to match the new decoder interface. |
Reviews (2): Last reviewed commit: "fix(responses): drop redundant SSE prefi..."
Cursor Bugbot has reviewed your changes using high effort and found 1 potential issue.
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Redundant SSE prefix stripping may corrupt data values
- Removed the redundant CustomStreamWrapper._strip_sse_data_from_chunk call (and unused import) in _process_chunk since SSEDecoder already strips the SSE 'data:' field prefix, preventing potential corruption of payloads whose content begins with 'data:'.
Preview (0b353ab2e6)
diff --git a/litellm/responses/streaming_iterator.py b/litellm/responses/streaming_iterator.py
--- a/litellm/responses/streaming_iterator.py
+++ b/litellm/responses/streaming_iterator.py
@@ -9,6 +9,7 @@
from typing import Any, Dict, List, Literal, Optional
import httpx
+from openai._streaming import SSEDecoder
import litellm
from litellm.constants import (
@@ -27,7 +28,7 @@
from litellm.responses.utils import ResponsesAPIRequestUtils
from litellm.types.llms.openai import ResponsesAPIStreamEvents
from litellm.types.utils import CallTypes
-from litellm.utils import CustomStreamWrapper, async_post_call_success_deployment_hook
+from litellm.utils import async_post_call_success_deployment_hook
@lru_cache(maxsize=1)
@@ -120,10 +121,10 @@
if not chunk:
return None
- # Handle SSE format (data: {...})
- chunk = CustomStreamWrapper._strip_sse_data_from_chunk(chunk)
- if chunk is None:
- return None
+ # NOTE: ``SSEDecoder`` already strips the SSE ``data:`` field prefix, so
+ # the value passed in here is the raw field content. Do not re-run
+ # ``_strip_sse_data_from_chunk`` on it — doing so would incorrectly mangle
+ # payloads whose actual JSON value happens to start with ``data:``.
# Handle "[DONE]" marker
if chunk == STREAM_SSE_DONE_STRING:
@@ -634,7 +635,7 @@
request_data,
call_type,
)
- self.stream_iterator = response.aiter_lines()
+ self.stream_iterator = SSEDecoder().aiter_bytes(response.aiter_bytes())
def __aiter__(self):
return self
@@ -645,13 +646,13 @@
while True:
# Get the next chunk from the stream
try:
- chunk = await self.stream_iterator.__anext__()
+ sse = await self.stream_iterator.__anext__()
except StopAsyncIteration:
self.finished = True
raise StopAsyncIteration
self._check_max_streaming_duration()
- result = self._process_chunk(chunk)
+ result = self._process_chunk(sse.data)
if self.finished:
raise StopAsyncIteration
@@ -708,7 +709,7 @@
request_data,
call_type,
)
- self.stream_iterator = response.iter_lines()
+ self.stream_iterator = SSEDecoder().iter_bytes(response.iter_bytes())
def __iter__(self):
return self
@@ -719,13 +720,13 @@
while True:
# Get the next chunk from the stream
try:
- chunk = next(self.stream_iterator)
+ sse = next(self.stream_iterator)
except StopIteration:
self.finished = True
raise StopIteration
self._check_max_streaming_duration()
- result = self._process_chunk(chunk)
+ result = self._process_chunk(sse.data)
if self.finished:
raise StopIteration
diff --git a/tests/llm_responses_api_testing/test_base_responses_api_streaming_iterator.py b/tests/llm_responses_api_testing/test_base_responses_api_streaming_iterator.py
--- a/tests/llm_responses_api_testing/test_base_responses_api_streaming_iterator.py
+++ b/tests/llm_responses_api_testing/test_base_responses_api_streaming_iterator.py
@@ -41,6 +41,62 @@
class TestBaseResponsesAPIStreamingIterator:
"""Test cases for BaseResponsesAPIStreamingIterator"""
+ @pytest.mark.asyncio
+ async def test_responses_streaming_iterator_parses_u2028_in_sse_json(self):
+ """
+ U+2028 inside JSON must not split the SSE event. httpx aiter_lines uses
+ str.splitlines() and drops response.completed; OpenAI SSEDecoder does not.
+ """
+ from litellm.responses.streaming_iterator import ResponsesAPIStreamingIterator
+
+ u2028 = "\u2028"
+ payload = json.dumps(
+ {
+ "type": "response.completed",
+ "response": {"instructions": f"eligible{u2028}promo"},
+ }
+ )
+ sse_bytes = f"data: {payload}\n\n".encode("utf-8")
+
+ async def mock_aiter_bytes():
+ yield sse_bytes
+
+ mock_response = Mock()
+ mock_response.headers = {}
+ mock_response.aiter_bytes = mock_aiter_bytes
+
+ mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
+ mock_logging_obj.model_call_details = {"litellm_params": {}}
+ mock_config = Mock(spec=BaseResponsesAPIConfig)
+
+ mock_responses_api_response = Mock(spec=ResponsesAPIResponse)
+ mock_responses_api_response.id = "resp_u2028"
+ mock_completed_event = Mock(spec=ResponseCompletedEvent)
+ mock_completed_event.type = ResponsesAPIStreamEvents.RESPONSE_COMPLETED
+ mock_completed_event.response = mock_responses_api_response
+ mock_config.transform_streaming_response.return_value = mock_completed_event
+
+ iterator = ResponsesAPIStreamingIterator(
+ response=mock_response,
+ model="gpt-5.5",
+ responses_api_provider_config=mock_config,
+ logging_obj=mock_logging_obj,
+ litellm_metadata={"model_info": {"id": "model_123"}},
+ custom_llm_provider="openai",
+ )
+
+ chunks = []
+ with (
+ patch("asyncio.create_task"),
+ patch("litellm.responses.streaming_iterator.executor"),
+ ):
+ async for chunk in iterator:
+ chunks.append(chunk)
+
+ assert len(chunks) == 1
+ assert chunks[0].type == ResponsesAPIStreamEvents.RESPONSE_COMPLETED
+ assert iterator.completed_response is not None
+
def test_process_chunk_with_response_completed_event(self):
"""
Test that _process_chunk correctly processes a ResponseCompletedEvent
@@ -270,7 +326,7 @@
# Mock dependencies
mock_response = Mock()
mock_response.headers = {}
- mock_response.aiter_lines = Mock()
+ mock_response.aiter_bytes = Mock()
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
mock_logging_obj.model_call_details = {"litellm_params": {}}
mock_logging_obj.async_success_handler = Mock()
@@ -334,12 +390,10 @@
mock_response = Mock()
mock_response.headers = {}
- # Create an async iterator that raises StopAsyncIteration after yielding one chunk
- async def mock_aiter_lines():
- yield 'data: {"type": "response.output_text.delta", "delta": "test"}'
- # Normal end of stream - raise StopAsyncIteration
+ async def mock_aiter_bytes():
+ yield b'data: {"type": "response.output_text.delta", "delta": "test"}\n\n'
- mock_response.aiter_lines = mock_aiter_lines
+ mock_response.aiter_bytes = mock_aiter_bytes
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
mock_logging_obj.model_call_details = {"litellm_params": {}}
@@ -396,12 +450,10 @@
mock_response = Mock()
mock_response.headers = {}
- # Create a sync iterator that raises StopIteration after yielding one chunk
- def mock_iter_lines():
- yield 'data: {"type": "response.output_text.delta", "delta": "test"}'
- # Normal end of stream - raise StopIteration
+ def mock_iter_bytes():
+ yield b'data: {"type": "response.output_text.delta", "delta": "test"}\n\n'
- mock_response.iter_lines = mock_iter_lines
+ mock_response.iter_bytes = mock_iter_bytes
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
mock_logging_obj.model_call_details = {"litellm_params": {}}
@@ -450,7 +502,7 @@
mock_response = Mock()
mock_response.headers = {}
- mock_response.aiter_lines = Mock()
+ mock_response.aiter_bytes = Mock()
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
mock_logging_obj.model_call_details = {"litellm_params": {}}
mock_logging_obj.async_failure_handler = Mock()
@@ -532,7 +584,7 @@
mock_response = Mock()
mock_response.headers = {}
- mock_response.aiter_lines = Mock()
+ mock_response.aiter_bytes = Mock()
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
mock_logging_obj.model_call_details = {"litellm_params": {}}
mock_logging_obj.async_failure_handler = Mock()
Reviewed by Cursor Bugbot for commit 0923b6e.
SSEDecoder already strips the 'data:' field prefix from each event, so the extra call to _strip_sse_data_from_chunk on sse.data was redundant and could incorrectly mangle payloads whose actual content starts with 'data:'. Co-authored-by: Yassin Kortam <yassin@berri.ai>
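To illustrate why a second stripping pass is harmful, here is a small sketch. `strip_sse_data_prefix` is a hypothetical stand-in written for this example; it is not LiteLLM's `_strip_sse_data_from_chunk`, whose exact behaviour is not shown in this PR. It only assumes that such a helper removes one leading `data:` field name.

```python
def strip_sse_data_prefix(chunk: str) -> str:
    """Hypothetical helper: drop one leading 'data:' field name and one space."""
    if chunk.startswith("data:"):
        chunk = chunk[len("data:"):]
        if chunk.startswith(" "):
            chunk = chunk[1:]
    return chunk


# An SSE line whose field value itself begins with the text "data:".
raw_line = "data: data: not a prefix"

once = strip_sse_data_prefix(raw_line)  # what a decoder already hands over
twice = strip_sse_data_prefix(once)     # what a redundant second pass yields

print(repr(once))   # 'data: not a prefix'
print(repr(twice))  # 'not a prefix'
```

The first pass returns the intended field value; the second pass silently removes part of the value itself, which is the corruption the commit avoids.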
Merged commit 21a21e0 into litellm_internal_staging
…m-model ADDITIVE feature on the v1.89.0-rc.1 line.

When a request targets a router/alias (e.g. an auto_router/smart-router), the proxy restamps response.model back to the requested alias, so clients can't see which backend actually served the request. Upstream's stock headers identify the deployment (x-litellm-model-id), the backend URL (x-litellm-model-api-base) and the client alias (x-litellm-model-group), but none names the routed backend MODEL; this adds x-litellm-downstream-model.

Purely additive, hooked into upstream's existing code (never replaces it):
- _override_openai_response_model already computes downstream_model for its mismatch logging; stash it into response._hidden_params['downstream_model'] (dict + object paths) right before the existing requested-model override.
- STREAMING: streams build headers from the wrapper's _hidden_params and return early, before the override runs, so derive the value there from the stream wrapper's .model (falling back to the routed deployment's litellm_params.model) and stash it into hidden_params ahead of get_custom_headers. Survives the streamed path (verified live: streamed smart-router request returns x-litellm-downstream-model: MiniMax-M3).
- get_custom_headers emits the header alongside the other x-litellm-model-* headers; None is filtered by the existing exclude_values pass, so the header is absent when routed == requested.
- 5 unit tests in TestDownstreamModelHeader.

Supersedes the v1.85.0-based commits ed0c355/73e4cf5 (archive/v1.85.0-patches), re-anchored for the rewritten common_request_processing.py. The old local codex-output-fix is intentionally dropped: upstream fixed it natively in PR BerriAI#28566 (SSEDecoder) + PR BerriAI#26569 (sse_output_recovery), both in this base.

Summary
- Replace httpx aiter_lines()/iter_lines() with OpenAI's SSEDecoder for Responses API streaming iterators
- httpx uses str.splitlines(), which treats U+2028 (LINE SEPARATOR) inside JSON as a line break, causing response.completed events to fail JSON parsing and be silently dropped (no spend log)

Test plan
- pytest tests/llm_responses_api_testing/test_base_responses_api_streaming_iterator.py::TestBaseResponsesAPIStreamingIterator::test_responses_streaming_iterator_parses_u2028_in_sse_json -v
- Existing iterator tests updated to mock aiter_bytes/iter_bytes

Note
Medium Risk
Changes the low-level streaming decode path for Responses API (sync + async), which can affect event boundaries and parsing across providers; risk is mitigated by added regression coverage for U+2028 and updated iterator tests.

Overview
Switches Responses API HTTP streaming iterators from httpx line splitting (aiter_lines/iter_lines) to OpenAI’s byte-based SSEDecoder, and adjusts chunk handling to consume sse.data without re-stripping data: prefixes.

Adds a regression test ensuring response.completed events containing U+2028 inside JSON aren’t split/dropped, and updates existing streaming iterator tests to mock aiter_bytes/iter_bytes payloads to match the new decoder behavior.

Reviewed by Cursor Bugbot for commit 0b353ab. Bugbot is set up for automated code reviews on this repo.
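For readers unfamiliar with byte-level SSE decoding, the following is a simplified, standard-library-only sketch of the idea. It is not OpenAI's SSEDecoder and not the code merged here: `iter_sse_data` is a hypothetical name, only `\n\n` event terminators and the `data` field are handled, and the payload is made up. It shows that splitting on bytes before decoding keeps a raw U+2028 inside one event, and that comment-only keep-alive events produce no output.

```python
import json
from typing import Iterable, Iterator, List


def iter_sse_data(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield the joined data field of each SSE event found in a byte stream."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # An event ends at a blank line. Only \n\n is handled in this sketch.
        while b"\n\n" in buffer:
            raw_event, buffer = buffer.split(b"\n\n", 1)
            data_lines: List[str] = []
            # bytes.splitlines() splits on \n, \r and \r\n only, never U+2028.
            for raw_line in raw_event.splitlines():
                line = raw_line.decode("utf-8")
                if line.startswith(":"):
                    continue  # comment / keep-alive line
                field, _, value = line.partition(":")
                if field == "data":
                    # Drop the single optional space after the colon.
                    data_lines.append(value[1:] if value.startswith(" ") else value)
            if data_lines:
                yield "\n".join(data_lines)


# Illustrative payload with a raw (unescaped) U+2028 inside a JSON string.
payload = json.dumps(
    {"type": "response.completed", "note": "eligible\u2028promo"},
    ensure_ascii=False,
)
encoded = f": keep-alive\n\ndata: {payload}\n\n".encode("utf-8")

# Feed the stream in two arbitrary pieces to mimic network chunking.
events = list(iter_sse_data([encoded[:25], encoded[25:]]))
print(len(events))                    # 1
print(json.loads(events[0])["type"])  # response.completed
```

Because decoding to `str` happens only after the event boundary has been found on bytes, a chunk boundary that falls in the middle of a multi-byte character does not cause a decode error in this sketch.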