diff --git a/pyproject.toml b/pyproject.toml index fe189713c..ea5a6854a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath" -version = "2.4.3" +version = "2.4.4" description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools." readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/_cli/cli_eval.py b/src/uipath/_cli/cli_eval.py index 736d82ae8..a3c8613dc 100644 --- a/src/uipath/_cli/cli_eval.py +++ b/src/uipath/_cli/cli_eval.py @@ -98,6 +98,12 @@ def setup_reporting_prereq(no_report: bool) -> bool: default="default", help="Model settings ID from evaluation set to override agent settings (default: 'default')", ) +@click.option( + "--max-llm-concurrency", + type=int, + default=20, + help="Maximum concurrent LLM requests (default: 20)", +) def eval( entrypoint: str | None, eval_set: str | None, @@ -109,6 +115,7 @@ def eval( enable_mocker_cache: bool, report_coverage: bool, model_settings_id: str, + max_llm_concurrency: int, ) -> None: """Run an evaluation set against the agent. @@ -122,7 +129,12 @@ def eval( enable_mocker_cache: Enable caching for LLM mocker responses report_coverage: Report evaluation coverage model_settings_id: Model settings ID to override agent settings + max_llm_concurrency: Maximum concurrent LLM requests """ + from uipath.platform.chat import set_llm_concurrency + + set_llm_concurrency(max_llm_concurrency) + should_register_progress_reporter = setup_reporting_prereq(no_report) result = Middlewares.next( diff --git a/src/uipath/platform/chat/__init__.py b/src/uipath/platform/chat/__init__.py index 533e4c36e..8aa491606 100644 --- a/src/uipath/platform/chat/__init__.py +++ b/src/uipath/platform/chat/__init__.py @@ -30,6 +30,7 @@ ToolParametersDefinition, ToolPropertyDefinition, ) +from .llm_throttle import get_llm_semaphore, set_llm_concurrency __all__ = [ # Conversations Service @@ -39,6 +40,9 @@ "EmbeddingModels", "UiPathLlmChatService", "UiPathOpenAIService", + # LLM Throttling + "get_llm_semaphore", + "set_llm_concurrency", # LLM Gateway Models "ToolPropertyDefinition", "ToolParametersDefinition", diff --git a/src/uipath/platform/chat/_llm_gateway_service.py b/src/uipath/platform/chat/_llm_gateway_service.py index 8424557ea..989005a7b 100644 --- a/src/uipath/platform/chat/_llm_gateway_service.py +++ b/src/uipath/platform/chat/_llm_gateway_service.py @@ -31,6 +31,7 @@ ToolChoice, ToolDefinition, ) +from .llm_throttle import get_llm_semaphore # Common constants API_VERSION = "2024-10-21" # Standard API version for OpenAI-compatible endpoints @@ -189,13 +190,14 @@ async def embeddings( ) endpoint = Endpoint("/" + endpoint) - response = await self.request_async( - "POST", - endpoint, - json={"input": input}, - params={"api-version": API_VERSION}, - headers=DEFAULT_LLM_HEADERS, - ) + async with get_llm_semaphore(): + response = await self.request_async( + "POST", + endpoint, + json={"input": input}, + params={"api-version": API_VERSION}, + headers=DEFAULT_LLM_HEADERS, + ) return TextEmbedding.model_validate(response.json()) @@ -315,13 +317,14 @@ class Country(BaseModel): # Use provided dictionary format directly request_body["response_format"] = response_format - response = await self.request_async( - "POST", - endpoint, - json=request_body, - params={"api-version": API_VERSION}, - headers=DEFAULT_LLM_HEADERS, - ) + async with get_llm_semaphore(): + response = await self.request_async( 
+ "POST", + endpoint, + json=request_body, + params={"api-version": API_VERSION}, + headers=DEFAULT_LLM_HEADERS, + ) return ChatCompletion.model_validate(response.json()) @@ -546,13 +549,14 @@ class Country(BaseModel): "X-UiPath-LlmGateway-NormalizedApi-ModelName": model, } - response = await self.request_async( - "POST", - endpoint, - json=request_body, - params={"api-version": NORMALIZED_API_VERSION}, - headers=headers, - ) + async with get_llm_semaphore(): + response = await self.request_async( + "POST", + endpoint, + json=request_body, + params={"api-version": NORMALIZED_API_VERSION}, + headers=headers, + ) return ChatCompletion.model_validate(response.json()) diff --git a/src/uipath/platform/chat/llm_throttle.py b/src/uipath/platform/chat/llm_throttle.py new file mode 100644 index 000000000..338ce1b9c --- /dev/null +++ b/src/uipath/platform/chat/llm_throttle.py @@ -0,0 +1,49 @@ +"""LLM request throttling utilities. + +This module provides concurrency control for LLM API requests to prevent +overwhelming the system with simultaneous calls. +""" + +import asyncio + +DEFAULT_LLM_CONCURRENCY = 20 +_llm_concurrency_limit: int = DEFAULT_LLM_CONCURRENCY +_llm_semaphore: asyncio.Semaphore | None = None +_llm_semaphore_loop: asyncio.AbstractEventLoop | None = None + + +def get_llm_semaphore() -> asyncio.Semaphore: + """Get the LLM semaphore, creating with configured limit if not set. + + The semaphore is recreated if called from a different event loop than + the one it was originally created in. This prevents "bound to a different + event loop" errors when using multiple asyncio.run() calls. + """ + global _llm_semaphore, _llm_semaphore_loop + + loop = asyncio.get_running_loop() + + # Recreate semaphore if it doesn't exist or if the event loop changed + if _llm_semaphore is None or _llm_semaphore_loop is not loop: + _llm_semaphore = asyncio.Semaphore(_llm_concurrency_limit) + _llm_semaphore_loop = loop + + return _llm_semaphore + + +def set_llm_concurrency(limit: int) -> None: + """Set the max concurrent LLM requests. Call before making any LLM calls. + + Args: + limit: Maximum number of concurrent LLM requests allowed (must be > 0). + + Raises: + ValueError: If limit is less than 1. 
+    """
+    if limit < 1:
+        raise ValueError("LLM concurrency limit must be at least 1")
+
+    global _llm_concurrency_limit, _llm_semaphore, _llm_semaphore_loop
+    _llm_concurrency_limit = limit
+    _llm_semaphore = None
+    _llm_semaphore_loop = None
diff --git a/tests/sdk/services/test_llm_throttle.py b/tests/sdk/services/test_llm_throttle.py
new file mode 100644
index 000000000..733cc9275
--- /dev/null
+++ b/tests/sdk/services/test_llm_throttle.py
@@ -0,0 +1,429 @@
+"""Tests for LLM request throttling functionality."""
+
+import asyncio
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from uipath.platform import UiPathApiConfig, UiPathExecutionContext
+from uipath.platform.chat import UiPathLlmChatService, UiPathOpenAIService
+from uipath.platform.chat.llm_throttle import (
+    DEFAULT_LLM_CONCURRENCY,
+    get_llm_semaphore,
+    set_llm_concurrency,
+)
+
+
+class TestLLMThrottling:
+    """Tests for LLM throttling mechanism."""
+
+    @pytest.fixture(autouse=True)
+    def reset_semaphore(self):
+        """Reset the global semaphore and limit before each test."""
+        import uipath.platform.chat.llm_throttle as module
+
+        module._llm_semaphore = None
+        module._llm_semaphore_loop = None
+        module._llm_concurrency_limit = module.DEFAULT_LLM_CONCURRENCY
+        yield
+        module._llm_semaphore = None
+        module._llm_semaphore_loop = None
+        module._llm_concurrency_limit = module.DEFAULT_LLM_CONCURRENCY
+
+    @pytest.fixture
+    def config(self):
+        """Create a test config."""
+        return UiPathApiConfig(base_url="https://example.com", secret="test_secret")
+
+    @pytest.fixture
+    def execution_context(self):
+        """Create a test execution context."""
+        return UiPathExecutionContext()
+
+    @pytest.fixture
+    def openai_service(self, config, execution_context):
+        """Create an OpenAI service instance."""
+        return UiPathOpenAIService(config=config, execution_context=execution_context)
+
+    @pytest.fixture
+    def llm_service(self, config, execution_context):
+        """Create an LLM chat service instance."""
+        return UiPathLlmChatService(config=config, execution_context=execution_context)
+
+    def test_default_concurrency_constant(self):
+        """Test that DEFAULT_LLM_CONCURRENCY is set correctly."""
+        assert DEFAULT_LLM_CONCURRENCY == 20
+
+    @pytest.mark.asyncio
+    async def test_get_llm_semaphore_creates_semaphore(self):
+        """Test that get_llm_semaphore creates a semaphore with the default limit."""
+        semaphore = get_llm_semaphore()
+        assert isinstance(semaphore, asyncio.Semaphore)
+        # Semaphore should allow DEFAULT_LLM_CONCURRENCY concurrent acquisitions
+        assert semaphore._value == DEFAULT_LLM_CONCURRENCY
+
+    @pytest.mark.asyncio
+    async def test_get_llm_semaphore_returns_same_instance(self):
+        """Test that get_llm_semaphore returns the same semaphore instance."""
+        semaphore1 = get_llm_semaphore()
+        semaphore2 = get_llm_semaphore()
+        assert semaphore1 is semaphore2
+
+    @pytest.mark.asyncio
+    async def test_set_llm_concurrency_changes_limit(self):
+        """Test that set_llm_concurrency sets a custom limit."""
+        set_llm_concurrency(5)
+        semaphore = get_llm_semaphore()
+        assert semaphore._value == 5
+
+    @pytest.mark.asyncio
+    async def test_throttle_limits_concurrency(self):
+        """Test that throttling actually limits concurrent operations."""
+        set_llm_concurrency(2)
+
+        concurrent_count = 0
+        max_concurrent = 0
+
+        async def task():
+            nonlocal concurrent_count, max_concurrent
+            async with get_llm_semaphore():
+                concurrent_count += 1
+                max_concurrent = max(max_concurrent, concurrent_count)
+                await asyncio.sleep(0.05)
+                concurrent_count -= 1
+
+        # Run 10 tasks with concurrency limit of 2
+        await asyncio.gather(*[task() for _ in range(10)])
+
+        assert max_concurrent == 2
+
+    @patch.object(UiPathOpenAIService, "request_async")
+    @pytest.mark.asyncio
+    async def test_openai_service_uses_throttle(self, mock_request, openai_service):
+        """Test that OpenAI service chat_completions uses throttling."""
+        mock_response = MagicMock()
+        mock_response.json.return_value = {
+            "id": "test",
+            "object": "chat.completion",
+            "created": 1234567890,
+            "model": "gpt-4o-mini",
+            "choices": [
+                {
+                    "index": 0,
+                    "message": {"role": "assistant", "content": "Hello"},
+                    "finish_reason": "stop",
+                }
+            ],
+            "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15},
+        }
+        mock_request.return_value = mock_response
+
+        set_llm_concurrency(1)
+        semaphore = get_llm_semaphore()
+
+        # Verify semaphore is used during the call
+        initial_value = semaphore._value
+
+        await openai_service.chat_completions(
+            messages=[{"role": "user", "content": "Hi"}]
+        )
+
+        # After the call, semaphore should be back to initial value
+        assert semaphore._value == initial_value
+
+    @patch.object(UiPathLlmChatService, "request_async")
+    @pytest.mark.asyncio
+    async def test_llm_service_uses_throttle(self, mock_request, llm_service):
+        """Test that LLM chat service chat_completions uses throttling."""
+        mock_response = MagicMock()
+        mock_response.json.return_value = {
+            "id": "test",
+            "object": "chat.completion",
+            "created": 1234567890,
+            "model": "gpt-4o-mini",
+            "choices": [
+                {
+                    "index": 0,
+                    "message": {"role": "assistant", "content": "Hello"},
+                    "finish_reason": "stop",
+                }
+            ],
+            "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15},
+        }
+        mock_request.return_value = mock_response
+
+        set_llm_concurrency(1)
+        semaphore = get_llm_semaphore()
+
+        initial_value = semaphore._value
+
+        await llm_service.chat_completions(messages=[{"role": "user", "content": "Hi"}])
+
+        assert semaphore._value == initial_value
+
+    @patch.object(UiPathOpenAIService, "request_async")
+    @pytest.mark.asyncio
+    async def test_embeddings_uses_throttle(self, mock_request, openai_service):
+        """Test that embeddings endpoint uses throttling."""
+        mock_response = MagicMock()
+        mock_response.json.return_value = {
+            "data": [{"embedding": [0.1, 0.2, 0.3], "index": 0, "object": "embedding"}],
+            "model": "text-embedding-ada-002",
+            "object": "list",
+            "usage": {"prompt_tokens": 4, "total_tokens": 4},
+        }
+        mock_request.return_value = mock_response
+
+        set_llm_concurrency(1)
+        semaphore = get_llm_semaphore()
+
+        initial_value = semaphore._value
+
+        await openai_service.embeddings(input="Test input")
+
+        assert semaphore._value == initial_value
+
+
+class TestEventLoopBug:
+    """Tests for the event loop binding bug.
+
+    The bug: If set_llm_concurrency() creates the semaphore before asyncio.run(),
+    the semaphore is bound to the wrong event loop and will fail with:
+    RuntimeError: Semaphore object is bound to a different event loop
+
+    The fix: set_llm_concurrency() only stores the limit, doesn't create semaphore.
+    """
+
+    @pytest.fixture(autouse=True)
+    def reset_semaphore(self):
+        """Reset the global semaphore and limit before each test."""
+        import uipath.platform.chat.llm_throttle as module
+
+        module._llm_semaphore = None
+        module._llm_semaphore_loop = None
+        module._llm_concurrency_limit = module.DEFAULT_LLM_CONCURRENCY
+        yield
+        module._llm_semaphore = None
+        module._llm_semaphore_loop = None
+        module._llm_concurrency_limit = module.DEFAULT_LLM_CONCURRENCY
+
+    def test_set_llm_concurrency_before_asyncio_run(self):
+        """Test that set_llm_concurrency called before asyncio.run does not cause issues.
+
+        This test exercises the scenario behind the bug: the limit is set in
+        one event loop context and the semaphore is used in another (asyncio.run).
+        """
+        # This simulates what happens in cli_eval.py:
+        # 1. set_llm_concurrency() is called (pre-fix, this created the semaphore)
+        # 2. asyncio.run() starts a NEW event loop
+        # 3. Code tries to use the semaphore in the new loop
+
+        # Step 1: Call set_llm_concurrency outside any event loop
+        # (simulating CLI code before asyncio.run)
+        set_llm_concurrency(5)
+
+        # Step 2 & 3: Run async code in a new event loop
+        async def use_semaphore():
+            semaphore = get_llm_semaphore()
+            async with semaphore:
+                pass
+
+        # This should raise RuntimeError if the bug exists
+        # because the semaphore was created in a different loop context
+        try:
+            asyncio.run(use_semaphore())
+            # If we get here, either:
+            # a) The bug is fixed (semaphore created lazily in correct loop)
+            # b) Python version handles this gracefully
+            bug_exists = False
+        except RuntimeError as e:
+            if "different event loop" in str(
+                e
+            ) or "attached to a different loop" in str(e):
+                bug_exists = True
+            else:
+                raise
+
+        # This assertion documents expected behavior:
+        # - If bug_exists is True, the fix is needed
+        # - If bug_exists is False, the fix has been applied or Python handles it
+        # With the lazy-creation fix in place this test passes; it fails only on regression
+        assert not bug_exists, (
+            "Event loop bug detected! The semaphore was created outside the running "
+            "event loop. Fix: set_llm_concurrency should only store the limit, not "
+            "create the semaphore."
+        )
+
+    def test_lazy_semaphore_creation_in_correct_loop(self):
+        """Test that semaphore created inside asyncio.run works correctly.
+
+        This is the expected behavior after the fix is applied.
+        """
+        import uipath.platform.chat.llm_throttle as module
+
+        # Ensure semaphore is None (not pre-created)
+        module._llm_semaphore = None
+
+        async def use_semaphore():
+            # Semaphore should be created here, inside the running loop
+            semaphore = get_llm_semaphore()
+            async with semaphore:
+                return True
+
+        # This should work because semaphore is created in the correct loop
+        result = asyncio.run(use_semaphore())
+        assert result is True
+
+    def test_set_llm_concurrency_does_not_create_semaphore(self):
+        """Test that set_llm_concurrency only stores the limit, doesn't create the semaphore.
+
+        This is the key fix - the semaphore should be created lazily inside
+        the running event loop, not when set_llm_concurrency is called.
+ """ + import uipath.platform.chat.llm_throttle as module + + # Ensure semaphore is None initially + module._llm_semaphore = None + + # Call set_llm_concurrency + set_llm_concurrency(5) + + # Verify semaphore is still None (not created yet) + assert module._llm_semaphore is None + + # Verify limit was stored + assert module._llm_concurrency_limit == 5 + + # Now when we get the semaphore, it should be created with the stored limit + async def get_sem(): + return get_llm_semaphore() + + semaphore = asyncio.run(get_sem()) + assert semaphore._value == 5 + + +class TestMultipleEventLoops: + """Tests for semaphore behavior across multiple event loops. + + This tests the scenario where: + 1. First asyncio.run() creates semaphore bound to loop A + 2. Loop A closes + 3. Second asyncio.run() creates loop B + 4. Code tries to use semaphore still bound to dead loop A + 5. Should NOT crash - semaphore should be recreated for loop B + """ + + @pytest.fixture(autouse=True) + def reset_semaphore(self): + """Reset the global semaphore before each test.""" + import uipath.platform.chat.llm_throttle as module + + module._llm_semaphore = None + module._llm_semaphore_loop = None + module._llm_concurrency_limit = module.DEFAULT_LLM_CONCURRENCY + yield + module._llm_semaphore = None + module._llm_semaphore_loop = None + module._llm_concurrency_limit = module.DEFAULT_LLM_CONCURRENCY + + def test_semaphore_works_across_multiple_asyncio_runs(self): + """Test that semaphore works correctly across multiple asyncio.run() calls. + + This is the key test for the event-loop binding bug. Without the fix, + this test will fail with: + RuntimeError: Semaphore object is bound to a different event loop + + NOTE: The bug only triggers when there's CONTENTION on the semaphore + (multiple tasks competing). Without contention, _get_loop() is not + called and the semaphore doesn't bind to the event loop. 
+ """ + # Use a limit of 1 to force contention + set_llm_concurrency(1) + + async def use_semaphore_with_contention(): + """Use the semaphore with contention to trigger loop binding.""" + semaphore = get_llm_semaphore() + + async def contender(): + async with semaphore: + await asyncio.sleep(0.001) + + # Hold the semaphore while another task tries to acquire it + async with semaphore: + # Create a contending task - this forces _get_loop() to be called + task = asyncio.create_task(contender()) + await asyncio.sleep(0.001) + + await task + return True + + # First run - creates semaphore and binds to loop A due to contention + result1 = asyncio.run(use_semaphore_with_contention()) + assert result1 is True + + # Second run - loop A is closed, loop B is created + # Without fix: crashes because semaphore is still bound to loop A + # With fix: should work because semaphore is recreated for loop B + result2 = asyncio.run(use_semaphore_with_contention()) + assert result2 is True + + # Third run - just to be sure + result3 = asyncio.run(use_semaphore_with_contention()) + assert result3 is True + + def test_semaphore_limit_preserved_across_loops(self): + """Test that concurrency limit is preserved when semaphore is recreated.""" + set_llm_concurrency(3) + + async def get_semaphore_value(): + semaphore = get_llm_semaphore() + return semaphore._value + + # First run + value1 = asyncio.run(get_semaphore_value()) + assert value1 == 3 + + # Second run - should still have limit of 3 + value2 = asyncio.run(get_semaphore_value()) + assert value2 == 3 + + +class TestConcurrencyValidation: + """Tests for input validation of concurrency settings.""" + + @pytest.fixture(autouse=True) + def reset_semaphore(self): + """Reset the global semaphore before each test.""" + import uipath.platform.chat.llm_throttle as module + + module._llm_semaphore = None + module._llm_semaphore_loop = None + module._llm_concurrency_limit = module.DEFAULT_LLM_CONCURRENCY + yield + module._llm_semaphore = None + module._llm_semaphore_loop = None + module._llm_concurrency_limit = module.DEFAULT_LLM_CONCURRENCY + + def test_set_llm_concurrency_zero_raises_error(self): + """Test that setting concurrency to 0 raises ValueError. + + A semaphore with value 0 would deadlock all requests. + """ + with pytest.raises(ValueError, match="must be at least 1"): + set_llm_concurrency(0) + + def test_set_llm_concurrency_negative_raises_error(self): + """Test that setting negative concurrency raises ValueError.""" + with pytest.raises(ValueError, match="must be at least 1"): + set_llm_concurrency(-1) + + def test_set_llm_concurrency_one_is_valid(self): + """Test that setting concurrency to 1 (minimum valid) works.""" + set_llm_concurrency(1) + + async def check_semaphore(): + semaphore = get_llm_semaphore() + return semaphore._value + + value = asyncio.run(check_semaphore()) + assert value == 1 diff --git a/uv.lock b/uv.lock index 73dfaffb8..8e85506a1 100644 --- a/uv.lock +++ b/uv.lock @@ -2477,7 +2477,7 @@ wheels = [ [[package]] name = "uipath" -version = "2.4.3" +version = "2.4.4" source = { editable = "." } dependencies = [ { name = "click" },
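Usage note (not part of the patch): a minimal sketch of how an SDK consumer would bound concurrent LLM traffic with the throttle added in this change. It assumes valid platform credentials; the base URL and secret below are placeholders taken from the test fixtures, the prompt texts are illustrative, and the CLI invocation name "uipath eval --max-llm-concurrency 5" is inferred from the package's eval command rather than confirmed here. Both UiPathOpenAIService and UiPathLlmChatService acquire the shared semaphore from get_llm_semaphore() internally, so callers only set the limit.

import asyncio

from uipath.platform import UiPathApiConfig, UiPathExecutionContext
from uipath.platform.chat import UiPathOpenAIService, set_llm_concurrency


async def main() -> None:
    # Construction mirrors the fixtures in test_llm_throttle.py; real usage
    # needs a real base_url and secret (placeholders here).
    service = UiPathOpenAIService(
        config=UiPathApiConfig(base_url="https://example.com", secret="PLACEHOLDER"),
        execution_context=UiPathExecutionContext(),
    )
    prompts = [f"Summarize item {i}" for i in range(50)]
    # Fan out 50 chat completions; chat_completions() acquires the shared
    # semaphore internally, so at most 5 requests are in flight at once.
    await asyncio.gather(
        *(
            service.chat_completions(messages=[{"role": "user", "content": p}])
            for p in prompts
        )
    )


if __name__ == "__main__":
    # Safe to call before asyncio.run(): only the limit is stored here; the
    # semaphore itself is created lazily inside the running event loop.
    set_llm_concurrency(5)
    asyncio.run(main())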