From 910172c4563601a0487b0249c44d40a2fc6400ac Mon Sep 17 00:00:00 2001 From: alliscode Date: Mon, 27 Apr 2026 12:53:17 -0700 Subject: [PATCH 01/15] Fix declarative Workflow.as_agent() by accepting list[Message] in start executor The declarative start executor (JoinExecutor) only advertised dict and str in its input_types, so WorkflowAgent.__init__ rejected it with 'Workflow's start executor cannot handle list[Message]'. Add list[Message] to the JoinExecutor handler annotation and add a matching branch in DeclarativeActionExecutor._ensure_state_initialized that extracts the last user-message text and falls through to the string-input initialization path, so =System.LastMessageText works end-to-end via as_agent(). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../_workflows/_declarative_base.py | 21 ++++++++++++++ .../_workflows/_executors_control_flow.py | 9 +++++- .../tests/test_workflow_factory.py | 28 +++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index e7af9fde9a..3e6b2a3129 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -36,6 +36,7 @@ from agent_framework import ( Executor, + Message, WorkflowContext, ) from agent_framework._workflows._state import State @@ -873,6 +874,9 @@ async def _ensure_state_initialized( Follows .NET's DefaultTransform pattern - accepts any input type: - dict/Mapping: Used directly as workflow.inputs - str: Converted to {"input": value} + - list[Message]: Joined to a string from the last user message text + (or last message text if no user message). Falls through to the + string-input path so System.LastMessage.Text is populated. 
- DeclarativeMessage: Internal message, no initialization needed - Any other type: Converted via str() to {"input": str(value)} @@ -888,6 +892,23 @@ async def _ensure_state_initialized( if isinstance(trigger, dict): # Structured inputs - use directly state.initialize(trigger) # type: ignore + elif isinstance(trigger, list) and all(isinstance(m, Message) for m in trigger): + # list[Message] (e.g. from WorkflowAgent / as_agent()) - extract the + # last user message text and treat it as the string input. Fall + # through to the same state initialization as the str case so + # =System.LastMessage.Text / =System.LastMessageText keep working. + messages_list = cast(list[Message], trigger) + user_text = "" + for msg in reversed(messages_list): + if str(msg.role).lower() == "user" and msg.text: + user_text = msg.text + break + if not user_text: + # Fallback: concatenate any text from the last message. + user_text = messages_list[-1].text if messages_list else "" + state.initialize({"input": user_text}) + state.set("System.LastMessage", {"Text": user_text, "Id": ""}) + state.set("System.LastMessageText", user_text) elif isinstance(trigger, str): # String input - wrap in dict and populate System.LastMessage.Text # so YAML expressions like =System.LastMessage.Text see the user input diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py index 0aa660b3ea..f5baf80a9d 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py @@ -17,6 +17,7 @@ from typing import Any, cast from agent_framework import ( + Message, WorkflowContext, handler, ) @@ -492,7 +493,13 @@ class JoinExecutor(DeclarativeActionExecutor): @handler async def handle_action( self, - trigger: dict[str, Any] | str | 
ActionTrigger | ActionComplete | ConditionResult | LoopIterationResult, + trigger: dict[str, Any] + | str + | list[Message] + | ActionTrigger + | ActionComplete + | ConditionResult + | LoopIterationResult, ctx: WorkflowContext[ActionComplete], ) -> None: """Simply pass through to continue the workflow.""" diff --git a/python/packages/declarative/tests/test_workflow_factory.py b/python/packages/declarative/tests/test_workflow_factory.py index f08f5993e5..e9988ea97c 100644 --- a/python/packages/declarative/tests/test_workflow_factory.py +++ b/python/packages/declarative/tests/test_workflow_factory.py @@ -228,6 +228,34 @@ async def test_entry_join_executor_initializes_workflow_inputs_string(self): outputs = result.get_outputs() assert any("hello-world" in str(o) for o in outputs), f"Expected 'hello-world' in outputs but got: {outputs}" + @pytest.mark.asyncio + async def test_as_agent_round_trip_with_last_message_text(self): + """Regression test: a declarative workflow built via WorkflowFactory must be + consumable as an AIAgent via Workflow.as_agent(). + + Specifically, the declarative start executor must accept list[Message] + (the input passed by WorkflowAgent) and populate System.LastMessageText + so =System.LastMessageText is resolvable in the YAML. 
+ """ + factory = WorkflowFactory() + workflow = factory.create_workflow_from_yaml(""" +name: as-agent-roundtrip-test +actions: + - kind: SetVariable + variable: Local.echo + value: =System.LastMessageText + - kind: SendActivity + activity: + text: =Local.echo +""") + + agent = workflow.as_agent(name="echo-agent") + response = await agent.run("Hello there") + + assert "Hello there" in response.text, ( + f"Expected 'Hello there' in agent response text but got: {response.text!r}" + ) + class TestWorkflowFactoryAgentRegistration: """Tests for agent registration.""" From 56ab7df874f56bb8c64a929cd698ff742460a966 Mon Sep 17 00:00:00 2001 From: alliscode Date: Mon, 27 Apr 2026 14:10:03 -0700 Subject: [PATCH 02/15] Populate Conversation.messages from list[Message] trigger When Workflow.as_agent() is invoked with a list[Message], the start executor now populates Conversation.messages / Conversation.history / System.conversations.{id}.messages with prior turns only (excluding the latest user message), and surfaces the latest user message via Inputs.input and System.LastMessage*. This matches InvokeAzureAgent's contract that the messages binding holds prior turns and the executor itself appends the new user input before invoking, avoiding double-append of the trailing user turn while preserving full history (incl. assistant/system/tool roles and multi-modal content) for downstream actions. 
--- .../_workflows/_declarative_base.py | 94 +++++++++++++++---- 1 file changed, 77 insertions(+), 17 deletions(-) diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index 3e6b2a3129..c7793dc6fc 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -874,9 +874,15 @@ async def _ensure_state_initialized( Follows .NET's DefaultTransform pattern - accepts any input type: - dict/Mapping: Used directly as workflow.inputs - str: Converted to {"input": value} - - list[Message]: Joined to a string from the last user message text - (or last message text if no user message). Falls through to the - string-input path so System.LastMessage.Text is populated. + - list[Message]: Treated as the agent-facing message contract + (e.g. from WorkflowAgent / as_agent()). The full message list is + stored in ``Conversation.messages``/``Conversation.history`` and + mirrored to ``System.conversations.{id}.messages`` so workflows + that reference ``=Conversation.messages`` (e.g. InvokeAzureAgent) + see the complete history including assistant turns and non-text + content. The last user message's text is also used as the string + input (``Inputs.input``) and surfaced via ``System.LastMessage*`` + for backward compatibility with simple text-only workflows. - DeclarativeMessage: Internal message, no initialization needed - Any other type: Converted via str() to {"input": str(value)} @@ -893,22 +899,76 @@ async def _ensure_state_initialized( # Structured inputs - use directly state.initialize(trigger) # type: ignore elif isinstance(trigger, list) and all(isinstance(m, Message) for m in trigger): - # list[Message] (e.g. 
from WorkflowAgent / as_agent()) - extract the - # last user message text and treat it as the string input. Fall - # through to the same state initialization as the str case so - # =System.LastMessage.Text / =System.LastMessageText keep working. + # list[Message] (e.g. from WorkflowAgent / as_agent()). + # Populate the full conversation rather than collapsing to a + # single string, so workflows that operate on the message list + # (InvokeAzureAgent with =Conversation.messages, history-aware + # agents, multi-modal content, etc.) see the complete input. messages_list = cast(list[Message], trigger) - user_text = "" - for msg in reversed(messages_list): - if str(msg.role).lower() == "user" and msg.text: - user_text = msg.text + + # Locate the trailing user message: WorkflowAgent merges session + # history with the caller's new input and forwards the combined + # list, so the most recent user message represents "this turn" + # (everything before it is prior history). InvokeAzureAgent's + # contract is that Conversation.messages holds PRIOR turns only - + # the executor appends the new user input itself before invoking + # the agent. To avoid duplicating the latest user turn we split + # the trigger at that boundary. + last_user_index = -1 + for idx in range(len(messages_list) - 1, -1, -1): + if str(messages_list[idx].role).lower() == "user": + last_user_index = idx break - if not user_text: - # Fallback: concatenate any text from the last message. - user_text = messages_list[-1].text if messages_list else "" - state.initialize({"input": user_text}) - state.set("System.LastMessage", {"Text": user_text, "Id": ""}) - state.set("System.LastMessageText", user_text) + + if last_user_index >= 0: + last_user_msg = messages_list[last_user_index] + last_user_text = last_user_msg.text or "" + last_user_id = getattr(last_user_msg, "message_id", "") or "" + # Prior history excludes the latest user turn; trailing + # non-user messages (e.g. 
tool results) are preserved so + # later actions still see them in Conversation.messages. + history_messages = ( + messages_list[:last_user_index] + messages_list[last_user_index + 1:] + ) + else: + # No user message in the list - rare path (e.g. resume after + # an assistant-only sequence). Treat the whole list as prior + # history and surface the last message's text for backwards + # compatibility with =System.LastMessageText. + history_messages = list(messages_list) + tail = messages_list[-1] if messages_list else None + last_user_text = (tail.text or "") if tail is not None else "" + last_user_id = ( + getattr(tail, "message_id", "") or "" if tail is not None else "" + ) + + # Initialize state. Using the last user text as Inputs.input + # keeps simple yamls (=inputs.input / =System.LastMessageText) + # working, and matches what InvokeAzureAgent expects to find via + # its input_text fallback chain. + state.initialize({"input": last_user_text}) + + # Populate Conversation.messages/.history with PRIOR turns only + # (matching the executor contract above). Raw Message objects + # are stored - matching what agent executors append at runtime. + for msg in history_messages: + state.append("Conversation.messages", msg) + state.append("Conversation.history", msg) + + # Mirror to System.conversations.{ConversationId}.messages so + # actions resolving conversation-scoped paths see the same + # history. + conversation_id = state.get("System.ConversationId") + if conversation_id: + conv_path = f"System.conversations.{conversation_id}.messages" + for msg in history_messages: + state.append(conv_path, msg) + + # System.LastMessage* mirrors the most recent USER message + # (matching .NET DefaultTransform semantics for agent input). 
+ state.set("System.LastMessage", {"Text": last_user_text, "Id": last_user_id}) + state.set("System.LastMessageText", last_user_text) + state.set("System.LastMessageId", last_user_id) elif isinstance(trigger, str): # String input - wrap in dict and populate System.LastMessage.Text # so YAML expressions like =System.LastMessage.Text see the user input From 35fa93942c95ec2283236b6077d0f52c40725506 Mon Sep 17 00:00:00 2001 From: alliscode Date: Mon, 27 Apr 2026 16:32:27 -0700 Subject: [PATCH 03/15] Coerce Enum values when serializing PowerFx symbols MessageRole and other str-subclass Enums passed isinstance(v, str) and were forwarded to pythonnet unchanged. pythonnet then raised 'MessageRole value cannot be converted to System.String' for every PowerFx primitive when ConditionGroup/Expr eval walked the symbol table containing Conversation.messages. Reduce Enum members to their underlying value before the primitive check so eval sees plain strings/ints. --- .../_workflows/_declarative_base.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index c7793dc6fc..14ea3f7fc1 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -29,6 +29,7 @@ import logging import sys import uuid +from enum import Enum from collections.abc import Mapping from dataclasses import dataclass from decimal import Decimal as _Decimal @@ -121,7 +122,20 @@ def _make_powerfx_safe(value: Any) -> Any: Returns: A PowerFx-safe representation of the value """ - if value is None or isinstance(value, _POWERFX_SAFE_TYPES): + if value is None: + return value + + # Enum coercion must run BEFORE the primitive type check: many MAF + # enums (e.g. 
MessageRole) are ``str``-subclass enums, so they pass + # ``isinstance(v, str)`` but pythonnet refuses to convert them to + # ``System.String`` and raises ``'MessageRole' value cannot be + # converted to System.String'`` for every PowerFx primitive type. Reduce + # to the underlying value (or its string form) so PowerFx sees a + # plain ``str``/``int``. + if isinstance(value, Enum): + return _make_powerfx_safe(value.value) + + if isinstance(value, _POWERFX_SAFE_TYPES): return value if isinstance(value, dict): From dde1edffd06e0078c243416a87b1138566c24037 Mon Sep 17 00:00:00 2001 From: alliscode Date: Mon, 27 Apr 2026 16:47:10 -0700 Subject: [PATCH 04/15] Foundry hosting: pass full conversation history to workflow agents _handle_inner_workflow only forwarded the latest user turn to WorkflowAgent.run, even though _handle_inner_agent already prepends history fetched from Foundry storage to the messages it sends a regular agent. Declarative workflows reset Conversation.messages on every run (state.initialize), so checkpoint replay alone does not give them prior turns - the host has to pass them in, the same way it does for non-workflow agents. Mirror that contract: fetch context.get_history() and pass [*history, *input_messages] to the workflow agent.
--- .../_responses.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 9078c59d22..a6238b746a 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -256,6 +256,19 @@ async def _handle_inner_workflow( input_messages = _items_to_messages(input_items) is_streaming_request = request.stream is not None and request.stream is True + # Fetch prior conversation history from Foundry storage so workflow + # agents see the same history their non-workflow counterparts get + # (see _handle_inner_agent which builds messages from history + + # current input). Without this, declarative workflows triggered via + # WorkflowAgent.as_agent only ever see the latest user turn, even + # though the host's checkpoint replay restores the workflow's + # internal state - declarative workflows reset Conversation.messages + # on every new run, so cross-turn context has to come from the + # message list passed in, not from checkpointed workflow state. + history = await context.get_history() + history_messages = _output_items_to_messages(history) + full_messages = [*history_messages, *input_messages] + _, are_options_set = _to_chat_options(request) if are_options_set: logger.warning("Workflow agent doesn't support runtime options. 
They will be ignored.") @@ -307,7 +320,7 @@ async def _handle_inner_workflow( if not is_streaming_request: # Run the agent in non-streaming mode - response = await self._agent.run(input_messages, stream=False, checkpoint_storage=checkpoint_storage) + response = await self._agent.run(full_messages, stream=False, checkpoint_storage=checkpoint_storage) for message in response.messages: for content in message.contents: @@ -323,7 +336,7 @@ async def _handle_inner_workflow( tracker = _OutputItemTracker(response_event_stream) # Run the workflow agent in streaming mode - async for update in self._agent.run(input_messages, stream=True, checkpoint_storage=checkpoint_storage): + async for update in self._agent.run(full_messages, stream=True, checkpoint_storage=checkpoint_storage): for content in update.contents: for event in tracker.handle(content): yield event From baff7e33e1c11c389da8e13f88c87df2dabe066c Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 08:49:38 -0700 Subject: [PATCH 05/15] feat(workflows): support combined message + checkpoint_id for multi-turn continuation Allow Workflow.run(message=..., checkpoint_id=...) so callers can restore prior workflow state from a checkpoint AND deliver a new message to the start executor in a single call. The existing reset_context logic already preserves shared state when checkpoint_id is set, so this gives us 'fresh start executor invocation with prior state intact' - exactly what hosted multi-turn declarative workflows need. - _workflow.py: drop the message+checkpoint_id mutual exclusion and update _execute_with_message_or_checkpoint to do both (restore then execute) when both are provided. - _agent.py: in _run_core's checkpoint branch, also forward input_messages so WorkflowAgent.run(messages, checkpoint_id=...) works end-to-end. Falls back to the legacy 'restore only' behavior when messages are absent. 
- _declarative_base.py: detect continuation in _ensure_state_initialized by checking whether DECLARATIVE_STATE_KEY already exists in shared state; if so, refresh inputs/LastMessage* and append non-user trigger messages instead of calling state.initialize() (which would wipe Conversation/Local/System). - foundry_hosting/_responses.py: collapse the host's two-call pattern (restore-only, then fresh run) into a single combined call now that the underlying APIs support it. - tests: drop the assertion that combined message+checkpoint_id raises. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/agent_framework/_workflows/_agent.py | 10 ++ .../agent_framework/_workflows/_workflow.py | 16 ++- .../core/tests/workflow/test_workflow.py | 15 ++- .../_workflows/_declarative_base.py | 104 ++++++++++-------- .../_responses.py | 60 +++++----- 5 files changed, 113 insertions(+), 92 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 2fd3f35213..4202345a33 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -437,8 +437,17 @@ async def _run_core( yield event elif checkpoint_id is not None: + # Restore the prior workflow state from the checkpoint and, if + # there's a new user message in this run, deliver it to the + # start executor in the same call. This is the multi-turn + # continuation path: shared state (e.g. accumulated conversation + # history maintained by the workflow's executors) survives across + # turns because Workflow.run sets reset_context=False whenever + # checkpoint_id is provided. 
+ message_arg: Any | None = list(input_messages) if input_messages else None if streaming: async for event in self.workflow.run( + message=message_arg, stream=True, checkpoint_id=checkpoint_id, checkpoint_storage=checkpoint_storage, @@ -448,6 +457,7 @@ async def _run_core( yield event else: for event in await self.workflow.run( + message=message_arg, checkpoint_id=checkpoint_id, checkpoint_storage=checkpoint_storage, function_invocation_kwargs=function_invocation_kwargs, diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index c452f62bc2..2c229af5a0 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -443,7 +443,7 @@ async def _execute_with_message_or_checkpoint( if message is None and checkpoint_id is None: raise ValueError("Must provide either 'message' or 'checkpoint_id'") - # Handle checkpoint restoration + # Handle checkpoint restoration (may be combined with message below) if checkpoint_id is not None: has_checkpointing = self._runner.context.has_checkpointing() @@ -455,8 +455,10 @@ async def _execute_with_message_or_checkpoint( await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage) - # Handle initial message - elif message is not None: + # Handle initial message - if combined with a checkpoint_id, this + # delivers a continuation message to the workflow's start executor + # without clearing prior shared state (reset_context=False). + if message is not None: executor = self.get_start_executor() await executor.execute( message, @@ -660,7 +662,13 @@ def _validate_run_params( raise ValueError("Cannot provide both 'message' and 'responses'. Use one or the other.") if message is not None and checkpoint_id is not None: - raise ValueError("Cannot provide both 'message' and 'checkpoint_id'. 
Use one or the other.") + # Combined message + checkpoint_id is supported: restore prior + # workflow state from the checkpoint, then execute the start + # executor with the new message. The workflow's shared state + # (e.g. accumulated conversation history kept in custom shared + # state) is preserved across the boundary because reset_context + # is set to False for this combination (see _resolve_execution_mode). + pass if message is None and responses is None and checkpoint_id is None: raise ValueError( diff --git a/python/packages/core/tests/workflow/test_workflow.py b/python/packages/core/tests/workflow/test_workflow.py index f338ce94f6..1916387c66 100644 --- a/python/packages/core/tests/workflow/test_workflow.py +++ b/python/packages/core/tests/workflow/test_workflow.py @@ -942,14 +942,13 @@ async def test_workflow_run_parameter_validation(simple_executor: Executor) -> N result = await workflow.run(test_message) assert result.get_final_state() == WorkflowRunState.IDLE - # Invalid: both message and checkpoint_id - with pytest.raises(ValueError, match="Cannot provide both 'message' and 'checkpoint_id'"): - await workflow.run(test_message, checkpoint_id="fake_id") - - # Invalid: both message and checkpoint_id (streaming) - with pytest.raises(ValueError, match="Cannot provide both 'message' and 'checkpoint_id'"): - async for _ in workflow.run(test_message, checkpoint_id="fake_id", stream=True): - pass + # Valid: message + checkpoint_id (combined restore + new input) + # is supported as of the multi-turn checkpoint continuation work + # (restore prior state, then deliver message to start executor with + # reset_context=False). Use a fake id - we just need to confirm the + # call no longer raises at the validation layer. + # Note: passing a non-existent checkpoint_id will fail at restore time, + # which is a different code path than the validation we're checking. 
# Invalid: none of message or checkpoint_id with pytest.raises(ValueError, match="Must provide at least one of"): diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index 14ea3f7fc1..50016a12d5 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -914,20 +914,26 @@ async def _ensure_state_initialized( state.initialize(trigger) # type: ignore elif isinstance(trigger, list) and all(isinstance(m, Message) for m in trigger): # list[Message] (e.g. from WorkflowAgent / as_agent()). - # Populate the full conversation rather than collapsing to a - # single string, so workflows that operate on the message list - # (InvokeAzureAgent with =Conversation.messages, history-aware - # agents, multi-modal content, etc.) see the complete input. messages_list = cast(list[Message], trigger) - # Locate the trailing user message: WorkflowAgent merges session - # history with the caller's new input and forwards the combined - # list, so the most recent user message represents "this turn" - # (everything before it is prior history). InvokeAzureAgent's - # contract is that Conversation.messages holds PRIOR turns only - - # the executor appends the new user input itself before invoking - # the agent. To avoid duplicating the latest user turn we split - # the trigger at that boundary. + # Detect continuation: if the workflow's shared state already + # carries declarative data from a prior turn (because the host + # restored a checkpoint and dispatched this run with + # reset_context=False), we MUST NOT call state.initialize() - + # that would wipe Conversation.messages, Local.*, System.* etc. 
+ # Instead, treat the trigger as the new turn's user input only: + # update Inputs.input, append the new user message to existing + # Conversation history, and refresh System.LastMessage*. + existing_state = state._state.get(DECLARATIVE_STATE_KEY) + # Continuation = declarative state already exists in the workflow's + # shared state (either left over in-memory from a prior turn on + # the same instance, or restored from a checkpoint just before + # this run). In that case state.initialize() would wipe Local.*, + # System.*, Conversation.* etc., destroying the cross-turn + # context we're trying to preserve. + is_continuation = existing_state is not None and isinstance(existing_state, dict) + + # Locate the trailing user message in the trigger. last_user_index = -1 for idx in range(len(messages_list) - 1, -1, -1): if str(messages_list[idx].role).lower() == "user": @@ -938,51 +944,59 @@ async def _ensure_state_initialized( last_user_msg = messages_list[last_user_index] last_user_text = last_user_msg.text or "" last_user_id = getattr(last_user_msg, "message_id", "") or "" - # Prior history excludes the latest user turn; trailing - # non-user messages (e.g. tool results) are preserved so - # later actions still see them in Conversation.messages. history_messages = ( messages_list[:last_user_index] + messages_list[last_user_index + 1:] ) else: - # No user message in the list - rare path (e.g. resume after - # an assistant-only sequence). Treat the whole list as prior - # history and surface the last message's text for backwards - # compatibility with =System.LastMessageText. history_messages = list(messages_list) tail = messages_list[-1] if messages_list else None last_user_text = (tail.text or "") if tail is not None else "" last_user_id = ( getattr(tail, "message_id", "") or "" if tail is not None else "" ) - - # Initialize state. 
Using the last user text as Inputs.input - # keeps simple yamls (=inputs.input / =System.LastMessageText) - # working, and matches what InvokeAzureAgent expects to find via - # its input_text fallback chain. - state.initialize({"input": last_user_text}) - - # Populate Conversation.messages/.history with PRIOR turns only - # (matching the executor contract above). Raw Message objects - # are stored - matching what agent executors append at runtime. - for msg in history_messages: - state.append("Conversation.messages", msg) - state.append("Conversation.history", msg) - - # Mirror to System.conversations.{ConversationId}.messages so - # actions resolving conversation-scoped paths see the same - # history. - conversation_id = state.get("System.ConversationId") - if conversation_id: - conv_path = f"System.conversations.{conversation_id}.messages" + last_user_msg = tail + + if is_continuation: + # Continuation turn: keep prior Conversation.messages intact. + # Refresh inputs and surface the new user message via the + # System.LastMessage* fields. We deliberately do NOT append + # the new user message to Conversation.messages here: agent + # executors append the live user input themselves before + # invoking the inner agent (matching the first-turn + # contract where Conversation.messages holds prior turns + # only). + state.set("Inputs.input", last_user_text) + # Trailing non-user messages (e.g. tool results) sandwiched + # before the new user message in the trigger are still + # appended so later actions see them. 
for msg in history_messages: - state.append(conv_path, msg) + state.append("Conversation.messages", msg) + state.append("Conversation.history", msg) + conversation_id = state.get("System.ConversationId") + if conversation_id: + conv_path = f"System.conversations.{conversation_id}.messages" + for msg in history_messages: + state.append(conv_path, msg) + state.set("System.LastMessage", {"Text": last_user_text, "Id": last_user_id}) + state.set("System.LastMessageText", last_user_text) + state.set("System.LastMessageId", last_user_id) + else: + # First turn: full initialization. + state.initialize({"input": last_user_text}) - # System.LastMessage* mirrors the most recent USER message - # (matching .NET DefaultTransform semantics for agent input). - state.set("System.LastMessage", {"Text": last_user_text, "Id": last_user_id}) - state.set("System.LastMessageText", last_user_text) - state.set("System.LastMessageId", last_user_id) + for msg in history_messages: + state.append("Conversation.messages", msg) + state.append("Conversation.history", msg) + + conversation_id = state.get("System.ConversationId") + if conversation_id: + conv_path = f"System.conversations.{conversation_id}.messages" + for msg in history_messages: + state.append(conv_path, msg) + + state.set("System.LastMessage", {"Text": last_user_text, "Id": last_user_id}) + state.set("System.LastMessageText", last_user_text) + state.set("System.LastMessageId", last_user_id) elif isinstance(trigger, str): # String input - wrap in dict and populate System.LastMessage.Text # so YAML expressions like =System.LastMessage.Text see the user input diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index a6238b746a..999a421e92 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ 
-256,19 +256,6 @@ async def _handle_inner_workflow( input_messages = _items_to_messages(input_items) is_streaming_request = request.stream is not None and request.stream is True - # Fetch prior conversation history from Foundry storage so workflow - # agents see the same history their non-workflow counterparts get - # (see _handle_inner_agent which builds messages from history + - # current input). Without this, declarative workflows triggered via - # WorkflowAgent.as_agent only ever see the latest user turn, even - # though the host's checkpoint replay restores the workflow's - # internal state - declarative workflows reset Conversation.messages - # on every new run, so cross-turn context has to come from the - # message list passed in, not from checkpointed workflow state. - history = await context.get_history() - history_messages = _output_items_to_messages(history) - full_messages = [*history_messages, *input_messages] - _, are_options_set = _to_chat_options(request) if are_options_set: logger.warning("Workflow agent doesn't support runtime options. They will be ignored.") @@ -284,34 +271,27 @@ async def _handle_inner_workflow( if not isinstance(self._agent, WorkflowAgent): raise RuntimeError("Agent is not a workflow agent.") - # Restore from the latest checkpoint if available, otherwise start with an empty history + # Determine the latest checkpoint (if any) so we can resume the + # workflow's prior state in the SAME run that delivers the new + # user input. Multi-turn declarative workflows need the workflow's + # internal state (e.g. Conversation.messages, intermediate Local.* + # variables) to survive across user turns; the only place that + # state lives is the workflow checkpoint, so on every turn we + # restore the latest checkpoint and feed the new input back into + # the start executor as a continuation rather than a fresh run. 
+ latest_checkpoint_id: str | None = None if context_id is not None: checkpoint_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) latest_checkpoint = await checkpoint_storage.get_latest(workflow_name=self._agent.workflow.name) if latest_checkpoint is not None: - if not is_streaming_request: - _ = await self._agent.run( - stream=False, - checkpoint_id=latest_checkpoint.checkpoint_id, - checkpoint_storage=checkpoint_storage, - ) - else: - # Consume the streaming or the invocation will result in a no-op - async for _ in self._agent.run( - stream=True, - checkpoint_id=latest_checkpoint.checkpoint_id, - checkpoint_storage=checkpoint_storage, - ): - pass + latest_checkpoint_id = latest_checkpoint.checkpoint_id # Now run the agent with the latest input response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) - # Create a new checkpoint storage for this response based on the following rules: - # - If no previous response ID or conversation ID is provided, - # create a new checkpoint storage for this response - # - If a previous response ID is provided, create a new checkpoint storage for this response - # - If a conversation ID is provided, reuse the existing checkpoint storage for the conversation + # Create / reuse the checkpoint storage that will receive checkpoints + # written during this turn. The directory is keyed by the outer + # conversation id so subsequent turns find the same checkpoint dir. 
context_id = context.conversation_id or context.response_id checkpoint_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) @@ -320,7 +300,12 @@ async def _handle_inner_workflow( if not is_streaming_request: # Run the agent in non-streaming mode - response = await self._agent.run(full_messages, stream=False, checkpoint_storage=checkpoint_storage) + response = await self._agent.run( + input_messages, + stream=False, + checkpoint_id=latest_checkpoint_id, + checkpoint_storage=checkpoint_storage, + ) for message in response.messages: for content in message.contents: @@ -336,7 +321,12 @@ async def _handle_inner_workflow( tracker = _OutputItemTracker(response_event_stream) # Run the workflow agent in streaming mode - async for update in self._agent.run(full_messages, stream=True, checkpoint_storage=checkpoint_storage): + async for update in self._agent.run( + input_messages, + stream=True, + checkpoint_id=latest_checkpoint_id, + checkpoint_storage=checkpoint_storage, + ): for content in update.contents: for event in tracker.handle(content): yield event From e8dfcc90f988db881159c49ee369cf64e23094c4 Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 11:02:15 -0700 Subject: [PATCH 06/15] Pivot: preserve workflow state across run() calls Replace the prior 'combined message + checkpoint_id in one run()' approach with a cleaner default: Workflow.run no longer wipes shared state or runner- context messages between calls. Iteration counting and per-run kwargs still reset on a fresh-message run; checkpoint and responses runs are continuations that preserve everything. This lets a WorkflowAgent be invoked repeatedly on the same instance and maintain multi-turn context (e.g. accumulated Conversation.messages) without asking developers to opt in. Hosted-agent multi-turn pattern becomes two explicit calls: restore-from-checkpoint (drive to idle), then run-with-message. 
Key changes: - _workflow.py: drop _state.clear() and reset_for_new_run() from run(). Reset iteration count and run kwargs on fresh-message runs only. Restore 'Cannot provide both message and checkpoint_id' validation. Add async guard: fresh-message run with un-drained pending executor messages from a prior run is invalid. - _runner.py: clear _state before import_state in restore_from_checkpoint so restore is authoritative (import_state merges, not replaces). - _agent.py: revert checkpoint branch to restore-only (no message forward). - _responses.py (foundry_hosting): two-call host pattern - restore checkpoint silently, then run with new user input. - tests: state-preservation is the new default; rebuild Workflow for clean slate. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/agent_framework/_workflows/_agent.py | 17 ++-- .../agent_framework/_workflows/_runner.py | 7 +- .../agent_framework/_workflows/_workflow.py | 95 +++++++++++-------- .../core/tests/workflow/test_workflow.py | 62 ++++++++---- .../_responses.py | 28 +++++- 5 files changed, 137 insertions(+), 72 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 4202345a33..9271c7c012 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -437,17 +437,15 @@ async def _run_core( yield event elif checkpoint_id is not None: - # Restore the prior workflow state from the checkpoint and, if - # there's a new user message in this run, deliver it to the - # start executor in the same call. This is the multi-turn - # continuation path: shared state (e.g. accumulated conversation - # history maintained by the workflow's executors) survives across - # turns because Workflow.run sets reset_context=False whenever - # checkpoint_id is provided. 
- message_arg: Any | None = list(input_messages) if input_messages else None + # Restore the prior workflow state from the checkpoint. Shared + # state (e.g. accumulated conversation history maintained by the + # workflow's executors) survives across turns because Workflow.run + # no longer wipes state per call. Callers who want to deliver a + # new user message after restore should make a second + # `workflow.run(message=...)` call - they are NOT mutually + # exclusive on the same instance, but each must be its own call. if streaming: async for event in self.workflow.run( - message=message_arg, stream=True, checkpoint_id=checkpoint_id, checkpoint_storage=checkpoint_storage, @@ -457,7 +455,6 @@ async def _run_core( yield event else: for event in await self.workflow.run( - message=message_arg, checkpoint_id=checkpoint_id, checkpoint_storage=checkpoint_storage, function_invocation_kwargs=function_invocation_kwargs, diff --git a/python/packages/core/agent_framework/_workflows/_runner.py b/python/packages/core/agent_framework/_workflows/_runner.py index d58d9b99a9..51a3312e2b 100644 --- a/python/packages/core/agent_framework/_workflows/_runner.py +++ b/python/packages/core/agent_framework/_workflows/_runner.py @@ -278,7 +278,12 @@ async def restore_from_checkpoint( "Please rebuild the original workflow before resuming." ) - # Restore state + # Restore state. Clear first so import_state (which merges) does + # not leak stale keys from a prior run on this Workflow instance. + # This matters more now that Workflow.run() no longer wipes state + # per call - the only reset point for shared state on a reused + # instance is at restore time. 
+ self._state.clear() self._state.import_state(checkpoint.state) # Restore executor states using the restored state await self._restore_executor_states() diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index 2c229af5a0..22f586faa8 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -299,7 +299,7 @@ def get_executors_list(self) -> list[Executor]: async def _run_workflow_with_tracing( self, initial_executor_fn: Callable[[], Awaitable[None]] | None = None, - reset_context: bool = True, + is_fresh_message_run: bool = True, streaming: bool = False, function_invocation_kwargs: Mapping[str, Mapping[str, Any]] | Mapping[str, Any] | None = None, client_kwargs: Mapping[str, Mapping[str, Any]] | Mapping[str, Any] | None = None, @@ -310,13 +310,18 @@ async def _run_workflow_with_tracing( of external callers to maintain context across different workflow runs. Args: - initial_executor_fn: Optional function to execute initial executor - reset_context: Whether to reset the context for a new run - streaming: Whether to enable streaming mode for agents + initial_executor_fn: Optional function to execute initial executor. + is_fresh_message_run: True when this run is a fresh new turn delivered + via the start executor (i.e. ``message`` is provided without a + ``checkpoint_id`` or ``responses``). Resets per-run accounting + (iteration counter and run kwargs) without touching the shared + workflow state. False for checkpoint restores and responses-only + runs, which are continuations of prior work. + streaming: Whether to enable streaming mode for agents. function_invocation_kwargs: Optional kwargs to store in State for function - invocations in subagents + invocations in subagents. client_kwargs: Optional kwargs to store in State for chat client - invocations in subagents + invocations in subagents. 
Yields: WorkflowEvent: The events generated during the workflow execution. @@ -345,16 +350,26 @@ async def _run_workflow_with_tracing( in_progress = WorkflowEvent.status(WorkflowRunState.IN_PROGRESS) yield in_progress # noqa: RUF070 - # Reset context for a new run if supported - if reset_context: + # Per-run reset for fresh-message runs only. We deliberately + # do NOT clear shared workflow state (`_state.clear()`) or the + # runner context's in-flight messages (`reset_for_new_run()`) + # here - state and pending work persist across `run()` calls + # so that a `WorkflowAgent` can deliver multi-turn input on + # the same instance and have prior turns' context survive. + # Iteration counting and per-run kwargs ARE per-run though, + # so they're reset here. + if is_fresh_message_run: self._runner.reset_iteration_count() - self._runner.context.reset_for_new_run() - self._state.clear() # Store run kwargs in State so executors can access them. - # Only overwrite when new kwargs are explicitly provided or state was - # just cleared (fresh run). On continuation (reset_context=False) with - # no new kwargs, preserve the kwargs from the original run. + # Per-run kwargs semantics: + # - On a fresh message run, prior kwargs go away (set to {} + # by default, or to the new kwargs if provided). This + # prevents stale kwargs from a prior turn leaking into the + # current turn. + # - On a continuation (checkpoint restore or responses), the + # prior run's kwargs are preserved unless the caller + # explicitly provides new kwargs. 
if function_invocation_kwargs is not None or client_kwargs is not None: combined_kwargs: dict[str, Any] = {} if function_invocation_kwargs is not None: @@ -366,11 +381,12 @@ async def _run_workflow_with_tracing( client_kwargs, "client_kwargs" ) self._state.set(WORKFLOW_RUN_KWARGS_KEY, combined_kwargs) - elif reset_context: + elif is_fresh_message_run: self._state.set(WORKFLOW_RUN_KWARGS_KEY, {}) self._state.commit() # Commit immediately so kwargs are available - # Set streaming mode after reset + # Set streaming mode (always set explicitly per run since + # reset_for_new_run() no longer runs to clear it). self._runner_context.set_streaming(streaming) # Execute initial setup if provided @@ -443,7 +459,7 @@ async def _execute_with_message_or_checkpoint( if message is None and checkpoint_id is None: raise ValueError("Must provide either 'message' or 'checkpoint_id'") - # Handle checkpoint restoration (may be combined with message below) + # Handle checkpoint restoration if checkpoint_id is not None: has_checkpointing = self._runner.context.has_checkpointing() @@ -455,10 +471,8 @@ async def _execute_with_message_or_checkpoint( await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage) - # Handle initial message - if combined with a checkpoint_id, this - # delivers a continuation message to the workflow's start executor - # without clearing prior shared state (reset_context=False). - if message is not None: + # Handle initial message + elif message is not None: executor = self.get_start_executor() await executor.execute( message, @@ -587,13 +601,29 @@ async def _run_core( if checkpoint_storage is not None: self._runner.context.set_runtime_checkpoint_storage(checkpoint_storage) - initial_executor_fn, reset_context = self._resolve_execution_mode( + # Async validation: a fresh-message run (no checkpoint, no responses) + # is only allowed when the runner context has fully drained from any + # prior run. 
If it still has in-flight executor messages, the prior + # run didn't complete - the caller must either resume from a + # checkpoint or wait for the prior run to drain. (Pending request_info + # events are intentionally NOT blocked here: a follow-up run with + # message=... is the normal way to deliver a response to those + # pending requests, e.g. via WorkflowAgent._process_pending_requests.) + if message is not None and checkpoint_id is None and responses is None: + if await self._runner.context.has_messages(): + raise RuntimeError( + "Cannot start a new run with 'message' while in-flight executor " + "messages remain from a prior run. Either resume from a checkpoint " + "(checkpoint_id=...) or wait for the prior run to complete." + ) + + initial_executor_fn = self._resolve_execution_mode( message, responses, checkpoint_id, checkpoint_storage ) async for event in self._run_workflow_with_tracing( initial_executor_fn=initial_executor_fn, - reset_context=reset_context, + is_fresh_message_run=(message is not None and checkpoint_id is None and responses is None), streaming=streaming, function_invocation_kwargs=function_invocation_kwargs, client_kwargs=client_kwargs, @@ -662,13 +692,7 @@ def _validate_run_params( raise ValueError("Cannot provide both 'message' and 'responses'. Use one or the other.") if message is not None and checkpoint_id is not None: - # Combined message + checkpoint_id is supported: restore prior - # workflow state from the checkpoint, then execute the start - # executor with the new message. The workflow's shared state - # (e.g. accumulated conversation history kept in custom shared - # state) is preserved across the boundary because reset_context - # is set to False for this combination (see _resolve_execution_mode). - pass + raise ValueError("Cannot provide both 'message' and 'checkpoint_id'. 
Use one or the other.") if message is None and responses is None and checkpoint_id is None: raise ValueError( @@ -682,12 +706,8 @@ def _resolve_execution_mode( responses: Mapping[str, Any] | None, checkpoint_id: str | None, checkpoint_storage: CheckpointStorage | None, - ) -> tuple[Callable[[], Awaitable[None]], bool]: - """Determine the initial executor function and reset_context flag based on parameters. - - Returns: - A tuple of (initial_executor_fn, reset_context). - """ + ) -> Callable[[], Awaitable[None]]: + """Determine the initial executor function based on parameters.""" if responses is not None: if checkpoint_id is not None: # Combined: restore checkpoint then send responses @@ -697,13 +717,12 @@ def _resolve_execution_mode( else: # Send responses only (requires pending requests in workflow state) initial_executor_fn = functools.partial(self._send_responses_internal, responses) - return initial_executor_fn, False + return initial_executor_fn # Regular run or checkpoint restoration initial_executor_fn = functools.partial( self._execute_with_message_or_checkpoint, message, checkpoint_id, checkpoint_storage ) - reset_context = message is not None and checkpoint_id is None - return initial_executor_fn, reset_context + return initial_executor_fn async def _restore_and_send_responses( self, diff --git a/python/packages/core/tests/workflow/test_workflow.py b/python/packages/core/tests/workflow/test_workflow.py index 1916387c66..18d2f26997 100644 --- a/python/packages/core/tests/workflow/test_workflow.py +++ b/python/packages/core/tests/workflow/test_workflow.py @@ -488,8 +488,13 @@ async def handle_message( await ctx.yield_output(existing_messages.copy()) # type: ignore -async def test_workflow_multiple_runs_no_state_collision(): - """Test that running the same workflow instance multiple times doesn't have state collision.""" +async def test_workflow_multiple_runs_preserve_state(): + """Test that running the same workflow instance multiple times preserves shared 
state. + + State preservation is the new default - calling ``Workflow.run`` repeatedly + on the same instance behaves like a chat agent maintaining memory across + turns. Callers that want fresh state should rebuild the Workflow. + """ with tempfile.TemporaryDirectory() as temp_dir: storage = FileCheckpointStorage(temp_dir) @@ -503,29 +508,45 @@ async def test_workflow_multiple_runs_no_state_collision(): .build() ) - # Run 1: Should only see messages from run 1 + # Run 1: Single record from run 1 result1 = await workflow.run(StateTrackingMessage(data="message1", run_id="run1")) assert result1.get_final_state() == WorkflowRunState.IDLE outputs1 = result1.get_outputs() assert outputs1[0] == ["run1:message1"] - # Run 2: Should only see messages from run 2, not run 1 + # Run 2: State from run 1 persists; run 2's record appends. result2 = await workflow.run(StateTrackingMessage(data="message2", run_id="run2")) assert result2.get_final_state() == WorkflowRunState.IDLE outputs2 = result2.get_outputs() - assert outputs2[0] == ["run2:message2"] # Should NOT contain run1 data + assert outputs2[0] == ["run1:message1", "run2:message2"] - # Run 3: Should only see messages from run 3 + # Run 3: Same - all three accumulate. 
result3 = await workflow.run(StateTrackingMessage(data="message3", run_id="run3")) assert result3.get_final_state() == WorkflowRunState.IDLE outputs3 = result3.get_outputs() - assert outputs3[0] == ["run3:message3"] # Should NOT contain run1 or run2 data + assert outputs3[0] == ["run1:message1", "run2:message2", "run3:message3"] + + +async def test_workflow_multiple_runs_no_state_collision_after_rebuild(): + """Rebuilding the Workflow gives a fresh shared-state slate.""" + with tempfile.TemporaryDirectory() as temp_dir: + storage = FileCheckpointStorage(temp_dir) - # Verify that each run only processed its own message - # This confirms that the checkpointable context properly resets between runs - assert outputs1[0] != outputs2[0] - assert outputs2[0] != outputs3[0] - assert outputs1[0] != outputs3[0] + def _build(): + executor = StateTrackingExecutor(id="state_executor") + return ( + WorkflowBuilder(start_executor=executor, checkpoint_storage=storage) + .add_edge(executor, executor) + .build() + ) + + wf1 = _build() + result1 = await wf1.run(StateTrackingMessage(data="message1", run_id="run1")) + assert result1.get_outputs()[0] == ["run1:message1"] + + wf2 = _build() + result2 = await wf2.run(StateTrackingMessage(data="message2", run_id="run2")) + assert result2.get_outputs()[0] == ["run2:message2"] async def test_workflow_checkpoint_runtime_only_configuration( @@ -942,13 +963,16 @@ async def test_workflow_run_parameter_validation(simple_executor: Executor) -> N result = await workflow.run(test_message) assert result.get_final_state() == WorkflowRunState.IDLE - # Valid: message + checkpoint_id (combined restore + new input) - # is supported as of the multi-turn checkpoint continuation work - # (restore prior state, then deliver message to start executor with - # reset_context=False). Use a fake id - we just need to confirm the - # call no longer raises at the validation layer. 
- # Note: passing a non-existent checkpoint_id will fail at restore time, - # which is a different code path than the validation we're checking. + # Invalid: message + checkpoint_id (mutually exclusive). Multi-turn + # state preservation is handled by Workflow.run preserving state across + # calls, so the host pattern is two separate calls (restore-then-run), + # not a single combined call. + with pytest.raises(ValueError, match="Cannot provide both 'message' and 'checkpoint_id'"): + await workflow.run(test_message, checkpoint_id="some-checkpoint") + + with pytest.raises(ValueError, match="Cannot provide both 'message' and 'checkpoint_id'"): + async for _ in workflow.run(test_message, checkpoint_id="some-checkpoint", stream=True): + pass # Invalid: none of message or checkpoint_id with pytest.raises(ValueError, match="Must provide at least one of"): diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 999a421e92..4b36ec670e 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -298,12 +298,33 @@ async def _handle_inner_workflow( yield response_event_stream.emit_created() yield response_event_stream.emit_in_progress() + # Multi-turn pattern: when we have a prior checkpoint, restore it + # first (drive the workflow back to idle with prior state intact), + # then make a separate call that delivers the new user input. This + # depends on Workflow.run preserving shared state across calls. The + # restore-only call may yield events from any pending in-flight + # work in the checkpoint; we consume those internally here so they + # don't surface to the response stream as duplicates. 
+ if latest_checkpoint_id is not None: + if is_streaming_request: + async for _ in self._agent.run( + stream=True, + checkpoint_id=latest_checkpoint_id, + checkpoint_storage=checkpoint_storage, + ): + pass + else: + await self._agent.run( + stream=False, + checkpoint_id=latest_checkpoint_id, + checkpoint_storage=checkpoint_storage, + ) + if not is_streaming_request: - # Run the agent in non-streaming mode + # Run the agent in non-streaming mode with the new user input. response = await self._agent.run( input_messages, stream=False, - checkpoint_id=latest_checkpoint_id, checkpoint_storage=checkpoint_storage, ) @@ -320,11 +341,10 @@ async def _handle_inner_workflow( # lazily created on matching content, closed when a different type arrives. tracker = _OutputItemTracker(response_event_stream) - # Run the workflow agent in streaming mode + # Run the workflow agent in streaming mode with the new user input. async for update in self._agent.run( input_messages, stream=True, - checkpoint_id=latest_checkpoint_id, checkpoint_storage=checkpoint_storage, ): for content in update.contents: From 891c00c9095a732ecfc0b5dc178ba6dd84c86ad0 Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 11:18:22 -0700 Subject: [PATCH 07/15] Fix CI lint and mypy issues from prior pivot commit - _workflow.py: collapse nested if (SIM102), drop redundant assignment (RET504) - _declarative_base.py: remove unused last_user_msg = tail assignment whose Message | None type clashed with the prior Message-typed branch Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../agent_framework/_workflows/_workflow.py | 21 +++++++++++-------- .../_workflows/_declarative_base.py | 1 - 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index 22f586faa8..0673132ade 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ 
b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -609,13 +609,17 @@ async def _run_core( # events are intentionally NOT blocked here: a follow-up run with # message=... is the normal way to deliver a response to those # pending requests, e.g. via WorkflowAgent._process_pending_requests.) - if message is not None and checkpoint_id is None and responses is None: - if await self._runner.context.has_messages(): - raise RuntimeError( - "Cannot start a new run with 'message' while in-flight executor " - "messages remain from a prior run. Either resume from a checkpoint " - "(checkpoint_id=...) or wait for the prior run to complete." - ) + if ( + message is not None + and checkpoint_id is None + and responses is None + and await self._runner.context.has_messages() + ): + raise RuntimeError( + "Cannot start a new run with 'message' while in-flight executor " + "messages remain from a prior run. Either resume from a checkpoint " + "(checkpoint_id=...) or wait for the prior run to complete." 
+ ) initial_executor_fn = self._resolve_execution_mode( message, responses, checkpoint_id, checkpoint_storage @@ -719,10 +723,9 @@ def _resolve_execution_mode( initial_executor_fn = functools.partial(self._send_responses_internal, responses) return initial_executor_fn # Regular run or checkpoint restoration - initial_executor_fn = functools.partial( + return functools.partial( self._execute_with_message_or_checkpoint, message, checkpoint_id, checkpoint_storage ) - return initial_executor_fn async def _restore_and_send_responses( self, diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index 50016a12d5..c11a03e925 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -954,7 +954,6 @@ async def _ensure_state_initialized( last_user_id = ( getattr(tail, "message_id", "") or "" if tail is not None else "" ) - last_user_msg = tail if is_continuation: # Continuation turn: keep prior Conversation.messages intact. From bb312f660dd4b88e8e4aeb51ce9f3fc4891bf141 Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 11:27:05 -0700 Subject: [PATCH 08/15] Address PR review: fix Inputs.input update and checkpoint storage path - _declarative_base.py: continuation branch was writing 'Inputs.input' via state.set, which routes to the Custom namespace and never updates the PowerFx-visible Workflow.Inputs.input. Update state_data['Inputs'] in place via get_state_data / set_state_data so =Workflow.Inputs.input and =inputs.input see the new turn's user text on continuation. - _declarative_base.py: refresh docstring to clarify that on a list[Message] trigger, Conversation.messages excludes the current user message at the start of the turn (agent executors append it before invoking the inner agent). 
- _responses.py: when previous_response_id is supplied (no conversation_id), the prior checkpoint lives under <previous_response_id>/ but new checkpoints must land under <response_id>/ for the next turn to find them. Hold onto restore_storage from the get_latest lookup and pass it to the restore-only run; pass write_storage (current id) to the message-delivery run and to checkpoint cleanup. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../_workflows/_declarative_base.py | 35 ++++++++++--- .../_responses.py | 50 +++++++++++-------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index c11a03e925..fa7d16a31f 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -889,14 +889,19 @@ async def _ensure_state_initialized( - dict/Mapping: Used directly as workflow.inputs - str: Converted to {"input": value} - list[Message]: Treated as the agent-facing message contract - (e.g. from WorkflowAgent / as_agent()). The full message list is - stored in ``Conversation.messages``/``Conversation.history`` and - mirrored to ``System.conversations.{id}.messages`` so workflows - that reference ``=Conversation.messages`` (e.g. InvokeAzureAgent) - see the complete history including assistant turns and non-text - content. The last user message's text is also used as the string + (e.g. from WorkflowAgent / as_agent()). The prior conversation + history is stored in ``Conversation.messages``/ + ``Conversation.history`` and mirrored to + ``System.conversations.{id}.messages`` so workflows that + reference ``=Conversation.messages`` (e.g. InvokeAzureAgent) see + assistant turns and other earlier messages, including non-text + content.
At the start of a turn this history excludes the current + user message; that message's text is instead used as the string input (``Inputs.input``) and surfaced via ``System.LastMessage*`` - for backward compatibility with simple text-only workflows. + for backward compatibility with simple text-only workflows. Agent + executors are responsible for appending the current user message + to ``Conversation.messages`` immediately before invoking the + inner agent. - DeclarativeMessage: Internal message, no initialization needed - Any other type: Converted via str() to {"input": str(value)} @@ -964,7 +969,21 @@ async def _ensure_state_initialized( # invoking the inner agent (matching the first-turn # contract where Conversation.messages holds prior turns # only). - state.set("Inputs.input", last_user_text) + # + # Note: ``state.set("Inputs.input", ...)`` would route to + # the Custom namespace (Inputs is not a recognized top-level + # writable namespace - see DeclarativeWorkflowState.set). + # PowerFx expressions like ``=Workflow.Inputs.input`` / + # ``=inputs.input`` read state_data["Inputs"] directly, so + # we update that dict in place via get_state_data / + # set_state_data. + state_data = state.get_state_data() + inputs_dict = state_data.get("Inputs") + if not isinstance(inputs_dict, dict): + inputs_dict = {} + state_data["Inputs"] = inputs_dict + inputs_dict["input"] = last_user_text + state.set_state_data(state_data) # Trailing non-user messages (e.g. tool results) sandwiched # before the new user message in the trigger are still # appended so later actions see them. 
diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 4b36ec670e..738318736c 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -272,28 +272,36 @@ async def _handle_inner_workflow( raise RuntimeError("Agent is not a workflow agent.") # Determine the latest checkpoint (if any) so we can resume the - # workflow's prior state in the SAME run that delivers the new - # user input. Multi-turn declarative workflows need the workflow's - # internal state (e.g. Conversation.messages, intermediate Local.* - # variables) to survive across user turns; the only place that - # state lives is the workflow checkpoint, so on every turn we - # restore the latest checkpoint and feed the new input back into - # the start executor as a continuation rather than a fresh run. + # workflow's prior state for this turn. The directory is keyed by + # the inbound context id (conversation_id when set, otherwise + # previous_response_id). Multi-turn declarative workflows need the + # workflow's internal state (e.g. Conversation.messages, + # intermediate Local.* variables) to survive across user turns; + # the only place that state lives is the workflow checkpoint, so + # on every turn we restore the latest checkpoint and feed the new + # input back into the start executor as a continuation rather than + # a fresh run. 
latest_checkpoint_id: str | None = None + restore_storage: FileCheckpointStorage | None = None if context_id is not None: - checkpoint_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) - latest_checkpoint = await checkpoint_storage.get_latest(workflow_name=self._agent.workflow.name) + restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) + latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name) if latest_checkpoint is not None: latest_checkpoint_id = latest_checkpoint.checkpoint_id # Now run the agent with the latest input response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) - # Create / reuse the checkpoint storage that will receive checkpoints - # written during this turn. The directory is keyed by the outer - # conversation id so subsequent turns find the same checkpoint dir. - context_id = context.conversation_id or context.response_id - checkpoint_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) + # Storage that will receive checkpoints written during this turn. + # When the caller chains with previous_response_id, the next turn + # will reference the current response_id as its previous_response_id, + # so new checkpoints must land under the current response_id (or the + # conversation_id when set). When conversation_id is set, this + # matches restore_storage; when only previous_response_id was + # supplied, restore_storage points at the *prior* response's + # directory and write_storage points at the *current* response's. 
+ write_context_id = context.conversation_id or context.response_id + write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id)) yield response_event_stream.emit_created() yield response_event_stream.emit_in_progress() @@ -305,19 +313,19 @@ async def _handle_inner_workflow( # restore-only call may yield events from any pending in-flight # work in the checkpoint; we consume those internally here so they # don't surface to the response stream as duplicates. - if latest_checkpoint_id is not None: + if latest_checkpoint_id is not None and restore_storage is not None: if is_streaming_request: async for _ in self._agent.run( stream=True, checkpoint_id=latest_checkpoint_id, - checkpoint_storage=checkpoint_storage, + checkpoint_storage=restore_storage, ): pass else: await self._agent.run( stream=False, checkpoint_id=latest_checkpoint_id, - checkpoint_storage=checkpoint_storage, + checkpoint_storage=restore_storage, ) if not is_streaming_request: @@ -325,7 +333,7 @@ async def _handle_inner_workflow( response = await self._agent.run( input_messages, stream=False, - checkpoint_storage=checkpoint_storage, + checkpoint_storage=write_storage, ) for message in response.messages: @@ -333,7 +341,7 @@ async def _handle_inner_workflow( async for item in _to_outputs(response_event_stream, content): yield item - await self._delete_not_latest_checkpoints(checkpoint_storage, self._agent.workflow.name) + await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name) yield response_event_stream.emit_completed() return @@ -345,7 +353,7 @@ async def _handle_inner_workflow( async for update in self._agent.run( input_messages, stream=True, - checkpoint_storage=checkpoint_storage, + checkpoint_storage=write_storage, ): for content in update.contents: for event in tracker.handle(content): @@ -359,7 +367,7 @@ async def _handle_inner_workflow( for event in tracker.close(): yield event - await 
self._delete_not_latest_checkpoints(checkpoint_storage, self._agent.workflow.name) + await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name) yield response_event_stream.emit_completed() @staticmethod From 2d16cabef6c03502c7e56a11538c6c415b52156e Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 11:40:04 -0700 Subject: [PATCH 09/15] Fix pyright errors in _declarative_base.py for CI - Replace state._state.get(...) protected access with new public is_initialized() method on DeclarativeWorkflowState (also clearer intent for the continuation detection use case). - Add narrow pyright ignores for the Any-typed trigger paths that pyright cannot fully narrow (the list[Message] isinstance loop and the fallback-DefaultTransform branch). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../_workflows/_declarative_base.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index fa7d16a31f..c05af09ddc 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -29,10 +29,10 @@ import logging import sys import uuid -from enum import Enum from collections.abc import Mapping from dataclasses import dataclass from decimal import Decimal as _Decimal +from enum import Enum from typing import Any, Literal, cast from agent_framework import ( @@ -212,6 +212,16 @@ def get_state_data(self) -> DeclarativeStateData: result = self._state.get(DECLARATIVE_STATE_KEY) return cast(DeclarativeStateData, result) + def is_initialized(self) -> bool: + """Return True when declarative state has been initialized. 
+ + Useful for distinguishing a fresh start from a continuation: when + Workflow state preserves data across run() calls (multi-turn + scenarios), the start executor needs to avoid calling initialize() + and clobbering the prior turn's Conversation/Local/System data. + """ + return self._state.get(DECLARATIVE_STATE_KEY) is not None + def set_state_data(self, data: DeclarativeStateData) -> None: """Set the full state data dict in state.""" self._state.set(DECLARATIVE_STATE_KEY, data) @@ -917,7 +927,7 @@ async def _ensure_state_initialized( if isinstance(trigger, dict): # Structured inputs - use directly state.initialize(trigger) # type: ignore - elif isinstance(trigger, list) and all(isinstance(m, Message) for m in trigger): + elif isinstance(trigger, list) and all(isinstance(m, Message) for m in trigger): # pyright: ignore[reportUnknownVariableType] # list[Message] (e.g. from WorkflowAgent / as_agent()). messages_list = cast(list[Message], trigger) @@ -929,14 +939,14 @@ async def _ensure_state_initialized( # Instead, treat the trigger as the new turn's user input only: # update Inputs.input, append the new user message to existing # Conversation history, and refresh System.LastMessage*. - existing_state = state._state.get(DECLARATIVE_STATE_KEY) + # # Continuation = declarative state already exists in the workflow's # shared state (either left over in-memory from a prior turn on # the same instance, or restored from a checkpoint just before # this run). In that case state.initialize() would wipe Local.*, # System.*, Conversation.* etc., destroying the cross-turn # context we're trying to preserve. - is_continuation = existing_state is not None and isinstance(existing_state, dict) + is_continuation = state.is_initialized() # Locate the trailing user message in the trigger. 
last_user_index = -1 @@ -1022,10 +1032,11 @@ async def _ensure_state_initialized( state.set("System.LastMessage", {"Text": trigger, "Id": ""}) state.set("System.LastMessageText", trigger) elif not isinstance( - trigger, (ActionTrigger, ActionComplete, ConditionResult, LoopIterationResult, LoopControl) + trigger, + (ActionTrigger, ActionComplete, ConditionResult, LoopIterationResult, LoopControl), # pyright: ignore[reportUnknownArgumentType] ): # Any other type - convert to string like .NET's DefaultTransform - input_str = str(trigger) + input_str = str(cast(Any, trigger)) state.initialize({"input": input_str}) state.set("System.LastMessage", {"Text": input_str, "Id": ""}) state.set("System.LastMessageText", input_str) From b1914e84333bec179aad96ca89c91c2a8f252b9f Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 12:43:22 -0700 Subject: [PATCH 10/15] Address Copilot review batch: tests + Workflow.reset escape hatch * Add Workflow.reset() public method as recovery escape hatch when an in-flight run aborted (e.g. WorkflowConvergenceException) and the workflow is not checkpointed. Update the in-flight messages guard's error message to point callers at it. * Add test_workflow_run_inflight_messages_guard exercising both the guard (sync + streaming) and the reset() recovery path. * Add test_workflow_reset_rejects_concurrent_runs to lock down the in-progress guard on reset. * Add test_as_agent_continuation_preserves_prior_state covering the is_continuation branch in _ensure_state_initialized: stamps a marker between calls and asserts it survives, while Inputs.input and System.LastMessageText refresh to the new turn. * Add test_powerfx_safe.py regression tests for the Enum branch in _make_powerfx_safe (str-subclass, int-subclass, plain Enum, and Enums nested in dict/list). * Drop redundant @pytest.mark.asyncio on test_as_agent_round_trip_with_last_message_text (asyncio_mode='auto'). 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../agent_framework/_workflows/_workflow.py | 27 +++++++- .../core/tests/workflow/test_workflow.py | 44 +++++++++++++ .../declarative/tests/test_powerfx_safe.py | 59 +++++++++++++++++ .../tests/test_workflow_factory.py | 66 ++++++++++++++++++- 4 files changed, 194 insertions(+), 2 deletions(-) create mode 100644 python/packages/declarative/tests/test_powerfx_safe.py diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index 0673132ade..b43c81b8d7 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -618,7 +618,8 @@ async def _run_core( raise RuntimeError( "Cannot start a new run with 'message' while in-flight executor " "messages remain from a prior run. Either resume from a checkpoint " - "(checkpoint_id=...) or wait for the prior run to complete." + "(checkpoint_id=...), wait for the prior run to complete, or call " + "'await workflow.reset()' to drop the pending messages." ) initial_executor_fn = self._resolve_execution_mode( @@ -651,6 +652,30 @@ async def _run_cleanup(self, checkpoint_storage: CheckpointStorage | None) -> No self._runner.context.clear_runtime_checkpoint_storage() self._reset_running_flag() + async def reset(self) -> None: + """Drop all in-flight executor messages and per-run accounting. + + Workflows preserve shared state and pending executor messages + across :meth:`run` calls so that multi-turn callers (e.g. + :class:`WorkflowAgent`) can deliver follow-up turns to the same + instance without losing context. If a prior run aborted (e.g. the + runner raised :class:`WorkflowConvergenceException`) and the + workflow is not checkpointed, those pending messages remain in + the runner context and every future ``run(message=...)`` call + fails with ``RuntimeError`` because of the in-flight-messages + guard. 
Callers that have no checkpoint to resume from can use + ``await workflow.reset()`` as an explicit escape hatch to clear + pending messages and start fresh. + + Note: this does NOT clear the workflow ``State`` (use + :meth:`Workflow.run` with a ``checkpoint_id`` for state replay) + and is a no-op while another run is in progress on this instance. + """ + if self._is_running: + raise RuntimeError("Cannot reset a workflow while a run is in progress.") + self._runner.context.reset_for_new_run() + self._runner.reset_iteration_count() + @staticmethod def _finalize_events( events: Sequence[WorkflowEvent], diff --git a/python/packages/core/tests/workflow/test_workflow.py b/python/packages/core/tests/workflow/test_workflow.py index 18d2f26997..e49e5f0e53 100644 --- a/python/packages/core/tests/workflow/test_workflow.py +++ b/python/packages/core/tests/workflow/test_workflow.py @@ -953,6 +953,50 @@ async def test_agent_streaming_vs_non_streaming() -> None: assert accumulated_text == "Hello World", f"Expected 'Hello World', got '{accumulated_text}'" +async def test_workflow_run_inflight_messages_guard(simple_executor: Executor) -> None: + """``run(message=...)`` must reject in-flight executor messages from a prior run. + + Workflows preserve state and pending messages across :meth:`Workflow.run` + calls. If a prior run aborted before the runner drained those pending + messages (e.g. it raised :class:`WorkflowConvergenceException`), the next + fresh-message call should fail loudly instead of silently mixing the + leftover messages with the new turn. Callers can recover via + :meth:`Workflow.reset`. + """ + workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build() + test_message = WorkflowMessage(data="test", source_id="test", target_id=None) + + # Simulate an aborted prior run by leaving a message in the runner context. 
+ workflow._runner.context._messages["test"] = [test_message] + assert await workflow._runner.context.has_messages() + + with pytest.raises(RuntimeError, match="in-flight executor messages"): + await workflow.run(test_message) + + with pytest.raises(RuntimeError, match="in-flight executor messages"): + async for _ in workflow.run(test_message, stream=True): + pass + + # ``Workflow.reset`` is the documented escape hatch. + await workflow.reset() + assert not await workflow._runner.context.has_messages() + + # After reset, a new run is accepted again. + result = await workflow.run(test_message) + assert result.get_final_state() == WorkflowRunState.IDLE + + +async def test_workflow_reset_rejects_concurrent_runs(simple_executor: Executor) -> None: + """``Workflow.reset`` must not stomp on an in-progress run.""" + workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build() + workflow._is_running = True + try: + with pytest.raises(RuntimeError, match="run is in progress"): + await workflow.reset() + finally: + workflow._is_running = False + + async def test_workflow_run_parameter_validation(simple_executor: Executor) -> None: """Test that stream properly validate parameter combinations.""" workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build() diff --git a/python/packages/declarative/tests/test_powerfx_safe.py b/python/packages/declarative/tests/test_powerfx_safe.py new file mode 100644 index 0000000000..fccbd72b28 --- /dev/null +++ b/python/packages/declarative/tests/test_powerfx_safe.py @@ -0,0 +1,59 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Regression tests for ``_make_powerfx_safe``. + +PowerFx (via pythonnet) only accepts plain primitives, dicts, and lists. 
+``Enum`` instances - especially ``str``- and ``int``-subclass enums like +MAF's ``MessageRole`` - silently pass ``isinstance(v, str)`` / +``isinstance(v, int)`` checks but blow up later inside pythonnet with +``'' value cannot be converted to System.``. These tests +pin down the Enum coercion branch so we don't regress that interop fix. +""" + +from enum import Enum, IntEnum + +from agent_framework_declarative._workflows._declarative_base import _make_powerfx_safe + + +class _StrRole(str, Enum): + USER = "user" + SYSTEM = "system" + + +class _IntCode(IntEnum): + ONE = 1 + TWO = 2 + + +class _PlainEnum(Enum): + X = "x" + Y = 42 + + +def test_str_subclass_enum_reduces_to_str(): + assert _make_powerfx_safe(_StrRole.USER) == "user" + assert type(_make_powerfx_safe(_StrRole.USER)) is str + + +def test_int_subclass_enum_reduces_to_int(): + assert _make_powerfx_safe(_IntCode.ONE) == 1 + assert type(_make_powerfx_safe(_IntCode.ONE)) is int + + +def test_plain_enum_reduces_to_underlying_value(): + assert _make_powerfx_safe(_PlainEnum.X) == "x" + assert _make_powerfx_safe(_PlainEnum.Y) == 42 + + +def test_enum_inside_dict_is_coerced(): + safe = _make_powerfx_safe({"role": _StrRole.USER, "code": _IntCode.TWO}) + assert safe == {"role": "user", "code": 2} + assert type(safe["role"]) is str + assert type(safe["code"]) is int + + +def test_enum_inside_list_is_coerced(): + safe = _make_powerfx_safe([_StrRole.USER, _IntCode.ONE]) + assert safe == ["user", 1] + assert type(safe[0]) is str + assert type(safe[1]) is int diff --git a/python/packages/declarative/tests/test_workflow_factory.py b/python/packages/declarative/tests/test_workflow_factory.py index e9988ea97c..809747a037 100644 --- a/python/packages/declarative/tests/test_workflow_factory.py +++ b/python/packages/declarative/tests/test_workflow_factory.py @@ -228,7 +228,6 @@ async def test_entry_join_executor_initializes_workflow_inputs_string(self): outputs = result.get_outputs() assert any("hello-world" in str(o) for o in 
outputs), f"Expected 'hello-world' in outputs but got: {outputs}" - @pytest.mark.asyncio async def test_as_agent_round_trip_with_last_message_text(self): """Regression test: a declarative workflow built via WorkflowFactory must be consumable as an AIAgent via Workflow.as_agent(). @@ -256,6 +255,71 @@ async def test_as_agent_round_trip_with_last_message_text(self): f"Expected 'Hello there' in agent response text but got: {response.text!r}" ) + async def test_as_agent_continuation_preserves_prior_state(self): + """Regression test for the ``is_continuation`` branch in + ``DeclarativeWorkflowExecutor._ensure_state_initialized``. + + Verifies, end-to-end via ``Workflow.as_agent()``: + * Turn 1 initializes the declarative state via ``state.initialize``. + * Turn 2 takes the *continuation* branch (skips ``state.initialize``), + so any non-Inputs/non-System state stamped on turn 1 survives. + * Turn 2 still refreshes ``Inputs.input`` and + ``System.LastMessage*`` to the new user message. + + Without state preservation, ``Workflow.run`` would clear shared state + on entry and ``state.initialize`` would re-run on every turn, + wiping the marker we stamped between calls. + """ + from agent_framework_declarative._workflows._declarative_base import DECLARATIVE_STATE_KEY + + factory = WorkflowFactory() + workflow = factory.create_workflow_from_yaml(""" +name: as-agent-continuation-test +actions: + - kind: SendActivity + activity: + text: =System.LastMessageText +""") + + agent = workflow.as_agent(name="continuation-agent") + + first = await agent.run("turn-1-msg") + assert first.text == "turn-1-msg", ( + f"Expected turn-1 echo 'turn-1-msg', got: {first.text!r}" + ) + + # Stamp a marker into the declarative state between turns. The + # continuation branch must preserve it; a state-clearing run would + # wipe ``DECLARATIVE_STATE_KEY`` and force re-initialization. 
+ state_data = workflow._state.get(DECLARATIVE_STATE_KEY) + assert isinstance(state_data, dict), ( + "Expected declarative state to be initialized after turn 1" + ) + state_data["Local"] = {"persisted_marker": "kept-from-turn-1"} + workflow._state.set(DECLARATIVE_STATE_KEY, state_data) + workflow._state.commit() + + second = await agent.run("turn-2-msg") + assert second.text == "turn-2-msg", ( + f"Expected System.LastMessageText to refresh to 'turn-2-msg', got: {second.text!r}" + ) + + # The continuation branch in ``_ensure_state_initialized`` must: + # 1. preserve the cross-turn marker we stamped above + # 2. refresh Inputs.input and System.LastMessage* to the new turn + post_state = workflow._state.get(DECLARATIVE_STATE_KEY) + assert isinstance(post_state, dict), "declarative state vanished between turns" + local = post_state.get("Local", {}) + assert local.get("persisted_marker") == "kept-from-turn-1", ( + f"Cross-turn marker was wiped (state was reset). post_state Local={local!r}" + ) + assert post_state.get("Inputs", {}).get("input") == "turn-2-msg", ( + f"Inputs.input not refreshed on turn 2: {post_state.get('Inputs')!r}" + ) + assert post_state.get("System", {}).get("LastMessageText") == "turn-2-msg", ( + f"System.LastMessageText not refreshed on turn 2: {post_state.get('System')!r}" + ) + class TestWorkflowFactoryAgentRegistration: """Tests for agent registration.""" From 2ccc75c8f07a7fa4767557818d7441ddeb02328d Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 12:50:28 -0700 Subject: [PATCH 11/15] Skip restore-only pre-pass when checkpoint has pending request_info Address Copilot review on _responses.py: the restore-only checkpoint replay populates self._agent.pending_requests for any request_info events captured in the checkpoint. 
The follow-up run(input_messages) call would then route through WorkflowAgent._process_pending_requests, which expects function-response content and rejects plain text input as 'unexpected content while awaiting request info responses'. Workflows resumed from a checkpoint that was idle-with-pending-requests would therefore fail every subsequent plain-text user turn. Inspect the loaded checkpoint and skip the pre-pass when its pending_request_info_events dict is non-empty. Workflows that don't use request_info (the current sample set) are unaffected; workflows that do will fall through to a fresh-message run rather than silently corrupting the routing state. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../_responses.py | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 738318736c..c4d151acff 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -283,12 +283,30 @@ async def _handle_inner_workflow( # a fresh run. 
latest_checkpoint_id: str | None = None restore_storage: FileCheckpointStorage | None = None + latest_checkpoint = None if context_id is not None: restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name) if latest_checkpoint is not None: latest_checkpoint_id = latest_checkpoint.checkpoint_id + # If the latest checkpoint represents a workflow that was idle with + # pending request_info events (human-in-the-loop interrupts), the + # restore-only pre-pass below would replay those events through + # ``WorkflowAgent._convert_workflow_event_to_agent_response_updates``, + # populating ``self._agent.pending_requests``. The subsequent + # ``run(input_messages, ...)`` call would then route through + # :meth:`WorkflowAgent._process_pending_requests`, which expects + # function-response content and rejects plain text input. The host + # currently does not support resuming workflows with outstanding + # request_info via plain-text user turns, so in that scenario we + # skip the restore-only pre-pass and start a fresh turn. State + # accumulated by purely state-preserving workflows (no request_info) + # is unaffected. + skip_restore_due_to_pending_requests = bool( + latest_checkpoint is not None and latest_checkpoint.pending_request_info_events + ) + # Now run the agent with the latest input response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) @@ -313,7 +331,11 @@ async def _handle_inner_workflow( # restore-only call may yield events from any pending in-flight # work in the checkpoint; we consume those internally here so they # don't surface to the response stream as duplicates. 
- if latest_checkpoint_id is not None and restore_storage is not None: + if ( + latest_checkpoint_id is not None + and restore_storage is not None + and not skip_restore_due_to_pending_requests + ): if is_streaming_request: async for _ in self._agent.run( stream=True, From 8252ec6fb285c35b686bc2b153afc97a003cc4b6 Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 13:11:20 -0700 Subject: [PATCH 12/15] Loosen azure-ai-agentserver-* pins to major version The exact-version pins on azure-ai-agentserver-{core,responses,invocations} forced foundry-hosting consumers to upgrade in lockstep with every beta bump from upstream. Switch to '>=current, --- python/packages/foundry_hosting/pyproject.toml | 6 +++--- python/uv.lock | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/packages/foundry_hosting/pyproject.toml b/python/packages/foundry_hosting/pyproject.toml index 9746cc6605..91dfccac58 100644 --- a/python/packages/foundry_hosting/pyproject.toml +++ b/python/packages/foundry_hosting/pyproject.toml @@ -24,9 +24,9 @@ classifiers = [ ] dependencies = [ "agent-framework-core>=1.2.0,<2", - "azure-ai-agentserver-core==2.0.0b3", - "azure-ai-agentserver-responses==1.0.0b5", - "azure-ai-agentserver-invocations==1.0.0b3", + "azure-ai-agentserver-core>=2.0.0b3,<3", + "azure-ai-agentserver-responses>=1.0.0b5,<2", + "azure-ai-agentserver-invocations>=1.0.0b3,<2", ] [tool.uv] diff --git a/python/uv.lock b/python/uv.lock index bb422b4478..6a1ade1705 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -516,9 +516,9 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "agent-framework-core", editable = "packages/core" }, - { name = "azure-ai-agentserver-core", specifier = "==2.0.0b3" }, - { name = "azure-ai-agentserver-invocations", specifier = "==1.0.0b3" }, - { name = "azure-ai-agentserver-responses", specifier = "==1.0.0b5" }, + { name = "azure-ai-agentserver-core", specifier = ">=2.0.0b3,<3" }, + { name = 
"azure-ai-agentserver-invocations", specifier = ">=1.0.0b3,<2" }, + { name = "azure-ai-agentserver-responses", specifier = ">=1.0.0b5,<2" }, ] [[package]] From 62150df256de94beff867508ebccf219673b20fa Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 13:29:01 -0700 Subject: [PATCH 13/15] Drop Workflow.reset(); checkpointing is the recovery path The in-flight-messages guard prevented silent misbehavior, but the companion Workflow.reset() escape hatch only cleared _messages while leaving iteration count, executor-local state, and shared State mutations in an indeterminate condition after a mid-run failure. That gave a false sense of recovery. Recovery from a mid-run failure is supported only via checkpoint restoration. Keep the guard and reframe its error message accordingly; remove reset() and its tests. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../agent_framework/_workflows/_workflow.py | 31 +++---------------- .../core/tests/workflow/test_workflow.py | 23 ++------------ 2 files changed, 6 insertions(+), 48 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index b43c81b8d7..a7c47c2893 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -617,9 +617,10 @@ async def _run_core( ): raise RuntimeError( "Cannot start a new run with 'message' while in-flight executor " - "messages remain from a prior run. Either resume from a checkpoint " - "(checkpoint_id=...), wait for the prior run to complete, or call " - "'await workflow.reset()' to drop the pending messages." + "messages remain from a prior run. Resume from a checkpoint " + "(checkpoint_id=...) or wait for the prior run to complete. " + "Workflows that need to recover from a mid-run failure must use " + "checkpointing; there is no in-process recovery path." 
) initial_executor_fn = self._resolve_execution_mode( @@ -652,30 +653,6 @@ async def _run_cleanup(self, checkpoint_storage: CheckpointStorage | None) -> No self._runner.context.clear_runtime_checkpoint_storage() self._reset_running_flag() - async def reset(self) -> None: - """Drop all in-flight executor messages and per-run accounting. - - Workflows preserve shared state and pending executor messages - across :meth:`run` calls so that multi-turn callers (e.g. - :class:`WorkflowAgent`) can deliver follow-up turns to the same - instance without losing context. If a prior run aborted (e.g. the - runner raised :class:`WorkflowConvergenceException`) and the - workflow is not checkpointed, those pending messages remain in - the runner context and every future ``run(message=...)`` call - fails with ``RuntimeError`` because of the in-flight-messages - guard. Callers that have no checkpoint to resume from can use - ``await workflow.reset()`` as an explicit escape hatch to clear - pending messages and start fresh. - - Note: this does NOT clear the workflow ``State`` (use - :meth:`Workflow.run` with a ``checkpoint_id`` for state replay) - and is a no-op while another run is in progress on this instance. - """ - if self._is_running: - raise RuntimeError("Cannot reset a workflow while a run is in progress.") - self._runner.context.reset_for_new_run() - self._runner.reset_iteration_count() - @staticmethod def _finalize_events( events: Sequence[WorkflowEvent], diff --git a/python/packages/core/tests/workflow/test_workflow.py b/python/packages/core/tests/workflow/test_workflow.py index e49e5f0e53..30e81d8fe6 100644 --- a/python/packages/core/tests/workflow/test_workflow.py +++ b/python/packages/core/tests/workflow/test_workflow.py @@ -960,8 +960,8 @@ async def test_workflow_run_inflight_messages_guard(simple_executor: Executor) - calls. If a prior run aborted before the runner drained those pending messages (e.g. 
it raised :class:`WorkflowConvergenceException`), the next fresh-message call should fail loudly instead of silently mixing the - leftover messages with the new turn. Callers can recover via - :meth:`Workflow.reset`. + leftover messages with the new turn. The supported recovery path is to + resume from a checkpoint; there is no in-process recovery hatch. """ workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build() test_message = WorkflowMessage(data="test", source_id="test", target_id=None) @@ -977,25 +977,6 @@ async def test_workflow_run_inflight_messages_guard(simple_executor: Executor) - async for _ in workflow.run(test_message, stream=True): pass - # ``Workflow.reset`` is the documented escape hatch. - await workflow.reset() - assert not await workflow._runner.context.has_messages() - - # After reset, a new run is accepted again. - result = await workflow.run(test_message) - assert result.get_final_state() == WorkflowRunState.IDLE - - -async def test_workflow_reset_rejects_concurrent_runs(simple_executor: Executor) -> None: - """``Workflow.reset`` must not stomp on an in-progress run.""" - workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build() - workflow._is_running = True - try: - with pytest.raises(RuntimeError, match="run is in progress"): - await workflow.reset() - finally: - workflow._is_running = False - async def test_workflow_run_parameter_validation(simple_executor: Executor) -> None: """Test that stream properly validate parameter combinations.""" From 31a0011063f176200fcf18c091582e8d5338355d Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 14:02:53 -0700 Subject: [PATCH 14/15] Address Tao's review on PR 5531 - Rename Workflow._run_workflow_with_tracing parameter is_fresh_message_run -> is_continuation (default False, inverted). 
Fresh-message turns reset per-run accounting; continuations (checkpoint restores, responses replays) preserve it. - Simplify the in-flight-messages guard: _validate_run_params already enforces that 'message' is mutually exclusive with 'checkpoint_id' and 'responses', so the additional checks were dead code. - foundry_hosting _responses: move the restore-only pre-pass above emit_created/emit_in_progress; restore is preparation, not run progress. Drop the skip-restore gate (state preservation requires unconditional restore) and instead clear agent.pending_requests after the restore-only call. Collapse over-conditioned check. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../agent_framework/_workflows/_workflow.py | 47 +++++++++-------- .../_responses.py | 50 ++++++++----------- 2 files changed, 44 insertions(+), 53 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index a7c47c2893..adea4bda20 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -299,7 +299,7 @@ def get_executors_list(self) -> list[Executor]: async def _run_workflow_with_tracing( self, initial_executor_fn: Callable[[], Awaitable[None]] | None = None, - is_fresh_message_run: bool = True, + is_continuation: bool = False, streaming: bool = False, function_invocation_kwargs: Mapping[str, Mapping[str, Any]] | Mapping[str, Any] | None = None, client_kwargs: Mapping[str, Mapping[str, Any]] | Mapping[str, Any] | None = None, @@ -311,12 +311,13 @@ async def _run_workflow_with_tracing( Args: initial_executor_fn: Optional function to execute initial executor. - is_fresh_message_run: True when this run is a fresh new turn delivered - via the start executor (i.e. ``message`` is provided without a - ``checkpoint_id`` or ``responses``). 
Resets per-run accounting - (iteration counter and run kwargs) without touching the shared - workflow state. False for checkpoint restores and responses-only - runs, which are continuations of prior work. + is_continuation: True when this run is a continuation of prior + work (a checkpoint restore or a responses-only replay) rather + than a fresh new turn delivered via the start executor with + ``message=...``. Continuations preserve per-run accounting + (iteration counter and run kwargs) from the prior turn; + fresh-message runs reset them. Shared workflow state is + preserved in both cases. streaming: Whether to enable streaming mode for agents. function_invocation_kwargs: Optional kwargs to store in State for function invocations in subagents. @@ -358,7 +359,7 @@ async def _run_workflow_with_tracing( # the same instance and have prior turns' context survive. # Iteration counting and per-run kwargs ARE per-run though, # so they're reset here. - if is_fresh_message_run: + if not is_continuation: self._runner.reset_iteration_count() # Store run kwargs in State so executors can access them. @@ -381,7 +382,7 @@ async def _run_workflow_with_tracing( client_kwargs, "client_kwargs" ) self._state.set(WORKFLOW_RUN_KWARGS_KEY, combined_kwargs) - elif is_fresh_message_run: + elif not is_continuation: self._state.set(WORKFLOW_RUN_KWARGS_KEY, {}) self._state.commit() # Commit immediately so kwargs are available @@ -601,20 +602,18 @@ async def _run_core( if checkpoint_storage is not None: self._runner.context.set_runtime_checkpoint_storage(checkpoint_storage) - # Async validation: a fresh-message run (no checkpoint, no responses) - # is only allowed when the runner context has fully drained from any - # prior run. If it still has in-flight executor messages, the prior - # run didn't complete - the caller must either resume from a - # checkpoint or wait for the prior run to drain. 
(Pending request_info - # events are intentionally NOT blocked here: a follow-up run with - # message=... is the normal way to deliver a response to those - # pending requests, e.g. via WorkflowAgent._process_pending_requests.) - if ( - message is not None - and checkpoint_id is None - and responses is None - and await self._runner.context.has_messages() - ): + # Async validation: a fresh-message run is only allowed when the + # runner context has fully drained from any prior run. If it still + # has in-flight executor messages, the prior run didn't complete - + # the caller must either resume from a checkpoint or wait for the + # prior run to drain. (Pending request_info events are intentionally + # NOT blocked here: a follow-up run with message=... is the normal + # way to deliver a response to those pending requests, e.g. via + # WorkflowAgent._process_pending_requests.) + # NOTE: _validate_run_params already enforces that ``message`` is + # mutually exclusive with both ``checkpoint_id`` and ``responses``, + # so we don't need to re-check those here. + if message is not None and await self._runner.context.has_messages(): raise RuntimeError( "Cannot start a new run with 'message' while in-flight executor " "messages remain from a prior run. 
Resume from a checkpoint " @@ -629,7 +628,7 @@ async def _run_core( async for event in self._run_workflow_with_tracing( initial_executor_fn=initial_executor_fn, - is_fresh_message_run=(message is not None and checkpoint_id is None and responses is None), + is_continuation=(message is None), streaming=streaming, function_invocation_kwargs=function_invocation_kwargs, client_kwargs=client_kwargs, diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index d984ec644c..103a7273ba 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -284,33 +284,12 @@ async def _handle_inner_workflow( # a fresh run. latest_checkpoint_id: str | None = None restore_storage: FileCheckpointStorage | None = None - latest_checkpoint = None if context_id is not None: restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name) if latest_checkpoint is not None: latest_checkpoint_id = latest_checkpoint.checkpoint_id - # If the latest checkpoint represents a workflow that was idle with - # pending request_info events (human-in-the-loop interrupts), the - # restore-only pre-pass below would replay those events through - # ``WorkflowAgent._convert_workflow_event_to_agent_response_updates``, - # populating ``self._agent.pending_requests``. The subsequent - # ``run(input_messages, ...)`` call would then route through - # :meth:`WorkflowAgent._process_pending_requests`, which expects - # function-response content and rejects plain text input. 
The host - # currently does not support resuming workflows with outstanding - # request_info via plain-text user turns, so in that scenario we - # skip the restore-only pre-pass and start a fresh turn. State - # accumulated by purely state-preserving workflows (no request_info) - # is unaffected. - skip_restore_due_to_pending_requests = bool( - latest_checkpoint is not None and latest_checkpoint.pending_request_info_events - ) - - # Now run the agent with the latest input - response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) - # Storage that will receive checkpoints written during this turn. # When the caller chains with previous_response_id, the next turn # will reference the current response_id as its previous_response_id, @@ -322,9 +301,6 @@ async def _handle_inner_workflow( write_context_id = context.conversation_id or context.response_id write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id)) - yield response_event_stream.emit_created() - yield response_event_stream.emit_in_progress() - # Multi-turn pattern: when we have a prior checkpoint, restore it # first (drive the workflow back to idle with prior state intact), # then make a separate call that delivers the new user input. This @@ -332,11 +308,20 @@ async def _handle_inner_workflow( # restore-only call may yield events from any pending in-flight # work in the checkpoint; we consume those internally here so they # don't surface to the response stream as duplicates. - if ( - latest_checkpoint_id is not None - and restore_storage is not None - and not skip_restore_due_to_pending_requests - ): + # + # If the restored checkpoint had pending request_info events, the + # restore-only call replays them through + # ``WorkflowAgent._convert_workflow_event_to_agent_response_updates`` + # and populates ``self._agent.pending_requests``. 
That would route + # the subsequent ``run(input_messages, ...)`` call through + # :meth:`WorkflowAgent._process_pending_requests`, which expects + # function-response content and rejects plain text. The host's + # contract for plain-text user input is "treat as new turn", not + # "respond to outstanding request_info", so we clear + # ``pending_requests`` after the restore-only pre-pass. State + # (Conversation.*, Inputs.*, Local.*) lives in the workflow + # checkpoint and is unaffected by this clear. + if latest_checkpoint_id is not None: if is_streaming_request: async for _ in self._agent.run( stream=True, @@ -350,6 +335,13 @@ async def _handle_inner_workflow( checkpoint_id=latest_checkpoint_id, checkpoint_storage=restore_storage, ) + self._agent.pending_requests.clear() + + # Now run the agent with the latest input + response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) + + yield response_event_stream.emit_created() + yield response_event_stream.emit_in_progress() if not is_streaming_request: # Run the agent in non-streaming mode with the new user input. From e584a4c8c0ce8574817feff996104184af69c7ff Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 28 Apr 2026 17:07:32 -0700 Subject: [PATCH 15/15] Don't clear pending_requests after restore-only pre-pass Pending requests in the restored checkpoint represent genuinely outstanding HITL requests. The next user input may carry function responses (Responses API `function_call_output` items become FunctionResultContent / FunctionApprovalResponseContent), which `WorkflowAgent._process_pending_requests` correctly extracts and matches against the populated `pending_requests`. Clearing them after restore would silently drop that state and force the next turn to be treated as a fresh input even when the caller is responding to the outstanding requests. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../agent_framework_foundry_hosting/_responses.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 103a7273ba..7da9e8413a 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -312,15 +312,11 @@ async def _handle_inner_workflow( # If the restored checkpoint had pending request_info events, the # restore-only call replays them through # ``WorkflowAgent._convert_workflow_event_to_agent_response_updates`` - # and populates ``self._agent.pending_requests``. That would route - # the subsequent ``run(input_messages, ...)`` call through - # :meth:`WorkflowAgent._process_pending_requests`, which expects - # function-response content and rejects plain text. The host's - # contract for plain-text user input is "treat as new turn", not - # "respond to outstanding request_info", so we clear - # ``pending_requests`` after the restore-only pre-pass. State - # (Conversation.*, Inputs.*, Local.*) lives in the workflow - # checkpoint and is unaffected by this clear. + # and populates ``self._agent.pending_requests``. That is the correct + # state: those requests are genuinely outstanding, and the next + # ``run(input_messages, ...)`` call may contain ``function_call_output`` + # items (carried as FunctionResult/FunctionApprovalResponse content) + # that fulfill them via :meth:`WorkflowAgent._process_pending_requests`. 
if latest_checkpoint_id is not None: if is_streaming_request: async for _ in self._agent.run( @@ -335,7 +331,6 @@ async def _handle_inner_workflow( checkpoint_id=latest_checkpoint_id, checkpoint_storage=restore_storage, ) - self._agent.pending_requests.clear() # Now run the agent with the latest input response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model)