Skip to content
47 changes: 44 additions & 3 deletions agentex/src/config/mongodb_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,60 @@ def ensure_mongodb_indexes(mongodb_database: MongoDBDatabase) -> None:
logger.info(f" ✓ Created index '{name or result}'")

except OperationFailure as e:
# Index might already exist with different options
# Index already exists with different options — drop and recreate
if "already exists with different options" in str(e):
index_name = index_spec.get("name", "unnamed")
logger.warning(
f" ⚠ Index '{index_spec.get('name', 'unnamed')}' already exists "
f"with different options. You may need to drop and recreate it."
f" ⚠ Index '{index_name}' already exists "
f"with different options. Dropping and recreating..."
)
try:
collection.drop_index(index_name)
logger.info(f" ✓ Dropped existing index '{index_name}'")
except Exception as drop_err:
logger.error(
f" ✗ Failed to drop index '{index_name}': {drop_err}"
)
continue

try:
collection.create_index(keys, **index_kwargs)
logger.info(f" ✓ Recreated index '{index_name}'")
except Exception as create_err:
logger.error(
f" ✗ Failed to recreate index '{index_name}' after dropping it: {create_err}. "
f"The index is currently absent. Restart the application or recreate it manually."
)
# Re-raise to abort startup. Note: this propagates past the
# sibling `except Exception` below — Python only routes exceptions
# from the `try` block to sibling handlers, not from other handlers.
raise
else:
logger.error(f" ✗ Failed to create index: {e}")
except Exception as e:
logger.error(
f" ✗ Unexpected error creating index '{index_spec.get('name', 'unnamed')}': {e}"
)

# Clean up legacy indexes that were removed from INDEXES lists but may
# still exist on databases where they were previously created.
legacy_indexes: list[tuple[str, str]] = [
("messages", "task_id_idx"),
("task_states", "task_id_idx"),
]
for collection_name, index_name in legacy_indexes:
try:
mongodb_database[collection_name].drop_index(index_name)
logger.info(
f" ✓ Dropped legacy index '{index_name}' from '{collection_name}'"
)
except OperationFailure:
pass # Index doesn't exist — already clean
except Exception as e:
logger.warning(
f" ⚠ Failed to drop legacy index '{index_name}' from '{collection_name}': {e}"
)

logger.info("MongoDB index creation completed.")


