From a9cf69ed06121680eec30879f8af11195c51e2ba Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Fri, 10 Apr 2026 18:39:59 -0700 Subject: [PATCH 01/13] feat: streaming support in m serve OpenAI API server Fixes: #822 Signed-off-by: Mark Sturdevant --- cli/serve/app.py | 36 ++--- cli/serve/models.py | 74 ++++++++-- docs/examples/m_serve/README.md | 64 ++++++++- docs/examples/m_serve/client_streaming.py | 49 +++++++ .../m_serve/m_serve_example_streaming.py | 41 ++++++ mellea/helpers/openai_compatible_helpers.py | 127 +++++++++++++++++- test/cli/test_build_model_options.py | 8 +- 7 files changed, 362 insertions(+), 37 deletions(-) create mode 100644 docs/examples/m_serve/client_streaming.py create mode 100644 docs/examples/m_serve/m_serve_example_streaming.py diff --git a/cli/serve/app.py b/cli/serve/app.py index b4613b9cf..405be7105 100644 --- a/cli/serve/app.py +++ b/cli/serve/app.py @@ -12,16 +12,19 @@ import uvicorn from fastapi import FastAPI, Request from fastapi.exceptions import RequestValidationError -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, StreamingResponse from mellea.backends.model_options import ModelOption +from mellea.helpers.openai_compatible_helpers import ( + build_completion_usage, + stream_chat_completion_chunks, +) from .models import ( ChatCompletion, ChatCompletionMessage, ChatCompletionRequest, Choice, - CompletionUsage, OpenAIError, OpenAIErrorResponse, ) @@ -94,8 +97,8 @@ def _build_model_options(request: ChatCompletionRequest) -> dict: "n", # Number of completions - not supported in Mellea's model_options "user", # User tracking ID - metadata, not a generation parameter "extra", # Pydantic's extra fields dict - unused (see model_config) + "stream_options", # Streaming options - handled separately in streaming response # Not-yet-implemented OpenAI parameters (silently ignored) - "stream", # Streaming responses - not yet implemented "stop", # Stop sequences - not yet implemented "top_p", # 
Nucleus sampling - not yet implemented "presence_penalty", # Presence penalty - not yet implemented @@ -111,6 +114,7 @@ def _build_model_options(request: ChatCompletionRequest) -> dict: "temperature": ModelOption.TEMPERATURE, "max_tokens": ModelOption.MAX_NEW_TOKENS, "seed": ModelOption.SEED, + "stream": ModelOption.STREAM, } filtered_options = { @@ -157,19 +161,17 @@ async def endpoint(request: ChatCompletionRequest): model_options=model_options, ) - # Extract usage information from the ModelOutputThunk if available - usage = None - if hasattr(output, "usage") and output.usage is not None: - prompt_tokens = output.usage.get("prompt_tokens", 0) - completion_tokens = output.usage.get("completion_tokens", 0) - # Calculate total_tokens if not provided - total_tokens = output.usage.get( - "total_tokens", prompt_tokens + completion_tokens - ) - usage = CompletionUsage( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=total_tokens, + # Handle streaming response + if request.stream: + return StreamingResponse( + stream_chat_completion_chunks( + output=output, + completion_id=completion_id, + model=request.model, + created=created_timestamp, + stream_options=request.stream_options, + ), + media_type="text/event-stream", ) # system_fingerprint represents backend config hash, not model name @@ -192,7 +194,7 @@ async def endpoint(request: ChatCompletionRequest): ], object="chat.completion", # type: ignore system_fingerprint=system_fingerprint, - usage=usage, + usage=build_completion_usage(output), ) # type: ignore except ValueError as e: # Handle validation errors or invalid input diff --git a/cli/serve/models.py b/cli/serve/models.py index 967ed1684..6a876d685 100644 --- a/cli/serve/models.py +++ b/cli/serve/models.py @@ -2,6 +2,8 @@ from pydantic import BaseModel, Field +from mellea.helpers.openai_compatible_helpers import CompletionUsage + class ChatMessage(BaseModel): role: Literal["system", "user", "assistant", "tool", "function"] @@ 
-58,6 +60,13 @@ class ChatCompletionRequest(BaseModel): seed: int | None = None response_format: ResponseFormat | None = None + # OpenAI-compatible streaming options. Only applies when stream=True. + # Supports `include_usage` (bool) to control whether usage statistics are + # included in the final streaming chunk. Defaults to True (include usage) + # when not specified for backward compatibility. For non-streaming requests + # (stream=False), usage is always included regardless of this parameter. + stream_options: dict[str, Any] | None = None + # For future/undocumented fields extra: dict[str, Any] = Field(default_factory=dict) @@ -88,17 +97,6 @@ class Choice(BaseModel): """The reason the model stopped generating tokens.""" -class CompletionUsage(BaseModel): - completion_tokens: int - """Number of tokens in the generated completion.""" - - prompt_tokens: int - """Number of tokens in the prompt.""" - - total_tokens: int - """Total number of tokens used in the request (prompt + completion).""" - - class ChatCompletion(BaseModel): id: str """A unique identifier for the chat completion.""" @@ -125,6 +123,60 @@ class ChatCompletion(BaseModel): """Usage statistics for the completion request.""" +class ChatCompletionChunkDelta(BaseModel): + """Delta content in a streaming chunk.""" + + content: str | None = None + """The content fragment in this chunk.""" + + role: Literal["assistant"] | None = None + """The role (only present in first chunk).""" + + refusal: str | None = None + """The refusal message fragment, if any.""" + + +class ChatCompletionChunkChoice(BaseModel): + """A choice in a streaming chunk.""" + + index: int + """The index of the choice in the list of choices.""" + + delta: ChatCompletionChunkDelta + """The delta content for this chunk.""" + + finish_reason: ( + Literal["stop", "length", "content_filter", "tool_calls", "function_call"] + | None + ) = None + """The reason the model stopped generating tokens (only in final chunk).""" + + +class 
ChatCompletionChunk(BaseModel): + """A chunk in a streaming chat completion response.""" + + id: str + """A unique identifier for the chat completion.""" + + choices: list[ChatCompletionChunkChoice] + """A list of chat completion choices.""" + + created: int + """The Unix timestamp (in seconds) of when the chat completion was created.""" + + model: str + """The model used for the chat completion.""" + + object: Literal["chat.completion.chunk"] + """The object type, which is always `chat.completion.chunk`.""" + + system_fingerprint: str | None = None + """This fingerprint represents the backend configuration that the model runs with.""" + + usage: CompletionUsage | None = None + """Usage statistics for the final streaming chunk when available from the backend.""" + + class OpenAIError(BaseModel): """OpenAI API error object.""" diff --git a/docs/examples/m_serve/README.md b/docs/examples/m_serve/README.md index 204a208fe..c7daaf895 100644 --- a/docs/examples/m_serve/README.md +++ b/docs/examples/m_serve/README.md @@ -13,11 +13,21 @@ A simple example showing how to structure a Mellea program for serving as an API - Custom validation functions for API constraints - Handling chat message inputs +### m_serve_example_streaming.py +A dedicated streaming example for `m serve` that supports both modes: +- `stream=False` returns a normal computed response +- `stream=True` returns an uncomputed thunk so the server can emit + incremental Server-Sent Events (SSE) chunks + ### pii_serve.py Example of serving a PII (Personally Identifiable Information) detection service. ### client.py -Client code for testing the served API endpoints. +Client code for testing the served API endpoints with non-streaming requests. + +### client_streaming.py +Client code demonstrating streaming responses using Server-Sent Events (SSE) +against `m_serve_example_streaming.py`. ## Concepts Demonstrated @@ -26,6 +36,7 @@ Client code for testing the served API endpoints. 
- **Output Formatting**: Returning appropriate response types - **Validation in Production**: Using requirements in deployed services - **Model Options**: Passing model configuration through API +- **Streaming Responses**: Real-time token streaming via Server-Sent Events (SSE) ## Basic Pattern @@ -53,12 +64,59 @@ def serve(input: list[ChatMessage], ## Running the Server +### Sampling + ```bash -# Start the server +# Start the sampling example server m serve docs/examples/m_serve/m_serve_example_simple.py -# In another terminal, test with client +# In another terminal, test with the non-streaming client python docs/examples/m_serve/client.py + +### Streaming + +# Start the dedicated streaming example server +m serve docs/examples/m_serve/m_serve_example_streaming.py + +# In another terminal, test with the streaming client +python docs/examples/m_serve/client_streaming.py +``` + +## Streaming Support + +The server supports streaming responses via Server-Sent Events (SSE) when the +`stream=True` parameter is set in the request. This allows clients to receive +tokens as they are generated, providing a better user experience for long-running +generations. + +For a real streaming demo, serve `m_serve_example_streaming.py`. That example +supports both normal and streaming responses consistently. The sampling example +(`m_serve_example_simple.py`) demonstrates rejection sampling and validation, +not token-by-token streaming. 
+ +**Key Features:** +- Real-time token streaming using SSE +- OpenAI-compatible streaming format (`ChatCompletionChunk`) +- Final chunk includes usage statistics when the backend provides usage data +- The dedicated streaming example supports both `stream=False` and `stream=True` +- Works with any backend that supports `ModelOutputThunk.astream()` + +**Example:** +```python +import openai + +client = openai.OpenAI(api_key="na", base_url="http://0.0.0.0:8080/v1") + +# Enable streaming with stream=True +stream = client.chat.completions.create( + messages=[{"role": "user", "content": "Tell me a story"}], + model="granite4:micro-h", + stream=True, +) + +for chunk in stream: + if chunk.choices[0].delta.content: + print(chunk.choices[0].delta.content, end="", flush=True) ``` ## API Endpoints diff --git a/docs/examples/m_serve/client_streaming.py b/docs/examples/m_serve/client_streaming.py new file mode 100644 index 000000000..3606148ef --- /dev/null +++ b/docs/examples/m_serve/client_streaming.py @@ -0,0 +1,49 @@ +# pytest: skip_always +"""Example client demonstrating responses from m serve. 
+ +This example shows how to use the OpenAI Python client with a Mellea server +started with: + + m serve docs/examples/m_serve/m_serve_example_streaming.py + +Set ``streaming`` below to: +- ``True`` for incremental SSE chunks +- ``False`` for a normal non-streaming response +""" + +import openai + +PORT = 8080 + +client = openai.OpenAI(api_key="na", base_url=f"http://0.0.0.0:{PORT}/v1") + +streaming = True # streaming enabled toggle + +print(f"stream={streaming} response:") +print("-" * 50) + +# Request either a streaming or non-streaming response from the dedicated example server +if streaming: + stream_result = client.chat.completions.create( + messages=[ + {"role": "user", "content": "Count down from 100 using words not digits."} + ], + model="granite4:micro-h", + stream=True, + ) + for chunk in stream_result: + if chunk.choices[0].delta.content: + # If you want to see the chunks more clearly separated, change end + print(chunk.choices[0].delta.content, end="", flush=True) +else: + completion_result = client.chat.completions.create( + messages=[ + {"role": "user", "content": "Count down from 100 using words not digits."} + ], + model="granite4:micro-h", + stream=False, + ) + print(completion_result.choices[0].message.content) + +print("\n" + "-" * 50) +print("Stream complete!") diff --git a/docs/examples/m_serve/m_serve_example_streaming.py b/docs/examples/m_serve/m_serve_example_streaming.py new file mode 100644 index 000000000..12ec86fe8 --- /dev/null +++ b/docs/examples/m_serve/m_serve_example_streaming.py @@ -0,0 +1,41 @@ +# pytest: ollama, e2e + +"""Example to run m serve with true streaming support.""" + +import mellea +from cli.serve.models import ChatMessage +from mellea.backends.model_options import ModelOption +from mellea.core import ComputedModelOutputThunk, ModelOutputThunk +from mellea.stdlib.context import SimpleContext + +session = mellea.start_session(ctx=SimpleContext()) + + +async def serve( + input: list[ChatMessage], + requirements: 
list[str] | None = None, + model_options: dict | None = None, +) -> ModelOutputThunk | ComputedModelOutputThunk: + """Support both normal and streaming responses from the same example. + + Returns a computed result for non-streaming requests and an uncomputed thunk + for streaming requests. + """ + del requirements + message = input[-1].content or "" + is_streaming = bool((model_options or {}).get(ModelOption.STREAM, False)) + + if is_streaming: + return await session.ainstruct( + description=message, + strategy=None, + model_options=model_options, + await_result=False, + ) + + return await session.ainstruct( + description=message, + strategy=None, + model_options=model_options, + await_result=True, + ) diff --git a/mellea/helpers/openai_compatible_helpers.py b/mellea/helpers/openai_compatible_helpers.py index 4858ea366..7583ee049 100644 --- a/mellea/helpers/openai_compatible_helpers.py +++ b/mellea/helpers/openai_compatible_helpers.py @@ -1,15 +1,30 @@ """A file for helper functions that deal with OpenAI API compatible helpers.""" import json -from collections.abc import Callable +from collections.abc import AsyncGenerator from typing import Any +from pydantic import BaseModel + from ..backends.tools import validate_tool_arguments from ..core import FancyLogger, ModelToolCall from ..core.base import AbstractMelleaTool from ..stdlib.components import Document, Message +class CompletionUsage(BaseModel): + """Token usage statistics for a completion request.""" + + completion_tokens: int + """Number of tokens in the generated completion.""" + + prompt_tokens: int + """Number of tokens in the prompt.""" + + total_tokens: int + """Total number of tokens used in the request (prompt + completion).""" + + def extract_model_tool_requests( tools: dict[str, AbstractMelleaTool], response: dict[str, Any] ) -> dict[str, ModelToolCall] | None: @@ -205,3 +220,113 @@ def messages_to_docs(msgs: list[Message]) -> list[dict[str, str]]: json_doc["doc_id"] = doc.doc_id 
json_docs.append(json_doc) return json_docs + + +def build_completion_usage(output: Any) -> CompletionUsage | None: + """Build a normalized usage object from a model output, if available.""" + if not hasattr(output, "usage") or output.usage is None: + return None + + prompt_tokens = output.usage.get("prompt_tokens", 0) + completion_tokens = output.usage.get("completion_tokens", 0) + total_tokens = output.usage.get("total_tokens", prompt_tokens + completion_tokens) + return CompletionUsage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + ) + + +async def stream_chat_completion_chunks( + output: Any, + completion_id: str, + model: str, + created: int, + stream_options: dict[str, Any] | None = None, +) -> AsyncGenerator[str, None]: + """Generate OpenAI-compatible SSE chat completion chunks from a model output. + + Args: + output: The model output object to stream. + completion_id: Unique identifier for this completion. + model: Model name to include in chunks. + created: Unix timestamp of when the completion was created. + stream_options: OpenAI-compatible streaming options. Currently supports + ``include_usage`` (bool) to control whether usage stats are included + in the final chunk. Defaults to including usage when available. 
+ """ + from cli.serve.models import ( + ChatCompletionChunk, + ChatCompletionChunkChoice, + ChatCompletionChunkDelta, + OpenAIError, + OpenAIErrorResponse, + ) + + try: + initial_chunk = ChatCompletionChunk( + id=completion_id, + model=model, + created=created, + choices=[ + ChatCompletionChunkChoice( + index=0, + delta=ChatCompletionChunkDelta(role="assistant", content=""), + finish_reason=None, + ) + ], + object="chat.completion.chunk", + ) + yield f"data: {initial_chunk.model_dump_json()}\n\n" + + previous_length = 0 + while not output.is_computed(): + new_content = await output.astream() + previous_length += len(new_content) + + if new_content: + chunk = ChatCompletionChunk( + id=completion_id, + model=model, + created=created, + choices=[ + ChatCompletionChunkChoice( + index=0, + delta=ChatCompletionChunkDelta(content=new_content), + finish_reason=None, + ) + ], + object="chat.completion.chunk", + ) + yield f"data: {chunk.model_dump_json()}\n\n" + + # Include usage in final chunk if requested via stream_options + # Default to True (include usage) for backward compatibility + include_usage = True + if stream_options is not None: + include_usage = stream_options.get("include_usage", True) + + usage = build_completion_usage(output) if include_usage else None + + final_chunk = ChatCompletionChunk( + id=completion_id, + model=model, + created=created, + choices=[ + ChatCompletionChunkChoice( + index=0, + delta=ChatCompletionChunkDelta(content=""), + finish_reason="stop", + ) + ], + object="chat.completion.chunk", + usage=usage, + ) + yield f"data: {final_chunk.model_dump_json()}\n\n" + yield "data: [DONE]\n\n" + + except Exception as e: + error_response = OpenAIErrorResponse( + error=OpenAIError(message=f"Streaming error: {e!s}", type="server_error") + ) + yield f"data: {error_response.model_dump_json()}\n\n" diff --git a/test/cli/test_build_model_options.py b/test/cli/test_build_model_options.py index 54702aea0..4bd52f264 100644 --- 
a/test/cli/test_build_model_options.py +++ b/test/cli/test_build_model_options.py @@ -1,7 +1,5 @@ """Unit tests for _build_model_options function.""" -import pytest - from cli.serve.app import _build_model_options from cli.serve.models import ChatCompletionRequest, ChatMessage from mellea.backends.model_options import ModelOption @@ -87,13 +85,13 @@ def test_none_values_excluded(self): assert ModelOption.MAX_NEW_TOKENS not in options def test_minimal_request_includes_defaults(self): - """Test that a minimal request includes default values like temperature.""" + """Test that a minimal request includes default values like temperature and stream.""" request = ChatCompletionRequest( model="test-model", messages=[ChatMessage(role="user", content="test")] ) options = _build_model_options(request) - # ChatCompletionRequest has default temperature=1.0 - assert options == {ModelOption.TEMPERATURE: 1.0} + # ChatCompletionRequest has default temperature=1.0 and stream=False + assert options == {ModelOption.TEMPERATURE: 1.0, ModelOption.STREAM: False} def test_requirements_excluded(self): """Test that requirements field is excluded from model_options.""" From 0a2e4d93cb0389bab923efb3fe9a6d26bcccb7fa Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Fri, 10 Apr 2026 18:51:04 -0700 Subject: [PATCH 02/13] test: add streaming test file Signed-off-by: Mark Sturdevant --- test/cli/test_serve_streaming.py | 666 +++++++++++++++++++++++++++++++ 1 file changed, 666 insertions(+) create mode 100644 test/cli/test_serve_streaming.py diff --git a/test/cli/test_serve_streaming.py b/test/cli/test_serve_streaming.py new file mode 100644 index 000000000..0ca27d25d --- /dev/null +++ b/test/cli/test_serve_streaming.py @@ -0,0 +1,666 @@ +"""Tests for streaming support in the m serve OpenAI-compatible API server.""" + +import json +from unittest.mock import AsyncMock, Mock + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from cli.serve.app import 
make_chat_endpoint +from cli.serve.models import ChatCompletionRequest, ChatMessage +from mellea.core.base import ModelOutputThunk +from mellea.helpers.openai_compatible_helpers import ( + build_completion_usage, + stream_chat_completion_chunks, +) + + +@pytest.fixture +def mock_module(): + """Create a mock module with an async serve function.""" + module = Mock() + module.__name__ = "test_streaming_module" + module.serve = AsyncMock() + return module + + +@pytest.fixture +def streaming_request(): + """Create a sample streaming ChatCompletionRequest.""" + return ChatCompletionRequest( + model="test-model", + messages=[ChatMessage(role="user", content="Hello, world!")], + stream=True, + temperature=0.7, + ) + + +@pytest.fixture +def non_streaming_request(): + """Create a sample non-streaming ChatCompletionRequest.""" + return ChatCompletionRequest( + model="test-model", + messages=[ChatMessage(role="user", content="Hello, world!")], + stream=False, + temperature=0.7, + ) + + +class TestStreamingHelpers: + """Tests for reusable streaming helper functions.""" + + @pytest.mark.asyncio + async def test_build_completion_usage_with_full_usage(self): + """Test usage normalization with complete usage data.""" + output = ModelOutputThunk("done") + output.usage = {"prompt_tokens": 5, "completion_tokens": 3, "total_tokens": 8} + + usage = build_completion_usage(output) + + assert usage is not None + assert usage.prompt_tokens == 5 + assert usage.completion_tokens == 3 + assert usage.total_tokens == 8 + + @pytest.mark.asyncio + async def test_build_completion_usage_with_partial_usage(self): + """Test usage normalization fills missing values safely.""" + output = ModelOutputThunk("done") + output.usage = {"prompt_tokens": 5} + + usage = build_completion_usage(output) + + assert usage is not None + assert usage.prompt_tokens == 5 + assert usage.completion_tokens == 0 + assert usage.total_tokens == 5 + + @pytest.mark.asyncio + async def 
test_build_completion_usage_without_usage(self): + """Test usage normalization returns None when usage is unavailable.""" + output = ModelOutputThunk("done") + + assert build_completion_usage(output) is None + + @pytest.mark.asyncio + async def test_stream_chat_completion_chunks_emits_incremental_content(self): + """Test helper emits only incremental content fragments.""" + output = ModelOutputThunk(None) + output._computed = False + output._generate_type = output._generate_type.ASYNC + + chunks = ["Hello", " there", "!"] + + async def mock_astream(): + if chunks: + value = chunks.pop(0) + if not chunks: + output._computed = True + return value + output._computed = True + return "" + + output.astream = mock_astream + output.is_computed = lambda: output._computed + output.usage = {"prompt_tokens": 5, "completion_tokens": 3, "total_tokens": 8} + + events = [] + async for event in stream_chat_completion_chunks( + output=output, + completion_id="chatcmpl-test123", + model="test-model", + created=123, + ): + events.append(event) + + assert events[0].startswith("data: ") + assert events[-1] == "data: [DONE]\n\n" + + parsed = [] + for event in events: + if event.startswith("data: ") and event != "data: [DONE]\n\n": + parsed.append(json.loads(event[6:].strip())) + + assert parsed[0]["choices"][0]["delta"]["role"] == "assistant" + content_chunks = [ + chunk["choices"][0]["delta"].get("content") + for chunk in parsed[1:-1] + if chunk["choices"][0]["delta"].get("content") + ] + assert content_chunks == ["Hello", " there", "!"] + assert parsed[-1]["choices"][0]["finish_reason"] == "stop" + assert parsed[-1]["usage"]["total_tokens"] == 8 + + @pytest.mark.asyncio + async def test_stream_chat_completion_chunks_emits_error_event(self): + """Test helper emits an error payload when streaming fails.""" + output = ModelOutputThunk(None) + output._computed = False + output._generate_type = output._generate_type.ASYNC + + async def mock_astream(): + raise RuntimeError("boom") + + 
output.astream = mock_astream + output.is_computed = lambda: output._computed + + events = [] + async for event in stream_chat_completion_chunks( + output=output, + completion_id="chatcmpl-test123", + model="test-model", + created=123, + ): + events.append(event) + + assert len(events) == 2 + error_payload = json.loads(events[1][6:].strip()) + assert error_payload["error"]["type"] == "server_error" + assert "boom" in error_payload["error"]["message"] + + +class TestStreamingEndpoint: + """Tests for streaming chat completion endpoint.""" + + @pytest.mark.asyncio + async def test_streaming_response_format(self, mock_module, streaming_request): + """Test that streaming returns SSE format with proper chunks.""" + # Create a mock output that simulates streaming + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + # Simulate streaming chunks + chunks = ["Hello", " there", "!"] + accumulated = "" + + async def mock_astream(): + nonlocal accumulated + if chunks: + accumulated += chunks.pop(0) + else: + mock_output._computed = True + return accumulated + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_output.usage = { + "prompt_tokens": 5, + "completion_tokens": 3, + "total_tokens": 8, + } + + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=streaming_request.model_dump(mode="json") + ) + + assert response.status_code == 200 + assert response.headers["content-type"] == "text/event-stream; charset=utf-8" + + # Parse SSE events + events = [] + for line in response.text.strip().split("\n\n"): + if line.startswith("data: "): + data = line[6:] # Remove "data: " prefix + if data != "[DONE]": + 
events.append(json.loads(data)) + + # Verify we got multiple chunks + assert len(events) > 0 + + # First chunk should have role + first_chunk = events[0] + assert first_chunk["object"] == "chat.completion.chunk" + assert first_chunk["choices"][0]["delta"]["role"] == "assistant" + + # Last chunk should have finish_reason and usage + last_chunk = events[-1] + assert last_chunk["choices"][0]["finish_reason"] == "stop" + assert last_chunk["usage"] is not None + assert last_chunk["usage"]["total_tokens"] == 8 + + @pytest.mark.asyncio + async def test_non_streaming_still_works(self, mock_module, non_streaming_request): + """Test that non-streaming requests still work correctly.""" + mock_output = ModelOutputThunk("Complete response") + mock_output.usage = { + "prompt_tokens": 5, + "completion_tokens": 2, + "total_tokens": 7, + } + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make non-streaming request + response = client.post( + "/v1/chat/completions", json=non_streaming_request.model_dump(mode="json") + ) + + assert response.status_code == 200 + data = response.json() + + # Should be a regular ChatCompletion, not streaming + assert data["object"] == "chat.completion" + assert data["choices"][0]["message"]["content"] == "Complete response" + assert data["choices"][0]["finish_reason"] == "stop" + assert data["usage"]["total_tokens"] == 7 + + @pytest.mark.asyncio + async def test_stream_parameter_passed_to_model_options( + self, mock_module, streaming_request + ): + """Test that stream parameter is passed to model_options.""" + from mellea.backends.model_options import ModelOption + + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + mock_output.astream = AsyncMock( + side_effect=lambda: setattr(mock_output, 
"_computed", True) or "done" + ) + mock_output.is_computed = lambda: mock_output._computed + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + client.post( + "/v1/chat/completions", json=streaming_request.model_dump(mode="json") + ) + + # Verify serve was called with stream in model_options + call_args = mock_module.serve.call_args + assert call_args is not None + model_options = call_args.kwargs["model_options"] + assert ModelOption.STREAM in model_options + assert model_options[ModelOption.STREAM] is True + + @pytest.mark.asyncio + async def test_streaming_with_empty_content(self, mock_module, streaming_request): + """Test streaming handles empty content chunks gracefully.""" + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + # Simulate streaming with some empty incremental chunks + chunks = ["", "Hello", "", " world", ""] + + async def mock_astream(): + if chunks: + chunk = chunks.pop(0) + if not chunks: + mock_output._computed = True + return chunk + mock_output._computed = True + return "" + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_output.usage = None + + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=streaming_request.model_dump(mode="json") + ) + + assert response.status_code == 200 + + # Parse events and verify we only get chunks with actual content + events = [] + for line in response.text.strip().split("\n\n"): + if line.startswith("data: "): + data = line[6:] + if data 
!= "[DONE]": + chunk = json.loads(data) + events.append(chunk) + + # Count chunks with actual content (excluding initial role chunk and final finish chunk) + content_chunks = [ + e + for e in events + if e["choices"][0]["delta"].get("content") + and e["choices"][0]["delta"]["content"] != "" + ] + assert len(content_chunks) == 2 # "Hello" and " world" + + @pytest.mark.asyncio + async def test_streaming_completion_id_consistent( + self, mock_module, streaming_request + ): + """Test that completion ID is consistent across all chunks.""" + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + chunks = ["A", "B"] + accumulated = "" + + async def mock_astream(): + nonlocal accumulated + if chunks: + accumulated += chunks.pop(0) + else: + mock_output._computed = True + return accumulated + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=streaming_request.model_dump(mode="json") + ) + + # Parse events + events = [] + for line in response.text.strip().split("\n\n"): + if line.startswith("data: "): + data = line[6:] + if data != "[DONE]": + events.append(json.loads(data)) + + # All chunks should have the same ID + ids = [e["id"] for e in events] + assert len(set(ids)) == 1 # All IDs are the same + assert ids[0].startswith("chatcmpl-") + + @pytest.mark.asyncio + async def test_streaming_ends_with_done(self, mock_module, streaming_request): + """Test that streaming response ends with [DONE] marker.""" + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + async def 
mock_astream(): + mock_output._computed = True + return "done" + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=streaming_request.model_dump(mode="json") + ) + + # Verify response ends with [DONE] + assert response.text.strip().endswith("data: [DONE]") + + @pytest.mark.asyncio + async def test_streaming_model_field_correct(self, mock_module, streaming_request): + """Test that model field is correctly set in streaming chunks.""" + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + async def mock_astream(): + mock_output._computed = True + return "test" + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=streaming_request.model_dump(mode="json") + ) + + # Parse first chunk + first_line = response.text.split("\n\n")[0] + first_chunk = json.loads(first_line[6:]) # Remove "data: " + + # Model should match request + assert first_chunk["model"] == "test-model" + + @pytest.mark.asyncio + async def test_stream_options_include_usage_true(self, mock_module): + """Test that stream_options with include_usage=true includes usage in final chunk.""" + request = ChatCompletionRequest( + model="test-model", + messages=[ChatMessage(role="user", content="Hello")], + stream=True, + stream_options={"include_usage": 
True}, + ) + + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + async def mock_astream(): + mock_output._computed = True + return "response" + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_output.usage = { + "prompt_tokens": 5, + "completion_tokens": 3, + "total_tokens": 8, + } + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=request.model_dump(mode="json") + ) + + assert response.status_code == 200 + + # Parse events + events = [] + for line in response.text.strip().split("\n\n"): + if line.startswith("data: "): + data = line[6:] + if data != "[DONE]": + events.append(json.loads(data)) + + # Last chunk should have usage + last_chunk = events[-1] + assert last_chunk["usage"] is not None + assert last_chunk["usage"]["total_tokens"] == 8 + + @pytest.mark.asyncio + async def test_stream_options_include_usage_false(self, mock_module): + """Test that stream_options with include_usage=false excludes usage from final chunk.""" + request = ChatCompletionRequest( + model="test-model", + messages=[ChatMessage(role="user", content="Hello")], + stream=True, + stream_options={"include_usage": False}, + ) + + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + async def mock_astream(): + mock_output._computed = True + return "response" + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_output.usage = { + "prompt_tokens": 5, + "completion_tokens": 3, + "total_tokens": 8, + } + mock_module.serve.return_value = mock_output + + # Create test app + app = 
FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=request.model_dump(mode="json") + ) + + assert response.status_code == 200 + + # Parse events + events = [] + for line in response.text.strip().split("\n\n"): + if line.startswith("data: "): + data = line[6:] + if data != "[DONE]": + events.append(json.loads(data)) + + # Last chunk should NOT have usage + last_chunk = events[-1] + assert last_chunk["usage"] is None + + @pytest.mark.asyncio + async def test_stream_options_default_includes_usage(self, mock_module): + """Test that without stream_options, usage is included by default (backward compat).""" + request = ChatCompletionRequest( + model="test-model", + messages=[ChatMessage(role="user", content="Hello")], + stream=True, + # No stream_options specified + ) + + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + async def mock_astream(): + mock_output._computed = True + return "response" + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_output.usage = { + "prompt_tokens": 5, + "completion_tokens": 3, + "total_tokens": 8, + } + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=request.model_dump(mode="json") + ) + + assert response.status_code == 200 + + # Parse events + events = [] + for line in response.text.strip().split("\n\n"): + if line.startswith("data: "): + data = line[6:] + if data != "[DONE]": + events.append(json.loads(data)) + + # Last chunk should have usage (default behavior) + last_chunk = 
events[-1] + assert last_chunk["usage"] is not None + assert last_chunk["usage"]["total_tokens"] == 8 + + @pytest.mark.asyncio + async def test_stream_options_ignored_for_non_streaming(self, mock_module): + """Test that stream_options is ignored when stream=False (usage always included).""" + request = ChatCompletionRequest( + model="test-model", + messages=[ChatMessage(role="user", content="Hello")], + stream=False, + stream_options={"include_usage": False}, # Should be ignored + ) + + mock_output = ModelOutputThunk("Complete response") + mock_output.usage = { + "prompt_tokens": 5, + "completion_tokens": 3, + "total_tokens": 8, + } + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make non-streaming request + response = client.post( + "/v1/chat/completions", json=request.model_dump(mode="json") + ) + + assert response.status_code == 200 + data = response.json() + + # Usage should be included regardless of stream_options (non-streaming always includes usage) + assert data["usage"] is not None + assert data["usage"]["total_tokens"] == 8 From ca5205ea1597fd23b36066074470bfb0eca120dc Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Fri, 10 Apr 2026 18:53:45 -0700 Subject: [PATCH 03/13] fix: docstring fix for streaming Signed-off-by: Mark Sturdevant --- mellea/helpers/openai_compatible_helpers.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/mellea/helpers/openai_compatible_helpers.py b/mellea/helpers/openai_compatible_helpers.py index 7583ee049..802056afc 100644 --- a/mellea/helpers/openai_compatible_helpers.py +++ b/mellea/helpers/openai_compatible_helpers.py @@ -223,7 +223,16 @@ def messages_to_docs(msgs: list[Message]) -> list[dict[str, str]]: def build_completion_usage(output: Any) -> CompletionUsage | None: - """Build a normalized usage object from a model 
output, if available.""" + """Build a normalized usage object from a model output, if available. + + Args: + output: Model output object that may expose a ``usage`` mapping with + token counts. + + Returns: + A ``CompletionUsage`` object when usage metadata is present on the + output, otherwise ``None``. + """ if not hasattr(output, "usage") or output.usage is None: return None @@ -254,6 +263,10 @@ async def stream_chat_completion_chunks( stream_options: OpenAI-compatible streaming options. Currently supports ``include_usage`` (bool) to control whether usage stats are included in the final chunk. Defaults to including usage when available. + + Yields: + Server-sent event payload strings representing OpenAI-compatible chat + completion chunks, including the terminating ``[DONE]`` event. """ from cli.serve.models import ( ChatCompletionChunk, From 417261b965e9cdc93500083e0faf90bda29ff7ef Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Mon, 13 Apr 2026 13:41:33 -0700 Subject: [PATCH 04/13] fix: make system_fingerprint consistent in m serve Added streaming support w/ setting system_fingerprint. Make it consistent. We are currently just setting it to None but now it is consistent for future use. 
Signed-off-by: Mark Sturdevant --- cli/serve/app.py | 11 ++-- mellea/helpers/openai_compatible_helpers.py | 6 ++ test/cli/test_serve_streaming.py | 62 +++++++++++++++++++++ 3 files changed, 74 insertions(+), 5 deletions(-) diff --git a/cli/serve/app.py b/cli/serve/app.py index 405be7105..a911801ad 100644 --- a/cli/serve/app.py +++ b/cli/serve/app.py @@ -161,6 +161,11 @@ async def endpoint(request: ChatCompletionRequest): model_options=model_options, ) + # system_fingerprint represents backend config hash, not model name + # The model name is already in response.model (line 73) + # Leave as None since we don't track backend config fingerprints yet + system_fingerprint = None + # Handle streaming response if request.stream: return StreamingResponse( @@ -170,15 +175,11 @@ async def endpoint(request: ChatCompletionRequest): model=request.model, created=created_timestamp, stream_options=request.stream_options, + system_fingerprint=system_fingerprint, ), media_type="text/event-stream", ) - # system_fingerprint represents backend config hash, not model name - # The model name is already in response.model (line 73) - # Leave as None since we don't track backend config fingerprints yet - system_fingerprint = None - return ChatCompletion( id=completion_id, model=request.model, diff --git a/mellea/helpers/openai_compatible_helpers.py b/mellea/helpers/openai_compatible_helpers.py index 802056afc..c1d728e20 100644 --- a/mellea/helpers/openai_compatible_helpers.py +++ b/mellea/helpers/openai_compatible_helpers.py @@ -252,6 +252,7 @@ async def stream_chat_completion_chunks( model: str, created: int, stream_options: dict[str, Any] | None = None, + system_fingerprint: str | None = None, ) -> AsyncGenerator[str, None]: """Generate OpenAI-compatible SSE chat completion chunks from a model output. @@ -263,6 +264,8 @@ async def stream_chat_completion_chunks( stream_options: OpenAI-compatible streaming options. 
Currently supports ``include_usage`` (bool) to control whether usage stats are included in the final chunk. Defaults to including usage when available. + system_fingerprint: Backend configuration fingerprint to include in chunks. + Defaults to ``None``. Yields: Server-sent event payload strings representing OpenAI-compatible chat @@ -289,6 +292,7 @@ async def stream_chat_completion_chunks( ) ], object="chat.completion.chunk", + system_fingerprint=system_fingerprint, ) yield f"data: {initial_chunk.model_dump_json()}\n\n" @@ -310,6 +314,7 @@ async def stream_chat_completion_chunks( ) ], object="chat.completion.chunk", + system_fingerprint=system_fingerprint, ) yield f"data: {chunk.model_dump_json()}\n\n" @@ -333,6 +338,7 @@ async def stream_chat_completion_chunks( ) ], object="chat.completion.chunk", + system_fingerprint=system_fingerprint, usage=usage, ) yield f"data: {final_chunk.model_dump_json()}\n\n" diff --git a/test/cli/test_serve_streaming.py b/test/cli/test_serve_streaming.py index 0ca27d25d..a8f8378e0 100644 --- a/test/cli/test_serve_streaming.py +++ b/test/cli/test_serve_streaming.py @@ -628,6 +628,68 @@ async def mock_astream(): assert last_chunk["usage"] is not None assert last_chunk["usage"]["total_tokens"] == 8 + @pytest.mark.asyncio + async def test_streaming_system_fingerprint_always_none( + self, mock_module, streaming_request + ): + """Test that system_fingerprint is None in all streaming chunks. + + Per OpenAI spec, system_fingerprint represents a hash of backend config, + not the model name. The model name is in chunk.model. + We don't track backend config fingerprints yet, so it should be None. 
+ """ + mock_output = ModelOutputThunk(None) + mock_output._computed = False + mock_output._generate_type = mock_output._generate_type.ASYNC + + chunks = ["Hello", " world"] + + async def mock_astream(): + if chunks: + chunk = chunks.pop(0) + if not chunks: + mock_output._computed = True + return chunk + mock_output._computed = True + return "" + + mock_output.astream = mock_astream + mock_output.is_computed = lambda: mock_output._computed + mock_output.usage = { + "prompt_tokens": 5, + "completion_tokens": 2, + "total_tokens": 7, + } + mock_module.serve.return_value = mock_output + + # Create test app + app = FastAPI() + app.add_api_route( + "/v1/chat/completions", make_chat_endpoint(mock_module), methods=["POST"] + ) + client = TestClient(app) + + # Make streaming request + response = client.post( + "/v1/chat/completions", json=streaming_request.model_dump(mode="json") + ) + + assert response.status_code == 200 + + # Parse all chunks + events = [] + for line in response.text.strip().split("\n\n"): + if line.startswith("data: "): + data = line[6:] + if data != "[DONE]": + events.append(json.loads(data)) + + # All chunks should have system_fingerprint as None + for chunk in events: + assert chunk["system_fingerprint"] is None + # Model name should be in the model field + assert chunk["model"] == "test-model" + @pytest.mark.asyncio async def test_stream_options_ignored_for_non_streaming(self, mock_module): """Test that stream_options is ignored when stream=False (usage always included).""" From ae940f72b35bae09c9372c75630536d563f96421 Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 15:56:28 -0700 Subject: [PATCH 05/13] fix: move stream_chat_completion_chunks out of helpers and into cli Doesn't belong in library. This is for cli. 
Signed-off-by: Mark Sturdevant --- cli/serve/app.py | 6 +- cli/serve/streaming.py | 111 ++++++++++++++++++++ mellea/helpers/openai_compatible_helpers.py | 105 ------------------ test/cli/test_serve_streaming.py | 6 +- 4 files changed, 115 insertions(+), 113 deletions(-) create mode 100644 cli/serve/streaming.py diff --git a/cli/serve/app.py b/cli/serve/app.py index a911801ad..337d35993 100644 --- a/cli/serve/app.py +++ b/cli/serve/app.py @@ -15,10 +15,7 @@ from fastapi.responses import JSONResponse, StreamingResponse from mellea.backends.model_options import ModelOption -from mellea.helpers.openai_compatible_helpers import ( - build_completion_usage, - stream_chat_completion_chunks, -) +from mellea.helpers.openai_compatible_helpers import build_completion_usage from .models import ( ChatCompletion, @@ -28,6 +25,7 @@ OpenAIError, OpenAIErrorResponse, ) +from .streaming import stream_chat_completion_chunks app = FastAPI( title="M serve OpenAI API Compatible Server", diff --git a/cli/serve/streaming.py b/cli/serve/streaming.py new file mode 100644 index 000000000..427886c95 --- /dev/null +++ b/cli/serve/streaming.py @@ -0,0 +1,111 @@ +"""Streaming utilities for OpenAI-compatible server responses.""" + +from collections.abc import AsyncGenerator +from typing import Any + +from mellea.helpers.openai_compatible_helpers import build_completion_usage + +from .models import ( + ChatCompletionChunk, + ChatCompletionChunkChoice, + ChatCompletionChunkDelta, + OpenAIError, + OpenAIErrorResponse, +) + + +async def stream_chat_completion_chunks( + output: Any, + completion_id: str, + model: str, + created: int, + stream_options: dict[str, Any] | None = None, + system_fingerprint: str | None = None, +) -> AsyncGenerator[str, None]: + """Generate OpenAI-compatible SSE chat completion chunks from a model output. + + Args: + output: The model output object to stream. + completion_id: Unique identifier for this completion. + model: Model name to include in chunks. 
+ created: Unix timestamp of when the completion was created. + stream_options: OpenAI-compatible streaming options. Currently supports + ``include_usage`` (bool) to control whether usage stats are included + in the final chunk. Defaults to including usage when available. + system_fingerprint: Backend configuration fingerprint to include in chunks. + Defaults to ``None``. + + Yields: + Server-sent event payload strings representing OpenAI-compatible chat + completion chunks, including the terminating ``[DONE]`` event. + """ + try: + initial_chunk = ChatCompletionChunk( + id=completion_id, + model=model, + created=created, + choices=[ + ChatCompletionChunkChoice( + index=0, + delta=ChatCompletionChunkDelta(role="assistant", content=""), + finish_reason=None, + ) + ], + object="chat.completion.chunk", + system_fingerprint=system_fingerprint, + ) + yield f"data: {initial_chunk.model_dump_json()}\n\n" + + previous_length = 0 + while not output.is_computed(): + new_content = await output.astream() + previous_length += len(new_content) + + if new_content: + chunk = ChatCompletionChunk( + id=completion_id, + model=model, + created=created, + choices=[ + ChatCompletionChunkChoice( + index=0, + delta=ChatCompletionChunkDelta(content=new_content), + finish_reason=None, + ) + ], + object="chat.completion.chunk", + system_fingerprint=system_fingerprint, + ) + yield f"data: {chunk.model_dump_json()}\n\n" + + # Include usage in final chunk if requested via stream_options + # Default to True (include usage) for backward compatibility + include_usage = True + if stream_options is not None: + include_usage = stream_options.get("include_usage", True) + + usage = build_completion_usage(output) if include_usage else None + + final_chunk = ChatCompletionChunk( + id=completion_id, + model=model, + created=created, + choices=[ + ChatCompletionChunkChoice( + index=0, + delta=ChatCompletionChunkDelta(content=""), + finish_reason="stop", + ) + ], + object="chat.completion.chunk", + 
system_fingerprint=system_fingerprint, + usage=usage, + ) + yield f"data: {final_chunk.model_dump_json()}\n\n" + yield "data: [DONE]\n\n" + + except Exception as e: + error_response = OpenAIErrorResponse( + error=OpenAIError(message=f"Streaming error: {e!s}", type="server_error") + ) + yield f"data: {error_response.model_dump_json()}\n\n" diff --git a/mellea/helpers/openai_compatible_helpers.py b/mellea/helpers/openai_compatible_helpers.py index c1d728e20..c8a894905 100644 --- a/mellea/helpers/openai_compatible_helpers.py +++ b/mellea/helpers/openai_compatible_helpers.py @@ -244,108 +244,3 @@ def build_completion_usage(output: Any) -> CompletionUsage | None: completion_tokens=completion_tokens, total_tokens=total_tokens, ) - - -async def stream_chat_completion_chunks( - output: Any, - completion_id: str, - model: str, - created: int, - stream_options: dict[str, Any] | None = None, - system_fingerprint: str | None = None, -) -> AsyncGenerator[str, None]: - """Generate OpenAI-compatible SSE chat completion chunks from a model output. - - Args: - output: The model output object to stream. - completion_id: Unique identifier for this completion. - model: Model name to include in chunks. - created: Unix timestamp of when the completion was created. - stream_options: OpenAI-compatible streaming options. Currently supports - ``include_usage`` (bool) to control whether usage stats are included - in the final chunk. Defaults to including usage when available. - system_fingerprint: Backend configuration fingerprint to include in chunks. - Defaults to ``None``. - - Yields: - Server-sent event payload strings representing OpenAI-compatible chat - completion chunks, including the terminating ``[DONE]`` event. 
- """ - from cli.serve.models import ( - ChatCompletionChunk, - ChatCompletionChunkChoice, - ChatCompletionChunkDelta, - OpenAIError, - OpenAIErrorResponse, - ) - - try: - initial_chunk = ChatCompletionChunk( - id=completion_id, - model=model, - created=created, - choices=[ - ChatCompletionChunkChoice( - index=0, - delta=ChatCompletionChunkDelta(role="assistant", content=""), - finish_reason=None, - ) - ], - object="chat.completion.chunk", - system_fingerprint=system_fingerprint, - ) - yield f"data: {initial_chunk.model_dump_json()}\n\n" - - previous_length = 0 - while not output.is_computed(): - new_content = await output.astream() - previous_length += len(new_content) - - if new_content: - chunk = ChatCompletionChunk( - id=completion_id, - model=model, - created=created, - choices=[ - ChatCompletionChunkChoice( - index=0, - delta=ChatCompletionChunkDelta(content=new_content), - finish_reason=None, - ) - ], - object="chat.completion.chunk", - system_fingerprint=system_fingerprint, - ) - yield f"data: {chunk.model_dump_json()}\n\n" - - # Include usage in final chunk if requested via stream_options - # Default to True (include usage) for backward compatibility - include_usage = True - if stream_options is not None: - include_usage = stream_options.get("include_usage", True) - - usage = build_completion_usage(output) if include_usage else None - - final_chunk = ChatCompletionChunk( - id=completion_id, - model=model, - created=created, - choices=[ - ChatCompletionChunkChoice( - index=0, - delta=ChatCompletionChunkDelta(content=""), - finish_reason="stop", - ) - ], - object="chat.completion.chunk", - system_fingerprint=system_fingerprint, - usage=usage, - ) - yield f"data: {final_chunk.model_dump_json()}\n\n" - yield "data: [DONE]\n\n" - - except Exception as e: - error_response = OpenAIErrorResponse( - error=OpenAIError(message=f"Streaming error: {e!s}", type="server_error") - ) - yield f"data: {error_response.model_dump_json()}\n\n" diff --git 
a/test/cli/test_serve_streaming.py b/test/cli/test_serve_streaming.py index a8f8378e0..78c27e4cd 100644 --- a/test/cli/test_serve_streaming.py +++ b/test/cli/test_serve_streaming.py @@ -9,11 +9,9 @@ from cli.serve.app import make_chat_endpoint from cli.serve.models import ChatCompletionRequest, ChatMessage +from cli.serve.streaming import stream_chat_completion_chunks from mellea.core.base import ModelOutputThunk -from mellea.helpers.openai_compatible_helpers import ( - build_completion_usage, - stream_chat_completion_chunks, -) +from mellea.helpers.openai_compatible_helpers import build_completion_usage @pytest.fixture From af54d48cdaeabc1ded04473707119609ddb805fe Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 16:27:59 -0700 Subject: [PATCH 06/13] fix: astream() contract inconsistency in docstring and streaming mocks astream() returns deltas (only new fragments), not accumulated text. Update docstring. Fix unused previous_length in streaming.py. Rename vars for clarity. Fix streaming tests to be consistent with the non-accumulating behavior. 
Signed-off-by: Mark Sturdevant --- cli/serve/streaming.py | 8 +++----- mellea/core/base.py | 12 ++++++++---- test/cli/test_serve_streaming.py | 26 +++++++++++++------------- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/cli/serve/streaming.py b/cli/serve/streaming.py index 427886c95..bcc196fd6 100644 --- a/cli/serve/streaming.py +++ b/cli/serve/streaming.py @@ -56,12 +56,10 @@ async def stream_chat_completion_chunks( ) yield f"data: {initial_chunk.model_dump_json()}\n\n" - previous_length = 0 while not output.is_computed(): - new_content = await output.astream() - previous_length += len(new_content) + delta_content = await output.astream() - if new_content: + if delta_content: chunk = ChatCompletionChunk( id=completion_id, model=model, @@ -69,7 +67,7 @@ async def stream_chat_completion_chunks( choices=[ ChatCompletionChunkChoice( index=0, - delta=ChatCompletionChunkDelta(content=new_content), + delta=ChatCompletionChunkDelta(content=delta_content), finish_reason=None, ) ], diff --git a/mellea/core/base.py b/mellea/core/base.py index c8ab7efc0..e7e74c059 100644 --- a/mellea/core/base.py +++ b/mellea/core/base.py @@ -425,20 +425,24 @@ async def avalue(self) -> str: # If we require a function that returns only the new chunks of data, we can implement that similarly. async def astream(self) -> str: - """Returns the ModelOutputThunk's partial value including the next chunk(s). Can be used for both async streaming and async non-streaming. + """Returns only the NEW text fragment (delta) received since the last call. - Returns the complete value of the ModelOutputThunk if streaming is done. + This method is designed for streaming consumption where you want incremental + updates. Each call returns only the newly received content, not the accumulated + text. When streaming is complete, subsequent calls will raise RuntimeError. **Note**: Be careful with calling this function. Only call it from one location at a time. 
This means you shouldn't pass a ModelOutputThunk to multiple coroutines/tasks and call astream from those coroutines/tasks simultaneously. We have considered solutions to this but are waiting until we see this error happen in a real use case. Returns: - str: The accumulated output text up to and including the newly received chunk(s). + str: Only the new text fragment received since the last call (delta), not the + accumulated text. Returns empty string if no new content is available yet. Raises: Exception: Propagates any errors from the underlying inference engine api request. - RuntimeError: If called when the ModelOutputThunk's generate function is not async compatible. + RuntimeError: If called when the ModelOutputThunk's generate function is not async compatible, + or if called after the thunk is already computed. """ if self._computed: raise RuntimeError( diff --git a/test/cli/test_serve_streaming.py b/test/cli/test_serve_streaming.py index 78c27e4cd..8034bb2e9 100644 --- a/test/cli/test_serve_streaming.py +++ b/test/cli/test_serve_streaming.py @@ -169,17 +169,17 @@ async def test_streaming_response_format(self, mock_module, streaming_request): mock_output._computed = False mock_output._generate_type = mock_output._generate_type.ASYNC - # Simulate streaming chunks + # Simulate streaming chunks (deltas, not accumulated) chunks = ["Hello", " there", "!"] - accumulated = "" async def mock_astream(): - nonlocal accumulated if chunks: - accumulated += chunks.pop(0) - else: - mock_output._computed = True - return accumulated + delta = chunks.pop(0) + if not chunks: + mock_output._computed = True + return delta + mock_output._computed = True + return "" mock_output.astream = mock_astream mock_output.is_computed = lambda: mock_output._computed @@ -362,15 +362,15 @@ async def test_streaming_completion_id_consistent( mock_output._generate_type = mock_output._generate_type.ASYNC chunks = ["A", "B"] - accumulated = "" async def mock_astream(): - nonlocal accumulated if 
chunks: - accumulated += chunks.pop(0) - else: - mock_output._computed = True - return accumulated + delta = chunks.pop(0) + if not chunks: + mock_output._computed = True + return delta + mock_output._computed = True + return "" mock_output.astream = mock_astream mock_output.is_computed = lambda: mock_output._computed From 0d629c47cfe69142aba93414368f8c35aaf261bc Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 16:35:37 -0700 Subject: [PATCH 07/13] fix: emit [DONE] after error when streaming Adds missing yield of the [DONE] that clients will expect. Signed-off-by: Mark Sturdevant --- cli/serve/streaming.py | 1 + test/cli/test_serve_streaming.py | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/cli/serve/streaming.py b/cli/serve/streaming.py index bcc196fd6..c686c77c5 100644 --- a/cli/serve/streaming.py +++ b/cli/serve/streaming.py @@ -107,3 +107,4 @@ async def stream_chat_completion_chunks( error=OpenAIError(message=f"Streaming error: {e!s}", type="server_error") ) yield f"data: {error_response.model_dump_json()}\n\n" + yield "data: [DONE]\n\n" diff --git a/test/cli/test_serve_streaming.py b/test/cli/test_serve_streaming.py index 8034bb2e9..c865e35e6 100644 --- a/test/cli/test_serve_streaming.py +++ b/test/cli/test_serve_streaming.py @@ -132,7 +132,7 @@ async def mock_astream(): @pytest.mark.asyncio async def test_stream_chat_completion_chunks_emits_error_event(self): - """Test helper emits an error payload when streaming fails.""" + """Test helper emits an error payload and [DONE] when streaming fails.""" output = ModelOutputThunk(None) output._computed = False output._generate_type = output._generate_type.ASYNC @@ -152,10 +152,17 @@ async def mock_astream(): ): events.append(event) - assert len(events) == 2 + # Should emit: initial chunk, error payload, [DONE] + assert len(events) == 3 + # First event is initial chunk with role + initial_chunk = json.loads(events[0][6:].strip()) + assert 
initial_chunk["choices"][0]["delta"]["role"] == "assistant" + # Second event is error payload error_payload = json.loads(events[1][6:].strip()) assert error_payload["error"]["type"] == "server_error" assert "boom" in error_payload["error"]["message"] + # Third event is [DONE] sentinel + assert events[2] == "data: [DONE]\n\n" class TestStreamingEndpoint: From 9d503eebed4c2515fc950191ae44aa10ea8b3004 Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 17:02:48 -0700 Subject: [PATCH 08/13] fix: do not add stream=False by default (regression) Since stream defaults to False, a regression was introduced where stream=False is now passed to backends where it used to be default. Fix is to only forward stream=True and not add stream=False. Signed-off-by: Mark Sturdevant --- cli/serve/app.py | 6 ++++++ test/cli/test_build_model_options.py | 8 +++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/cli/serve/app.py b/cli/serve/app.py index 337d35993..719764acc 100644 --- a/cli/serve/app.py +++ b/cli/serve/app.py @@ -115,11 +115,17 @@ def _build_model_options(request: ChatCompletionRequest) -> dict: "stream": ModelOption.STREAM, } + # Get all non-None fields filtered_options = { key: value for key, value in request.model_dump(exclude_none=True).items() if key not in excluded_fields } + + # Special handling for stream: only include if True (don't forward False) + if "stream" in filtered_options and not filtered_options["stream"]: + del filtered_options["stream"] + return ModelOption.replace_keys(filtered_options, openai_to_model_option) diff --git a/test/cli/test_build_model_options.py b/test/cli/test_build_model_options.py index 4bd52f264..0b7b8fcc9 100644 --- a/test/cli/test_build_model_options.py +++ b/test/cli/test_build_model_options.py @@ -69,6 +69,7 @@ def test_excluded_fields_not_in_output(self): assert "n" not in options assert "user" not in options assert "stream" not in options + assert ModelOption.STREAM not in options # Check that 
temperature is present assert ModelOption.TEMPERATURE in options @@ -85,13 +86,14 @@ def test_none_values_excluded(self): assert ModelOption.MAX_NEW_TOKENS not in options def test_minimal_request_includes_defaults(self): - """Test that a minimal request includes default values like temperature and stream.""" + """Test that a minimal request includes default values like temperature.""" request = ChatCompletionRequest( model="test-model", messages=[ChatMessage(role="user", content="test")] ) options = _build_model_options(request) - # ChatCompletionRequest has default temperature=1.0 and stream=False - assert options == {ModelOption.TEMPERATURE: 1.0, ModelOption.STREAM: False} + # ChatCompletionRequest has default temperature=1.0 + # stream is excluded from model_options (handled separately in endpoint logic) + assert options == {ModelOption.TEMPERATURE: 1.0} def test_requirements_excluded(self): """Test that requirements field is excluded from model_options.""" From b96643144885eea993fb43ec58952c8101e00709 Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 17:21:27 -0700 Subject: [PATCH 09/13] fix: remove misleading asyncio tags in test_serve_streaming.py Stuff was marked asyncio and async def when it didn't need to be. Moved the usage tests a little for readability. 
Signed-off-by: Mark Sturdevant --- test/cli/test_serve_streaming.py | 55 ++++++++++++-------------------- 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/test/cli/test_serve_streaming.py b/test/cli/test_serve_streaming.py index c865e35e6..5908e269b 100644 --- a/test/cli/test_serve_streaming.py +++ b/test/cli/test_serve_streaming.py @@ -45,11 +45,10 @@ def non_streaming_request(): ) -class TestStreamingHelpers: - """Tests for reusable streaming helper functions.""" +class TestCompletionUsageHelpers: + """Tests for completion usage normalization helpers.""" - @pytest.mark.asyncio - async def test_build_completion_usage_with_full_usage(self): + def test_build_completion_usage_with_full_usage(self): """Test usage normalization with complete usage data.""" output = ModelOutputThunk("done") output.usage = {"prompt_tokens": 5, "completion_tokens": 3, "total_tokens": 8} @@ -61,8 +60,7 @@ async def test_build_completion_usage_with_full_usage(self): assert usage.completion_tokens == 3 assert usage.total_tokens == 8 - @pytest.mark.asyncio - async def test_build_completion_usage_with_partial_usage(self): + def test_build_completion_usage_with_partial_usage(self): """Test usage normalization fills missing values safely.""" output = ModelOutputThunk("done") output.usage = {"prompt_tokens": 5} @@ -74,13 +72,16 @@ async def test_build_completion_usage_with_partial_usage(self): assert usage.completion_tokens == 0 assert usage.total_tokens == 5 - @pytest.mark.asyncio - async def test_build_completion_usage_without_usage(self): + def test_build_completion_usage_without_usage(self): """Test usage normalization returns None when usage is unavailable.""" output = ModelOutputThunk("done") assert build_completion_usage(output) is None + +class TestStreamingHelpers: + """Tests for reusable streaming helper functions.""" + @pytest.mark.asyncio async def test_stream_chat_completion_chunks_emits_incremental_content(self): """Test helper emits only incremental content 
fragments.""" @@ -168,8 +169,7 @@ async def mock_astream(): class TestStreamingEndpoint: """Tests for streaming chat completion endpoint.""" - @pytest.mark.asyncio - async def test_streaming_response_format(self, mock_module, streaming_request): + def test_streaming_response_format(self, mock_module, streaming_request): """Test that streaming returns SSE format with proper chunks.""" # Create a mock output that simulates streaming mock_output = ModelOutputThunk(None) @@ -235,8 +235,7 @@ async def mock_astream(): assert last_chunk["usage"] is not None assert last_chunk["usage"]["total_tokens"] == 8 - @pytest.mark.asyncio - async def test_non_streaming_still_works(self, mock_module, non_streaming_request): + def test_non_streaming_still_works(self, mock_module, non_streaming_request): """Test that non-streaming requests still work correctly.""" mock_output = ModelOutputThunk("Complete response") mock_output.usage = { @@ -267,8 +266,7 @@ async def test_non_streaming_still_works(self, mock_module, non_streaming_reques assert data["choices"][0]["finish_reason"] == "stop" assert data["usage"]["total_tokens"] == 7 - @pytest.mark.asyncio - async def test_stream_parameter_passed_to_model_options( + def test_stream_parameter_passed_to_model_options( self, mock_module, streaming_request ): """Test that stream parameter is passed to model_options.""" @@ -302,8 +300,7 @@ async def test_stream_parameter_passed_to_model_options( assert ModelOption.STREAM in model_options assert model_options[ModelOption.STREAM] is True - @pytest.mark.asyncio - async def test_streaming_with_empty_content(self, mock_module, streaming_request): + def test_streaming_with_empty_content(self, mock_module, streaming_request): """Test streaming handles empty content chunks gracefully.""" mock_output = ModelOutputThunk(None) mock_output._computed = False @@ -359,10 +356,7 @@ async def mock_astream(): ] assert len(content_chunks) == 2 # "Hello" and " world" - @pytest.mark.asyncio - async def 
test_streaming_completion_id_consistent( - self, mock_module, streaming_request - ): + def test_streaming_completion_id_consistent(self, mock_module, streaming_request): """Test that completion ID is consistent across all chunks.""" mock_output = ModelOutputThunk(None) mock_output._computed = False @@ -408,8 +402,7 @@ async def mock_astream(): assert len(set(ids)) == 1 # All IDs are the same assert ids[0].startswith("chatcmpl-") - @pytest.mark.asyncio - async def test_streaming_ends_with_done(self, mock_module, streaming_request): + def test_streaming_ends_with_done(self, mock_module, streaming_request): """Test that streaming response ends with [DONE] marker.""" mock_output = ModelOutputThunk(None) mock_output._computed = False @@ -438,8 +431,7 @@ async def mock_astream(): # Verify response ends with [DONE] assert response.text.strip().endswith("data: [DONE]") - @pytest.mark.asyncio - async def test_streaming_model_field_correct(self, mock_module, streaming_request): + def test_streaming_model_field_correct(self, mock_module, streaming_request): """Test that model field is correctly set in streaming chunks.""" mock_output = ModelOutputThunk(None) mock_output._computed = False @@ -472,8 +464,7 @@ async def mock_astream(): # Model should match request assert first_chunk["model"] == "test-model" - @pytest.mark.asyncio - async def test_stream_options_include_usage_true(self, mock_module): + def test_stream_options_include_usage_true(self, mock_module): """Test that stream_options with include_usage=true includes usage in final chunk.""" request = ChatCompletionRequest( model="test-model", @@ -526,8 +517,7 @@ async def mock_astream(): assert last_chunk["usage"] is not None assert last_chunk["usage"]["total_tokens"] == 8 - @pytest.mark.asyncio - async def test_stream_options_include_usage_false(self, mock_module): + def test_stream_options_include_usage_false(self, mock_module): """Test that stream_options with include_usage=false excludes usage from final chunk.""" 
request = ChatCompletionRequest( model="test-model", @@ -579,8 +569,7 @@ async def mock_astream(): last_chunk = events[-1] assert last_chunk["usage"] is None - @pytest.mark.asyncio - async def test_stream_options_default_includes_usage(self, mock_module): + def test_stream_options_default_includes_usage(self, mock_module): """Test that without stream_options, usage is included by default (backward compat).""" request = ChatCompletionRequest( model="test-model", @@ -633,8 +622,7 @@ async def mock_astream(): assert last_chunk["usage"] is not None assert last_chunk["usage"]["total_tokens"] == 8 - @pytest.mark.asyncio - async def test_streaming_system_fingerprint_always_none( + def test_streaming_system_fingerprint_always_none( self, mock_module, streaming_request ): """Test that system_fingerprint is None in all streaming chunks. @@ -695,8 +683,7 @@ async def mock_astream(): # Model name should be in the model field assert chunk["model"] == "test-model" - @pytest.mark.asyncio - async def test_stream_options_ignored_for_non_streaming(self, mock_module): + def test_stream_options_ignored_for_non_streaming(self, mock_module): """Test that stream_options is ignored when stream=False (usage always included).""" request = ChatCompletionRequest( model="test-model", From 48ad245f7700b7e54fe9a5cf0d75c16de00aab67 Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 17:48:08 -0700 Subject: [PATCH 10/13] fix: only include usage in stream results when requested In m serve, usage was included for backward compatibility but this is a new feature so that's not an issue. Instead the OpenAI spec is what we want to follow. 
Signed-off-by: Mark Sturdevant --- cli/serve/streaming.py | 13 +++++++------ test/cli/test_serve_streaming.py | 21 ++++++++------------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/cli/serve/streaming.py b/cli/serve/streaming.py index c686c77c5..b01dae9bc 100644 --- a/cli/serve/streaming.py +++ b/cli/serve/streaming.py @@ -31,7 +31,8 @@ async def stream_chat_completion_chunks( created: Unix timestamp of when the completion was created. stream_options: OpenAI-compatible streaming options. Currently supports ``include_usage`` (bool) to control whether usage stats are included - in the final chunk. Defaults to including usage when available. + in the final chunk. Usage is only included when explicitly requested + via ``stream_options={"include_usage": True}``. system_fingerprint: Backend configuration fingerprint to include in chunks. Defaults to ``None``. @@ -76,11 +77,11 @@ async def stream_chat_completion_chunks( ) yield f"data: {chunk.model_dump_json()}\n\n" - # Include usage in final chunk if requested via stream_options - # Default to True (include usage) for backward compatibility - include_usage = True - if stream_options is not None: - include_usage = stream_options.get("include_usage", True) + # Include usage in final chunk only if explicitly requested via stream_options + # Per OpenAI spec: usage is only included when stream_options.include_usage=True + include_usage = stream_options is not None and stream_options.get( + "include_usage", False + ) usage = build_completion_usage(output) if include_usage else None diff --git a/test/cli/test_serve_streaming.py b/test/cli/test_serve_streaming.py index 5908e269b..0db54b7d0 100644 --- a/test/cli/test_serve_streaming.py +++ b/test/cli/test_serve_streaming.py @@ -110,6 +110,7 @@ async def mock_astream(): completion_id="chatcmpl-test123", model="test-model", created=123, + stream_options={"include_usage": True}, ): events.append(event) @@ -229,11 +230,10 @@ async def mock_astream(): assert 
first_chunk["object"] == "chat.completion.chunk" assert first_chunk["choices"][0]["delta"]["role"] == "assistant" - # Last chunk should have finish_reason and usage + # Last chunk should have finish_reason but no usage (not requested) last_chunk = events[-1] assert last_chunk["choices"][0]["finish_reason"] == "stop" - assert last_chunk["usage"] is not None - assert last_chunk["usage"]["total_tokens"] == 8 + assert last_chunk["usage"] is None def test_non_streaming_still_works(self, mock_module, non_streaming_request): """Test that non-streaming requests still work correctly.""" @@ -569,8 +569,8 @@ async def mock_astream(): last_chunk = events[-1] assert last_chunk["usage"] is None - def test_stream_options_default_includes_usage(self, mock_module): - """Test that without stream_options, usage is included by default (backward compat).""" + def test_stream_options_default_excludes_usage(self, mock_module): + """Test that without stream_options, usage is NOT included (per OpenAI spec).""" request = ChatCompletionRequest( model="test-model", messages=[ChatMessage(role="user", content="Hello")], @@ -617,10 +617,9 @@ async def mock_astream(): if data != "[DONE]": events.append(json.loads(data)) - # Last chunk should have usage (default behavior) + # Last chunk should NOT have usage (must explicitly request via stream_options) last_chunk = events[-1] - assert last_chunk["usage"] is not None - assert last_chunk["usage"]["total_tokens"] == 8 + assert last_chunk["usage"] is None def test_streaming_system_fingerprint_always_none( self, mock_module, streaming_request @@ -648,11 +647,7 @@ async def mock_astream(): mock_output.astream = mock_astream mock_output.is_computed = lambda: mock_output._computed - mock_output.usage = { - "prompt_tokens": 5, - "completion_tokens": 2, - "total_tokens": 7, - } + # Usage not needed for this test since we're not checking it mock_module.serve.return_value = mock_output # Create test app From 0570bb4356c65c3cf9a2ab4b4984926ddf7f5748 Mon Sep 17 
00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 17:55:44 -0700 Subject: [PATCH 11/13] fix: add missing code blocks in docs/examples/m_serve/README.md Signed-off-by: Mark Sturdevant --- docs/examples/m_serve/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/examples/m_serve/README.md b/docs/examples/m_serve/README.md index c7daaf895..70fcb5f5e 100644 --- a/docs/examples/m_serve/README.md +++ b/docs/examples/m_serve/README.md @@ -72,9 +72,11 @@ m serve docs/examples/m_serve/m_serve_example_simple.py # In another terminal, test with the non-streaming client python docs/examples/m_serve/client.py +``` ### Streaming +```bash # Start the dedicated streaming example server m serve docs/examples/m_serve/m_serve_example_streaming.py From 358b2079677aa73d4da28820a4b5c0d25042ae9b Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 18:03:52 -0700 Subject: [PATCH 12/13] fix: add missing dict type details Signed-off-by: Mark Sturdevant --- docs/examples/m_serve/m_serve_example_streaming.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/examples/m_serve/m_serve_example_streaming.py b/docs/examples/m_serve/m_serve_example_streaming.py index 12ec86fe8..e2f29e6e7 100644 --- a/docs/examples/m_serve/m_serve_example_streaming.py +++ b/docs/examples/m_serve/m_serve_example_streaming.py @@ -2,6 +2,8 @@ """Example to run m serve with true streaming support.""" +from typing import Any + import mellea from cli.serve.models import ChatMessage from mellea.backends.model_options import ModelOption @@ -14,7 +16,7 @@ async def serve( input: list[ChatMessage], requirements: list[str] | None = None, - model_options: dict | None = None, + model_options: dict[str, Any] | None = None, ) -> ModelOutputThunk | ComputedModelOutputThunk: """Support both normal and streaming responses from the same example. 
From 5d2d84bc28f80aeb38a0d65538a14a4e0dbfd0e9 Mon Sep 17 00:00:00 2001 From: Mark Sturdevant Date: Tue, 14 Apr 2026 18:20:15 -0700 Subject: [PATCH 13/13] fix: m serve streaming initial and final chunks use content=None Use content=None instead of content="" to be more correct with OpenAI API. Remove unneeded check for "" in the test. Signed-off-by: Mark Sturdevant --- cli/serve/streaming.py | 4 ++-- test/cli/test_serve_streaming.py | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/cli/serve/streaming.py b/cli/serve/streaming.py index b01dae9bc..5e86f6a27 100644 --- a/cli/serve/streaming.py +++ b/cli/serve/streaming.py @@ -48,7 +48,7 @@ async def stream_chat_completion_chunks( choices=[ ChatCompletionChunkChoice( index=0, - delta=ChatCompletionChunkDelta(role="assistant", content=""), + delta=ChatCompletionChunkDelta(role="assistant", content=None), finish_reason=None, ) ], @@ -92,7 +92,7 @@ async def stream_chat_completion_chunks( choices=[ ChatCompletionChunkChoice( index=0, - delta=ChatCompletionChunkDelta(content=""), + delta=ChatCompletionChunkDelta(content=None), finish_reason="stop", ) ], diff --git a/test/cli/test_serve_streaming.py b/test/cli/test_serve_streaming.py index 0db54b7d0..5d67ad41a 100644 --- a/test/cli/test_serve_streaming.py +++ b/test/cli/test_serve_streaming.py @@ -348,12 +348,7 @@ async def mock_astream(): events.append(chunk) # Count chunks with actual content (excluding initial role chunk and final finish chunk) - content_chunks = [ - e - for e in events - if e["choices"][0]["delta"].get("content") - and e["choices"][0]["delta"]["content"] != "" - ] + content_chunks = [e for e in events if e["choices"][0]["delta"].get("content")] assert len(content_chunks) == 2 # "Hello" and " world" def test_streaming_completion_id_consistent(self, mock_module, streaming_request):