diff --git a/src/arize/experimental/datasets/core/client.py b/src/arize/experimental/datasets/core/client.py index 518bdf9..a638d05 100644 --- a/src/arize/experimental/datasets/core/client.py +++ b/src/arize/experimental/datasets/core/client.py @@ -119,6 +119,7 @@ def run_experiment( concurrency: int = 3, set_global_tracer_provider: bool = False, exit_on_error: bool = False, + timeout: float = 120, ) -> Union[Tuple[str, pd.DataFrame], None]: """ Run an experiment on a dataset and upload the results. @@ -145,6 +146,7 @@ def run_experiment( set_global_tracer_provider (bool): If True, sets the global tracer provider for the experiment. Defaults to False. exit_on_error (bool): If True, the experiment will stop running on first occurrence of an error. + timeout (float): The timeout in seconds for each task execution. Defaults to 120. Returns: Tuple[str, pd.DataFrame]: @@ -225,6 +227,7 @@ def run_experiment( evaluators=evaluators, concurrency=concurrency, exit_on_error=exit_on_error, + timeout=timeout, ) output_df = _convert_default_columns_to_json_str(output_df) output_df = _convert_boolean_columns_to_str(output_df) diff --git a/src/arize/experimental/datasets/experiments/evaluators/executors.py b/src/arize/experimental/datasets/experiments/evaluators/executors.py index a6673dd..99575be 100644 --- a/src/arize/experimental/datasets/experiments/evaluators/executors.py +++ b/src/arize/experimental/datasets/experiments/evaluators/executors.py @@ -100,6 +100,8 @@ class AsyncExecutor(Executor): termination_signal (signal.Signals, optional): The signal handled to terminate the executor. + timeout (float, optional): The timeout in seconds for each task execution. Defaults to 120. + """ def __init__( @@ -111,6 +113,7 @@ def __init__( exit_on_error: bool = True, fallback_return_value: Union[Unset, Any] = _unset, termination_signal: signal.Signals = signal.SIGINT, + timeout: float = 120, ): self.generate = generation_fn self.fallback_return_value = fallback_return_value @@ -120,6 +123,7 @@ def __init__( self.exit_on_error = exit_on_error self.base_priority = 0 self.termination_signal = termination_signal + self.timeout = timeout async def producer( self, @@ -174,7 +178,7 @@ async def consumer( ) done, pending = await asyncio.wait( [generate_task, termination_event_watcher], - timeout=120, + timeout=self.timeout, return_when=asyncio.FIRST_COMPLETED, ) @@ -429,6 +433,7 @@ def get_executor_on_sync_context( max_retries: int = 10, exit_on_error: bool = True, fallback_return_value: Union[Unset, Any] = _unset, + timeout: float = 120, ) -> Executor: if threading.current_thread() is not threading.main_thread(): # run evals synchronously if not in the main thread @@ -464,6 +469,7 @@ def get_executor_on_sync_context( max_retries=max_retries, exit_on_error=exit_on_error, fallback_return_value=fallback_return_value, + timeout=timeout, ) else: logger.warning( @@ -486,6 +492,7 @@ def get_executor_on_sync_context( max_retries=max_retries, exit_on_error=exit_on_error, fallback_return_value=fallback_return_value, + timeout=timeout, ) diff --git a/src/arize/experimental/datasets/experiments/functions.py b/src/arize/experimental/datasets/experiments/functions.py index 2aba3a3..f5af888 100644 --- a/src/arize/experimental/datasets/experiments/functions.py +++ b/src/arize/experimental/datasets/experiments/functions.py @@ -72,6 +72,7 @@ def run_experiment( evaluators: Optional[Evaluators] = None, concurrency: int = 3, exit_on_error: bool = False, + timeout: float = 120, ) -> pd.DataFrame: """ Run an experiment on a dataset. @@ -86,6 +87,7 @@ def run_experiment( evaluators (Optional[Evaluators]): Optional evaluators to assess the task. concurrency (int): The number of concurrent tasks to run. Default is 3. exit_on_error (bool): Whether to exit on error. Default is False. + timeout (float): The timeout in seconds for each task execution. Default is 120. Returns: pd.DataFrame: The results of the experiment. """ @@ -280,6 +282,7 @@ async def async_run_experiment(example: Example) -> ExperimentRun: fallback_return_value=None, tqdm_bar_format=get_tqdm_progress_bar_formatter("running tasks"), concurrency=concurrency, + timeout=timeout, ) runs, _execution_details = executor.run(examples) @@ -312,6 +315,7 @@ async def async_run_experiment(example: Example) -> ExperimentRun: tracer=tracer, resource=resource, exit_on_error=exit_on_error, + timeout=timeout, ) if exit_on_error and (None in eval_results): @@ -362,6 +366,7 @@ def evaluate_experiment( tracer: Optional[Tracer] = None, resource: Optional[Resource] = None, exit_on_error: bool = False, + timeout: float = 120, ): """ Evaluate the results of an experiment using the provided evaluators. @@ -375,6 +380,7 @@ def evaluate_experiment( tracer (Optional[Tracer]): Optional tracer for tracing the evaluation. resource (Optional[Resource]): Optional resource for the evaluation. exit_on_error (bool): Whether to exit on error. Default is False. + timeout (float): The timeout in seconds for each evaluator execution. Default is 120. Returns: List[ExperimentEvaluationRun]: The evaluation results. """ @@ -548,6 +554,7 @@ async def async_eval_run( "running experiment evaluations" ), concurrency=concurrency, + timeout=timeout, ) eval_runs, _execution_details = executor.run(evaluation_input) return eval_runs diff --git a/tests/experimental/datasets/experiments/test_experiments.py b/tests/experimental/datasets/experiments/test_experiments.py index f543a70..2ec2a2b 100644 --- a/tests/experimental/datasets/experiments/test_experiments.py +++ b/tests/experimental/datasets/experiments/test_experiments.py @@ -443,3 +443,43 @@ def dummy_task(x): for _, row in exp_df.iterrows(): assert row["example_id"] == row["eval.DummyEval.label"] assert row.result == row["eval.DummyEval.metadata.output"] + + +def test_timeout_parameter(): + """Test that custom timeout parameter is accepted and experiment runs successfully""" + import time + + def slow_task(x): + # Simulate a task that takes some time but less than timeout + time.sleep(0.1) + question = x["question"] + return f"Answer to {question}" + + # Create a small dataset for testing + test_dataset = pd.DataFrame( + { + "id": [f"id_{i}" for i in range(3)], + "question": [f"question_{i}" for i in range(3)], + } + ) + + c = ArizeDatasetsClient(api_key="dummy_key") + + # Test with custom timeout value (should succeed) + exp_id, exp_df = c.run_experiment( + space_id="dummy_space_id", + experiment_name="test_timeout_experiment", + dataset_id="dummy_dataset_id", + dataset_df=test_dataset, + task=slow_task, + evaluators=[DummyEval()], + dry_run=True, + timeout=300, # 5 minutes timeout + ) + + # Should complete successfully + assert exp_id == "" + assert exp_df.shape[0] == 3 + assert not exp_df.empty + # All tasks should complete successfully + assert not exp_df["result"].isnull().any()