diff --git a/docs/ref/extensions/memory/dakera_session.md b/docs/ref/extensions/memory/dakera_session.md new file mode 100644 index 0000000000..983070ff1d --- /dev/null +++ b/docs/ref/extensions/memory/dakera_session.md @@ -0,0 +1,3 @@ +# `DakeraSession` + +::: agents.extensions.memory.dakera_session.DakeraSession diff --git a/docs/sessions/index.md b/docs/sessions/index.md index 8916f85fab..ca743d97c4 100644 --- a/docs/sessions/index.md +++ b/docs/sessions/index.md @@ -208,6 +208,7 @@ Use this table to pick a starting point before reading the detailed examples bel | `SQLAlchemySession` | Production apps with existing databases | Works with SQLAlchemy-supported databases | | `MongoDBSession` | Apps already using MongoDB or needing multi-process storage | Async pymongo; atomic sequence counter for ordering | | `DaprSession` | Cloud-native deployments with Dapr sidecars | Supports multiple state stores plus TTL and consistency controls | +| `DakeraSession` | Self-hosted persistent memory across sessions and workers | Backed by a [Dakera](https://github.com/dakera-ai/dakera-deploy) server over REST | | `OpenAIConversationsSession` | Server-managed storage in OpenAI | OpenAI Conversations API-backed history | | `OpenAIResponsesCompactionSession` | Long conversations with automatic compaction | Wrapper around another session backend | | `AdvancedSQLiteSession` | SQLite plus branching/analytics | Heavier feature set; see dedicated page | @@ -449,6 +450,37 @@ Notes: - Two collections are used and both names are configurable via `sessions_collection=` (default `agent_sessions`) and `messages_collection=` (default `agent_messages`). Indexes are created automatically on first use. Each message document carries a monotonically increasing `seq` counter that preserves ordering across concurrent writers and processes. - Use `await session.ping()` to verify connectivity before your first run. 
+### Dakera sessions + +Use `DakeraSession` to persist conversation history on a self-hosted [Dakera](https://github.com/dakera-ai/dakera-deploy) memory server, so history survives process restarts and can be shared across workers that point at the same server. + +```bash +pip install openai-agents[dakera] +``` + +```python +from agents import Agent, Runner +from agents.extensions.memory import DakeraSession + +agent = Agent(name="Assistant") + +# from_url creates and owns the AsyncDakeraClient; close() releases it. +session = DakeraSession.from_url( + session_id="user-123", + base_url="http://localhost:3000", + api_key="dk-...", +) +result = await Runner.run(agent, "Hello", session=session) +print(result.final_output) +await session.close() +``` + +Notes: + +- `from_url(...)` creates and owns the `AsyncDakeraClient` and closes it on `session.close()`. If your application already manages a client, construct `DakeraSession(session_id, client=...)` directly; in that case `session.close()` is a no-op and lifecycle stays with the caller. +- Each conversation is isolated in its own Dakera namespace derived from `session_id` (`"{key_prefix}:{session_id}"`, `key_prefix` defaults to `agents:session`). The same `session_id` always resolves to the same history, so `get_items`/`pop_item`/`clear_session` are restart-safe. Every item carries a `seq` counter in its metadata, and items are returned in `seq` order (ties are broken by creation time). The counter is assigned by the session rather than atomically by the server, so avoid writing to the same `session_id` from several workers at the same time. +- Run a local server with the `dakera-ai/dakera-deploy` docker-compose stack (Dakera server + MinIO); it listens on port 3000 by default. + ### Advanced SQLite sessions Enhanced SQLite sessions with conversation branching, usage analytics, and structured queries: diff --git a/examples/memory/dakera_session_example.py b/examples/memory/dakera_session_example.py new file mode 100644 index 0000000000..1104eb1c65 --- /dev/null +++ b/examples/memory/dakera_session_example.py @@ -0,0 +1,106 @@ +"""Example demonstrating Dakera-backed session memory. 
async def main() -> None:
    """Run a three-turn conversation backed by a Dakera session.

    Connection details come from ``DAKERA_BASE_URL`` (falling back to
    ``DEFAULT_BASE_URL``) and ``DAKERA_API_KEY``. After the conversation the
    stored history is inspected, the ``limit`` parameter is demonstrated, and
    a second session shows that histories are isolated per ``session_id``.
    Any failure is reported with a hint about starting a Dakera server.
    """
    server_url = os.environ.get("DAKERA_BASE_URL", DEFAULT_BASE_URL)
    token = os.environ.get("DAKERA_API_KEY")

    assistant = Agent(name="Assistant", instructions="Reply very concisely.")

    print("=== Dakera Session Example ===")
    print(f"This example uses a Dakera server at {server_url}")
    print("Set DAKERA_BASE_URL / DAKERA_API_KEY to use a different server.\n")

    # Each entry is (heading printed before the turn, user question).
    turns = (
        ("First turn:", "What city is the Golden Gate Bridge in?"),
        ("Second turn:", "What state is it in?"),
        ("Third turn:", "What's the population of that state?"),
    )

    # `from_url` creates and owns the AsyncDakeraClient; `close()` releases it.
    primary = DakeraSession.from_url(
        session_id="dakera_conversation_123",
        base_url=server_url,
        api_key=token,
    )

    try:
        # Start from an empty history so repeated runs print the same counts.
        await primary.clear_session()
        print("Session cleared for clean demonstration.")
        print("The agent will remember previous messages automatically.\n")

        for heading, question in turns:
            print(heading)
            print(f"User: {question}")
            reply = await Runner.run(assistant, question, session=primary)
            print(f"Assistant: {reply.final_output}\n")

        print("=== Conversation Complete ===")
        stored = await primary.get_items()
        print(f"Total items stored in Dakera: {len(stored)}")

        # The limit parameter returns only the most recent items.
        tail = await primary.get_items(limit=2)
        print(f"Latest {len(tail)} items retrieved via the limit parameter.")

        # A different session_id gets its own, independent history.
        secondary = DakeraSession.from_url(
            session_id="different_conversation_456",
            base_url=server_url,
            api_key=token,
        )
        try:
            await secondary.clear_session()
            await Runner.run(assistant, "Hello, this is a new conversation!", session=secondary)
            original_count = len(await primary.get_items())
            new_count = len(await secondary.get_items())
            print(f"\nSession isolation: original={original_count} items, new={new_count} items")
        finally:
            await secondary.close()

    except Exception as e:  # pragma: no cover - example error handling
        print(f"Error: {e}")
        print(f"Make sure a Dakera server is running and reachable at {server_url}.")
        print("See https://github.com/dakera-ai/dakera-deploy for a docker-compose setup.")
    finally:
        await primary.close()
