diff --git a/src/uipath/_cli/_evals/_runtime.py b/src/uipath/_cli/_evals/_runtime.py index 8db78ab43..54f52dc29 100644 --- a/src/uipath/_cli/_evals/_runtime.py +++ b/src/uipath/_cli/_evals/_runtime.py @@ -18,6 +18,7 @@ import coverage from opentelemetry import context as context_api +from opentelemetry import trace from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from pydantic import BaseModel @@ -299,60 +300,72 @@ async def execute(self) -> UiPathRuntimeResult: ) try: with self._mocker_cache(): - ( - evaluation_set, - evaluators, - evaluation_iterable, - ) = await self.initiate_evaluation(runtime) - workers = self.context.workers or 1 - assert workers >= 1 - eval_run_result_list = await execute_parallel( - evaluation_iterable, workers - ) - results = UiPathEvalOutput( - evaluation_set_name=evaluation_set.name, - evaluation_set_results=eval_run_result_list, - ) + # Create the parent "Evaluation set run" span + tracer = trace.get_tracer(__name__) + span_attributes = { + "execution.id": self.execution_id, + "span_type": "eval_set_run", + } + if self.context.eval_set_run_id: + span_attributes["eval_set_run_id"] = self.context.eval_set_run_id + with tracer.start_as_current_span( + "Evaluation Set Run", + attributes=span_attributes + ): + ( + evaluation_set, + evaluators, + evaluation_iterable, + ) = await self.initiate_evaluation(runtime) + workers = self.context.workers or 1 + assert workers >= 1 + eval_run_result_list = await execute_parallel( + evaluation_iterable, workers + ) + results = UiPathEvalOutput( + evaluation_set_name=evaluation_set.name, + evaluation_set_results=eval_run_result_list, + ) - # Computing evaluator averages - evaluator_averages: dict[str, float] = defaultdict(float) - evaluator_count: dict[str, int] = defaultdict(int) - - # Check if any eval runs failed - any_failed = False - for eval_run_result in results.evaluation_set_results: - # Check if the agent execution had an error - if ( - eval_run_result.agent_execution_output - and eval_run_result.agent_execution_output.result.error - ): - any_failed = True - - for result_dto in eval_run_result.evaluation_run_results: - evaluator_averages[result_dto.evaluator_id] += ( - result_dto.result.score - ) - evaluator_count[result_dto.evaluator_id] += 1 + # Computing evaluator averages + evaluator_averages: dict[str, float] = defaultdict(float) + evaluator_count: dict[str, int] = defaultdict(int) + + # Check if any eval runs failed + any_failed = False + for eval_run_result in results.evaluation_set_results: + # Check if the agent execution had an error + if ( + eval_run_result.agent_execution_output + and eval_run_result.agent_execution_output.result.error + ): + any_failed = True + + for result_dto in eval_run_result.evaluation_run_results: + evaluator_averages[result_dto.evaluator_id] += ( + result_dto.result.score + ) + evaluator_count[result_dto.evaluator_id] += 1 - for eval_id in evaluator_averages: - evaluator_averages[eval_id] = ( - evaluator_averages[eval_id] / evaluator_count[eval_id] + for eval_id in evaluator_averages: + evaluator_averages[eval_id] = ( + evaluator_averages[eval_id] / evaluator_count[eval_id] + ) + await self.event_bus.publish( + EvaluationEvents.UPDATE_EVAL_SET_RUN, + EvalSetRunUpdatedEvent( + execution_id=self.execution_id, + evaluator_scores=evaluator_averages, + success=not any_failed, + ), + wait_for_completion=False, ) - await self.event_bus.publish( - EvaluationEvents.UPDATE_EVAL_SET_RUN, - EvalSetRunUpdatedEvent( - 
execution_id=self.execution_id, - evaluator_scores=evaluator_averages, - success=not any_failed, - ), - wait_for_completion=False, - ) - result = UiPathRuntimeResult( - output={**results.model_dump(by_alias=True)}, - status=UiPathRuntimeStatus.SUCCESSFUL, - ) - return result + result = UiPathRuntimeResult( + output={**results.model_dump(by_alias=True)}, + status=UiPathRuntimeStatus.SUCCESSFUL, + ) + return result finally: await runtime.dispose() @@ -378,165 +391,176 @@ async def _execute_eval( ), ) - evaluation_run_results = EvaluationRunResult( - evaluation_name=eval_item.name, evaluation_run_results=[] - ) + # Create the "Evaluation" span for this eval item + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span( + "Evaluation", + attributes={ + "execution.id": execution_id, + "span_type": "evaluation", + "eval_item_id": eval_item.id, + "eval_item_name": eval_item.name, + } + ): + evaluation_run_results = EvaluationRunResult( + evaluation_name=eval_item.name, evaluation_run_results=[] + ) - try: try: - agent_execution_output = await self.execute_runtime( - eval_item, execution_id, runtime - ) - except Exception as e: - if self.context.verbose: - if isinstance(e, EvaluationRuntimeException): - spans = e.spans - logs = e.logs - execution_time = e.execution_time - loggable_error = e.root_exception - else: - spans = [] - logs = [] - execution_time = 0 - loggable_error = e - - error_info = UiPathErrorContract( - code="RUNTIME_SHUTDOWN_ERROR", - title="Runtime shutdown failed", - detail=f"Error: {str(loggable_error)}", - category=UiPathErrorCategory.UNKNOWN, - ) - error_result = UiPathRuntimeResult( - status=UiPathRuntimeStatus.FAULTED, - error=error_info, + try: + agent_execution_output = await self.execute_runtime( + eval_item, execution_id, runtime ) + except Exception as e: + if self.context.verbose: + if isinstance(e, EvaluationRuntimeException): + spans = e.spans + logs = e.logs + execution_time = e.execution_time + loggable_error = e.root_exception + else: + spans = [] + logs = [] + execution_time = 0 + loggable_error = e + + error_info = UiPathErrorContract( + code="RUNTIME_SHUTDOWN_ERROR", + title="Runtime shutdown failed", + detail=f"Error: {str(loggable_error)}", + category=UiPathErrorCategory.UNKNOWN, + ) + error_result = UiPathRuntimeResult( + status=UiPathRuntimeStatus.FAULTED, + error=error_info, + ) + evaluation_run_results.agent_execution_output = ( + convert_eval_execution_output_to_serializable( + UiPathEvalRunExecutionOutput( + execution_time=execution_time, + result=error_result, + spans=spans, + logs=logs, + ) + ) + ) + raise + + if self.context.verbose: evaluation_run_results.agent_execution_output = ( convert_eval_execution_output_to_serializable( - UiPathEvalRunExecutionOutput( - execution_time=execution_time, - result=error_result, - spans=spans, - logs=logs, - ) + agent_execution_output ) ) - raise - - if self.context.verbose: - evaluation_run_results.agent_execution_output = ( - convert_eval_execution_output_to_serializable( - agent_execution_output + evaluation_item_results: list[EvalItemResult] = [] + + for evaluator in evaluators: + if evaluator.id not in eval_item.evaluation_criterias: + # Skip! 
+ continue + evaluation_criteria = eval_item.evaluation_criterias[evaluator.id] + + evaluation_result = await self.run_evaluator( + evaluator=evaluator, + execution_output=agent_execution_output, + eval_item=eval_item, + evaluation_criteria=evaluator.evaluation_criteria_type( + **evaluation_criteria + ) + if evaluation_criteria + else evaluator.evaluator_config.default_evaluation_criteria, ) - ) - evaluation_item_results: list[EvalItemResult] = [] - for evaluator in evaluators: - if evaluator.id not in eval_item.evaluation_criterias: - # Skip! - continue - evaluation_criteria = eval_item.evaluation_criterias[evaluator.id] - - evaluation_result = await self.run_evaluator( - evaluator=evaluator, - execution_output=agent_execution_output, - eval_item=eval_item, - evaluation_criteria=evaluator.evaluation_criteria_type( - **evaluation_criteria + dto_result = EvaluationResultDto.from_evaluation_result( + evaluation_result ) - if evaluation_criteria - else evaluator.evaluator_config.default_evaluation_criteria, - ) - - dto_result = EvaluationResultDto.from_evaluation_result( - evaluation_result - ) - evaluation_run_results.evaluation_run_results.append( - EvaluationRunResultDto( - evaluator_name=evaluator.name, - result=dto_result, - evaluator_id=evaluator.id, + evaluation_run_results.evaluation_run_results.append( + EvaluationRunResultDto( + evaluator_name=evaluator.name, + result=dto_result, + evaluator_id=evaluator.id, + ) ) - ) - evaluation_item_results.append( - EvalItemResult( - evaluator_id=evaluator.id, - result=evaluation_result, + evaluation_item_results.append( + EvalItemResult( + evaluator_id=evaluator.id, + result=evaluation_result, + ) ) + + exception_details = None + agent_output = agent_execution_output.result.output + if agent_execution_output.result.status == UiPathRuntimeStatus.FAULTED: + error = agent_execution_output.result.error + if error is not None: + # we set the exception details for the run event + # Convert error contract to exception + error_exception = Exception( + f"{error.title}: {error.detail} (code: {error.code})" + ) + exception_details = EvalItemExceptionDetails( + exception=error_exception + ) + agent_output = error.model_dump() + + await self.event_bus.publish( + EvaluationEvents.UPDATE_EVAL_RUN, + EvalRunUpdatedEvent( + execution_id=execution_id, + eval_item=eval_item, + eval_results=evaluation_item_results, + success=not agent_execution_output.result.error, + agent_output=agent_output, + agent_execution_time=agent_execution_output.execution_time, + spans=agent_execution_output.spans, + logs=agent_execution_output.logs, + exception_details=exception_details, + ), + wait_for_completion=False, ) - exception_details = None - agent_output = agent_execution_output.result.output - if agent_execution_output.result.status == UiPathRuntimeStatus.FAULTED: - error = agent_execution_output.result.error - if error is not None: - # we set the exception details for the run event - # Convert error contract to exception - error_exception = Exception( - f"{error.title}: {error.detail} (code: {error.code})" - ) - exception_details = EvalItemExceptionDetails( - exception=error_exception + except Exception as e: + exception_details = EvalItemExceptionDetails(exception=e) + + for evaluator in evaluators: + evaluation_run_results.evaluation_run_results.append( + EvaluationRunResultDto( + evaluator_name=evaluator.name, + evaluator_id=evaluator.id, + result=EvaluationResultDto(score=0), + ) ) - agent_output = error.model_dump() - await self.event_bus.publish( - 
EvaluationEvents.UPDATE_EVAL_RUN, - EvalRunUpdatedEvent( + eval_run_updated_event = EvalRunUpdatedEvent( execution_id=execution_id, eval_item=eval_item, - eval_results=evaluation_item_results, - success=not agent_execution_output.result.error, - agent_output=agent_output, - agent_execution_time=agent_execution_output.execution_time, - spans=agent_execution_output.spans, - logs=agent_execution_output.logs, + eval_results=[], + success=False, + agent_output={}, + agent_execution_time=0.0, exception_details=exception_details, - ), - wait_for_completion=False, - ) - - except Exception as e: - exception_details = EvalItemExceptionDetails(exception=e) - - for evaluator in evaluators: - evaluation_run_results.evaluation_run_results.append( - EvaluationRunResultDto( - evaluator_name=evaluator.name, - evaluator_id=evaluator.id, - result=EvaluationResultDto(score=0), - ) + spans=[], + logs=[], ) + if isinstance(e, EvaluationRuntimeException): + eval_run_updated_event.spans = e.spans + eval_run_updated_event.logs = e.logs + if eval_run_updated_event.exception_details: + eval_run_updated_event.exception_details.exception = ( + e.root_exception + ) + eval_run_updated_event.exception_details.runtime_exception = True - eval_run_updated_event = EvalRunUpdatedEvent( - execution_id=execution_id, - eval_item=eval_item, - eval_results=[], - success=False, - agent_output={}, - agent_execution_time=0.0, - exception_details=exception_details, - spans=[], - logs=[], - ) - if isinstance(e, EvaluationRuntimeException): - eval_run_updated_event.spans = e.spans - eval_run_updated_event.logs = e.logs - if eval_run_updated_event.exception_details: - eval_run_updated_event.exception_details.exception = ( - e.root_exception - ) - eval_run_updated_event.exception_details.runtime_exception = True - - await self.event_bus.publish( - EvaluationEvents.UPDATE_EVAL_RUN, - eval_run_updated_event, - wait_for_completion=False, - ) - finally: - clear_execution_context() + await self.event_bus.publish( + EvaluationEvents.UPDATE_EVAL_RUN, + eval_run_updated_event, + wait_for_completion=False, + ) + finally: + clear_execution_context() - return evaluation_run_results + return evaluation_run_results async def _generate_input_for_eval( self, eval_item: EvaluationItem, runtime: UiPathRuntimeProtocol @@ -678,26 +702,37 @@ async def run_evaluator( *, evaluation_criteria: Any, ) -> EvaluationResult: - output_data: dict[str, Any] | str = {} - if execution_output.result.output: - if isinstance(execution_output.result.output, BaseModel): - output_data = execution_output.result.output.model_dump() - else: - output_data = execution_output.result.output - - agent_execution = AgentExecution( - agent_input=eval_item.inputs, - agent_output=output_data, - agent_trace=execution_output.spans, - expected_agent_behavior=eval_item.expected_agent_behavior, - ) - - result = await evaluator.validate_and_evaluate_criteria( - agent_execution=agent_execution, - evaluation_criteria=evaluation_criteria, - ) + # Create span for evaluator execution + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span( + f"Evaluator: {evaluator.name}", + attributes={ + "span_type": "evaluator", + "evaluator_id": evaluator.id, + "evaluator_name": evaluator.name, + "eval_item_id": eval_item.id, + } + ): + output_data: dict[str, Any] | str = {} + if execution_output.result.output: + if isinstance(execution_output.result.output, BaseModel): + output_data = execution_output.result.output.model_dump() + else: + output_data = execution_output.result.output + + 
agent_execution = AgentExecution( + agent_input=eval_item.inputs, + agent_output=output_data, + agent_trace=execution_output.spans, + expected_agent_behavior=eval_item.expected_agent_behavior, + ) - return result + result = await evaluator.validate_and_evaluate_criteria( + agent_execution=agent_execution, + evaluation_criteria=evaluation_criteria, + ) + + return result async def _get_agent_model(self, runtime: UiPathRuntimeProtocol) -> str | None: """Get agent model from the runtime. diff --git a/src/uipath/_cli/cli_eval.py b/src/uipath/_cli/cli_eval.py index 736d82ae8..7de710106 100644 --- a/src/uipath/_cli/cli_eval.py +++ b/src/uipath/_cli/cli_eval.py @@ -19,7 +19,7 @@ from uipath._utils._bindings import ResourceOverwritesContext from uipath.eval._helpers import auto_discover_entrypoint from uipath.platform.common import UiPathConfig -from uipath.tracing import LlmOpsHttpExporter +from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter from ._utils._console import ConsoleLogger from ._utils._eval_set import EvalHelpers @@ -98,6 +98,12 @@ def setup_reporting_prereq(no_report: bool) -> bool: default="default", help="Model settings ID from evaluation set to override agent settings (default: 'default')", ) +@click.option( + "--trace-file", + required=False, + type=click.Path(exists=False), + help="File path where the trace data will be exported for local testing", +) def eval( entrypoint: str | None, eval_set: str | None, @@ -109,6 +115,7 @@ def eval( enable_mocker_cache: bool, report_coverage: bool, model_settings_id: str, + trace_file: str | None, ) -> None: """Run an evaluation set against the agent. @@ -122,6 +129,7 @@ def eval( enable_mocker_cache: Enable caching for LLM mocker responses report_coverage: Report evaluation coverage model_settings_id: Model settings ID to override agent settings + trace_file: File path where the trace data will be exported for local testing """ should_register_progress_reporter = setup_reporting_prereq(no_report) @@ -179,6 +187,10 @@ async def execute_eval(): ) as ctx: if ctx.job_id: trace_manager.add_span_exporter(LlmOpsHttpExporter()) + + # Add local file exporter if trace_file is specified + if trace_file: + trace_manager.add_span_exporter(JsonLinesFileExporter(trace_file)) project_id = UiPathConfig.project_id diff --git a/tests/cli/eval/test_evaluate.py b/tests/cli/eval/test_evaluate.py index 9a473b3ab..0a3efed4d 100644 --- a/tests/cli/eval/test_evaluate.py +++ b/tests/cli/eval/test_evaluate.py @@ -191,3 +191,163 @@ async def dispose(self) -> None: # Should be a valid UUID format (36 characters with dashes) assert len(runtime.execution_id) == 36 assert runtime.execution_id.count("-") == 4 + + +async def test_eval_with_trace_manager_integration(): + """Test that evaluation runs work correctly with trace manager integration for hierarchical traces.""" + # Arrange + event_bus = EventBus() + trace_manager = UiPathTraceManager() + context = UiPathEvalContext() + + # Point to the existing test eval set + context.eval_set = str( + Path(__file__).parent / "evals" / "eval-sets" / "default.json" + ) + + async def test_agent(input: dict[str, Any]) -> dict[str, Any]: + """Simple test agent that returns the input.""" + return input + + # Mock runtime and factory + class TestRuntime: + def __init__(self, executor): + self.executor = executor + + async def execute( + self, + input: dict[str, Any] | None = None, + options: UiPathExecuteOptions | None = None, + ) -> UiPathRuntimeResult: + result = await self.executor(input or {}) + return 
UiPathRuntimeResult( + output=result, + status=UiPathRuntimeStatus.SUCCESSFUL, + ) + + async def stream( + self, + input: dict[str, Any] | None = None, + options: UiPathStreamOptions | None = None, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + result = await self.executor(input or {}) + yield UiPathRuntimeResult( + output=result, + status=UiPathRuntimeStatus.SUCCESSFUL, + ) + + async def get_schema(self) -> UiPathRuntimeSchema: + return UiPathRuntimeSchema( + filePath="test.py", + uniqueId="test", + type="workflow", + input={"type": "object", "properties": {}}, + output={"type": "object", "properties": {}}, + ) + + async def dispose(self) -> None: + pass + + class TestFactory: + def __init__(self, executor): + self.executor = executor + + def discover_entrypoints(self) -> list[str]: + return ["test"] + + async def discover_runtimes(self) -> list[UiPathRuntimeProtocol]: + return [TestRuntime(self.executor)] + + async def new_runtime( + self, entrypoint: str, runtime_id: str, **kwargs + ) -> UiPathRuntimeProtocol: + return TestRuntime(self.executor) + + async def dispose(self) -> None: + pass + + factory = TestFactory(test_agent) + + # Act + result = await evaluate(factory, trace_manager, context, event_bus) + + # Assert + assert result.output is not None + + # Verify the result structure indicates hierarchical evaluation worked + output_dict = ( + result.output.model_dump() + if hasattr(result.output, 'model_dump') + else result.output + ) + assert isinstance(output_dict, dict) + assert "evaluationSetResults" in output_dict + + # Verify evaluation ran successfully + eval_results = output_dict["evaluationSetResults"] + assert len(eval_results) > 0 + + for eval_set_result in eval_results: + assert "evaluationRunResults" in eval_set_result + eval_run_results = eval_set_result["evaluationRunResults"] + assert len(eval_run_results) > 0 + + # Each evaluation should have run successfully + for eval_run_result in eval_run_results: + assert "result" in eval_run_result + assert "score" in eval_run_result["result"] + + +async def test_eval_runtime_with_trace_manager(): + """Test that UiPathEvalRuntime works correctly with trace manager.""" + # Arrange + event_bus = EventBus() + trace_manager = UiPathTraceManager() + context = UiPathEvalContext() + context.eval_set = str( + Path(__file__).parent / "evals" / "eval-sets" / "default.json" + ) + + async def identity_agent(input: dict[str, Any]) -> dict[str, Any]: + return input + + class TestRuntime: + async def execute(self, input: dict[str, Any] | None = None, options: UiPathExecuteOptions | None = None) -> UiPathRuntimeResult: + result = await identity_agent(input or {}) + return UiPathRuntimeResult(output=result, status=UiPathRuntimeStatus.SUCCESSFUL) + + async def stream(self, input: dict[str, Any] | None = None, options: UiPathStreamOptions | None = None) -> AsyncGenerator[UiPathRuntimeEvent, None]: + result = await identity_agent(input or {}) + yield UiPathRuntimeResult(output=result, status=UiPathRuntimeStatus.SUCCESSFUL) + + async def get_schema(self) -> UiPathRuntimeSchema: + return UiPathRuntimeSchema(filePath="test.py", uniqueId="test", type="workflow", input={"type": "object"}, output={"type": "object"}) + + async def dispose(self) -> None: + pass + + class TestFactory: + def discover_entrypoints(self) -> list[str]: + return ["test"] + + async def discover_runtimes(self) -> list[UiPathRuntimeProtocol]: + return [TestRuntime()] + + async def new_runtime(self, entrypoint: str, runtime_id: str, **kwargs) -> UiPathRuntimeProtocol: + return 
TestRuntime() + + async def dispose(self) -> None: + pass + + factory = TestFactory() + + # Act + runtime = UiPathEvalRuntime(context, factory, trace_manager, event_bus) + + # Assert + # Should be a valid UUID format (36 characters with dashes) + assert len(runtime.execution_id) == 36 + assert runtime.execution_id.count("-") == 4 + + # The trace manager should be properly set + assert runtime.trace_manager is trace_manager
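
Note (reviewer sketch, not part of the patch): the three spans this change introduces nest as eval_set_run -> evaluation -> evaluator, one parent span for the whole set run, one child per evaluation item, and one grandchild per evaluator invocation. The self-contained Python below reproduces that nesting with only the public OpenTelemetry SDK so the hierarchy can be inspected in isolation; the ConsoleSpanExporter wiring and the "ExactMatch" evaluator name are placeholders for illustration, since the real runtime routes spans through UiPathTraceManager (and, when the new --trace-file option is passed, a JsonLinesFileExporter writing to the given path).

    # Illustration only: placeholder exporter and wiring; the patched runtime
    # uses UiPathTraceManager with LlmOpsHttpExporter / JsonLinesFileExporter.
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)
    tracer = trace.get_tracer(__name__)

    # Parent span: one per evaluation set run (span_type "eval_set_run").
    with tracer.start_as_current_span(
        "Evaluation Set Run", attributes={"span_type": "eval_set_run"}
    ):
        # Child span: one per evaluation item (span_type "evaluation").
        with tracer.start_as_current_span(
            "Evaluation",
            attributes={"span_type": "evaluation", "eval_item_name": "example"},
        ):
            # Grandchild span: one per evaluator run (span_type "evaluator").
            # "ExactMatch" is a made-up evaluator name for this example.
            with tracer.start_as_current_span(
                "Evaluator: ExactMatch", attributes={"span_type": "evaluator"}
            ):
                pass  # evaluator.validate_and_evaluate_criteria(...) runs here

Running this prints three spans that share one trace id, with parent ids forming the same tree the patched runtime now emits. Locally, the equivalent check against a real agent would be to pass something like --trace-file traces.jsonl to the eval command (exact invocation depends on the installed CLI entry point) and inspect the exported JSON lines.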