diff --git a/tests/cli/test_hitl.py b/tests/cli/test_hitl.py deleted file mode 100644 index 6c0ae3da1..000000000 --- a/tests/cli/test_hitl.py +++ /dev/null @@ -1,1356 +0,0 @@ -import uuid -from typing import Any -from unittest.mock import AsyncMock, patch - -import pytest -from pytest_httpx import HTTPXMock - -from uipath.core.errors import ErrorCategory, UiPathFaultedTriggerError -from uipath.core.triggers import ( - UiPathApiTrigger, - UiPathResumeTrigger, - UiPathResumeTriggerType, -) -from uipath.platform.action_center import Task -from uipath.platform.action_center.tasks import TaskStatus -from uipath.platform.common import ( - CreateBatchTransform, - CreateDeepRag, - CreateEphemeralIndex, - CreateTask, - DocumentExtraction, - DocumentExtractionValidation, - InvokeProcess, - InvokeSystemAgent, - WaitBatchTransform, - WaitDeepRag, - WaitDocumentExtractionValidation, - WaitEphemeralIndex, - WaitJob, - WaitSystemAgent, - WaitTask, -) -from uipath.platform.context_grounding import ( - BatchTransformCreationResponse, - BatchTransformOutputColumn, - Citation, - CitationMode, - DeepRagCreationResponse, - DeepRagStatus, - EphemeralIndexUsage, - IndexStatus, -) -from uipath.platform.context_grounding.context_grounding_index import ( - ContextGroundingIndex, -) -from uipath.platform.documents import ( - ExtractionResponseIXP, - ExtractionResult, - FieldGroupValueProjection, - FieldType, - FieldValueProjection, - ProjectType, - StartExtractionValidationResponse, - ValidateExtractionAction, -) -from uipath.platform.orchestrator import Job, JobErrorInfo -from uipath.platform.resume_triggers import ( - PropertyName, - TriggerMarker, - UiPathResumeTriggerCreator, - UiPathResumeTriggerReader, -) -from uipath.runtime import UiPathRuntimeStatus - - -@pytest.fixture -def base_url(mock_env_vars: dict[str, str]) -> str: - return mock_env_vars["UIPATH_URL"] - - -@pytest.fixture -def setup_test_env( - monkeypatch: pytest.MonkeyPatch, mock_env_vars: dict[str, str] -) -> None: - """Setup test environment variables.""" - for key, value in mock_env_vars.items(): - monkeypatch.setenv(key, value) - - -class TestHitlReader: - """Tests for the HitlReader class.""" - - @pytest.mark.anyio - async def test_read_task_trigger( - self, - setup_test_env: None, - ) -> None: - """Test reading an action trigger.""" - action_key = "test-action-key" - action_data = {"answer": "test-action-data"} - - mock_action = Task(key=action_key, data=action_data) - mock_retrieve_async = AsyncMock(return_value=mock_action) - - with patch( - "uipath.platform.action_center._tasks_service.TasksService.retrieve_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.TASK, - item_key=action_key, - folder_key="test-folder", - folder_path="test-path", - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert result == action_data - mock_retrieve_async.assert_called_once_with( - action_key, - app_folder_key="test-folder", - app_folder_path="test-path", - app_name=None, - ) - - @pytest.mark.anyio - async def test_read_task_trigger_empty_response( - self, - setup_test_env: None, - ) -> None: - """Test reading an action trigger.""" - action_key = "test-action-key" - action_data: dict[str, Any] = {} - - mock_task = Task(key=action_key, data=action_data, status=2) - mock_retrieve_async = AsyncMock(return_value=mock_task) - - with patch( - "uipath.platform.action_center._tasks_service.TasksService.retrieve_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.TASK, - item_key=action_key, - folder_key="test-folder", - folder_path="test-path", - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert result == { - "status": TaskStatus(2).name.lower(), - PropertyName.INTERNAL.value: TriggerMarker.NO_CONTENT.value, - } - mock_retrieve_async.assert_called_once_with( - action_key, - app_folder_key="test-folder", - app_folder_path="test-path", - app_name=None, - ) - - @pytest.mark.anyio - async def test_read_job_trigger_successful( - self, - setup_test_env: None, - ) -> None: - """Test reading a successful job trigger.""" - job_key = "test-job-key" - job_id = 1234 - output_args = str({"result": "success"}) - - mock_job = Job( - id=job_id, - key=job_key, - state=UiPathRuntimeStatus.SUCCESSFUL.value, - output_arguments=output_args, - folder_key="d0e09040-5997-44e1-93b7-4087689521b7", - ) - mock_retrieve_async = AsyncMock(return_value=mock_job) - - with patch( - "uipath.platform.orchestrator._jobs_service.JobsService.retrieve_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.JOB, - item_key=job_key, - folder_key="test-folder", - folder_path="test-path", - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert result == output_args - mock_retrieve_async.assert_called_once_with( - job_key, - folder_key="test-folder", - folder_path="test-path", - process_name=None, - ) - - @pytest.mark.anyio - async def test_read_job_trigger_successful_empty_output( - self, - setup_test_env: None, - ) -> None: - """Test reading a successful job trigger with empty output returns job state.""" - job_key = "test-job-key" - job_id = 1234 - job_state = UiPathRuntimeStatus.SUCCESSFUL.value - - mock_job = Job( - id=job_id, - key=job_key, - state=job_state, - output_arguments="{}", - folder_key="d0e09040-5997-44e1-93b7-4087689521b7", - ) - mock_retrieve_async = AsyncMock(return_value=mock_job) - - with patch( - "uipath.platform.orchestrator._jobs_service.JobsService.retrieve_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.JOB, - item_key=job_key, - folder_key="test-folder", - folder_path="test-path", - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert result == { - "state": job_state.lower(), - PropertyName.INTERNAL.value: TriggerMarker.NO_CONTENT.value, - } - mock_retrieve_async.assert_called_once_with( - job_key, - folder_key="test-folder", - folder_path="test-path", - process_name=None, - ) - - @pytest.mark.anyio - async def test_read_job_trigger_failed( - self, - setup_test_env: None, - ) -> None: - """Test reading a failed job trigger.""" - job_key = "test-job-key" - job_error_info = JobErrorInfo(code="error code") - job_id = 1234 - - mock_job = Job( - id=job_id, - key=job_key, - state="Faulted", - job_error=job_error_info, - folder_key="d0e09040-5997-44e1-93b7-4087689521b7", - ) - mock_retrieve_async = AsyncMock(return_value=mock_job) - - with patch( - "uipath.platform.orchestrator._jobs_service.JobsService.retrieve_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.JOB, - item_key=job_key, - folder_key="test-folder", - folder_path="test-path", - payload={"name": "process_name"}, - ) - - with pytest.raises(UiPathFaultedTriggerError) as exc_info: - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - assert exc_info.value.category == ErrorCategory.USER - mock_retrieve_async.assert_called_once_with( - job_key, - folder_key="test-folder", - folder_path="test-path", - process_name="process_name", - ) - - @pytest.mark.anyio - async def test_read_system_agent_job_trigger_successful( - self, - setup_test_env: None, - ) -> None: - """Test reading a successful system agent job trigger.""" - job_key = "test-system-agent-job-key" - job_id = 1234 - output_args = str({"result": "agent completed successfully"}) - - mock_job = Job( - id=job_id, - key=job_key, - state=UiPathRuntimeStatus.SUCCESSFUL.value, - output_arguments=output_args, - folder_key="d0e09040-5997-44e1-93b7-4087689521b7", - ) - mock_retrieve_async = AsyncMock(return_value=mock_job) - - with patch( - "uipath.platform.orchestrator._jobs_service.JobsService.retrieve_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.JOB, - item_key=job_key, - folder_key="test-folder", - folder_path="test-path", - payload={ - "agent_name": "template-filler-agent", - "entrypoint": "main", - }, - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert result == output_args - mock_retrieve_async.assert_called_once_with( - job_key, - folder_key="test-folder", - folder_path="test-path", - process_name=None, - ) - - @pytest.mark.anyio - async def test_read_api_trigger( - self, - httpx_mock: HTTPXMock, - base_url: str, - setup_test_env: None, - ) -> None: - """Test reading an API trigger.""" - inbox_id = str(uuid.uuid4()) - payload_data = {"key": "value"} - - httpx_mock.add_response( - url=f"{base_url}/orchestrator_/api/JobTriggers/GetPayload/{inbox_id}", - status_code=200, - json={"payload": payload_data}, - ) - - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.API, - api_resume=UiPathApiTrigger(inbox_id=inbox_id, request="test"), - ) - - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert result == payload_data - - @pytest.mark.anyio - async def test_read_api_trigger_failure( - self, - httpx_mock: HTTPXMock, - base_url: str, - setup_test_env: None, - ) -> None: - """Test reading an API trigger with a failed response.""" - inbox_id = str(uuid.uuid4()) - - httpx_mock.add_response( - url=f"{base_url}/orchestrator_/api/JobTriggers/GetPayload/{inbox_id}", - status_code=500, - ) - - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.API, - api_resume=UiPathApiTrigger(inbox_id=inbox_id, request="test"), - ) - - with pytest.raises(UiPathFaultedTriggerError) as exc_info: - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - assert exc_info.value.category == ErrorCategory.SYSTEM - - @pytest.mark.anyio - async def test_read_deep_rag_trigger_successful( - self, - setup_test_env: None, - ) -> None: - """Test reading a successful deep rag trigger.""" - from uipath.platform.context_grounding import DeepRagResponse - from uipath.platform.context_grounding.context_grounding import DeepRagContent - - task_id = "test-deep-rag-id" - content = DeepRagContent( - text="test content", - citations=[ - Citation( - ordinal=1, page_number=1, source="source", reference="reference" - ) - ], - ) - mock_deep_rag = DeepRagResponse( - name="test-deep-rag", - created_date="2024-01-01", - last_deep_rag_status=DeepRagStatus.SUCCESSFUL, - content=content, - ) - mock_retrieve_async = AsyncMock(return_value=mock_deep_rag) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_deep_rag_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.DEEP_RAG, - item_key=task_id, - folder_key="test-folder", - folder_path="test-path", - payload={"index_name": "test-index"}, - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - expected_content = content.model_dump() - expected_content["deepRagId"] = task_id - assert result == expected_content - mock_retrieve_async.assert_called_once_with( - task_id, - index_name="test-index", - ) - - @pytest.mark.anyio - async def test_read_deep_rag_trigger_pending( - self, - setup_test_env: None, - ) -> None: - """Test reading a pending deep rag trigger raises pending error.""" - from uipath.core.errors import UiPathPendingTriggerError - from uipath.platform.context_grounding import DeepRagResponse - - task_id = "test-deep-rag-id" - mock_deep_rag = DeepRagResponse( - name="test-deep-rag", - created_date="2024-01-01", - last_deep_rag_status=DeepRagStatus.QUEUED, - content=None, - ) - mock_retrieve_async = AsyncMock(return_value=mock_deep_rag) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_deep_rag_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.DEEP_RAG, - item_key=task_id, - folder_key="test-folder", - folder_path="test-path", - ) - - with pytest.raises(UiPathPendingTriggerError): - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - - @pytest.mark.anyio - async def test_read_deep_rag_trigger_failed( - self, - setup_test_env: None, - ) -> None: - """Test reading a failed deep rag trigger raises faulted error.""" - from uipath.platform.context_grounding import DeepRagResponse - - task_id = "test-deep-rag-id" - mock_deep_rag = DeepRagResponse( - name="test-deep-rag", - created_date="2024-01-01", - last_deep_rag_status=DeepRagStatus.FAILED, - content=None, - ) - mock_retrieve_async = AsyncMock(return_value=mock_deep_rag) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_deep_rag_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.DEEP_RAG, - item_key=task_id, - folder_key="test-folder", - folder_path="test-path", - ) - - with pytest.raises(UiPathFaultedTriggerError) as exc_info: - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - assert exc_info.value.category == ErrorCategory.USER - - @pytest.mark.anyio - async def test_read_deep_rag_trigger_empty_response( - self, - setup_test_env: None, - ) -> None: - """Test reading a deep rag trigger with empty content returns placeholder.""" - from uipath.platform.context_grounding import DeepRagResponse - - task_id = "test-deep-rag-id" - mock_deep_rag = DeepRagResponse( - name="test-deep-rag", - created_date="2024-01-01", - last_deep_rag_status=DeepRagStatus.SUCCESSFUL, - content=None, - ) - mock_retrieve_async = AsyncMock(return_value=mock_deep_rag) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_deep_rag_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.DEEP_RAG, - item_key=task_id, - folder_key="test-folder", - folder_path="test-path", - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert result == { - "status": DeepRagStatus.SUCCESSFUL.value, - PropertyName.INTERNAL.value: TriggerMarker.NO_CONTENT.value, - } - - @pytest.mark.anyio - async def test_read_batch_rag_trigger_successful( - self, - setup_test_env: None, - ) -> None: - """Test reading a successful batch rag trigger.""" - import os - - task_id = "test-batch-rag-id" - destination_path = "test/output.xlsx" - mock_download_async = AsyncMock(return_value=None) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.download_batch_transform_result_async", - new=mock_download_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.BATCH_RAG, - item_key=task_id, - folder_key="test-folder", - folder_path="test-path", - payload={ - "index_name": "test-index", - "destination_path": destination_path, - }, - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert ( - result - == f"Batch transform completed. Modified file available at {os.path.abspath(destination_path)}" - ) - mock_download_async.assert_called_once_with( - task_id, - destination_path, - validate_status=True, - index_name="test-index", - ) - - @pytest.mark.anyio - async def test_read_batch_rag_trigger_pending( - self, - setup_test_env: None, - ) -> None: - """Test reading a pending batch rag trigger raises pending error.""" - from uipath.core.errors import UiPathPendingTriggerError - from uipath.platform.errors import BatchTransformNotCompleteException - - task_id = "test-batch-rag-id" - destination_path = "test/output.xlsx" - mock_download_async = AsyncMock( - side_effect=BatchTransformNotCompleteException(task_id, "InProgress") - ) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.download_batch_transform_result_async", - new=mock_download_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.BATCH_RAG, - item_key=task_id, - folder_key="test-folder", - folder_path="test-path", - payload={ - "index_name": "test-index", - "destination_path": destination_path, - }, - ) - - with pytest.raises(UiPathPendingTriggerError): - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - - @pytest.mark.anyio - async def test_read_ephemeral_index_trigger_successful( - self, - setup_test_env: None, - ) -> None: - """Test reading a successful ephemeral index trigger.""" - index_id = "test-ephemeral-index-id" - index_data = { - "id": index_id, - "name": "test-index", - "lastIngestionStatus": IndexStatus.SUCCESSFUL.value, - } - - mock_retrieve_by_id = AsyncMock(return_value=index_data) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_by_id_async", - new=mock_retrieve_by_id, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.INDEX_INGESTION, - item_key=index_id, - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - - assert isinstance(result, dict) - assert result["id"] == index_id - mock_retrieve_by_id.assert_called_once_with(index_id) - - @pytest.mark.anyio - async def test_read_ephemeral_index_trigger_pending( - self, - setup_test_env: None, - ) -> None: - """Test reading a pending ephemeral index trigger raises pending error.""" - from uipath.core.errors import UiPathPendingTriggerError - - index_id = "test-ephemeral-index-id" - index_data = { - "id": index_id, - "name": "test-index", - "lastIngestionStatus": IndexStatus.IN_PROGRESS.value, - } - - mock_retrieve_by_id = AsyncMock(return_value=index_data) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_by_id_async", - new=mock_retrieve_by_id, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.INDEX_INGESTION, - item_key=index_id, - ) - - with pytest.raises(UiPathPendingTriggerError): - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - - @pytest.mark.anyio - async def test_read_ephemeral_index_trigger_failed( - self, - setup_test_env: None, - ) -> None: - """Test reading a failed ephemeral index trigger raises faulted error.""" - index_id = "test-ephemeral-index-id" - index_data = { - "id": index_id, - "name": "test-index", - "lastIngestionStatus": IndexStatus.FAILED.value, - } - - mock_retrieve_by_id = AsyncMock(return_value=index_data) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_by_id_async", - new=mock_retrieve_by_id, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.INDEX_INGESTION, - item_key=index_id, - ) - - with pytest.raises(UiPathFaultedTriggerError) as exc_info: - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - assert exc_info.value.category == ErrorCategory.USER - - @pytest.mark.anyio - async def test_read_ixp_vs_escalation_trigger_successful( - self, - setup_test_env: None, - ) -> None: - """Test reading a successful IXP VS escalation trigger.""" - operation_id = "test-escalation-id" - project_id = "test-project-id" - tag = "test-tag" - action_data = { - "status": TaskStatus.COMPLETED.name.lower(), - "data": {"field": "value"}, - } - - mock_escalation = ValidateExtractionAction( - action_data=action_data, - action_status=TaskStatus.COMPLETED.name, - project_id=project_id, - project_type=ProjectType.IXP, - extractor_id=None, - tag=tag, - operation_id=operation_id, - document_type_id="test-doc-type", - ) - mock_retrieve_async = AsyncMock(return_value=mock_escalation) - - with patch( - "uipath.platform.documents._documents_service.DocumentsService.retrieve_ixp_extraction_validation_result_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.IXP_VS_ESCALATION, - item_key=operation_id, - payload={"project_id": project_id, "tag": tag}, - ) - reader = UiPathResumeTriggerReader() - result = await reader.read_trigger(resume_trigger) - assert result == mock_escalation.model_dump() - mock_retrieve_async.assert_called_once_with( - project_id, - tag, - operation_id, - ) - - @pytest.mark.anyio - async def test_read_ixp_vs_escalation_trigger_pending( - self, - setup_test_env: None, - ) -> None: - """Test reading a pending IXP VS escalation trigger raises pending error.""" - from uipath.core.errors import UiPathPendingTriggerError - - operation_id = "test-escalation-id" - project_id = "test-project-id" - tag = "test-tag" - action_data = { - "status": TaskStatus.PENDING.name.lower(), - "data": {"field": "value"}, - } - - mock_escalation = ValidateExtractionAction( - action_data=action_data, - action_status=TaskStatus.PENDING.name, - project_id=project_id, - project_type=ProjectType.IXP, - extractor_id=None, - tag=tag, - operation_id=operation_id, - document_type_id="test-doc-type", - ) - mock_retrieve_async = AsyncMock(return_value=mock_escalation) - - with patch( - "uipath.platform.documents._documents_service.DocumentsService.retrieve_ixp_extraction_validation_result_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.IXP_VS_ESCALATION, - item_key=operation_id, - payload={"project_id": project_id, "tag": tag}, - ) - - with pytest.raises(UiPathPendingTriggerError): - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - - @pytest.mark.anyio - async def test_read_ixp_vs_escalation_trigger_unassigned( - self, - setup_test_env: None, - ) -> None: - """Test reading an unassigned IXP VS escalation trigger raises pending error.""" - from uipath.core.errors import UiPathPendingTriggerError - - operation_id = "test-escalation-id" - project_id = "test-project-id" - tag = "test-tag" - action_data = { - "status": TaskStatus.UNASSIGNED.name.lower(), - "data": {"field": "value"}, - } - - mock_escalation = ValidateExtractionAction( - action_data=action_data, - action_status=TaskStatus.UNASSIGNED.name, - project_id=project_id, - project_type=ProjectType.IXP, - extractor_id=None, - tag=tag, - operation_id=operation_id, - document_type_id="test-doc-type", - ) - mock_retrieve_async = AsyncMock(return_value=mock_escalation) - - with patch( - "uipath.platform.documents._documents_service.DocumentsService.retrieve_ixp_extraction_validation_result_async", - new=mock_retrieve_async, - ): - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.IXP_VS_ESCALATION, - item_key=operation_id, - payload={"project_id": project_id, "tag": tag}, - ) - - with pytest.raises(UiPathPendingTriggerError): - reader = UiPathResumeTriggerReader() - await reader.read_trigger(resume_trigger) - - -class TestHitlProcessor: - """Tests for the HitlProcessor class.""" - - @pytest.mark.anyio - async def test_create_resume_trigger_create_task( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for CreateTask.""" - action_key = "test-action-key" - create_action = CreateTask( - title="Test Action", - app_name="TestApp", - app_folder_path="/test/path", - data={"input": "test-input"}, - ) - - mock_action = Task(key=action_key) - mock_create_async = AsyncMock(return_value=mock_action) - - with patch( - "uipath.platform.action_center._tasks_service.TasksService.create_async", - new=mock_create_async, - ): - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(create_action) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.TASK - assert resume_trigger.item_key == action_key - assert resume_trigger.folder_path == create_action.app_folder_path - mock_create_async.assert_called_once_with( - title=create_action.title, - app_name=create_action.app_name, - app_folder_path=create_action.app_folder_path, - app_folder_key="", - app_key="", - assignee="", - recipient="", - data=create_action.data, - priority=None, - labels=None, - is_actionable_message_enabled=None, - actionable_message_metadata=None, - source_name="Agent", - ) - - @pytest.mark.anyio - async def test_create_resume_trigger_wait_task( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for WaitTask.""" - action_key = "test-action-key" - action = Task(key=action_key) - wait_action = WaitTask(action=action, app_folder_path="/test/path") - - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(wait_action) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.TASK - assert resume_trigger.item_key == action_key - assert resume_trigger.folder_path == wait_action.app_folder_path - - @pytest.mark.anyio - async def test_create_resume_trigger_invoke_process( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for InvokeProcess.""" - job_key = "test-job-key" - invoke_process = InvokeProcess( - name="TestProcess", - process_folder_path="/test/path", - input_arguments={"key": "value"}, - ) - - mock_job = Job( - id=1234, key=job_key, folder_key="d0e09040-5997-44e1-93b7-4087689521b7" - ) - mock_invoke = AsyncMock(return_value=mock_job) - - with patch( - "uipath.platform.orchestrator._processes_service.ProcessesService.invoke_async", - new=mock_invoke, - ) as mock_process_invoke_async: - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(invoke_process) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.JOB - assert resume_trigger.item_key == job_key - assert resume_trigger.folder_path == invoke_process.process_folder_path - mock_process_invoke_async.assert_called_once_with( - name=invoke_process.name, - input_arguments=invoke_process.input_arguments, - attachments=None, - folder_path=invoke_process.process_folder_path, - folder_key=None, - ) - - @pytest.mark.anyio - async def test_create_resume_trigger_wait_job( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for WaitJob.""" - job_key = "test-job-key" - job = Job( - id=1234, key=job_key, folder_key="d0e09040-5997-44e1-93b7-4087689521b7" - ) - wait_job = WaitJob(job=job, process_folder_path="/test/path") - - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(wait_job) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.JOB - assert resume_trigger.item_key == job_key - assert resume_trigger.folder_path == wait_job.process_folder_path - - @pytest.mark.anyio - async def test_create_resume_trigger_invoke_system_agent( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for InvokeSystemAgent.""" - job_key = "test-system-agent-job-key" - invoke_system_agent = InvokeSystemAgent( - agent_name="template-filler-agent", - entrypoint="main", - folder_path="/test/path", - folder_key="test-folder-key", - input_arguments={"metadata_id": "123", "template_id": "456"}, - ) - - mock_invoke_system_agent = AsyncMock(return_value=job_key) - - with patch( - "uipath.platform.agenthub._agenthub_service.AgentHubService.invoke_system_agent_async", - new=mock_invoke_system_agent, - ) as mock_agent_invoke: - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(invoke_system_agent) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.JOB - assert resume_trigger.item_key == job_key - assert resume_trigger.folder_path == invoke_system_agent.folder_path - assert resume_trigger.folder_key == invoke_system_agent.folder_key - mock_agent_invoke.assert_called_once_with( - agent_name=invoke_system_agent.agent_name, - entrypoint=invoke_system_agent.entrypoint, - input_arguments=invoke_system_agent.input_arguments, - folder_path=invoke_system_agent.folder_path, - folder_key=invoke_system_agent.folder_key, - ) - - @pytest.mark.anyio - async def test_create_resume_trigger_wait_system_agent( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for WaitSystemAgent.""" - job_key = "1234" - wait_system_agent = WaitSystemAgent( - job_key="1234", - process_folder_path="/test/path", - process_folder_key="test-folder-key", - ) - - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(wait_system_agent) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.JOB - assert resume_trigger.item_key == job_key - assert resume_trigger.folder_path == wait_system_agent.process_folder_path - assert resume_trigger.folder_key == wait_system_agent.process_folder_key - - @pytest.mark.anyio - async def test_create_resume_trigger_api( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for API type.""" - api_input = "payload" - - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(api_input) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.API - assert resume_trigger.api_resume is not None - assert isinstance(resume_trigger.api_resume.inbox_id, str) - assert resume_trigger.api_resume.request == api_input - - @pytest.mark.anyio - async def test_create_resume_trigger_create_deep_rag( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for CreateDeepRag.""" - deep_rag_id = "test-deep-rag-id" - create_deep_rag = CreateDeepRag( - name="test-deep-rag", - index_name="test-index", - prompt="test prompt", - glob_pattern="**/*.pdf", - citation_mode=CitationMode.INLINE, - index_folder_path="/test/path", - ) - - mock_deep_rag = DeepRagCreationResponse( - id=deep_rag_id, - last_deep_rag_status=DeepRagStatus.QUEUED, - created_date="2024-01-01", - ) - mock_start_deep_rag_async = AsyncMock(return_value=mock_deep_rag) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.start_deep_rag_async", - new=mock_start_deep_rag_async, - ): - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(create_deep_rag) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.DEEP_RAG - assert resume_trigger.item_key == deep_rag_id - mock_start_deep_rag_async.assert_called_once_with( - name=create_deep_rag.name, - index_name=create_deep_rag.index_name, - index_id=None, - prompt=create_deep_rag.prompt, - glob_pattern=create_deep_rag.glob_pattern, - citation_mode=create_deep_rag.citation_mode, - folder_path=create_deep_rag.index_folder_path, - folder_key=create_deep_rag.index_folder_key, - ) - - @pytest.mark.anyio - async def test_create_resume_trigger_wait_deep_rag( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for WaitDeepRag.""" - deep_rag_id = "test-deep-rag-id" - deep_rag = DeepRagCreationResponse( - id=deep_rag_id, - last_deep_rag_status=DeepRagStatus.IN_PROGRESS, - created_date="2024-01-01", - ) - wait_deep_rag = WaitDeepRag(deep_rag=deep_rag, index_folder_path="/test/path") - - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(wait_deep_rag) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.DEEP_RAG - assert resume_trigger.item_key == deep_rag_id - - @pytest.mark.anyio - async def test_create_resume_trigger_create_batch_transform( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for CreateBatchTransform.""" - batch_transform_id = "test-batch-transform-id" - output_columns = [ - BatchTransformOutputColumn(name="column1", description="desc1"), - ] - create_batch_transform = CreateBatchTransform( - name="test-batch-transform", - index_name="test-index", - prompt="test prompt", - output_columns=output_columns, - destination_path="/output/path.xlsx", - index_folder_path="/test/path", - ) - - mock_batch_transform = BatchTransformCreationResponse( - id=batch_transform_id, - last_batch_rag_status=DeepRagStatus.QUEUED, - ) - mock_start_batch_transform = AsyncMock(return_value=mock_batch_transform) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.start_batch_transform_async", - new=mock_start_batch_transform, - ): - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(create_batch_transform) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.BATCH_RAG - assert resume_trigger.item_key == batch_transform_id - mock_start_batch_transform.assert_called_once_with( - name=create_batch_transform.name, - index_name=create_batch_transform.index_name, - index_id=None, - prompt=create_batch_transform.prompt, - output_columns=create_batch_transform.output_columns, - storage_bucket_folder_path_prefix=create_batch_transform.storage_bucket_folder_path_prefix, - enable_web_search_grounding=create_batch_transform.enable_web_search_grounding, - folder_path=create_batch_transform.index_folder_path, - folder_key=create_batch_transform.index_folder_key, - ) - - @pytest.mark.anyio - async def test_create_resume_trigger_wait_batch_transform( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for WaitBatchTransform.""" - batch_transform_id = "test-batch-transform-id" - batch_transform = BatchTransformCreationResponse( - id=batch_transform_id, - last_batch_rag_status=DeepRagStatus.IN_PROGRESS, - ) - wait_batch_transform = WaitBatchTransform( - batch_transform=batch_transform, index_folder_path="/test/path" - ) - - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(wait_batch_transform) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.BATCH_RAG - assert resume_trigger.item_key == batch_transform_id - - @pytest.mark.anyio - async def test_create_resume_trigger_create_ephemeral_index( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for CreateEphemeralIndex.""" - index_id = "test-ephemeral-index-id" - attachments = ["attachment-uuid-1", "attachment-uuid-2"] - create_ephemeral_index = CreateEphemeralIndex( - usage=EphemeralIndexUsage.DEEP_RAG, - attachments=attachments, - ) - - mock_index = ContextGroundingIndex( - id=index_id, - name="ephemeral-index", - last_ingestion_status=IndexStatus.QUEUED.value, - ) - mock_create_ephemeral_index = AsyncMock(return_value=mock_index) - - with patch( - "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.create_ephemeral_index_async", - new=mock_create_ephemeral_index, - ): - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(create_ephemeral_index) - - assert resume_trigger is not None - assert ( - resume_trigger.trigger_type == UiPathResumeTriggerType.INDEX_INGESTION - ) - assert resume_trigger.item_key == index_id - mock_create_ephemeral_index.assert_called_once_with( - usage=create_ephemeral_index.usage, - attachments=create_ephemeral_index.attachments, - ) - - @pytest.mark.anyio - async def test_create_resume_trigger_wait_ephemeral_index( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for WaitEphemeralIndex.""" - index_id = "test-ephemeral-index-id" - ephemeral_index = ContextGroundingIndex( - id=index_id, - name="ephemeral-index", - last_ingestion_status=IndexStatus.IN_PROGRESS.value, - ) - wait_ephemeral_index = WaitEphemeralIndex(index=ephemeral_index) - - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(wait_ephemeral_index) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.INDEX_INGESTION - assert resume_trigger.item_key == index_id - - @pytest.mark.anyio - async def test_create_resume_trigger_document_extraction_validation( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for DocumentExtractionValidation.""" - operation_id = "test-validation-operation-id" - project_id = "test-project-id" - tag = "test-tag" - - # Create a mock extraction response - extraction_result = ExtractionResult( - document_id="doc-123", - results_version=1, - results_document={}, - ) - mock_extraction_response = ExtractionResponseIXP( - extraction_result=extraction_result, - project_id=project_id, - project_type=ProjectType.IXP, - extractor_id=None, - tag=tag, - document_type_id="doc-type-123", - data_projection=[ - FieldGroupValueProjection( - field_group_name="test_group", - field_values=[ - FieldValueProjection( - id="field-1", - name="test_field", - value="test_value", - unformatted_value="test_value", - confidence=0.95, - ocr_confidence=0.98, - type=FieldType.TEXT, - ) - ], - ) - ], - ) - - document_extraction_validation = DocumentExtractionValidation( - extraction_response=mock_extraction_response, - action_title="Test Validation", - ) - - mock_validation_response = StartExtractionValidationResponse( - operation_id=operation_id, - document_id="doc-123", - project_id=project_id, - tag=tag, - ) - mock_start_validation = AsyncMock(return_value=mock_validation_response) - - with patch( - "uipath.platform.documents._documents_service.DocumentsService.start_ixp_extraction_validation_async", - new=mock_start_validation, - ): - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger( - document_extraction_validation - ) - - assert resume_trigger is not None - assert ( - resume_trigger.trigger_type == UiPathResumeTriggerType.IXP_VS_ESCALATION - ) - assert resume_trigger.item_key == operation_id - # Verify the payload contains project_id and tag - assert isinstance(resume_trigger.payload, dict) - assert resume_trigger.payload.get("project_id") == project_id - assert resume_trigger.payload.get("tag") == tag - mock_start_validation.assert_called_once_with( - extraction_response=document_extraction_validation.extraction_response, - action_title=document_extraction_validation.action_title, - action_priority=document_extraction_validation.action_priority, - action_folder=document_extraction_validation.action_folder, - storage_bucket_name=document_extraction_validation.storage_bucket_name, - storage_bucket_directory_path=document_extraction_validation.storage_bucket_directory_path, - ) - - @pytest.mark.anyio - async def test_create_resume_trigger_wait_document_extraction_validation( - self, - setup_test_env: None, - ) -> None: - """Test creating a resume trigger for WaitDocumentExtractionValidation.""" - operation_id = "test-validation-operation-id" - project_id = "test-project-id" - tag = "test-tag" - - validation_response = StartExtractionValidationResponse( - operation_id=operation_id, - document_id="doc-123", - project_id=project_id, - tag=tag, - ) - wait_validation = WaitDocumentExtractionValidation( - extraction_validation=validation_response - ) - - processor = UiPathResumeTriggerCreator() - resume_trigger = await processor.create_trigger(wait_validation) - - assert resume_trigger is not None - assert resume_trigger.trigger_type == UiPathResumeTriggerType.IXP_VS_ESCALATION - assert resume_trigger.item_key == operation_id - - -class TestDocumentExtractionModels: - """Tests for document extraction models.""" - - def test_create_document_extraction_with_file(self) -> None: - """Test DocumentExtraction with file provided.""" - file_content = b"test content" - extraction = DocumentExtraction( - project_name="test_project", - tag="test_tag", - file=file_content, - ) - - assert extraction.project_name == "test_project" - assert extraction.tag == "test_tag" - assert extraction.file == file_content - assert extraction.file_path is None - - def test_create_document_extraction_with_file_path(self) -> None: - """Test DocumentExtraction with file_path provided.""" - extraction = DocumentExtraction( - project_name="test_project", - tag="test_tag", - file_path="/path/to/file.pdf", - ) - - assert extraction.project_name == "test_project" - assert extraction.tag == "test_tag" - assert extraction.file is None - assert extraction.file_path == "/path/to/file.pdf" - - def test_create_document_extraction_with_both_raises_error(self) -> None: - """Test DocumentExtraction with both file and file_path raises ValueError.""" - file_content = b"test content" - - with pytest.raises(ValueError) as exc_info: - DocumentExtraction( - project_name="test_project", - tag="test_tag", - file=file_content, - file_path="/path/to/file.pdf", - ) - - assert "not both or neither" in str(exc_info.value) - - def test_create_document_extraction_with_neither_raises_error(self) -> None: - """Test DocumentExtraction with neither file nor file_path raises ValueError.""" - with pytest.raises(ValueError) as exc_info: - DocumentExtraction( - project_name="test_project", - tag="test_tag", - ) - - assert "not both or neither" in str(exc_info.value)