diff --git a/src/agents/extensions/memory/__init__.py b/src/agents/extensions/memory/__init__.py index 3fcb71ecaf..ebd66dd3ed 100644 --- a/src/agents/extensions/memory/__init__.py +++ b/src/agents/extensions/memory/__init__.py @@ -16,6 +16,7 @@ if TYPE_CHECKING: from .advanced_sqlite_session import AdvancedSQLiteSession from .async_sqlite_session import AsyncSQLiteSession + from .dakera_session import DakeraSession from .dapr_session import ( DAPR_CONSISTENCY_EVENTUAL, DAPR_CONSISTENCY_STRONG, @@ -29,6 +30,7 @@ __all__: list[str] = [ "AdvancedSQLiteSession", "AsyncSQLiteSession", + "DakeraSession", "DAPR_CONSISTENCY_EVENTUAL", "DAPR_CONSISTENCY_STRONG", "DaprSession", @@ -44,6 +46,7 @@ "SQLAlchemySession": (".sqlalchemy_session", ("sqlalchemy", "sqlalchemy")), "AdvancedSQLiteSession": (".advanced_sqlite_session", None), "AsyncSQLiteSession": (".async_sqlite_session", None), + "DakeraSession": (".dakera_session", ("dakera", "dakera")), "DaprSession": (".dapr_session", ("dapr", "dapr")), "DAPR_CONSISTENCY_EVENTUAL": (".dapr_session", ("dapr", "dapr")), "DAPR_CONSISTENCY_STRONG": (".dapr_session", ("dapr", "dapr")), diff --git a/src/agents/extensions/memory/dakera_session.py b/src/agents/extensions/memory/dakera_session.py new file mode 100644 index 0000000000..05b039dec2 --- /dev/null +++ b/src/agents/extensions/memory/dakera_session.py @@ -0,0 +1,276 @@ +"""Dakera-powered Session backend. + +[Dakera](https://github.com/dakera-ai/dakera-deploy) is a self-hosted memory +server for AI agents that exposes a REST API for persistent, decay-weighted +memory. This backend stores each conversation item as a memory in a per-session +Dakera namespace, so session history survives process restarts and can be shared +across workers that point at the same Dakera server. 
class DakeraSession(SessionABC):
    """Dakera implementation of [`Session`][agents.memory.session.Session].

    Each conversation item is persisted as an individual memory in a Dakera
    namespace derived from ``session_id`` (``"{key_prefix}:{session_id}"``).
    Isolating every session in its own namespace keeps histories separate and
    makes ``get_items``/``pop_item``/``clear_session`` restart-safe: the same
    ``session_id`` always resolves to the same stored history regardless of the
    process that wrote it.

    Ordering relies on a ``seq`` counter stored in each memory's metadata, with
    ``created_at`` as a tie-breaker. The counter is assigned client-side: it is
    re-read from stored history before every write, so a writer never reuses a
    value that is already stored (even one written by another process). It is
    not an atomic server-side increment, so truly simultaneous writers to the
    same ``session_id`` can still receive equal ``seq`` values.
    """

    session_settings: SessionSettings | None = None

    def __init__(
        self,
        session_id: str,
        *,
        client: AsyncDakeraClient,
        key_prefix: str = _DEFAULT_KEY_PREFIX,
        importance: float = 0.6,
        session_settings: SessionSettings | None = None,
    ):
        """Initialize a new DakeraSession.

        Args:
            session_id: Unique identifier for the conversation.
            client: A pre-configured ``AsyncDakeraClient`` pointed at a Dakera
                server.
            key_prefix: Prefix used to build the per-session Dakera namespace,
                avoiding collisions with other agents on the same server.
                Defaults to ``"agents:session"``.
            importance: Importance score (0.0-1.0) attached to each stored item.
                Defaults to ``0.6`` to keep conversation turns above Dakera's
                decay floor without crowding higher-importance memories.
            session_settings: Session configuration (e.g. default retrieval
                limit). If None, uses default ``SessionSettings()``.
        """
        self.session_id = session_id
        self.session_settings = session_settings or SessionSettings()
        self._client = client
        self._key_prefix = key_prefix
        self._namespace = f"{key_prefix}:{session_id}"
        self._importance = importance
        # Serializes read-modify-write sequences issued through this instance.
        self._lock = asyncio.Lock()
        # Flipped to True only by ``from_url``; controls whether ``close`` acts.
        self._owns_client = False
        # Lower bound for the next ``seq``; None until the first write.
        self._next_seq: int | None = None

    @classmethod
    def from_url(
        cls,
        session_id: str,
        *,
        base_url: str,
        api_key: str | None = None,
        client_kwargs: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> DakeraSession:
        """Create a session from Dakera connection details.

        The created ``AsyncDakeraClient`` is owned by the session and closed by
        :meth:`close`.

        Args:
            session_id: Conversation ID.
            base_url: Base URL of the Dakera server, e.g. ``"http://localhost:3000"``.
            api_key: Optional API key for authentication.
            client_kwargs: Additional keyword arguments forwarded to
                ``AsyncDakeraClient`` (e.g. ``timeout``, ``max_retries``).
            **kwargs: Additional keyword arguments forwarded to the main
                constructor (e.g. ``key_prefix``, ``importance``).

        Returns:
            A ``DakeraSession`` that owns its underlying client.
        """
        client = AsyncDakeraClient(base_url=base_url, api_key=api_key, **(client_kwargs or {}))
        session = cls(session_id, client=client, **kwargs)
        session._owns_client = True
        return session

    # ------------------------------------------------------------------
    # Serialization helpers (overridable by subclasses)
    # ------------------------------------------------------------------

    def _serialize_item(self, item: TResponseInputItem) -> str:
        """Encode an item as compact JSON for storage as memory content."""
        return json.dumps(item, separators=(",", ":"))

    def _deserialize_item(self, raw: str) -> TResponseInputItem:
        """Decode stored memory content back into an item.

        Raises:
            json.JSONDecodeError: If ``raw`` is not valid JSON.
        """
        return json.loads(raw)  # type: ignore[no-any-return]

    @staticmethod
    def _seq_of(memory: dict[str, Any]) -> int:
        """Return the ordering counter stored in a memory's metadata.

        Missing, non-numeric, boolean, or non-finite values are treated as
        unset and yield ``0`` so that one malformed record cannot break
        sorting for the whole session.
        """
        metadata = memory.get("metadata") or {}
        seq = metadata.get(_SEQ_KEY)
        # bool is an int subclass; treat it as unset rather than 0/1.
        if isinstance(seq, bool) or not isinstance(seq, (int, float)):
            return 0
        try:
            return int(seq)
        except (ValueError, OverflowError):
            # int() rejects NaN (ValueError) and +/-inf (OverflowError).
            return 0

    async def _fetch_sorted(self) -> list[dict[str, Any]]:
        """Return this session's memories in chronological (write) order."""
        memories = await self._client.agent_memories(self._namespace)
        return sorted(memories, key=lambda m: (self._seq_of(m), m.get("created_at") or ""))

    async def _ensure_next_seq(self) -> int:
        """Return the next unused sequence number, refreshed from stored history.

        The stored maximum is re-read on every call instead of being cached
        after the first write: a cached counter goes stale as soon as another
        process writes to the same session, and reusing its values would sort
        newer items before older ones. The in-process counter is kept as a
        lower bound so values stay monotonic for this instance even after the
        newest stored items were popped.
        """
        memories = await self._client.agent_memories(self._namespace)
        stored_next = max((self._seq_of(m) for m in memories), default=-1) + 1
        self._next_seq = max(stored_next, self._next_seq or 0)
        return self._next_seq

    # ------------------------------------------------------------------
    # Session protocol implementation
    # ------------------------------------------------------------------

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        """Retrieve the conversation history for this session.

        Args:
            limit: Maximum number of items to retrieve. If None, uses
                ``session_settings.limit``. When specified, returns the latest N
                items in chronological order.

        Returns:
            List of input items representing the conversation history.
        """
        session_limit = resolve_session_limit(limit, self.session_settings)
        if session_limit is not None and session_limit <= 0:
            return []

        async with self._lock:
            memories = await self._fetch_sorted()

        items: list[TResponseInputItem] = []
        for memory in memories:
            raw = memory.get("content")
            if not isinstance(raw, str):
                continue
            try:
                items.append(self._deserialize_item(raw))
            except json.JSONDecodeError:
                # Skip corrupted entries rather than failing the whole read.
                continue

        if session_limit is None:
            return items
        return items[-session_limit:]

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        """Add new items to the conversation history.

        Items are stored one by one in list order. If a store call fails, the
        exception propagates; items stored before the failure remain stored.

        Args:
            items: List of input items to add to the history.
        """
        if not items:
            return

        async with self._lock:
            seq = await self._ensure_next_seq()
            for item in items:
                await self._client.store_memory(
                    agent_id=self._namespace,
                    content=self._serialize_item(item),
                    memory_type="episodic",
                    importance=self._importance,
                    metadata={_SEQ_KEY: seq, "oai_session_id": self.session_id},
                    tags=[_SESSION_TAG],
                )
                seq += 1
                # Advance after every successful store so a failure part-way
                # through the batch cannot lead to the same seq being reused.
                self._next_seq = seq

    async def pop_item(self) -> TResponseInputItem | None:
        """Remove and return the most recent item from the session.

        Corrupted or non-string entries found on the way are deleted on a
        best-effort basis and skipped. Deleting the item that is returned is
        not best-effort: if that delete fails, the exception propagates so an
        item is never reported as popped while it is still stored.

        Returns:
            The most recent item if it exists, None if the session is empty.
        """
        async with self._lock:
            memories = await self._fetch_sorted()
            while memories:
                memory = memories.pop()
                memory_id = memory.get("id")
                raw = memory.get("content")

                item: TResponseInputItem | None = None
                decoded = False
                if isinstance(raw, str):
                    try:
                        item = self._deserialize_item(raw)
                        decoded = True
                    except json.JSONDecodeError:
                        decoded = False

                if decoded:
                    if memory_id:
                        await self._client.forget(self._namespace, memory_id)
                    return item

                # Unusable entry: try to clean it up, then look at an older one.
                if memory_id:
                    with contextlib.suppress(Exception):
                        await self._client.forget(self._namespace, memory_id)
            return None

    async def clear_session(self) -> None:
        """Clear all items for this session.

        Every stored memory is attempted even if an earlier delete fails, so a
        single bad record does not leave the rest of the history in place.

        Raises:
            Exception: The first error raised by the client while deleting, if
                any delete failed; the session may then be only partly cleared.
        """
        async with self._lock:
            memories = await self._client.agent_memories(self._namespace)
            first_error: Exception | None = None
            for memory in memories:
                memory_id = memory.get("id")
                if not memory_id:
                    continue
                try:
                    await self._client.forget(self._namespace, memory_id)
                except Exception as exc:
                    if first_error is None:
                        first_error = exc
            # Safe even after a partial clear: the next write re-reads the
            # stored maximum before assigning a sequence number.
            self._next_seq = 0
            if first_error is not None:
                raise first_error

    async def close(self) -> None:
        """Close the underlying Dakera client.

        Only closes the client when this session created it (via
        :meth:`from_url`). If the client was injected externally, the caller is
        responsible for its lifecycle. Errors raised while closing are
        suppressed.
        """
        if self._owns_client:
            with contextlib.suppress(Exception):
                await self._client.close()
class FakeDakeraClient:
    """In-memory stand-in for ``dakera.AsyncDakeraClient``.

    Stores memories per ``agent_id`` (namespace) and returns them in a
    deliberately non-chronological order so the session's own sequence-based
    sorting is what's under test.
    """

    def __init__(self) -> None:
        self._store: dict[str, list[dict[str, Any]]] = {}
        self._ids = itertools.count(1)
        self._created = itertools.count(1)
        self.closed = False

    async def store_memory(
        self,
        agent_id: str,
        content: str,
        memory_type: str = "episodic",
        importance: float | None = None,
        metadata: dict[str, Any] | None = None,
        session_id: str | None = None,
        tags: list[str] | None = None,
        **_: Any,
    ) -> dict[str, Any]:
        """Append a memory record to the namespace and return it."""
        memory = {
            "id": f"mem-{next(self._ids)}",
            "content": content,
            "memory_type": memory_type,
            "importance": importance,
            "metadata": dict(metadata or {}),
            "created_at": f"2026-01-01T00:00:{next(self._created):02d}Z",
            "tags": list(tags or []),
        }
        self._store.setdefault(agent_id, []).append(memory)
        return memory

    async def agent_memories(
        self,
        agent_id: str,
        memory_type: str | None = None,
        limit: int | None = None,
    ) -> list[dict[str, Any]]:
        """Return copies of the namespace's memories, newest first."""
        rows = [dict(m) for m in self._store.get(agent_id, [])]
        # Return reversed to prove DakeraSession does not rely on server order.
        return list(reversed(rows))

    async def forget(self, agent_id: str, memory_id: str) -> dict[str, Any]:
        """Drop the memory with ``memory_id`` from the namespace."""
        rows = self._store.get(agent_id, [])
        self._store[agent_id] = [m for m in rows if m.get("id") != memory_id]
        return {"forgotten": memory_id}

    async def close(self) -> None:
        """Record that the client was closed."""
        self.closed = True


def _message(role: str, text: str) -> TResponseInputItem:
    """Build a minimal role/content input item."""
    return cast("TResponseInputItem", {"role": role, "content": text})


@pytest.fixture
def client() -> FakeDakeraClient:
    return FakeDakeraClient()


@pytest.fixture
def session(client: FakeDakeraClient) -> DakeraSession:
    return DakeraSession("conv-1", client=client)  # type: ignore[arg-type]


@pytest.fixture
def agent() -> Agent:
    return Agent(name="test", model=FakeModel())


# ---------------------------------------------------------------------------
# Core protocol behavior
# ---------------------------------------------------------------------------


async def test_add_and_get_items_roundtrip(session: DakeraSession) -> None:
    """Items come back exactly as they were added."""
    items: list[TResponseInputItem] = [
        _message("user", "Hello"),
        _message("assistant", "Hi there"),
    ]
    await session.add_items(items)

    retrieved = await session.get_items()
    assert retrieved == items


async def test_items_are_chronological_across_add_calls(session: DakeraSession) -> None:
    """Write order is preserved across separate add_items calls."""
    await session.add_items([_message("user", "first")])
    await session.add_items([_message("assistant", "second")])
    await session.add_items([_message("user", "third")])

    retrieved = await session.get_items()
    assert retrieved == [
        _message("user", "first"),
        _message("assistant", "second"),
        _message("user", "third"),
    ]


async def test_add_empty_list_is_noop(session: DakeraSession) -> None:
    """Adding an empty list stores nothing."""
    await session.add_items([])
    assert await session.get_items() == []


async def test_get_items_empty_session(session: DakeraSession) -> None:
    """A session without history returns an empty list."""
    assert await session.get_items() == []


async def test_get_items_with_explicit_limit(session: DakeraSession) -> None:
    """An explicit limit returns the latest N items in chronological order."""
    await session.add_items([_message("user", f"m{i}") for i in range(5)])

    latest = await session.get_items(limit=2)
    assert latest == [_message("user", "m3"), _message("user", "m4")]


async def test_get_items_limit_zero(session: DakeraSession) -> None:
    """A limit of zero returns nothing."""
    await session.add_items([_message("user", "m0")])
    assert await session.get_items(limit=0) == []


async def test_get_items_limit_exceeds_count(session: DakeraSession) -> None:
    """A limit larger than the history returns everything."""
    await session.add_items([_message("user", "only")])
    assert len(await session.get_items(limit=10)) == 1


async def test_session_settings_limit_used_as_default(client: FakeDakeraClient) -> None:
    """SessionSettings.limit applies when get_items gets no explicit limit."""
    session = DakeraSession(
        "conv-limit",
        client=client,  # type: ignore[arg-type]
        session_settings=SessionSettings(limit=1),
    )
    await session.add_items([_message("user", "a"), _message("assistant", "b")])

    retrieved = await session.get_items()
    assert retrieved == [_message("assistant", "b")]


async def test_explicit_limit_overrides_session_settings(client: FakeDakeraClient) -> None:
    """An explicit limit wins over SessionSettings.limit."""
    session = DakeraSession(
        "conv-limit-2",
        client=client,  # type: ignore[arg-type]
        session_settings=SessionSettings(limit=1),
    )
    await session.add_items([_message("user", "a"), _message("assistant", "b")])
    assert len(await session.get_items(limit=2)) == 2


# ---------------------------------------------------------------------------
# pop_item / clear_session
# ---------------------------------------------------------------------------


async def test_pop_item_returns_last(session: DakeraSession) -> None:
    """pop_item removes and returns the most recent item."""
    await session.add_items([_message("user", "first"), _message("assistant", "second")])

    popped = await session.pop_item()
    assert popped == _message("assistant", "second")

    remaining = await session.get_items()
    assert remaining == [_message("user", "first")]


async def test_pop_then_add_keeps_order(session: DakeraSession) -> None:
    """Items added after a pop still sort after the remaining history."""
    await session.add_items([_message("user", "a"), _message("assistant", "b")])
    await session.pop_item()
    await session.add_items([_message("assistant", "c")])

    assert await session.get_items() == [_message("user", "a"), _message("assistant", "c")]


async def test_pop_item_empty_session(session: DakeraSession) -> None:
    """Popping from an empty session returns None."""
    assert await session.pop_item() is None


async def test_clear_session(session: DakeraSession) -> None:
    """clear_session removes every stored item."""
    await session.add_items([_message("user", "a"), _message("assistant", "b")])
    await session.clear_session()
    assert await session.get_items() == []


# ---------------------------------------------------------------------------
# Restart safety
# ---------------------------------------------------------------------------


async def test_new_session_instance_resumes_history(client: FakeDakeraClient) -> None:
    """A fresh instance with the same session_id continues the stored history."""
    first = DakeraSession("conv-restart", client=client)  # type: ignore[arg-type]
    await first.add_items([_message("user", "a"), _message("assistant", "b")])

    # A second instance on the same client simulates a process restart.
    second = DakeraSession("conv-restart", client=client)  # type: ignore[arg-type]
    assert await second.get_items() == [_message("user", "a"), _message("assistant", "b")]

    await second.add_items([_message("user", "c")])
    assert await second.get_items() == [
        _message("user", "a"),
        _message("assistant", "b"),
        _message("user", "c"),
    ]


async def test_seq_of_ignores_non_numeric_metadata() -> None:
    """Missing, boolean, or non-numeric seq values are treated as unset."""
    assert DakeraSession._seq_of({}) == 0
    assert DakeraSession._seq_of({"metadata": None}) == 0
    assert DakeraSession._seq_of({"metadata": {"seq": True}}) == 0
    assert DakeraSession._seq_of({"metadata": {"seq": "3"}}) == 0
    assert DakeraSession._seq_of({"metadata": {"seq": 2.0}}) == 2
    assert DakeraSession._seq_of({"metadata": {"seq": 7}}) == 7


# ---------------------------------------------------------------------------
# Isolation
# ---------------------------------------------------------------------------


async def test_sessions_are_isolated(client: FakeDakeraClient) -> None:
    """Two session ids on one client never see each other's items."""
    session_a = DakeraSession("conv-a", client=client)  # type: ignore[arg-type]
    session_b = DakeraSession("conv-b", client=client)  # type: ignore[arg-type]

    await session_a.add_items([_message("user", "for a")])
    await session_b.add_items([_message("user", "for b")])

    assert await session_a.get_items() == [_message("user", "for a")]
    assert await session_b.get_items() == [_message("user", "for b")]


async def test_clear_does_not_affect_other_sessions(client: FakeDakeraClient) -> None:
    """Clearing one session leaves other sessions untouched."""
    session_a = DakeraSession("conv-a", client=client)  # type: ignore[arg-type]
    session_b = DakeraSession("conv-b", client=client)  # type: ignore[arg-type]
    await session_a.add_items([_message("user", "for a")])
    await session_b.add_items([_message("user", "for b")])

    await session_a.clear_session()
    assert await session_a.get_items() == []
    assert len(await session_b.get_items()) == 1


async def test_custom_key_prefix_changes_namespace(client: FakeDakeraClient) -> None:
    """key_prefix is used to build the Dakera namespace."""
    session = DakeraSession("conv-1", client=client, key_prefix="tenant-x")  # type: ignore[arg-type]
    await session.add_items([_message("user", "hi")])
    assert "tenant-x:conv-1" in client._store


# ---------------------------------------------------------------------------
# Serialization fidelity & corruption handling
# ---------------------------------------------------------------------------


async def test_unicode_and_special_characters_roundtrip(session: DakeraSession) -> None:
    """Unicode, quotes, backslashes, and newlines survive a round trip."""
    item = _message("user", 'emoji 🎉 and "quotes" and \\backslash and \n newline')
    await session.add_items([item])
    assert (await session.get_items())[0] == item


async def test_complex_item_shape_roundtrip(session: DakeraSession) -> None:
    """Non-message item shapes (e.g. function calls) survive a round trip."""
    item = cast(
        "TResponseInputItem",
        {
            "type": "function_call",
            "call_id": "call_123",
            "name": "get_weather",
            "arguments": '{"city": "SF"}',
        },
    )
    await session.add_items([item])
    assert (await session.get_items())[0] == item


async def test_corrupted_entry_is_skipped(session: DakeraSession, client: FakeDakeraClient) -> None:
    """get_items skips entries whose content is not valid JSON."""
    await session.add_items([_message("user", "good")])
    # Inject a corrupted memory directly into the namespace.
    client._store[session._namespace].append(
        {
            "id": "mem-corrupt",
            "content": "not-json{",
            "metadata": {"seq": 99},
            "created_at": "2026-01-01T00:00:99Z",
        }
    )
    retrieved = await session.get_items()
    assert retrieved == [_message("user", "good")]


async def test_pop_skips_corrupt_most_recent(
    session: DakeraSession, client: FakeDakeraClient
) -> None:
    """pop_item discards a corrupt newest entry and returns the next valid one."""
    await session.add_items([_message("user", "good")])
    client._store[session._namespace].append(
        {
            "id": "mem-corrupt",
            "content": "not-json{",
            "metadata": {"seq": 99},
            "created_at": "2026-01-01T00:00:99Z",
        }
    )
    popped = await session.pop_item()
    assert popped == _message("user", "good")
    # Both the corrupt entry and the popped good entry are gone.
    assert await session.get_items() == []


# ---------------------------------------------------------------------------
# Client lifecycle
# ---------------------------------------------------------------------------


async def test_external_client_not_closed(client: FakeDakeraClient) -> None:
    """close() leaves an injected client open."""
    session = DakeraSession("conv-1", client=client)  # type: ignore[arg-type]
    await session.close()
    assert client.closed is False


async def test_from_url_owns_and_closes_client() -> None:
    """A session built with from_url closes the client it created."""
    fake = FakeDakeraClient()
    with patch(
        "agents.extensions.memory.dakera_session.AsyncDakeraClient",
        return_value=fake,
    ) as mock_ctor:
        session = DakeraSessionDirect.from_url(
            "conv-1",
            base_url="http://localhost:3000",
            api_key="dk-test",
        )
    mock_ctor.assert_called_once()
    assert session._owns_client is True
    await session.close()
    assert fake.closed is True


# ---------------------------------------------------------------------------
# End-to-end Runner integration
# ---------------------------------------------------------------------------


async def test_runner_integration(session: DakeraSession, agent: Agent) -> None:
    """Runner persists both sides of each turn in the session."""
    assert isinstance(agent.model, FakeModel)

    agent.model.set_next_output([get_text_message("San Francisco")])
    result1 = await Runner.run(agent, "Where is the Golden Gate Bridge?", session=session)
    assert result1.final_output == "San Francisco"

    agent.model.set_next_output([get_text_message("California")])
    result2 = await Runner.run(agent, "What state is it in?", session=session)
    assert result2.final_output == "California"

    # user + assistant for each of the two turns
    assert len(await session.get_items()) == 4


async def test_runner_session_isolation(client: FakeDakeraClient, agent: Agent) -> None:
    """A Runner turn on one session does not leak into another."""
    assert isinstance(agent.model, FakeModel)
    session_a = DakeraSession("conv-a", client=client)  # type: ignore[arg-type]
    session_b = DakeraSession("conv-b", client=client)  # type: ignore[arg-type]

    agent.model.set_next_output([get_text_message("I like cats.")])
    await Runner.run(agent, "Remember: I like cats.", session=session_a)

    assert len(await session_a.get_items()) == 2
    assert await session_b.get_items() == []