Expand Down
12 changes: 5 additions & 7 deletions agentex/src/domain/repositories/task_message_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ class TaskMessageRepository(MongoDBCRUDRepository[TaskMessageEntity]):
# These will be created once at startup, not per request
INDEXES = [
{
"keys": [("task_id", pymongo.ASCENDING), ("created_at", pymongo.ASCENDING)],
"keys": [
("task_id", pymongo.ASCENDING),
("created_at", pymongo.DESCENDING),
],
"name": "task_id_created_at_idx",
"description": "Compound index for querying messages by task_id and sorting by created_at",
},
{
"keys": [("task_id", pymongo.ASCENDING)],
"name": "task_id_idx",
"description": "Single index for task_id queries and delete operations",
},
{
"keys": [
("task_id", pymongo.ASCENDING),
Expand All @@ -40,7 +38,7 @@ class TaskMessageRepository(MongoDBCRUDRepository[TaskMessageEntity]):
"keys": [
("task_id", pymongo.ASCENDING),
("content.type", pymongo.ASCENDING),
("created_at", pymongo.ASCENDING),
("created_at", pymongo.DESCENDING),
],
"name": "task_id_content_type_created_at_idx",
"description": "Compound index for filtering messages by task_id, content type, and sorting by created_at",
Expand Down
17 changes: 10 additions & 7 deletions agentex/src/domain/repositories/task_state_repository.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Annotated

import pymongo
Expand All @@ -21,12 +22,8 @@ class TaskStateRepository(MongoDBCRUDRepository[StateEntity]):
{
"keys": [("task_id", pymongo.ASCENDING), ("agent_id", pymongo.ASCENDING)],
"name": "task_agent_compound_idx",
"description": "Compound index for get_by_task_and_agent queries",
},
{
"keys": [("task_id", pymongo.ASCENDING)],
"name": "task_id_idx",
"description": "Single index for task_id queries",
"unique": True,
"description": "Unique compound index for get_by_task_and_agent queries",
},
Comment on lines 23 to 27
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Unique constraint won't apply to existing deployments

Adding unique: True here will not enforce uniqueness on any environment where the task_agent_compound_idx index already exists as a non-unique index. When ensure_mongodb_indexes calls collection.create_index(keys, name="task_agent_compound_idx", unique=True) and an index with that name already exists without unique, MongoDB raises OperationFailure: "already exists with different options". The catch block in mongodb_indexes.py (lines 87–93) silently logs a warning and moves on — the old index remains in place and the uniqueness constraint is never applied.

The same applies to the sort-direction changes in task_message_repository.py: the pre-existing task_id_created_at_idx (with ASCENDING) won't be replaced with the DESCENDING variant on any running database.

To actually apply these changes, you need a one-time migration step (e.g., in ensure_mongodb_indexes or a separate script) that drops the old indexes before recreating them:

# In ensure_mongodb_indexes or a migration script, before create_index:
try:
    collection.drop_index("task_agent_compound_idx")
except Exception:
    pass  # index might not exist yet
collection.create_index(keys, name="task_agent_compound_idx", unique=True)

Or expose the existing drop_all_indexes helper and run it once as part of the deploy process. Until this is addressed, the unique invariant described in the PR summary will only be enforced on fresh databases.

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/domain/repositories/task_state_repository.py
Line: 22-26

Comment:
**Unique constraint won't apply to existing deployments**

Adding `unique: True` here will **not** enforce uniqueness on any environment where the `task_agent_compound_idx` index already exists as a non-unique index. When `ensure_mongodb_indexes` calls `collection.create_index(keys, name="task_agent_compound_idx", unique=True)` and an index with that name already exists without `unique`, MongoDB raises `OperationFailure: "already exists with different options"`. The catch block in `mongodb_indexes.py` (lines 87–93) silently logs a warning and moves on — the old index remains in place and **the uniqueness constraint is never applied**.

The same applies to the sort-direction changes in `task_message_repository.py`: the pre-existing `task_id_created_at_idx` (with `ASCENDING`) won't be replaced with the `DESCENDING` variant on any running database.

To actually apply these changes, you need a one-time migration step (e.g., in `ensure_mongodb_indexes` or a separate script) that drops the old indexes before recreating them:

```python
# In ensure_mongodb_indexes or a migration script, before create_index:
try:
    collection.drop_index("task_agent_compound_idx")
except Exception:
    pass  # index might not exist yet
collection.create_index(keys, name="task_agent_compound_idx", unique=True)
```

Or expose the existing `drop_all_indexes` helper and run it once as part of the deploy process. Until this is addressed, the unique invariant described in the PR summary will only be enforced on fresh databases.

How can I resolve this? If you propose a fix, please make it concise.

{
"keys": [("agent_id", pymongo.ASCENDING)],
Expand All @@ -43,7 +40,13 @@ def __init__(self, db: DMongoDBDatabase):
async def get_by_task_and_agent(
    self, task_id: str, agent_id: str
) -> StateEntity | None:
    """Return the state entity for the (task_id, agent_id) pair, or None.

    PyMongo's ``find_one`` is a blocking call; dispatching it to the
    default thread-pool executor keeps the running event loop responsive.
    The pasted diff left the old synchronous ``find_one`` line in place
    above the executor call — that duplicate, loop-blocking query is
    removed here so the lookup runs exactly once, off the loop thread.

    Args:
        task_id: Task identifier to match.
        agent_id: Agent identifier to match.

    Returns:
        The deserialized StateEntity, or None when no document matches.
    """
    loop = asyncio.get_running_loop()
    doc = await loop.run_in_executor(
        None,
        lambda: self.collection.find_one(
            {"task_id": task_id, "agent_id": agent_id}
        ),
    )
    return self._deserialize(doc) if doc else None


Expand Down
Loading