Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions cli/serve/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
import uvicorn
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse

from mellea.backends.model_options import ModelOption
from mellea.helpers.openai_compatible_helpers import (
build_completion_usage,
stream_chat_completion_chunks,
)

from .models import (
ChatCompletion,
ChatCompletionMessage,
ChatCompletionRequest,
Choice,
CompletionUsage,
OpenAIError,
OpenAIErrorResponse,
)
Expand Down Expand Up @@ -94,8 +97,8 @@ def _build_model_options(request: ChatCompletionRequest) -> dict:
"n", # Number of completions - not supported in Mellea's model_options
"user", # User tracking ID - metadata, not a generation parameter
"extra", # Pydantic's extra fields dict - unused (see model_config)
"stream_options", # Streaming options - handled separately in streaming response
# Not-yet-implemented OpenAI parameters (silently ignored)
"stream", # Streaming responses - not yet implemented
"stop", # Stop sequences - not yet implemented
"top_p", # Nucleus sampling - not yet implemented
"presence_penalty", # Presence penalty - not yet implemented
Expand All @@ -111,6 +114,7 @@ def _build_model_options(request: ChatCompletionRequest) -> dict:
"temperature": ModelOption.TEMPERATURE,
"max_tokens": ModelOption.MAX_NEW_TOKENS,
"seed": ModelOption.SEED,
"stream": ModelOption.STREAM,
Copy link
Copy Markdown
Contributor

@planetf1 planetf1 Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to be aware of: because ChatCompletionRequest.stream defaults to False, this mapping means every non-streaming request will now forward ModelOption.STREAM: False to the backend — even when streaming was never requested. That's a quiet behavioural change from the previous behaviour where stream was simply ignored.

Backends that don't recognise ModelOption.STREAM may handle the unexpected key in unexpected ways. A couple of options to consider:

  1. Return "stream" to the excluded-fields set and handle it separately, as stream_options is handled just above.
  2. Only inject STREAM when request.stream is True, leaving the non-streaming path as it was.

}

filtered_options = {
Expand Down Expand Up @@ -157,26 +161,25 @@ async def endpoint(request: ChatCompletionRequest):
model_options=model_options,
)

# Extract usage information from the ModelOutputThunk if available
usage = None
if hasattr(output, "usage") and output.usage is not None:
prompt_tokens = output.usage.get("prompt_tokens", 0)
completion_tokens = output.usage.get("completion_tokens", 0)
# Calculate total_tokens if not provided
total_tokens = output.usage.get(
"total_tokens", prompt_tokens + completion_tokens
)
usage = CompletionUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)

# system_fingerprint represents backend config hash, not model name
# The model name is already in response.model (line 73)
# Leave as None since we don't track backend config fingerprints yet
system_fingerprint = None

# Handle streaming response
if request.stream:
return StreamingResponse(
stream_chat_completion_chunks(
output=output,
completion_id=completion_id,
model=request.model,
created=created_timestamp,
stream_options=request.stream_options,
system_fingerprint=system_fingerprint,
),
media_type="text/event-stream",
)

return ChatCompletion(
id=completion_id,
model=request.model,
Expand All @@ -192,7 +195,7 @@ async def endpoint(request: ChatCompletionRequest):
],
object="chat.completion", # type: ignore
system_fingerprint=system_fingerprint,
usage=usage,
usage=build_completion_usage(output),
) # type: ignore
except ValueError as e:
# Handle validation errors or invalid input
Expand Down
74 changes: 63 additions & 11 deletions cli/serve/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from pydantic import BaseModel, Field

from mellea.helpers.openai_compatible_helpers import CompletionUsage


class ChatMessage(BaseModel):
role: Literal["system", "user", "assistant", "tool", "function"]
Expand Down Expand Up @@ -58,6 +60,13 @@ class ChatCompletionRequest(BaseModel):
seed: int | None = None
response_format: ResponseFormat | None = None

# OpenAI-compatible streaming options. Only applies when stream=True.
# Supports `include_usage` (bool) to control whether usage statistics are
# included in the final streaming chunk. Defaults to True (include usage)
# when not specified for backward compatibility. For non-streaming requests
# (stream=False), usage is always included regardless of this parameter.
stream_options: dict[str, Any] | None = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In openAI this is Pydantic typed object - but I presume you chose this so you don't do any enforcement/checks in case those options change in future - understandable (though it could mean typos are missed). Am ok either way.


