diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bb7b43..8d694bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,19 +1,25 @@ -# Changelog +## Release History -All notable changes to `azure-cosmos-agent-memory` are documented in this file. +### 0.1.0b2 (2026-06-03) + +#### Bugs Fixed +* Hardened memory extraction: stops emitting phantom/synthesized facts the user never asserted, stops extracting facts from `[assistant]:` turns, stops re-processing already-extracted turns (which previously produced reversed `CONTRADICT` decisions and meta-facts like `"X is contradicted by Y"`), and stops storing near-duplicate episodic memories for the same scope. Episodic memories also now embed the actual content instead of a boilerplate `"intent recorded"` string. See [PR:#20](https://github.com/AzureCosmosDB/AgentMemoryToolkit/pull/20/) +* Fixed `add_cosmos` + `process_now` silently bypassing the cadence subsystem: cadence env vars (`THREAD_SUMMARY_EVERY_N`, `FACT_EXTRACTION_EVERY_N`, `USER_SUMMARY_EVERY_N`, etc.) had no effect, and procedural / user-summary synthesis never ran. `add_cosmos` now triggers cadence on turn writes; `process_now` now runs the full 5-step pipeline on the in-process processor.See [PR:#20](https://github.com/AzureCosmosDB/AgentMemoryToolkit/pull/20/) + +#### Other Changes +* `ProcessThreadResult` gains `procedural` and `user_summary` fields. `extract_memories` returns a `dropped_episodic_count` for monitoring LLM-extraction quality.See [PR:#20](https://github.com/aayush3011/AgentMemoryToolkit/pull/20) -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) -and this project adheres to [PEP 440](https://peps.python.org/pep-0440/) versioning. ## [0.1.0b1] — 2026-06-01 + Initial public preview release. This is a **beta release**. The public surface may evolve in backward-incompatible ways before the `1.0.0` general-availability cut. Pin a specific version when integrating. -### Added +#### Added - Sync (`CosmosMemoryClient`) and async (`AsyncCosmosMemoryClient`) clients for storing, retrieving, and transforming agent memories backed by Azure @@ -40,9 +46,7 @@ Pin a specific version when integrating. - Structured JSON logging via `azure.cosmos.agent_memory.logging` (`configure_logging`, `JsonFormatter`). -### Package layout +#### Package layout - Distribution name: **`azure-cosmos-agent-memory`** (PyPI) - Import path: **`azure.cosmos.agent_memory`** - -[0.1.0b1]: https://github.com/AzureCosmosDB/AgentMemoryToolkit/releases/tag/v0.1.0b1 diff --git a/azure.yaml b/azure.yaml index 8c36f98..2543ee8 100644 --- a/azure.yaml +++ b/azure.yaml @@ -2,7 +2,7 @@ name: azure-cosmos-agent-memory metadata: - template: azure-cosmos-agent-memory@0.1.0b1 + template: azure-cosmos-agent-memory@0.1.0b2 infra: provider: bicep diff --git a/azure/cosmos/agent_memory/_base/base_client.py b/azure/cosmos/agent_memory/_base/base_client.py index e7aef1c..2fef0a5 100644 --- a/azure/cosmos/agent_memory/_base/base_client.py +++ b/azure/cosmos/agent_memory/_base/base_client.py @@ -253,3 +253,59 @@ def _close_sync_closeable(closeable: Any) -> None: close() except Exception: pass + + +# Status codes that indicate transient, retry-able backend conditions. +# Permanent codes (401/403/404/409) and client-side bugs (400) must surface. +_TRANSIENT_HTTP_STATUS_CODES = frozenset({408, 429, 500, 502, 503, 504}) + + +def is_transient_tail_step_error(exc: BaseException) -> bool: + """Classify a tail-step exception as transient (swallow + log) or permanent (re-raise). + + Used by ``process_now`` to decide whether a failure in the + ``synthesize_procedural`` / ``process_user_summary`` tail steps should be + logged as a warning (so the per-thread work already persisted is not + erased) or re-raised to the caller (so configuration / schema bugs do not + become silent ``WARNING`` lines). + + Transient (swallow): + * ``LLMError`` — LLM-side defensive raises (no-choices, no-content). + * ``openai.RateLimitError`` / ``APITimeoutError`` / ``APIConnectionError``. + * Any exception with ``status_code`` in + :data:`_TRANSIENT_HTTP_STATUS_CODES` (covers ``CosmosHttpResponseError`` + and any other ``HttpResponseError`` subclass). + + Permanent (re-raise): + * ``ValidationError`` / ``ConfigurationError`` / ``CosmosNotConnectedError``. + * ``openai.AuthenticationError`` / ``PermissionDeniedError`` / + ``BadRequestError`` (status 400/401/403). + * ``CosmosHttpResponseError`` with status 400/401/403/404/409. + * Python builtins (``KeyError``, ``TypeError``, ``AttributeError``, + ``NameError`` …) — these are programmer bugs, not infra hiccups. + """ + from azure.cosmos.agent_memory.exceptions import LLMError + + if isinstance(exc, LLMError): + return True + + try: + import openai + except ImportError: + openai = None + if openai is not None and isinstance( + exc, + (openai.RateLimitError, openai.APITimeoutError, openai.APIConnectionError), + ): + return True + if openai is not None and isinstance( + exc, + (openai.AuthenticationError, openai.PermissionDeniedError, openai.BadRequestError), + ): + return False + + status = getattr(exc, "status_code", None) + if isinstance(status, int): + return status in _TRANSIENT_HTTP_STATUS_CODES + + return False diff --git a/azure/cosmos/agent_memory/_container_routing.py b/azure/cosmos/agent_memory/_container_routing.py index 4fdeaec..fd439b5 100644 --- a/azure/cosmos/agent_memory/_container_routing.py +++ b/azure/cosmos/agent_memory/_container_routing.py @@ -30,6 +30,8 @@ class ContainerKey(str, Enum): "user_summary": ContainerKey.SUMMARIES, } +USER_SCOPED_MEMORIES_TYPES: frozenset[str] = frozenset({"episodic", "procedural"}) + def container_key_for_type(memory_type: str) -> ContainerKey: """Return the ``ContainerKey`` that owns documents of ``memory_type``.""" diff --git a/azure/cosmos/agent_memory/_query_builder.py b/azure/cosmos/agent_memory/_query_builder.py index fcb6377..22058a0 100644 --- a/azure/cosmos/agent_memory/_query_builder.py +++ b/azure/cosmos/agent_memory/_query_builder.py @@ -32,6 +32,28 @@ def add_filter(self, field: str, param_name: str, value: Any) -> None: self._conditions.append(f"{field} = {param_name}") self._parameters.append({"name": param_name, "value": value}) + def add_thread_id_or_user_scoped( + self, + thread_id: Any, + param_name: str, + user_scoped_types: list[str], + type_param_base: str = "@user_scoped_type_", + ) -> None: + """Match either ``c.thread_id = @thread_id`` OR ``c.type IN (...)``.""" + if thread_id is None: + return + if not user_scoped_types: + self.add_filter("c.thread_id", param_name, thread_id) + return + self._parameters.append({"name": param_name, "value": thread_id}) + type_params: list[str] = [] + for i, t in enumerate(user_scoped_types): + pname = f"{type_param_base}{i}" + type_params.append(pname) + self._parameters.append({"name": pname, "value": t}) + in_clause = f"c.type IN ({', '.join(type_params)})" + self._conditions.append(f"(c.thread_id = {param_name} OR {in_clause})") + def add_array_contains(self, field: str, param_name: str, value: Any) -> None: """Add an ``ARRAY_CONTAINS`` filter.""" self._conditions.append(f"ARRAY_CONTAINS({field}, {param_name})") diff --git a/azure/cosmos/agent_memory/_utils.py b/azure/cosmos/agent_memory/_utils.py index 1a3a7a1..8fc8e8b 100644 --- a/azure/cosmos/agent_memory/_utils.py +++ b/azure/cosmos/agent_memory/_utils.py @@ -13,6 +13,7 @@ from datetime import datetime, timezone from typing import Any, Optional +from ._container_routing import USER_SCOPED_MEMORIES_TYPES from ._query_builder import _QueryBuilder from .exceptions import ConfigurationError, ValidationError from .thresholds import DEFAULT_TTL_BY_TYPE as DEFAULT_TTL_BY_TYPE @@ -341,7 +342,11 @@ def _build_memory_query_builder( qb = _QueryBuilder() qb.add_filter("c.id", "@memory_id", memory_id) qb.add_filter("c.user_id", "@user_id", user_id) - qb.add_filter("c.thread_id", "@thread_id", thread_id) + in_scope_user_types = _resolve_user_scoped_types_in_query(memory_types) + if thread_id is not None and in_scope_user_types: + qb.add_thread_id_or_user_scoped(thread_id, "@thread_id", sorted(in_scope_user_types)) + else: + qb.add_filter("c.thread_id", "@thread_id", thread_id) qb.add_filter("c.role", "@role", role) if memory_types: qb.add_in_filter("c.type", "@memory_type_", list(memory_types)) @@ -350,6 +355,13 @@ def _build_memory_query_builder( return qb +def _resolve_user_scoped_types_in_query(memory_types: Optional[list[str]]) -> set[str]: + """Return the user-scoped types this query may match.""" + if not memory_types: + return set(USER_SCOPED_MEMORIES_TYPES) + return set(memory_types) & USER_SCOPED_MEMORIES_TYPES + + def _container_policies( *, embedding_dimensions: int, diff --git a/azure/cosmos/agent_memory/aio/cosmos_memory_client.py b/azure/cosmos/agent_memory/aio/cosmos_memory_client.py index 6f5098d..a2c1743 100644 --- a/azure/cosmos/agent_memory/aio/cosmos_memory_client.py +++ b/azure/cosmos/agent_memory/aio/cosmos_memory_client.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, Iterable, Optional from azure.cosmos.agent_memory._base import _BaseMemoryClient +from azure.cosmos.agent_memory._base.base_client import is_transient_tail_step_error from azure.cosmos.agent_memory._container_routing import container_key_for_type from azure.cosmos.agent_memory._utils import ( _build_container_kwargs, @@ -25,7 +26,7 @@ from azure.cosmos.agent_memory.aio.processors import AsyncInProcessProcessor, AsyncMemoryProcessor from azure.cosmos.agent_memory.aio.services.pipeline import AsyncPipelineService from azure.cosmos.agent_memory.aio.store import AsyncMemoryStore -from azure.cosmos.agent_memory.exceptions import CosmosNotConnectedError, CosmosOperationError +from azure.cosmos.agent_memory.exceptions import CosmosNotConnectedError, CosmosOperationError, ValidationError from azure.cosmos.agent_memory.logging import get_logger from azure.cosmos.agent_memory.services._pipeline_helpers import _normalize_metadata_keys from azure.cosmos.agent_memory.thresholds import DEFAULT_TTL_BY_TYPE @@ -531,7 +532,21 @@ async def add_cosmos( embedding: Optional[list[float]] = None, embed: Optional[bool] = None, ) -> str: - return await self._get_store().add( + """Add a memory directly to Cosmos DB, bypassing the local buffer. + + For ``memory_type='turn'`` this also bumps the auto-trigger counter and + schedules cadence-aware processing (extract / reconcile / procedural / + thread_summary / user_summary) as a background ``asyncio.Task`` — the + same pattern :meth:`push_to_cosmos` uses for buffered turns. The await + returns after the Cosmos write completes; cadence runs out-of-band so + it does not block the caller. + """ + if memory_type == "turn" and not thread_id: + raise ValidationError( + "thread_id is required for memory_type='turn' so the auto-trigger " + "counter can group turns per conversation. Set thread_id explicitly." + ) + memory_id = await self._get_store().add( user_id, role, content, @@ -544,6 +559,12 @@ async def add_cosmos( embedding, embed, ) + if memory_type == "turn" and thread_id: + task = asyncio.create_task(self._maybe_auto_trigger({(user_id, thread_id): 1})) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + task.add_done_callback(_log_auto_trigger_task_failure) + return memory_id async def push_to_cosmos(self, batch_size: int = 25) -> None: """Insert all local memories into Cosmos DB and schedule processing.""" @@ -815,9 +836,49 @@ async def reconcile(self, user_id: str, n: Optional[int] = None) -> dict[str, in return await self._get_pipeline().reconcile_memories(user_id, n if n is not None else get_dedup_pool_size()) async def process_now(self, *, user_id: str, thread_id: str) -> "ProcessThreadResult": + """Force the processor to run the full pipeline RIGHT NOW for one thread. + + For the in-process processor this fires all five steps: + ``thread_summary → extract → reconcile → procedural → user_summary``. + For the durable processor this remains a no-op (the sibling Function + app drives the pipeline off the Cosmos DB change feed). + + Transient failures in ``procedural`` and ``user_summary`` (LLM + rate-limit / timeout, Cosmos 429 / 5xx, defensive ``LLMError``) are + caught and logged as warnings so the per-thread work already + persisted by the prior steps is not erased. Permanent failures + (config bugs, auth errors, 4xx Cosmos errors, Python builtins like + ``KeyError`` / ``TypeError``) are re-raised — silencing them turns + operational issues into invisible ``WARNING`` lines. + """ _BaseMemoryClient._require_cosmos(self) + processor = self._get_processor() turns = await self.get_thread(thread_id=thread_id, user_id=user_id) or [] - return await self._get_processor().process_thread(user_id=user_id, thread_id=thread_id, turns=turns) + result = await processor.process_thread(user_id=user_id, thread_id=thread_id, turns=turns) + if isinstance(processor, AsyncInProcessProcessor): + try: + result.procedural = await processor.synthesize_procedural(user_id=user_id) + except Exception as exc: + if not is_transient_tail_step_error(exc): + raise + logger.warning( + "process_now: synthesize_procedural failed (transient) for user_id=%s: %s", + user_id, + exc, + ) + try: + user_summary_result = await processor.process_user_summary(user_id=user_id) + if user_summary_result is not None and user_summary_result.summary: + result.user_summary = user_summary_result.summary + except Exception as exc: + if not is_transient_tail_step_error(exc): + raise + logger.warning( + "process_now: process_user_summary failed (transient) for user_id=%s: %s", + user_id, + exc, + ) + return result async def process_now_and_wait(self, *, user_id: str, thread_id: str, timeout: float = 30.0) -> bool: _BaseMemoryClient._require_cosmos(self) diff --git a/azure/cosmos/agent_memory/aio/services/pipeline.py b/azure/cosmos/agent_memory/aio/services/pipeline.py index f4823e3..0db0475 100644 --- a/azure/cosmos/agent_memory/aio/services/pipeline.py +++ b/azure/cosmos/agent_memory/aio/services/pipeline.py @@ -10,6 +10,7 @@ from __future__ import annotations +import asyncio import hashlib import inspect import json @@ -53,9 +54,13 @@ build_transcript, cap_structured_summary, chat_text, + check_extracted_fact_grounding, coerce_valence, parse_llm_json, ) +from azure.cosmos.agent_memory.services._pipeline_helpers import ( + format_existing_episodics as _format_existing_episodics, +) from azure.cosmos.agent_memory.services._pipeline_helpers import ( is_real_number as _is_real_number, ) @@ -137,6 +142,12 @@ async def upsert_item(self, *, body: dict[str, Any]) -> dict[str, Any]: if inspect.isawaitable(response): response = await response return response if isinstance(response, dict) else body + upsert = getattr(self._store, "upsert_item", None) + if upsert is not None: + response = upsert(body=body) + if inspect.isawaitable(response): + response = await response + return response if isinstance(response, dict) else body response = await self._store.add_cosmos(body) return response if isinstance(response, dict) else body @@ -333,6 +344,7 @@ def _empty_extract_counts() -> dict[str, int]: "updated_count": 0, "contradicted_count": 0, "exact_dedup_skipped": 0, + "dropped_episodic_count": 0, } @staticmethod @@ -425,7 +437,11 @@ async def extract_memories_dry( logger.info("extract_memories_dry started user_id=%s thread_id=%s", user_id, thread_id) if turns is None: - query = "SELECT * FROM c WHERE c.user_id = @user_id AND c.thread_id = @thread_id AND c.type = 'turn'" + query = ( + "SELECT * FROM c WHERE c.user_id = @user_id " + "AND c.thread_id = @thread_id AND c.type = 'turn' " + "AND (NOT IS_DEFINED(c.extracted_at) OR IS_NULL(c.extracted_at))" + ) parameters: list[dict[str, Any]] = [ {"name": "@user_id", "value": user_id}, {"name": "@thread_id", "value": thread_id}, @@ -446,25 +462,32 @@ async def extract_memories_dry( if not items: logger.warning("extract_memories_dry no memories found user_id=%s thread_id=%s", user_id, thread_id) - return {"facts": [], "episodic": [], "updates": []} + return {"facts": [], "episodic": [], "updates": [], "processed_turn_docs": []} - existing = await self._load_existing_memories(user_id, ["fact"]) + existing_facts, existing_episodics = await asyncio.gather( + self._load_existing_memories(user_id, ["fact"]), + self._load_existing_memories(user_id, ["episodic"]), + ) existing_fact_hashes: set[str] = { - m["content_hash"] for m in existing if m.get("type") == "fact" and m.get("content_hash") + m["content_hash"] for m in existing_facts if m.get("type") == "fact" and m.get("content_hash") } - if existing: + if existing_facts: existing_text = "\n".join( - f"- [ID: {mem['id']}] {mem.get('content', '')} " - f"(type={mem.get('type', 'fact')}, salience={mem.get('salience', 'N/A')})" - for mem in existing + f"- [ID: {mem['id']}] {mem.get('content', '')} (type=fact, salience={mem.get('salience', 'N/A')})" + for mem in existing_facts ) else: existing_text = "(none)" + existing_episodics_text = _format_existing_episodics(existing_episodics) transcript = self._build_transcript(items) response_text = await self._run_prompty( "extract_memories.prompty", - inputs={"existing_facts": existing_text, "transcript": transcript}, + inputs={ + "existing_facts": existing_text, + "existing_episodics": existing_episodics_text, + "transcript": transcript, + }, ) parsed = self._parse_llm_json(response_text) facts = parsed.get("facts", []) @@ -476,6 +499,7 @@ async def extract_memories_dry( episodic_docs: list[dict[str, Any]] = [] updates: list[dict[str, Any]] = [] exact_dedup_skipped = 0 + dropped_episodic_count = 0 for fact in facts: action = fact.get("action", "ADD").upper() @@ -548,21 +572,37 @@ async def extract_memories_dry( scope_value = scope_value_raw.strip() if isinstance(scope_value_raw, str) else None if not scope_type or not scope_value: logger.warning( - "extract_memories: dropping malformed episodic (missing scope_type/scope_value): %r", + "extract_memories: dropping malformed episodic (missing scope_type/scope_value) " + "user_id=%s thread_id=%s reason=malformed_scope payload=%r", + user_id, + thread_id, ep, ) + dropped_episodic_count += 1 + continue + + text_raw = ep.get("text") + text = text_raw.strip() if isinstance(text_raw, str) else None + if not text: + logger.error( + "extract_memories: dropping episodic with empty/missing text field " + "(LLM extraction did not populate the required `text` field — likely a " + "weaker extraction model that needs upgrading or a prompt-compliance issue). " + "scope_type=%s scope_value=%s user_id=%s thread_id=%s reason=missing_text", + scope_type, + scope_value, + user_id, + thread_id, + ) + dropped_episodic_count += 1 continue situation = ep.get("situation") action_taken = ep.get("action_taken") outcome = ep.get("outcome") - if situation and action_taken and outcome: - text = f"{situation} → {action_taken} → {outcome}" - else: - text = f"For the user's {scope_value} {scope_type}, intent recorded." content_hash = compute_content_hash(text) - seed = _ID_SEED_SEP.join((user_id, thread_id, content_hash)) + seed = _ID_SEED_SEP.join((user_id, scope_type, scope_value)) det_id = f"ep_{hashlib.sha256(seed.encode()).hexdigest()[:32]}" topic_tags = build_topic_tags(ep.get("tags", [])) confidence = ep.get("confidence") @@ -579,7 +619,7 @@ async def extract_memories_dry( doc = { "id": det_id, "user_id": user_id, - "thread_id": thread_id, + "thread_id": "__episodic__", "role": "system", "type": "episodic", "content": text, @@ -590,6 +630,7 @@ async def extract_memories_dry( "metadata": { "scope_type": scope_type, "scope_value": scope_value, + "originating_thread_id": thread_id, "situation": situation, "action_taken": action_taken, "outcome": outcome, @@ -647,8 +688,24 @@ async def extract_memories_dry( if exact_dedup_skipped: updates.append({"op": "stats", "exact_dedup_skipped": exact_dedup_skipped}) + if dropped_episodic_count: + updates.append({"op": "stats", "dropped_episodic_count": dropped_episodic_count}) - result = {"facts": fact_docs, "episodic": episodic_docs, "updates": updates} + check_extracted_fact_grounding( + fact_docs, + items, + existing_facts, + user_id=user_id, + thread_id=thread_id, + logger=logger, + ) + + result = { + "facts": fact_docs, + "episodic": episodic_docs, + "updates": updates, + "processed_turn_docs": items, + } logger.info( "extract_memories_dry completed user_id=%s thread_id=%s fact_docs=%d episodic_docs=%d updates=%d", user_id, @@ -684,23 +741,28 @@ async def persist_extracted_memories( for doc in docs_to_create: validated = self._validate_extracted_doc(doc) + doc_type = validated.get("type") try: - await self._create_memory(validated) + if doc_type == "episodic": + await self._upsert_memory(validated) + else: + await self._create_memory(validated) except CosmosResourceExistsError: logger.info("persist_extracted_memories skipped existing id=%s", validated.get("id")) continue tags = validated.get("tags", []) - if validated.get("type") == "episodic": + if doc_type == "episodic": result["episodic_count"] += 1 elif "sys:unclassified" in tags: result["unclassified_count"] += 1 - elif validated.get("type") == "fact": + elif doc_type == "fact": result["fact_count"] += 1 for op in update_ops: if op.get("op") == "stats": result["exact_dedup_skipped"] += int(op.get("exact_dedup_skipped") or 0) + result["dropped_episodic_count"] += int(op.get("dropped_episodic_count") or 0) continue if op.get("op") != "supersede": continue @@ -724,8 +786,44 @@ async def persist_extracted_memories( result["updated_count"] += 1 logger.info("persist_extracted_memories completed user_id=%s counts=%s", user_id, result) + + processed_turns = extracted.get("processed_turn_docs") or [] + if processed_turns: + marked = await self._mark_turns_extracted(processed_turns) + logger.info( + "persist_extracted_memories marked turns as extracted user_id=%s marked=%d/%d", + user_id, + marked, + len(processed_turns), + ) + return result + async def _mark_turns_extracted(self, turn_docs: list[dict[str, Any]]) -> int: + """Stamp ``extracted_at`` on each turn doc and upsert. Mirror of + the sync helper — per-turn failures are logged but never raise. + """ + if not turn_docs: + return 0 + now_iso = datetime.now(tz=timezone.utc).isoformat() + marked = 0 + for turn in turn_docs: + turn_id = turn.get("id") + if not turn_id: + continue + try: + doc_to_write = dict(turn) + doc_to_write["extracted_at"] = now_iso + await self._upsert_item(self._turns_container, body=doc_to_write) + marked += 1 + except Exception as exc: + logger.warning( + "_mark_turns_extracted failed for turn_id=%s err=%s (turn may be re-extracted on next call)", + turn_id, + exc, + ) + return marked + async def extract_memories( self, user_id: str, diff --git a/azure/cosmos/agent_memory/aio/store/memory_store.py b/azure/cosmos/agent_memory/aio/store/memory_store.py index 4d70a5a..27104a1 100644 --- a/azure/cosmos/agent_memory/aio/store/memory_store.py +++ b/azure/cosmos/agent_memory/aio/store/memory_store.py @@ -9,6 +9,7 @@ from azure.cosmos.agent_memory._container_routing import ( _CONTAINER_FOR_TYPE, + USER_SCOPED_MEMORIES_TYPES, ContainerKey, container_key_for_type, ) @@ -842,6 +843,8 @@ async def search( parameters.append({"name": "@key_terms", "value": terms}) partition_key, _ = query_scope(user_id, thread_id) + if thread_id is not None and (not memory_types or set(memory_types) & USER_SCOPED_MEMORIES_TYPES): + partition_key = None logger.debug("AsyncMemoryStore.search query: %s", sql) return await self.query( sql, diff --git a/azure/cosmos/agent_memory/cosmos_memory_client.py b/azure/cosmos/agent_memory/cosmos_memory_client.py index e3bbbf2..fc54a40 100644 --- a/azure/cosmos/agent_memory/cosmos_memory_client.py +++ b/azure/cosmos/agent_memory/cosmos_memory_client.py @@ -8,6 +8,7 @@ from azure.cosmos.agent_memory.logging import get_logger from ._base import _BaseMemoryClient +from ._base.base_client import is_transient_tail_step_error from ._container_routing import container_key_for_type from ._utils import ( _build_container_kwargs, @@ -23,7 +24,7 @@ from .auto_trigger import maybe_trigger_steps from .chat import ChatClient from .embeddings import EmbeddingsClient -from .exceptions import CosmosOperationError +from .exceptions import CosmosOperationError, ValidationError from .processors import InProcessProcessor, MemoryProcessor from .services._pipeline_helpers import _normalize_metadata_keys from .services.pipeline import PipelineService @@ -489,8 +490,22 @@ def add_cosmos( embedding: Optional[list[float]] = None, embed: Optional[bool] = None, ) -> str: - """Add a memory to Cosmos DB.""" - return self._get_store().add( + """Add a memory directly to Cosmos DB, bypassing the local buffer. + + For ``memory_type='turn'`` this also bumps the auto-trigger counter and + schedules cadence-aware processing (extract / reconcile / procedural / + thread_summary / user_summary) via :func:`maybe_trigger_steps`, exactly + like :meth:`push_to_cosmos` does for buffered turns — so the + ``FACT_EXTRACTION_EVERY_N`` / ``THREAD_SUMMARY_EVERY_N`` / + ``USER_SUMMARY_EVERY_N`` / ``DEDUP_EVERY_N`` knobs apply uniformly + whether the caller uses the buffer or writes through directly. + """ + if memory_type == "turn" and not thread_id: + raise ValidationError( + "thread_id is required for memory_type='turn' so the auto-trigger " + "counter can group turns per conversation. Set thread_id explicitly." + ) + memory_id = self._get_store().add( user_id, role, content, @@ -503,6 +518,12 @@ def add_cosmos( embedding, embed, ) + if memory_type == "turn" and thread_id: + try: + self._maybe_auto_trigger({(user_id, thread_id): 1}) + except Exception as exc: + logger.warning("Auto-trigger after add_cosmos failed: %s", exc) + return memory_id def push_to_cosmos(self, batch_size: int = 25) -> None: """Insert all local memories into Cosmos DB.""" @@ -510,7 +531,7 @@ def push_to_cosmos(self, batch_size: int = 25) -> None: turn_counts, self._unflushed_turn_counts = self._unflushed_turn_counts, {} try: self._maybe_auto_trigger(turn_counts) - except Exception as exc: # pragma: no cover - defensive + except Exception as exc: logger.warning("Auto-trigger after push_to_cosmos failed: %s", exc) def get_memories( @@ -790,10 +811,49 @@ def reconcile(self, user_id: str, n: Optional[int] = None) -> dict[str, int]: return self._get_pipeline().reconcile_memories(user_id, n if n is not None else get_dedup_pool_size()) def process_now(self, *, user_id: str, thread_id: str) -> "ProcessThreadResult": - """Force the processor to run summarize/extract/dedup right now.""" + """Force the processor to run the full pipeline RIGHT NOW for one thread. + + For the in-process processor this fires all five steps: + ``thread_summary → extract → reconcile → procedural → user_summary``. + For the durable processor this remains a no-op (the sibling Function + app drives the pipeline off the Cosmos DB change feed). + + Transient failures in ``procedural`` and ``user_summary`` (LLM + rate-limit / timeout, Cosmos 429 / 5xx, defensive ``LLMError``) are + caught and logged as warnings so the per-thread work already + persisted by the prior steps is not erased. Permanent failures + (config bugs, auth errors, 4xx Cosmos errors, Python builtins like + ``KeyError`` / ``TypeError``) are re-raised — silencing them turns + operational issues into invisible ``WARNING`` lines. + """ self._require_cosmos() + processor = self._get_processor() turns = self.get_thread(thread_id=thread_id, user_id=user_id) or [] - return self._get_processor().process_thread(user_id=user_id, thread_id=thread_id, turns=turns) + result = processor.process_thread(user_id=user_id, thread_id=thread_id, turns=turns) + if isinstance(processor, InProcessProcessor): + try: + result.procedural = processor.synthesize_procedural(user_id=user_id) + except Exception as exc: + if not is_transient_tail_step_error(exc): + raise + logger.warning( + "process_now: synthesize_procedural failed (transient) for user_id=%s: %s", + user_id, + exc, + ) + try: + user_summary_result = processor.process_user_summary(user_id=user_id) + if user_summary_result is not None and user_summary_result.summary: + result.user_summary = user_summary_result.summary + except Exception as exc: + if not is_transient_tail_step_error(exc): + raise + logger.warning( + "process_now: process_user_summary failed (transient) for user_id=%s: %s", + user_id, + exc, + ) + return result def process_now_and_wait(self, *, user_id: str, thread_id: str, timeout: float = 30.0) -> bool: """Force processing and block until a thread summary exists (or timeout).""" diff --git a/azure/cosmos/agent_memory/processors/base.py b/azure/cosmos/agent_memory/processors/base.py index 271ff98..cd257b7 100644 --- a/azure/cosmos/agent_memory/processors/base.py +++ b/azure/cosmos/agent_memory/processors/base.py @@ -24,12 +24,21 @@ class ProcessThreadResult: The actual extracted memory documents are persisted to Cosmos DB by the pipeline; query them back via the SDK's ``get_memories`` if you need the raw docs. + + ``procedural`` and ``user_summary`` are populated by + :meth:`CosmosMemoryClient.process_now` only when the in-process processor + is active and only after the per-thread steps complete. They are ``None`` + for the durable processor (which runs them async out-of-band) and for the + auto-trigger cadence path (which records them via the counter container, + not on this dataclass). """ thread_summary: Optional[dict[str, Any]] = None extracted_counts: dict[str, int] = field(default_factory=dict) reconciled_count: int = 0 elapsed_ms: int = 0 + procedural: Optional[dict[str, Any]] = None + user_summary: Optional[dict[str, Any]] = None @dataclass diff --git a/azure/cosmos/agent_memory/prompts/_schemas.py b/azure/cosmos/agent_memory/prompts/_schemas.py index e0b425e..1b34973 100644 --- a/azure/cosmos/agent_memory/prompts/_schemas.py +++ b/azure/cosmos/agent_memory/prompts/_schemas.py @@ -115,6 +115,7 @@ "properties": { "scope_type": {"type": "string"}, "scope_value": {"type": "string"}, + "text": {"type": "string"}, "situation": {"type": ["string", "null"]}, "action_taken": {"type": ["string", "null"]}, "outcome": {"type": ["string", "null"]}, @@ -132,6 +133,7 @@ "required": [ "scope_type", "scope_value", + "text", "situation", "action_taken", "outcome", diff --git a/azure/cosmos/agent_memory/prompts/extract_memories.prompty b/azure/cosmos/agent_memory/prompts/extract_memories.prompty index dd1be98..d266f1d 100644 --- a/azure/cosmos/agent_memory/prompts/extract_memories.prompty +++ b/azure/cosmos/agent_memory/prompts/extract_memories.prompty @@ -14,6 +14,9 @@ inputs: existing_facts: type: string default: '(none)' + existing_episodics: + type: string + default: '(none)' transcript: type: string --- @@ -29,6 +32,14 @@ You are a precision memory extraction system. Your task is to read a conversatio Every memory must be explicitly grounded in the conversation — never inferred, assumed, or speculated. +## Speaker Discrimination — Where Memories May Come From + +The transcript below is line-tagged: `[user]:` lines are the human's own words, `[assistant]:` lines are the agent's response. These two sources are NOT interchangeable. + +- **Facts about the user may ONLY come from `[user]:` lines.** The assistant may restate, paraphrase, confirm, or make assumptions on the user's behalf ("Got it, you don't eat meat", "I assume you want a luxury hotel") — those are the agent's response, NOT the user's assertion. Never treat assistant text as a new source of user facts. If a fact appears only in `[assistant]:` content and is not asserted by the user, do not extract it. +- **Episodic memories may use both speakers' content** — the user's stated intent or scope is the anchor (and must be present in `[user]:`), but the assistant's content may help fill in the `action_taken` or `outcome` of a `situation → action_taken → outcome` arc when the agent carried out the action on the user's behalf. +- The assistant's general world-knowledge answers (e.g. "Python 3.13 was released in October 2024") are never user facts and never user episodic memories. + ## Confidence Scoring Every extracted memory (in any bucket) must include a `confidence` field in `[0.0, 1.0]` indicating how strongly the conversation supports the claim: @@ -62,8 +73,9 @@ Extract concrete, factual statements that fall into these categories: - Each fact must be self-contained and intelligible without context — no pronouns like "it" or "they" without antecedents - Write in third person ("The user...", "The project...") - Keep each fact concise — under 40 words -- Consolidate closely related items into a single fact (e.g., multiple search results on the same topic) -- Only split into separate facts when the claims are about genuinely different topics +- Consolidate closely related items **within the same category** into a single fact (e.g., multiple search results on the same topic) +- **Never merge across categories.** A single user turn that combines, say, a `preference` ("I don't eat meat") and a `requirement` ("I need wheelchair-accessible restaurants") MUST produce two separate facts. Compound user statements regularly cross category boundaries; extract every category that applies. Silently folding one into the other drops information. +- Only split *within* a category when the claims are about genuinely different topics - Each fact will be stored as its own document with its own vector embedding — self-contained phrasing is essential for retrieval ### Fact Reconciliation @@ -75,11 +87,21 @@ Before adding a fact, check it against the existing memories provided below. Use **UPDATE vs CONTRADICT — the decisive test:** If the old fact and the new fact could both be true at the same moment about the same subject, use UPDATE. If they cannot both be true (one negates, reverses, or excludes the other), use CONTRADICT. -#### Worked example — CONTRADICT +### ADD / CONTRADICT / UPDATE — hard constraints + +These rules are absolute. Violating them silently corrupts the user's memory store. + +1. **Every emitted fact (ADD, UPDATE, or CONTRADICT) must paraphrase a claim directly stated by the user in a `[user]:` line in this extraction's transcript.** Do not invent supporting, clarifying, or "explicit-negation" facts. Do not synthesize facts by combining, restating, or "consolidating" entries from the existing-facts list. If the [user]: lines in this transcript do not assert claim X, do NOT emit claim X — regardless of what the existing-facts list contains. Merging existing facts is the job of a separate reconciliation pass, not yours. +2. **One user statement → one fact, even when it implies a contradiction.** If a single user statement both adds new information AND opposes an existing fact, emit EXACTLY ONE fact: `text` paraphrases what the user actually said, `action="CONTRADICT"`, `supersedes_id` points at the opposed fact. The `supersedes_id` field by itself encodes the semantic opposition — do NOT also emit a second "explicit-negation" fact that restates the opposite of the prior claim. See the worked examples below. +3. **Fact text must describe the world, never describe a memory operation.** Strings like `"X is contradicted by Y"`, `"The user's prior preference is no longer accurate"`, or `"Previous fact superseded"` are meta-commentary about prior reconciliations and are **never** valid fact content. If you find yourself writing one, drop the item entirely. +4. **`supersedes_id` must point at a fact that is semantically about the same subject and property as the new fact.** A new dietary fact may only contradict an existing dietary fact; a new accessibility requirement may only update an existing accessibility requirement. Cross-subject supersedes (e.g. contradicting a "wheelchair access" fact with a new "loves seafood" fact) are always wrong — emit `ADD` instead. +5. **When in doubt, emit `ADD`.** A spurious `ADD` is recoverable (exact-content-hash deduping catches it; reconciliation can later merge or supersede it). A spurious `CONTRADICT` or `UPDATE` corrupts the audit trail and silently marks a still-valid fact as superseded. + +#### Worked example A — explicit CONTRADICT (user states the negation directly) - Existing memory: `[ID: fact_abc123] User is vegetarian.` - New turn: *"I started eating meat again last month."* -- Correct output: +- Correct output (one fact, text paraphrases what the user said): ```json { "text": "User eats meat.", @@ -92,6 +114,54 @@ Before adding a fact, check it against the existing memories provided below. Use ``` - **Wrong**: emitting `"action": "UPDATE"` here would be a silent bug — the pipeline treats UPDATE as a compatible refinement and CONTRADICT as an opposing claim, and downstream telemetry / belief-revision logic depends on the distinction. +#### Worked example B — implicit CONTRADICT (user states a claim that semantically opposes an existing fact) + +- Existing memory: `[ID: fact_xyz789] The user does not eat meat.` +- New turn: *"Actually, I love steak and seafood."* +- Correct output (ONE fact — `text` is what the user said, `supersedes_id` carries the opposition): + ```json + { + "text": "The user loves steak and seafood.", + "category": "preference", + "action": "CONTRADICT", + "supersedes_id": "fact_xyz789", + "confidence": 0.95, + "salience": 0.8 + } + ``` +- **Wrong** — emitting two facts: + ```json + // BAD: phantom explicit-negation fact alongside the literal user claim + [ + {"text": "The user loves steak and seafood.", "action": "ADD", ...}, + {"text": "The user eats meat.", "action": "CONTRADICT", "supersedes_id": "fact_xyz789", ...} + ] + ``` + The phantom `"The user eats meat"` fact was never said by the user — it is an invented restatement to make the contradiction "explicit". This pollutes the fact store with claims the user did not make. The CONTRADICT relation on the literal fact is sufficient. + +#### Worked example C — do NOT synthesize ADDs from existing facts + +- Existing memories: + - `[ID: fact_111] The user eats meat.` + - `[ID: fact_222] The user loves steak and seafood.` +- New turn: *"Normally, I prefer moderate hotels."* +- Correct output (ONE fact — only the hotel preference; the existing-facts list is reference-only): + ```json + { + "text": "The user normally prefers moderate hotels.", + "category": "preference", + "action": "ADD", + "confidence": 0.9, + "salience": 0.7 + } + ``` +- **Wrong** — emitting a synthesized "consolidation" fact: + ```json + // BAD: this fact is a paraphrase-merge of fact_111 + fact_222; the user never said this in this turn + {"text": "The user loves steak and seafood, indicating they eat meat.", "action": "ADD", ...} + ``` + Merging existing facts is the job of a separate reconciliation pass. Your job here is to extract claims from the new [user] turn only. + --- ## 2. Episodic — Situated Memories @@ -105,8 +175,12 @@ Extract memories that are tied to a **specific situation, scope, or context** th ### Required Fields - **scope_type** — short, free-form noun describing the kind of context (e.g. `trip`, `project`, `event`, `session`, `release`, `campaign`). Pick whatever vocabulary fits the user's domain. Do not invent a value if one is not implied — if no scope is present, the memory probably belongs in facts. - **scope_value** — the specific instance of that scope (e.g. `Paris 2025`, `Acme revamp`, `Q3 launch`). +- **text** — short, self-contained one-liner (under 25 words) describing what this memory is *about*. Required, non-empty. This is the field that gets embedded and full-text-indexed — vague text like "intent recorded" silently kills retrieval. Write it like a subject line: + - *Planned / in-flight* intents — capture the goal **plus the key constraints** the user has stated. Example: `"Planning a Tokyo trip with vegetarian and wheelchair-accessible-restaurant constraints."` + - *Past events* — summarize situation→action→outcome in one sentence. Example: `"Resolved Q3 K8s OOM outage by raising pod memory limits to 1GB."` + - *Ongoing context* — describe the current state. Example: `"User is heads-down on the Acme launch this week and wants short answers."` -Both must be non-empty. +All three of `scope_type`, `scope_value`, and `text` must be non-empty. (The `text` field plays the same role for episodic that it does for facts — self-contained phrasing intended for embedding.) ### Optional Fields (include only when applicable) - **situation** — the context or problem faced (present for past/in-flight events) @@ -117,7 +191,7 @@ Both must be non-empty. - **lesson** — a transferable takeaway - **domain** — topic area -For planned/in-flight or ongoing-context memories, leave `situation`, `action_taken`, `outcome`, `outcome_valence`, `reasoning`, and `lesson` as `null`. The scope fields alone carry the meaning. +For planned/in-flight or ongoing-context memories, leave `situation`, `action_taken`, `outcome`, `outcome_valence`, `reasoning`, and `lesson` as `null`. The scope fields plus `text` carry the meaning. --- @@ -140,9 +214,34 @@ When in doubt: --- ## Existing Memories -The following facts already exist for this user. Use action=UPDATE with supersedes_id to refine a compatible existing fact. Use action=CONTRADICT with supersedes_id when the new fact negates or is mutually exclusive with an existing fact. Use action=NONE if a fact is already known. + +### Existing facts (REFERENCE ONLY — never source ADDs from this list) + +The list below shows facts already stored for this user. Use it ONLY for these three purposes: + +1. **Deduplication** — if a fact you would otherwise ADD is already captured (semantically equivalent to an existing entry), omit it (action=NONE; don't include NONE entries in your output). +2. **UPDATE** — if a NEW `[user]:` line in this transcript refines or adds detail to an existing fact in a compatible way, emit one fact with `action=UPDATE` and `supersedes_id` pointing at the existing fact. +3. **CONTRADICT** — if a NEW `[user]:` line in this transcript opposes or negates an existing fact, emit one fact with `action=CONTRADICT` and `supersedes_id` pointing at the existing fact. The new fact's `text` paraphrases what the user actually said (not an invented explicit-negation). + +**Do NOT** treat this list as source material for ADD. Never combine, merge, restate, or "consolidate" entries from this list into a new ADD fact. If the new `[user]:` lines do not directly assert claim X, do NOT emit claim X — regardless of what the existing list contains. Cross-fact merging is the job of a separate reconciliation pass, not this one. + {{existing_facts}} +### Existing episodics + +The following episodic memories already exist for this user, grouped by scope. **Each scope (the pair of `scope_type` + `scope_value`) is the unique identity of an episodic memory** — there is one episodic per scope. Storage is upsert-by-scope: whatever you emit for an existing scope **replaces** the prior record. There is no ADD/UPDATE/CONTRADICT vocabulary for episodics — emit the full current state in `text` and the pipeline does the right thing. + +Decision rule when the transcript mentions a scope: + +- **Already captured, nothing new** — if the same scope is already covered and the transcript adds no new information (e.g. the user just re-states "for the Tokyo trip, I want luxury hotels" and that intent is already in the existing record), **omit it from the output**. Re-emitting it accomplishes nothing. +- **Refinement / extension** — the transcript adds new details to a scope already present (e.g. existing: "Planning Tokyo trip with luxury hotels"; new turn: "let's also book a 5-night Shinjuku stay"). Emit the episodic with **the merged, richer `text`** that includes both the prior intent and the new detail. The new text replaces the old record by scope. +- **Reversal** — the transcript negates or replaces the prior intent for the same scope (e.g. existing: "Planning Tokyo trip with luxury hotels"; new turn: "switching to budget hostels"). Emit the episodic with the **new** `text` describing the updated intent. The reversal replaces the old record by scope. +- **New scope** — the transcript introduces a scope that is not in the list below. Emit a fresh episodic with that scope. + +If two genuinely-distinct events would share the same `(scope_type, scope_value)` (e.g. "lost wallet in Tokyo" and "booked Tokyo hotel" both as `(trip, Tokyo)`), differentiate them via `scope_value` (`Tokyo lost-wallet incident` vs `Tokyo trip`) — not by emitting two records under the same scope. + +{{existing_episodics}} + --- ## Salience Scoring Rubric @@ -184,7 +283,7 @@ A fact is a standing claim that holds outside any specific context. The test: if 1. Could someone act on or reference this fact without reading the original thread? 2. Is this fact stated explicitly, not inferred? 3. Is each fact truly atomic — one claim per entry? -4. Can any facts be merged because they describe variants of the same thing? +4. Is every emitted fact grounded in a `[user]:` line in this transcript? (Existing-facts entries are reference-only; never source new ADDs from them.) **For Episodic:** 1. Did this event actually happen, or is it hypothetical? @@ -275,6 +374,7 @@ A fact is a standing claim that holds outside any specific context. The test: if { "scope_type": "incident", "scope_value": "Q3 K8s OOM outage", + "text": "Resolved Q3 K8s OOM outage by raising pod memory limits to 1GB and adding per-namespace resource quotas.", "situation": "Kubernetes pods in production were repeatedly OOM-killed, causing an outage.", "action_taken": "Bumped pod memory limits from 512MB to 1GB and added resource quotas per namespace.", "outcome": "The OOM-killing stopped and production stabilized.", @@ -360,6 +460,7 @@ The first statement is a standing preference and belongs in `facts`. The second { "scope_type": "trip", "scope_value": "Paris", + "text": "Planning a Paris trip with a luxury-accommodations preference.", "situation": null, "action_taken": null, "outcome": null, @@ -375,6 +476,121 @@ The first statement is a standing preference and belongs in `facts`. The second } ``` +### Example 6: Episodic — same scope, same intent already captured (OMIT) + +**Existing episodics:** +``` +- trip = Tokyo (1 episodic) + - [ID: ep_a1b2c3] (salience 0.8) Planning a Tokyo trip with a luxury hotel preference. +``` + +**Conversation:** +> User: "For this Tokyo trip, I want luxury hotels." +> User: "Normally, I prefer moderate hotels." + +The first user turn re-states an intent that is already captured for `(trip, Tokyo)`. Omit it — re-emitting the same intent for the same scope accomplishes nothing (the pipeline upserts by scope). The second user turn is a standing preference and belongs in `facts`. + +**Output:** +```json +{ + "facts": [ + { + "text": "The user normally prefers moderate hotels.", + "category": "preference", + "subject": "user", + "predicate": "hotel_preference", + "object": "moderate hotels", + "confidence": 0.95, + "salience": 0.7, + "temporal_context": null, + "tags": ["topic:travel", "topic:hotels"], + "action": "ADD", + "supersedes_id": null + } + ], + "episodic": [] +} +``` + +### Example 7: Episodic — refinement (emit merged text for the same scope) + +**Existing episodics:** +``` +- trip = Tokyo (1 episodic) + - [ID: ep_a1b2c3] (salience 0.8) Planning a Tokyo trip with a luxury hotel preference. +``` + +**Conversation:** +> User: "For the Tokyo trip, let's also book a 5-night stay in Shinjuku." + +The transcript adds a new detail (5-night Shinjuku stay) to the existing Tokyo scope. Emit one episodic for `(trip, Tokyo)` with the **merged richer text** that carries both the prior luxury-hotel intent and the new accommodation detail. The pipeline upserts by scope, so this replaces the prior record. + +**Output:** +```json +{ + "facts": [], + "episodic": [ + { + "scope_type": "trip", + "scope_value": "Tokyo", + "text": "Planning a Tokyo trip with a luxury hotel preference and a 5-night stay in Shinjuku.", + "situation": null, + "action_taken": null, + "outcome": null, + "outcome_valence": null, + "reasoning": null, + "lesson": null, + "domain": "travel", + "confidence": 0.95, + "salience": 0.85, + "tags": ["topic:travel", "topic:hotels", "topic:itinerary"] + } + ] +} +``` + +### Example 5: Compound user statement crossing fact categories + +**Conversation:** +> User: "I don't eat meat and I need wheelchair-accessible restaurants." + +A single user turn that combines a `preference` (diet) and a `requirement` (accessibility). These are different categories — they MUST be emitted as separate facts. Collapsing both into one "restaurant preferences" fact silently loses the accessibility constraint, which is the more operationally critical of the two. + +**Output:** +```json +{ + "facts": [ + { + "text": "The user does not eat meat.", + "category": "preference", + "subject": "user", + "predicate": "dietary_restriction", + "object": "no meat", + "confidence": 1.0, + "salience": 0.9, + "temporal_context": null, + "tags": ["topic:diet", "topic:food"], + "action": "ADD", + "supersedes_id": null + }, + { + "text": "The user requires wheelchair-accessible restaurants.", + "category": "requirement", + "subject": "user", + "predicate": "accessibility_requirement", + "object": "wheelchair-accessible restaurants", + "confidence": 1.0, + "salience": 0.95, + "temporal_context": null, + "tags": ["topic:accessibility", "topic:restaurants"], + "action": "ADD", + "supersedes_id": null + } + ], + "episodic": [] +} +``` + --- ## Output Format @@ -402,6 +618,7 @@ You must output ONLY valid JSON matching the schema below. No preamble, no expla { "scope_type": "trip|project|event|session|release|campaign|... (free-form, required, non-empty)", "scope_value": "specific instance, e.g. Paris 2025 (required, non-empty)", + "text": "self-contained one-liner describing the memory — required, non-empty", "situation": "context/problem, or null", "action_taken": "what was tried, or null", "outcome": "what happened, or null", diff --git a/azure/cosmos/agent_memory/services/_pipeline_helpers.py b/azure/cosmos/agent_memory/services/_pipeline_helpers.py index 715b453..9748168 100644 --- a/azure/cosmos/agent_memory/services/_pipeline_helpers.py +++ b/azure/cosmos/agent_memory/services/_pipeline_helpers.py @@ -280,6 +280,215 @@ def build_transcript( return "\n".join(parts) +def format_existing_episodics(memories: list[dict[str, Any]]) -> str: + """Render existing episodic memories for the extract_memories prompt. + + Groups by ``(scope_type, scope_value)`` so the LLM can see, per-scope, + which intent is already captured. Episodics use **scope-as-identity**: + the deterministic id is seeded from ``(user_id, scope_type, scope_value)``, + so any re-emission for the same scope (paraphrased intent, added detail, + or a reversal) collides and overwrites the prior record via upsert. The + LLM does NOT make ``ADD``/``UPDATE``/``CONTRADICT`` decisions on + episodics — that vocabulary is not in the episodic schema. + + What this rendering gives the model is per-scope context so it can: + + 1. Emit a single coherent ``text`` that reflects the *current* intent + for the scope (the upsert will overwrite the prior one). + 2. Avoid re-emitting an episodic when the new turn carries no + additional signal beyond what the existing one already records. + + Distinct events under the same umbrella (e.g. hotel booking vs lost + wallet, both under a Tokyo trip) belong under distinct ``scope_value`` + strings so they don't collide on the deterministic id. + """ + if not memories: + return "(none)" + grouped: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list) + for mem in memories: + meta = mem.get("metadata") or {} + scope_type = (meta.get("scope_type") or "(none)").strip() or "(none)" + scope_value = (meta.get("scope_value") or "(none)").strip() or "(none)" + grouped[(scope_type, scope_value)].append(mem) + lines: list[str] = [] + for (scope_type, scope_value), bucket in grouped.items(): + lines.append(f"- {scope_type} = {scope_value} ({len(bucket)} episodic{'s' if len(bucket) != 1 else ''})") + for mem in bucket: + mem_id = mem.get("id", "(no-id)") + salience = mem.get("salience", "N/A") + content = (mem.get("content") or "").strip() or "(empty content)" + lines.append(f" - [ID: {mem_id}] (salience {salience}) {content}") + return "\n".join(lines) + + +# Stopwords stripped from grounding checks. Keep this list short and focused +# on tokens that carry no factual content; any word a memory might legitimately +# differ on (e.g. "not", "no") must NOT be added here. +_GROUNDING_STOPWORDS = frozenset( + { + "the", + "a", + "an", + "is", + "are", + "was", + "were", + "be", + "been", + "being", + "and", + "or", + "but", + "to", + "of", + "for", + "on", + "in", + "at", + "by", + "with", + "from", + "as", + "that", + "this", + "these", + "those", + "it", + "its", + "user", + "they", + "them", + "their", + "he", + "she", + "his", + "her", + "him", + "has", + "have", + "had", + "do", + "does", + "did", + "will", + "would", + "should", + "can", + "could", + "may", + "might", + "must", + } +) + +_GROUNDING_TOKEN_RE = re.compile(r"[a-zA-Z]{3,}") + + +def _grounding_tokens(text: str) -> set[str]: + """Tokenize text into lowercased content words (>=3 chars, stopwords removed).""" + if not text: + return set() + return {t for t in _GROUNDING_TOKEN_RE.findall(text.lower()) if t not in _GROUNDING_STOPWORDS} + + +def check_extracted_fact_grounding( + fact_docs: list[dict[str, Any]], + turn_items: list[dict[str, Any]], + existing_facts: list[dict[str, Any]], + *, + user_id: str, + thread_id: str, + logger: Any, +) -> None: + """Warn when an extracted fact's content is not grounded in the new user turns. + + Catches two known LLM failure modes that previously corrupted the fact store: + + 1. **Synthesis from existing facts** — the LLM emits an ADD whose content + paraphrase-merges two or more existing facts (e.g. existing + "user eats meat" + "user loves steak" → emitted "user loves steak, + indicating they eat meat") even though the new user turn says nothing + on the topic. Reconciliation later catches the resulting duplicates + but the visible artefact is a chain of "duplicate" supersedes that the + user never triggered. + + 2. **Phantom explicit-negation** — the LLM emits a second CONTRADICT fact + alongside the literal user statement (e.g. user says "I love steak and + seafood"; LLM emits both "user loves steak and seafood" and an invented + "user eats meat" CONTRADICT) when the supersedes_id on the literal fact + would have sufficed. Pollutes the store with claims the user didn't make. + + Heuristic: tokenize each emitted fact's content into lowercased content + words; subtract tokens present in the new user-turn transcript; the + remainder is "ungrounded". If ungrounded tokens come from 2+ existing + facts → strong synthesis signal. If they come from a single existing + fact with >=50%% overlap → weaker phantom-negation signal. + + Logs a WARNING for each suspected fact. Does NOT drop facts — downstream + reconciliation remains the dedup authority — but the WARNING is the + deterministic test signal that catches regressions. + """ + if not fact_docs or not turn_items: + return + + user_turn_text = " ".join( + str(m.get("content") or "") for m in turn_items if (m.get("role") or "").lower() == "user" + ) + user_tokens = _grounding_tokens(user_turn_text) + + existing_with_tokens: list[tuple[str, set[str]]] = [] + for mem in existing_facts: + toks = _grounding_tokens(str(mem.get("content") or "")) + if toks: + existing_with_tokens.append((str(mem.get("id") or ""), toks)) + + for doc in fact_docs: + content = str(doc.get("content") or "") + fact_tokens = _grounding_tokens(content) + if not fact_tokens: + continue + + ungrounded = fact_tokens - user_tokens + if not ungrounded: + continue + + contributors: list[tuple[str, set[str]]] = [ + (eid, ungrounded & toks) for eid, toks in existing_with_tokens if ungrounded & toks + ] + + if len(contributors) >= 2: + logger.warning( + "extract_memories: emitted fact appears synthesized from %d existing facts " + "(ungrounded in user turns) — extract should ground only in this turn's [user] lines. " + "doc_id=%s content=%r ungrounded_tokens=%s contributor_ids=%s " + "user_id=%s thread_id=%s", + len(contributors), + doc.get("id"), + content, + sorted(ungrounded), + [eid for eid, _ in contributors], + user_id, + thread_id, + ) + elif len(contributors) == 1 and len(ungrounded) >= 2: + eid, overlap = contributors[0] + overlap_ratio = len(overlap) / len(ungrounded) + if overlap_ratio >= 0.5: + logger.warning( + "extract_memories: emitted fact has ungrounded tokens overlapping a single existing fact " + "(possible phantom-negation/restatement) — extract should ground only in this turn's " + "[user] lines. doc_id=%s content=%r ungrounded_tokens=%s overlap_existing_id=%s " + "overlap_ratio=%.2f user_id=%s thread_id=%s", + doc.get("id"), + content, + sorted(ungrounded), + eid, + overlap_ratio, + user_id, + thread_id, + ) + + def parse_llm_json(text: str | None) -> dict[str, Any]: """Parse JSON from an LLM response, stripping markdown fences.""" if text is None: diff --git a/azure/cosmos/agent_memory/services/pipeline.py b/azure/cosmos/agent_memory/services/pipeline.py index 315fd05..2211669 100644 --- a/azure/cosmos/agent_memory/services/pipeline.py +++ b/azure/cosmos/agent_memory/services/pipeline.py @@ -51,9 +51,13 @@ build_transcript, cap_structured_summary, chat_text, + check_extracted_fact_grounding, coerce_valence, parse_llm_json, ) +from azure.cosmos.agent_memory.services._pipeline_helpers import ( + format_existing_episodics as _format_existing_episodics, +) from azure.cosmos.agent_memory.services._pipeline_helpers import ( is_real_number as _is_real_number, ) @@ -130,6 +134,10 @@ def upsert_item(self, *, body: dict[str, Any]) -> dict[str, Any]: if container is not None and hasattr(container, "upsert_item"): response = container.upsert_item(body=body) return response if isinstance(response, dict) else body + upsert = getattr(self._store, "upsert_item", None) + if upsert is not None: + response = upsert(body=body) + return response if isinstance(response, dict) else body return self._store.add_cosmos(body) def create_item(self, *, body: dict[str, Any]) -> dict[str, Any]: @@ -296,6 +304,7 @@ def _empty_extract_counts() -> dict[str, int]: "updated_count": 0, "contradicted_count": 0, "exact_dedup_skipped": 0, + "dropped_episodic_count": 0, } @staticmethod @@ -425,7 +434,11 @@ def extract_memories_dry( logger.info("extract_memories_dry started user_id=%s thread_id=%s", user_id, thread_id) if turns is None: - query = "SELECT * FROM c WHERE c.user_id = @user_id AND c.thread_id = @thread_id AND c.type = 'turn'" + query = ( + "SELECT * FROM c WHERE c.user_id = @user_id " + "AND c.thread_id = @thread_id AND c.type = 'turn' " + "AND (NOT IS_DEFINED(c.extracted_at) OR IS_NULL(c.extracted_at))" + ) parameters: list[dict[str, Any]] = [ {"name": "@user_id", "value": user_id}, {"name": "@thread_id", "value": thread_id}, @@ -447,25 +460,30 @@ def extract_memories_dry( if not items: logger.warning("extract_memories_dry no memories found user_id=%s thread_id=%s", user_id, thread_id) - return {"facts": [], "episodic": [], "updates": []} + return {"facts": [], "episodic": [], "updates": [], "processed_turn_docs": []} - existing = self._load_existing_memories(user_id, ["fact"]) + existing_facts = self._load_existing_memories(user_id, ["fact"]) + existing_episodics = self._load_existing_memories(user_id, ["episodic"]) existing_fact_hashes: set[str] = { - m["content_hash"] for m in existing if m.get("type") == "fact" and m.get("content_hash") + m["content_hash"] for m in existing_facts if m.get("type") == "fact" and m.get("content_hash") } - if existing: + if existing_facts: existing_text = "\n".join( - f"- [ID: {mem['id']}] {mem.get('content', '')} " - f"(type={mem.get('type', 'fact')}, salience={mem.get('salience', 'N/A')})" - for mem in existing + f"- [ID: {mem['id']}] {mem.get('content', '')} (type=fact, salience={mem.get('salience', 'N/A')})" + for mem in existing_facts ) else: existing_text = "(none)" + existing_episodics_text = _format_existing_episodics(existing_episodics) transcript = self._build_transcript(items) response_text = self._run_prompty( "extract_memories.prompty", - inputs={"existing_facts": existing_text, "transcript": transcript}, + inputs={ + "existing_facts": existing_text, + "existing_episodics": existing_episodics_text, + "transcript": transcript, + }, ) parsed = self._parse_llm_json(response_text) facts = parsed.get("facts", []) @@ -477,6 +495,7 @@ def extract_memories_dry( episodic_docs: list[dict[str, Any]] = [] updates: list[dict[str, Any]] = [] exact_dedup_skipped = 0 + dropped_episodic_count = 0 for fact in facts: action = fact.get("action", "ADD").upper() @@ -549,21 +568,37 @@ def extract_memories_dry( scope_value = scope_value_raw.strip() if isinstance(scope_value_raw, str) else None if not scope_type or not scope_value: logger.warning( - "extract_memories: dropping malformed episodic (missing scope_type/scope_value): %r", + "extract_memories: dropping malformed episodic (missing scope_type/scope_value) " + "user_id=%s thread_id=%s reason=malformed_scope payload=%r", + user_id, + thread_id, ep, ) + dropped_episodic_count += 1 + continue + + text_raw = ep.get("text") + text = text_raw.strip() if isinstance(text_raw, str) else None + if not text: + logger.error( + "extract_memories: dropping episodic with empty/missing text field " + "(LLM extraction did not populate the required `text` field — likely a " + "weaker extraction model that needs upgrading or a prompt-compliance issue). " + "scope_type=%s scope_value=%s user_id=%s thread_id=%s reason=missing_text", + scope_type, + scope_value, + user_id, + thread_id, + ) + dropped_episodic_count += 1 continue situation = ep.get("situation") action_taken = ep.get("action_taken") outcome = ep.get("outcome") - if situation and action_taken and outcome: - text = f"{situation} → {action_taken} → {outcome}" - else: - text = f"For the user's {scope_value} {scope_type}, intent recorded." content_hash = compute_content_hash(text) - seed = _ID_SEED_SEP.join((user_id, thread_id, content_hash)) + seed = _ID_SEED_SEP.join((user_id, scope_type, scope_value)) det_id = f"ep_{hashlib.sha256(seed.encode()).hexdigest()[:32]}" topic_tags = build_topic_tags(ep.get("tags", [])) confidence = ep.get("confidence") @@ -580,7 +615,7 @@ def extract_memories_dry( doc = { "id": det_id, "user_id": user_id, - "thread_id": thread_id, + "thread_id": "__episodic__", "role": "system", "type": "episodic", "content": text, @@ -591,6 +626,7 @@ def extract_memories_dry( "metadata": { "scope_type": scope_type, "scope_value": scope_value, + "originating_thread_id": thread_id, "situation": situation, "action_taken": action_taken, "outcome": outcome, @@ -648,8 +684,24 @@ def extract_memories_dry( if exact_dedup_skipped: updates.append({"op": "stats", "exact_dedup_skipped": exact_dedup_skipped}) + if dropped_episodic_count: + updates.append({"op": "stats", "dropped_episodic_count": dropped_episodic_count}) + + check_extracted_fact_grounding( + fact_docs, + items, + existing_facts, + user_id=user_id, + thread_id=thread_id, + logger=logger, + ) - result = {"facts": fact_docs, "episodic": episodic_docs, "updates": updates} + result = { + "facts": fact_docs, + "episodic": episodic_docs, + "updates": updates, + "processed_turn_docs": items, + } logger.info( "extract_memories_dry completed user_id=%s thread_id=%s fact_docs=%d episodic_docs=%d updates=%d", user_id, @@ -690,23 +742,28 @@ def persist_extracted_memories( # shape. Cost is microseconds per doc; the safety boundary is the # whole point. validated = self._validate_extracted_doc(doc) + doc_type = validated.get("type") try: - self._create_memory(validated) + if doc_type == "episodic": + self._upsert_memory(validated) + else: + self._create_memory(validated) except CosmosResourceExistsError: logger.info("persist_extracted_memories skipped existing id=%s", validated.get("id")) continue tags = validated.get("tags", []) - if validated.get("type") == "episodic": + if doc_type == "episodic": result["episodic_count"] += 1 elif "sys:unclassified" in tags: result["unclassified_count"] += 1 - elif validated.get("type") == "fact": + elif doc_type == "fact": result["fact_count"] += 1 for op in update_ops: if op.get("op") == "stats": result["exact_dedup_skipped"] += int(op.get("exact_dedup_skipped") or 0) + result["dropped_episodic_count"] += int(op.get("dropped_episodic_count") or 0) continue if op.get("op") != "supersede": continue @@ -730,8 +787,48 @@ def persist_extracted_memories( result["updated_count"] += 1 logger.info("persist_extracted_memories completed user_id=%s counts=%s", user_id, result) + + processed_turns = extracted.get("processed_turn_docs") or [] + if processed_turns: + marked = self._mark_turns_extracted(processed_turns) + logger.info( + "persist_extracted_memories marked turns as extracted user_id=%s marked=%d/%d", + user_id, + marked, + len(processed_turns), + ) + return result + def _mark_turns_extracted(self, turn_docs: list[dict[str, Any]]) -> int: + """Stamp ``extracted_at`` on each turn doc and upsert. + + We upsert the full doc (rather than patch) because the container + adapter only exposes upsert. Per-turn failures are logged but do + not raise — the worst case is one turn gets re-extracted on the + next call, which is bounded and recoverable. + """ + if not turn_docs: + return 0 + now_iso = datetime.now(tz=timezone.utc).isoformat() + marked = 0 + for turn in turn_docs: + turn_id = turn.get("id") + if not turn_id: + continue + try: + doc_to_write = dict(turn) + doc_to_write["extracted_at"] = now_iso + self._turns_container.upsert_item(body=doc_to_write) + marked += 1 + except Exception as exc: + logger.warning( + "_mark_turns_extracted failed for turn_id=%s err=%s (turn may be re-extracted on next call)", + turn_id, + exc, + ) + return marked + def extract_memories( self, user_id: str, diff --git a/azure/cosmos/agent_memory/store/memory_store.py b/azure/cosmos/agent_memory/store/memory_store.py index 5cbe7cd..b49e4c1 100644 --- a/azure/cosmos/agent_memory/store/memory_store.py +++ b/azure/cosmos/agent_memory/store/memory_store.py @@ -7,6 +7,7 @@ from azure.cosmos.agent_memory._container_routing import ( _CONTAINER_FOR_TYPE, + USER_SCOPED_MEMORIES_TYPES, ContainerKey, container_key_for_type, ) @@ -861,6 +862,8 @@ def search( parameters.append({"name": "@key_terms", "value": terms}) partition_key, cross_partition = query_scope(user_id, thread_id) + if thread_id is not None and (not memory_types or set(memory_types) & USER_SCOPED_MEMORIES_TYPES): + partition_key, cross_partition = None, True logger.debug("MemoryStore.search query: %s", sql) return self.query( sql, diff --git a/function_app/requirements.txt b/function_app/requirements.txt index 69450cc..a166c17 100644 --- a/function_app/requirements.txt +++ b/function_app/requirements.txt @@ -2,7 +2,7 @@ azure-functions azure-functions-durable -azure-cosmos-agent-memory==0.1.0b1 +azure-cosmos-agent-memory==0.1.0b2 azure-cosmos>=4.16.0 azure-identity>=1.20 diff --git a/pyproject.toml b/pyproject.toml index ecf9659..9681f23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ namespaces = true [project] name = "azure-cosmos-agent-memory" -version = "0.1.0b1" +version = "0.1.0b2" description = "Store, retrieve, and transform AI agent memories backed by Azure Cosmos DB" readme = "README.md" license = {file = "LICENSE"} diff --git a/tests/unit/aio/store/test_memory_store.py b/tests/unit/aio/store/test_memory_store.py index 7d3ba91..c8b6ef9 100644 --- a/tests/unit/aio/store/test_memory_store.py +++ b/tests/unit/aio/store/test_memory_store.py @@ -373,3 +373,96 @@ async def test_get_thread_summary_queries_summaries_with_partition_key(): assert params["@type"] == "thread_summary" assert params["@user_id"] == "u1" assert params["@thread_id"] == "t1" + + +# --------------------------------------------------------------------------- +# F-final: read-path translation for user-scoped types (episodic / procedural) +# --------------------------------------------------------------------------- + + +async def test_get_memories_with_episodic_and_thread_id_emits_or_clause(): + memories = MagicMock() + memories.query_items.return_value = AsyncIterator([]) + store = AsyncMemoryStore(containers=_containers(memories=memories)) + + await store.get_memories(user_id="u1", thread_id="t1", memory_types=["episodic"]) + + call_kwargs = memories.query_items.call_args.kwargs + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in call_kwargs["query"] + params = _params_by_name(call_kwargs) + assert params["@thread_id"] == "t1" + assert params["@user_scoped_type_0"] == "episodic" + + +async def test_get_memories_with_procedural_and_thread_id_emits_or_clause(): + memories = MagicMock() + memories.query_items.return_value = AsyncIterator([]) + store = AsyncMemoryStore(containers=_containers(memories=memories)) + + await store.get_memories(user_id="u1", thread_id="t1", memory_types=["procedural"]) + + call_kwargs = memories.query_items.call_args.kwargs + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in call_kwargs["query"] + + +async def test_get_memories_fact_only_with_thread_id_keeps_plain_filter(): + memories = MagicMock() + memories.query_items.return_value = AsyncIterator([]) + store = AsyncMemoryStore(containers=_containers(memories=memories)) + + await store.get_memories(user_id="u1", thread_id="t1", memory_types=["fact"]) + + call_kwargs = memories.query_items.call_args.kwargs + assert "c.thread_id = @thread_id" in call_kwargs["query"] + assert "@user_scoped_type_" not in call_kwargs["query"] + + +async def test_get_memories_no_memory_types_with_thread_id_emits_or_clause(): + memories = MagicMock() + memories.query_items.return_value = AsyncIterator([]) + store = AsyncMemoryStore(containers=_containers(memories=memories)) + + await store.get_memories(user_id="u1", thread_id="t1") + + call_kwargs = memories.query_items.call_args.kwargs + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0, @user_scoped_type_1))" in call_kwargs["query"] + + +async def test_search_with_episodic_and_thread_id_omits_partition_key(): + memories = MagicMock() + memories.query_items.return_value = AsyncIterator([]) + embeddings = MagicMock() + embeddings.generate = AsyncMock(return_value=[0.1, 0.2]) + store = AsyncMemoryStore(containers=_containers(memories=memories), embeddings_client=embeddings) + + await store.search( + search_terms="hotels", + user_id="u1", + thread_id="t1", + memory_types=["episodic"], + ) + + call_kwargs = memories.query_items.call_args.kwargs + # When user-scoped types are in scope, async search must NOT confine + # the query to [u1, t1] (where no episodic ever lives). Async SDK + # treats absent partition_key as cross-partition by default. + assert "partition_key" not in call_kwargs + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in call_kwargs["query"] + + +async def test_search_fact_only_with_thread_id_uses_partition_path(): + memories = MagicMock() + memories.query_items.return_value = AsyncIterator([]) + embeddings = MagicMock() + embeddings.generate = AsyncMock(return_value=[0.1, 0.2]) + store = AsyncMemoryStore(containers=_containers(memories=memories), embeddings_client=embeddings) + + await store.search( + search_terms="hotels", + user_id="u1", + thread_id="t1", + memory_types=["fact"], + ) + + call_kwargs = memories.query_items.call_args.kwargs + assert call_kwargs.get("partition_key") == ["u1", "t1"] diff --git a/tests/unit/aio/test_cosmos_memory_client.py b/tests/unit/aio/test_cosmos_memory_client.py index 3eec695..eab2f84 100644 --- a/tests/unit/aio/test_cosmos_memory_client.py +++ b/tests/unit/aio/test_cosmos_memory_client.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import uuid from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, patch @@ -476,7 +477,11 @@ async def test_validate_topology_raises_when_not_connected(self): class TestAddCosmos: async def test_add_cosmos(self): mem, container = _connected_client() - await mem.add_cosmos(user_id="u1", role="user", content="hello") + # Suppress the background cadence task to keep the test focused on the CRUD write. + mem._maybe_auto_trigger = AsyncMock() + await mem.add_cosmos(user_id="u1", role="user", content="hello", thread_id="t1") + # Drain any pending background tasks (none expected since we stubbed the trigger). + await asyncio.gather(*list(mem._background_tasks), return_exceptions=True) turns = mem._turns_container_client turns.upsert_item.assert_awaited_once() @@ -487,7 +492,38 @@ async def test_add_cosmos(self): async def test_add_cosmos_not_connected(self): mem = _make_client() with pytest.raises(CosmosNotConnectedError): - await mem.add_cosmos(user_id="u1", role="user", content="hi") + await mem.add_cosmos(user_id="u1", role="user", content="hi", thread_id="t1") + + async def test_add_cosmos_turn_requires_thread_id(self): + """Turn writes must declare a thread_id so the auto-trigger counter can group them.""" + mem, _ = _connected_client() + with pytest.raises(ValidationError, match="thread_id is required"): + await mem.add_cosmos(user_id="u1", role="user", content="hi") # memory_type='turn' default + + async def test_add_cosmos_non_turn_does_not_require_thread_id(self): + """Non-turn writes (facts, episodics, etc.) work without thread_id and skip cadence.""" + mem, container = _connected_client() + trigger = AsyncMock() + mem._maybe_auto_trigger = trigger + + await mem.add_cosmos(user_id="u1", role="user", content="prefers dark mode", memory_type="fact") + await asyncio.gather(*list(mem._background_tasks), return_exceptions=True) + + container.upsert_item.assert_awaited_once() + trigger.assert_not_awaited() + + async def test_add_cosmos_turn_schedules_cadence(self): + """A turn write must schedule the auto-trigger as a background task so cadence + env vars apply whether the caller uses the local buffer or writes through directly.""" + mem, _ = _connected_client() + trigger = AsyncMock() + mem._maybe_auto_trigger = trigger + + await mem.add_cosmos(user_id="u1", role="user", content="hello", thread_id="t1") + # Drain the background task so the AsyncMock records the call. + await asyncio.gather(*list(mem._background_tasks), return_exceptions=True) + + trigger.assert_awaited_once_with({("u1", "t1"): 1}) class TestPushToCosmos: diff --git a/tests/unit/aio/test_process_now.py b/tests/unit/aio/test_process_now.py index 6407b56..25ae108 100644 --- a/tests/unit/aio/test_process_now.py +++ b/tests/unit/aio/test_process_now.py @@ -12,7 +12,7 @@ AsyncInProcessProcessor, ProcessThreadResult, ) -from azure.cosmos.agent_memory.exceptions import CosmosNotConnectedError +from azure.cosmos.agent_memory.exceptions import CosmosNotConnectedError, LLMError, ValidationError def _connected(processor=None) -> AsyncCosmosMemoryClient: @@ -28,12 +28,17 @@ def _patch_get_thread(client, turns): @pytest.mark.asyncio -async def test_process_now_with_inprocess_invokes_pipeline(): +async def test_process_now_with_inprocess_invokes_full_pipeline(): + """process_now must fire ALL FIVE steps for AsyncInProcess: thread_summary, extract, + reconcile, procedural, user_summary. Pre-fix this was only the first 3, so + procedural + user_summary never ran when callers used add_cosmos + process_now.""" client = _connected() pipeline = AsyncMock() pipeline.generate_thread_summary.return_value = {"id": "s"} pipeline.extract_memories.return_value = {"facts": 1} pipeline.reconcile_memories.return_value = {"kept": 0, "merged": 0, "contradicted": 0} + pipeline.synthesize_procedural.return_value = {"id": "proc1", "type": "procedural"} + pipeline.generate_user_summary.return_value = {"id": "us1", "type": "user_summary"} pipeline._store = client._get_store() pipeline._containers = dict(client._containers) client._pipeline = pipeline @@ -46,10 +51,127 @@ async def test_process_now_with_inprocess_invokes_pipeline(): pipeline.generate_thread_summary.assert_awaited_once_with("u", "t") pipeline.extract_memories.assert_awaited_once_with("u", "t") pipeline.reconcile_memories.assert_awaited_once_with("u", 50) + pipeline.synthesize_procedural.assert_awaited_once_with("u", force=False) + pipeline.generate_user_summary.assert_awaited_once_with("u", None) + assert result.procedural == {"id": "proc1", "type": "procedural"} + assert result.user_summary == {"id": "us1", "type": "user_summary"} @pytest.mark.asyncio -async def test_process_now_with_durable_is_noop(): +async def test_process_now_swallows_procedural_failure(): + """A transient LLM failure in synthesize_procedural must NOT erase the work + already persisted by the per-thread steps. WARNING-log + continue.""" + client = _connected() + pipeline = AsyncMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.side_effect = LLMError("LLM rate-limited") + pipeline.generate_user_summary.return_value = {"id": "us1"} + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user"}]) + + result = await client.process_now(user_id="u", thread_id="t") + + assert result.procedural is None + assert result.user_summary == {"id": "us1"} + pipeline.synthesize_procedural.assert_awaited_once() + pipeline.generate_user_summary.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_process_now_swallows_user_summary_failure(): + """A transient LLM failure in generate_user_summary must NOT erase the work + already persisted by the per-thread + procedural steps.""" + client = _connected() + pipeline = AsyncMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.return_value = {"id": "proc1"} + pipeline.generate_user_summary.side_effect = LLMError("LLM timeout") + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user"}]) + + result = await client.process_now(user_id="u", thread_id="t") + + assert result.procedural == {"id": "proc1"} + assert result.user_summary is None + + +@pytest.mark.asyncio +async def test_process_now_swallows_transient_http_error_by_status_code(): + """HTTP exceptions with transient status codes must be swallowed.""" + + class _FakeHttpExc(Exception): + def __init__(self, status_code): + super().__init__(f"HTTP {status_code}") + self.status_code = status_code + + client = _connected() + pipeline = AsyncMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.side_effect = _FakeHttpExc(429) + pipeline.generate_user_summary.side_effect = _FakeHttpExc(503) + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user"}]) + + result = await client.process_now(user_id="u", thread_id="t") + + assert result.procedural is None + assert result.user_summary is None + + +@pytest.mark.asyncio +async def test_process_now_propagates_permanent_procedural_failure(): + """Permanent failures (e.g. KeyError from a schema bug) must propagate.""" + client = _connected() + pipeline = AsyncMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.side_effect = KeyError("missing_required_field") + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user"}]) + + with pytest.raises(KeyError): + await client.process_now(user_id="u", thread_id="t") + pipeline.generate_user_summary.assert_not_called() + + +@pytest.mark.asyncio +async def test_process_now_propagates_permanent_user_summary_failure(): + """ValidationError from generate_user_summary must propagate.""" + client = _connected() + pipeline = AsyncMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.return_value = {"id": "proc1"} + pipeline.generate_user_summary.side_effect = ValidationError("bad payload") + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user"}]) + + with pytest.raises(ValidationError): + await client.process_now(user_id="u", thread_id="t") + + +@pytest.mark.asyncio +async def test_process_now_with_durable_skips_tail_steps(): + """Durable mode must NOT call synthesize_procedural or generate_user_summary — + those are driven by the change-feed-fed sibling Function app.""" client = _connected(processor=AsyncDurableFunctionProcessor()) pipeline = AsyncMock() client._pipeline = pipeline @@ -59,7 +181,11 @@ async def test_process_now_with_durable_is_noop(): assert isinstance(result, ProcessThreadResult) assert result.thread_summary is None + assert result.procedural is None + assert result.user_summary is None pipeline.generate_thread_summary.assert_not_called() + pipeline.synthesize_procedural.assert_not_called() + pipeline.generate_user_summary.assert_not_called() @pytest.mark.asyncio diff --git a/tests/unit/services/test_extract_dry.py b/tests/unit/services/test_extract_dry.py index c7686e6..32b4390 100644 --- a/tests/unit/services/test_extract_dry.py +++ b/tests/unit/services/test_extract_dry.py @@ -72,8 +72,20 @@ def query(self, sql: str, parameters=None, partition_key=None, cross_partition: docs = [doc for doc in docs if doc.get("type") in types] if "superseded_by" in sql: docs = [doc for doc in docs if not doc.get("superseded_by")] + if "extracted_at" in sql: + docs = [doc for doc in docs if not doc.get("extracted_at")] return docs + def upsert_item(self, *, body: dict[str, Any]) -> dict[str, Any]: + body = dict(body) + doc_id = body.get("id") + for i, doc in enumerate(self.docs): + if doc.get("id") == doc_id: + self.docs[i] = body + return body + self.docs.append(body) + return body + def read_item(self, item_id: str, partition_key: Any): del partition_key for doc in self.docs: @@ -162,7 +174,7 @@ def _response() -> dict[str, Any]: { "scope_type": "project", "scope_value": "CI", - "summary": "CI retries resolved flaky tests.", + "text": "CI retries resolved flaky tests.", "lesson": "Use retries for flaky CI tests.", "confidence": 0.8, } @@ -184,7 +196,7 @@ def test_extract_memories_dry_shape_is_small_and_has_no_embeddings() -> None: output = service.extract_memories_dry("u1", "t1") - assert set(output) == {"facts", "episodic", "updates"} + assert set(output) == {"facts", "episodic", "updates", "processed_turn_docs"} assert len(json.dumps(output)) < 32 * 1024 assert output["facts"] and output["episodic"] assert all("embedding" not in doc for docs in (output["facts"], output["episodic"]) for doc in docs) @@ -224,7 +236,7 @@ async def test_async_extract_memories_dry_shape_is_small_and_has_no_embeddings() output = await service.extract_memories_dry("u1", "t1") - assert set(output) == {"facts", "episodic", "updates"} + assert set(output) == {"facts", "episodic", "updates", "processed_turn_docs"} assert len(json.dumps(output)) < 32 * 1024 assert all("embedding" not in doc for docs in (output["facts"], output["episodic"]) for doc in docs) assert embeddings.calls == [] @@ -247,3 +259,426 @@ async def test_async_extract_memories_dry_is_byte_deterministic_for_same_llm_res assert json.dumps(first, sort_keys=True, separators=(",", ":")) == json.dumps( second, sort_keys=True, separators=(",", ":") ) + + +def test_dry_returns_processed_turn_docs_for_watermarking() -> None: + """``extract_memories_dry`` must surface the turn docs it processed so + ``persist_extracted_memories`` can stamp ``extracted_at`` on them and + the next extraction call doesn't reprocess them.""" + chat = _SyncChat([_response()]) + memories_store = _Store([]) + turns_store = _Store([_turn(i) for i in range(3)]) + service = PipelineService( + memories_store, + chat, + _SyncEmbeddings(), + containers=_containers_for_store(memories_store, turns_store=turns_store), + ) + + output = service.extract_memories_dry("u1", "t1") + + assert "processed_turn_docs" in output + assert {d["id"] for d in output["processed_turn_docs"]} == {"turn-0", "turn-1", "turn-2"} + + +def test_dry_alone_does_not_mark_turns_as_extracted() -> None: + """A dry run is read-only: it must NOT stamp ``extracted_at`` on any + turn (the wet ``extract_memories`` orchestrator handles marking only + after a successful persist).""" + chat = _SyncChat([_response()]) + memories_store = _Store([]) + turns_store = _Store([_turn(i) for i in range(3)]) + service = PipelineService( + memories_store, + chat, + _SyncEmbeddings(), + containers=_containers_for_store(memories_store, turns_store=turns_store), + ) + + service.extract_memories_dry("u1", "t1") + + for doc in turns_store.docs: + assert "extracted_at" not in doc or doc["extracted_at"] is None + + +def test_extract_memories_marks_turns_after_successful_persist() -> None: + """The wet ``extract_memories`` must stamp ``extracted_at`` on each + turn it processed. Without this, the next extraction call re-loads + the same turns and the LLM re-decides UPDATE/CONTRADICT — which is + the runaway-extraction bug this fix is designed to prevent.""" + chat = _SyncChat([_response()]) + memories_store = _Store([]) + turns_store = _Store([_turn(i) for i in range(3)]) + service = PipelineService( + memories_store, + chat, + _SyncEmbeddings(), + containers=_containers_for_store(memories_store, turns_store=turns_store), + ) + + service.extract_memories("u1", "t1") + + marked_turns = [doc for doc in turns_store.docs if doc.get("extracted_at")] + assert len(marked_turns) == 3 + marked_ids = {doc["id"] for doc in marked_turns} + assert marked_ids == {"turn-0", "turn-1", "turn-2"} + + +def test_second_extract_call_does_not_reprocess_already_extracted_turns() -> None: + """End-to-end watermarking proof: after a first ``extract_memories`` + marks the turns, a second call with no NEW turns must produce zero + work — no second LLM call, no second persist. This is the property + that prevents reversed-supersede / hallucinated-meta-fact bugs.""" + chat = _SyncChat([_response(), _response()]) # second response should never be consumed + memories_store = _Store([]) + turns_store = _Store([_turn(i) for i in range(3)]) + service = PipelineService( + memories_store, + chat, + _SyncEmbeddings(), + containers=_containers_for_store(memories_store, turns_store=turns_store), + ) + + service.extract_memories("u1", "t1") + calls_after_first = chat.calls + + # Second invocation with no new turns: watermarked turns are filtered + # out by the query and the dry early-returns with empty items. + service.extract_memories("u1", "t1") + assert chat.calls == calls_after_first + + +@pytest.mark.asyncio +async def test_async_extract_memories_marks_turns_after_successful_persist() -> None: + chat = _AsyncChat([_response()]) + memories_store = _AsyncStore([]) + turns_store = _AsyncStore([_turn(i) for i in range(3)]) + service = AsyncPipelineService( + memories_store, + chat, + _AsyncEmbeddings(), + containers=_async_containers_for_store(memories_store, turns_store=turns_store), + ) + + await service.extract_memories("u1", "t1") + + marked_turns = [doc for doc in turns_store.docs if doc.get("extracted_at")] + assert len(marked_turns) == 3 + + +# --------------------------------------------------------------------------- +# Grounding-check regression tests +# +# These tests pin down the two known LLM extraction-time failure modes that +# previously corrupted the fact store and required a wheel hotfix: +# +# 1. The LLM synthesizes an ADD by paraphrase-merging 2+ existing facts +# (e.g. "user eats meat" + "user loves steak" → "user loves steak, +# indicating they eat meat") even though the new user turn said nothing +# on the topic. +# 2. The LLM emits a second invented CONTRADICT fact alongside the literal +# user statement (e.g. user says "I love steak"; LLM emits both +# "loves steak" AND "user eats meat" — the second is a phantom +# explicit-negation that polluted the store with claims the user +# didn't make). +# +# The fix is a prompt change that forbids both patterns. Because we can't +# directly test prompt-following at the unit-test level (no real LLM in +# the test loop), we test the structural safety net: +# ``check_extracted_fact_grounding`` logs a WARNING when these patterns +# slip through. The four scenarios below pair a "buggy" LLM response with +# a "clean" one for each pattern, asserting the WARNING fires (or not) +# accordingly. If a future change ever regresses the prompt and the LLM +# starts emitting these patterns, the WARNING in production telemetry +# becomes the visible signal. +# --------------------------------------------------------------------------- + + +def _existing_fact(fid: str, content: str) -> dict[str, Any]: + """Build a minimal existing-fact doc shaped like what + ``_load_existing_memories`` returns from Cosmos.""" + return { + "id": fid, + "user_id": "u1", + "type": "fact", + "content": content, + "content_hash": fid, + "salience": 0.8, + "confidence": 0.9, + "metadata": {"category": "preference"}, + "tags": ["sys:fact"], + } + + +def _moderate_hotels_turn() -> dict[str, Any]: + return { + "id": "turn-new", + "user_id": "u1", + "thread_id": "t1", + "role": "user", + "type": "turn", + "content": "Normally, I prefer moderate hotels.", + "created_at": "2026-06-02T19:00:00+00:00", + } + + +def _steak_seafood_turn() -> dict[str, Any]: + return { + "id": "turn-new", + "user_id": "u1", + "thread_id": "t1", + "role": "user", + "type": "turn", + "content": "Actually, I love steak and seafood.", + "created_at": "2026-06-02T18:00:00+00:00", + } + + +def test_grounding_check_warns_when_add_synthesizes_from_multiple_existing_facts(caplog) -> None: + """Scenario 1 (buggy): the LLM emits a synthesized ADD whose tokens come + from 2+ existing facts but not from the new user turn. The grounding + check must emit a WARNING naming the offending fact.""" + existing = [ + _existing_fact("fact_meat", "The user eats meat."), + _existing_fact("fact_steak", "The user loves steak and seafood."), + ] + buggy_response = { + "facts": [ + { + "text": "The user normally prefers moderate hotels.", + "action": "ADD", + "category": "preference", + "confidence": 0.9, + "salience": 0.7, + }, + { + # synthesized — tokens come from existing fact_steak + fact_meat, + # not from the new "moderate hotels" turn + "text": "The user loves steak and seafood, indicating they eat meat.", + "action": "ADD", + "category": "preference", + "confidence": 0.9, + "salience": 0.7, + }, + ], + "episodic": [], + } + chat = _SyncChat([buggy_response]) + memories_store = _Store(existing) + turns_store = _Store([_moderate_hotels_turn()]) + service = PipelineService( + memories_store, + chat, + _SyncEmbeddings(), + containers=_containers_for_store(memories_store, turns_store=turns_store), + ) + + with caplog.at_level("WARNING", logger="azure.cosmos.agent_memory.pipeline"): + service.extract_memories_dry("u1", "t1") + + synthesis_warnings = [ + rec + for rec in caplog.records + if "synthesized from" in rec.getMessage() and "steak and seafood, indicating they eat meat" in rec.getMessage() + ] + assert synthesis_warnings, ( + f"expected a WARNING flagging the synthesized fact; got: {[rec.getMessage() for rec in caplog.records]}" + ) + # The grounded "moderate hotels" fact must NOT trigger a warning. + assert not any( + "moderate hotels" in rec.getMessage() and "synthesized from" in rec.getMessage() for rec in caplog.records + ) + + +def test_grounding_check_silent_when_add_is_grounded_in_user_turn(caplog) -> None: + """Scenario 2 (clean): with the same existing-facts context, a single + grounded ADD (only the new "moderate hotels" claim) must NOT trigger + any synthesis WARNING. This is the post-fix expected behaviour.""" + existing = [ + _existing_fact("fact_meat", "The user eats meat."), + _existing_fact("fact_steak", "The user loves steak and seafood."), + ] + clean_response = { + "facts": [ + { + "text": "The user normally prefers moderate hotels.", + "action": "ADD", + "category": "preference", + "confidence": 0.9, + "salience": 0.7, + } + ], + "episodic": [], + } + chat = _SyncChat([clean_response]) + memories_store = _Store(existing) + turns_store = _Store([_moderate_hotels_turn()]) + service = PipelineService( + memories_store, + chat, + _SyncEmbeddings(), + containers=_containers_for_store(memories_store, turns_store=turns_store), + ) + + with caplog.at_level("WARNING", logger="azure.cosmos.agent_memory.pipeline"): + service.extract_memories_dry("u1", "t1") + + grounding_warnings = [ + rec + for rec in caplog.records + if "synthesized from" in rec.getMessage() or "phantom-negation/restatement" in rec.getMessage() + ] + assert grounding_warnings == [], ( + f"clean output must not emit grounding warnings; got: {[rec.getMessage() for rec in grounding_warnings]}" + ) + + +def test_grounding_check_warns_on_phantom_explicit_negation_fact(caplog) -> None: + """Scenario 3 (buggy): user said "Actually, I love steak and seafood"; + LLM emits both a literal-paraphrase fact AND a phantom "user eats meat" + fact (an invented explicit-negation of the prior vegetarian fact). + The phantom fact's tokens are NOT in the user turn and they overlap + a single existing fact ("does not eat meat") — the single-contributor + branch of the grounding heuristic must fire a WARNING.""" + existing = [_existing_fact("fact_veg", "The user does not eat meat.")] + buggy_response = { + "facts": [ + { + # legitimate literal paraphrase of the user turn + "text": "The user loves steak and seafood.", + "action": "CONTRADICT", + "supersedes_id": "fact_veg", + "category": "preference", + "confidence": 0.95, + "salience": 0.8, + }, + { + # phantom — user never said this; tokens come from existing fact_veg + "text": "The user eats meat.", + "action": "CONTRADICT", + "supersedes_id": "fact_veg", + "category": "preference", + "confidence": 0.95, + "salience": 0.8, + }, + ], + "episodic": [], + } + chat = _SyncChat([buggy_response]) + memories_store = _Store(existing) + turns_store = _Store([_steak_seafood_turn()]) + service = PipelineService( + memories_store, + chat, + _SyncEmbeddings(), + containers=_containers_for_store(memories_store, turns_store=turns_store), + ) + + with caplog.at_level("WARNING", logger="azure.cosmos.agent_memory.pipeline"): + service.extract_memories_dry("u1", "t1") + + phantom_warnings = [ + rec for rec in caplog.records if "phantom-negation" in rec.getMessage() and "eats meat" in rec.getMessage() + ] + assert phantom_warnings, ( + f"expected a WARNING flagging the phantom-negation fact; got: {[rec.getMessage() for rec in caplog.records]}" + ) + # The legitimate "loves steak and seafood" fact must NOT trigger a warning; + # its tokens are grounded in the user turn. + assert not any( + "loves steak and seafood" in rec.getMessage() + and ("phantom-negation" in rec.getMessage() or "synthesized from" in rec.getMessage()) + for rec in caplog.records + ) + + +def test_grounding_check_silent_on_clean_implicit_contradict(caplog) -> None: + """Scenario 4 (clean): the post-fix expected behaviour for an implicit + contradiction — ONE fact with literal user text and a CONTRADICT + supersedes_id. No phantom-negation fact, no WARNING.""" + existing = [_existing_fact("fact_veg", "The user does not eat meat.")] + clean_response = { + "facts": [ + { + "text": "The user loves steak and seafood.", + "action": "CONTRADICT", + "supersedes_id": "fact_veg", + "category": "preference", + "confidence": 0.95, + "salience": 0.8, + } + ], + "episodic": [], + } + chat = _SyncChat([clean_response]) + memories_store = _Store(existing) + turns_store = _Store([_steak_seafood_turn()]) + service = PipelineService( + memories_store, + chat, + _SyncEmbeddings(), + containers=_containers_for_store(memories_store, turns_store=turns_store), + ) + + with caplog.at_level("WARNING", logger="azure.cosmos.agent_memory.pipeline"): + service.extract_memories_dry("u1", "t1") + + grounding_warnings = [ + rec + for rec in caplog.records + if "synthesized from" in rec.getMessage() or "phantom-negation/restatement" in rec.getMessage() + ] + assert grounding_warnings == [], ( + "clean implicit-contradict must not emit grounding warnings; got: " + f"{[rec.getMessage() for rec in grounding_warnings]}" + ) + + +@pytest.mark.asyncio +async def test_async_grounding_check_warns_on_synthesis(caplog) -> None: + """Async-path mirror of scenario 1: confirms the grounding heuristic + is wired into both sync and async extract pipelines.""" + existing = [ + _existing_fact("fact_meat", "The user eats meat."), + _existing_fact("fact_steak", "The user loves steak and seafood."), + ] + buggy_response = { + "facts": [ + { + "text": "The user normally prefers moderate hotels.", + "action": "ADD", + "category": "preference", + "confidence": 0.9, + "salience": 0.7, + }, + { + "text": "The user loves steak and seafood, indicating they eat meat.", + "action": "ADD", + "category": "preference", + "confidence": 0.9, + "salience": 0.7, + }, + ], + "episodic": [], + } + chat = _AsyncChat([buggy_response]) + memories_store = _AsyncStore(existing) + turns_store = _AsyncStore([_moderate_hotels_turn()]) + service = AsyncPipelineService( + memories_store, + chat, + _AsyncEmbeddings(), + containers=_async_containers_for_store(memories_store, turns_store=turns_store), + ) + + with caplog.at_level("WARNING", logger="azure.cosmos.agent_memory.pipeline.aio"): + await service.extract_memories_dry("u1", "t1") + + synthesis_warnings = [ + rec + for rec in caplog.records + if "synthesized from" in rec.getMessage() and "steak and seafood, indicating they eat meat" in rec.getMessage() + ] + assert synthesis_warnings, ( + f"expected an async WARNING flagging the synthesized fact; got: {[rec.getMessage() for rec in caplog.records]}" + ) diff --git a/tests/unit/services/test_pipeline_service.py b/tests/unit/services/test_pipeline_service.py index b9fa81c..587f680 100644 --- a/tests/unit/services/test_pipeline_service.py +++ b/tests/unit/services/test_pipeline_service.py @@ -203,6 +203,7 @@ def test_extract_memories_happy_path_writes_fact_and_episodic() -> None: { "scope_type": "project", "scope_value": "CI", + "text": "Stabilized flaky CI tests by adding retries.", "situation": "CI tests flaked intermittently", "action_taken": "Added retries", "outcome": "Tests stabilized", @@ -225,7 +226,7 @@ def test_extract_memories_happy_path_writes_fact_and_episodic() -> None: assert llm.embed_calls == [ [ "The user prefers dark mode.", - "CI tests flaked intermittently → Added retries → Tests stabilized", + "Stabilized flaky CI tests by adding retries.", ] ] diff --git a/tests/unit/store/test_memory_store.py b/tests/unit/store/test_memory_store.py index b9b06a6..bef3d76 100644 --- a/tests/unit/store/test_memory_store.py +++ b/tests/unit/store/test_memory_store.py @@ -349,3 +349,109 @@ def test_get_thread_summary_queries_summaries_with_partition_key(): assert params["@type"] == "thread_summary" assert params["@user_id"] == "u1" assert params["@thread_id"] == "t1" + + +# --------------------------------------------------------------------------- +# F-final: read-path translation for user-scoped types (episodic / procedural) +# +# Episodic + procedural docs live in sentinel partitions +# ("__episodic__" / "__procedural__") under each user, not the originating +# thread's partition. Public read APIs that filter on c.thread_id must OR in +# an IN (...) clause for those types so they aren't silently excluded. +# --------------------------------------------------------------------------- + + +def test_get_memories_with_episodic_and_thread_id_emits_or_clause(): + memories = MagicMock() + memories.query_items.return_value = [] + store = MemoryStore(containers=_containers(memories=memories)) + + store.get_memories(user_id="u1", thread_id="t1", memory_types=["episodic"]) + + call_kwargs = memories.query_items.call_args.kwargs + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in call_kwargs["query"] + params = _params_by_name(call_kwargs) + assert params["@thread_id"] == "t1" + assert params["@user_scoped_type_0"] == "episodic" + + +def test_get_memories_with_procedural_and_thread_id_emits_or_clause(): + memories = MagicMock() + memories.query_items.return_value = [] + store = MemoryStore(containers=_containers(memories=memories)) + + store.get_memories(user_id="u1", thread_id="t1", memory_types=["procedural"]) + + call_kwargs = memories.query_items.call_args.kwargs + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in call_kwargs["query"] + params = _params_by_name(call_kwargs) + assert params["@user_scoped_type_0"] == "procedural" + + +def test_get_memories_fact_only_with_thread_id_keeps_plain_filter(): + memories = MagicMock() + memories.query_items.return_value = [] + store = MemoryStore(containers=_containers(memories=memories)) + + store.get_memories(user_id="u1", thread_id="t1", memory_types=["fact"]) + + call_kwargs = memories.query_items.call_args.kwargs + assert "c.thread_id = @thread_id" in call_kwargs["query"] + assert "@user_scoped_type_" not in call_kwargs["query"] + + +def test_get_memories_no_memory_types_with_thread_id_emits_or_clause(): + # Defaulting to "every type" means episodic + procedural are in scope and + # would otherwise be silently dropped by a plain c.thread_id equality. + memories = MagicMock() + memories.query_items.return_value = [] + store = MemoryStore(containers=_containers(memories=memories)) + + store.get_memories(user_id="u1", thread_id="t1") + + call_kwargs = memories.query_items.call_args.kwargs + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0, @user_scoped_type_1))" in call_kwargs["query"] + params = _params_by_name(call_kwargs) + assert sorted(v for k, v in params.items() if k.startswith("@user_scoped_type_")) == ["episodic", "procedural"] + + +def test_search_with_episodic_and_thread_id_forces_cross_partition(): + memories = MagicMock() + memories.query_items.return_value = [] + embeddings = MagicMock() + embeddings.generate.return_value = [0.1, 0.2] + store = MemoryStore(containers=_containers(memories=memories), embeddings_client=embeddings) + + store.search( + search_terms="hotels", + user_id="u1", + thread_id="t1", + memory_types=["episodic"], + ) + + call_kwargs = memories.query_items.call_args.kwargs + # When user-scoped types are in scope, search must fan out across + # partitions instead of confining to [u1, t1] (where no episodic + # ever lives — they all use the "__episodic__" sentinel partition). + assert call_kwargs.get("enable_cross_partition_query") is True + assert "partition_key" not in call_kwargs + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in call_kwargs["query"] + + +def test_search_fact_only_with_thread_id_uses_partition_path(): + memories = MagicMock() + memories.query_items.return_value = [] + embeddings = MagicMock() + embeddings.generate.return_value = [0.1, 0.2] + store = MemoryStore(containers=_containers(memories=memories), embeddings_client=embeddings) + + store.search( + search_terms="hotels", + user_id="u1", + thread_id="t1", + memory_types=["fact"], + ) + + call_kwargs = memories.query_items.call_args.kwargs + assert call_kwargs.get("partition_key") == ["u1", "t1"] + assert "enable_cross_partition_query" not in call_kwargs diff --git a/tests/unit/test_cosmos_memory_client.py b/tests/unit/test_cosmos_memory_client.py index 4c79eab..a991517 100644 --- a/tests/unit/test_cosmos_memory_client.py +++ b/tests/unit/test_cosmos_memory_client.py @@ -490,7 +490,10 @@ def test_constructor_rejects_invalid_throughput_mode(self): class TestAddCosmos: def test_add_cosmos(self): mem, container = _connected_client() - mem.add_cosmos(user_id="u1", role="user", content="hello") + # Suppress cadence work — the trigger path is exercised in + # tests/unit/test_auto_trigger.py; this test just asserts the CRUD write. + mem._maybe_auto_trigger = MagicMock() + mem.add_cosmos(user_id="u1", role="user", content="hello", thread_id="t1") turns = mem._turns_container_client turns.upsert_item.assert_called_once() @@ -502,7 +505,47 @@ def test_add_cosmos(self): def test_add_cosmos_not_connected(self): mem = _make_client() with pytest.raises(CosmosNotConnectedError): - mem.add_cosmos(user_id="u1", role="user", content="hi") + mem.add_cosmos(user_id="u1", role="user", content="hi", thread_id="t1") + + def test_add_cosmos_turn_requires_thread_id(self): + """Turn writes must declare a thread_id so the auto-trigger counter can group them.""" + mem, _ = _connected_client() + with pytest.raises(ValidationError, match="thread_id is required"): + mem.add_cosmos(user_id="u1", role="user", content="hi") # memory_type='turn' default + + def test_add_cosmos_non_turn_does_not_require_thread_id(self): + """Non-turn writes (facts, episodics, etc.) work without thread_id and skip cadence.""" + mem, container = _connected_client() + trigger = MagicMock() + mem._maybe_auto_trigger = trigger + + mem.add_cosmos(user_id="u1", role="user", content="prefers dark mode", memory_type="fact") + + container.upsert_item.assert_called_once() + trigger.assert_not_called() + + def test_add_cosmos_turn_triggers_cadence(self): + """A turn write must bump the auto-trigger counter so cadence env vars apply + whether the caller uses the local buffer or writes through directly.""" + mem, _ = _connected_client() + trigger = MagicMock() + mem._maybe_auto_trigger = trigger + + mem.add_cosmos(user_id="u1", role="user", content="hello", thread_id="t1") + + trigger.assert_called_once_with({("u1", "t1"): 1}) + + def test_add_cosmos_swallows_cadence_failure(self): + """If the cadence trigger raises, the add_cosmos call must still succeed — + the user's turn was written; cadence is best-effort telemetry.""" + mem, _ = _connected_client() + mem._maybe_auto_trigger = MagicMock(side_effect=RuntimeError("boom")) + + # Should NOT raise — the write succeeded. + result_id = mem.add_cosmos(user_id="u1", role="user", content="hi", thread_id="t1") + + assert isinstance(result_id, str) + mem._turns_container_client.upsert_item.assert_called_once() class TestPushToCosmos: diff --git a/tests/unit/test_memory_type_multi.py b/tests/unit/test_memory_type_multi.py index 28642ec..d00d7e2 100644 --- a/tests/unit/test_memory_type_multi.py +++ b/tests/unit/test_memory_type_multi.py @@ -81,11 +81,86 @@ def test_memory_types_list_combines_with_other_filters(): ) where = qb.build_where() assert "c.user_id = @user_id" in where - assert "c.thread_id = @thread_id" in where + # Episodic is user-scoped, so the thread_id filter becomes an OR clause + # instead of the plain equality form. + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in where assert "c.type IN (@memory_type_0, @memory_type_1)" in where assert "c.confidence >= @min_confidence" in where +# --------------------------------------------------------------------------- +# thread_id translation for user-scoped types (episodic / procedural) +# --------------------------------------------------------------------------- + + +def test_thread_id_unchanged_when_only_non_user_scoped_types_requested(): + qb = _build_memory_query_builder(user_id="u1", thread_id="t1", memory_types=["fact"]) + where = qb.build_where() + assert "c.thread_id = @thread_id" in where + assert "@user_scoped_type_" not in where + + +def test_thread_id_unchanged_when_only_turn_requested(): + qb = _build_memory_query_builder(user_id="u1", thread_id="t1", memory_types=["turn"]) + where = qb.build_where() + assert "c.thread_id = @thread_id" in where + assert "@user_scoped_type_" not in where + + +def test_thread_id_or_clause_when_episodic_requested(): + qb = _build_memory_query_builder(user_id="u1", thread_id="t1", memory_types=["episodic"]) + where = qb.build_where() + params = qb.get_parameters() + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in where + user_scoped_values = sorted(p["value"] for p in params if p["name"].startswith("@user_scoped_type_")) + assert user_scoped_values == ["episodic"] + + +def test_thread_id_or_clause_when_procedural_requested(): + qb = _build_memory_query_builder(user_id="u1", thread_id="t1", memory_types=["procedural"]) + where = qb.build_where() + params = qb.get_parameters() + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0))" in where + user_scoped_values = sorted(p["value"] for p in params if p["name"].startswith("@user_scoped_type_")) + assert user_scoped_values == ["procedural"] + + +def test_thread_id_or_clause_includes_both_when_both_in_scope(): + qb = _build_memory_query_builder( + user_id="u1", + thread_id="t1", + memory_types=["episodic", "procedural"], + ) + where = qb.build_where() + params = qb.get_parameters() + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0, @user_scoped_type_1))" in where + user_scoped_values = sorted(p["value"] for p in params if p["name"].startswith("@user_scoped_type_")) + assert user_scoped_values == ["episodic", "procedural"] + + +def test_thread_id_or_clause_when_no_memory_types_filter(): + # No memory_types means "all types", so user-scoped types are implicitly in scope. + qb = _build_memory_query_builder(user_id="u1", thread_id="t1") + where = qb.build_where() + params = qb.get_parameters() + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0, @user_scoped_type_1))" in where + user_scoped_values = sorted(p["value"] for p in params if p["name"].startswith("@user_scoped_type_")) + assert user_scoped_values == ["episodic", "procedural"] + + +def test_thread_id_or_clause_when_empty_memory_types_filter(): + qb = _build_memory_query_builder(user_id="u1", thread_id="t1", memory_types=[]) + where = qb.build_where() + assert "(c.thread_id = @thread_id OR c.type IN (@user_scoped_type_0, @user_scoped_type_1))" in where + + +def test_no_thread_id_no_or_clause(): + qb = _build_memory_query_builder(user_id="u1", memory_types=["episodic"]) + where = qb.build_where() + assert "c.thread_id" not in where + assert "@user_scoped_type_" not in where + + # --------------------------------------------------------------------------- # Sync client surface — verifies the list reaches the generated SQL. # --------------------------------------------------------------------------- diff --git a/tests/unit/test_pipeline_confidence.py b/tests/unit/test_pipeline_confidence.py index 84eb6aa..fce8308 100644 --- a/tests/unit/test_pipeline_confidence.py +++ b/tests/unit/test_pipeline_confidence.py @@ -139,6 +139,7 @@ def test_extract_episodic_carries_confidence(): { "scope_type": "project", "scope_value": "CI revamp", + "text": "Set up CI by adding Ruff — faster lint times.", "situation": "Setup CI", "action_taken": "Added Ruff", "outcome": "Faster lint", @@ -285,11 +286,17 @@ def test_thread_ids_does_not_appear_in_query_or_parameters(self): # --------------------------------------------------------------------------- -def test_extract_scoped_intent_without_outcome_stores_correctly(caplog): - """An episodic with only scope fields (no situation/action/outcome) is kept. +def test_extract_drops_episodic_missing_text(caplog): + """An episodic with no ``text`` is dropped and surfaced via the return value. - The doc must use the deterministic fallback content string, expose the - scope fields at the top level, and not emit a "dropping malformed" warning. + Previously the pipeline synthesized boilerplate content like + ``"For the user's Paris trip, intent recorded."`` which was + semantically empty for embedding/recall. The fix is to require + the LLM to emit ``text`` (same field facts use) — if it doesn't, + drop the record so we don't poison the recall index. The drop is + logged at ERROR (it's data loss) and surfaced via the + ``dropped_episodic_count`` field on the return dict so callers + can monitor LLM-extraction compliance over time. """ pipeline, upserted = _make_pipeline( { @@ -299,36 +306,34 @@ def test_extract_scoped_intent_without_outcome_stores_correctly(caplog): "scope_value": "Paris", "confidence": 0.95, "salience": 0.8, + "tags": ["topic:travel", "topic:hotels"], } ] } ) - with caplog.at_level("WARNING", logger="azure.cosmos.agent_memory.pipeline"): - pipeline.extract_memories("u1", "t1") + with caplog.at_level("ERROR", logger="azure.cosmos.agent_memory.pipeline"): + result = pipeline.extract_memories("u1", "t1") eps = [d for d in upserted if d["type"] == "episodic"] - assert len(eps) == 1 - ep = eps[0] - assert ep["scope_type"] == "trip" - assert ep["scope_value"] == "Paris" - assert ep["metadata"]["scope_type"] == "trip" - assert ep["metadata"]["scope_value"] == "Paris" - assert ep["metadata"]["situation"] is None - assert ep["metadata"]["action_taken"] is None - assert ep["metadata"]["outcome"] is None - assert ep["content"] == "For the user's Paris trip, intent recorded." - assert ep["confidence"] == pytest.approx(0.95) - assert not any("dropping malformed episodic" in rec.getMessage() for rec in caplog.records) + assert eps == [] + assert result["episodic_count"] == 0 + assert result["dropped_episodic_count"] == 1 + msgs = [rec.getMessage() for rec in caplog.records] + assert any("empty/missing text field" in m for m in msgs) + assert any("reason=missing_text" in m for m in msgs) + # Bumped from WARNING → ERROR because dropping == data loss. + assert any(rec.levelname == "ERROR" and "empty/missing text field" in rec.getMessage() for rec in caplog.records) -def test_extract_past_event_episodic_uses_arrow_form_and_keeps_scope(): +def test_extract_past_event_episodic_uses_text_and_keeps_chain_in_metadata(): pipeline, upserted = _make_pipeline( { "episodic": [ { "scope_type": "project", "scope_value": "Acme revamp", + "text": "Migrated Acme DB by running the script — all rows migrated cleanly.", "situation": "Migrated DB", "action_taken": "Ran the script", "outcome": "All rows migrated", @@ -347,7 +352,7 @@ def test_extract_past_event_episodic_uses_arrow_form_and_keeps_scope(): pipeline.extract_memories("u1", "t1") [ep] = [d for d in upserted if d["type"] == "episodic"] - assert ep["content"] == "Migrated DB → Ran the script → All rows migrated" + assert ep["content"] == "Migrated Acme DB by running the script — all rows migrated cleanly." assert ep["scope_type"] == "project" assert ep["scope_value"] == "Acme revamp" md = ep["metadata"] @@ -361,12 +366,13 @@ def test_extract_past_event_episodic_uses_arrow_form_and_keeps_scope(): assert "topic:db" in ep["tags"] -def test_extract_episodic_falls_back_to_arrow_form_when_summary_field_present(): - """The schema dropped ``summary``; pipeline now always uses arrow form. +def test_extract_episodic_uses_text_directly_no_synthesis(): + """The LLM-written ``text`` is the embedded ``content``, verbatim. - Even if a non-strict LLM smuggles a ``summary`` field through, the - pipeline ignores it and builds content from - ``situation → action_taken → outcome``. + The pipeline must NOT synthesize content from the s/a/o chain or + from scope fields — that's how we ended up with useless boilerplate + before the fix. Whatever the LLM emits in ``text`` is what gets + embedded. """ pipeline, upserted = _make_pipeline( { @@ -374,7 +380,7 @@ def test_extract_episodic_falls_back_to_arrow_form_when_summary_field_present(): { "scope_type": "trip", "scope_value": "Paris", - "summary": "User wants luxury hotels for the Paris trip.", + "text": "User wants luxury hotels for the Paris trip.", "situation": "Planning Paris trip", "action_taken": "Said luxury", "outcome": "Pending", @@ -386,7 +392,106 @@ def test_extract_episodic_falls_back_to_arrow_form_when_summary_field_present(): pipeline.extract_memories("u1", "t1") [ep] = [d for d in upserted if d["type"] == "episodic"] - assert ep["content"] == "Planning Paris trip → Said luxury → Pending" + assert ep["content"] == "User wants luxury hotels for the Paris trip." + assert ep["metadata"]["situation"] == "Planning Paris trip" + assert ep["metadata"]["action_taken"] == "Said luxury" + assert ep["metadata"]["outcome"] == "Pending" + + +def test_extract_episodic_uses_text_alone_for_planned_intent(): + """Planned/in-flight episodics carry their meaning entirely in ``text``. + + This is the headline bug-1 scenario from the workshop: the LLM (correctly + following the prompt) emits only scope_type/scope_value/text for a + planned trip, and the pipeline must embed the text, not boilerplate. + """ + pipeline, upserted = _make_pipeline( + { + "episodic": [ + { + "scope_type": "trip", + "scope_value": "Tokyo", + "text": ("Planning a Tokyo trip with vegetarian and wheelchair-accessible-restaurant constraints."), + "confidence": 0.95, + "salience": 0.85, + "tags": ["topic:travel", "topic:accessibility"], + } + ] + } + ) + + pipeline.extract_memories("u1", "t1") + + [ep] = [d for d in upserted if d["type"] == "episodic"] + assert ep["content"] == ("Planning a Tokyo trip with vegetarian and wheelchair-accessible-restaurant constraints.") + assert ep["metadata"]["situation"] is None + assert ep["metadata"]["action_taken"] is None + assert ep["metadata"]["outcome"] is None + + +def test_extract_episodic_strips_whitespace_from_text(): + pipeline, upserted = _make_pipeline( + { + "episodic": [ + { + "scope_type": "trip", + "scope_value": "Paris", + "text": " Planning a Paris trip. ", + } + ] + } + ) + pipeline.extract_memories("u1", "t1") + [ep] = [d for d in upserted if d["type"] == "episodic"] + assert ep["content"] == "Planning a Paris trip." + + +def test_extract_compound_statement_yields_facts_across_categories(): + """Bug-2 scenario: a single user turn that combines preference + requirement + must produce two facts, not one merged "restaurant preferences" fact. + + Drives the prompt's tightened consolidation rule. We're mocking the LLM + response here so this is really a regression guard on the pipeline plumbing + (the prompt change is what makes a real LLM produce this shape). + """ + pipeline, upserted = _make_pipeline( + { + "facts": [ + { + "text": "The user does not eat meat.", + "category": "preference", + "subject": "user", + "predicate": "dietary_restriction", + "object": "no meat", + "confidence": 1.0, + "salience": 0.9, + "tags": ["topic:diet"], + "action": "ADD", + "supersedes_id": None, + }, + { + "text": "The user requires wheelchair-accessible restaurants.", + "category": "requirement", + "subject": "user", + "predicate": "accessibility_requirement", + "object": "wheelchair-accessible restaurants", + "confidence": 1.0, + "salience": 0.95, + "tags": ["topic:accessibility"], + "action": "ADD", + "supersedes_id": None, + }, + ] + } + ) + pipeline.extract_memories("u1", "t1") + + facts = [d for d in upserted if d["type"] == "fact"] + assert len(facts) == 2 + by_category = {f["metadata"]["category"]: f for f in facts} + assert set(by_category) == {"preference", "requirement"} + assert by_category["preference"]["content"] == "The user does not eat meat." + assert by_category["requirement"]["content"] == "The user requires wheelchair-accessible restaurants." def test_extract_drops_episodic_missing_scope_type(caplog): @@ -404,10 +509,13 @@ def test_extract_drops_episodic_missing_scope_type(caplog): ) with caplog.at_level("WARNING", logger="azure.cosmos.agent_memory.pipeline"): - pipeline.extract_memories("u1", "t1") + result = pipeline.extract_memories("u1", "t1") assert not any(d["type"] == "episodic" for d in upserted) assert any("dropping malformed episodic" in rec.getMessage() for rec in caplog.records) + assert any("reason=malformed_scope" in rec.getMessage() for rec in caplog.records) + # Malformed-scope drops also count toward the dropped_episodic_count signal. + assert result["dropped_episodic_count"] == 1 def test_extract_drops_episodic_missing_scope_value(caplog): @@ -470,6 +578,7 @@ def test_extract_strips_whitespace_from_scope_fields(): { "scope_type": " trip ", "scope_value": " Paris ", + "text": "Planning a Paris trip.", "confidence": 0.9, } ] @@ -481,4 +590,4 @@ def test_extract_strips_whitespace_from_scope_fields(): [ep] = [d for d in upserted if d["type"] == "episodic"] assert ep["scope_type"] == "trip" assert ep["scope_value"] == "Paris" - assert ep["content"] == "For the user's Paris trip, intent recorded." + assert ep["content"] == "Planning a Paris trip." diff --git a/tests/unit/test_procedural_synthesis.py b/tests/unit/test_procedural_synthesis.py index c92d5b5..889e3ca 100644 --- a/tests/unit/test_procedural_synthesis.py +++ b/tests/unit/test_procedural_synthesis.py @@ -219,6 +219,7 @@ def test_extract_memories_without_procedural_bucket_returns_new_count_shape(): { "scope_type": "task", "scope_value": "refactoring tests", + "text": "Refactored test suite using focused helpers to keep it readable.", "situation": "Refactoring tests", "action_taken": "Used focused helpers", "outcome": "The suite stayed readable", diff --git a/tests/unit/test_process_now.py b/tests/unit/test_process_now.py index 4b60138..d81a0f7 100644 --- a/tests/unit/test_process_now.py +++ b/tests/unit/test_process_now.py @@ -7,7 +7,7 @@ import pytest from azure.cosmos.agent_memory.cosmos_memory_client import CosmosMemoryClient -from azure.cosmos.agent_memory.exceptions import CosmosNotConnectedError +from azure.cosmos.agent_memory.exceptions import CosmosNotConnectedError, LLMError, ValidationError from azure.cosmos.agent_memory.processors import ( DurableFunctionProcessor, InProcessProcessor, @@ -28,15 +28,20 @@ def _patch_get_thread(client, turns): client.get_thread = MagicMock(return_value=turns) -def test_process_now_with_inprocess_invokes_pipeline(): +def test_process_now_with_inprocess_invokes_full_pipeline(): + """process_now must fire ALL FIVE steps for InProcess: thread_summary, extract, + reconcile, procedural, user_summary. Pre-fix this was only the first 3, so + procedural + user_summary never ran when callers used add_cosmos + process_now.""" client = _connected() # default → InProcessProcessor lazily built pipeline = MagicMock() pipeline.generate_thread_summary.return_value = {"id": "s", "type": "thread_summary"} pipeline.extract_memories.return_value = {"facts": 1} pipeline.reconcile_memories.return_value = {"kept": 0, "merged": 0, "contradicted": 0} + pipeline.synthesize_procedural.return_value = {"id": "proc1", "type": "procedural"} + pipeline.generate_user_summary.return_value = {"id": "us1", "type": "user_summary"} pipeline._store = client._get_store() pipeline._containers = dict(client._containers) - client._pipeline = pipeline # short-circuit lazy build + client._pipeline = pipeline _patch_get_thread(client, [{"role": "user", "content": "hi"}]) result = client.process_now(user_id="u1", thread_id="t1") @@ -46,9 +51,126 @@ def test_process_now_with_inprocess_invokes_pipeline(): pipeline.generate_thread_summary.assert_called_once_with("u1", "t1") pipeline.extract_memories.assert_called_once_with("u1", "t1") pipeline.reconcile_memories.assert_called_once_with("u1", 50) + pipeline.synthesize_procedural.assert_called_once_with(user_id="u1", force=False) + pipeline.generate_user_summary.assert_called_once_with("u1", None) + assert result.procedural == {"id": "proc1", "type": "procedural"} + assert result.user_summary == {"id": "us1", "type": "user_summary"} + + +def test_process_now_swallows_procedural_failure(): + """A transient LLM failure in synthesize_procedural must NOT erase the work + already persisted by the per-thread steps. WARNING-log + continue.""" + client = _connected() + pipeline = MagicMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.side_effect = LLMError("LLM rate-limited") + pipeline.generate_user_summary.return_value = {"id": "us1"} + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user", "content": "hi"}]) + + result = client.process_now(user_id="u1", thread_id="t1") + + assert result.procedural is None + assert result.user_summary == {"id": "us1"} + pipeline.synthesize_procedural.assert_called_once() + pipeline.generate_user_summary.assert_called_once() + + +def test_process_now_swallows_user_summary_failure(): + """A transient LLM failure in generate_user_summary must NOT erase the work + already persisted by the per-thread + procedural steps.""" + client = _connected() + pipeline = MagicMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.return_value = {"id": "proc1"} + pipeline.generate_user_summary.side_effect = LLMError("LLM timeout") + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user", "content": "hi"}]) + + result = client.process_now(user_id="u1", thread_id="t1") + + assert result.procedural == {"id": "proc1"} + assert result.user_summary is None + + +def test_process_now_swallows_transient_http_error_by_status_code(): + """Cosmos / HTTP exceptions with transient status codes (429, 503) must be + swallowed — they're infrastructure hiccups, not bugs.""" + + class _FakeHttpExc(Exception): + def __init__(self, status_code): + super().__init__(f"HTTP {status_code}") + self.status_code = status_code + + client = _connected() + pipeline = MagicMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.side_effect = _FakeHttpExc(429) + pipeline.generate_user_summary.side_effect = _FakeHttpExc(503) + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user", "content": "hi"}]) + + result = client.process_now(user_id="u1", thread_id="t1") + + assert result.procedural is None + assert result.user_summary is None + + +def test_process_now_propagates_permanent_procedural_failure(): + """A non-transient failure (e.g. ``KeyError`` from a schema bug) must NOT + be silently swallowed — it should surface to the caller so config / + programmer bugs do not turn into invisible ``WARNING`` lines.""" + client = _connected() + pipeline = MagicMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.side_effect = KeyError("missing_required_field") + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user", "content": "hi"}]) + + with pytest.raises(KeyError): + client.process_now(user_id="u1", thread_id="t1") + # user_summary must NOT be attempted after a permanent procedural failure + pipeline.generate_user_summary.assert_not_called() + + +def test_process_now_propagates_permanent_user_summary_failure(): + """ValidationError from generate_user_summary (e.g. schema bug) must + surface to the caller — silencing config bugs is a bug.""" + client = _connected() + pipeline = MagicMock() + pipeline.generate_thread_summary.return_value = {"id": "s"} + pipeline.extract_memories.return_value = {"facts": 1} + pipeline.reconcile_memories.return_value = {"kept": 0} + pipeline.synthesize_procedural.return_value = {"id": "proc1"} + pipeline.generate_user_summary.side_effect = ValidationError("bad payload") + pipeline._store = client._get_store() + pipeline._containers = dict(client._containers) + client._pipeline = pipeline + _patch_get_thread(client, [{"role": "user", "content": "hi"}]) + + with pytest.raises(ValidationError): + client.process_now(user_id="u1", thread_id="t1") -def test_process_now_with_durable_is_noop(): +def test_process_now_with_durable_skips_tail_steps(): + """Durable mode must NOT call synthesize_procedural or generate_user_summary — + those are driven by the change-feed-fed sibling Function app.""" client = _connected(processor=DurableFunctionProcessor()) pipeline = MagicMock() client._pipeline = pipeline @@ -58,9 +180,13 @@ def test_process_now_with_durable_is_noop(): assert isinstance(result, ProcessThreadResult) assert result.thread_summary is None + assert result.procedural is None + assert result.user_summary is None pipeline.generate_thread_summary.assert_not_called() pipeline.extract_memories.assert_not_called() pipeline.reconcile_memories.assert_not_called() + pipeline.synthesize_procedural.assert_not_called() + pipeline.generate_user_summary.assert_not_called() def test_process_now_requires_cosmos(): diff --git a/tests/unit/test_reconcile.py b/tests/unit/test_reconcile.py index 88b5a6b..be3f63b 100644 --- a/tests/unit/test_reconcile.py +++ b/tests/unit/test_reconcile.py @@ -649,6 +649,269 @@ def test_fact_not_dropped_when_only_procedural_has_same_hash(self): assert fact_docs[0]["content"] == text +class TestEpisodicReconciliation: + """Episodic memories use scope as identity: the deterministic ID is + seeded only on ``(user_id, scope_type, scope_value)``. Any re-emission + for the same scope (paraphrased intent, added detail, reversed intent) + collides on upsert and replaces the prior record. The LLM does NOT + make ADD/UPDATE/CONTRADICT decisions for episodics — the scope IS the + identity. Distinct events under the same umbrella belong under + distinct ``scope_value`` strings (e.g. "Tokyo trip" vs + "Tokyo lost-wallet incident"). + """ + + def _build(self) -> PipelineService: + p = PipelineService.__new__(PipelineService) + p._embeddings = MagicMock() + p._embeddings.generate.return_value = [0.1] * 8 + p._embeddings.generate_batch.return_value = [[0.1] * 8] + p._container = MagicMock() + p._memories_container = p._container + p._turns_container = p._container + p._summaries_container = p._container + p._chat = MagicMock() + p._upsert_memory = MagicMock(side_effect=lambda doc: doc) + p._create_memory = MagicMock(side_effect=lambda doc: doc) + p._mark_superseded = MagicMock(return_value=True) + return p + + def _turns(self) -> list[dict]: + return [ + { + "id": "turn-1", + "role": "user", + "content": "x", + "type": "turn", + "created_at": "2024-01-01T00:00:00+00:00", + } + ] + + def _episodic_payload(self, **overrides) -> dict: + payload = { + "scope_type": "trip", + "scope_value": "Tokyo", + "text": "Planning a Tokyo trip with a luxury hotel preference.", + "situation": None, + "action_taken": None, + "outcome": None, + "outcome_valence": None, + "reasoning": None, + "lesson": None, + "domain": "travel", + "confidence": 0.95, + "salience": 0.8, + "tags": ["topic:travel"], + } + payload.update(overrides) + return payload + + def test_existing_episodics_are_rendered_into_prompt_inputs(self): + """The extractor must pass ``existing_episodics`` to the LLM, grouped + by ``(scope_type, scope_value)``. Without this, the model has no + context for refining or reversing the existing intent for that + scope when it emits the merged text.""" + p = self._build() + existing_text = "Planning a Tokyo trip with a luxury hotel preference." + existing_ep = { + "id": "ep_existing", + "type": "episodic", + "content": existing_text, + "content_hash": compute_content_hash(existing_text), + "thread_id": "__episodic__", + "salience": 0.8, + "metadata": {"scope_type": "trip", "scope_value": "Tokyo"}, + } + p._container.query_items.return_value = iter(self._turns()) + # Two queries are issued (one for facts, one for episodics) so each + # type gets its own 100-row budget — return [] for facts, the + # existing episodic for the episodic call. + p._load_existing_memories = MagicMock( + side_effect=lambda user_id, memory_types, **kw: [existing_ep] if memory_types == ["episodic"] else [] + ) + p._run_prompty = MagicMock(return_value=json.dumps({"facts": [], "episodic": [], "unclassified": []})) + + p.extract_memories("u1", "t1") + + # Two separate calls — one per type, each with its own budget. + load_calls = [c.args for c in p._load_existing_memories.call_args_list] + assert ("u1", ["fact"]) in load_calls + assert ("u1", ["episodic"]) in load_calls + call_kwargs = p._run_prompty.call_args.kwargs + inputs = call_kwargs["inputs"] + assert "existing_episodics" in inputs + rendered = inputs["existing_episodics"] + assert "trip = Tokyo" in rendered + assert "ep_existing" in rendered + assert existing_text in rendered + + def test_same_scope_episodics_collide_on_deterministic_id(self): + """Two episodics with the same (scope_type, scope_value) but + different ``text`` MUST produce the same deterministic ID so that + the second write overwrites the first via upsert. This is the + core mechanism that prevents near-duplicate episodic storage when + a recent-turn re-extraction window paraphrases the same intent. + """ + p = self._build() + p._container.query_items.return_value = iter(self._turns()) + p._load_existing_memories = MagicMock(return_value=[]) + # LLM emits two episodics under the SAME scope but with paraphrased + # text — this is the exact failure mode the user reported. + p._run_prompty = MagicMock( + return_value=json.dumps( + { + "facts": [], + "episodic": [ + self._episodic_payload(text="Planning a Tokyo trip with a luxury hotel preference."), + self._episodic_payload(text="Planning a Tokyo trip with a preference for luxury hotels."), + ], + "unclassified": [], + } + ) + ) + + p.extract_memories("u1", "t1") + + upsert_calls = [c.args[0] for c in p._upsert_memory.call_args_list if c.args[0].get("type") == "episodic"] + # Both episodics flow through upsert (the persist path branches on + # type=episodic) and they MUST share the same det_id — that's what + # makes the second a Cosmos upsert that replaces the first. + assert len(upsert_calls) == 2 + assert upsert_calls[0]["id"] == upsert_calls[1]["id"] + # And neither went through create_item (which would 409 on the + # second write and silently lose the new richer text). + episodic_creates = [c for c in p._create_memory.call_args_list if c.args[0].get("type") == "episodic"] + assert episodic_creates == [] + + def test_different_scope_values_produce_different_ids(self): + """Two episodics with the same scope_type but different + scope_value (e.g. distinct trips, or distinct incidents within a + trip) MUST produce different deterministic IDs so they coexist. + """ + p = self._build() + p._container.query_items.return_value = iter(self._turns()) + p._load_existing_memories = MagicMock(return_value=[]) + p._run_prompty = MagicMock( + return_value=json.dumps( + { + "facts": [], + "episodic": [ + self._episodic_payload(scope_value="Tokyo", text="Trip A."), + self._episodic_payload(scope_value="Paris", text="Trip B."), + ], + "unclassified": [], + } + ) + ) + + p.extract_memories("u1", "t1") + + upsert_calls = [c.args[0] for c in p._upsert_memory.call_args_list if c.args[0].get("type") == "episodic"] + assert len(upsert_calls) == 2 + assert upsert_calls[0]["id"] != upsert_calls[1]["id"] + + def test_episodic_and_fact_with_same_content_do_not_collide(self): + """An episodic's deterministic ID is seeded on scope; a fact's is + seeded on content_hash. Even if their content text matches + verbatim, the IDs live in disjoint namespaces (ep_ vs fact_ prefix + plus different seeds) so both records persist.""" + p = self._build() + text = "Planning a Tokyo trip with a luxury hotel preference." + existing = [ + { + "id": "fact_existing", + "type": "fact", + "content": text, + "content_hash": compute_content_hash(text), + "thread_id": "t1", + "tags": ["sys:fact"], + } + ] + p._container.query_items.return_value = iter(self._turns()) + p._load_existing_memories = MagicMock(return_value=existing) + p._run_prompty = MagicMock( + return_value=json.dumps({"facts": [], "episodic": [self._episodic_payload(text=text)], "unclassified": []}) + ) + + out = p.extract_memories("u1", "t1") + + assert out["episodic_count"] == 1 + upsert_calls = [c.args[0] for c in p._upsert_memory.call_args_list if c.args[0].get("type") == "episodic"] + assert len(upsert_calls) == 1 + + def test_episodic_uses_sentinel_thread_id_for_partition_routing(self): + """Auto-extracted episodics MUST be persisted under the sentinel + ``thread_id="__episodic__"`` regardless of which thread emitted them. + + The memories container is partitioned hierarchically on + ``(user_id, thread_id)`` and Cosmos ``id`` uniqueness is per-partition + — so a deterministic ID seeded only on scope is only meaningful if + every episodic for that scope lands in the SAME partition. Writing + the live thread_id splits identical-scope episodics across two + partitions and breaks upsert dedup across threads. The originating + thread is preserved on ``metadata.originating_thread_id`` for audit. + """ + p = self._build() + p._container.query_items.return_value = iter(self._turns()) + p._load_existing_memories = MagicMock(return_value=[]) + p._run_prompty = MagicMock( + return_value=json.dumps( + { + "facts": [], + "episodic": [self._episodic_payload(text="Trip planning.")], + "unclassified": [], + } + ) + ) + + p.extract_memories("u1", "thread-alpha") + + upsert_calls = [c.args[0] for c in p._upsert_memory.call_args_list if c.args[0].get("type") == "episodic"] + assert len(upsert_calls) == 1 + doc = upsert_calls[0] + assert doc["thread_id"] == "__episodic__" + assert doc["metadata"]["originating_thread_id"] == "thread-alpha" + + def test_same_scope_episodics_collide_across_different_threads(self): + """The cross-thread regression test for the per-partition id bug. + + Same user, same scope ``(trip, Tokyo)``, but emitted from two + different threads (``thread-alpha`` and ``thread-beta``). Both + writes MUST produce the same det_id AND the same persisted + thread_id (the sentinel) so they land in one partition and the + second upsert replaces the first. Without the sentinel, the docs + live in two different partitions and you'd see duplicate + episodics for the same intent — exactly the bug the deterministic + ID was meant to prevent. + """ + p = self._build() + # Fresh iterator per call — both extract_memories calls need turns. + p._container.query_items.side_effect = lambda *a, **kw: iter(self._turns()) + p._load_existing_memories = MagicMock(return_value=[]) + p._run_prompty = MagicMock( + return_value=json.dumps( + { + "facts": [], + "episodic": [self._episodic_payload(text="Tokyo luxury hotel intent.")], + "unclassified": [], + } + ) + ) + + p.extract_memories("u1", "thread-alpha") + p.extract_memories("u1", "thread-beta") + + upsert_calls = [c.args[0] for c in p._upsert_memory.call_args_list if c.args[0].get("type") == "episodic"] + assert len(upsert_calls) == 2 + # Same det_id — scope is identity. + assert upsert_calls[0]["id"] == upsert_calls[1]["id"] + # Same partition (sentinel thread_id) — so the second upsert replaces + # the first instead of creating a duplicate in a sibling partition. + assert upsert_calls[0]["thread_id"] == upsert_calls[1]["thread_id"] == "__episodic__" + # Originating thread preserved on each metadata for audit. + assert upsert_calls[0]["metadata"]["originating_thread_id"] == "thread-alpha" + assert upsert_calls[1]["metadata"]["originating_thread_id"] == "thread-beta" + + class TestExtractEarlyReturnShape: """The no-memories early-return must include every key the success path returns; otherwise callers using ``result["exact_dedup_skipped"]`` @@ -668,6 +931,7 @@ def test_empty_thread_returns_full_dict_shape(self): "unclassified_count", "updated_count", "exact_dedup_skipped", + "dropped_episodic_count", ): assert key in out, f"missing key: {key}" assert out[key] == 0