From eb7b579290ac7c0cc4959de92c6f17a9dabe595b Mon Sep 17 00:00:00 2001 From: Aditya Singh Date: Fri, 8 May 2026 03:42:00 -0700 Subject: [PATCH 1/2] fix(sessions): preserve Dapr session created_at across writes DaprSession.add_items() rewrote the session metadata on every call, clobbering created_at with the current timestamp. The field is now read back from the metadata key and only initialized on first write, so created_at reflects the true session start while updated_at advances. Also removes a duplicated etag assignment in pop_item(). --- src/agents/extensions/memory/dapr_session.py | 21 +++++++++++++--- tests/extensions/memory/test_dapr_session.py | 26 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/agents/extensions/memory/dapr_session.py b/src/agents/extensions/memory/dapr_session.py index 8d92872406..d95a1b8375 100644 --- a/src/agents/extensions/memory/dapr_session.py +++ b/src/agents/extensions/memory/dapr_session.py @@ -323,11 +323,26 @@ async def add_items(self, items: list[TResponseInputItem]) -> None: continue raise - # Update metadata + # Update metadata, preserving created_at across subsequent writes. + now = str(int(time.time())) + created_at = now + try: + existing_meta_response = await self._dapr_client.get_state( + store_name=self._state_store_name, + key=self._metadata_key, + state_metadata=self._get_read_metadata(), + ) + if existing_meta_response.data: + existing_meta = json.loads(existing_meta_response.data.decode("utf-8")) + if isinstance(existing_meta, dict) and existing_meta.get("created_at"): + created_at = str(existing_meta["created_at"]) + except (json.JSONDecodeError, UnicodeDecodeError, AttributeError): + # Corrupt or missing metadata — start fresh with current timestamp. + pass metadata = { "session_id": self.session_id, - "created_at": str(int(time.time())), - "updated_at": str(int(time.time())), + "created_at": created_at, + "updated_at": now, } await self._dapr_client.save_state( store_name=self._state_store_name, diff --git a/tests/extensions/memory/test_dapr_session.py b/tests/extensions/memory/test_dapr_session.py index 9766f35d40..f2dff71561 100644 --- a/tests/extensions/memory/test_dapr_session.py +++ b/tests/extensions/memory/test_dapr_session.py @@ -448,6 +448,32 @@ async def test_add_empty_items_list(fake_dapr_client: FakeDaprClient): await session.close() +async def test_metadata_preserves_created_at(fake_dapr_client: FakeDaprClient): + """add_items must preserve created_at across writes; only updated_at advances.""" + session = await _create_test_session(fake_dapr_client) + try: + await session.add_items([{"role": "user", "content": "first"}]) + first_meta_raw = fake_dapr_client._state[session._metadata_key].decode("utf-8") + first_meta = json.loads(first_meta_raw) + first_created = first_meta["created_at"] + first_updated = first_meta["updated_at"] + + # Wait one second so timestamps are guaranteed to differ. + import time as _time + + _time.sleep(1) + + await session.add_items([{"role": "user", "content": "second"}]) + second_meta = json.loads(fake_dapr_client._state[session._metadata_key].decode("utf-8")) + + assert second_meta["created_at"] == first_created, ( + "created_at must be preserved across add_items calls" + ) + assert int(second_meta["updated_at"]) >= int(first_updated) + finally: + await session.close() + + async def test_unicode_content(fake_dapr_client: FakeDaprClient): """Test that session correctly stores and retrieves unicode/non-ASCII content.""" session = await _create_test_session(fake_dapr_client) From 14b11aba2bc2545f4b14e54fc70c13235c76f4b5 Mon Sep 17 00:00:00 2001 From: Aditya Singh Date: Mon, 11 May 2026 16:42:14 -0700 Subject: [PATCH 2/2] fix(sessions): guard Dapr metadata writes with ETag + first-write The metadata read-modify-write was unconditional, so two writers (or one writer with a stale eventual read) could clobber an existing created_at. Apply the same first-write concurrency + retry-on-conflict pattern used for the messages key so the field is preserved across concurrent writes. --- src/agents/extensions/memory/dapr_session.py | 55 +++++++++++++------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/src/agents/extensions/memory/dapr_session.py b/src/agents/extensions/memory/dapr_session.py index d95a1b8375..a425bf866d 100644 --- a/src/agents/extensions/memory/dapr_session.py +++ b/src/agents/extensions/memory/dapr_session.py @@ -324,33 +324,48 @@ async def add_items(self, items: list[TResponseInputItem]) -> None: raise # Update metadata, preserving created_at across subsequent writes. + # Use first-write concurrency with the read ETag so a concurrent write + # that already established `created_at` can't be clobbered by a stale + # read that saw no metadata. now = str(int(time.time())) - created_at = now - try: + meta_attempt = 0 + while True: + meta_attempt += 1 existing_meta_response = await self._dapr_client.get_state( store_name=self._state_store_name, key=self._metadata_key, state_metadata=self._get_read_metadata(), ) + created_at = now if existing_meta_response.data: - existing_meta = json.loads(existing_meta_response.data.decode("utf-8")) - if isinstance(existing_meta, dict) and existing_meta.get("created_at"): - created_at = str(existing_meta["created_at"]) - except (json.JSONDecodeError, UnicodeDecodeError, AttributeError): - # Corrupt or missing metadata — start fresh with current timestamp. - pass - metadata = { - "session_id": self.session_id, - "created_at": created_at, - "updated_at": now, - } - await self._dapr_client.save_state( - store_name=self._state_store_name, - key=self._metadata_key, - value=json.dumps(metadata), - state_metadata=self._get_metadata(), - options=self._get_state_options(), - ) + try: + existing_meta = json.loads(existing_meta_response.data.decode("utf-8")) + if isinstance(existing_meta, dict) and existing_meta.get("created_at"): + created_at = str(existing_meta["created_at"]) + except (json.JSONDecodeError, UnicodeDecodeError, AttributeError): + # Corrupt metadata — start fresh with current timestamp. + pass + metadata = { + "session_id": self.session_id, + "created_at": created_at, + "updated_at": now, + } + meta_etag = getattr(existing_meta_response, "etag", None) or None + try: + await self._dapr_client.save_state( + store_name=self._state_store_name, + key=self._metadata_key, + value=json.dumps(metadata), + etag=meta_etag, + state_metadata=self._get_metadata(), + options=self._get_state_options(concurrency=Concurrency.first_write), + ) + break + except Exception as error: + should_retry = await self._handle_concurrency_conflict(error, meta_attempt) + if should_retry: + continue + raise async def pop_item(self) -> TResponseInputItem | None: """Remove and return the most recent item from the session.