# For future/undocumented fields
extra: dict[str, Any] = Field(default_factory=dict)

Expand Down Expand Up @@ -88,17 +97,6 @@ class Choice(BaseModel):
"""The reason the model stopped generating tokens."""


class CompletionUsage(BaseModel):
    """Token accounting for one chat completion request.

    Mirrors the OpenAI ``usage`` object: prompt, completion, and combined
    token counts.
    """

    # Tokens consumed by the generated completion.
    completion_tokens: int
    # Tokens consumed by the prompt.
    prompt_tokens: int
    # prompt_tokens + completion_tokens.
    total_tokens: int


class ChatCompletion(BaseModel):
id: str
"""A unique identifier for the chat completion."""
Expand All @@ -125,6 +123,60 @@ class ChatCompletion(BaseModel):
"""Usage statistics for the completion request."""


class ChatCompletionChunkDelta(BaseModel):
    """Incremental payload carried by a single streaming chunk.

    All fields are optional: ``role`` is populated only on the first chunk,
    ``content`` carries the next text fragment, and ``refusal`` carries a
    refusal-message fragment when present.
    """

    # Text fragment delivered by this chunk, if any.
    content: str | None = Field(default=None)
    # Set to "assistant" on the first chunk only.
    role: Literal["assistant"] | None = Field(default=None)
    # Refusal-message fragment, if any.
    refusal: str | None = Field(default=None)


class ChatCompletionChunkChoice(BaseModel):
    """One choice entry inside a streaming chunk."""

    # Position of this choice within the chunk's choice list.
    index: int
    # Incremental content for this choice.
    delta: ChatCompletionChunkDelta
    # Populated only on the final chunk; None while generation continues.
    finish_reason: (
        Literal["stop", "length", "content_filter", "tool_calls", "function_call"]
        | None
    ) = Field(default=None)


class ChatCompletionChunk(BaseModel):
    """One Server-Sent-Events chunk of a streaming chat completion.

    OpenAI-compatible shape: ``object`` is always ``"chat.completion.chunk"``,
    and the final chunk may carry ``usage`` when the backend reports token
    counts.
    """

    # Unique identifier shared by every chunk of one completion.
    id: str
    # Choice deltas carried by this chunk.
    choices: list[ChatCompletionChunkChoice]
    # Unix timestamp (seconds) of when the completion was created.
    created: int
    # Model used for the chat completion.
    model: str
    # Object type discriminator; always "chat.completion.chunk".
    object: Literal["chat.completion.chunk"]
    # Fingerprint of the backend configuration the model runs with.
    system_fingerprint: str | None = Field(default=None)
    # Usage statistics for the final chunk, when available from the backend.
    usage: CompletionUsage | None = Field(default=None)


class OpenAIError(BaseModel):
"""OpenAI API error object."""

Expand Down
64 changes: 61 additions & 3 deletions docs/examples/m_serve/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,21 @@ A simple example showing how to structure a Mellea program for serving as an API
- Custom validation functions for API constraints
- Handling chat message inputs

### m_serve_example_streaming.py
A dedicated streaming example for `m serve` that supports both modes:
- `stream=False` returns a normal computed response
- `stream=True` returns an uncomputed thunk so the server can emit
incremental Server-Sent Events (SSE) chunks

### pii_serve.py
Example of serving a PII (Personally Identifiable Information) detection service.

### client.py
Client code for testing the served API endpoints.
Client code for testing the served API endpoints with non-streaming requests.

### client_streaming.py
Client code demonstrating streaming responses using Server-Sent Events (SSE)
against `m_serve_example_streaming.py`.

## Concepts Demonstrated

Expand All @@ -26,6 +36,7 @@ Client code for testing the served API endpoints.
- **Output Formatting**: Returning appropriate response types
- **Validation in Production**: Using requirements in deployed services
- **Model Options**: Passing model configuration through API
- **Streaming Responses**: Real-time token streaming via Server-Sent Events (SSE)

## Basic Pattern

