Issue 5517#3

Open
MatthiasHowellYopp wants to merge 1 commit into main from issue-5517

@MatthiasHowellYopp
Owner

Motivation and Context

The Agent Framework currently has no Valkey-native streaming component. The existing resumable streaming samples (python/samples/04-hosting/azure_functions/03_reliable_streaming/) use a RedisStreamResponseHandler built on redis-py — a sample-level implementation that isn't packaged for reuse and creates a dependency mismatch for Valkey users.

This PR delivers ValkeyStreamBuffer, a first-party, reusable streaming buffer backed by Valkey Streams using the valkey-glide client. It enables durable agents to persist response chunks so clients can disconnect and reconnect mid-stream without data loss.

Related issue: [Python] Add ValkeyStreamBuffer — resumable streaming via Valkey Streams for agent-framework-valkey microsoft#5517

Description

Adds the agent-framework-valkey Python package with a single component: ValkeyStreamBuffer.

Core implementation (_stream_buffer.py):

  • Write side: write_chunk(), write_completion(), write_error() — append entries to a per-conversation Valkey Stream via XADD with auto-refreshing TTL via EXPIRE
  • Read side: read_stream(conversation_id, cursor=None) — async iterator using XREAD with cursor-based resumption, configurable polling interval, and timeout (both before and after data is seen, so a crashed producer doesn't cause infinite polling)
  • Implements AgentResponseCallbackProtocol from agent-framework-durabletask (on_streaming_response_update / on_agent_response) for direct registration as a durable agent callback
  • Uses the same stream-key schema (agent-stream:{conversation_id}) and entry format (text, sequence, timestamp, done) as the existing Redis streaming samples and the .NET RedisStreamResponseHandler for cross-language consistency
  • Accepts both GlideClient (standalone) and GlideClusterClient (cluster) with TLS support through GlideClientConfiguration
  • Input validation on conversation_id (rejects empty strings)
  • Warning-level logging for malformed stream entries

Package scaffolding:

  • pyproject.toml with valkey-glide>=2.1.0 dependency and optional [durabletask] extra
  • __init__.py exporting ValkeyStreamBuffer and StreamChunk
  • LICENSE, README.md
  • Workspace registration in pyproject.toml (uv sources + pyright execution environments)

Tests (test_stream_buffer.py — 31 tests):

  • Construction validation (None client, defaults, custom params)
  • Write operations (xadd/expire calls, done marker, error field, empty conversation_id rejection)
  • Read operations (text chunks, cursor resumption, error entries, timeout before data, timeout after data with producer crash, empty-count reset, exception handling, polling)
  • Callback protocol (sequence tracking, increment, empty/missing text, missing thread_id, completion, cleanup)
  • Stream key generation and validation
  • Session isolation (separate sequences per thread)
  • Malformed field logging

Files changed:

__init__.py (new)
_stream_buffer.py (new)
test_stream_buffer.py (new)
pyproject.toml (new)
python/packages/valkey/LICENSE (new)
README.md (new)
pyproject.toml (workspace registration)

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.


@rileydes-improving left a comment

Just a couple nits and clarifications that may be worth adding, otherwise LGTM

Comment thread .gitignore

Splitting into a separate commit would be a bit cleaner.

]
dependencies = [
"agent-framework-core>=1.2.0,<2",
"valkey-glide>=2.1.0,<3; sys_platform != 'win32'",

Adding a comment explaining why the Windows exclusion is deliberate (i.e. not supported by valkey-glide) is probably worthwhile.

return val.decode() if isinstance(val, bytes) else str(val)


def _fields_list_to_dict(fields_list: list[list[bytes]]) -> dict[bytes, bytes]:

The type signature assumes xread will always return the inner entries as list[list[bytes]]. The risk of this changing with minor glide version bumps is minimal, but adding an @pytest.mark.integration test that does a round trip from write_chunk to read_stream against a real server would likely be good practice, so that automated version bumps (Dependabot, etc.) have a real test to catch any regression.

It may be excessive to implement the proper integration testing just for that though.
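Short of a full integration test, a cheaper guard is to have the conversion helper fail loudly when the entry shape is not what the signature assumes. The sketch below is an assumption about what such a helper could look like. The PR's actual body of `_fields_list_to_dict` is not shown here, and `fields_list_to_dict_checked` is a hypothetical name.

```python
def fields_list_to_dict_checked(fields_list: list[list[bytes]]) -> dict[bytes, bytes]:
    """Convert [[field, value], ...] into {field: value}, rejecting other shapes."""
    result: dict[bytes, bytes] = {}
    for pair in fields_list:
        # Each inner entry is assumed to be exactly a [field, value] pair.
        if len(pair) != 2:
            raise ValueError(f"expected [field, value], got {pair!r}")
        field, value = pair
        result[field] = value
    return result


# Usage: a well-formed entry converts cleanly, a malformed one raises.
entry = fields_list_to_dict_checked([[b"text", b"hi"], [b"sequence", b"0"]])
```

A shape check like this would only catch a changed return format at runtime. A round-trip test against a real server, as suggested above, would still be what catches it before release.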

fields: list[tuple[str, str]],
) -> None:
"""Append an entry to a stream and refresh its TTL."""
await self._client.xadd(stream_key, fields) # type: ignore[union-attr]

It may be worth adding a comment explaining why it is OK to issue xadd and expire as separate commands, even if the process crashes between them.

i.e. "Note: xadd and expire are issued as two separate commands, so a crash between them could leave a key without a TTL. In practice this is self-healing: the TTL is refreshed on every subsequent write, and Valkey's memory eviction policy covers the final-write edge case."
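To make the self-healing argument concrete, here is a small sketch using an in-memory fake. `FakeClient`, `append_entry`, `fail_next_expire`, and the 600-second TTL are all hypothetical. Only the two-step xadd-then-expire pattern and the `agent-stream:{conversation_id}` key schema come from the PR.

```python
import asyncio


class FakeClient:
    """Hypothetical in-memory client exposing only xadd and expire."""

    def __init__(self) -> None:
        self.streams: dict[str, list[list[tuple[str, str]]]] = {}
        self.ttls: dict[str, int] = {}
        self.fail_next_expire = False

    async def xadd(self, key: str, values: list[tuple[str, str]]) -> str:
        self.streams.setdefault(key, []).append(values)
        return f"{len(self.streams[key])}-0"

    async def expire(self, key: str, seconds: int) -> bool:
        if self.fail_next_expire:
            self.fail_next_expire = False
            raise ConnectionError("simulated crash between xadd and expire")
        self.ttls[key] = seconds
        return True


async def append_entry(
    client: FakeClient,
    stream_key: str,
    fields: list[tuple[str, str]],
    ttl_seconds: int = 600,  # assumed value, for illustration only
) -> None:
    # xadd and expire are two separate commands, so a crash between them can
    # leave the key without a TTL until the next successful write refreshes it.
    await client.xadd(stream_key, fields)
    await client.expire(stream_key, ttl_seconds)


async def demo() -> tuple[bool, bool]:
    client = FakeClient()
    key = "agent-stream:conv-1"

    client.fail_next_expire = True
    try:
        await append_entry(client, key, [("text", "a")])
    except ConnectionError:
        pass
    ttl_missing_after_crash = key not in client.ttls

    # The next write goes through both steps and restores the TTL.
    await append_entry(client, key, [("text", "b")])
    ttl_restored = client.ttls.get(key) == 600
    return ttl_missing_after_crash, ttl_restored
```

The sketch only covers the case where another write follows the failure. If the failed write was the last one, the key keeps no TTL, and whether it is eventually reclaimed depends on how the server's eviction policy is configured.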

@MatthiasHowellYopp force-pushed the issue-5517 branch 2 times, most recently from 13606f8 to 37b7330 (April 28, 2026 13:49)