Skip to content

LCORE-2311: Added agent streaming query utils#1919

Open
asimurka wants to merge 1 commit into
lightspeed-core:mainfrom
asimurka:agent_query_utils
Open

LCORE-2311: Added agent streaming query utils#1919
asimurka wants to merge 1 commit into
lightspeed-core:mainfrom
asimurka:agent_query_utils

Conversation

@asimurka

@asimurka asimurka commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Description

Adds agent streaming query core utility functions that shadow original functions.

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Konflux configuration change
  • Unit tests improvement
  • Integration tests improvement
  • End to end tests improvement
  • Benchmarks improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: Cursor

Related Tickets & Documents

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced agent response streaming with improved event handling and interruption recovery.
    • Incremental turn summary updates with token usage and referenced document tracking.
  • Bug Fixes

    • Fixed tool result routing to correctly handle error-only responses.
  • Tests

    • Added comprehensive test coverage for agent streaming functionality.

@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Warning

Review limit reached

@asimurka, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 54 minutes and 5 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more credits in the billing tab to continue.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: f2c2d6c6-b9bb-4066-9a94-e715bf2a847a

📥 Commits

Reviewing files that changed from the base of the PR and between eee0e81 and 0990ea5.

📒 Files selected for processing (3)
  • src/utils/agents/streaming.py
  • src/utils/agents/tool_processor.py
  • tests/unit/utils/agents/test_streaming.py

Walkthrough

This PR introduces a complete agent streaming SSE (Server-Sent Events) module for the streaming_query flow, plus comprehensive test coverage and a minor MCP routing fix. The streaming module handles end-to-end SSE payload generation from pydantic-ai runtime events, with event dispatch, turn accumulation, lifecycle orchestration (start/end events, interruption handling, token consumption), and structured result storage. The MCP fix narrows tool-result routing to the "list tools" path only when tools are present.

Changes

Agent Streaming SSE Implementation and Tests

Layer / File(s) Summary
Event Serialization and Dispatch Foundation
src/utils/agents/streaming.py
Module setup with imports, SSE refusal constant, serialize_event function (JSON/text), _process_token helper for token text accumulation, and default dispatch_stream_event singledispatch implementation.
Event Handler Registrations
src/utils/agents/streaming.py
Singledispatch handler registrations for pydantic-ai stream event types: AgentRunResultEvent (final response text), PartStartEvent/PartDeltaEvent/PartEndEvent (token streaming and turn summary updates), and FunctionToolCallEvent/FunctionToolResultEvent (tool call/result payloads).
Agent Streaming Execution
src/utils/agents/streaming.py
agent_response_generator executes the agent with run_stream_events, dispatches events to accumulate turn summary state, computes token usage and finish reason, and emits SSE error payload on non-success completion.
Response Generation Orchestration
src/utils/agents/streaming.py
retrieve_agent_response_generator (moderation check and agent delegation with error mapping) and generate_agent_response (SSE lifecycle: start/end events, interruption/cancellation handling with once-only persistence, error emission, topic summaries, token consumption, and result storage).
Test Fixtures and Setup
tests/unit/utils/agents/test_streaming.py
Imports and comprehensive pytest fixtures for mocking ResponseGeneratorContext, ResponsesApiParams, moderation results, agent run results, and patches for metric recording and interrupt handling.
Serialization and Dispatch Tests
tests/unit/utils/agents/test_streaming.py
Unit tests for serialize_event (JSON/text format validation) and dispatch_stream_event (event routing, turn-summary accumulation, token streaming, tool call/result emission, unknown event handling).
Generator Function Tests
tests/unit/utils/agents/test_streaming.py
Async tests for retrieve_agent_response_generator (blocked moderation, conversation appending, error mapping), generate_agent_response (SSE sequencing, interruption persistence, error emission), and agent_response_generator (event-to-payload mapping, token/turn updates, finish-reason handling, result event detection). Includes helper utilities for SSE extraction and mocking.

MCP Tool Result Routing Fix

Layer / File(s) Summary
MCP Return-Shape Dispatch
src/utils/agents/tool_processor.py
Adjusted summarize_mcp_tool_result to dispatch to "list tools" summarizer only when content contains tools field, removing error-only routing to that path.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • tisnik
  • jrobertboos
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately captures the main addition: new agent streaming query utility functions. The title is concise, clear, and directly reflects the primary changes in the changeset.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

