3 changes: 3 additions & 0 deletions src/arize/experimental/datasets/core/client.py
@@ -119,6 +119,7 @@ def run_experiment(
         concurrency: int = 3,
         set_global_tracer_provider: bool = False,
         exit_on_error: bool = False,
+        timeout: float = 120,
     ) -> Union[Tuple[str, pd.DataFrame], None]:
         """
         Run an experiment on a dataset and upload the results.
@@ -145,6 +146,7 @@ def run_experiment(
             set_global_tracer_provider (bool): If True, sets the global tracer provider for the experiment.
                 Defaults to False.
             exit_on_error (bool): If True, the experiment will stop running on first occurrence of an error.
+            timeout (float): The timeout in seconds for each task execution. Defaults to 120.
 
         Returns:
             Tuple[str, pd.DataFrame]:
@@ -225,6 +227,7 @@ def run_experiment(
             evaluators=evaluators,
             concurrency=concurrency,
             exit_on_error=exit_on_error,
+            timeout=timeout,
         )
         output_df = _convert_default_columns_to_json_str(output_df)
         output_df = _convert_boolean_columns_to_str(output_df)
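For SDK users the change is just a new keyword on `ArizeDatasetsClient.run_experiment`. A minimal usage sketch, modeled on the test added at the bottom of this PR; the API key, space ID, and dataset ID are placeholders, and the public import path is assumed:

```python
import pandas as pd
from arize.experimental.datasets import ArizeDatasetsClient

client = ArizeDatasetsClient(api_key="YOUR_API_KEY")  # placeholder credentials

df = pd.DataFrame({"id": ["id_0"], "question": ["What does the timeout parameter do?"]})

def answer_task(example):
    # Tasks often wrap slow LLM calls; the new `timeout` bounds each execution.
    return f"Answer to {example['question']}"

exp_id, exp_df = client.run_experiment(
    space_id="YOUR_SPACE_ID",
    experiment_name="timeout-demo",
    dataset_id="YOUR_DATASET_ID",
    dataset_df=df,
    task=answer_task,
    dry_run=True,  # keep the sketch local; nothing is uploaded
    timeout=300,   # allow each task up to 5 minutes instead of the 120 s default
)
```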
@@ -100,6 +100,8 @@ class AsyncExecutor(Executor):
 
         termination_signal (signal.Signals, optional): The signal handled to terminate the executor.
 
+        timeout (float, optional): The timeout in seconds for each task execution. Defaults to 120.
+
     """
 
     def __init__(
@@ -111,6 +113,7 @@ def __init__(
         exit_on_error: bool = True,
         fallback_return_value: Union[Unset, Any] = _unset,
         termination_signal: signal.Signals = signal.SIGINT,
+        timeout: float = 120,
     ):
         self.generate = generation_fn
         self.fallback_return_value = fallback_return_value
@@ -120,6 +123,7 @@ def __init__(
         self.exit_on_error = exit_on_error
         self.base_priority = 0
         self.termination_signal = termination_signal
+        self.timeout = timeout
 
     async def producer(
         self,
@@ -174,7 +178,7 @@ async def consumer(
             )
             done, pending = await asyncio.wait(
                 [generate_task, termination_event_watcher],
-                timeout=120,
+                timeout=self.timeout,
                 return_when=asyncio.FIRST_COMPLETED,
             )
 
@@ -429,6 +433,7 @@ def get_executor_on_sync_context(
     max_retries: int = 10,
     exit_on_error: bool = True,
     fallback_return_value: Union[Unset, Any] = _unset,
+    timeout: float = 120,
 ) -> Executor:
     if threading.current_thread() is not threading.main_thread():
         # run evals synchronously if not in the main thread
@@ -464,6 +469,7 @@ def get_executor_on_sync_context(
             max_retries=max_retries,
             exit_on_error=exit_on_error,
             fallback_return_value=fallback_return_value,
+            timeout=timeout,
         )
     else:
         logger.warning(
@@ -486,6 +492,7 @@ def get_executor_on_sync_context(
             max_retries=max_retries,
            exit_on_error=exit_on_error,
             fallback_return_value=fallback_return_value,
+            timeout=timeout,
         )
 
 
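The one-line consumer change is the heart of the PR: the previously hard-coded `timeout=120` in `asyncio.wait` now reads `self.timeout`. A self-contained sketch of that stdlib pattern (names here are illustrative, not the executor's internals): `asyncio.wait` never raises on timeout, it simply returns the still-running task in `pending`, so the caller decides whether to cancel, retry, or substitute a fallback value.

```python
import asyncio

async def slow_work():
    await asyncio.sleep(5)  # stand-in for a slow LLM call
    return "finished"

async def main():
    task = asyncio.ensure_future(slow_work())
    # Same pattern as the consumer above: wait up to `timeout` seconds for the
    # first completion. On timeout, `done` is empty and the task sits in `pending`.
    done, pending = await asyncio.wait(
        [task], timeout=0.5, return_when=asyncio.FIRST_COMPLETED
    )
    if task in pending:
        task.cancel()  # asyncio.wait does not cancel timed-out tasks for you
        print("timed out; caller falls back")
    else:
        print(task.result())

asyncio.run(main())
```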
7 changes: 7 additions & 0 deletions src/arize/experimental/datasets/experiments/functions.py
@@ -72,6 +72,7 @@ def run_experiment(
     evaluators: Optional[Evaluators] = None,
     concurrency: int = 3,
     exit_on_error: bool = False,
+    timeout: float = 120,
 ) -> pd.DataFrame:
     """
     Run an experiment on a dataset.
@@ -86,6 +87,7 @@ def run_experiment(
         evaluators (Optional[Evaluators]): Optional evaluators to assess the task.
         concurrency (int): The number of concurrent tasks to run. Default is 3.
         exit_on_error (bool): Whether to exit on error. Default is False.
+        timeout (float): The timeout in seconds for each task execution. Default is 120.
     Returns:
         pd.DataFrame: The results of the experiment.
     """
@@ -280,6 +282,7 @@ async def async_run_experiment(example: Example) -> ExperimentRun:
         fallback_return_value=None,
         tqdm_bar_format=get_tqdm_progress_bar_formatter("running tasks"),
         concurrency=concurrency,
+        timeout=timeout,
     )
 
     runs, _execution_details = executor.run(examples)
@@ -312,6 +315,7 @@ async def async_run_experiment(example: Example) -> ExperimentRun:
         tracer=tracer,
         resource=resource,
         exit_on_error=exit_on_error,
+        timeout=timeout,
     )
 
     if exit_on_error and (None in eval_results):
@@ -362,6 +366,7 @@ def evaluate_experiment(
     tracer: Optional[Tracer] = None,
     resource: Optional[Resource] = None,
     exit_on_error: bool = False,
+    timeout: float = 120,
 ):
     """
     Evaluate the results of an experiment using the provided evaluators.
@@ -375,6 +380,7 @@ def evaluate_experiment(
         tracer (Optional[Tracer]): Optional tracer for tracing the evaluation.
         resource (Optional[Resource]): Optional resource for the evaluation.
         exit_on_error (bool): Whether to exit on error. Default is False.
+        timeout (float): The timeout in seconds for each evaluator execution. Default is 120.
     Returns:
         List[ExperimentEvaluationRun]: The evaluation results.
     """
@@ -548,6 +554,7 @@ async def async_eval_run(
                 "running experiment evaluations"
             ),
             concurrency=concurrency,
+            timeout=timeout,
         )
     eval_runs, _execution_details = executor.run(evaluation_input)
     return eval_runs
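One knob now covers both phases: `run_experiment` hands `timeout` to the task executor and, via `evaluate_experiment`, to the evaluation executor, so each task run and each evaluator run gets the same per-execution budget. A hedged continuation of the earlier client sketch (reusing `client`, `df`, and `answer_task` from there), assuming plain callables are accepted wherever the `Evaluators` type is expected; wrap in an evaluator class if the SDK requires one:

```python
import time

def patient_eval(output):
    # Evaluators may themselves call an LLM judge; they now share the same
    # per-execution timeout as the task.
    time.sleep(0.2)
    return float("Answer" in str(output))

exp_id, exp_df = client.run_experiment(
    space_id="YOUR_SPACE_ID",
    experiment_name="timeout-eval-demo",
    dataset_id="YOUR_DATASET_ID",
    dataset_df=df,
    task=answer_task,
    evaluators=[patient_eval],
    dry_run=True,
    timeout=60,  # bounds each task *and* each evaluator execution
)
```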
40 changes: 40 additions & 0 deletions tests/experimental/datasets/experiments/test_experiments.py
@@ -443,3 +443,43 @@ def dummy_task(x):
     for _, row in exp_df.iterrows():
         assert row["example_id"] == row["eval.DummyEval.label"]
         assert row.result == row["eval.DummyEval.metadata.output"]
+
+
+def test_timeout_parameter():
+    """Test that a custom timeout parameter is accepted and the experiment runs successfully."""
+    import time
+
+    def slow_task(x):
+        # Simulate a task that takes some time, but less than the timeout
+        time.sleep(0.1)
+        question = x["question"]
+        return f"Answer to {question}"
+
+    # Create a small dataset for testing
+    test_dataset = pd.DataFrame(
+        {
+            "id": [f"id_{i}" for i in range(3)],
+            "question": [f"question_{i}" for i in range(3)],
+        }
+    )
+
+    c = ArizeDatasetsClient(api_key="dummy_key")
+
+    # Test with a custom timeout value (should succeed)
+    exp_id, exp_df = c.run_experiment(
+        space_id="dummy_space_id",
+        experiment_name="test_timeout_experiment",
+        dataset_id="dummy_dataset_id",
+        dataset_df=test_dataset,
+        task=slow_task,
+        evaluators=[DummyEval()],
+        dry_run=True,
+        timeout=300,  # 5-minute timeout
+    )
+
+    # Should complete successfully
+    assert exp_id == ""
+    assert exp_df.shape[0] == 3
+    assert not exp_df.empty
+    # All tasks should complete successfully
+    assert not exp_df["result"].isnull().any()
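A natural companion test, not included in this PR, would exercise the timeout-exceeded path. A hedged sketch, assuming a timed-out task surfaces as the executor's fallback value (`None`, per the `fallback_return_value=None` wiring in functions.py) rather than raising; that assumption is worth confirming against the executor before adopting the test:

```python
def test_timeout_exceeded():
    """Hypothetical follow-up: a task slower than the timeout should fall back, not raise."""
    import time

    def stuck_task(x):
        time.sleep(5)  # deliberately longer than the 1-second timeout below
        return f"Answer to {x['question']}"

    test_dataset = pd.DataFrame({"id": ["id_0"], "question": ["question_0"]})
    c = ArizeDatasetsClient(api_key="dummy_key")

    exp_id, exp_df = c.run_experiment(
        space_id="dummy_space_id",
        experiment_name="test_timeout_exceeded",
        dataset_id="dummy_dataset_id",
        dataset_df=test_dataset,
        task=stuck_task,
        dry_run=True,
        timeout=1,
    )

    # Assumption: on timeout the run records the executor's fallback (None),
    # so the result column is empty instead of the call raising.
    assert exp_df["result"].isnull().all()
```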