diff --git a/packages/uipath-agent-framework/pyproject.toml b/packages/uipath-agent-framework/pyproject.toml index 9c73f40..2e8b4cb 100644 --- a/packages/uipath-agent-framework/pyproject.toml +++ b/packages/uipath-agent-framework/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-agent-framework" -version = "0.0.2" +version = "0.0.3" description = "Python SDK that enables developers to build and deploy Microsoft Agent Framework agents to the UiPath Cloud Platform" readme = "README.md" requires-python = ">=3.11" diff --git a/packages/uipath-agent-framework/samples/README.md b/packages/uipath-agent-framework/samples/README.md index b0ed3f9..0fde9c0 100644 --- a/packages/uipath-agent-framework/samples/README.md +++ b/packages/uipath-agent-framework/samples/README.md @@ -8,3 +8,6 @@ Sample agents built with [Agent Framework](https://github.com/microsoft/agent-fr |--------|-------------| | [quickstart-agent](./quickstart-agent/) | Single agent with tool calling: fetches live weather data for any location | | [multi-agent](./multi-agent/) | Multi-agent coordinator: delegates research and code execution to specialist sub-agents via `as_tool()` | +| [group-chat](./group-chat/) | Group chat orchestration: researcher, critic, and writer discuss a topic with an orchestrator picking speakers | +| [concurrent](./concurrent/) | Concurrent orchestration: sentiment, topic extraction, and summarization agents analyze text in parallel | +| [handoff](./handoff/) | Handoff orchestration: customer support agents transfer control to specialists with explicit routing rules | diff --git a/packages/uipath-agent-framework/samples/concurrent/README.md b/packages/uipath-agent-framework/samples/concurrent/README.md new file mode 100644 index 0000000..6cd5502 --- /dev/null +++ b/packages/uipath-agent-framework/samples/concurrent/README.md @@ -0,0 +1,33 @@ +# Concurrent + +Three specialist agents analyze the same input text in parallel: sentiment analysis, topic extraction, and summarization. Results are aggregated automatically. + +## Agent Graph + +```mermaid +flowchart TB + __start__(__start__) + concurrent_analysis(concurrent_analysis) + __end__(__end__) + __start__ --> |input|concurrent_analysis + concurrent_analysis --> |output|__end__ +``` + +Internally, the concurrent orchestration fans out to: +- **sentiment** — analyzes positive/negative/neutral sentiment +- **topic_extractor** — identifies topics, entities, and themes +- **summarizer** — writes a concise summary + +All three run simultaneously, results are collected and returned together. + +## Run + +``` +uipath run agent '{"messages": [{"contentParts": [{"data": {"inline": "Apple announced a new Vision Pro headset today, priced at $3,499. The mixed-reality device features a custom M4 chip and will be available in 10 countries. Analysts are divided on whether the high price point will limit adoption."}}], "role": "user"}]}' +``` + +## Debug + +``` +uipath dev web +``` diff --git a/packages/uipath-agent-framework/samples/concurrent/agent.mermaid b/packages/uipath-agent-framework/samples/concurrent/agent.mermaid new file mode 100644 index 0000000..8a47219 --- /dev/null +++ b/packages/uipath-agent-framework/samples/concurrent/agent.mermaid @@ -0,0 +1,16 @@ +flowchart TB + __start__(__start__) + __end__(__end__) + dispatcher(dispatcher) + sentiment(sentiment) + topic_extractor(topic_extractor) + summarizer(summarizer) + aggregator(aggregator) + __start__ --> |input|dispatcher + dispatcher --> sentiment + dispatcher --> topic_extractor + dispatcher --> summarizer + sentiment --> aggregator + topic_extractor --> aggregator + summarizer --> aggregator + aggregator --> |output|__end__ diff --git a/packages/uipath-agent-framework/samples/concurrent/agent_framework.json b/packages/uipath-agent-framework/samples/concurrent/agent_framework.json new file mode 100644 index 0000000..61974ae --- /dev/null +++ b/packages/uipath-agent-framework/samples/concurrent/agent_framework.json @@ -0,0 +1,5 @@ +{ + "agents": { + "agent": "main.py:agent" + } +} diff --git a/packages/uipath-agent-framework/samples/concurrent/main.py b/packages/uipath-agent-framework/samples/concurrent/main.py new file mode 100644 index 0000000..8f45c29 --- /dev/null +++ b/packages/uipath-agent-framework/samples/concurrent/main.py @@ -0,0 +1,40 @@ +from agent_framework.orchestrations import ConcurrentBuilder + +from uipath_agent_framework.chat import UiPathOpenAIChatClient + +client = UiPathOpenAIChatClient(model="gpt-5-mini-2025-08-07") + +sentiment_agent = client.as_agent( + name="sentiment", + description="Analyzes text sentiment.", + instructions=( + "You are a sentiment analyst. Analyze the sentiment of the given text. " + "Return a short assessment: positive/negative/neutral with a confidence " + "percentage and a one-sentence explanation." + ), +) + +topic_agent = client.as_agent( + name="topic_extractor", + description="Extracts main topics and entities from text.", + instructions=( + "You are a topic extraction specialist. Identify the main topics, " + "entities (people, organizations, places), and key themes in the " + "given text. Return a structured list." + ), +) + +summary_agent = client.as_agent( + name="summarizer", + description="Writes concise summaries.", + instructions=( + "You are a summarization expert. Write a concise one-paragraph " + "summary of the given text, capturing the key points." + ), +) + +workflow = ConcurrentBuilder( + participants=[sentiment_agent, topic_agent, summary_agent], +).build() + +agent = workflow.as_agent(name="concurrent_analysis") diff --git a/packages/uipath-agent-framework/samples/concurrent/pyproject.toml b/packages/uipath-agent-framework/samples/concurrent/pyproject.toml new file mode 100644 index 0000000..a3252cf --- /dev/null +++ b/packages/uipath-agent-framework/samples/concurrent/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "concurrent" +version = "0.0.1" +description = "Concurrent: multiple agents analyze the same input in parallel" +authors = [{ name = "John Doe" }] +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "uipath", + "uipath-agent-framework", + "agent-framework-core>=1.0.0b260212", + "agent-framework-orchestrations>=1.0.0b260212", +] + +[dependency-groups] +dev = [ + "uipath-dev", +] + +[tool.uv] +prerelease = "allow" + +[tool.uv.sources] +uipath-dev = { path = "../../../../../uipath-dev-python", editable = true } +uipath-agent-framework = { path = "../../", editable = true } diff --git a/packages/uipath-agent-framework/samples/concurrent/uipath.json b/packages/uipath-agent-framework/samples/concurrent/uipath.json new file mode 100644 index 0000000..7969b8f --- /dev/null +++ b/packages/uipath-agent-framework/samples/concurrent/uipath.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://cloud.uipath.com/draft/2024-12/uipath", + "runtimeOptions": { + "isConversational": true + }, + "packOptions": { + "fileExtensionsIncluded": [], + "filesIncluded": [], + "filesExcluded": [], + "directoriesExcluded": [], + "includeUvLock": true + }, + "functions": {} +} diff --git a/packages/uipath-agent-framework/samples/group-chat/README.md b/packages/uipath-agent-framework/samples/group-chat/README.md new file mode 100644 index 0000000..ff56094 --- /dev/null +++ b/packages/uipath-agent-framework/samples/group-chat/README.md @@ -0,0 +1,32 @@ +# Group Chat + +A group discussion where multiple agents (researcher, critic, writer) take turns in a shared conversation. An orchestrator agent selects who speaks next based on the discussion flow. + +## Agent Graph + +```mermaid +flowchart TB + __start__(__start__) + group_chat(group_chat) + __end__(__end__) + __start__ --> |input|group_chat + group_chat --> |output|__end__ +``` + +Internally, the group chat orchestration manages: +- **researcher** — finds facts via Wikipedia +- **critic** — challenges claims and asks probing questions +- **writer** — synthesizes the discussion into clear prose +- **orchestrator** — picks the next speaker each round + +## Run + +``` +uipath run agent '{"messages": [{"contentParts": [{"data": {"inline": "What are the environmental impacts of lithium mining?"}}], "role": "user"}]}' +``` + +## Debug + +``` +uipath dev web +``` diff --git a/packages/uipath-agent-framework/samples/group-chat/agent_framework.json b/packages/uipath-agent-framework/samples/group-chat/agent_framework.json new file mode 100644 index 0000000..61974ae --- /dev/null +++ b/packages/uipath-agent-framework/samples/group-chat/agent_framework.json @@ -0,0 +1,5 @@ +{ + "agents": { + "agent": "main.py:agent" + } +} diff --git a/packages/uipath-agent-framework/samples/group-chat/main.py b/packages/uipath-agent-framework/samples/group-chat/main.py new file mode 100644 index 0000000..8913685 --- /dev/null +++ b/packages/uipath-agent-framework/samples/group-chat/main.py @@ -0,0 +1,82 @@ +import httpx +from agent_framework.orchestrations import GroupChatBuilder + +from uipath_agent_framework.chat import UiPathOpenAIChatClient + + +def search_wikipedia(query: str) -> str: + """Search Wikipedia for a topic and return a summary. + + Args: + query: The search query, e.g. "Python programming language" + + Returns: + A summary of the Wikipedia article, or an error message. + """ + try: + resp = httpx.get( + "https://en.wikipedia.org/api/rest_v1/page/summary/" + + query.replace(" ", "_"), + headers={"User-Agent": "UiPathGroupChat/1.0"}, + timeout=10, + follow_redirects=True, + ) + resp.raise_for_status() + data = resp.json() + return data.get("extract", "No summary available.") + except Exception as e: + return f"Wikipedia search failed for '{query}': {e}" + + +client = UiPathOpenAIChatClient(model="gpt-5-mini-2025-08-07") + +researcher = client.as_agent( + name="researcher", + description="Expert at finding facts and data using Wikipedia.", + instructions=( + "You are a research specialist. Use the search_wikipedia tool " + "to find factual information. Provide concise, well-sourced responses." + ), + tools=[search_wikipedia], +) + +critic = client.as_agent( + name="critic", + description="Challenges assumptions and evaluates claims critically.", + instructions=( + "You are a critical thinker. Evaluate the claims made by other " + "participants. Point out gaps, biases, or missing context. " + "Ask probing questions to deepen the discussion." + ), +) + +writer = client.as_agent( + name="writer", + description="Synthesizes group discussion into clear, structured prose.", + instructions=( + "You are a skilled writer. Synthesize the group discussion into " + "a clear, well-organized summary. Incorporate the researcher's " + "facts and address the critic's concerns." + ), +) + +orchestrator = client.as_agent( + name="orchestrator", + description="Coordinates the group discussion by selecting the next speaker.", + instructions=( + "You coordinate a team of researcher, critic, and writer. " + "Select the next speaker based on the conversation flow:\n" + "- Pick 'researcher' when facts or data are needed.\n" + "- Pick 'critic' to challenge or evaluate claims.\n" + "- Pick 'writer' to synthesize when enough discussion has happened.\n" + "Respond with ONLY the agent name, nothing else." + ), +) + +workflow = GroupChatBuilder( + participants=[researcher, critic, writer], + orchestrator_agent=orchestrator, + max_rounds=6, +).build() + +agent = workflow.as_agent(name="group_chat") diff --git a/packages/uipath-agent-framework/samples/group-chat/pyproject.toml b/packages/uipath-agent-framework/samples/group-chat/pyproject.toml new file mode 100644 index 0000000..c61a2cd --- /dev/null +++ b/packages/uipath-agent-framework/samples/group-chat/pyproject.toml @@ -0,0 +1,21 @@ +[project] +name = "group-chat" +version = "0.0.1" +description = "Group chat: multiple agents discuss a topic with an orchestrator picking speakers" +authors = [{ name = "John Doe" }] +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "uipath", + "uipath-agent-framework", + "agent-framework-core>=1.0.0b260212", + "agent-framework-orchestrations>=1.0.0b260212", +] + +[dependency-groups] +dev = [ + "uipath-dev", +] + +[tool.uv] +prerelease = "allow" diff --git a/packages/uipath-agent-framework/samples/group-chat/uipath.json b/packages/uipath-agent-framework/samples/group-chat/uipath.json new file mode 100644 index 0000000..7969b8f --- /dev/null +++ b/packages/uipath-agent-framework/samples/group-chat/uipath.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://cloud.uipath.com/draft/2024-12/uipath", + "runtimeOptions": { + "isConversational": true + }, + "packOptions": { + "fileExtensionsIncluded": [], + "filesIncluded": [], + "filesExcluded": [], + "directoriesExcluded": [], + "includeUvLock": true + }, + "functions": {} +} diff --git a/packages/uipath-agent-framework/samples/handoff/README.md b/packages/uipath-agent-framework/samples/handoff/README.md new file mode 100644 index 0000000..ebc7a8c --- /dev/null +++ b/packages/uipath-agent-framework/samples/handoff/README.md @@ -0,0 +1,34 @@ +# Handoff + +A customer support system where agents transfer control to each other based on the customer's needs. The triage agent routes to specialists, and specialists can re-route if needed. + +## Agent Graph + +```mermaid +flowchart TB + __start__(__start__) + customer_support(customer_support) + __end__(__end__) + __start__ --> |input|customer_support + customer_support --> |output|__end__ +``` + +Internally, the handoff orchestration manages routing between: +- **triage** — first contact, routes to the right specialist +- **billing_agent** — invoices, payments, charges +- **tech_agent** — bugs, errors, technical setup +- **returns_agent** — product returns and refunds + +Agents hand off control using auto-generated `handoff_to_` tools. When an agent hands off, it steps out and the target agent takes over the conversation directly. + +## Run + +``` +uipath run agent '{"messages": [{"contentParts": [{"data": {"inline": "Hi, I was charged twice for my last order"}}], "role": "user"}]}' +``` + +## Debug + +``` +uipath dev web +``` diff --git a/packages/uipath-agent-framework/samples/handoff/agent_framework.json b/packages/uipath-agent-framework/samples/handoff/agent_framework.json new file mode 100644 index 0000000..61974ae --- /dev/null +++ b/packages/uipath-agent-framework/samples/handoff/agent_framework.json @@ -0,0 +1,5 @@ +{ + "agents": { + "agent": "main.py:agent" + } +} diff --git a/packages/uipath-agent-framework/samples/handoff/main.py b/packages/uipath-agent-framework/samples/handoff/main.py new file mode 100644 index 0000000..6386a6c --- /dev/null +++ b/packages/uipath-agent-framework/samples/handoff/main.py @@ -0,0 +1,64 @@ +from agent_framework.orchestrations import HandoffBuilder + +from uipath_agent_framework.chat import UiPathOpenAIChatClient + +client = UiPathOpenAIChatClient(model="gpt-5-mini-2025-08-07") + +triage = client.as_agent( + name="triage", + description="Routes customer requests to the right specialist.", + instructions=( + "You are a customer support triage agent. Greet the customer and " + "determine what they need help with, then hand off to the right agent:\n" + "- Billing issues (invoices, payments, charges) -> billing_agent\n" + "- Technical problems (bugs, errors, setup) -> tech_agent\n" + "- Returns and refunds -> returns_agent\n\n" + "Ask clarifying questions if needed before handing off." + ), +) + +billing = client.as_agent( + name="billing_agent", + description="Handles billing, invoices, and payment issues.", + instructions=( + "You are a billing specialist. Help customers with invoices, " + "payment issues, and charges. If the issue turns out to be " + "technical, hand off to tech_agent. If the customer wants to " + "start a return, hand off to returns_agent." + ), +) + +tech = client.as_agent( + name="tech_agent", + description="Handles technical support and troubleshooting.", + instructions=( + "You are a technical support specialist. Help customers " + "troubleshoot issues step by step. If the issue turns out to be " + "billing-related, hand off to billing_agent." + ), +) + +returns = client.as_agent( + name="returns_agent", + description="Handles product returns and refund requests.", + instructions=( + "You are a returns specialist. Help customers process returns " + "and initiate refunds. If they have questions about the refund " + "amount or payment, hand off to billing_agent." + ), +) + +workflow = ( + HandoffBuilder( + name="customer_support", + participants=[triage, billing, tech, returns], + ) + .with_start_agent(triage) + .add_handoff(triage, [billing, tech, returns]) + .add_handoff(billing, [tech, returns, triage]) + .add_handoff(tech, [billing, triage]) + .add_handoff(returns, [billing, triage]) + .build() +) + +agent = workflow.as_agent(name="customer_support") diff --git a/packages/uipath-agent-framework/samples/handoff/pyproject.toml b/packages/uipath-agent-framework/samples/handoff/pyproject.toml new file mode 100644 index 0000000..81aa0f6 --- /dev/null +++ b/packages/uipath-agent-framework/samples/handoff/pyproject.toml @@ -0,0 +1,21 @@ +[project] +name = "handoff" +version = "0.0.1" +description = "Handoff: agents transfer control to specialists with explicit routing rules" +authors = [{ name = "John Doe" }] +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "uipath", + "uipath-agent-framework", + "agent-framework-core>=1.0.0b260212", + "agent-framework-orchestrations>=1.0.0b260212", +] + +[dependency-groups] +dev = [ + "uipath-dev", +] + +[tool.uv] +prerelease = "allow" diff --git a/packages/uipath-agent-framework/samples/handoff/uipath.json b/packages/uipath-agent-framework/samples/handoff/uipath.json new file mode 100644 index 0000000..7969b8f --- /dev/null +++ b/packages/uipath-agent-framework/samples/handoff/uipath.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://cloud.uipath.com/draft/2024-12/uipath", + "runtimeOptions": { + "isConversational": true + }, + "packOptions": { + "fileExtensionsIncluded": [], + "filesIncluded": [], + "filesExcluded": [], + "directoriesExcluded": [], + "includeUvLock": true + }, + "functions": {} +} diff --git a/packages/uipath-agent-framework/samples/multi-agent/agent.mermaid b/packages/uipath-agent-framework/samples/multi-agent/agent.mermaid new file mode 100644 index 0000000..ea16e8f --- /dev/null +++ b/packages/uipath-agent-framework/samples/multi-agent/agent.mermaid @@ -0,0 +1,18 @@ +flowchart TB + __start__(__start__) + coordinator(coordinator) + research_agent(research_agent) + research_agent_tools(tools) + code_agent(code_agent) + code_agent_tools(tools) + __end__(__end__) + research_agent --> research_agent_tools + research_agent_tools --> research_agent + coordinator --> |research_agent|research_agent + research_agent --> coordinator + code_agent --> code_agent_tools + code_agent_tools --> code_agent + coordinator --> |code_agent|code_agent + code_agent --> coordinator + __start__ --> |input|coordinator + coordinator --> |output|__end__ diff --git a/packages/uipath-agent-framework/samples/multi-agent/main.py b/packages/uipath-agent-framework/samples/multi-agent/main.py index 789c1b2..94587a0 100644 --- a/packages/uipath-agent-framework/samples/multi-agent/main.py +++ b/packages/uipath-agent-framework/samples/multi-agent/main.py @@ -50,7 +50,7 @@ def run_python(code: str) -> str: sys.stdout = old_stdout -client = UiPathOpenAIChatClient(model="gpt-4o-mini") +client = UiPathOpenAIChatClient(model="gpt-5-mini-2025-08-07") research_agent = client.as_agent( name="research_agent", diff --git a/packages/uipath-agent-framework/samples/multi-agent/pyproject.toml b/packages/uipath-agent-framework/samples/multi-agent/pyproject.toml index 0b7e731..d7e3ff9 100644 --- a/packages/uipath-agent-framework/samples/multi-agent/pyproject.toml +++ b/packages/uipath-agent-framework/samples/multi-agent/pyproject.toml @@ -9,7 +9,6 @@ dependencies = [ "uipath", "uipath-agent-framework", "agent-framework-core>=1.0.0b260212", - "httpx>=0.27.0", ] [dependency-groups] @@ -19,3 +18,7 @@ dev = [ [tool.uv] prerelease = "allow" + +[tool.uv.sources] +uipath-agent-framework = { path = "../../", editable = true } +uipath-dev = { path = "../../../../../uipath-dev-python", editable = true } diff --git a/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/factory.py b/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/factory.py index 8741b6b..ef4b72c 100644 --- a/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/factory.py +++ b/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/factory.py @@ -45,9 +45,7 @@ def __init__( self.context = context self._config: AgentFrameworkConfig | None = None - self._agent_cache: dict[str, BaseAgent] = {} self._agent_loaders: dict[str, AgentFrameworkAgentLoader] = {} - self._agent_lock = asyncio.Lock() self._session_store: SqliteSessionStore | None = None self._session_store_lock = asyncio.Lock() @@ -165,27 +163,16 @@ async def _load_agent(self, entrypoint: str) -> BaseAgent: ) from e async def _resolve_agent(self, entrypoint: str) -> BaseAgent: + """Load a fresh agent instance for the given entrypoint. + + Agents are NOT cached — each runtime gets its own instance. + Agent Framework agents (especially WorkflowAgents) hold internal + mutable state (e.g. Workflow._is_running) that prevents concurrent + executions on the same instance. Since the factory creates multiple + runtimes in parallel (one per request), sharing an agent instance + would cause "Workflow is already running" errors. """ - Resolve an agent from configuration. - Results are cached for reuse across multiple runtime instances. - - Args: - entrypoint: Name of the agent to resolve - - Returns: - The loaded BaseAgent ready for execution - - Raises: - UiPathAgentFrameworkRuntimeError: If resolution fails - """ - async with self._agent_lock: - if entrypoint in self._agent_cache: - return self._agent_cache[entrypoint] - - loaded_agent = await self._load_agent(entrypoint) - self._agent_cache[entrypoint] = loaded_agent - - return loaded_agent + return await self._load_agent(entrypoint) def discover_entrypoints(self) -> list[str]: """ @@ -256,7 +243,6 @@ async def dispose(self) -> None: await loader.cleanup() self._agent_loaders.clear() - self._agent_cache.clear() if self._session_store: await self._session_store.dispose() diff --git a/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py b/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py index 570f0aa..4197930 100644 --- a/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py +++ b/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py @@ -1,7 +1,6 @@ """Runtime class for executing Agent Framework agents within the UiPath framework.""" import json -import logging from typing import Any, AsyncGenerator from uuid import uuid4 @@ -12,8 +11,10 @@ BaseAgent, Content, FunctionTool, + Message, + WorkflowAgent, ) -from uipath.core.serialization import serialize_defaults +from uipath.core.serialization import serialize_json from uipath.runtime import ( UiPathExecuteOptions, UiPathRuntimeResult, @@ -39,7 +40,40 @@ ) from .storage import SqliteSessionStore -logger = logging.getLogger(__name__) + +class _StreamState: + """Mutable state tracker for agent streaming. + + Holds the sub-agent metadata (computed once) and the active node + state that changes as function_call / function_result events arrive. + """ + + __slots__ = ( + "root_agent", + "active_agent", + "active_tools", + "call_ids", + "agent_tool_names", + "tool_name_to_agent", + "sub_agents_with_tools", + ) + + def __init__( + self, + root_agent: str, + agent_tool_names: set[str], + tool_name_to_agent: dict[str, str], + sub_agents_with_tools: set[str], + ) -> None: + self.root_agent = root_agent + self.active_agent: str = root_agent + self.active_tools: str | None = None + # call_id → sub-agent name (content.name on function_result + # may be empty for as_tool() wrappers, so we match by call_id). + self.call_ids: dict[str, str] = {} + self.agent_tool_names = agent_tool_names + self.tool_name_to_agent = tool_name_to_agent + self.sub_agents_with_tools = sub_agents_with_tools class UiPathAgentFrameworkRuntime: @@ -58,49 +92,48 @@ def __init__( self.chat = AgentFrameworkChatMessagesMapper() self._session_store = session_store + # ------------------------------------------------------------------ + # Sub-agent introspection + # ------------------------------------------------------------------ + @staticmethod - def _build_agent_tool_names(agent: BaseAgent) -> set[str]: - """Build a set of tool names that correspond to agent-as-tools. + def _build_sub_agent_info( + agent: BaseAgent, + ) -> tuple[set[str], dict[str, str], set[str]]: + """Inspect the agent's tools once to extract all sub-agent metadata. - Inspects the agent's tools list and identifies which ones wrap sub-agents - (created via BaseAgent.as_tool()). Returns the tool names (not agent names) - so we can match them against function_call content.name during streaming. + Returns: + agent_tool_names: tool names that wrap sub-agents + tool_name_to_agent: mapping from tool name → sub-agent node name + sub_agents_with_tools: sub-agent names that own tools """ agent_tool_names: set[str] = set() + tool_name_to_agent: dict[str, str] = {} + sub_agents_with_tools: set[str] = set() + for tool in get_agent_tools(agent): inner_agent = extract_agent_from_tool(tool) - if inner_agent is not None and isinstance(tool, FunctionTool): - agent_tool_names.add(tool.name) - return agent_tool_names + if inner_agent is None or not isinstance(tool, FunctionTool): + continue - @staticmethod - def _build_tool_name_to_agent(agent: BaseAgent) -> dict[str, str]: - """Build a mapping from tool name to sub-agent node name. + agent_tool_names.add(tool.name) + inner_name = inner_agent.name or "agent" + tool_name_to_agent[tool.name] = inner_name - When a function_call uses a tool name that wraps a sub-agent, we need - to know which agent node to emit STARTED/COMPLETED for. - """ - mapping: dict[str, str] = {} - for tool in get_agent_tools(agent): - inner_agent = extract_agent_from_tool(tool) - if inner_agent is not None and isinstance(tool, FunctionTool): - agent_name = inner_agent.name or "agent" - mapping[tool.name] = agent_name - return mapping + if get_agent_tools(inner_agent): + sub_agents_with_tools.add(inner_name) - async def _load_session(self) -> AgentSession: - """Load or create an AgentSession for this runtime_id. + return agent_tool_names, tool_name_to_agent, sub_agents_with_tools - If a session store is configured, loads the persisted session state. - Otherwise creates a fresh session each time. - """ + # ------------------------------------------------------------------ + # Session helpers + # ------------------------------------------------------------------ + + async def _load_session(self) -> AgentSession: + """Load or create an AgentSession for this runtime_id.""" if self._session_store: session_data = await self._session_store.load_session(self.runtime_id) if session_data is not None: - logger.debug( - "Restoring session from store for runtime_id=%s", - self.runtime_id, - ) return AgentSession.from_dict(session_data) # type: ignore[attr-defined] return self.agent.create_session(session_id=self.runtime_id) # type: ignore[attr-defined] @@ -110,7 +143,10 @@ async def _save_session(self, session: AgentSession) -> None: if self._session_store: session_data = session.to_dict() # type: ignore[attr-defined] await self._session_store.save_session(self.runtime_id, session_data) - logger.debug("Saved session to store for runtime_id=%s", self.runtime_id) + + # ------------------------------------------------------------------ + # Execute (non-streaming) + # ------------------------------------------------------------------ async def execute( self, @@ -128,6 +164,10 @@ async def execute( except Exception as e: raise self._create_runtime_error(e) from e + # ------------------------------------------------------------------ + # Stream (main entry) + # ------------------------------------------------------------------ + async def stream( self, input: dict[str, Any] | None = None, @@ -135,202 +175,434 @@ async def stream( ) -> AsyncGenerator[UiPathRuntimeEvent, None]: """Stream agent execution events in real-time. - Emits lifecycle phase events (STARTED/COMPLETED) to drive the - uipath-dev graph visualization. Tracks the currently active - agent/tools nodes and emits proper STARTED/COMPLETED transitions - when execution moves between agents and tools. - - For multi-agent setups (via as_tool()), sub-agents get their own - STARTED/COMPLETED events with node IDs matching the graph schema. + Two streaming paths: + - WorkflowAgent: raw workflow events (executor_invoked/completed). + - Regular BaseAgent: function_call/function_result content tracking. """ try: user_input = self._prepare_input(input) session = await self._load_session() agent_name = self.agent.name or "agent" - # Pre-compute which tool names correspond to sub-agents - agent_tool_names = self._build_agent_tool_names(self.agent) - tool_name_to_agent = self._build_tool_name_to_agent(self.agent) + if isinstance(self.agent, WorkflowAgent): + async for event in self._stream_workflow( + user_input, session, agent_name + ): + yield event + else: + async for event in self._stream_agent(user_input, session, agent_name): + yield event + + except Exception as e: + raise self._create_runtime_error(e) from e + + # ------------------------------------------------------------------ + # Workflow streaming + # ------------------------------------------------------------------ + + async def _stream_workflow( + self, + user_input: str, + session: AgentSession, + agent_name: str, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + """Stream workflow execution with real-time executor lifecycle events.""" + assert isinstance(self.agent, WorkflowAgent) + workflow = self.agent.workflow + + yield UiPathRuntimeStateEvent( + payload={}, + node_name=agent_name, + phase=UiPathRuntimeStatePhase.STARTED, + ) + + response_stream = workflow.run(message=user_input, stream=True) + + async for event in response_stream: + if event.type == "executor_invoked": + yield UiPathRuntimeStateEvent( + payload=self._serialize_event_data(event.data), + node_name=event.executor_id, + phase=UiPathRuntimeStatePhase.STARTED, + ) + elif event.type == "executor_completed": + yield UiPathRuntimeStateEvent( + payload=self._serialize_event_data(event.data), + node_name=event.executor_id, + phase=UiPathRuntimeStatePhase.COMPLETED, + ) + elif event.type == "output": + for msg_event in self._extract_workflow_messages(event.data): + yield UiPathRuntimeMessageEvent(payload=msg_event) + + yield UiPathRuntimeStateEvent( + payload={}, + node_name=agent_name, + phase=UiPathRuntimeStatePhase.COMPLETED, + ) + + for msg_event in self.chat.close_message(): + yield UiPathRuntimeMessageEvent(payload=msg_event) + + await self._save_session(session) + + final_result = await response_stream.get_final_response() + output = self._extract_workflow_output(final_result) + yield self._create_success_result(output) + + # ------------------------------------------------------------------ + # Agent streaming + # ------------------------------------------------------------------ + + async def _stream_agent( + self, + user_input: str, + session: AgentSession, + agent_name: str, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + """Stream regular BaseAgent execution with tool/sub-agent tracking.""" + state = _StreamState(agent_name, *self._build_sub_agent_info(self.agent)) + + yield UiPathRuntimeStateEvent( + payload={}, + node_name=agent_name, + phase=UiPathRuntimeStatePhase.STARTED, + ) + + response_stream = self.agent.run(user_input, stream=True, session=session) # type: ignore[attr-defined] + async for update in response_stream: + if not isinstance(update, AgentResponseUpdate): + continue + + for content in update.contents or []: + if not isinstance(content, Content): + continue - # Emit root agent started + for event in self._process_agent_content(state, content): + yield event + + for msg in self.chat.map_streaming_content(content): + yield UiPathRuntimeMessageEvent(payload=msg) + + # Teardown: close remaining nodes + if state.active_tools: yield UiPathRuntimeStateEvent( payload={}, - node_name=agent_name, - phase=UiPathRuntimeStatePhase.STARTED, + node_name=state.active_tools, + phase=UiPathRuntimeStatePhase.COMPLETED, ) - active_agent: str | None = agent_name - active_tools: str | None = None - final_text = "" + yield UiPathRuntimeStateEvent( + payload={}, + node_name=agent_name, + phase=UiPathRuntimeStatePhase.COMPLETED, + ) - response_stream = self.agent.run(user_input, stream=True, session=session) # type: ignore[attr-defined] - async for update in response_stream: - if not isinstance(update, AgentResponseUpdate): - continue + for msg in self.chat.close_message(): + yield UiPathRuntimeMessageEvent(payload=msg) + + await self._save_session(session) + + final_response = await response_stream.get_final_response() + yield self._create_success_result(self._extract_output(final_response)) + + # ------------------------------------------------------------------ + # Agent content event handlers + # ------------------------------------------------------------------ + + def _process_agent_content( + self, s: _StreamState, content: Content + ) -> list[UiPathRuntimeStateEvent]: + """Dispatch a streaming Content to the appropriate handler.""" + if content.type == "function_call": + if not content.name: + return [] + if content.name in s.agent_tool_names: + return self._on_sub_agent_call(s, content) + return self._on_tool_call(s, content) + + if content.type == "function_result": + return self._on_function_result(s, content) + + return [] + + def _on_sub_agent_call( + self, s: _StreamState, content: Content + ) -> list[UiPathRuntimeStateEvent]: + """Handle a function_call that invokes a sub-agent via as_tool().""" + call_name = content.name or "" + sub_agent = s.tool_name_to_agent.get(call_name, call_name) + events: list[UiPathRuntimeStateEvent] = [] + + if content.call_id: + s.call_ids[content.call_id] = sub_agent + + # Close any active tools node + if s.active_tools: + events.append( + UiPathRuntimeStateEvent( + payload={}, + node_name=s.active_tools, + phase=UiPathRuntimeStatePhase.COMPLETED, + ) + ) + s.active_tools = None - # Process contents from the streaming update - contents = update.contents or [] - for content in contents: - if not isinstance(content, Content): - continue - - # Track tool/agent node state transitions - if content.type == "function_call": - # During streaming, only the first chunk carries - # content.name. Subsequent chunks are partial - # argument fragments — skip them. - if not content.name: - continue - - call_name = content.name - logger.debug( - "function_call: name=%s, call_id=%s, " - "is_agent_tool=%s, active_agent=%s", - call_name, - content.call_id, - call_name in agent_tool_names, - active_agent, - ) - - if call_name in agent_tool_names: - # This is a call to a sub-agent tool - sub_agent_name = tool_name_to_agent.get( - call_name, call_name - ) + payload = {"function_name": call_name} - # Close any active tools node first - if active_tools: - yield UiPathRuntimeStateEvent( - payload={}, - node_name=active_tools, - phase=UiPathRuntimeStatePhase.COMPLETED, - ) - active_tools = None - - # Emit STARTED for the sub-agent node - yield UiPathRuntimeStateEvent( - payload={}, - node_name=sub_agent_name, - phase=UiPathRuntimeStatePhase.STARTED, - ) - active_agent = sub_agent_name - - # Emit state event for the agent tool call - payload: dict[str, Any] = { - "function_name": call_name, - } - if isinstance(content.arguments, dict): - payload["function_args"] = serialize_defaults( - content.arguments - ) - yield UiPathRuntimeStateEvent( - payload=payload, - node_name=sub_agent_name, - metadata={"event_type": "function_call"}, - ) - else: - # Regular tool call - tools_node = f"{active_agent}_tools" - if active_tools != tools_node: - if active_tools: - yield UiPathRuntimeStateEvent( - payload={}, - node_name=active_tools, - phase=UiPathRuntimeStatePhase.COMPLETED, - ) - active_tools = tools_node - yield UiPathRuntimeStateEvent( - payload={}, - node_name=tools_node, - phase=UiPathRuntimeStatePhase.STARTED, - ) - - # Emit state event for the tool call - payload = { - "function_name": call_name, - } - if isinstance(content.arguments, dict): - payload["function_args"] = serialize_defaults( - content.arguments - ) - yield UiPathRuntimeStateEvent( - payload=payload, - node_name=tools_node, - metadata={"event_type": "function_call"}, - ) + # Start sub-agent node + events.append( + UiPathRuntimeStateEvent( + payload=payload, + node_name=sub_agent, + phase=UiPathRuntimeStatePhase.STARTED, + ) + ) + s.active_agent = sub_agent + + # Sub-agent's internal tool calls are opaque in the as_tool() + # stream — emit a synthetic STARTED on its tools node. + if sub_agent in s.sub_agents_with_tools: + tools_node = f"{sub_agent}_tools" + events.append( + UiPathRuntimeStateEvent( + payload=payload, + node_name=tools_node, + phase=UiPathRuntimeStatePhase.STARTED, + ) + ) + s.active_tools = tools_node + else: + events.append( + UiPathRuntimeStateEvent( + payload=payload, + node_name=sub_agent, + metadata={"event_type": "function_call"}, + ) + ) - elif content.type == "function_result": - result_name = content.name or "" + return events + + def _on_tool_call( + self, s: _StreamState, content: Content + ) -> list[UiPathRuntimeStateEvent]: + """Handle a regular (non-agent) function_call.""" + call_name = content.name or "" + tools_node = f"{s.active_agent}_tools" + events: list[UiPathRuntimeStateEvent] = [] + + if s.active_tools != tools_node: + if s.active_tools: + events.append( + UiPathRuntimeStateEvent( + payload={}, + node_name=s.active_tools, + phase=UiPathRuntimeStatePhase.COMPLETED, + ) + ) + s.active_tools = tools_node + events.append( + UiPathRuntimeStateEvent( + payload={}, + node_name=tools_node, + phase=UiPathRuntimeStatePhase.STARTED, + ) + ) - if result_name in agent_tool_names: - # Sub-agent completed — emit COMPLETED, re-START parent - sub_agent_name = tool_name_to_agent.get( - result_name, result_name - ) - yield UiPathRuntimeStateEvent( - payload={}, - node_name=sub_agent_name, - phase=UiPathRuntimeStatePhase.COMPLETED, - ) - active_agent = agent_name - yield UiPathRuntimeStateEvent( - payload={}, - node_name=agent_name, - phase=UiPathRuntimeStatePhase.STARTED, - ) - elif active_tools: - # Regular tool completed - yield UiPathRuntimeStateEvent( - payload={}, - node_name=active_tools, - phase=UiPathRuntimeStatePhase.COMPLETED, - ) - active_tools = None - # Re-start agent node after tool completion - if active_agent: - yield UiPathRuntimeStateEvent( - payload={}, - node_name=active_agent, - phase=UiPathRuntimeStatePhase.STARTED, - ) - - # Yield conversation message events - for msg_event in self.chat.map_streaming_content(content): - yield UiPathRuntimeMessageEvent(payload=msg_event) - - # Accumulate text from the update - if update.text: - final_text += update.text - - # Close any remaining active tools node - if active_tools: - yield UiPathRuntimeStateEvent( + events.append( + UiPathRuntimeStateEvent( + payload={"function_name": call_name}, + node_name=tools_node, + metadata={"event_type": "function_call"}, + ) + ) + return events + + def _on_function_result( + self, s: _StreamState, content: Content + ) -> list[UiPathRuntimeStateEvent]: + """Handle a function_result for either a sub-agent or regular tool.""" + call_id = content.call_id or "" + result_name = content.name or "" + events: list[UiPathRuntimeStateEvent] = [] + + # Match sub-agent by call_id first (reliable), fall back to name + matched = s.call_ids.pop(call_id, None) + if matched is None and result_name in s.agent_tool_names: + matched = s.tool_name_to_agent.get(result_name, result_name) + + result_payload = self._build_result_payload(content) + + if matched: + # Sub-agent completed — close tools, then agent, re-start root + if s.active_tools and s.active_tools == f"{matched}_tools": + events.append( + UiPathRuntimeStateEvent( + payload=result_payload, + node_name=s.active_tools, + phase=UiPathRuntimeStatePhase.COMPLETED, + ) + ) + s.active_tools = None + + events.append( + UiPathRuntimeStateEvent( + payload=result_payload, + node_name=matched, + phase=UiPathRuntimeStatePhase.COMPLETED, + ) + ) + s.active_agent = s.root_agent + events.append( + UiPathRuntimeStateEvent( payload={}, - node_name=active_tools, + node_name=s.root_agent, + phase=UiPathRuntimeStatePhase.STARTED, + ) + ) + elif s.active_tools: + # Regular tool completed + events.append( + UiPathRuntimeStateEvent( + payload=result_payload, + node_name=s.active_tools, phase=UiPathRuntimeStatePhase.COMPLETED, ) - - # Complete agent node - yield UiPathRuntimeStateEvent( - payload={}, - node_name=agent_name, - phase=UiPathRuntimeStatePhase.COMPLETED, ) + s.active_tools = None + if s.active_agent: + events.append( + UiPathRuntimeStateEvent( + payload={}, + node_name=s.active_agent, + phase=UiPathRuntimeStatePhase.STARTED, + ) + ) - # Close any open conversation message - for msg_event in self.chat.close_message(): - yield UiPathRuntimeMessageEvent(payload=msg_event) + return events - # Persist session state after streaming completes - await self._save_session(session) + # ------------------------------------------------------------------ + # Payload / serialization helpers + # ------------------------------------------------------------------ - # Get final response - final_response = await response_stream.get_final_response() - output = self._extract_output(final_response) - yield self._create_success_result(output) + @staticmethod + def _build_result_payload(content: Content) -> dict[str, Any]: + """Build a payload dict from a function_result Content.""" + payload: dict[str, Any] = {} + if content.name: + payload["function_name"] = content.name + if content.result is not None: + try: + payload["function_response"] = json.loads( + serialize_json(content.result) + ) + except Exception: + payload["function_response"] = str(content.result) + return payload - except Exception as e: - raise self._create_runtime_error(e) from e + @staticmethod + def _serialize_event_data(data: Any) -> dict[str, Any]: + """Serialize workflow event data into a JSON-safe payload.""" + if data is None: + return {} + try: + safe = json.loads(serialize_json(data)) + if isinstance(safe, dict): + return safe + return {"data": safe} + except Exception: + return {"data": str(data)} + + # ------------------------------------------------------------------ + # Workflow message / output extraction + # ------------------------------------------------------------------ + + def _extract_workflow_messages(self, data: Any) -> list[Any]: + """Extract UiPath conversation message events from workflow output data.""" + events: list[Any] = [] + contents: list[Any] = [] + + if isinstance(data, AgentResponseUpdate): + contents = list(data.contents or []) + elif isinstance(data, AgentResponse): + for message in data.messages or []: + contents.extend(message.contents or []) + elif isinstance(data, Message): + contents = list(data.contents or []) + elif isinstance(data, list): + for item in data: + events.extend(self._extract_workflow_messages(item)) + return events + + for content in contents: + if isinstance(content, Content): + events.extend(self.chat.map_streaming_content(content)) + + return events + + def _extract_workflow_output(self, result: Any) -> Any: + """Extract output from WorkflowRunResult.""" + outputs: list[Any] = [] + if hasattr(result, "get_outputs"): + outputs = result.get_outputs() + + if not outputs: + return "" + + texts: list[str] = [] + for data in outputs: + text = self._extract_text_from_data(data) + if text: + texts.append(text) + + if texts: + return "\n\n".join(texts) + + try: + return json.loads(serialize_json(outputs[-1])) + except Exception: + return str(outputs[-1]) + + @staticmethod + def _extract_text_from_data(data: Any) -> str: + """Extract text from any workflow data type.""" + if isinstance(data, (AgentResponseUpdate, AgentResponse)): + return data.text or "" + if isinstance(data, Message): + return "".join( + c.text for c in (data.contents or []) if hasattr(c, "text") and c.text + ) + if isinstance(data, str): + return data + if isinstance(data, list): + parts: list[str] = [] + for item in data: + if isinstance(item, Message): + text = "".join( + c.text + for c in (item.contents or []) + if hasattr(c, "text") and c.text + ) + if text: + parts.append(text) + elif isinstance(item, str): + parts.append(item) + elif isinstance(item, list): + for inner in item: + if isinstance(inner, Message) and inner.role == "assistant": + text = "".join( + c.text + for c in (inner.contents or []) + if hasattr(c, "text") and c.text + ) + if text: + parts.append(text) + return "\n\n".join(parts) + return "" + + # ------------------------------------------------------------------ + # Input / output / result helpers + # ------------------------------------------------------------------ def _prepare_input(self, input: dict[str, Any] | None) -> str: """Prepare input string from UiPath input dictionary.""" @@ -340,7 +612,6 @@ def _prepare_input(self, input: dict[str, Any] | None) -> str: if "messages" in input: return self.chat.map_messages_to_input(input["messages"]) - # Fallback: serialize entire input as JSON return json.dumps(input) def _extract_output(self, response: AgentResponse) -> Any: @@ -351,7 +622,7 @@ def _extract_output(self, response: AgentResponse) -> Any: def _create_success_result(self, output: Any) -> UiPathRuntimeResult: """Create result for successful completion.""" - serialized_output = serialize_defaults(output) + serialized_output = json.loads(serialize_json(output)) if not isinstance(serialized_output, dict): serialized_output = { @@ -398,6 +669,10 @@ def _create_runtime_error(self, e: Exception) -> UiPathAgentFrameworkRuntimeErro UiPathErrorCategory.USER, ) + # ------------------------------------------------------------------ + # Schema + # ------------------------------------------------------------------ + async def get_schema(self) -> UiPathRuntimeSchema: """Get schema for this Agent Framework runtime.""" entrypoints_schema = get_entrypoints_schema(self.agent) diff --git a/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/schema.py b/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/schema.py index 37eae6e..f46f02b 100644 --- a/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/schema.py +++ b/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/schema.py @@ -3,7 +3,15 @@ from collections.abc import Callable from typing import Any -from agent_framework import BaseAgent, FunctionTool +from agent_framework import ( + AgentExecutor, + BaseAgent, + Edge, + Executor, + FunctionTool, + Workflow, + WorkflowAgent, +) from uipath.runtime.schema import ( UiPathRuntimeEdge, UiPathRuntimeGraph, @@ -77,9 +85,11 @@ def _default_messages_schema() -> dict[str, Any]: def get_agent_graph(agent: BaseAgent) -> UiPathRuntimeGraph: """Extract graph structure from an Agent Framework agent. - Traverses the agent tree, inspecting tools for agent-as-tool instances - (created via BaseAgent.as_tool()). For each agent-as-tool, creates a - separate node and recursively processes its own tools. + Handles two cases: + 1. WorkflowAgent (from orchestrations): extracts the underlying Workflow's + executors and edge_groups to build a proper multi-agent graph. + 2. Regular BaseAgent: traverses the agent tree, inspecting tools for + agent-as-tool instances (created via BaseAgent.as_tool()). Args: agent: An Agent Framework BaseAgent instance @@ -87,6 +97,149 @@ def get_agent_graph(agent: BaseAgent) -> UiPathRuntimeGraph: Returns: UiPathRuntimeGraph with nodes and edges representing the agent structure """ + if isinstance(agent, WorkflowAgent): + return _build_workflow_graph(agent.workflow) + + return _build_agent_graph(agent) + + +def _build_workflow_graph(workflow: Workflow) -> UiPathRuntimeGraph: + """Build graph from a Workflow's executors and edge groups. + + Traverses the workflow structure to create nodes for each executor + and edges from the workflow's edge groups. For AgentExecutors that + wrap agents with tools, also creates tool nodes. + """ + nodes: list[UiPathRuntimeNode] = [] + edges: list[UiPathRuntimeEdge] = [] + + # Add __start__ and __end__ + nodes.append( + UiPathRuntimeNode( + id="__start__", + name="__start__", + type="__start__", + subgraph=None, + metadata=None, + ) + ) + nodes.append( + UiPathRuntimeNode( + id="__end__", + name="__end__", + type="__end__", + subgraph=None, + metadata=None, + ) + ) + + executors: dict[str, Executor] = workflow.executors + start_id: str = workflow.start_executor_id + + # Add a node for each executor + for exec_id, executor in executors.items(): + nodes.append( + UiPathRuntimeNode( + id=exec_id, + name=exec_id, + type="node", + subgraph=None, + metadata=None, + ) + ) + + # AgentExecutors wrap a BaseAgent that may have tools + if isinstance(executor, AgentExecutor): + inner_agent: BaseAgent | None = getattr(executor, "_agent", None) + if inner_agent is not None: + _add_executor_tool_nodes(exec_id, inner_agent, nodes, edges) + + # Connect __start__ → start executor + edges.append(UiPathRuntimeEdge(source="__start__", target=start_id, label="input")) + + # Process edge groups into graph edges + for edge_group in workflow.edge_groups: + group_type = type(edge_group).__name__ + if group_type == "InternalEdgeGroup": + continue + + edge: Edge + for edge in edge_group.edges: + label = edge.condition_name + edges.append( + UiPathRuntimeEdge( + source=edge.source_id, target=edge.target_id, label=label + ) + ) + + # Connect output executors → __end__ + output_executors: list[Executor] = [] + try: + output_executors = workflow.get_output_executors() + except Exception: + pass + + if output_executors: + for executor in output_executors: + edges.append( + UiPathRuntimeEdge(source=executor.id, target="__end__", label="output") + ) + else: + # Fallback: connect start executor to __end__ + edges.append( + UiPathRuntimeEdge(source=start_id, target="__end__", label="output") + ) + + return UiPathRuntimeGraph(nodes=nodes, edges=edges) + + +def _add_executor_tool_nodes( + executor_id: str, + agent: BaseAgent, + nodes: list[UiPathRuntimeNode], + edges: list[UiPathRuntimeEdge], +) -> None: + """Add tool nodes for an executor's wrapped agent's tools.""" + tools = get_agent_tools(agent) + if not tools: + return + + regular_tools = [t for t in tools if extract_agent_from_tool(t) is None] + if not regular_tools: + return + + tool_names = [_get_tool_name(t) for t in regular_tools] + tool_names = [n for n in tool_names if n] + + if tool_names: + tools_node_id = f"{executor_id}_tools" + nodes.append( + UiPathRuntimeNode( + id=tools_node_id, + name="tools", + type="tool", + subgraph=None, + metadata={ + "tool_names": tool_names, + "tool_count": len(tool_names), + }, + ) + ) + edges.append( + UiPathRuntimeEdge(source=executor_id, target=tools_node_id, label=None) + ) + edges.append( + UiPathRuntimeEdge(source=tools_node_id, target=executor_id, label=None) + ) + + +def _build_agent_graph(agent: BaseAgent) -> UiPathRuntimeGraph: + """Build graph from a regular BaseAgent with tools. + + Traverses the agent tree, inspecting tools for agent-as-tool instances + (created via BaseAgent.as_tool()). For each agent-as-tool, creates a + separate node and recursively processes its own tools. + """ nodes: list[UiPathRuntimeNode] = [] edges: list[UiPathRuntimeEdge] = [] visited: set[str] = set() @@ -238,6 +391,9 @@ def _add_agent_node( _process_tools(agent, agent_name, nodes, edges, visited) +_extract_cache: dict[int, BaseAgent | None] = {} + + def extract_agent_from_tool( tool: FunctionTool | Callable[..., Any], ) -> BaseAgent | None: @@ -245,7 +401,20 @@ def extract_agent_from_tool( The as_tool() method creates an async agent_wrapper closure that captures `self` (the BaseAgent instance). We inspect the closure cells to find it. + Results are cached by tool identity to avoid repeated introspection. """ + tool_id = id(tool) + if tool_id in _extract_cache: + return _extract_cache[tool_id] + + result = _extract_agent_from_closure(tool) + _extract_cache[tool_id] = result + return result + + +def _extract_agent_from_closure( + tool: FunctionTool | Callable[..., Any], +) -> BaseAgent | None: if not isinstance(tool, FunctionTool): return None diff --git a/packages/uipath-agent-framework/tests/test_graph.py b/packages/uipath-agent-framework/tests/test_graph.py index 9946631..976c94f 100644 --- a/packages/uipath-agent-framework/tests/test_graph.py +++ b/packages/uipath-agent-framework/tests/test_graph.py @@ -1,8 +1,16 @@ """Tests for graph building utilities.""" +from typing import Any from unittest.mock import MagicMock -from agent_framework import BaseAgent +from agent_framework import ( + AgentExecutor, + BaseAgent, + Edge, + Executor, + Workflow, + WorkflowAgent, +) from uipath_agent_framework.runtime.schema import get_agent_graph @@ -110,3 +118,391 @@ def test_no_subgraph_or_metadata_on_simple_nodes(self): assert node.subgraph is None if node.type != "tool": assert node.metadata is None + + +def _make_edge( + source_id: str, target_id: str, condition_name: str | None = None +) -> Edge: + """Create a mock Edge.""" + edge = MagicMock(spec=Edge) + edge.source_id = source_id + edge.target_id = target_id + edge.condition_name = condition_name + return edge + + +def _make_edge_group(edges: list[Edge], group_type: str = "SingleEdgeGroup"): + """Create a mock EdgeGroup with the given type name.""" + group = MagicMock() + group.edges = edges + type(group).__name__ = group_type + return group + + +def _make_executor(exec_id: str, agent: BaseAgent | None = None) -> Executor: + """Create a mock Executor (AgentExecutor if agent is provided).""" + if agent is not None: + executor = MagicMock(spec=AgentExecutor) + executor._agent = agent + else: + executor = MagicMock(spec=Executor) + executor.id = exec_id + return executor + + +def _make_workflow( + executors: dict[str, Executor], + edge_groups: list[Any], + start_executor_id: str, + output_executors: list[Executor] | None = None, +) -> Workflow: + """Create a mock Workflow.""" + workflow = MagicMock(spec=Workflow) + workflow.executors = executors + workflow.edge_groups = edge_groups + workflow.start_executor_id = start_executor_id + workflow.get_output_executors.return_value = output_executors or [] + return workflow + + +def _make_workflow_agent( + workflow: Workflow, name: str = "workflow_agent" +) -> WorkflowAgent: + """Create a mock WorkflowAgent.""" + agent = MagicMock(spec=WorkflowAgent) + agent.name = name + agent.workflow = workflow + return agent + + +class TestWorkflowGraph: + """Tests for workflow-based graph building.""" + + def test_simple_workflow_has_start_and_end(self): + """Workflow graph has __start__ and __end__ nodes.""" + executor = _make_executor("agent_a", agent=None) + workflow = _make_workflow( + executors={"agent_a": executor}, + edge_groups=[], + start_executor_id="agent_a", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + node_types = {n.type for n in graph.nodes} + assert "__start__" in node_types + assert "__end__" in node_types + + def test_workflow_executor_nodes(self): + """Each executor becomes a node in the graph.""" + executors = { + "triage": _make_executor("triage"), + "billing": _make_executor("billing"), + "tech": _make_executor("tech"), + } + workflow = _make_workflow( + executors=executors, + edge_groups=[], + start_executor_id="triage", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + node_ids = {n.id for n in graph.nodes} + assert "triage" in node_ids + assert "billing" in node_ids + assert "tech" in node_ids + + def test_workflow_start_edge(self): + """__start__ connects to the start executor.""" + executor = _make_executor("triage") + workflow = _make_workflow( + executors={"triage": executor}, + edge_groups=[], + start_executor_id="triage", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + start_edge = next(e for e in graph.edges if e.source == "__start__") + assert start_edge.target == "triage" + assert start_edge.label == "input" + + def test_workflow_edges_from_edge_groups(self): + """Edge groups are translated into graph edges.""" + executors = { + "triage": _make_executor("triage"), + "billing": _make_executor("billing"), + } + edge_groups = [ + _make_edge_group([_make_edge("triage", "billing")], "SingleEdgeGroup"), + _make_edge_group([_make_edge("billing", "triage")], "SingleEdgeGroup"), + ] + workflow = _make_workflow( + executors=executors, + edge_groups=edge_groups, + start_executor_id="triage", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + edge_pairs = [(e.source, e.target) for e in graph.edges] + assert ("triage", "billing") in edge_pairs + assert ("billing", "triage") in edge_pairs + + def test_workflow_fanout_edges(self): + """FanOutEdgeGroup creates edges to multiple targets.""" + executors = { + "triage": _make_executor("triage"), + "billing": _make_executor("billing"), + "tech": _make_executor("tech"), + } + edge_groups = [ + _make_edge_group( + [ + _make_edge("triage", "billing"), + _make_edge("triage", "tech"), + ], + "FanOutEdgeGroup", + ), + ] + workflow = _make_workflow( + executors=executors, + edge_groups=edge_groups, + start_executor_id="triage", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + edge_pairs = [(e.source, e.target) for e in graph.edges] + assert ("triage", "billing") in edge_pairs + assert ("triage", "tech") in edge_pairs + + def test_workflow_fanin_edges(self): + """FanInEdgeGroup creates edges from multiple sources.""" + executors = { + "start": _make_executor("start"), + "a": _make_executor("a"), + "b": _make_executor("b"), + "merge": _make_executor("merge"), + } + edge_groups = [ + _make_edge_group( + [_make_edge("a", "merge"), _make_edge("b", "merge")], + "FanInEdgeGroup", + ), + ] + workflow = _make_workflow( + executors=executors, + edge_groups=edge_groups, + start_executor_id="start", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + edge_pairs = [(e.source, e.target) for e in graph.edges] + assert ("a", "merge") in edge_pairs + assert ("b", "merge") in edge_pairs + + def test_workflow_internal_edges_skipped(self): + """InternalEdgeGroup edges are excluded from the graph.""" + executors = { + "a": _make_executor("a"), + "b": _make_executor("b"), + } + edge_groups = [ + _make_edge_group([_make_edge("a", "b")], "InternalEdgeGroup"), + ] + workflow = _make_workflow( + executors=executors, + edge_groups=edge_groups, + start_executor_id="a", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + # Only __start__→a and a→__end__ (fallback), no a→b internal edge + edge_pairs = [(e.source, e.target) for e in graph.edges] + assert ("a", "b") not in edge_pairs + + def test_workflow_output_executors_connect_to_end(self): + """Output executors connect to __end__.""" + out_exec = _make_executor("writer") + executors = { + "researcher": _make_executor("researcher"), + "writer": out_exec, + } + workflow = _make_workflow( + executors=executors, + edge_groups=[], + start_executor_id="researcher", + output_executors=[out_exec], + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + end_edges = [e for e in graph.edges if e.target == "__end__"] + assert len(end_edges) == 1 + assert end_edges[0].source == "writer" + assert end_edges[0].label == "output" + + def test_workflow_no_output_executors_fallback(self): + """Without output executors, start executor connects to __end__.""" + executors = { + "triage": _make_executor("triage"), + "billing": _make_executor("billing"), + } + workflow = _make_workflow( + executors=executors, + edge_groups=[], + start_executor_id="triage", + output_executors=[], + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + end_edges = [e for e in graph.edges if e.target == "__end__"] + assert len(end_edges) == 1 + assert end_edges[0].source == "triage" + + def test_workflow_executor_with_tools(self): + """Agent executors with tools get tool nodes.""" + + def search_wikipedia(): + pass + + search_wikipedia.__name__ = "search_wikipedia" + + inner_agent = _make_agent(name="researcher", tools=[search_wikipedia]) + executors = { + "researcher": _make_executor("researcher", agent=inner_agent), + } + workflow = _make_workflow( + executors=executors, + edge_groups=[], + start_executor_id="researcher", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + node_ids = {n.id for n in graph.nodes} + assert "researcher_tools" in node_ids + + tools_node = next(n for n in graph.nodes if n.id == "researcher_tools") + assert tools_node.type == "tool" + assert tools_node.metadata is not None + assert "search_wikipedia" in tools_node.metadata["tool_names"] + + edge_pairs = [(e.source, e.target) for e in graph.edges] + assert ("researcher", "researcher_tools") in edge_pairs + assert ("researcher_tools", "researcher") in edge_pairs + + def test_workflow_edge_condition_labels(self): + """Conditional edges include condition_name as label.""" + executors = { + "triage": _make_executor("triage"), + "billing": _make_executor("billing"), + } + edge_groups = [ + _make_edge_group( + [_make_edge("triage", "billing", condition_name="is_billing")], + "SwitchCaseEdgeGroup", + ), + ] + workflow = _make_workflow( + executors=executors, + edge_groups=edge_groups, + start_executor_id="triage", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + edge = next( + e for e in graph.edges if e.source == "triage" and e.target == "billing" + ) + assert edge.label == "is_billing" + + def test_workflow_handoff_pattern(self): + """Full handoff pattern: triage routes to specialists.""" + executors = { + "triage": _make_executor("triage"), + "billing": _make_executor("billing"), + "tech": _make_executor("tech"), + "returns": _make_executor("returns"), + } + edge_groups = [ + _make_edge_group( + [ + _make_edge("triage", "billing"), + _make_edge("triage", "tech"), + _make_edge("triage", "returns"), + ], + "FanOutEdgeGroup", + ), + _make_edge_group([_make_edge("billing", "triage")], "SingleEdgeGroup"), + _make_edge_group([_make_edge("tech", "triage")], "SingleEdgeGroup"), + _make_edge_group([_make_edge("returns", "triage")], "SingleEdgeGroup"), + ] + workflow = _make_workflow( + executors=executors, + edge_groups=edge_groups, + start_executor_id="triage", + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + node_ids = {n.id for n in graph.nodes} + assert node_ids == { + "__start__", + "__end__", + "triage", + "billing", + "tech", + "returns", + } + + edge_pairs = [(e.source, e.target) for e in graph.edges] + # Start → triage + assert ("__start__", "triage") in edge_pairs + # Triage routes to all specialists + assert ("triage", "billing") in edge_pairs + assert ("triage", "tech") in edge_pairs + assert ("triage", "returns") in edge_pairs + # Specialists route back to triage + assert ("billing", "triage") in edge_pairs + assert ("tech", "triage") in edge_pairs + assert ("returns", "triage") in edge_pairs + + def test_workflow_concurrent_pattern(self): + """Full concurrent pattern: fan-out then fan-in.""" + executors = { + "sentiment": _make_executor("sentiment"), + "topics": _make_executor("topics"), + "summary": _make_executor("summary"), + "merge": _make_executor("merge"), + } + merge_exec = executors["merge"] + edge_groups = [ + _make_edge_group( + [ + _make_edge("sentiment", "merge"), + _make_edge("topics", "merge"), + _make_edge("summary", "merge"), + ], + "FanInEdgeGroup", + ), + ] + workflow = _make_workflow( + executors=executors, + edge_groups=edge_groups, + start_executor_id="sentiment", + output_executors=[merge_exec], + ) + agent = _make_workflow_agent(workflow) + graph = get_agent_graph(agent) + + edge_pairs = [(e.source, e.target) for e in graph.edges] + assert ("sentiment", "merge") in edge_pairs + assert ("topics", "merge") in edge_pairs + assert ("summary", "merge") in edge_pairs + assert ("merge", "__end__") in edge_pairs diff --git a/packages/uipath-agent-framework/tests/test_streaming.py b/packages/uipath-agent-framework/tests/test_streaming.py new file mode 100644 index 0000000..1b06168 --- /dev/null +++ b/packages/uipath-agent-framework/tests/test_streaming.py @@ -0,0 +1,464 @@ +"""Tests for streaming event pairing (STARTED/COMPLETED). + +Uses real Agent Framework agents and workflows with mocked execution +to verify that the runtime emits matched STARTED/COMPLETED state events +for every node in the graph. +""" + +import asyncio +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +from agent_framework import ( + AgentResponseUpdate, + BaseAgent, + Content, + RawAgent, + WorkflowAgent, + WorkflowBuilder, +) +from uipath.runtime import UiPathRuntimeResult +from uipath.runtime.events import ( + UiPathRuntimeStateEvent, + UiPathRuntimeStatePhase, +) + +from uipath_agent_framework.runtime.factory import UiPathAgentFrameworkRuntimeFactory +from uipath_agent_framework.runtime.runtime import UiPathAgentFrameworkRuntime + +STARTED = UiPathRuntimeStatePhase.STARTED +COMPLETED = UiPathRuntimeStatePhase.COMPLETED + + +# --------------------------------------------------------------------------- +# Async stream mock +# --------------------------------------------------------------------------- + + +class _MockAsyncStream: + """Async iterable with get_final_response() support.""" + + def __init__(self, items: list[Any], final: Any = None): + self._items = list(items) + self._final = final or MagicMock(text="done") + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._items: + raise StopAsyncIteration + return self._items.pop(0) + + async def get_final_response(self): + return self._final + + +# --------------------------------------------------------------------------- +# Content helpers +# --------------------------------------------------------------------------- + + +def _update(*contents: Content) -> AgentResponseUpdate: + return AgentResponseUpdate(contents=list(contents)) + + +def _fc(name: str, call_id: str = "c1") -> Content: + return Content(type="function_call", name=name, call_id=call_id) + + +def _fr(name: str = "", call_id: str = "c1", result: Any = "ok") -> Content: + return Content(type="function_result", name=name, call_id=call_id, result=result) + + +def _text(text: str = "hi") -> Content: + return Content(type="text", text=text) + + +def _wf_event(event_type: str, executor_id: str) -> MagicMock: + evt = MagicMock() + evt.type = event_type + evt.executor_id = executor_id + evt.data = None + return evt + + +# --------------------------------------------------------------------------- +# Real tools (no LLM needed) +# --------------------------------------------------------------------------- + + +def search_wikipedia(query: str) -> str: + """Search Wikipedia for a topic.""" + return f"Result for: {query}" + + +def run_python(code: str) -> str: + """Execute a Python code snippet.""" + return f"Output: {code}" + + +def calculator(expression: str) -> str: + """Evaluate a math expression.""" + return str(eval(expression)) + + +# --------------------------------------------------------------------------- +# Agent / runtime setup helpers +# --------------------------------------------------------------------------- + +_mock_client = MagicMock() + + +def _make_runtime(agent: BaseAgent) -> UiPathAgentFrameworkRuntime: + """Create a runtime with mocked chat mapper.""" + runtime = UiPathAgentFrameworkRuntime(agent=agent) + runtime.chat = MagicMock() + runtime.chat.map_messages_to_input.return_value = "test" + runtime.chat.map_streaming_content.return_value = [] + runtime.chat.close_message.return_value = [] + return runtime + + +async def _collect_events(runtime: UiPathAgentFrameworkRuntime) -> list[Any]: + events: list[Any] = [] + async for event in runtime.stream(input=None): + events.append(event) + return events + + +# --------------------------------------------------------------------------- +# Assertion helpers +# --------------------------------------------------------------------------- + + +def _state_events(events: list[Any]) -> list[tuple[str, UiPathRuntimeStatePhase]]: + return [ + (e.node_name, e.phase) + for e in events + if isinstance(e, UiPathRuntimeStateEvent) and e.node_name is not None + ] + + +def _assert_all_completed(se: list[tuple[str, UiPathRuntimeStatePhase]]) -> None: + """Every node that was STARTED must also have a COMPLETED event.""" + started = {n for n, p in se if p == STARTED} + completed = {n for n, p in se if p == COMPLETED} + missing = started - completed + assert not missing, f"STARTED but never COMPLETED: {missing}" + + +def _assert_started_before_completed( + se: list[tuple[str, UiPathRuntimeStatePhase]], node: str +) -> None: + first_started = next( + (i for i, (n, p) in enumerate(se) if n == node and p == STARTED), None + ) + first_completed = next( + (i for i, (n, p) in enumerate(se) if n == node and p == COMPLETED), None + ) + assert first_started is not None, f"{node} never STARTED" + assert first_completed is not None, f"{node} never COMPLETED" + assert first_started < first_completed, f"{node}: COMPLETED before STARTED" + + +# =========================================================================== +# Agent streaming tests +# =========================================================================== + + +class TestAgentStreamingEvents: + """Verify STARTED/COMPLETED pairing for agent streaming.""" + + async def test_simple_agent_no_tools(self): + """Agent with no tools: root STARTED then COMPLETED.""" + agent = RawAgent(_mock_client, name="root") + agent.run = MagicMock(return_value=_MockAsyncStream([_update(_text())])) # type: ignore[method-assign] + agent.create_session = MagicMock(return_value=MagicMock()) # type: ignore[method-assign] + + runtime = _make_runtime(agent) + events = await _collect_events(runtime) + + se = _state_events(events) + _assert_all_completed(se) + _assert_started_before_completed(se, "root") + assert isinstance(events[-1], UiPathRuntimeResult) + + async def test_agent_with_regular_tools(self): + """Agent with regular tools: tools node gets STARTED/COMPLETED.""" + agent = RawAgent(_mock_client, name="researcher", tools=[search_wikipedia]) + agent.run = MagicMock( # type: ignore[method-assign] + return_value=_MockAsyncStream( + [ + _update(_fc("search_wikipedia", "c1")), + _update(_fr("search_wikipedia", "c1", "wiki result")), + _update(_text("here's what I found")), + ] + ) + ) + agent.create_session = MagicMock(return_value=MagicMock()) # type: ignore[method-assign] + + runtime = _make_runtime(agent) + events = await _collect_events(runtime) + + se = _state_events(events) + _assert_all_completed(se) + _assert_started_before_completed(se, "researcher") + _assert_started_before_completed(se, "researcher_tools") + + async def test_multi_agent_with_sub_agents(self): + """Coordinator with sub-agents via as_tool(): all nodes paired.""" + research = RawAgent( + _mock_client, + name="research_agent", + tools=[search_wikipedia], + ) + coder = RawAgent( + _mock_client, + name="code_agent", + tools=[run_python], + ) + coordinator = RawAgent( + _mock_client, + name="coordinator", + tools=[research.as_tool(), coder.as_tool()], + ) + + # Get actual tool names assigned by as_tool() + tools = coordinator.default_options.get("tools", []) + research_tool_name = tools[0].name + code_tool_name = tools[1].name + + coordinator.run = MagicMock( # type: ignore[method-assign] + return_value=_MockAsyncStream( + [ + _update(_fc(research_tool_name, "c1")), + _update(_fr("", "c1", "research done")), + _update(_fc(code_tool_name, "c2")), + _update(_fr("", "c2", "code done")), + _update(_text("final answer")), + ] + ) + ) + coordinator.create_session = MagicMock(return_value=MagicMock()) # type: ignore[method-assign] + + runtime = _make_runtime(coordinator) + events = await _collect_events(runtime) + + se = _state_events(events) + _assert_all_completed(se) + _assert_started_before_completed(se, "coordinator") + _assert_started_before_completed(se, "research_agent") + _assert_started_before_completed(se, "research_agent_tools") + _assert_started_before_completed(se, "code_agent") + _assert_started_before_completed(se, "code_agent_tools") + + async def test_sub_agent_completed_via_call_id(self): + """Sub-agent COMPLETED even when function_result has empty name. + + The original bug: as_tool() wrappers produce function_result with + empty content.name. We match by call_id instead. + """ + inner = RawAgent(_mock_client, name="inner_agent", tools=[calculator]) + outer = RawAgent( + _mock_client, + name="outer", + tools=[inner.as_tool()], + ) + + tool_name = outer.default_options["tools"][0].name + + outer.run = MagicMock( # type: ignore[method-assign] + return_value=_MockAsyncStream( + [ + _update(_fc(tool_name, "call_xyz")), + # empty name on result — must still complete inner_agent + _update(_fr("", "call_xyz", "42")), + _update(_text("done")), + ] + ) + ) + outer.create_session = MagicMock(return_value=MagicMock()) # type: ignore[method-assign] + + runtime = _make_runtime(outer) + events = await _collect_events(runtime) + + se = _state_events(events) + _assert_all_completed(se) + _assert_started_before_completed(se, "inner_agent") + _assert_started_before_completed(se, "inner_agent_tools") + + async def test_mixed_regular_tools_and_sub_agents(self): + """Agent with both regular tools and agent-as-tool.""" + inner = RawAgent(_mock_client, name="helper") + agent = RawAgent( + _mock_client, + name="main", + tools=[search_wikipedia, inner.as_tool()], + ) + + agent_tool_name = next( + t.name for t in agent.default_options["tools"] if hasattr(t, "func") + ) + + agent.run = MagicMock( # type: ignore[method-assign] + return_value=_MockAsyncStream( + [ + _update(_fc("search_wikipedia", "c1")), + _update(_fr("search_wikipedia", "c1", "wiki")), + _update(_fc(agent_tool_name, "c2")), + _update(_fr("", "c2", "helped")), + _update(_text("done")), + ] + ) + ) + agent.create_session = MagicMock(return_value=MagicMock()) # type: ignore[method-assign] + + runtime = _make_runtime(agent) + events = await _collect_events(runtime) + + se = _state_events(events) + _assert_all_completed(se) + _assert_started_before_completed(se, "main") + _assert_started_before_completed(se, "main_tools") + _assert_started_before_completed(se, "helper") + + +# =========================================================================== +# Workflow streaming tests +# =========================================================================== + + +class TestWorkflowStreamingEvents: + """Verify STARTED/COMPLETED pairing for workflow streaming.""" + + async def test_simple_workflow(self): + """Workflow with two executors: all nodes paired.""" + triage = RawAgent(_mock_client, name="triage") + billing = RawAgent(_mock_client, name="billing", tools=[calculator]) + + workflow = ( + WorkflowBuilder(start_executor=triage).add_edge(triage, billing).build() # type: ignore[arg-type] + ) + agent = WorkflowAgent(workflow=workflow, name="my_workflow") + + final = MagicMock() + final.get_outputs.return_value = [] + workflow.run = MagicMock( # type: ignore[method-assign] + return_value=_MockAsyncStream( + [ + _wf_event("executor_invoked", "triage"), + _wf_event("executor_completed", "triage"), + _wf_event("executor_invoked", "billing"), + _wf_event("executor_completed", "billing"), + ], + final, + ) + ) + agent.create_session = MagicMock(return_value=MagicMock()) # type: ignore[method-assign] + + runtime = _make_runtime(agent) + events = await _collect_events(runtime) + + se = _state_events(events) + _assert_all_completed(se) + _assert_started_before_completed(se, "my_workflow") + _assert_started_before_completed(se, "triage") + _assert_started_before_completed(se, "billing") + assert isinstance(events[-1], UiPathRuntimeResult) + + async def test_multi_executor_workflow(self): + """Workflow with three executors in sequence.""" + a = RawAgent(_mock_client, name="step_a") + b = RawAgent(_mock_client, name="step_b", tools=[search_wikipedia]) + c = RawAgent(_mock_client, name="step_c", tools=[run_python]) + + workflow = ( + WorkflowBuilder(start_executor=a).add_edge(a, b).add_edge(b, c).build() # type: ignore[arg-type] + ) + agent = WorkflowAgent(workflow=workflow, name="pipeline") + + wf_events: list[Any] = [] + for name in ["step_a", "step_b", "step_c"]: + wf_events.append(_wf_event("executor_invoked", name)) + wf_events.append(_wf_event("executor_completed", name)) + + final = MagicMock() + final.get_outputs.return_value = [] + workflow.run = MagicMock(return_value=_MockAsyncStream(wf_events, final)) # type: ignore[method-assign] + agent.create_session = MagicMock(return_value=MagicMock()) # type: ignore[method-assign] + + runtime = _make_runtime(agent) + events = await _collect_events(runtime) + + se = _state_events(events) + _assert_all_completed(se) + for name in ["pipeline", "step_a", "step_b", "step_c"]: + _assert_started_before_completed(se, name) + + async def test_workflow_root_wraps_executors(self): + """Root workflow STARTED is first, COMPLETED is last state event.""" + a = RawAgent(_mock_client, name="worker") + workflow = WorkflowBuilder(start_executor=a).build() # type: ignore[arg-type] + agent = WorkflowAgent(workflow=workflow, name="wf") + + final = MagicMock() + final.get_outputs.return_value = [] + workflow.run = MagicMock( # type: ignore[method-assign] + return_value=_MockAsyncStream( + [ + _wf_event("executor_invoked", "worker"), + _wf_event("executor_completed", "worker"), + ], + final, + ) + ) + agent.create_session = MagicMock(return_value=MagicMock()) # type: ignore[method-assign] + + runtime = _make_runtime(agent) + events = await _collect_events(runtime) + + se = _state_events(events) + assert se[0] == ("wf", STARTED) + assert se[-1] == ("wf", COMPLETED) + + +# =========================================================================== +# Factory tests — no agent caching +# =========================================================================== + + +class TestFactoryNoCaching: + """Verify factory creates fresh agent instances per runtime.""" + + async def test_concurrent_new_runtime_gets_separate_agents(self): + """Multiple concurrent new_runtime calls must each get their own agent.""" + context = MagicMock() + context.resolved_state_file_path = ":memory:" + context.resume = False + context.job_id = None + context.keep_state_file = False + + factory = UiPathAgentFrameworkRuntimeFactory(context) + + # Track every agent instance returned by _load_agent + loaded_agents: list[BaseAgent] = [] + + async def _fake_load_agent(entrypoint: str) -> BaseAgent: + agent = MagicMock(spec=BaseAgent) + agent.name = f"agent_{len(loaded_agents)}" + loaded_agents.append(agent) + return agent + + with patch.object(factory, "_load_agent", side_effect=_fake_load_agent): + with patch.object(factory, "_get_session_store", new_callable=AsyncMock): + runtimes = await asyncio.gather( + factory.new_runtime("agent", "runtime_1"), + factory.new_runtime("agent", "runtime_2"), + factory.new_runtime("agent", "runtime_3"), + ) + + # Each runtime must have gotten a separate agent instance + assert len(loaded_agents) == 3 + agents = [r.agent for r in runtimes] # type: ignore[attr-defined] + assert len(set(id(a) for a in agents)) == 3, "Runtimes share agent instances!" diff --git a/packages/uipath-agent-framework/uv.lock b/packages/uipath-agent-framework/uv.lock index 50ccef3..cda770e 100644 --- a/packages/uipath-agent-framework/uv.lock +++ b/packages/uipath-agent-framework/uv.lock @@ -2448,7 +2448,7 @@ wheels = [ [[package]] name = "uipath-agent-framework" -version = "0.0.2" +version = "0.0.3" source = { editable = "." } dependencies = [ { name = "agent-framework-core" }, diff --git a/packages/uipath-google-adk/src/uipath_google_adk/runtime/runtime.py b/packages/uipath-google-adk/src/uipath_google_adk/runtime/runtime.py index eff399b..e4059bf 100644 --- a/packages/uipath-google-adk/src/uipath_google_adk/runtime/runtime.py +++ b/packages/uipath-google-adk/src/uipath_google_adk/runtime/runtime.py @@ -12,7 +12,7 @@ from google.adk.sessions.base_session_service import BaseSessionService from google.adk.sessions.session import Session from google.genai import types -from uipath.core.serialization import serialize_defaults +from uipath.core.serialization import serialize_json from uipath.runtime import ( UiPathExecuteOptions, UiPathRuntimeResult, @@ -362,7 +362,7 @@ def _convert_event(self, event: Event) -> list[UiPathRuntimeStateEvent]: is_transfer = fc.name == _TRANSFER_FN payload = { "function_name": fc.name or "unknown", - "function_args": serialize_defaults(fc.args or {}), + "function_args": json.loads(serialize_json(fc.args or {})), } event_type = "agent_transfer" if is_transfer else "function_call" # Agent node always gets the event @@ -393,7 +393,7 @@ def _convert_event(self, event: Event) -> list[UiPathRuntimeStateEvent]: continue payload = { "function_name": fr.name or "unknown", - "function_response": serialize_defaults(fr.response or {}), + "function_response": json.loads(serialize_json(fr.response or {})), } events.append( UiPathRuntimeStateEvent( @@ -419,7 +419,7 @@ def _convert_event(self, event: Event) -> list[UiPathRuntimeStateEvent]: if event.actions.state_delta: events.append( UiPathRuntimeStateEvent( - payload=serialize_defaults(event.actions.state_delta), + payload=json.loads(serialize_json(event.actions.state_delta)), node_name=author, metadata={"event_type": "state_delta"}, ) @@ -430,7 +430,7 @@ def _convert_event(self, event: Event) -> list[UiPathRuntimeStateEvent]: def _create_success_result(self, output: Any) -> UiPathRuntimeResult: """Create result for successful completion.""" - serialized_output = serialize_defaults(output) + serialized_output = json.loads(serialize_json(output)) if not isinstance(serialized_output, dict): serialized_output = { diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py index a113f31..9350281 100644 --- a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py @@ -12,7 +12,7 @@ ToolCall, ToolCallResult, ) -from uipath.core.serialization import serialize_defaults +from uipath.core.serialization import serialize_json from uipath.runtime import ( UiPathExecuteOptions, UiPathRuntimeResult, @@ -178,14 +178,14 @@ async def _run_workflow( (AgentOutput, AgentInput, AgentStream, ToolCall, ToolCallResult), ): message_event = UiPathRuntimeMessageEvent( - payload=serialize_defaults(event), + payload=json.loads(serialize_json(event)), node_name=node_name, execution_id=self.runtime_id, ) yield message_event elif not isinstance(event, BreakpointEvent): state_event = UiPathRuntimeStateEvent( - payload=serialize_defaults(event), + payload=json.loads(serialize_json(event)), node_name=node_name, execution_id=self.runtime_id, ) @@ -255,7 +255,7 @@ def _create_breakpoint_result( return UiPathBreakpointResult( breakpoint_node=self._get_node_name(event), breakpoint_type="before", - current_state=serialize_defaults(event), + current_state=json.loads(serialize_json(event)), next_nodes=[], # We don't know what's next in the stream ) @@ -284,11 +284,13 @@ def _create_success_result(self, output: Any) -> UiPathRuntimeResult: """Create result for successful completion.""" if isinstance(output, AgentOutput): if output.structured_response is not None: - serialized_output = serialize_defaults(output.structured_response) + serialized_output = json.loads( + serialize_json(output.structured_response) + ) else: - serialized_output = serialize_defaults(output) + serialized_output = json.loads(serialize_json(output)) else: - serialized_output = serialize_defaults(output) + serialized_output = json.loads(serialize_json(output)) if isinstance(serialized_output, str): serialized_output = {"result": serialized_output} diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py index ffb4f42..eec5419 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py @@ -6,7 +6,7 @@ from agents import Agent, Runner from pydantic import BaseModel -from uipath.core.serialization import serialize_defaults +from uipath.core.serialization import serialize_json from uipath.runtime import ( UiPathExecuteOptions, UiPathRuntimeResult, @@ -207,12 +207,12 @@ def _convert_stream_event_to_runtime_event( # Determine if this is a message or state event if event_name in ["message_output_created", "reasoning_item_created"]: return UiPathRuntimeMessageEvent( - payload=serialize_defaults(event_item), + payload=json.loads(serialize_json(event_item)), metadata={"event_name": event_name}, ) else: return UiPathRuntimeStateEvent( - payload=serialize_defaults(event_item), + payload=json.loads(serialize_json(event_item)), metadata={"event_name": event_name}, ) @@ -265,7 +265,7 @@ def _serialize_message(self, message: Any) -> dict[str, Any]: Returns: Dictionary representation of the message """ - serialized = serialize_defaults(message) + serialized = json.loads(serialize_json(message)) # Ensure the result is a dictionary if isinstance(serialized, dict): @@ -285,7 +285,7 @@ def _create_success_result(self, output: Any) -> UiPathRuntimeResult: UiPathRuntimeResult with serialized output """ # Serialize output - serialized_output = serialize_defaults(output) + serialized_output = json.loads(serialize_json(output)) # Ensure output is a dictionary if not isinstance(serialized_output, dict):