Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-langchain"
version = "0.2.0"
version = "0.2.1"
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
13 changes: 10 additions & 3 deletions src/uipath_langchain/agent/react/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from ..guardrails.actions import GuardrailAction
from ..tools import create_tool_node
from ..tools.orchestrator_node import create_orchestrator_node
from .guardrails.guardrails_subgraph import (
create_agent_init_guardrails_subgraph,
create_agent_terminate_guardrails_subgraph,
Expand Down Expand Up @@ -105,6 +106,9 @@ def create_agent(
)
builder.add_node(AgentGraphNode.TERMINATE, terminate_with_guardrails_subgraph)

orchestrator_node = create_orchestrator_node(config.thinking_messages_limit)
builder.add_node(AgentGraphNode.ORCHESTRATOR, orchestrator_node)

builder.add_edge(START, AgentGraphNode.INIT)

llm_node = create_llm_node(model, llm_tools, config.thinking_messages_limit)
Expand All @@ -114,16 +118,19 @@ def create_agent(
builder.add_node(AgentGraphNode.AGENT, llm_with_guardrails_subgraph)
builder.add_edge(AgentGraphNode.INIT, AgentGraphNode.AGENT)

builder.add_edge(AgentGraphNode.AGENT, AgentGraphNode.ORCHESTRATOR)

tool_node_names = list(tool_nodes_with_guardrails.keys())
route_agent = create_route_agent(config.thinking_messages_limit)
route_agent = create_route_agent()

builder.add_conditional_edges(
AgentGraphNode.AGENT,
AgentGraphNode.ORCHESTRATOR,
route_agent,
[AgentGraphNode.AGENT, *tool_node_names, AgentGraphNode.TERMINATE],
)

for tool_name in tool_node_names:
builder.add_edge(tool_name, AgentGraphNode.AGENT)
builder.add_edge(tool_name, AgentGraphNode.ORCHESTRATOR)

builder.add_edge(AgentGraphNode.TERMINATE, END)

Expand Down
117 changes: 43 additions & 74 deletions src/uipath_langchain/agent/react/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,107 +2,76 @@

from typing import Literal

from langchain_core.messages import AIMessage, AnyMessage, ToolCall
from uipath.agent.react import END_EXECUTION_TOOL, RAISE_ERROR_TOOL

from ..exceptions import AgentNodeRoutingException
from .types import AgentGraphNode, AgentGraphState
from .utils import count_consecutive_thinking_messages

FLOW_CONTROL_TOOLS = [END_EXECUTION_TOOL.name, RAISE_ERROR_TOOL.name]


def __filter_control_flow_tool_calls(
tool_calls: list[ToolCall],
) -> list[ToolCall]:
"""Remove control flow tools when multiple tool calls exist."""
if len(tool_calls) <= 1:
return tool_calls

return [tc for tc in tool_calls if tc.get("name") not in FLOW_CONTROL_TOOLS]


def __has_control_flow_tool(tool_calls: list[ToolCall]) -> bool:
"""Check if any tool call is of a control flow tool."""
return any(tc.get("name") in FLOW_CONTROL_TOOLS for tc in tool_calls)


def __validate_last_message_is_AI(messages: list[AnyMessage]) -> AIMessage:
"""Validate and return last message from state.

Raises:
AgentNodeRoutingException: If messages are empty or last message is not AIMessage
"""
if not messages:
raise AgentNodeRoutingException(
"No messages in state - cannot route after agent"
)

last_message = messages[-1]
if not isinstance(last_message, AIMessage):
raise AgentNodeRoutingException(
f"Last message is not AIMessage (type: {type(last_message).__name__}) - cannot route after agent"
)

return last_message
from .types import (
FLOW_CONTROL_TOOLS,
AgentGraphNode,
AgentGraphState,
)
from .utils import find_latest_ai_message


def create_route_agent(thinking_messages_limit: int = 0):
"""Create a routing function configured with thinking_messages_limit.

Args:
thinking_messages_limit: Max consecutive thinking messages before error
def create_route_agent():
"""Create a routing function for LangGraph conditional edges.

Returns:
Routing function for LangGraph conditional edges
"""

def route_agent(
state: AgentGraphState,
) -> list[str] | Literal[AgentGraphNode.AGENT, AgentGraphNode.TERMINATE]:
"""Route after agent: handles all routing logic including control flow detection.
) -> str | Literal[AgentGraphNode.AGENT, AgentGraphNode.TERMINATE]:
"""Route after agent: looks at current tool call index and routes to corresponding tool node.

Routing logic:
1. If multiple tool calls exist, filter out control flow tools (EndExecution, RaiseError)
2. If control flow tool(s) remain, route to TERMINATE
3. If regular tool calls remain, route to specific tool nodes (return list of tool names)
4. If no tool calls, handle consecutive completions
1. If current_tool_call_index is None, route back to LLM
2. If current_tool_call_index is set, route to the corresponding tool node
3. Handle control flow tools for termination

Returns:
- list[str]: Tool node names for parallel execution
- AgentGraphNode.AGENT: For consecutive completions
- str: Tool node name for single tool execution
- AgentGraphNode.AGENT: When no current tool call index
- AgentGraphNode.TERMINATE: For control flow termination

Raises:
AgentNodeRoutingException: When encountering unexpected state (empty messages, non-AIMessage, or excessive completions)
AgentNodeRoutingException: When encountering unexpected state
"""
current_index = state.current_tool_call_index

# no tool call in progress, route back to LLM
if current_index is None:
return AgentGraphNode.AGENT

messages = state.messages
last_message = __validate_last_message_is_AI(messages)

tool_calls = list(last_message.tool_calls) if last_message.tool_calls else []
tool_calls = __filter_control_flow_tool_calls(tool_calls)
if not messages:
raise AgentNodeRoutingException(
"No messages in state - cannot route after agent"
)

if tool_calls and __has_control_flow_tool(tool_calls):
return AgentGraphNode.TERMINATE
latest_ai_message = find_latest_ai_message(messages)

if tool_calls:
return [tc["name"] for tc in tool_calls]
if latest_ai_message is None:
raise AgentNodeRoutingException(
"No AIMessage found in messages - cannot route after agent"
)

consecutive_thinking_messages = count_consecutive_thinking_messages(messages)
tool_calls = (
list(latest_ai_message.tool_calls) if latest_ai_message.tool_calls else []
)

if consecutive_thinking_messages > thinking_messages_limit:
if current_index >= len(tool_calls):
raise AgentNodeRoutingException(
f"Agent exceeded consecutive completions limit without producing tool calls "
f"(completions: {consecutive_thinking_messages}, max: {thinking_messages_limit}). "
f"This should not happen as tool_choice='required' is enforced at the limit."
f"Current tool call index {current_index} exceeds available tool calls ({len(tool_calls)})"
)

if last_message.content:
return AgentGraphNode.AGENT
current_tool_call = tool_calls[current_index]
tool_name = current_tool_call["name"]

raise AgentNodeRoutingException(
f"Agent produced empty response without tool calls "
f"(completions: {consecutive_thinking_messages}, has_content: False)"
)
# handle control flow tools for termination
if tool_name in FLOW_CONTROL_TOOLS:
return AgentGraphNode.TERMINATE

return tool_name

return route_agent
5 changes: 5 additions & 0 deletions src/uipath_langchain/agent/react/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
from pydantic import BaseModel, Field
from uipath.agent.react import END_EXECUTION_TOOL, RAISE_ERROR_TOOL
from uipath.platform.attachments import Attachment

from uipath_langchain.agent.react.utils import add_job_attachments

FLOW_CONTROL_TOOLS = [END_EXECUTION_TOOL.name, RAISE_ERROR_TOOL.name]


class AgentTerminationSource(StrEnum):
ESCALATION = "escalation"
Expand All @@ -27,6 +30,7 @@ class AgentGraphState(BaseModel):
messages: Annotated[list[AnyMessage], add_messages] = []
job_attachments: Annotated[dict[str, Attachment], add_job_attachments] = {}
termination: AgentTermination | None = None
current_tool_call_index: int | None = None


class AgentGuardrailsGraphState(AgentGraphState):
Expand All @@ -41,6 +45,7 @@ class AgentGraphNode(StrEnum):
GUARDED_INIT = "guarded-init"
AGENT = "agent"
LLM = "llm"
ORCHESTRATOR = "orchestrator"
TOOLS = "tools"
TERMINATE = "terminate"
GUARDED_TERMINATE = "guarded-terminate"
Expand Down
17 changes: 16 additions & 1 deletion src/uipath_langchain/agent/react/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from typing import Any, Sequence

from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.messages import AIMessage, AnyMessage, BaseMessage
from pydantic import BaseModel
from uipath.agent.react import END_EXECUTION_TOOL
from uipath.platform.attachments import Attachment
Expand Down Expand Up @@ -73,3 +73,18 @@ def add_job_attachments(
return right

return {**left, **right}


def find_latest_ai_message(messages: list[AnyMessage]) -> AIMessage | None:
    """Return the most recent AIMessage in the conversation, if any.

    Args:
        messages: Message history to scan, ordered oldest to newest.

    Returns:
        The last AIMessage in ``messages``, or None when the list contains
        no AIMessage at all.
    """
    # Walk backwards and stop at the first AI message encountered.
    return next(
        (msg for msg in reversed(messages) if isinstance(msg, AIMessage)),
        None,
    )
111 changes: 111 additions & 0 deletions src/uipath_langchain/agent/tools/orchestrator_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import Any

from langchain_core.messages import ToolCall

from uipath_langchain.agent.exceptions import AgentNodeRoutingException
from uipath_langchain.agent.react.types import FLOW_CONTROL_TOOLS, AgentGraphState
from uipath_langchain.agent.react.utils import (
count_consecutive_thinking_messages,
find_latest_ai_message,
)


def __filter_control_flow_tool_calls(tool_calls: list[ToolCall]) -> list[ToolCall]:
    """Strip control-flow tool calls (EndExecution/RaiseError) from a batch.

    A lone tool call is returned untouched; only when the model emitted
    several calls at once are the control-flow ones dropped, so regular
    tools take precedence within a mixed batch.
    """
    if len(tool_calls) > 1:
        kept: list[ToolCall] = []
        for call in tool_calls:
            if call.get("name") not in FLOW_CONTROL_TOOLS:
                kept.append(call)
        return kept
    return tool_calls


def create_orchestrator_node(thinking_messages_limit: int = 0):
    """Create an orchestrator node responsible for sequencing tool calls.

    The returned node drives sequential execution of a batch of tool calls:
    it initializes ``current_tool_call_index`` to 0 when a new AI message
    with tool calls arrives, advances the index after each tool run, and
    resets it to None once the batch is exhausted (routing back to the LLM).

    Args:
        thinking_messages_limit: Max consecutive thinking messages before error.
            A negative value disables the check entirely.

    Returns:
        A LangGraph node callable taking AgentGraphState and returning a
        partial state update dict.
    """

    def orchestrator_node(state: AgentGraphState) -> dict[str, Any]:
        """Advance (or initialize) the tool-call cursor for the current batch.

        Raises:
            AgentNodeRoutingException: On empty/inconsistent state or when
                the consecutive-thinking limit is exceeded.
        """
        current_index = state.current_tool_call_index

        if current_index is None:
            # new batch of tool calls
            if not state.messages:
                raise AgentNodeRoutingException(
                    "No messages in state - cannot process tool calls"
                )

            # check consecutive thinking messages limit
            # (>= 0 means the check is active; a negative limit disables it)
            if thinking_messages_limit >= 0:
                consecutive_thinking = count_consecutive_thinking_messages(
                    state.messages
                )
                if consecutive_thinking > thinking_messages_limit:
                    raise AgentNodeRoutingException(
                        f"Too many consecutive thinking messages ({consecutive_thinking}). "
                        f"Limit is {thinking_messages_limit}. Agent must use tools."
                    )

            latest_ai_message = find_latest_ai_message(state.messages)

            # No AI message or no tool calls: nothing to sequence, leave the
            # cursor unset so the router sends control back to the LLM.
            if latest_ai_message is None or not latest_ai_message.tool_calls:
                return {"current_tool_call_index": None}

            # apply flow control tool filtering
            # (in a multi-call batch, EndExecution/RaiseError calls are dropped)
            original_tool_calls = list(latest_ai_message.tool_calls)
            filtered_tool_calls = __filter_control_flow_tool_calls(original_tool_calls)

            if len(filtered_tool_calls) != len(original_tool_calls):
                # NOTE(review): model_copy() is shallow; mutating tool_calls and
                # content below relies on add_messages replacing the stored
                # message by id when this copy is returned — confirm that the
                # shared nested structures are not also referenced elsewhere.
                modified_message = latest_ai_message.model_copy()
                modified_message.tool_calls = filtered_tool_calls

                # we need to filter out the content within the message as well, otherwise the LLM will raise an error
                filtered_tool_call_ids = {tc["id"] for tc in filtered_tool_calls}
                if isinstance(modified_message.content, list):
                    # Keep only content blocks that belong to a surviving tool
                    # call, plus blocks with no call_id and non-dict blocks
                    # (plain text segments).
                    modified_message.content = [
                        block
                        for block in modified_message.content
                        if (
                            isinstance(block, dict)
                            and (
                                block.get("call_id") in filtered_tool_call_ids
                                or block.get("call_id") is None  # keep non-tool blocks
                            )
                        )
                        or not isinstance(block, dict)
                    ]

                # NOTE(review): if every call in a multi-call batch was a
                # control-flow tool, filtered_tool_calls is empty here yet the
                # cursor is still set to 0 — the router would then see
                # index >= len(tool_calls) and raise. Confirm this state is
                # unreachable or handle it explicitly.
                return {
                    "current_tool_call_index": 0,
                    "messages": [modified_message],
                }

            return {"current_tool_call_index": 0}

        # in the middle of processing a batch
        if not state.messages:
            raise AgentNodeRoutingException(
                "No messages in state during batch processing"
            )

        latest_ai_message = find_latest_ai_message(state.messages)

        if latest_ai_message is None:
            raise AgentNodeRoutingException(
                "No AI message found during batch processing"
            )

        if not latest_ai_message.tool_calls:
            raise AgentNodeRoutingException(
                "No tool calls found in AI message during batch processing"
            )

        total_tool_calls = len(latest_ai_message.tool_calls)
        next_index = current_index + 1

        # Batch exhausted: clear the cursor so routing returns to the LLM;
        # otherwise advance to the next tool call in the batch.
        if next_index >= total_tool_calls:
            return {"current_tool_call_index": None}
        else:
            return {"current_tool_call_index": next_index}

    return orchestrator_node
Loading