diff --git a/agentex/src/config/mongodb_indexes.py b/agentex/src/config/mongodb_indexes.py index d87231c3..d4458907 100644 --- a/agentex/src/config/mongodb_indexes.py +++ b/agentex/src/config/mongodb_indexes.py @@ -84,12 +84,34 @@ def ensure_mongodb_indexes(mongodb_database: MongoDBDatabase) -> None: logger.info(f" ✓ Created index '{name or result}'") except OperationFailure as e: - # Index might already exist with different options + # Index already exists with different options — drop and recreate if "already exists with different options" in str(e): + index_name = index_spec.get("name", "unnamed") logger.warning( - f" ⚠ Index '{index_spec.get('name', 'unnamed')}' already exists " - f"with different options. You may need to drop and recreate it." + f" ⚠ Index '{index_name}' already exists " + f"with different options. Dropping and recreating..." ) + try: + collection.drop_index(index_name) + logger.info(f" ✓ Dropped existing index '{index_name}'") + except Exception as drop_err: + logger.error( + f" ✗ Failed to drop index '{index_name}': {drop_err}" + ) + continue + + try: + collection.create_index(keys, **index_kwargs) + logger.info(f" ✓ Recreated index '{index_name}'") + except Exception as create_err: + logger.error( + f" ✗ Failed to recreate index '{index_name}' after dropping it: {create_err}. " + f"The index is currently absent. Restart the application or recreate it manually." + ) + # Re-raise to abort startup. Note: this propagates past the + # sibling `except Exception` below — Python only routes exceptions + # from the `try` block to sibling handlers, not from other handlers. + raise else: logger.error(f" ✗ Failed to create index: {e}") except Exception as e: @@ -97,6 +119,25 @@ def ensure_mongodb_indexes(mongodb_database: MongoDBDatabase) -> None: f" ✗ Unexpected error creating index '{index_spec.get('name', 'unnamed')}': {e}" ) + # Clean up legacy indexes that were removed from INDEXES lists but may + # still exist on databases where they were previously created. + legacy_indexes: list[tuple[str, str]] = [ + ("messages", "task_id_idx"), + ("task_states", "task_id_idx"), + ] + for collection_name, index_name in legacy_indexes: + try: + mongodb_database[collection_name].drop_index(index_name) + logger.info( + f" ✓ Dropped legacy index '{index_name}' from '{collection_name}'" + ) + except OperationFailure: + pass # Index doesn't exist — already clean + except Exception as e: + logger.warning( + f" ⚠ Failed to drop legacy index '{index_name}' from '{collection_name}': {e}" + ) + logger.info("MongoDB index creation completed.") diff --git a/agentex/src/domain/repositories/task_message_repository.py b/agentex/src/domain/repositories/task_message_repository.py index 8d5e1e30..92ab9ac2 100644 --- a/agentex/src/domain/repositories/task_message_repository.py +++ b/agentex/src/domain/repositories/task_message_repository.py @@ -19,15 +19,13 @@ class TaskMessageRepository(MongoDBCRUDRepository[TaskMessageEntity]): # These will be created once at startup, not per request INDEXES = [ { - "keys": [("task_id", pymongo.ASCENDING), ("created_at", pymongo.ASCENDING)], + "keys": [ + ("task_id", pymongo.ASCENDING), + ("created_at", pymongo.DESCENDING), + ], "name": "task_id_created_at_idx", "description": "Compound index for querying messages by task_id and sorting by created_at", }, - { - "keys": [("task_id", pymongo.ASCENDING)], - "name": "task_id_idx", - "description": "Single index for task_id queries and delete operations", - }, { "keys": [ ("task_id", pymongo.ASCENDING), @@ -40,7 +38,7 @@ class TaskMessageRepository(MongoDBCRUDRepository[TaskMessageEntity]): "keys": [ ("task_id", pymongo.ASCENDING), ("content.type", pymongo.ASCENDING), - ("created_at", pymongo.ASCENDING), + ("created_at", pymongo.DESCENDING), ], "name": "task_id_content_type_created_at_idx", "description": "Compound index for filtering messages by task_id, content type, and sorting by created_at", diff --git a/agentex/src/domain/repositories/task_state_repository.py b/agentex/src/domain/repositories/task_state_repository.py index 16e43147..dddd10a1 100644 --- a/agentex/src/domain/repositories/task_state_repository.py +++ b/agentex/src/domain/repositories/task_state_repository.py @@ -1,3 +1,4 @@ +import asyncio from typing import Annotated import pymongo @@ -21,12 +22,8 @@ class TaskStateRepository(MongoDBCRUDRepository[StateEntity]): { "keys": [("task_id", pymongo.ASCENDING), ("agent_id", pymongo.ASCENDING)], "name": "task_agent_compound_idx", - "description": "Compound index for get_by_task_and_agent queries", - }, - { - "keys": [("task_id", pymongo.ASCENDING)], - "name": "task_id_idx", - "description": "Single index for task_id queries", + "unique": True, + "description": "Unique compound index for get_by_task_and_agent queries", }, { "keys": [("agent_id", pymongo.ASCENDING)], @@ -43,7 +40,13 @@ def __init__(self, db: DMongoDBDatabase): async def get_by_task_and_agent( self, task_id: str, agent_id: str ) -> StateEntity | None: - doc = self.collection.find_one({"task_id": task_id, "agent_id": agent_id}) + loop = asyncio.get_running_loop() + doc = await loop.run_in_executor( + None, + lambda: self.collection.find_one( + {"task_id": task_id, "agent_id": agent_id} + ), + ) return self._deserialize(doc) if doc else None