diff --git a/src/mcp/shared/experimental/tasks/message_queue.py b/src/mcp/shared/experimental/tasks/message_queue.py index 018c2b7b2..e17c4a865 100644 --- a/src/mcp/shared/experimental/tasks/message_queue.py +++ b/src/mcp/shared/experimental/tasks/message_queue.py @@ -12,6 +12,7 @@ """ from abc import ABC, abstractmethod +from collections import deque from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Literal @@ -151,13 +152,13 @@ class InMemoryTaskMessageQueue(TaskMessageQueue): """ def __init__(self) -> None: - self._queues: dict[str, list[QueuedMessage]] = {} + self._queues: dict[str, deque[QueuedMessage]] = {} self._events: dict[str, anyio.Event] = {} - def _get_queue(self, task_id: str) -> list[QueuedMessage]: + def _get_queue(self, task_id: str) -> deque[QueuedMessage]: """Get or create the queue for a task.""" if task_id not in self._queues: - self._queues[task_id] = [] + self._queues[task_id] = deque() return self._queues[task_id] async def enqueue(self, task_id: str, message: QueuedMessage) -> None: @@ -172,7 +173,7 @@ async def dequeue(self, task_id: str) -> QueuedMessage | None: queue = self._get_queue(task_id) if not queue: return None - return queue.pop(0) + return queue.popleft() async def peek(self, task_id: str) -> QueuedMessage | None: """Return the next message without removing it.""" diff --git a/tests/experimental/tasks/test_message_queue.py b/tests/experimental/tasks/test_message_queue.py index a8517e535..eca113d5b 100644 --- a/tests/experimental/tasks/test_message_queue.py +++ b/tests/experimental/tasks/test_message_queue.py @@ -1,5 +1,6 @@ """Tests for TaskMessageQueue and InMemoryTaskMessageQueue.""" +from collections import deque from datetime import datetime, timezone import anyio @@ -270,7 +271,7 @@ async def is_empty_with_injection(tid: str) -> bool: if call_count == 2 and tid == task_id: # Before second check, inject a message - this simulates a message # arriving between event creation and the double-check - queue._queues[task_id] = [QueuedMessage(type="request", message=make_request())] + queue._queues[task_id] = deque([QueuedMessage(type="request", message=make_request())]) return await original_is_empty(tid) queue.is_empty = is_empty_with_injection # type: ignore[method-assign]