Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-langchain"
version = "0.7.5"
version = "0.7.6"
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
10 changes: 10 additions & 0 deletions src/uipath_langchain/agent/tools/durable_interrupt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Durable interrupt package for side-effect-safe interrupt/resume in LangGraph."""

from .decorator import _durable_state, durable_interrupt
from .skip_interrupt import SkipInterruptValue

__all__ = [
"durable_interrupt",
"SkipInterruptValue",
"_durable_state",
]
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ async def start_job():
from langgraph.config import get_config
from langgraph.types import interrupt

from .skip_interrupt import SkipInterruptValue

F = TypeVar("F", bound=Callable[..., Any])

# Tracks (scratchpad identity, call index) per node execution.
Expand Down Expand Up @@ -77,6 +79,21 @@ def _is_resumed(scratchpad: Any, idx: int) -> bool:
return scratchpad is not None and scratchpad.resume and idx < len(scratchpad.resume)


def _inject_resume(scratchpad: Any, value: Any) -> Any:
"""Inject a value into the scratchpad resume list and return it via interrupt(None).

This keeps LangGraph's interrupt_counter in sync (interrupt(None) increments it)
while avoiding a real suspend — interrupt(None) finds the injected value and
returns it immediately without raising GraphInterrupt.
"""
if scratchpad is not None:
if scratchpad.resume is None:
scratchpad.resume = []
scratchpad.resume.append(value)
return interrupt(None)
return value


def durable_interrupt(fn: F) -> F:
"""Decorator that executes a side-effecting function exactly once and interrupts.

Expand All @@ -85,6 +102,10 @@ def durable_interrupt(fn: F) -> F:
is skipped and ``interrupt(None)`` returns the resume value from the
runtime.

If the body returns a ``SkipInterruptValue``, the resolved value is
injected into the scratchpad resume list and ``interrupt(None)`` returns
it immediately — no real suspend/resume cycle occurs.

Replaces the ``@task`` + ``interrupt()`` two-step pattern with a single
decorator that enforces the pairing contract. Works correctly in both
parent graphs and subgraphs.
Expand Down Expand Up @@ -112,7 +133,10 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
scratchpad, idx = _next_durable_index()
if _is_resumed(scratchpad, idx):
return interrupt(None)
return interrupt(await fn(*args, **kwargs))
result = await fn(*args, **kwargs)
if isinstance(result, SkipInterruptValue):
return _inject_resume(scratchpad, result.resume_value)
return interrupt(result)

return async_wrapper # type: ignore[return-value]

Expand All @@ -121,6 +145,9 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
scratchpad, idx = _next_durable_index()
if _is_resumed(scratchpad, idx):
return interrupt(None)
return interrupt(fn(*args, **kwargs))
result = fn(*args, **kwargs)
if isinstance(result, SkipInterruptValue):
return _inject_resume(scratchpad, result.resume_value)
return interrupt(result)

return sync_wrapper # type: ignore[return-value]
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Skip-interrupt value types for @durable_interrupt.

SkipInterruptValue — base class
================================

When a node has **multiple sequential @durable_interrupt calls**, the
decorator's internal counter must produce the same sequence of indices on
every execution (first run *and* all resume runs). A conditional interrupt
— one that only fires under certain conditions — breaks this assumption and
causes index drift on resume.

``SkipInterruptValue`` solves this by letting a @durable_interrupt-decorated
function signal "the result is already available, skip the real interrupt"
while still keeping LangGraph's interrupt counter in sync. The decorator
injects the resolved value into ``scratchpad.resume`` and calls
``interrupt(None)``, which returns immediately without raising ``GraphInterrupt``.

Usage example::

@durable_interrupt
async def create_index():
index = await client.create_index_async(...)
if index.in_progress():
return WaitIndex(index=index) # real interrupt
return ReadyIndex(index=index) # instant resume

@durable_interrupt
async def start_processing():
return StartProcessing(index_id=index.id) # real interrupt

