diff --git a/python/packages/core/AGENTS.md b/python/packages/core/AGENTS.md index 48ef906ab3a..b44233a04ab 100644 --- a/python/packages/core/AGENTS.md +++ b/python/packages/core/AGENTS.md @@ -130,7 +130,7 @@ agent_framework/ - **`AgentLoopMiddleware`** - `AgentMiddleware` that re-runs an agent in a loop by calling `call_next()` repeatedly (the pipeline re-reads `context.messages` each time). One configurable class covers two patterns: a required user `should_continue` predicate (sync or async, the first positional/keyword arg), and a chat-client judge built via the `.with_judge(...)` factory (a second chat client decides whether the original request was answered; loops while it is *not*, using a `JudgeVerdict` structured-output response — internally just an async `should_continue` predicate). The constructor covers the predicate pattern directly; only the judge has a convenience classmethod factory (`.with_judge(judge_client, ...)`) that forwards to `__init__`. Supports both streaming and non-streaming runs. By default a non-streaming run returns an aggregated `AgentResponse` containing every iteration's messages plus the injected `next_message` "nudge" messages (as `user` messages); set `return_final_only=True` to return only the last iteration's response. Streaming runs always yield each iteration's updates and emit the injected nudge messages as `user` updates between iterations (the `return_final_only` flag has no effect on streaming, and the final response reflects the last iteration; `MiddlewareTermination` is handled cleanly). 
`should_continue` is required; other constructor args are optional: `max_iterations` (safety cap; defaults to `DEFAULT_MAX_ITERATIONS`=10, explicit `None`→unbounded, positive int caps; `.with_judge` uses `DEFAULT_JUDGE_MAX_ITERATIONS`=5 as its default), `next_message` (defaults to a short "continue" nudge), `return_final_only`, and `additional_instructions` (an extra `system` message injected ahead of the input before the agent runs — becomes part of the original messages so it survives `fresh_context` resets and persists via a session). The judge is configured only through `.with_judge` (`judge_client`/`instructions`/`criteria`), not the constructor, and its `reasoning` is fed back to the agent as the next iteration's input; the judge forwards the original request messages and the agent's latest response messages verbatim so multi-modal content is preserved. `criteria` (a `list[str]`) is both injected as the agent's `additional_instructions` and rendered into the judge instructions wherever the `{{criteria}}` placeholder (`CRITERIA_PLACEHOLDER`) appears (`DEFAULT_JUDGE_INSTRUCTIONS` ends with it; custom `instructions` may include it, and it is stripped when no criteria are given). The `should_continue`/`next_message` callables are invoked with keyword args (`iteration`, `last_result`, `messages`, `original_messages`, `session`, `agent`, `progress`, `feedback`) and may be sync or async; declare only what you need plus `**kwargs`. `should_continue` may return a plain `bool` or a `(bool, str | None)` tuple whose second item is feedback surfaced to `next_message`/`record_feedback` via the `feedback` kwarg (the judge uses this to relay its `reasoning`). Stop precedence per iteration is `max_iterations` → `should_continue`, evaluated before `record_feedback` so the feedback is available to it. 
- **Feedback tracking** - `record_feedback` captures a per-iteration progress entry (called with the loop kwargs; if it returns a truthy string the entry is appended, otherwise the agent's response text is used as the fallback entry). The accumulated log is exposed to every callback via the `progress` keyword (a per-iteration copy of prior entries) and, when `inject_progress=True` (default), injected into the next iteration's input as a `user` message (the full log without a session, only the latest entry with a session to avoid duplicating history). `fresh_context=True` restarts each iteration from the original task plus the progress log; when a session is attached it is snapshotted (`to_dict()`) before the loop and restored (`from_dict` + field copy) between iterations so the local transcript and any service-side conversation id reset too (in-loop working-state is discarded, pre-loop state preserved, continuity carried only by the progress log). - **`todos_remaining(*, looping_modes=None)`** / **`todos_remaining_message`** - Helper factories for todo-driven loops (the Python counterpart of .NET's `TodoCompletionLoopEvaluator`), designed for `create_harness_agent` but usable with any agent that registers a `TodoProvider` via `context_providers`. They resolve the `TodoProvider`/`AgentModeProvider` from the *running agent* (`agent.context_providers`, via `_resolve_context_provider`) rather than taking the provider as an argument, so they can be wired directly into `loop_should_continue`/`loop_next_message`. `todos_remaining` returns a `should_continue` predicate that loops while any todo is open; pass `looping_modes=[...]` to gate looping to specific operating modes (case-insensitive; honors the `AgentModeProvider`'s `source_id`/`available_modes`), `looping_modes=None` (default) applies in every mode, and an empty sequence raises `ValueError`. 
`todos_remaining_message` is a `next_message` callable that lists the still-open todo titles and tells the agent to finish them, returning `None` when the session/agent/provider is unavailable or nothing is open (in which case the middleware's default `None` handling applies: reuse the previous iteration's messages verbatim under the default `fresh_context=False`, or `DEFAULT_NEXT_MESSAGE` only when `fresh_context=True`). -- **`background_tasks_running(provider)`** - Helper factory returning a `should_continue` predicate that loops while a `BackgroundAgentsProvider`'s persisted state shows running tasks (takes the provider explicitly, unlike `todos_remaining`). +- **`background_tasks_running()`** / **`background_tasks_running_message`** - Helper factories for background-agent-driven loops, mirroring the `todos_remaining` pair. They resolve the `BackgroundAgentsProvider` from the *running agent* (`agent.context_providers`, via `_resolve_context_provider`) rather than taking the provider as an argument, so they can be wired directly into `create_harness_agent`'s `loop_should_continue`/`loop_next_message`. `background_tasks_running` returns a `should_continue` predicate that loops while the provider's persisted state shows any task with `status == RUNNING` (pair it with `max_iterations` so the loop is bounded even if a task's persisted status is never refreshed). `background_tasks_running_message` is a `next_message` callable that lists the still-running tasks (`#<id> (<agent_name>): <description>`) and tells the agent to wait for them to finish and retrieve their results, returning `None` when the session/agent/provider is unavailable or no task is running. - **Approval escape hatch** - `_has_pending_approval_request(result)` checks whether an iteration's response carries a pending tool-approval request (any content with `type == "function_approval_request"`). 
Both the streaming and non-streaming loops stop and return that response to the caller *before* evaluating `should_continue`/`max_iterations` or injecting `next_message`, so the loop is HITL-safe even when wrapped outermost around a `ToolApprovalMiddleware` (mirrors the C# `LoopAgent`'s `HasPendingApprovalRequests`). - **Harness integration** - `create_harness_agent` enables the loop when a `loop_should_continue` callable is passed; it prepends `AgentLoopMiddleware(loop_should_continue, max_iterations=loop_max_iterations, next_message=loop_next_message)` ahead of `ToolApprovalMiddleware` so the loop is the outermost middleware (each iteration is a full agent run including tool approval, and the escape hatch hands pending approvals back to the caller). `loop_next_message` and `loop_max_iterations` only take effect together with `loop_should_continue` (with no `loop_should_continue` there is no loop, so they are ignored); `loop_max_iterations` defaults to the loop's default cap (`None` → unbounded). diff --git a/python/packages/core/agent_framework/__init__.py b/python/packages/core/agent_framework/__init__.py index 1d608365986..eccde63addd 100644 --- a/python/packages/core/agent_framework/__init__.py +++ b/python/packages/core/agent_framework/__init__.py @@ -111,6 +111,7 @@ AgentLoopMiddleware, JudgeVerdict, background_tasks_running, + background_tasks_running_message, todos_remaining, todos_remaining_message, ) @@ -576,6 +577,7 @@ "annotate_message_groups", "apply_compaction", "background_tasks_running", + "background_tasks_running_message", "chat_middleware", "create_always_approve_tool_response", "create_always_approve_tool_with_arguments_response", diff --git a/python/packages/core/agent_framework/_harness/_loop.py b/python/packages/core/agent_framework/_harness/_loop.py index e61cdb47402..8f9a5e7be9e 100644 --- a/python/packages/core/agent_framework/_harness/_loop.py +++ b/python/packages/core/agent_framework/_harness/_loop.py @@ -9,7 +9,8 @@ 1. 
A user-supplied ``should_continue`` predicate - for example, keep looping while a response does not yet contain a completion marker, while a :class:`~agent_framework.TodoProvider` still has open items, or while a :class:`~agent_framework.BackgroundAgentsProvider` still has running - tasks (see the :func:`todos_remaining` and :func:`background_tasks_running` helpers). The loop + tasks (see the :func:`todos_remaining` and :func:`background_tasks_running` helpers, which resolve + their provider from the running agent). The loop can track a **feedback log** across iterations (``record_feedback``): each pass contributes an entry that is exposed to every callback via the ``progress`` keyword and (by default) injected into the next iteration's input. Set ``fresh_context=True`` to restart each pass from the @@ -53,6 +54,7 @@ "AgentLoopMiddleware", "JudgeVerdict", "background_tasks_running", + "background_tasks_running_message", "todos_remaining", "todos_remaining_message", ] @@ -777,35 +779,73 @@ async def _resolve_next_message( return list(next_msgs) -def background_tasks_running(provider: Any) -> ShouldContinueCallable: - """Build a ``should_continue`` predicate that loops while a ``BackgroundAgentsProvider`` is busy. +def _running_background_tasks(session: Any, agent: Any) -> list[Any]: + """Return the still-running ``BackgroundTaskInfo`` entries for the agent's provider. + + Resolves the :class:`~agent_framework.BackgroundAgentsProvider` from the running agent + (``agent.context_providers``) and reads its persisted task state. Returns an empty list when the + session/agent/provider is unavailable or no task is currently running. 
+ """ + from ._background_agents import BackgroundAgentsProvider, BackgroundTaskInfo, BackgroundTaskStatus + + if session is None or agent is None: + return [] + provider = _resolve_context_provider(agent, BackgroundAgentsProvider) + if provider is None: + return [] + state = session.state.get(provider.source_id) + if not state: + return [] + tasks = [BackgroundTaskInfo.from_dict(task) for task in state.get("tasks", [])] + return [task for task in tasks if task.status == BackgroundTaskStatus.RUNNING] + + +def background_tasks_running() -> ShouldContinueCallable: + """Build a ``should_continue`` predicate that loops while the agent's background tasks are busy. + + This resolves the :class:`~agent_framework.BackgroundAgentsProvider` from the running agent + (``agent.context_providers``). The predicate inspects the provider's persisted task state and continues while any task is still marked as running. Pair it with ``max_iterations`` so the loop is guaranteed to stop even if a task's persisted status is never refreshed. - Args: - provider: A :class:`~agent_framework.BackgroundAgentsProvider` attached to the same session - as the loop. - Returns: - A predicate suitable for :class:`AgentLoopMiddleware`'s ``should_continue`` argument. + A predicate suitable for :class:`AgentLoopMiddleware`'s ``should_continue`` argument (and for + ``create_harness_agent``'s ``loop_should_continue``). 
""" - from ._background_agents import BackgroundTaskInfo, BackgroundTaskStatus - def _should_continue(*, session: Any = None, **kwargs: Any) -> bool: - if session is None: - return False - state = session.state.get(provider.source_id) - if not state: - return False - return any( - BackgroundTaskInfo.from_dict(task).status == BackgroundTaskStatus.RUNNING for task in state.get("tasks", []) - ) + def _should_continue(*, session: Any = None, agent: Any = None, **kwargs: Any) -> bool: + return bool(_running_background_tasks(session, agent)) return _should_continue +def background_tasks_running_message(*, session: Any = None, agent: Any = None, **kwargs: Any) -> str | None: + """``next_message`` callable that reminds the agent which background tasks are still running. + + Designed to pair with :func:`background_tasks_running` as a loop's ``next_message`` (e.g. + ``create_harness_agent``'s ``loop_next_message``): between iterations it resolves the + :class:`~agent_framework.BackgroundAgentsProvider` from the agent, lists the still-running tasks, + and instructs the agent to wait for them to finish (and retrieve their results) before finishing. + + Returns ``None`` when the session/agent/provider is unavailable or no task is running. In that + case the loop's default ``next_message`` handling applies. In normal looping a ``None`` here is + rare, since "no running tasks" also makes :func:`background_tasks_running` stop the loop before + the next message is consulted. + """ + running = _running_background_tasks(session, agent) + if not running: + return None + task_lines = "\n".join(f"- #{task.id} ({task.agent_name}): {task.description}" for task in running) + return ( + f"You still have {len(running)} background task(s) running that must finish before you can " + f"complete the work:\n{task_lines}\n\n" + "Wait for these tasks to complete, retrieve their results, and incorporate them. Only stop " + "once every background task has finished." 
+ ) + + def _resolve_context_provider(agent: Any, provider_type: type) -> Any: """Return the first ``provider_type`` instance on ``agent.context_providers`` (or ``None``). diff --git a/python/packages/core/tests/core/test_harness_loop.py b/python/packages/core/tests/core/test_harness_loop.py index ba2c8769288..b3e98f1724b 100644 --- a/python/packages/core/tests/core/test_harness_loop.py +++ b/python/packages/core/tests/core/test_harness_loop.py @@ -29,6 +29,7 @@ TodoItem, TodoProvider, background_tasks_running, + background_tasks_running_message, set_agent_mode, todos_remaining, todos_remaining_message, @@ -974,10 +975,11 @@ def run(self, *args: Any, **kwargs: Any) -> Any: ... provider = BackgroundAgentsProvider([_DummyAgent()]) # type: ignore[list-item] # ty: ignore[invalid-argument-type] session = AgentSession() - predicate = background_tasks_running(provider) + agent = _FakeHarnessAgent(provider) + predicate = background_tasks_running() # No tasks -> not running. - assert predicate(session=session) is False + assert predicate(session=session, agent=agent) is False running = BackgroundTaskInfo( id=1, @@ -986,7 +988,7 @@ def run(self, *args: Any, **kwargs: Any) -> Any: ... status=BackgroundTaskStatus.RUNNING, ) session.state[provider_source] = {"next_task_id": 2, "tasks": [running.to_dict()]} - assert predicate(session=session) is True + assert predicate(session=session, agent=agent) is True completed = BackgroundTaskInfo( id=1, @@ -995,10 +997,10 @@ def run(self, *args: Any, **kwargs: Any) -> Any: ... 
status=BackgroundTaskStatus.COMPLETED, ) session.state[provider_source] = {"next_task_id": 2, "tasks": [completed.to_dict()]} - assert predicate(session=session) is False + assert predicate(session=session, agent=agent) is False -def test_background_tasks_running_helper_without_session() -> None: +def test_background_tasks_running_helper_requires_session_agent_and_provider() -> None: from agent_framework import BackgroundAgentsProvider class _DummyAgent: @@ -1008,8 +1010,78 @@ class _DummyAgent: def run(self, *args: Any, **kwargs: Any) -> Any: ... provider = BackgroundAgentsProvider([_DummyAgent()]) # type: ignore[list-item] # ty: ignore[invalid-argument-type] - predicate = background_tasks_running(provider) - assert predicate(session=None) is False + session = AgentSession() + session.state["background_agents"] = { + "next_task_id": 2, + "tasks": [ + BackgroundTaskInfo( + id=1, agent_name="worker", description="job", status=BackgroundTaskStatus.RUNNING + ).to_dict() + ], + } + predicate = background_tasks_running() + + # Missing session or agent -> False. + assert predicate(session=None, agent=_FakeHarnessAgent(provider)) is False + assert predicate(session=session, agent=None) is False + # Agent without a BackgroundAgentsProvider -> False. + assert predicate(session=session, agent=_FakeHarnessAgent()) is False + + +def test_background_tasks_running_message_lists_running_tasks() -> None: + from agent_framework import BackgroundAgentsProvider + + class _DummyAgent: + name = "worker" + description = "does work" + + def run(self, *args: Any, **kwargs: Any) -> Any: ... 
+ + provider = BackgroundAgentsProvider([_DummyAgent()]) # type: ignore[list-item] # ty: ignore[invalid-argument-type] + session = AgentSession() + agent = _FakeHarnessAgent(provider) + session.state["background_agents"] = { + "next_task_id": 4, + "tasks": [ + BackgroundTaskInfo( + id=1, agent_name="worker", description="first job", status=BackgroundTaskStatus.RUNNING + ).to_dict(), + BackgroundTaskInfo( + id=2, agent_name="worker", description="done job", status=BackgroundTaskStatus.COMPLETED + ).to_dict(), + BackgroundTaskInfo( + id=3, agent_name="worker", description="third job", status=BackgroundTaskStatus.RUNNING + ).to_dict(), + ], + } + + message = background_tasks_running_message(session=session, agent=agent) + assert message is not None + assert "2 background task(s) running" in message + assert "#1 (worker): first job" in message + assert "#3 (worker): third job" in message + assert "done job" not in message + + +def test_background_tasks_running_message_returns_none_when_idle() -> None: + from agent_framework import BackgroundAgentsProvider + + class _DummyAgent: + name = "worker" + description = "does work" + + def run(self, *args: Any, **kwargs: Any) -> Any: ... + + provider = BackgroundAgentsProvider([_DummyAgent()]) # type: ignore[list-item] # ty: ignore[invalid-argument-type] + session = AgentSession() + agent = _FakeHarnessAgent(provider) + + # No running tasks at all. + assert background_tasks_running_message(session=session, agent=agent) is None + # Missing session/agent/provider -> None. + assert background_tasks_running_message(session=None, agent=agent) is None + assert background_tasks_running_message(session=session, agent=None) is None + assert background_tasks_running_message(session=session, agent=_FakeHarnessAgent()) is None # region todos_remaining / todos_remaining_message helpers
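
The resolution path shared by `background_tasks_running()` and `background_tasks_running_message` can be sketched without the framework. This is a minimal illustration, not the library code: `Status`, `Task`, `FakeProvider`, `FakeSession`, and `FakeAgent` are hypothetical stand-ins for `BackgroundTaskStatus`, `BackgroundTaskInfo`, `BackgroundAgentsProvider`, `AgentSession`, and the running agent, and the sketch stores task objects directly instead of round-tripping them through `to_dict()`/`from_dict()`. The message text is shortened; only the per-task line format follows the diff.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class Status(Enum):  # hypothetical stand-in for BackgroundTaskStatus
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class Task:  # hypothetical stand-in for BackgroundTaskInfo
    id: int
    agent_name: str
    description: str
    status: Status


@dataclass
class FakeProvider:  # hypothetical stand-in for BackgroundAgentsProvider
    source_id: str = "background_agents"


@dataclass
class FakeSession:  # hypothetical stand-in for AgentSession
    state: dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeAgent:  # hypothetical stand-in for the running agent
    context_providers: list[Any] = field(default_factory=list)


def running_tasks(session: Any, agent: Any) -> list[Task]:
    """Resolve the provider from the agent, then filter its persisted tasks."""
    if session is None or agent is None:
        return []
    provider = next(
        (p for p in agent.context_providers if isinstance(p, FakeProvider)), None
    )
    if provider is None:
        return []
    state = session.state.get(provider.source_id)
    if not state:
        return []
    return [t for t in state.get("tasks", []) if t.status is Status.RUNNING]


def should_continue(*, session: Any = None, agent: Any = None, **kwargs: Any) -> bool:
    # Loop only while at least one task is still marked as running.
    return bool(running_tasks(session, agent))


def next_message(*, session: Any = None, agent: Any = None, **kwargs: Any) -> str | None:
    running = running_tasks(session, agent)
    if not running:
        return None  # the loop's default next-message handling would apply
    lines = "\n".join(f"- #{t.id} ({t.agent_name}): {t.description}" for t in running)
    return f"You still have {len(running)} background task(s) running:\n{lines}"
```

Both callables accept the loop's keyword arguments and ignore the ones they do not need, which is why neither has to receive the provider up front. Because the predicate trusts persisted status, a cap such as `max_iterations` still matters when a task's status is never refreshed.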