feat: streaming support in `m serve` OpenAI API server #823
markstur wants to merge 4 commits into generative-computing:main
Conversation
Fixes: generative-computing#822 Signed-off-by: Mark Sturdevant <mark.sturdevant@ibm.com>
The PR description has been updated. Please fill out the template for your PR to be reviewed.
Added streaming support w/ setting system_fingerprint.

Make it consistent. We are currently just setting it to None but now it is consistent for future use.

Signed-off-by: Mark Sturdevant <mark.sturdevant@ibm.com>
```python
        Server-sent event payload strings representing OpenAI-compatible chat
        completion chunks, including the terminating ``[DONE]`` event.
    """
    from cli.serve.models import (
```
There's a layering concern here worth discussing. mellea/helpers/ sits in the library layer, with cli/ as a consumer — so an import in this direction inverts that relationship. If someone imports mellea.helpers.openai_compatible_helpers outside the cli/ context, this will raise an ImportError at call time.
Two paths forward:
- Move `stream_chat_completion_chunks` into `cli/serve/` (perhaps `cli/serve/streaming.py`) — since it's really CLI-specific glue, it arguably belongs there anyway.
- Move `ChatCompletionChunk`, `ChatCompletionChunkChoice`, and `ChatCompletionChunkDelta` into `mellea/helpers/` alongside `CompletionUsage`, removing the need to reach into `cli/` at all.
Option 1 is probably the simpler of the two.
```python
previous_length = 0
while not output.is_computed():
    new_content = await output.astream()
    previous_length += len(new_content)
```
previous_length is incremented here but doesn't appear to be read anywhere afterwards — worth double-checking whether this was intentional.
It would also be helpful to clarify the contract of astream(): does it return only the new fragment since the last call, or the full accumulated text so far? The answer changes what this function should do with the value. The helper-level tests in test_serve_streaming.py mock it to return deltas, while the endpoint-level tests (e.g. test_streaming_response_format) mock it to return accumulated text — so the two sets of tests appear to be working from different assumptions about the API. Might be worth aligning them.
```python
error_response = OpenAIErrorResponse(
    error=OpenAIError(message=f"Streaming error: {e!s}", type="server_error")
)
yield f"data: {error_response.model_dump_json()}\n\n"
```
It looks like the error path exits without emitting data: [DONE]\n\n. Most SSE clients — including the official openai Python SDK — wait for that sentinel to consider the stream closed, so they'll block until timeout if an exception is raised during streaming.
Adding yield "data: [DONE]\n\n" after the error chunk yield should do it.
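A minimal sketch of the fixed error path, using a plain dict in place of the PR's `OpenAIErrorResponse` model (`error_events` and `collect` are illustrative names):

```python
import asyncio
import json
from collections.abc import AsyncIterator


async def error_events(exc: Exception) -> AsyncIterator[str]:
    # Emit the error chunk, then the [DONE] sentinel so SSE clients unblock.
    payload = {
        "error": {"message": f"Streaming error: {exc!s}", "type": "server_error"}
    }
    yield f"data: {json.dumps(payload)}\n\n"
    yield "data: [DONE]\n\n"


async def collect(agen):
    return [event async for event in agen]


events = asyncio.run(collect(error_events(ValueError("boom"))))
print(events)
```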
| "temperature": ModelOption.TEMPERATURE, | ||
| "max_tokens": ModelOption.MAX_NEW_TOKENS, | ||
| "seed": ModelOption.SEED, | ||
| "stream": ModelOption.STREAM, |
One thing to be aware of: because ChatCompletionRequest.stream defaults to False, this mapping means every non-streaming request will now forward ModelOption.STREAM: False to the backend, even when streaming was never requested. That's a quiet change from the previous behaviour, where stream was simply ignored.
Backends that don't recognise ModelOption.STREAM may handle the unexpected key in unexpected ways. A couple of options to consider:
- Return `"stream"` to the excluded-fields set and handle it separately, as `stream_options` is handled just above.
- Only inject `STREAM` when `request.stream` is `True`, leaving the non-streaming path as it was.
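The second option might look like this sketch (`build_model_options` and the `STREAM` constant are stand-ins for the PR's actual mapping code):

```python
from typing import Any

STREAM = "stream"  # stand-in for ModelOption.STREAM


def build_model_options(request: dict[str, Any]) -> dict[str, Any]:
    """Map request fields to backend model options (sketch)."""
    options: dict[str, Any] = {}
    if request.get("stream"):
        # Inject STREAM only when streaming was actually requested,
        # leaving the non-streaming path exactly as before.
        options[STREAM] = True
    return options


print(build_model_options({"stream": False}))
print(build_model_options({"stream": True}))
```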
planetf1 left a comment
noticed a few things to tighten up.
Minor, but looking at the tests: the TestStreamingEndpoint tests (e.g. line 166) are marked @pytest.mark.asyncio and declared async def but contain no await — TestClient is synchronous and doesn't need the marker. TestStreamingHelpers is fine.
I suspect it will still work, but it implies the wrong behaviour?
```python
yield f"data: {chunk.model_dump_json()}\n\n"

# Include usage in final chunk if requested via stream_options
# Default to True (include usage) for backward compatibility
```
Note that the OpenAI spec default is False.
From the OpenAI API reference at https://platform.openai.com/docs/api-reference/chat-streaming, the ChatCompletionChunk.usage field is documented as:
"An optional field that will only be present when you set stream_options: {"include_usage": true} in your request."
```python
# included in the final streaming chunk. Defaults to True (include usage)
# when not specified for backward compatibility. For non-streaming requests
# (stream=False), usage is always included regardless of this parameter.
stream_options: dict[str, Any] | None = None
```
In OpenAI this is a Pydantic-typed object, but I presume you chose a plain dict so you don't do any enforcement/checks in case those options change in future — understandable (though it could mean typos are missed). I'm OK either way.
```
# In another terminal, test with the non-streaming client
python docs/examples/m_serve/client.py

### Streaming
```
I think this is meant to be a heading - needs to have the code fencing terminated in the previous section and started again here (for bash)
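Something like the fragment below would do it; the streaming client path shown is a placeholder, not a file from the PR:

````markdown
# In another terminal, test with the non-streaming client
python docs/examples/m_serve/client.py
```

### Streaming

```bash
# streaming client example (placeholder path)
python docs/examples/m_serve/client.py
```
````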
```python
async def serve(
    input: list[ChatMessage],
    requirements: list[str] | None = None,
    model_options: dict | None = None,
```

Suggested change:

```suggestion
    model_options: dict[str, Any] | None = None,
```
```python
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
```python
choices=[
    ChatCompletionChunkChoice(
        index=0,
        delta=ChatCompletionChunkDelta(role="assistant", content=""),
```

Suggested change:

```suggestion
        delta=ChatCompletionChunkDelta(role="assistant", content=None),
```
I believe this is more in line with how OpenAI sends it
```python
choices=[
    ChatCompletionChunkChoice(
        index=0,
        delta=ChatCompletionChunkDelta(content=""),
```

Suggested change:

```suggestion
        delta=ChatCompletionChunkDelta(content=None),
```
same as above
Type of PR

Misc

Description

Add OpenAI API compatible support for streaming in `m serve` app.

Testing