Expand Down Expand Up @@ -53,12 +64,59 @@ def serve(input: list[ChatMessage],

## Running the Server

### Sampling

```bash
# Start the server
# Start the sampling example server
m serve docs/examples/m_serve/m_serve_example_simple.py

# In another terminal, test with client
# In another terminal, test with the non-streaming client
python docs/examples/m_serve/client.py

```

### Streaming

```bash
# Start the dedicated streaming example server
m serve docs/examples/m_serve/m_serve_example_streaming.py

# In another terminal, test with the streaming client
python docs/examples/m_serve/client_streaming.py
```

## Streaming Support

The server supports streaming responses via Server-Sent Events (SSE) when the
`stream=True` parameter is set in the request. This allows clients to receive
tokens as they are generated, providing a better user experience for long-running
generations.

For a real streaming demo, serve `m_serve_example_streaming.py`. That example
supports both normal and streaming responses consistently. The sampling example
(`m_serve_example_simple.py`) demonstrates rejection sampling and validation,
not token-by-token streaming.

**Key Features:**
- Real-time token streaming using SSE
- OpenAI-compatible streaming format (`ChatCompletionChunk`)
- Final chunk includes usage statistics when the backend provides usage data
- The dedicated streaming example supports both `stream=False` and `stream=True`
- Works with any backend that supports `ModelOutputThunk.astream()`

**Example:**
```python
import openai

client = openai.OpenAI(api_key="na", base_url="http://0.0.0.0:8080/v1")

# Enable streaming with stream=True
stream = client.chat.completions.create(
messages=[{"role": "user", "content": "Tell me a story"}],
model="granite4:micro-h",
stream=True,
)

for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
```
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
```


## API Endpoints
Expand Down
49 changes: 49 additions & 0 deletions docs/examples/m_serve/client_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# pytest: skip_always
"""Example client demonstrating streaming and non-streaming responses from m serve.

This example shows how to use the OpenAI Python client with a Mellea server
started with:

    m serve docs/examples/m_serve/m_serve_example_streaming.py

Set ``streaming`` below to:
- ``True`` for incremental SSE chunks
- ``False`` for a normal non-streaming response
"""

import openai

PORT = 8080

client = openai.OpenAI(api_key="na", base_url=f"http://0.0.0.0:{PORT}/v1")

streaming = True  # toggle: True = SSE streaming, False = single response

# Request parameters shared by both modes; only `stream` differs between them.
request_kwargs = {
    "messages": [
        {"role": "user", "content": "Count down from 100 using words not digits."}
    ],
    "model": "granite4:micro-h",
}

print(f"stream={streaming} response:")
print("-" * 50)

if streaming:
    # Streaming mode: the server emits SSE chunks; print fragments as they arrive.
    stream_result = client.chat.completions.create(stream=True, **request_kwargs)
    for chunk in stream_result:
        if chunk.choices[0].delta.content:
            # If you want to see the chunks more clearly separated, change end
            print(chunk.choices[0].delta.content, end="", flush=True)
else:
    # Non-streaming mode: a single fully-computed response.
    completion_result = client.chat.completions.create(stream=False, **request_kwargs)
    print(completion_result.choices[0].message.content)

print("\n" + "-" * 50)
# "Response" rather than "Stream": this banner also prints when streaming=False.
print("Response complete!")
41 changes: 41 additions & 0 deletions docs/examples/m_serve/m_serve_example_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# pytest: ollama, e2e

"""Example to run m serve with true streaming support."""

import mellea
from cli.serve.models import ChatMessage
from mellea.backends.model_options import ModelOption
from mellea.core import ComputedModelOutputThunk, ModelOutputThunk
from mellea.stdlib.context import SimpleContext

session = mellea.start_session(ctx=SimpleContext())


async def serve(
    input: list[ChatMessage],
    requirements: list[str] | None = None,
    model_options: dict | None = None,
) -> ModelOutputThunk | ComputedModelOutputThunk:
    """Support both normal and streaming responses from the same example.

    Args:
        input: Chat history; only the latest message is used as the prompt.
        requirements: Accepted for interface compatibility; unused here.
        model_options: Backend options; a truthy ``ModelOption.STREAM`` entry
            selects streaming.

    Returns:
        A computed result for non-streaming requests, or an uncomputed thunk
        (for the server to stream incrementally) for streaming requests.
    """
    del requirements  # not used by this example
    message = input[-1].content or ""
    is_streaming = bool((model_options or {}).get(ModelOption.STREAM, False))

    # The two modes differ only in whether the result is awaited: streaming
    # returns the uncomputed thunk so the server can emit SSE chunks from it.
    return await session.ainstruct(
        description=message,
        strategy=None,
        model_options=model_options,
        await_result=not is_streaming,
    )
Loading
Loading