diff --git a/.gitignore b/.gitignore index 283f626207..cd8e81500e 100644 --- a/.gitignore +++ b/.gitignore @@ -213,6 +213,7 @@ WARP.md **/memory-bank/ **/projectBrief.md **/tmpclaude* +.kiro/ # Dependency-bound validation reports python/scripts/dependency-*-results.json python/scripts/dependencies/dependency-*-results.json diff --git a/python/packages/valkey/LICENSE b/python/packages/valkey/LICENSE new file mode 100644 index 0000000000..22aed37e65 --- /dev/null +++ b/python/packages/valkey/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) Microsoft Corporation. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/python/packages/valkey/README.md b/python/packages/valkey/README.md new file mode 100644 index 0000000000..31ce2de00b --- /dev/null +++ b/python/packages/valkey/README.md @@ -0,0 +1,57 @@ +# agent-framework-valkey + +Valkey integration for the [Microsoft Agent Framework](https://aka.ms/agent-framework). + +## Components + +- **ValkeyStreamBuffer** — Resumable streaming buffer backed by Valkey Streams. + Persists agent response chunks via `XADD` and supports cursor-based client + reconnection via `XREAD`. Implements `AgentResponseCallbackProtocol` for + direct use with durable agent workers. + +## Installation + +```bash +pip install agent-framework-valkey +``` + +For durable agent callback support: + +```bash +pip install agent-framework-valkey[durabletask] +``` + +## Quick Start + +```python +import asyncio + +from glide import GlideClient, GlideClientConfiguration, NodeAddress +from agent_framework_valkey import ValkeyStreamBuffer + + +async def main(): + config = GlideClientConfiguration([NodeAddress("localhost", 6379)]) + client = await GlideClient.create(config) + buffer = ValkeyStreamBuffer(client=client) + + # Write side + await buffer.write_chunk("conv-1", "Hello, ", 0) + await buffer.write_chunk("conv-1", "world!", 1) + await buffer.write_completion("conv-1", 2) + + # Read side (supports cursor-based resumption) + async for chunk in buffer.read_stream("conv-1"): + if chunk.is_done: + break + print(chunk.text, end="") + + +asyncio.run(main()) +``` + +## Requirements + +- Python 3.10+ +- Valkey server (any version supporting Streams) +- `valkey-glide` client library diff --git a/python/packages/valkey/agent_framework_valkey/__init__.py b/python/packages/valkey/agent_framework_valkey/__init__.py new file mode 100644 index 0000000000..d1f8717b5a --- /dev/null +++ b/python/packages/valkey/agent_framework_valkey/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) Microsoft. All rights reserved. + +import importlib.metadata + +from ._stream_buffer import StreamChunk, ValkeyStreamBuffer + +try: + __version__ = importlib.metadata.version(__name__) +except importlib.metadata.PackageNotFoundError: + __version__ = "0.0.0" # Fallback for development mode + +__all__ = [ + "StreamChunk", + "ValkeyStreamBuffer", + "__version__", +] diff --git a/python/packages/valkey/agent_framework_valkey/_stream_buffer.py b/python/packages/valkey/agent_framework_valkey/_stream_buffer.py new file mode 100644 index 0000000000..51b9c49a2f --- /dev/null +++ b/python/packages/valkey/agent_framework_valkey/_stream_buffer.py @@ -0,0 +1,396 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Valkey-backed resumable stream buffer for durable agent responses. + +This module provides :class:`ValkeyStreamBuffer`, a reliable streaming buffer +that persists agent response chunks to Valkey Streams (``XADD``/``XREAD``) +using the ``valkey-glide`` client. Clients can disconnect and reconnect +mid-stream without data loss by supplying the last-seen entry ID as a cursor. + +The class implements :class:`AgentResponseCallbackProtocol` so it can be +registered directly as a callback with durable agent workers, and also +exposes lower-level ``write_chunk`` / ``read_stream`` methods for custom +integration. +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from collections.abc import AsyncIterator +from dataclasses import dataclass +from datetime import timedelta +from typing import TYPE_CHECKING, Union + +if TYPE_CHECKING: + from agent_framework import AgentResponse, AgentResponseUpdate + from agent_framework_durabletask import AgentCallbackContext + from glide import GlideClient, GlideClusterClient + +# Union accepted by all public methods so callers can pass either client type. +TGlideClient = Union["GlideClient", "GlideClusterClient"] + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Default constants +# --------------------------------------------------------------------------- +_DEFAULT_STREAM_TTL = timedelta(minutes=10) +_DEFAULT_KEY_PREFIX = "agent-stream" +_MAX_EMPTY_READS = 300 +_POLL_INTERVAL_SECONDS = 1.0 +# Number of entries to fetch per XREAD call. Kept as a module constant +# rather than a constructor parameter because tuning it has negligible +# impact on correctness — it only affects per-poll batch size. +_READ_COUNT = 100 + + +@dataclass +class StreamChunk: + """A single chunk read from a Valkey Stream. + + Attributes: + entry_id: The Valkey stream entry ID (use as cursor for resumption). + text: The text content of the chunk, if any. + is_done: Whether this chunk marks the end of the stream. + error: Error message if something went wrong, otherwise ``None``. + """ + + entry_id: str + text: str | None = None + is_done: bool = False + error: str | None = None + + +class ValkeyStreamBuffer: + """Resumable stream buffer backed by Valkey Streams. + + Writes agent response chunks via ``XADD`` and reads them back via + ``XREAD``, supporting cursor-based resumption. Each conversation gets + its own Valkey Stream keyed by ``{key_prefix}:{conversation_id}``. + + The class also satisfies the ``AgentResponseCallbackProtocol`` from + ``agent-framework-durabletask`` so it can be passed directly as a + ``callback`` when registering agents with a durable worker. + + Args: + client: A connected ``GlideClient`` or ``GlideClusterClient``. + stream_ttl: Time-to-live for stream keys. Refreshed on every write. + key_prefix: Prefix for Valkey stream keys. + max_empty_reads: Maximum consecutive empty reads before timing out + on the read side. + poll_interval_seconds: Seconds to sleep between read polls. + + Example: + .. code-block:: python + + from glide import GlideClient, GlideClientConfiguration, NodeAddress + from agent_framework_valkey import ValkeyStreamBuffer + + config = GlideClientConfiguration([NodeAddress("localhost", 6379)]) + client = await GlideClient.create(config) + buffer = ValkeyStreamBuffer(client=client) + + # Write side + await buffer.write_chunk("conv-1", "Hello, ", 0) + await buffer.write_chunk("conv-1", "world!", 1) + await buffer.write_completion("conv-1", 2) + + # Read side + async for chunk in buffer.read_stream("conv-1"): + if chunk.is_done: + break + print(chunk.text, end="") + """ + + def __init__( + self, + client: TGlideClient, + *, + stream_ttl: timedelta = _DEFAULT_STREAM_TTL, + key_prefix: str = _DEFAULT_KEY_PREFIX, + max_empty_reads: int = _MAX_EMPTY_READS, + poll_interval_seconds: float = _POLL_INTERVAL_SECONDS, + ) -> None: + if client is None: + raise ValueError("client must not be None") + self._client: TGlideClient = client + self._stream_ttl_seconds = int(stream_ttl.total_seconds()) + self._key_prefix = key_prefix + self._max_empty_reads = max_empty_reads + self._poll_interval_seconds = poll_interval_seconds + # Track per-conversation sequence numbers for callback usage. + self._sequence_numbers: dict[str, int] = {} + + # ------------------------------------------------------------------ + # Write API + # ------------------------------------------------------------------ + + async def write_chunk( + self, + conversation_id: str, + text: str, + sequence: int, + ) -> None: + """Write a single text chunk to the Valkey Stream. + + Args: + conversation_id: Conversation / session identifier. + text: The text content to persist. + sequence: Monotonically increasing sequence number. + + Raises: + ValueError: If *conversation_id* is empty. + """ + stream_key = self._stream_key(conversation_id) + await self._xadd_and_expire( + stream_key, + [ + ("text", text), + ("sequence", str(sequence)), + ("timestamp", str(int(time.time() * 1000))), + ], + ) + + async def write_completion( + self, + conversation_id: str, + sequence: int, + ) -> None: + """Write an end-of-stream sentinel to the Valkey Stream. + + Args: + conversation_id: Conversation / session identifier. + sequence: Final sequence number. + + Raises: + ValueError: If *conversation_id* is empty. + """ + stream_key = self._stream_key(conversation_id) + await self._xadd_and_expire( + stream_key, + [ + ("text", ""), + ("sequence", str(sequence)), + ("timestamp", str(int(time.time() * 1000))), + ("done", "true"), + ], + ) + + async def write_error( + self, + conversation_id: str, + error: str, + sequence: int, + ) -> None: + """Write an error entry to the Valkey Stream. + + Args: + conversation_id: Conversation / session identifier. + error: The error message. + sequence: Sequence number. + + Raises: + ValueError: If *conversation_id* is empty. + """ + stream_key = self._stream_key(conversation_id) + await self._xadd_and_expire( + stream_key, + [ + ("error", error), + ("sequence", str(sequence)), + ("timestamp", str(int(time.time() * 1000))), + ], + ) + + # ------------------------------------------------------------------ + # Read API + # ------------------------------------------------------------------ + + async def read_stream( + self, + conversation_id: str, + cursor: str | None = None, + ) -> AsyncIterator[StreamChunk]: + """Read chunks from a Valkey Stream with cursor-based resumption. + + Polls the stream for new entries, yielding :class:`StreamChunk` + instances as they arrive. Pass the ``entry_id`` of the last + received chunk as ``cursor`` to resume after a disconnect. + + The reader times out after ``max_empty_reads`` consecutive empty + polls — both before *and* after data has been seen. This prevents + the reader from polling forever if the producer crashes mid-stream. + + Args: + conversation_id: Conversation / session identifier. + cursor: Entry ID to resume from (exclusive). ``None`` reads + from the beginning. + + Yields: + :class:`StreamChunk` instances. + + Raises: + ValueError: If *conversation_id* is empty. + """ + stream_key = self._stream_key(conversation_id) + start_id = cursor if cursor else "0-0" + + # Build read options if valkey-glide is installed. On Windows the + # package is unavailable, but unit tests still exercise this path + # with a mocked client that accepts any arguments. + read_options = None + try: + from glide import StreamReadOptions + + read_options = StreamReadOptions(count=_READ_COUNT) + except ImportError: + pass + + empty_read_count = 0 + + while True: + try: + if read_options is not None: + result = await self._client.xread( # type: ignore[union-attr] + {stream_key: start_id}, + read_options, + ) + else: + result = await self._client.xread( # type: ignore[union-attr] + {stream_key: start_id}, + ) + + if not result: + empty_read_count += 1 + if empty_read_count >= self._max_empty_reads: + timeout_secs = self._max_empty_reads * self._poll_interval_seconds + yield StreamChunk( + entry_id=start_id, + error=f"Stream not found or timed out after {timeout_secs} seconds", + ) + return + + await asyncio.sleep(self._poll_interval_seconds) + continue + + # Reset counter whenever we receive data so a stalled + # producer is detected even after earlier successful reads. + empty_read_count = 0 + + for _stream_name, entries in result.items(): + for entry_id_bytes, fields_list in entries.items(): + entry_id = _decode_value(entry_id_bytes) + start_id = entry_id + + # Convert [[field, value], ...] list to a dict. + fields = _fields_list_to_dict(fields_list) + + error_val = fields.get(b"error") + if error_val: + yield StreamChunk(entry_id=entry_id, error=_decode_value(error_val)) + return + + done_val = fields.get(b"done") + if done_val and _decode_value(done_val) == "true": + yield StreamChunk(entry_id=entry_id, is_done=True) + return + + if b"text" in fields: + text_str = _decode_value(fields[b"text"]) + yield StreamChunk(entry_id=entry_id, text=text_str) + + except Exception as exc: + yield StreamChunk(entry_id=start_id, error=str(exc)) + return + + # ------------------------------------------------------------------ + # AgentResponseCallbackProtocol implementation + # ------------------------------------------------------------------ + + async def on_streaming_response_update( + self, + update: AgentResponseUpdate, + context: AgentCallbackContext, + ) -> None: + """Handle a streaming response update from a durable agent. + + Satisfies ``AgentResponseCallbackProtocol.on_streaming_response_update``. + """ + thread_id: str | None = context.thread_id + if not thread_id: + return + + text: str | None = update.text if update.text else None + if not text: + return + + seq = self._sequence_numbers.get(thread_id, 0) + self._sequence_numbers[thread_id] = seq + 1 + await self.write_chunk(thread_id, text, seq) + + async def on_agent_response( + self, + response: AgentResponse, + context: AgentCallbackContext, + ) -> None: + """Handle the final agent response from a durable agent. + + Satisfies ``AgentResponseCallbackProtocol.on_agent_response``. + """ + thread_id: str | None = context.thread_id + if not thread_id: + return + + seq = self._sequence_numbers.pop(thread_id, 0) + await self.write_completion(thread_id, seq) + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _stream_key(self, conversation_id: str) -> str: + """Build the Valkey key for a conversation stream. + + Raises: + ValueError: If *conversation_id* is empty. + """ + if not conversation_id: + raise ValueError("conversation_id must not be empty") + return f"{self._key_prefix}:{conversation_id}" + + async def _xadd_and_expire( + self, + stream_key: str, + fields: list[tuple[str, str]], + ) -> None: + """Append an entry to a stream and refresh its TTL. + + Note: ``XADD`` and ``EXPIRE`` are issued as two separate commands, so + a crash between them could leave a key without a TTL. In practice + this is self-healing — the TTL is refreshed on every subsequent write, + and Valkey's memory-eviction policy covers the final-write edge case. + """ + await self._client.xadd(stream_key, fields) # type: ignore[union-attr] + await self._client.expire(stream_key, self._stream_ttl_seconds) # type: ignore[union-attr] + + +def _decode_value(val: bytes | str) -> str: + """Decode a bytes or str value to str.""" + return val.decode() if isinstance(val, bytes) else str(val) + + +def _fields_list_to_dict(fields_list: list[list[bytes]]) -> dict[bytes, bytes]: + """Convert ``[[field, value], ...]`` to ``{field: value}``. + + Pairs with fewer than 2 elements are skipped with a warning, as they + indicate corrupt or unexpected stream data. + """ + result: dict[bytes, bytes] = {} + for pair in fields_list: + if len(pair) >= 2: + result[pair[0]] = pair[1] + else: + logger.warning("Skipping malformed stream entry field (expected [key, value], got %d elements)", len(pair)) + return result diff --git a/python/packages/valkey/pyproject.toml b/python/packages/valkey/pyproject.toml new file mode 100644 index 0000000000..f3f90c7f1d --- /dev/null +++ b/python/packages/valkey/pyproject.toml @@ -0,0 +1,99 @@ +[project] +name = "agent-framework-valkey" +description = "Valkey integration for Microsoft Agent Framework." +authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0b260427" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + # valkey-glide does not publish Windows wheels; exclude win32. + "valkey-glide>=2.1.0,<3; sys_platform != 'win32'", +] + +[project.optional-dependencies] +durabletask = [ + "agent-framework-durabletask>=1.0.0,<2", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'" +] + +[tool.uv-dynamic-versioning] +fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_valkey"] + +[tool.mypy] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes = true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_valkey"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.mypy] +help = "Run MyPy for this package." +cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_valkey" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_valkey --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" diff --git a/python/packages/valkey/tests/test_stream_buffer.py b/python/packages/valkey/tests/test_stream_buffer.py new file mode 100644 index 0000000000..375bdebbf3 --- /dev/null +++ b/python/packages/valkey/tests/test_stream_buffer.py @@ -0,0 +1,583 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Unit tests for ValkeyStreamBuffer with a mocked valkey-glide client.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from datetime import timedelta +from typing import Protocol +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agent_framework_valkey import StreamChunk, ValkeyStreamBuffer + +# --------------------------------------------------------------------------- +# Protocol-aligned fakes — mirror the real protocol shapes so tests catch +# interface drift without importing the actual durabletask package. +# --------------------------------------------------------------------------- + + +class HasText(Protocol): + """Minimal shape of AgentResponseUpdate used by ValkeyStreamBuffer.""" + + @property + def text(self) -> str: ... + + +class HasThreadId(Protocol): + """Minimal shape of AgentCallbackContext used by ValkeyStreamBuffer.""" + + @property + def thread_id(self) -> str | None: ... + + @property + def agent_name(self) -> str: ... + + @property + def correlation_id(self) -> str: ... + + +@dataclass(frozen=True) +class FakeCallbackContext: + """Fake satisfying the AgentCallbackContext shape.""" + + agent_name: str = "test-agent" + correlation_id: str = "corr-1" + thread_id: str | None = "thread-1" + request_message: str | None = None + + +@dataclass +class FakeUpdate: + """Fake satisfying the AgentResponseUpdate.text shape.""" + + text: str | None = None + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_client() -> AsyncMock: + """Create a mock GlideClient with async stream methods.""" + client = AsyncMock() + client.xadd = AsyncMock(return_value=b"1234567890-0") + client.expire = AsyncMock(return_value=True) + client.xread = AsyncMock(return_value=None) + return client + + +def _xread_entries( + stream_key: str, + entries: dict[str, list[list[bytes]]], +) -> dict[bytes, dict[bytes, list[list[bytes]]]]: + """Build a valkey-glide-style xread return value.""" + return { + stream_key.encode(): {eid.encode(): fields for eid, fields in entries.items()} + } + + +# --------------------------------------------------------------------------- +# Construction +# --------------------------------------------------------------------------- + + +class TestConstruction: + def test_rejects_none_client(self) -> None: + with pytest.raises(ValueError, match="client must not be None"): + ValkeyStreamBuffer(client=None) # type: ignore[arg-type] + + def test_default_parameters(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + assert buf._stream_ttl_seconds == 600 + assert buf._key_prefix == "agent-stream" + assert buf._max_empty_reads == 300 + + def test_custom_parameters(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer( + client=client, + stream_ttl=timedelta(minutes=5), + key_prefix="custom-prefix", + max_empty_reads=10, + poll_interval_seconds=0.5, + ) + assert buf._stream_ttl_seconds == 300 + assert buf._key_prefix == "custom-prefix" + assert buf._max_empty_reads == 10 + assert buf._poll_interval_seconds == 0.5 + + +# --------------------------------------------------------------------------- +# Write operations +# --------------------------------------------------------------------------- + + +class TestWriteChunk: + @pytest.mark.asyncio + async def test_write_chunk_calls_xadd_and_expire(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + + await buf.write_chunk("conv-1", "hello", 0) + + client.xadd.assert_awaited_once() + args = client.xadd.call_args + assert args[0][0] == "agent-stream:conv-1" + fields = args[0][1] + field_dict = {k: v for k, v in fields} + assert field_dict["text"] == "hello" + assert field_dict["sequence"] == "0" + assert "timestamp" in field_dict + + client.expire.assert_awaited_once_with("agent-stream:conv-1", 600) + + @pytest.mark.asyncio + async def test_write_completion_includes_done_marker(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + + await buf.write_completion("conv-1", 5) + + args = client.xadd.call_args + fields = args[0][1] + field_dict = {k: v for k, v in fields} + assert field_dict["done"] == "true" + assert field_dict["text"] == "" + assert field_dict["sequence"] == "5" + + @pytest.mark.asyncio + async def test_write_error_includes_error_field(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + + await buf.write_error("conv-1", "something broke", 3) + + args = client.xadd.call_args + fields = args[0][1] + field_dict = {k: v for k, v in fields} + assert field_dict["error"] == "something broke" + assert field_dict["sequence"] == "3" + + @pytest.mark.asyncio + async def test_write_chunk_rejects_empty_conversation_id(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + + with pytest.raises(ValueError, match="conversation_id must not be empty"): + await buf.write_chunk("", "text", 0) + + @pytest.mark.asyncio + async def test_write_completion_rejects_empty_conversation_id(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + + with pytest.raises(ValueError, match="conversation_id must not be empty"): + await buf.write_completion("", 0) + + @pytest.mark.asyncio + async def test_write_error_rejects_empty_conversation_id(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + + with pytest.raises(ValueError, match="conversation_id must not be empty"): + await buf.write_error("", "err", 0) + + +# --------------------------------------------------------------------------- +# Read operations +# --------------------------------------------------------------------------- + + +class TestReadStream: + @pytest.mark.asyncio + async def test_read_text_chunks(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client, poll_interval_seconds=0) + + client.xread = AsyncMock( + side_effect=[ + _xread_entries( + "agent-stream:conv-1", + { + "100-0": [[b"text", b"Hello "], [b"sequence", b"0"]], + "100-1": [[b"text", b"world!"], [b"sequence", b"1"]], + "100-2": [[b"text", b""], [b"done", b"true"], [b"sequence", b"2"]], + }, + ), + ] + ) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream("conv-1"): + chunks.append(chunk) + + assert len(chunks) == 3 + assert chunks[0].text == "Hello " + assert chunks[0].entry_id == "100-0" + assert chunks[1].text == "world!" + assert chunks[2].is_done is True + assert chunks[2].entry_id == "100-2" + + @pytest.mark.asyncio + async def test_read_with_cursor_resumes(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client, poll_interval_seconds=0) + + client.xread = AsyncMock( + side_effect=[ + _xread_entries( + "agent-stream:conv-1", + { + "200-0": [[b"text", b"resumed chunk"], [b"sequence", b"1"]], + "200-1": [[b"text", b""], [b"done", b"true"], [b"sequence", b"2"]], + }, + ), + ] + ) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream("conv-1", cursor="100-1"): + chunks.append(chunk) + + # Verify xread was called with the cursor as start ID. + call_args = client.xread.call_args + keys_and_ids = call_args[0][0] + assert keys_and_ids["agent-stream:conv-1"] == "100-1" + + assert len(chunks) == 2 + assert chunks[0].text == "resumed chunk" + + @pytest.mark.asyncio + async def test_read_error_entry_stops_stream(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client, poll_interval_seconds=0) + + client.xread = AsyncMock( + side_effect=[ + _xread_entries( + "agent-stream:conv-1", + { + "300-0": [[b"error", b"upstream failure"], [b"sequence", b"0"]], + }, + ), + ] + ) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream("conv-1"): + chunks.append(chunk) + + assert len(chunks) == 1 + assert chunks[0].error == "upstream failure" + + @pytest.mark.asyncio + async def test_read_timeout_on_empty_stream(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer( + client=client, + max_empty_reads=3, + poll_interval_seconds=0, + ) + + # xread always returns None (empty). + client.xread = AsyncMock(return_value=None) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream("conv-1"): + chunks.append(chunk) + + assert len(chunks) == 1 + assert chunks[0].error is not None + assert "timed out" in chunks[0].error + + @pytest.mark.asyncio + async def test_read_timeout_after_data_seen(self) -> None: + """Reader times out even after receiving earlier data (producer crash).""" + client = _make_client() + buf = ValkeyStreamBuffer( + client=client, + max_empty_reads=2, + poll_interval_seconds=0, + ) + + # First call returns data, then producer goes silent. + client.xread = AsyncMock( + side_effect=[ + _xread_entries( + "agent-stream:conv-1", + {"500-0": [[b"text", b"partial"], [b"sequence", b"0"]]}, + ), + None, + None, # hits max_empty_reads + ] + ) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream("conv-1"): + chunks.append(chunk) + + assert len(chunks) == 2 + assert chunks[0].text == "partial" + assert chunks[1].error is not None + assert "timed out" in chunks[1].error + + @pytest.mark.asyncio + async def test_read_resets_empty_count_on_data(self) -> None: + """Empty-read counter resets when data arrives, preventing false timeouts.""" + client = _make_client() + buf = ValkeyStreamBuffer( + client=client, + max_empty_reads=3, + poll_interval_seconds=0, + ) + + # Two empties, then data, then two more empties, then done. + client.xread = AsyncMock( + side_effect=[ + None, + None, + _xread_entries( + "agent-stream:conv-1", + {"600-0": [[b"text", b"ok"], [b"sequence", b"0"]]}, + ), + None, + None, + _xread_entries( + "agent-stream:conv-1", + {"600-1": [[b"text", b""], [b"done", b"true"], [b"sequence", b"1"]]}, + ), + ] + ) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream("conv-1"): + chunks.append(chunk) + + assert len(chunks) == 2 + assert chunks[0].text == "ok" + assert chunks[1].is_done is True + + @pytest.mark.asyncio + async def test_read_exception_yields_error_chunk(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client, poll_interval_seconds=0) + + client.xread = AsyncMock(side_effect=ConnectionError("connection lost")) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream("conv-1"): + chunks.append(chunk) + + assert len(chunks) == 1 + assert chunks[0].error == "connection lost" + + @pytest.mark.asyncio + async def test_read_polls_until_data_arrives(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client, poll_interval_seconds=0) + + # First two reads return nothing, third returns data. + client.xread = AsyncMock( + side_effect=[ + None, + None, + _xread_entries( + "agent-stream:conv-1", + { + "400-0": [[b"text", b"delayed"], [b"sequence", b"0"]], + "400-1": [[b"text", b""], [b"done", b"true"], [b"sequence", b"1"]], + }, + ), + ] + ) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream("conv-1"): + chunks.append(chunk) + + assert client.xread.await_count == 3 + assert len(chunks) == 2 + assert chunks[0].text == "delayed" + assert chunks[1].is_done is True + + @pytest.mark.asyncio + async def test_read_rejects_empty_conversation_id(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + + with pytest.raises(ValueError, match="conversation_id must not be empty"): + async for _ in buf.read_stream(""): + pass # pragma: no cover + + +# --------------------------------------------------------------------------- +# Callback protocol +# --------------------------------------------------------------------------- + + +class TestCallbackProtocol: + @pytest.mark.asyncio + async def test_on_streaming_response_update_writes_chunk(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + ctx = FakeCallbackContext(thread_id="t-1") + update = FakeUpdate(text="chunk text") + + await buf.on_streaming_response_update(update, ctx) # type: ignore[arg-type] + + client.xadd.assert_awaited_once() + args = client.xadd.call_args + assert args[0][0] == "agent-stream:t-1" + field_dict = {k: v for k, v in args[0][1]} + assert field_dict["text"] == "chunk text" + assert field_dict["sequence"] == "0" + + @pytest.mark.asyncio + async def test_on_streaming_response_update_increments_sequence(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + ctx = FakeCallbackContext(thread_id="t-1") + + await buf.on_streaming_response_update(FakeUpdate(text="a"), ctx) # type: ignore[arg-type] + await buf.on_streaming_response_update(FakeUpdate(text="b"), ctx) # type: ignore[arg-type] + + assert client.xadd.await_count == 2 + second_call_fields = {k: v for k, v in client.xadd.call_args_list[1][0][1]} + assert second_call_fields["sequence"] == "1" + + @pytest.mark.asyncio + async def test_on_streaming_response_update_skips_empty_text(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + ctx = FakeCallbackContext(thread_id="t-1") + + await buf.on_streaming_response_update(FakeUpdate(text=""), ctx) # type: ignore[arg-type] + await buf.on_streaming_response_update(FakeUpdate(text=None), ctx) # type: ignore[arg-type] + + client.xadd.assert_not_awaited() + + @pytest.mark.asyncio + async def test_on_streaming_response_update_skips_no_thread_id(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + ctx = FakeCallbackContext(thread_id=None) + + await buf.on_streaming_response_update(FakeUpdate(text="data"), ctx) # type: ignore[arg-type] + + client.xadd.assert_not_awaited() + + @pytest.mark.asyncio + async def test_on_agent_response_writes_completion(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + ctx = FakeCallbackContext(thread_id="t-1") + + # Simulate one chunk first to set sequence. + await buf.on_streaming_response_update(FakeUpdate(text="x"), ctx) # type: ignore[arg-type] + client.xadd.reset_mock() + + await buf.on_agent_response(MagicMock(), ctx) # type: ignore[arg-type] + + client.xadd.assert_awaited_once() + field_dict = {k: v for k, v in client.xadd.call_args[0][1]} + assert field_dict["done"] == "true" + assert field_dict["sequence"] == "1" + + @pytest.mark.asyncio + async def test_on_agent_response_cleans_up_sequence(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + ctx = FakeCallbackContext(thread_id="t-1") + + await buf.on_streaming_response_update(FakeUpdate(text="x"), ctx) # type: ignore[arg-type] + assert "t-1" in buf._sequence_numbers + + await buf.on_agent_response(MagicMock(), ctx) # type: ignore[arg-type] + assert "t-1" not in buf._sequence_numbers + + @pytest.mark.asyncio + async def test_on_agent_response_skips_no_thread_id(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + ctx = FakeCallbackContext(thread_id=None) + + await buf.on_agent_response(MagicMock(), ctx) # type: ignore[arg-type] + + client.xadd.assert_not_awaited() + + +# --------------------------------------------------------------------------- +# Stream key generation & validation +# --------------------------------------------------------------------------- + + +class TestStreamKey: + def test_default_prefix(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + assert buf._stream_key("abc") == "agent-stream:abc" + + def test_custom_prefix(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client, key_prefix="my-streams") + assert buf._stream_key("abc") == "my-streams:abc" + + def test_rejects_empty_conversation_id(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + with pytest.raises(ValueError, match="conversation_id must not be empty"): + buf._stream_key("") + + +# --------------------------------------------------------------------------- +# Session isolation +# --------------------------------------------------------------------------- + + +class TestSessionIsolation: + @pytest.mark.asyncio + async def test_separate_sequences_per_thread(self) -> None: + client = _make_client() + buf = ValkeyStreamBuffer(client=client) + + ctx_a = FakeCallbackContext(thread_id="thread-a") + ctx_b = FakeCallbackContext(thread_id="thread-b") + + await buf.on_streaming_response_update(FakeUpdate(text="a1"), ctx_a) # type: ignore[arg-type] + await buf.on_streaming_response_update(FakeUpdate(text="b1"), ctx_b) # type: ignore[arg-type] + await buf.on_streaming_response_update(FakeUpdate(text="a2"), ctx_a) # type: ignore[arg-type] + + # thread-a should be at sequence 2, thread-b at sequence 1. + assert buf._sequence_numbers["thread-a"] == 2 + assert buf._sequence_numbers["thread-b"] == 1 + + +# --------------------------------------------------------------------------- +# Malformed stream data logging +# --------------------------------------------------------------------------- + + +class TestMalformedFields: + def test_malformed_pair_logs_warning(self, caplog: pytest.LogCaptureFixture) -> None: + from agent_framework_valkey._stream_buffer import _fields_list_to_dict + + with caplog.at_level(logging.WARNING): + result = _fields_list_to_dict([[b"only_key"], [b"good_key", b"good_val"]]) + + assert result == {b"good_key": b"good_val"} + assert "malformed stream entry field" in caplog.text.lower() + + def test_empty_pair_logs_warning(self, caplog: pytest.LogCaptureFixture) -> None: + from agent_framework_valkey._stream_buffer import _fields_list_to_dict + + with caplog.at_level(logging.WARNING): + result = _fields_list_to_dict([[], [b"k", b"v"]]) + + assert result == {b"k": b"v"} + assert "malformed" in caplog.text.lower() diff --git a/python/packages/valkey/tests/test_stream_buffer_integration.py b/python/packages/valkey/tests/test_stream_buffer_integration.py new file mode 100644 index 0000000000..61201f4f5a --- /dev/null +++ b/python/packages/valkey/tests/test_stream_buffer_integration.py @@ -0,0 +1,100 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Integration tests for ValkeyStreamBuffer against a real Valkey server. + +These tests require a running Valkey instance (default: localhost:6379). +Run with: uv run pytest packages/valkey/tests/ -v -m integration + +They are skipped by default in the unit test suite. +""" + +from __future__ import annotations + +import uuid + +import pytest + +from agent_framework_valkey import StreamChunk, ValkeyStreamBuffer + +# Guard the glide import so the module can be collected on Windows (where +# valkey-glide is not installed) without failing at import time. +glide = pytest.importorskip("glide", reason="valkey-glide is not installed (not available on Windows)") + + +@pytest.fixture +async def valkey_client(): + """Create a GlideClient connected to localhost:6379.""" + config = glide.GlideClientConfiguration([glide.NodeAddress("localhost", 6379)]) + client = await glide.GlideClient.create(config) + yield client + await client.close() + + +@pytest.fixture +def conversation_id() -> str: + """Generate a unique conversation ID to avoid test interference.""" + return f"test-{uuid.uuid4().hex[:12]}" + + +@pytest.mark.integration +class TestStreamBufferIntegration: + """Round-trip integration tests validating the valkey-glide wire format.""" + + @pytest.mark.asyncio + async def test_write_and_read_round_trip( + self, + valkey_client: glide.GlideClient, + conversation_id: str, + ) -> None: + """Write chunks and read them back, verifying the full round trip.""" + buf = ValkeyStreamBuffer(client=valkey_client, key_prefix="test-stream") + + await buf.write_chunk(conversation_id, "Hello, ", 0) + await buf.write_chunk(conversation_id, "world!", 1) + await buf.write_completion(conversation_id, 2) + + chunks: list[StreamChunk] = [] + async for chunk in buf.read_stream(conversation_id): + chunks.append(chunk) + + assert len(chunks) == 3 + assert chunks[0].text == "Hello, " + assert chunks[1].text == "world!" + assert chunks[2].is_done is True + + # Clean up + await valkey_client.delete([f"test-stream:{conversation_id}"]) + + @pytest.mark.asyncio + async def test_cursor_resumption( + self, + valkey_client: glide.GlideClient, + conversation_id: str, + ) -> None: + """Resume reading from a cursor mid-stream.""" + buf = ValkeyStreamBuffer(client=valkey_client, key_prefix="test-stream") + + await buf.write_chunk(conversation_id, "first", 0) + await buf.write_chunk(conversation_id, "second", 1) + await buf.write_completion(conversation_id, 2) + + # Read first chunk and capture cursor. + first_chunk: StreamChunk | None = None + async for chunk in buf.read_stream(conversation_id): + first_chunk = chunk + break + + assert first_chunk is not None + assert first_chunk.text == "first" + + # Resume from cursor — should get "second" and done. + remaining: list[StreamChunk] = [] + async for chunk in buf.read_stream(conversation_id, cursor=first_chunk.entry_id): + remaining.append(chunk) + + assert len(remaining) == 2 + assert remaining[0].text == "second" + assert remaining[1].is_done is True + + # Clean up + await valkey_client.delete([f"test-stream:{conversation_id}"]) diff --git a/python/pyproject.toml b/python/pyproject.toml index b788f48e71..d3f713d093 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -94,6 +94,7 @@ agent-framework-orchestrations = { workspace = true } agent-framework-purview = { workspace = true } agent-framework-redis = { workspace = true } agent-framework-azure-contentunderstanding = { workspace = true } +agent-framework-valkey = { workspace = true } [tool.ruff] line-length = 120 @@ -213,6 +214,7 @@ executionEnvironments = [ { root = "packages/orchestrations/tests", reportPrivateUsage = "none" }, { root = "packages/purview/tests", reportPrivateUsage = "none" }, { root = "packages/redis/tests", reportPrivateUsage = "none" }, + { root = "packages/valkey/tests", reportPrivateUsage = "none" }, { root = "tests", reportPrivateUsage = "none" }, ] diff --git a/python/samples/04-hosting/valkey_stream_buffer/README.md b/python/samples/04-hosting/valkey_stream_buffer/README.md new file mode 100644 index 0000000000..815b3cf2ee --- /dev/null +++ b/python/samples/04-hosting/valkey_stream_buffer/README.md @@ -0,0 +1,30 @@ +# Valkey Stream Buffer — Resumable Streaming + +This sample demonstrates the `ValkeyStreamBuffer` component from `agent-framework-valkey`, which provides reliable, resumable streaming using Valkey Streams. + +See also the Redis-based equivalent: [`../azure_functions/03_reliable_streaming/`](../azure_functions/03_reliable_streaming/) + +## Prerequisites + +- A running Valkey server (default: `localhost:6379`) + +Start Valkey locally with Docker: + +```bash +docker run -d --name valkey -p 6379:6379 valkey/valkey:latest +``` + +## Scenarios Demonstrated + +| # | Scenario | Description | +|---|----------|-------------| +| 1 | Basic write and read | Write chunks to a Valkey Stream and read them back | +| 2 | Cursor-based resumption | Simulate a client disconnect and reconnect mid-stream | +| 3 | Error propagation | Write an error entry and observe reader behavior | + +## Running + +```bash +# From the python/ directory +uv run python samples/04-hosting/valkey_stream_buffer/valkey_stream_buffer.py +``` diff --git a/python/samples/04-hosting/valkey_stream_buffer/valkey_stream_buffer.py b/python/samples/04-hosting/valkey_stream_buffer/valkey_stream_buffer.py new file mode 100644 index 0000000000..1c4541f6ec --- /dev/null +++ b/python/samples/04-hosting/valkey_stream_buffer/valkey_stream_buffer.py @@ -0,0 +1,212 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""ValkeyStreamBuffer: Resumable streaming with Valkey Streams + +This sample demonstrates the ValkeyStreamBuffer component from the +agent-framework-valkey package. It shows three scenarios: + +1) Basic write and read — write chunks to a Valkey Stream and read them back. +2) Cursor-based resumption — simulate a client disconnect and reconnect + mid-stream, resuming from the last-seen entry ID. +3) Error propagation — write an error entry and observe how the reader + surfaces it. + +The ValkeyStreamBuffer uses Valkey Streams (XADD/XREAD) under the hood. +Each conversation gets its own stream key with a configurable TTL that +auto-refreshes on every write. + +Requirements: + - A running Valkey server (default: localhost:6379) + - agent-framework-valkey installed (included in the workspace dev environment) + + Start Valkey locally with Docker: + docker run -d --name valkey -p 6379:6379 valkey/valkey:latest + +Run (from the python/ directory): + uv run python samples/04-hosting/valkey_stream_buffer/valkey_stream_buffer.py +""" + +import asyncio + +from agent_framework_valkey import ValkeyStreamBuffer +from glide import GlideClient, GlideClientConfiguration, NodeAddress + +# Connection settings — adjust for your environment. +VALKEY_HOST = "localhost" +VALKEY_PORT = 6379 + + +async def create_client() -> GlideClient: + """Create and return a connected GlideClient.""" + config = GlideClientConfiguration([NodeAddress(VALKEY_HOST, VALKEY_PORT)]) + return await GlideClient.create(config) + + +# --------------------------------------------------------------------------- +# Scenario 1: Basic write and read +# --------------------------------------------------------------------------- + + +async def basic_write_and_read(client: GlideClient) -> None: + """Write chunks to a Valkey Stream and read them back in order.""" + print("=== Scenario 1: Basic Write and Read ===\n") + + buf = ValkeyStreamBuffer(client=client, key_prefix="sample-stream") + conv_id = "demo-basic" + + # 1. Write several chunks and a completion marker. + await buf.write_chunk(conv_id, "Hello, ", 0) + await buf.write_chunk(conv_id, "world! ", 1) + await buf.write_chunk(conv_id, "This is streamed from Valkey.", 2) + await buf.write_completion(conv_id, 3) + print("Wrote 3 text chunks + completion sentinel.\n") + + # 2. Read them back. + print("Reading stream:") + async for chunk in buf.read_stream(conv_id): + if chunk.is_done: + print("\n[Stream complete]") + break + print(f" chunk({chunk.entry_id}): {chunk.text!r}") + + # Clean up the stream key. + await client.delete([f"sample-stream:{conv_id}"]) + + +# --------------------------------------------------------------------------- +# Scenario 2: Cursor-based resumption +# --------------------------------------------------------------------------- + + +async def cursor_resumption(client: GlideClient) -> None: + """Simulate a client disconnect and resume from the last-seen cursor.""" + print("\n=== Scenario 2: Cursor-Based Resumption ===\n") + + buf = ValkeyStreamBuffer(client=client, key_prefix="sample-stream") + conv_id = "demo-resume" + + # 1. Write a multi-chunk response. + chunks_text = [ + "Planning your trip: ", + "Day 1 — Arrive in Tokyo. ", + "Day 2 — Visit Shibuya and Harajuku. ", + "Day 3 — Explore Asakusa and Akihabara. ", + "Have a great trip!", + ] + for i, text in enumerate(chunks_text): + await buf.write_chunk(conv_id, text, i) + await buf.write_completion(conv_id, len(chunks_text)) + print(f"Wrote {len(chunks_text)} chunks + completion.\n") + + # 2. Read only the first 2 chunks, simulating a client disconnect. + print("First read (simulating disconnect after 2 chunks):") + last_cursor: str | None = None + count = 0 + async for chunk in buf.read_stream(conv_id): + if chunk.is_done: + break + print(f" chunk({chunk.entry_id}): {chunk.text!r}") + last_cursor = chunk.entry_id + count += 1 + if count >= 2: + print(" [Client disconnected]\n") + break + + # 3. Resume from the last-seen cursor. + print(f"Resuming from cursor: {last_cursor}") + async for chunk in buf.read_stream(conv_id, cursor=last_cursor): + if chunk.is_done: + print("\n[Stream complete]") + break + print(f" chunk({chunk.entry_id}): {chunk.text!r}") + + # Clean up. + await client.delete([f"sample-stream:{conv_id}"]) + + +# --------------------------------------------------------------------------- +# Scenario 3: Error propagation +# --------------------------------------------------------------------------- + + +async def error_propagation(client: GlideClient) -> None: + """Write an error entry and observe how the reader surfaces it.""" + print("\n=== Scenario 3: Error Propagation ===\n") + + buf = ValkeyStreamBuffer(client=client, key_prefix="sample-stream") + conv_id = "demo-error" + + # 1. Write a partial response followed by an error. + await buf.write_chunk(conv_id, "Starting response... ", 0) + await buf.write_error(conv_id, "upstream model timeout", 1) + print("Wrote 1 chunk + error entry.\n") + + # 2. Read — the reader yields the text chunk, then the error, and stops. + print("Reading stream:") + async for chunk in buf.read_stream(conv_id): + if chunk.error: + print(f" [Error] {chunk.error}") + break + print(f" chunk({chunk.entry_id}): {chunk.text!r}") + + # Clean up. + await client.delete([f"sample-stream:{conv_id}"]) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +async def main() -> None: + """Run all three scenarios.""" + client = await create_client() + try: + await basic_write_and_read(client) + await cursor_resumption(client) + await error_propagation(client) + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) + +""" +Sample output: + +=== Scenario 1: Basic Write and Read === + +Wrote 3 text chunks + completion sentinel. + +Reading stream: + chunk(1745862000000-0): 'Hello, ' + chunk(1745862000001-0): 'world! ' + chunk(1745862000002-0): 'This is streamed from Valkey.' + +[Stream complete] + +=== Scenario 2: Cursor-Based Resumption === + +Wrote 5 chunks + completion. + +First read (simulating disconnect after 2 chunks): + chunk(1745862000010-0): 'Planning your trip: ' + chunk(1745862000011-0): 'Day 1 — Arrive in Tokyo. ' + [Client disconnected] + +Resuming from cursor: 1745862000011-0 + chunk(1745862000012-0): 'Day 2 — Visit Shibuya and Harajuku. ' + chunk(1745862000013-0): 'Day 3 — Explore Asakusa and Akihabara. ' + chunk(1745862000014-0): 'Have a great trip!' + +[Stream complete] + +=== Scenario 3: Error Propagation === + +Wrote 1 chunk + error entry. + +Reading stream: + chunk(1745862000020-0): 'Starting response... ' + [Error] upstream model timeout +""" diff --git a/python/uv.lock b/python/uv.lock index 12c5f39b27..41b872fb0b 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -55,6 +55,7 @@ members = [ "agent-framework-orchestrations", "agent-framework-purview", "agent-framework-redis", + "agent-framework-valkey", ] constraints = [ { name = "fastapi-sso", specifier = ">=0.19.0" }, @@ -790,6 +791,28 @@ requires-dist = [ { name = "redisvl", specifier = ">=0.11.0,<0.16" }, ] +[[package]] +name = "agent-framework-valkey" +version = "1.0.0b260427" +source = { editable = "packages/valkey" } +dependencies = [ + { name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "valkey-glide", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] + +[package.optional-dependencies] +durabletask = [ + { name = "agent-framework-durabletask", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.metadata] +requires-dist = [ + { name = "agent-framework-core", editable = "packages/core" }, + { name = "agent-framework-durabletask", marker = "extra == 'durabletask'", editable = "packages/durabletask" }, + { name = "valkey-glide", marker = "sys_platform != 'win32'", specifier = ">=2.1.0,<3" }, +] +provides-extras = ["durabletask"] + [[package]] name = "agentlightning" version = "0.2.2" @@ -7264,6 +7287,48 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/63/9a/0962b05b308494e3202d3f794a6e85abe471fe3cafdbcf95c2e8c713aabd/uvloop-0.21.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a5c39f217ab3c663dc699c04cbd50c13813e31d917642d459fdcec07555cc553", size = 4660018, upload-time = "2024-10-14T23:38:10.888Z" }, ] +[[package]] +name = "valkey-glide" +version = "2.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "sniffio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "typing-extensions", marker = "(python_full_version < '3.11' and sys_platform == 'darwin') or (python_full_version < '3.11' and sys_platform == 'linux')" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/28/04/92be56c4dd9b5c89f10999e66f4d0e156d07d7b45aed9b0f89273f26aac5/valkey_glide-2.3.1.tar.gz", hash = "sha256:f4bae030c0aa6e55edb2c27dbd55f82cfb5f581904fff1318eec1c062f30d4b3", size = 832671, upload-time = "2026-04-01T17:56:32.983Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/40/b2ea2da3baa3085cfa93740a431e1054ff2f9c95a23c476618f7859b2083/valkey_glide-2.3.1-cp310-cp310-macosx_10_7_x86_64.whl", hash = "sha256:736a3e58393fa4f0f2fbb10031d46da5f18ebb8e72d2f9428ff24f0f6addeb3f", size = 7379323, upload-time = "2026-04-01T17:55:26.269Z" }, + { url = "https://files.pythonhosted.org/packages/53/ef/ad098d9c8c4385cedb66344316eaba7d8ca613c87dd757ca4f56390f11b9/valkey_glide-2.3.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:b2cd6f5c4e9b67b78873f34f19b9182bab5b07a9151855cf059303e05dac3b2f", size = 6860556, upload-time = "2026-04-01T17:55:28.255Z" }, + { url = "https://files.pythonhosted.org/packages/7c/14/680b98b22e0af970758a9fe7e16f1f438a0424c6761820e8d5732f6220ea/valkey_glide-2.3.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1ddf70bc7888d565273e4bf858ff6047d5284140ff380a732f807c775be8e108", size = 7133576, upload-time = "2026-04-01T17:55:29.778Z" }, + { url = "https://files.pythonhosted.org/packages/21/a8/4683c403fe26aa9cecc25e557e924f64ea9185c45b31c17aeecd89e00a5f/valkey_glide-2.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9f947dd44ba9741eadcab154443f447c19f23dab56de33f56d5f133ee0d597c2", size = 7599098, upload-time = "2026-04-01T17:55:31.594Z" }, + { url = "https://files.pythonhosted.org/packages/0c/19/4cce4fde822f2fa0df7c98e82232af367471996efe89d2c680022350a618/valkey_glide-2.3.1-cp311-cp311-macosx_10_7_x86_64.whl", hash = "sha256:6ddc4c6bee1a9c102f003cddc5d1bad8173a9d90e1c9a0f73a285228ed8625af", size = 7378844, upload-time = "2026-04-01T17:55:33.321Z" }, + { url = "https://files.pythonhosted.org/packages/9d/04/fca4862a885e0f0ef9560f2d4e42f29e0ec6df27e487aa64dc9c0b9a2f6e/valkey_glide-2.3.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:30590532136e4ea38b6a6389cbcfe4edc554418563c6e4f6357b0749907b2c20", size = 6860870, upload-time = "2026-04-01T17:55:34.885Z" }, + { url = "https://files.pythonhosted.org/packages/b8/94/7eb28e04008e247c2fe5c427b3dbbd81b238dd8ed9772e2acfc999008e42/valkey_glide-2.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4bbdb7baa7aac12c109aefd97f69f9780a4812429db18786254ef288ecf75f19", size = 7135059, upload-time = "2026-04-01T17:55:36.63Z" }, + { url = "https://files.pythonhosted.org/packages/ec/81/cbb2bfb989efef22b43b66a7e8249aa4afbb1201c2e9a29bb32677460ee9/valkey_glide-2.3.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7fd64d77ae26efd524be58456e22636ce4cb0a6110ad722e89f249a45d098692", size = 7596374, upload-time = "2026-04-01T17:55:38.534Z" }, + { url = "https://files.pythonhosted.org/packages/2e/04/9c492cdf0238aabd2902f4f252dd63ffee64cd0228d06989c8cd2a272291/valkey_glide-2.3.1-cp312-cp312-macosx_10_7_x86_64.whl", hash = "sha256:406b73f5ee080406fbfeda542d37de7e330fb4d83b0aa7212b92707d7b7b82a6", size = 7381382, upload-time = "2026-04-01T17:55:40.431Z" }, + { url = "https://files.pythonhosted.org/packages/63/ab/15302dba094927acced9bfccdbe5cf333129ddedf5e8378b94b415a54ccd/valkey_glide-2.3.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0940d4069cbc4896dec3a1ab39db7bf86667fb32892df4dbf3b043129d26d6e5", size = 6854103, upload-time = "2026-04-01T17:55:42.166Z" }, + { url = "https://files.pythonhosted.org/packages/ec/54/6b40a104352e44b36558528cd97d1ec7c12585b1fa1019b1794d52d19ab5/valkey_glide-2.3.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b47de0ec3d5a253c2b37d33266aaeb22503014f9e8f0611ba999e06f9804966a", size = 7134311, upload-time = "2026-04-01T17:55:43.966Z" }, + { url = "https://files.pythonhosted.org/packages/97/79/84de88074bc6780813415afd704e9c827be13b3aa02cc5508122070ae100/valkey_glide-2.3.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a364210002dd0e7c3362299f61a2a1cacf867594a8a0bbf157a345f3f40d4d94", size = 7597689, upload-time = "2026-04-01T17:55:45.898Z" }, + { url = "https://files.pythonhosted.org/packages/fe/bd/1cdb584687a2d2cd762a53cf111932aee1216186a6b28d00724805679643/valkey_glide-2.3.1-cp313-cp313-macosx_10_7_x86_64.whl", hash = "sha256:86d56756842acd6286601128822c5f1f9dcd61305f0c6a80c3e7fb3a7e0404ef", size = 7384605, upload-time = "2026-04-01T17:55:47.94Z" }, + { url = "https://files.pythonhosted.org/packages/85/76/f8c609597a24a07957c1d0e13d6f083376ad12ee205b21414d6a445c51fa/valkey_glide-2.3.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:b307795a23473b8e7cff781eb54936cc672a430820f5fa71c6b6fb3748cc1189", size = 6854336, upload-time = "2026-04-01T17:55:50.079Z" }, + { url = "https://files.pythonhosted.org/packages/96/5e/4fc465f880219712c9daff2b38a55008515946dfa5b3b63d3232b75c6bf4/valkey_glide-2.3.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3cb570f5d637ee55300ccdecd39a51cbf25c67ab6e25f2022d42f32a7bec6163", size = 7134155, upload-time = "2026-04-01T17:55:51.566Z" }, + { url = "https://files.pythonhosted.org/packages/00/46/894470eaf297a5d302b63c0900722fa56715a53ccd577528978171481553/valkey_glide-2.3.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:506c7800eec05caf17136645cc642941a9536578f4d6733845e7d0ed36ed4e3e", size = 7597496, upload-time = "2026-04-01T17:55:53.174Z" }, + { url = "https://files.pythonhosted.org/packages/56/88/eb7f25667c81d16ff55774c685f62d2b622917730b0c822db1d30ff32c11/valkey_glide-2.3.1-cp314-cp314-macosx_10_7_x86_64.whl", hash = "sha256:3d6626e6f9ddfa7f8706023e167b4a2eca8a0f7b7fee1d30f91a83b4811349e4", size = 7383535, upload-time = "2026-04-01T17:55:54.747Z" }, + { url = "https://files.pythonhosted.org/packages/d8/64/5db032850ae1f8ec345ec5e5c4b0f15c50c8cf88e5a67990491964938cab/valkey_glide-2.3.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:3466a0c113a951d722036704795ff0377eef11a44ab224472f98d99ac2c5ef28", size = 6853286, upload-time = "2026-04-01T17:55:56.96Z" }, + { url = "https://files.pythonhosted.org/packages/7e/af/4c835ece50d6e1536e96a74a11fe51a1aef8006c6e38544c324a5d4d5637/valkey_glide-2.3.1-cp314-cp314-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fe53e4808bdac5b4e6482c66583e1980ecf75666b4e4d0984d89e8b693026543", size = 7135821, upload-time = "2026-04-01T17:55:58.692Z" }, + { url = "https://files.pythonhosted.org/packages/97/12/c1341d977d0cd3ae812ae620bf0935e51d95e563af5a00562592c10fcc38/valkey_glide-2.3.1-cp314-cp314-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c1a9662885ea8f3df97a6d873131dea983d42e4735750af368fe2d47e7e44f0c", size = 7595445, upload-time = "2026-04-01T17:56:00.544Z" }, + { url = "https://files.pythonhosted.org/packages/20/5e/0b9cc70a0852c1423bd4e1609500481ffcdd7d11de88ac799c4b4758d39b/valkey_glide-2.3.1-pp310-pypy310_pp73-macosx_10_7_x86_64.whl", hash = "sha256:5533a090953fd6af4c07b80bd042231540fbd1ede95fff42614750b435f01184", size = 7379308, upload-time = "2026-04-01T17:56:10.508Z" }, + { url = "https://files.pythonhosted.org/packages/4d/3d/68dcc6010a5cd100c360ff57c15cb1e2ff343e81a1ee2630c7cdb57e91b4/valkey_glide-2.3.1-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:f814ad759e9fdc6c5ced18ddba38cc2a3badb2839ce3555ec9b44beb794096e4", size = 6860135, upload-time = "2026-04-01T17:56:12.698Z" }, + { url = "https://files.pythonhosted.org/packages/63/a4/6e4b8603ab0217f43721641d08740d3c7ce124d1fc7c9bfb30e967ac1830/valkey_glide-2.3.1-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3dc6dea7ce627a8b166d33232aa7bc7f8dd9d224870235a560bc5d1c4ccec8cb", size = 7136201, upload-time = "2026-04-01T17:56:14.287Z" }, + { url = "https://files.pythonhosted.org/packages/04/18/8a5a22e8245e48b0bf83a99ac64f289ff62de84ee44315c53a5db8dff69c/valkey_glide-2.3.1-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e1e135bb43e50b1cd6558d93b3108c40a79ce8dc119de883cebb7458d470f629", size = 7594993, upload-time = "2026-04-01T17:56:16.235Z" }, + { url = "https://files.pythonhosted.org/packages/88/58/a0acca1c36a1481c9f5cf094fc584b1a9f9ad9af927a355400e968cc1f92/valkey_glide-2.3.1-pp311-pypy311_pp73-macosx_10_7_x86_64.whl", hash = "sha256:993c9bffde847fa3d36c6f11e5e50872dd491f245850d7c6ae1bbb8db5bff346", size = 7379554, upload-time = "2026-04-01T17:56:18.12Z" }, + { url = "https://files.pythonhosted.org/packages/1f/84/02e922bfd7201c9bbf3a4464aaf46e1b5b508852ba05974981a215f34d1b/valkey_glide-2.3.1-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:918ce3b8a2a3602e82d03f254bad5cc5bd1398eb84dec8eef77aefccc039bd5d", size = 6860268, upload-time = "2026-04-01T17:56:19.808Z" }, + { url = "https://files.pythonhosted.org/packages/ae/30/d8e215ab273d9a599ab926a7299e9a1f219120e6248850efb51186107723/valkey_glide-2.3.1-pp311-pypy311_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:28d4cbf00b07db273214488f17d59232baaddd0cc30c26064cf3bf384b03e9cd", size = 7136569, upload-time = "2026-04-01T17:56:21.357Z" }, + { url = "https://files.pythonhosted.org/packages/fb/ac/80d29b75115133c3f97dd0fa725eb9598ebcd4217f0ece22ce63dc7dc8f7/valkey_glide-2.3.1-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7d93ef822a524c8f18c1b750f061373d95e842005116ebcf832d166533bf2bc2", size = 7594844, upload-time = "2026-04-01T17:56:23.528Z" }, +] + [[package]] name = "watchdog" version = "6.0.0"