Conversation
…get session handling Change-Id: I4e3ce626e51d4b9b3a77fddcdbfa32b05a30959f Signed-off-by: 久氢 <mapenghui.mph@alibaba-inc.com>
Contributor
There was a problem hiding this comment.
Pull request overview
This PR updates MemoryConversation to be safer under concurrent async usage by preventing duplicate memory-store initialization, and to reduce streaming latency by persisting session/message data in background tasks rather than blocking the response stream.
Changes:
- Add a per-instance async lock and split memory-store initialization into a locked init path to avoid concurrent double-initialization.
- Switch session + conversation persistence to fire-and-forget background tasks (
asyncio.create_task) to avoid blocking stream completion. - Update unit tests to account for background persistence timing.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| agentrun/memory_collection/memory_conversation.py | Adds async init locking and moves persistence to background tasks to avoid blocking streaming. |
| tests/unittests/memory_collection/test_memory_conversation.py | Adds a helper to wait before assertions so background persistence tasks can complete. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+397
to
+402
| async def _put_session_bg(): | ||
| try: | ||
| await memory_store.put_session(session) | ||
| except Exception as e: | ||
| logger.error(f"Failed to save session: {e}", exc_info=True) | ||
|
|
Comment on lines
+516
to
534
| try: | ||
| await ms.put_message(msg) | ||
| sess.update_time = microseconds_timestamp() | ||
| await ms.update_session(sess) | ||
| logger.debug( | ||
| "Saved conversation: %d messages," | ||
| " text length: %d chars," | ||
| " tool_calls: %d, tool_results: %d", | ||
| n_msgs, | ||
| text_len, | ||
| n_tc, | ||
| n_tr, | ||
| ) | ||
| except Exception as e: | ||
| logger.error( | ||
| "Failed to save conversation: %s", | ||
| e, | ||
| exc_info=True, | ||
| ) |
| except Exception as e: | ||
| logger.error(f"Failed to save session: {e}", exc_info=True) | ||
|
|
||
| asyncio.create_task(_put_session_bg()) |
Comment on lines
+12
to
+15
| async def _flush_bg_tasks(): | ||
| """Let fire-and-forget background tasks complete before assertions.""" | ||
| await asyncio.sleep(0.05) | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
…get session handling
Change-Id: I4e3ce626e51d4b9b3a77fddcdbfa32b05a30959f
Fix bugs
Bug detail
Pull request tasks
Update docs
Reason for update
Pull request tasks
Add contributor
Contributed content
Content detail
Others
Reason for update