# Both @durable_interrupt calls always execute — the counter always
# increments by 2. When the index is ready, ReadyIndex (a
# SkipInterruptValue subclass) injects the result into the scratchpad
# so the graph continues without suspending.
"""

from typing import Any


class SkipInterruptValue:
"""Base class for values that skip the interrupt in @durable_interrupt.

Subclasses must implement the ``resume_value`` property, returning the
value to inject into the scratchpad resume list.
"""

@property
def resume_value(self) -> Any:
"""The value to inject into the resume list and return to the caller."""
raise NotImplementedError
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
from uipath_langchain.agent.exceptions import AgentStartupError, AgentStartupErrorCode
from uipath_langchain.agent.react.jsonschema_pydantic_converter import create_model
from uipath_langchain.agent.react.types import AgentGraphState
from uipath_langchain.agent.tools.durable_interrupt import durable_interrupt
from uipath_langchain.agent.tools.durable_interrupt import (
SkipInterruptValue,
durable_interrupt,
)
from uipath_langchain.agent.tools.internal_tools.schema_utils import (
BATCH_TRANSFORM_OUTPUT_SCHEMA,
add_query_field_to_schema,
Expand All @@ -42,6 +45,17 @@
from uipath_langchain.agent.tools.utils import sanitize_tool_name


class ReadyEphemeralIndex(SkipInterruptValue):
"""An ephemeral index that is already ready (no wait needed)."""

def __init__(self, index: ContextGroundingIndex):
self.index = index

@property
def resume_value(self) -> Any:
return self.index.model_dump()


def create_batch_transform_tool(
resource: AgentInternalToolResourceConfig, llm: BaseChatModel
) -> StructuredTool:
Expand Down Expand Up @@ -131,7 +145,7 @@ async def create_ephemeral_index():
)
if ephemeral_index.in_progress_ingestion():
return WaitEphemeralIndex(index=ephemeral_index)
return ephemeral_index
return ReadyEphemeralIndex(index=ephemeral_index)

index_result = await create_ephemeral_index()
if isinstance(index_result, dict):
Expand Down
37 changes: 21 additions & 16 deletions src/uipath_langchain/agent/tools/internal_tools/deeprag_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
from uipath_langchain.agent.exceptions import AgentStartupError, AgentStartupErrorCode
from uipath_langchain.agent.react.jsonschema_pydantic_converter import create_model
from uipath_langchain.agent.react.types import AgentGraphState
from uipath_langchain.agent.tools.durable_interrupt import durable_interrupt
from uipath_langchain.agent.tools.durable_interrupt import (
SkipInterruptValue,
durable_interrupt,
)
from uipath_langchain.agent.tools.internal_tools.schema_utils import (
add_query_field_to_schema,
)
Expand All @@ -37,6 +40,17 @@
from uipath_langchain.agent.tools.utils import sanitize_tool_name


class ReadyEphemeralIndex(SkipInterruptValue):
"""An ephemeral index that is already ready (no wait needed)."""

def __init__(self, index: ContextGroundingIndex):
self.index = index

@property
def resume_value(self) -> Any:
return self.index.model_dump()


def create_deeprag_tool(
resource: AgentInternalToolResourceConfig, llm: BaseChatModel
) -> StructuredTool:
Expand Down Expand Up @@ -101,6 +115,7 @@ async def deeprag_tool_fn(**kwargs: Any) -> dict[str, Any]:
example_calls=[], # Examples cannot be provided for internal tools
)
async def invoke_deeprag(**_tool_kwargs: Any):
@durable_interrupt
async def create_ephemeral_index():
uipath = UiPath()
ephemeral_index = (
Expand All @@ -109,21 +124,9 @@ async def create_ephemeral_index():
attachments=[attachment_id],
)
)

# TODO this will not resume on concurrent runs for the same attachment
if ephemeral_index.in_progress_ingestion():

@durable_interrupt
async def wait_for_ephemeral_index():
return WaitEphemeralIndex(index=ephemeral_index)

index_result = await wait_for_ephemeral_index()
if isinstance(index_result, dict):
ephemeral_index = ContextGroundingIndex(**index_result)
else:
ephemeral_index = index_result

return ephemeral_index
return WaitEphemeralIndex(index=ephemeral_index)
return ReadyEphemeralIndex(index=ephemeral_index)

index_result = await create_ephemeral_index()
if isinstance(index_result, dict):
Expand All @@ -142,7 +145,9 @@ async def create_deeprag():
is_ephemeral_index=True,
)

return await create_deeprag()
result = await create_deeprag()

return result

return await invoke_deeprag(**kwargs)

Expand Down
30 changes: 13 additions & 17 deletions tests/agent/tools/internal_tools/test_batch_transform_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def resource_config_dynamic(self, batch_transform_settings_dynamic_query):
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
)
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
@patch(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
lambda **kwargs: lambda f: f,
Expand Down Expand Up @@ -180,9 +180,8 @@ async def test_create_batch_transform_tool_static_query_index_ready(
return_value=mock_index
)

# durable_interrupt always calls interrupt(); first for index, second for transform
# Index is ready → ReadyEphemeralIndex skips interrupt(). Only create_batch_transform fires.
mock_interrupt.side_effect = [
mock_index,
{"file_path": "/path/to/output.csv"},
]

Expand Down Expand Up @@ -226,8 +225,8 @@ async def test_create_batch_transform_tool_static_query_index_ready(
assert call_kwargs["usage"] == "BatchRAG"
assert mock_attachment.ID in call_kwargs["attachments"]

# Both durable_interrupts call interrupt()
assert mock_interrupt.call_count == 2
# Only create_batch_transform calls interrupt(); index was instant-resumed
assert mock_interrupt.call_count == 1

# Verify attachment was uploaded
mock_uipath.jobs.create_attachment_async.assert_called_once_with(
Expand All @@ -243,7 +242,7 @@ async def test_create_batch_transform_tool_static_query_index_ready(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
)
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
@patch(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
lambda **kwargs: lambda f: f,
Expand Down Expand Up @@ -324,7 +323,7 @@ async def test_create_batch_transform_tool_static_query_wait_for_ingestion(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
)
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
@patch(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
lambda **kwargs: lambda f: f,
Expand Down Expand Up @@ -354,9 +353,8 @@ async def test_create_batch_transform_tool_dynamic_query(
return_value=mock_index
)

# durable_interrupt always calls interrupt(); first for index, second for transform
# Index is ready → ReadyEphemeralIndex skips interrupt(). Only create_batch_transform fires.
mock_interrupt.side_effect = [
mock_index,
{"output": "Transformation complete"},
]

Expand Down Expand Up @@ -397,7 +395,7 @@ async def test_create_batch_transform_tool_dynamic_query(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
)
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
@patch(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
lambda **kwargs: lambda f: f,
Expand Down Expand Up @@ -427,9 +425,8 @@ async def test_create_batch_transform_tool_default_destination_path(
return_value=mock_index
)

# durable_interrupt always calls interrupt(); first for index, second for transform
# Index is ready → ReadyEphemeralIndex skips interrupt(). Only create_batch_transform fires.
mock_interrupt.side_effect = [
mock_index,
{"file_path": "output.csv"},
]

Expand Down Expand Up @@ -461,8 +458,8 @@ async def test_create_batch_transform_tool_default_destination_path(
}
}

# Both durable_interrupts call interrupt()
assert mock_interrupt.call_count == 2
# Only create_batch_transform calls interrupt(); index was instant-resumed
assert mock_interrupt.call_count == 1

# Verify attachment was uploaded with default path
mock_uipath.jobs.create_attachment_async.assert_called_once_with(
Expand All @@ -478,7 +475,7 @@ async def test_create_batch_transform_tool_default_destination_path(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
)
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
@patch(
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
lambda **kwargs: lambda f: f,
Expand Down Expand Up @@ -508,9 +505,8 @@ async def test_create_batch_transform_tool_custom_destination_path(
return_value=mock_index
)

# durable_interrupt always calls interrupt(); first for index, second for transform
# Index is ready → ReadyEphemeralIndex skips interrupt(). Only create_batch_transform fires.
mock_interrupt.side_effect = [
mock_index,
{"file_path": "/custom/path/result.csv"},
]

Expand Down
Loading
Loading