"""
content = cast(dict[str, Any], part.content)
if "tools" in content or "error" in content:
if "tools" in content:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small bug fix in tool processing, error is present in both mcp_list_tools and mcp_call.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/utils/agents/streaming.py`:
- Around line 356-371: The base singledispatch function dispatch_stream_event
currently types its first arg as AgentStreamEvent which is too narrow because
agent.run_stream_events() yields AgentRunResultEvent as well; broaden the type
annotation on dispatch_stream_event to accept the full union of possible stream
event types (e.g., include AgentRunResultEvent or a common
supertype/Union[AgentStreamEvent, AgentRunResultEvent]) so the registered
handler for AgentRunResultEvent can be recognized by mypy and the singledispatch
registry; update the function signature and any import/type references
accordingly.

In `@src/utils/agents/tool_processor.py`:
- Around line 482-484: The dispatcher currently chooses
summarize_mcp_list_tools_result vs summarize_mcp_call_result solely by
inspecting content (checking for "tools"), which causes error-only list-tools
payloads to be mis-typed as mcp_call; change the dispatcher to decide by action
context instead of payload shape: add an explicit parameter (e.g., action or
call_action) to the dispatcher function and update its call sites to pass the
original tool action (or, if unavailable, look up the recorded tool-call summary
by tool_call_id) and then call summarize_mcp_list_tools_result(part, tool_round)
when the action is list-tools (even if content is {"error":...}), otherwise call
summarize_mcp_call_result; update function signatures and any callers
accordingly to preserve the mcp_list_tools result.type for failed list-tools
responses.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 8c30708e-17c9-4dbf-bb49-ba2487a2dda3

📥 Commits

Reviewing files that changed from the base of the PR and between 42bf698 and eee0e81.

📒 Files selected for processing (3)
  • src/utils/agents/streaming.py
  • src/utils/agents/tool_processor.py
  • tests/unit/utils/agents/test_streaming.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
  • GitHub Check: spectral
  • GitHub Check: build-pr
  • GitHub Check: unit_tests (3.12)
  • GitHub Check: Pylinter
  • GitHub Check: unit_tests (3.13)
  • GitHub Check: E2E: server mode / ci / group 2
  • GitHub Check: E2E: library mode / ci / group 3
  • GitHub Check: E2E: library mode / ci / group 2
  • GitHub Check: E2E: library mode / ci / group 1
  • GitHub Check: E2E: server mode / ci / group 3
  • GitHub Check: E2E: server mode / ci / group 1
  • GitHub Check: E2E Tests for Lightspeed Evaluation job
🧰 Additional context used
📓 Path-based instructions (2)
src/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/**/*.py: Use absolute imports for internal modules: from authentication import get_auth_dependency
Llama Stack imports: Use from llama_stack_client import AsyncLlamaStackClient
Check constants.py for shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Use logger = get_logger(__name__) from log.py for module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Use async def for I/O operations and external API calls
Use standard log levels with clear purposes: debug() for diagnostic info, info() for program execution, warning() for unexpected events, error() for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes: Configuration, Error/Exception, Resolver, Interface
Abstract classes must use ABC with @abstractmethod decorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes

Files:

  • src/utils/agents/tool_processor.py
  • src/utils/agents/streaming.py
tests/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Use pytest.mark.asyncio marker for async tests

Files:

  • tests/unit/utils/agents/test_streaming.py
🪛 GitHub Actions: Type checks / 0_mypy.txt
src/utils/agents/streaming.py

[error] 293-293: mypy: Argument 1 to "dispatch_stream_event" has incompatible type "PartStartEvent | PartDeltaEvent | PartEndEvent | FinalResultEvent | FunctionToolCallEvent | <6 more items>"; expected "PartStartEvent | PartDeltaEvent | PartEndEvent | FinalResultEvent | FunctionToolCallEvent | FunctionToolResultEvent | OutputToolCallEvent | OutputToolResultEvent | BuiltinToolCallEvent | BuiltinToolResultEvent". [arg-type]


[error] 375-375: mypy: Dispatch type "AgentRunResultEvent[str]" must be subtype of fallback function first argument "PartStartEvent | PartDeltaEvent | PartEndEvent | FinalResultEvent | FunctionToolCallEvent | FunctionToolResultEvent | OutputToolCallEvent | OutputToolResultEvent | BuiltinToolCallEvent | BuiltinToolResultEvent". [misc]

🪛 GitHub Actions: Type checks / mypy
src/utils/agents/streaming.py

[error] 293-293: mypy error: Argument 1 to "dispatch_stream_event" has incompatible type "PartStartEvent | PartDeltaEvent | PartEndEvent | FinalResultEvent | FunctionToolCallEvent | <6 more items>"; expected "PartStartEvent | PartDeltaEvent | PartEndEvent | FinalResultEvent | FunctionToolCallEvent | FunctionToolResultEvent | OutputToolCallEvent | OutputToolResultEvent | BuiltinToolCallEvent | BuiltinToolResultEvent". [arg-type]


[error] 375-375: mypy error: Dispatch type "AgentRunResultEvent[str]" must be subtype of fallback function first argument "PartStartEvent | PartDeltaEvent | PartEndEvent | FinalResultEvent | FunctionToolCallEvent | FunctionToolResultEvent | OutputToolCallEvent | OutputToolResultEvent | BuiltinToolCallEvent | BuiltinToolResultEvent". [misc]

🔇 Additional comments (8)
tests/unit/utils/agents/test_streaming.py (1)

1-991: LGTM!

src/utils/agents/streaming.py (7)

1-78: LGTM!


81-132: LGTM!


135-260: LGTM!


263-321: LGTM!


324-353: LGTM!


470-476: ⚡ Quick win

Inconsistent newline suffix due to operator precedence.

The or operator has lower precedence than +, so "\n\n" is only appended when part.content is falsy and the accumulated text_parts fallback is used. If part.content has a value, no newlines are added. Verify this asymmetry is intentional.

# Current behavior:
# part.content = "hello" → "hello" (no newlines)
# part.content = None    → "accumulated text\n\n" (newlines)

506-527: LGTM!

Comment thread src/utils/agents/streaming.py
Comment thread src/utils/agents/tool_processor.py
@asimurka asimurka force-pushed the agent_query_utils branch from eee0e81 to 0990ea5 Compare June 12, 2026 10:11
@asimurka asimurka requested review from jrobertboos and tisnik June 12, 2026 10:13
@asimurka

Copy link
Copy Markdown
Contributor Author

/retest

@jrobertboos jrobertboos left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants