20 changes: 12 additions & 8 deletions CHANGELOG.md
@@ -1,19 +1,25 @@
# Changelog
## Release History

All notable changes to `azure-cosmos-agent-memory` are documented in this file.
### 0.1.0b2 (2026-06-03)

#### Bugs Fixed
* Hardened memory extraction: stops emitting phantom/synthesized facts the user never asserted, stops extracting facts from `[assistant]:` turns, stops re-processing already-extracted turns (which previously produced reversed `CONTRADICT` decisions and meta-facts like `"X is contradicted by Y"`), and stops storing near-duplicate episodic memories for the same scope. Episodic memories also now embed the actual content instead of a boilerplate `"intent recorded"` string. See [PR:#20](https://github.com/AzureCosmosDB/AgentMemoryToolkit/pull/20/)
* Fixed `add_cosmos` + `process_now` silently bypassing the cadence subsystem: cadence env vars (`THREAD_SUMMARY_EVERY_N`, `FACT_EXTRACTION_EVERY_N`, `USER_SUMMARY_EVERY_N`, etc.) had no effect, and procedural / user-summary synthesis never ran. `add_cosmos` now triggers cadence on turn writes; `process_now` now runs the full 5-step pipeline on the in-process processor. See [PR:#20](https://github.com/AzureCosmosDB/AgentMemoryToolkit/pull/20/)

#### Other Changes
* `ProcessThreadResult` gains `procedural` and `user_summary` fields. `extract_memories` returns a `dropped_episodic_count` for monitoring LLM-extraction quality. See [PR:#20](https://github.com/aayush3011/AgentMemoryToolkit/pull/20)

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
and this project adheres to [PEP 440](https://peps.python.org/pep-0440/) versioning.

## [0.1.0b1] — 2026-06-01


Initial public preview release.

This is a **beta release**. The public surface may evolve in
backward-incompatible ways before the `1.0.0` general-availability cut.
Pin a specific version when integrating.

### Added
#### Added

- Sync (`CosmosMemoryClient`) and async (`AsyncCosmosMemoryClient`) clients
for storing, retrieving, and transforming agent memories backed by Azure
@@ -40,9 +46,7 @@ Pin a specific version when integrating.
- Structured JSON logging via `azure.cosmos.agent_memory.logging`
(`configure_logging`, `JsonFormatter`).

### Package layout
#### Package layout

- Distribution name: **`azure-cosmos-agent-memory`** (PyPI)
- Import path: **`azure.cosmos.agent_memory`**

[0.1.0b1]: https://github.com/AzureCosmosDB/AgentMemoryToolkit/releases/tag/v0.1.0b1
2 changes: 1 addition & 1 deletion azure.yaml
@@ -2,7 +2,7 @@

name: azure-cosmos-agent-memory
metadata:
template: azure-cosmos-agent-memory@0.1.0b1
template: azure-cosmos-agent-memory@0.1.0b2

infra:
provider: bicep
56 changes: 56 additions & 0 deletions azure/cosmos/agent_memory/_base/base_client.py
@@ -253,3 +253,59 @@ def _close_sync_closeable(closeable: Any) -> None:
close()
except Exception:
pass


# Status codes that indicate transient, retry-able backend conditions.
# Permanent codes (401/403/404/409) and client-side bugs (400) must surface.
_TRANSIENT_HTTP_STATUS_CODES = frozenset({408, 429, 500, 502, 503, 504})


def is_transient_tail_step_error(exc: BaseException) -> bool:
"""Classify a tail-step exception as transient (swallow + log) or permanent (re-raise).

Used by ``process_now`` to decide whether a failure in the
``synthesize_procedural`` / ``process_user_summary`` tail steps should be
logged as a warning (so the per-thread work already persisted is not
erased) or re-raised to the caller (so configuration / schema bugs do not
become silent ``WARNING`` lines).

Transient (swallow):
* ``LLMError`` — LLM-side defensive raises (no-choices, no-content).
* ``openai.RateLimitError`` / ``APITimeoutError`` / ``APIConnectionError``.
* Any exception with ``status_code`` in
:data:`_TRANSIENT_HTTP_STATUS_CODES` (covers ``CosmosHttpResponseError``
and any other ``HttpResponseError`` subclass).

Permanent (re-raise):
* ``ValidationError`` / ``ConfigurationError`` / ``CosmosNotConnectedError``.
* ``openai.AuthenticationError`` / ``PermissionDeniedError`` /
``BadRequestError`` (status 400/401/403).
* ``CosmosHttpResponseError`` with status 400/401/403/404/409.
* Python builtins (``KeyError``, ``TypeError``, ``AttributeError``,
``NameError`` …) — these are programmer bugs, not infra hiccups.
"""
from azure.cosmos.agent_memory.exceptions import LLMError

if isinstance(exc, LLMError):
return True

try:
import openai
except ImportError:
openai = None
if openai is not None and isinstance(
exc,
(openai.RateLimitError, openai.APITimeoutError, openai.APIConnectionError),
):
return True
if openai is not None and isinstance(
exc,
(openai.AuthenticationError, openai.PermissionDeniedError, openai.BadRequestError),
):
return False

status = getattr(exc, "status_code", None)
if isinstance(status, int):
return status in _TRANSIENT_HTTP_STATUS_CODES

return False
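
Reviewer note: the status-code fallback at the end of `is_transient_tail_step_error` can be exercised on its own. The sketch below is a minimal illustration, not the library's code: `FakeHttpError` and `is_transient_by_status` are hypothetical names, and only the `status_code` duck-typing and the set of transient codes are taken from the diff above.

```python
# Hypothetical stand-in for an HTTP-style SDK error. Only the status_code
# attribute matters to the classifier.
class FakeHttpError(Exception):
    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


# Same codes as _TRANSIENT_HTTP_STATUS_CODES in the diff.
TRANSIENT_STATUS_CODES = frozenset({408, 429, 500, 502, 503, 504})


def is_transient_by_status(exc: BaseException) -> bool:
    """Return True only when exc carries an integer status_code in the transient set."""
    status = getattr(exc, "status_code", None)
    if isinstance(status, int):
        return status in TRANSIENT_STATUS_CODES
    # No status code at all (e.g. a Python builtin error): treat as permanent.
    return False


throttled = is_transient_by_status(FakeHttpError(429))     # transient
not_found = is_transient_by_status(FakeHttpError(404))     # permanent
builtin_bug = is_transient_by_status(KeyError("missing"))  # permanent
```

Defaulting to `False` means anything the classifier does not recognize is re-raised, which matches the docstring's intent that programmer bugs should not be downgraded to warnings.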
2 changes: 2 additions & 0 deletions azure/cosmos/agent_memory/_container_routing.py
@@ -30,6 +30,8 @@ class ContainerKey(str, Enum):
"user_summary": ContainerKey.SUMMARIES,
}

USER_SCOPED_MEMORIES_TYPES: frozenset[str] = frozenset({"episodic", "procedural"})


def container_key_for_type(memory_type: str) -> ContainerKey:
"""Return the ``ContainerKey`` that owns documents of ``memory_type``."""
22 changes: 22 additions & 0 deletions azure/cosmos/agent_memory/_query_builder.py
@@ -32,6 +32,28 @@ def add_filter(self, field: str, param_name: str, value: Any) -> None:
self._conditions.append(f"{field} = {param_name}")
self._parameters.append({"name": param_name, "value": value})

def add_thread_id_or_user_scoped(
self,
thread_id: Any,
param_name: str,
user_scoped_types: list[str],
type_param_base: str = "@user_scoped_type_",
) -> None:
"""Match either ``c.thread_id = @thread_id`` OR ``c.type IN (...)``."""
if thread_id is None:
return
if not user_scoped_types:
self.add_filter("c.thread_id", param_name, thread_id)
return
self._parameters.append({"name": param_name, "value": thread_id})
type_params: list[str] = []
for i, t in enumerate(user_scoped_types):
pname = f"{type_param_base}{i}"
type_params.append(pname)
self._parameters.append({"name": pname, "value": t})
in_clause = f"c.type IN ({', '.join(type_params)})"
self._conditions.append(f"(c.thread_id = {param_name} OR {in_clause})")

def add_array_contains(self, field: str, param_name: str, value: Any) -> None:
"""Add an ``ARRAY_CONTAINS`` filter."""
self._conditions.append(f"ARRAY_CONTAINS({field}, {param_name})")
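
Reviewer note: to see what `add_thread_id_or_user_scoped` emits, here is a small standalone sketch. `MiniQueryBuilder` and `where_clause` are hypothetical stand-ins (the real `_QueryBuilder` has more methods that this hunk does not show); the method body mirrors the logic in the diff above.

```python
from typing import Any


class MiniQueryBuilder:
    """Hypothetical, trimmed-down builder used only to show the generated clause."""

    def __init__(self) -> None:
        self.conditions: list[str] = []
        self.parameters: list[dict[str, Any]] = []

    def add_thread_id_or_user_scoped(
        self,
        thread_id: Any,
        param_name: str,
        user_scoped_types: list[str],
        type_param_base: str = "@user_scoped_type_",
    ) -> None:
        if thread_id is None:
            return  # no thread filter requested
        if not user_scoped_types:
            # Nothing user-scoped in play: fall back to a plain equality filter.
            self.conditions.append(f"c.thread_id = {param_name}")
            self.parameters.append({"name": param_name, "value": thread_id})
            return
        self.parameters.append({"name": param_name, "value": thread_id})
        type_params: list[str] = []
        for i, memory_type in enumerate(user_scoped_types):
            pname = f"{type_param_base}{i}"
            type_params.append(pname)
            self.parameters.append({"name": pname, "value": memory_type})
        in_clause = f"c.type IN ({', '.join(type_params)})"
        self.conditions.append(f"(c.thread_id = {param_name} OR {in_clause})")

    def where_clause(self) -> str:
        return " AND ".join(self.conditions)


qb = MiniQueryBuilder()
qb.add_thread_id_or_user_scoped("t-1", "@thread_id", ["episodic", "procedural"])
clause = qb.where_clause()
# clause == "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0, @user_scoped_type_1))"
```

Every value is bound as a parameter rather than interpolated into the query text, so the type names never reach the SQL string directly.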
14 changes: 13 additions & 1 deletion azure/cosmos/agent_memory/_utils.py
@@ -13,6 +13,7 @@
from datetime import datetime, timezone
from typing import Any, Optional

from ._container_routing import USER_SCOPED_MEMORIES_TYPES
from ._query_builder import _QueryBuilder
from .exceptions import ConfigurationError, ValidationError
from .thresholds import DEFAULT_TTL_BY_TYPE as DEFAULT_TTL_BY_TYPE
@@ -341,7 +342,11 @@ def _build_memory_query_builder(
qb = _QueryBuilder()
qb.add_filter("c.id", "@memory_id", memory_id)
qb.add_filter("c.user_id", "@user_id", user_id)
qb.add_filter("c.thread_id", "@thread_id", thread_id)
in_scope_user_types = _resolve_user_scoped_types_in_query(memory_types)
if thread_id is not None and in_scope_user_types:
qb.add_thread_id_or_user_scoped(thread_id, "@thread_id", sorted(in_scope_user_types))
else:
qb.add_filter("c.thread_id", "@thread_id", thread_id)
qb.add_filter("c.role", "@role", role)
if memory_types:
qb.add_in_filter("c.type", "@memory_type_", list(memory_types))
@@ -350,6 +355,13 @@ def _build_memory_query_builder(
return qb


def _resolve_user_scoped_types_in_query(memory_types: Optional[list[str]]) -> set[str]:
"""Return the user-scoped types this query may match."""
if not memory_types:
return set(USER_SCOPED_MEMORIES_TYPES)
return set(memory_types) & USER_SCOPED_MEMORIES_TYPES
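
Reviewer note: the three cases `_resolve_user_scoped_types_in_query` distinguishes can be checked in isolation. This is a sketch under assumptions: the function is copied under a hypothetical public name, and `"fact"` is used purely as an illustrative non-user-scoped type.

```python
from typing import Optional

# Same members as USER_SCOPED_MEMORIES_TYPES in the diff.
USER_SCOPED_TYPES: frozenset = frozenset({"episodic", "procedural"})


def resolve_user_scoped_types(memory_types: Optional[list]) -> set:
    """Return the user-scoped types a query with this type filter could match."""
    if not memory_types:
        # No type filter: the query may match every user-scoped type.
        return set(USER_SCOPED_TYPES)
    return set(memory_types) & USER_SCOPED_TYPES


unfiltered = resolve_user_scoped_types(None)            # both user-scoped types
mixed = resolve_user_scoped_types(["fact", "episodic"])  # only "episodic"
thread_only = resolve_user_scoped_types(["fact"])        # empty set
```

When the result is empty, `_build_memory_query_builder` falls back to the plain `c.thread_id` equality filter, so queries that only ask for thread-scoped types behave as before.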


def _container_policies(
*,
embedding_dimensions: int,
67 changes: 64 additions & 3 deletions azure/cosmos/agent_memory/aio/cosmos_memory_client.py
@@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Any, Iterable, Optional

from azure.cosmos.agent_memory._base import _BaseMemoryClient
from azure.cosmos.agent_memory._base.base_client import is_transient_tail_step_error
from azure.cosmos.agent_memory._container_routing import container_key_for_type
from azure.cosmos.agent_memory._utils import (
_build_container_kwargs,
@@ -25,7 +26,7 @@
from azure.cosmos.agent_memory.aio.processors import AsyncInProcessProcessor, AsyncMemoryProcessor
from azure.cosmos.agent_memory.aio.services.pipeline import AsyncPipelineService
from azure.cosmos.agent_memory.aio.store import AsyncMemoryStore
from azure.cosmos.agent_memory.exceptions import CosmosNotConnectedError, CosmosOperationError
from azure.cosmos.agent_memory.exceptions import CosmosNotConnectedError, CosmosOperationError, ValidationError
from azure.cosmos.agent_memory.logging import get_logger
from azure.cosmos.agent_memory.services._pipeline_helpers import _normalize_metadata_keys
from azure.cosmos.agent_memory.thresholds import DEFAULT_TTL_BY_TYPE
@@ -531,7 +532,21 @@ async def add_cosmos(
embedding: Optional[list[float]] = None,
embed: Optional[bool] = None,
) -> str:
return await self._get_store().add(
"""Add a memory directly to Cosmos DB, bypassing the local buffer.

For ``memory_type='turn'`` this also bumps the auto-trigger counter and
schedules cadence-aware processing (extract / reconcile / procedural /
thread_summary / user_summary) as a background ``asyncio.Task`` — the
same pattern :meth:`push_to_cosmos` uses for buffered turns. The await
returns after the Cosmos write completes; cadence runs out-of-band so
it does not block the caller.
"""
if memory_type == "turn" and not thread_id:
raise ValidationError(
"thread_id is required for memory_type='turn' so the auto-trigger "
"counter can group turns per conversation. Set thread_id explicitly."
)
memory_id = await self._get_store().add(
user_id,
role,
content,
@@ -544,6 +559,12 @@ async def add_cosmos(
embedding,
embed,
)
if memory_type == "turn" and thread_id:
task = asyncio.create_task(self._maybe_auto_trigger({(user_id, thread_id): 1}))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
task.add_done_callback(_log_auto_trigger_task_failure)
return memory_id
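
Reviewer note: the write-then-schedule pattern that `add_cosmos` now uses can be sketched without Cosmos. Everything named here (`cadence_work`, `write_then_schedule`, `log_task_failure`, the module-level `background_tasks` set) is hypothetical; only the `create_task` plus two done-callbacks shape comes from the diff. Holding the task in a set keeps a strong reference so it is not garbage-collected before it finishes.

```python
import asyncio

background_tasks: set = set()  # strong references to in-flight tasks
failures: list = []            # stands in for a logger in this sketch


def log_task_failure(task: asyncio.Task) -> None:
    """Done-callback: record the exception so it is not silently dropped."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        failures.append(repr(exc))


async def cadence_work(should_fail: bool) -> None:
    await asyncio.sleep(0)  # yield once, like real out-of-band work would
    if should_fail:
        raise RuntimeError("cadence failed")


async def write_then_schedule(should_fail: bool) -> str:
    memory_id = "memory-1"  # stands in for the awaited store write
    task = asyncio.create_task(cadence_work(should_fail))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    task.add_done_callback(log_task_failure)
    return memory_id  # returned before the background work completes


async def main() -> str:
    memory_id = await write_then_schedule(should_fail=True)
    # Drain the background work so the sketch can inspect the outcome.
    await asyncio.gather(*list(background_tasks), return_exceptions=True)
    await asyncio.sleep(0)
    return memory_id


result_id = asyncio.run(main())
```

The caller gets its id back even though the background step failed; the failure surfaces only through the logging callback, which is the trade-off the new docstring describes.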

async def push_to_cosmos(self, batch_size: int = 25) -> None:
"""Insert all local memories into Cosmos DB and schedule processing."""
@@ -815,9 +836,49 @@ async def reconcile(self, user_id: str, n: Optional[int] = None) -> dict[str, in
return await self._get_pipeline().reconcile_memories(user_id, n if n is not None else get_dedup_pool_size())

async def process_now(self, *, user_id: str, thread_id: str) -> "ProcessThreadResult":
"""Force the processor to run the full pipeline RIGHT NOW for one thread.

For the in-process processor this fires all five steps:
``thread_summary → extract → reconcile → procedural → user_summary``.
For the durable processor this remains a no-op (the sibling Function
app drives the pipeline off the Cosmos DB change feed).

Transient failures in ``procedural`` and ``user_summary`` (LLM
rate-limit / timeout, Cosmos 429 / 5xx, defensive ``LLMError``) are
caught and logged as warnings so the per-thread work already
persisted by the prior steps is not erased. Permanent failures
(config bugs, auth errors, 4xx Cosmos errors, Python builtins like
``KeyError`` / ``TypeError``) are re-raised — silencing them turns
operational issues into invisible ``WARNING`` lines.
"""
_BaseMemoryClient._require_cosmos(self)
processor = self._get_processor()
turns = await self.get_thread(thread_id=thread_id, user_id=user_id) or []
return await self._get_processor().process_thread(user_id=user_id, thread_id=thread_id, turns=turns)
result = await processor.process_thread(user_id=user_id, thread_id=thread_id, turns=turns)
if isinstance(processor, AsyncInProcessProcessor):
try:
result.procedural = await processor.synthesize_procedural(user_id=user_id)
except Exception as exc:
if not is_transient_tail_step_error(exc):
raise
logger.warning(
"process_now: synthesize_procedural failed (transient) for user_id=%s: %s",
user_id,
exc,
)
try:
user_summary_result = await processor.process_user_summary(user_id=user_id)
if user_summary_result is not None and user_summary_result.summary:
result.user_summary = user_summary_result.summary
except Exception as exc:
if not is_transient_tail_step_error(exc):
raise
logger.warning(
"process_now: process_user_summary failed (transient) for user_id=%s: %s",
user_id,
exc,
)
return result
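
Reviewer note: the two `try` / `except` blocks in `process_now` share one shape, which the sketch below factors into a helper to make the swallow-or-re-raise rule explicit. It is an illustration only: `run_tail_step`, `TransientError`, and `is_transient` are hypothetical names, and the PR itself keeps the two blocks inline.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger("tail_steps_sketch")


class TransientError(Exception):
    """Hypothetical marker for retry-able failures (rate limits, timeouts)."""


def is_transient(exc: BaseException) -> bool:
    # Stand-in for is_transient_tail_step_error from the diff.
    return isinstance(exc, TransientError)


def run_tail_step(name: str, step: Callable[[], str]) -> Optional[str]:
    """Run one tail step; log transient failures, re-raise everything else."""
    try:
        return step()
    except Exception as exc:
        if not is_transient(exc):
            raise
        logger.warning("%s failed (transient): %s", name, exc)
        return None


def ok_step() -> str:
    return "summary text"


def flaky_step() -> str:
    raise TransientError("rate limited")


def buggy_step() -> str:
    raise KeyError("missing field")


ok_result = run_tail_step("user_summary", ok_step)   # value passes through
swallowed = run_tail_step("procedural", flaky_step)  # logged, returns None
try:
    run_tail_step("procedural", buggy_step)
    reraised = False
except KeyError:
    reraised = True  # permanent errors reach the caller
```

A helper like this would remove the duplicated `except` bodies, at the cost of hiding the per-step result handling that differs between `procedural` and `user_summary` in the PR.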

async def process_now_and_wait(self, *, user_id: str, thread_id: str, timeout: float = 30.0) -> bool:
_BaseMemoryClient._require_cosmos(self)