diff --git a/benchmark/aiperf/README.md b/benchmark/aiperf/README.md
new file mode 100644
index 000000000..9a3608927
--- /dev/null
+++ b/benchmark/aiperf/README.md
@@ -0,0 +1,365 @@
+# AIPerf Benchmarking for NeMo Guardrails
+
+## Introduction
+
+[AIPerf](https://github.com/ai-dynamo/aiperf) is NVIDIA's latest benchmarking tool for LLMs. It supports any OpenAI-compatible inference service and generates synthetic data loads, benchmarks, and all the metrics needed for performance comparison and analysis.
+
+The [`run_aiperf.py`](run_aiperf.py) script enhances AIPerf's capabilities by providing:
+
+- **Batch Execution**: Run multiple benchmarks in sequence with a single command
+- **Parameter Sweeps**: Automatically generate and run benchmarks across different parameter combinations (e.g., sweeping concurrency levels, token counts, etc.)
+- **Organized Results**: Automatically organizes benchmark results in timestamped directories with clear naming conventions
+- **YAML Configuration**: Simple, declarative configuration files for reproducible benchmark runs
+- **Run Metadata**: Saves complete metadata about each run (configuration, command, timestamp) for future analysis and reproduction
+- **Service Health Checks**: Validates that the target service is available before starting benchmarks
+
+Instead of manually running AIPerf multiple times with different parameters, you can define a sweep in a YAML file and let the script handle the rest.
+
+## Getting Started
+
+### Prerequisites
+
+These steps have been tested with Python 3.11.11.
+To use the provided configurations, you need accounts at [build.nvidia.com](https://build.nvidia.com/) and [Hugging Face](https://huggingface.co/):
+
+- The provided configurations use models hosted at https://build.nvidia.com/, so you'll need to create a Personal API Key to access them.
+- The provided AIPerf configurations require the [Meta Llama 3.3 70B Instruct tokenizer](https://huggingface.co/meta-llama/Llama-3.3-70B-Instruct) to calculate token counts.
+
+1. **Create and activate a virtual environment in which to install AIPerf:**
+
+   ```bash
+   $ mkdir ~/env
+   $ python -m venv ~/env/aiperf
+   $ source ~/env/aiperf/bin/activate
+   ```
+
+2. **Install dependencies in the virtual environment:**
+
+   ```bash
+   $ pip install aiperf huggingface_hub typer
+   ```
+
+3. **Log in to Hugging Face:**
+
+   ```bash
+   $ huggingface-cli login
+   ```
+
+4. **Set the NVIDIA API Key:**
+
+   The provided configs use models hosted on [build.nvidia.com](https://build.nvidia.com/).
+   To access these, [create an account](https://build.nvidia.com/) and generate a Personal API Key, then set the `NVIDIA_API_KEY` environment variable as below.
+
+   ```bash
+   $ export NVIDIA_API_KEY="your-api-key-here"
+   ```
+
+## Running Benchmarks
+
+Each benchmark is configured using the `AIPerfConfig` Pydantic model in [aiperf_models.py](aiperf_models.py).
+The configs are stored in YAML files and converted to an `AIPerfConfig` object (a minimal loading sketch is shown after the list below).
+Two example configs are included, both using NVIDIA-hosted models, which you can extend for your own use cases:
+
+- [`single_concurrency.yaml`](configs/single_concurrency.yaml): Example single-run benchmark with a single concurrency value.
+- [`sweep_concurrency.yaml`](configs/sweep_concurrency.yaml): Example multi-run benchmark that sweeps concurrency values and runs a new benchmark for each.
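+If you just want to check that a YAML config is valid without launching a benchmark, you can load it with the same Pydantic model the runner uses. The snippet below is a minimal sketch rather than part of the runner; it assumes you run it from the repository root, and the config path is only an example.
+
+```python
+# Minimal sketch: load a benchmark config and validate it with AIPerfConfig.
+# Assumes the repository root is the working directory; the path is an example.
+import yaml
+
+from benchmark.aiperf.aiperf_models import AIPerfConfig
+
+with open("benchmark/aiperf/configs/single_concurrency.yaml", encoding="utf-8") as f:
+    config = AIPerfConfig(**yaml.safe_load(f))
+
+print(config.batch_name)         # "single_concurrency"
+print(config.base_config.model)  # "meta/llama-3.3-70b-instruct"
+print(config.sweeps)             # None for a single-run config
+```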
+
+To run a benchmark, use the following command:
+
+```bash
+$ python -m benchmark.aiperf --config-file <path/to/config.yaml>
+```
+
+### Running a Single Benchmark
+
+To run a single benchmark with fixed parameters, use the `single_concurrency.yaml` configuration:
+
+```bash
+$ python -m benchmark.aiperf --config-file benchmark/aiperf/configs/single_concurrency.yaml
+```
+
+**Example output:**
+
+```text
+2025-12-01 10:35:17 INFO: Running AIPerf with configuration: aiperf/configs/single_concurrency.yaml
+2025-12-01 10:35:17 INFO: Results root directory: aiperf_results/single_concurrency/20251201_103517
+2025-12-01 10:35:17 INFO: Sweeping parameters: None
+2025-12-01 10:35:17 INFO: Running AIPerf with configuration: aiperf/configs/single_concurrency.yaml
+2025-12-01 10:35:17 INFO: Output directory: aiperf_results/single_concurrency/20251201_103517
+2025-12-01 10:35:17 INFO: Single Run
+2025-12-01 10:36:54 INFO: Run completed successfully
+2025-12-01 10:36:54 INFO: SUMMARY
+2025-12-01 10:36:54 INFO: Total runs : 1
+2025-12-01 10:36:54 INFO: Completed : 1
+2025-12-01 10:36:54 INFO: Failed : 0
+```
+
+### Running a Concurrency Sweep
+
+To run multiple benchmarks with different concurrency levels, use the `sweep_concurrency.yaml` configuration:
+
+```bash
+$ python -m benchmark.aiperf --config-file benchmark/aiperf/configs/sweep_concurrency.yaml
+```
+
+**Example output:**
+
+```text
+2025-11-14 14:02:54 INFO: Running AIPerf with configuration: nemoguardrails/benchmark/aiperf/aiperf_configs/sweep_concurrency.yaml
+2025-11-14 14:02:54 INFO: Results root directory: aiperf_results/sweep_concurrency/20251114_140254
+2025-11-14 14:02:54 INFO: Sweeping parameters: {'concurrency': [1, 2, 4]}
+2025-11-14 14:02:54 INFO: Running 3 benchmarks
+2025-11-14 14:02:54 INFO: Run 1/3
+2025-11-14 14:02:54 INFO: Sweep parameters: {'concurrency': 1}
+2025-11-14 14:04:12 INFO: Run 1 completed successfully
+2025-11-14 14:04:12 INFO: Run 2/3
+2025-11-14 14:04:12 INFO: Sweep parameters: {'concurrency': 2}
+2025-11-14 14:05:25 INFO: Run 2 completed successfully
+2025-11-14 14:05:25 INFO: Run 3/3
+2025-11-14 14:05:25 INFO: Sweep parameters: {'concurrency': 4}
+2025-11-14 14:06:38 INFO: Run 3 completed successfully
+2025-11-14 14:06:38 INFO: SUMMARY
+2025-11-14 14:06:38 INFO: Total runs : 3
+2025-11-14 14:06:38 INFO: Completed : 3
+2025-11-14 14:06:38 INFO: Failed : 0
+```
+
+## Additional Options
+
+### Dry-Run Mode
+
+The `--dry-run` option allows you to preview all benchmark commands without executing them. This is useful for:
+
+- Validating your configuration file
+- Checking which parameter combinations will be generated
+- Estimating total execution time before committing to a long-running sweep
+- Debugging configuration issues
+
+```bash
+$ python -m benchmark.aiperf --config-file benchmark/aiperf/configs/sweep_concurrency.yaml --dry-run
+```
+
+In dry-run mode, the script will:
+
+- Load and validate your configuration
+- Check service connectivity
+- Generate all sweep combinations
+- Display what would be executed
+- Exit without running any benchmarks
+
+### Verbose Mode
+
+The `--verbose` option outputs more detailed debugging information so you can follow each step of the benchmarking process.
+
+```bash
+$ python -m benchmark.aiperf --config-file <path/to/config.yaml> --verbose
+```
+
+Verbose mode provides:
+
+- Complete command-line arguments passed to AIPerf
+- Detailed parameter merging logic (base config + sweep params)
+- Output directory creation details
+- Real-time AIPerf output (normally captured to files)
+- Full stack traces for errors
+
+**Tip:** Use verbose mode when debugging configuration issues or when you want to see live progress of the benchmark execution.
+
+## Configuration Files
+
+Configuration files are YAML files located in [configs](configs). The configuration is validated using Pydantic models to catch errors early.
+
+### Top-Level Configuration Fields
+
+| Field | Type | Required | Description |
+|-------|------|----------|-------------|
+| `batch_name` | string | Yes | Name for this batch of benchmarks. Used in output directory naming (e.g., `aiperf_results/batch_name/timestamp/`) |
+| `output_base_dir` | string | Yes | Base directory where all benchmark results will be stored |
+| `base_config` | object | Yes | Base configuration parameters applied to all benchmark runs (see below) |
+| `sweeps` | object | No | Optional parameter sweeps for running multiple benchmarks with different values |
+
+### Base Configuration Parameters
+
+The `base_config` section contains parameters that are passed to AIPerf. Any of these can be overridden by sweep parameters.
+
+#### Model and Service Configuration
+
+| Parameter | Type | Required | Description |
+|-----------|------|----------|-------------|
+| `model` | string | Yes | Model identifier (e.g., `meta/llama-3.3-70b-instruct`) |
+| `tokenizer` | string | No | Tokenizer name from Hugging Face or local path. If not provided, AIPerf will attempt to use the model name |
+| `url` | string | Yes | Base URL of the inference service (e.g., `https://integrate.api.nvidia.com`) |
+| `endpoint` | string | No | API endpoint path (default: `/v1/chat/completions`) |
+| `endpoint_type` | string | No | Type of endpoint: `chat` or `completions` (default: `chat`) |
+| `api_key_env_var` | string | No | Name of environment variable containing API key (e.g., `NVIDIA_API_KEY`) |
+| `streaming` | boolean | No | Whether to use streaming mode (default: `false`) |
+
+#### Load Generation Settings
+
+| Parameter | Type | Required | Description |
+|-----------|------|----------|-------------|
+| `warmup_request_count` | integer | Yes | Number of warmup requests to send before starting the benchmark |
+| `benchmark_duration` | integer | Yes | Duration of the benchmark in seconds |
+| `concurrency` | integer | Yes | Number of concurrent requests to maintain during the benchmark |
+| `request_rate` | float | No | Target request rate in requests/second. If not provided, calculated from concurrency |
+| `request_rate_mode` | string | No | Distribution mode: `constant` or `poisson` (default: `constant`) |
+
+#### Synthetic Data Generation
+
+These parameters control the generation of synthetic prompts for benchmarking:
+
+| Parameter | Type | Required | Description |
+|-----------|------|----------|-------------|
+| `random_seed` | integer | No | Random seed for reproducible synthetic data generation |
+| `prompt_input_tokens_mean` | integer | No | Mean number of input tokens per prompt |
+| `prompt_input_tokens_stddev` | integer | No | Standard deviation of input token count |
+| `prompt_output_tokens_mean` | integer | No | Mean number of expected output tokens |
+| `prompt_output_tokens_stddev` | integer | No | Standard deviation of output token count |
+
+### Parameter Sweeps
+
+The `sweeps` section allows you to run multiple benchmarks with different parameter values. The script generates a **Cartesian product** of all sweep values, running a separate benchmark for each combination.
+
+#### Basic Sweep Example
+
+```yaml
+sweeps:
+  concurrency: [1, 2, 4, 8, 16]
+```
+
+This will run 5 benchmarks, one for each concurrency level.
+
+#### Multi-Parameter Sweep Example
+
+```yaml
+sweeps:
+  concurrency: [1, 4, 16]
+  prompt_input_tokens_mean: [100, 500, 1000]
+```
+
+This will run **9 benchmarks**, one for each combination of `concurrency` and `prompt_input_tokens_mean` values.
+
+Each sweep combination creates a subdirectory named with the parameter values:
+
+```text
+aiperf_results/
+└── my_benchmark/
+    └── 20251114_140254/
+        ├── concurrency1_prompt_input_tokens_mean100/
+        ├── concurrency1_prompt_input_tokens_mean500/
+        ├── concurrency4_prompt_input_tokens_mean100/
+        └── ...
+```
+
+### Complete Configuration Example
+
+```yaml
+# Name for this batch of benchmarks
+batch_name: my_benchmark
+
+# Base directory where all benchmark results will be stored
+output_base_dir: aiperf_results
+
+# Base configuration applied to all benchmark runs
+base_config:
+  # Model and service configuration
+  model: meta/llama-3.3-70b-instruct
+  tokenizer: meta-llama/Llama-3.3-70B-Instruct
+  url: "https://integrate.api.nvidia.com"
+  endpoint: "/v1/chat/completions"
+  endpoint_type: chat
+  api_key_env_var: NVIDIA_API_KEY
+  streaming: true
+
+  # Load generation settings
+  warmup_request_count: 20
+  benchmark_duration: 60
+  concurrency: 1
+  request_rate_mode: "constant"
+
+  # Synthetic data generation
+  random_seed: 12345
+  prompt_input_tokens_mean: 100
+  prompt_input_tokens_stddev: 10
+  prompt_output_tokens_mean: 50
+  prompt_output_tokens_stddev: 5
+
+# Optional: parameter sweeps (Cartesian product)
+sweeps:
+  concurrency: [1, 2, 4, 8, 16]
+  prompt_input_tokens_mean: [100, 500, 1000]
+```
+
+### Common Sweep Patterns
+
+#### Concurrency Scaling Test
+
+```yaml
+sweeps:
+  concurrency: [1, 2, 4, 8, 16, 32, 64]
+```
+
+Useful for finding optimal concurrency levels and throughput limits.
+
+#### Token Length Impact Test
+
+```yaml
+sweeps:
+  prompt_input_tokens_mean: [50, 100, 500, 1000, 2000]
+  prompt_output_tokens_mean: [50, 100, 500, 1000]
+```
+
+Useful for understanding how token counts affect latency and throughput.
+
+#### Request Rate Comparison
+
+```yaml
+sweeps:
+  request_rate_mode: ["constant", "poisson"]
+  concurrency: [4, 8, 16]
+```
+
+Useful for comparing different load patterns.
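+For reference, the runner expands `sweeps` into individual runs as a standard Cartesian product. The standalone sketch below mirrors that expansion (the sweep values are illustrative) and can help you predict how many runs a configuration will produce before starting it.
+
+```python
+# Standalone sketch of how sweep combinations are expanded into runs.
+# The sweep values below are illustrative, not a required configuration.
+import itertools
+
+sweeps = {
+    "concurrency": [1, 4, 16],
+    "prompt_input_tokens_mean": [100, 500, 1000],
+}
+
+param_names = list(sweeps.keys())
+combinations = [
+    dict(zip(param_names, values))
+    for values in itertools.product(*(sweeps[name] for name in param_names))
+]
+
+print(len(combinations))  # 9 runs
+print(combinations[0])    # {'concurrency': 1, 'prompt_input_tokens_mean': 100}
+```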
+
+## Output Structure
+
+Results are organized in timestamped directories:
+
+```text
+aiperf_results/
+├── <batch_name>/
+│   └── <timestamp>/
+│       ├── run_metadata.json       # Single run
+│       ├── process_result.json
+│       └── <AIPerf output files>
+│       # OR for sweeps:
+│       ├── concurrency1/
+│       │   ├── run_metadata.json
+│       │   ├── process_result.json
+│       │   └── <AIPerf output files>
+│       ├── concurrency2/
+│       │   └── ...
+│       └── concurrency4/
+│           └── ...
+```
+
+### Output Files
+
+Each run directory contains multiple files with benchmark results and metadata. A summary of these is shown below:
+
+#### Benchmark Runner Files
+
+- **`run_metadata.json`**: Contains complete metadata about the benchmark run for reproducibility.
+- **`process_result.json`**: Contains the subprocess execution results.
+
+#### Files Generated by AIPerf
+
+- **`inputs.json`**: Synthetic prompt data generated for the benchmark.
+- **`profile_export_aiperf.json`**: Main metrics file in JSON format containing aggregated statistics.
+- **`profile_export_aiperf.csv`**: Same metrics as the JSON file, but in CSV format for easy import into spreadsheet tools or data analysis libraries.
+- **`profile_export.jsonl`**: JSON Lines file with per-request metrics; each line is a complete JSON object for one request.
+- **`logs/aiperf.log`**: Detailed log file from the AIPerf execution.
+
+## Resources
+
+- [AIPerf GitHub Repository](https://github.com/ai-dynamo/aiperf)
+- [NVIDIA API Catalog](https://build.nvidia.com/)
diff --git a/benchmark/aiperf/__init__.py b/benchmark/aiperf/__init__.py
new file mode 100644
index 000000000..3159bfe65
--- /dev/null
+++ b/benchmark/aiperf/__init__.py
@@ -0,0 +1,14 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/benchmark/aiperf/__main__.py b/benchmark/aiperf/__main__.py
new file mode 100644
index 000000000..8bb221f4a
--- /dev/null
+++ b/benchmark/aiperf/__main__.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0 + +"""Entry point for running aiperf as a module: python -m aiperf""" + +from benchmark.aiperf.run_aiperf import app + +if __name__ == "__main__": + app() diff --git a/benchmark/aiperf/aiperf_models.py b/benchmark/aiperf/aiperf_models.py new file mode 100644 index 000000000..f3c0f6843 --- /dev/null +++ b/benchmark/aiperf/aiperf_models.py @@ -0,0 +1,127 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Pydantic models for AIPerf configuration validation. +""" + +from pathlib import Path +from typing import Any, Dict, List, Literal, Optional, Union + +from pydantic import BaseModel, Field, field_validator, model_validator + + +class BaseConfig(BaseModel): + """Base configuration for AIPerf benchmark runs.""" + + # Model details + model: str = Field(..., description="Model name") + tokenizer: Optional[str] = Field( + default=None, + description="Optional tokenizer Huggingface name, or local directory", + ) + url: str = Field(..., description="Model base URL") + endpoint: str = Field(default="/v1/chat/completions", description="API endpoint path") + endpoint_type: Literal["chat", "completions"] = Field( + default="chat", + description="Type of endpoint (chat or completions)", + ) + api_key_env_var: Optional[str] = Field(default=None, description="API key environment variable") + streaming: Optional[bool] = Field(default=False, description="Streaming mode") + + # Load generation settings + warmup_request_count: int = Field(description="Requests to send before beginning performance-test") + benchmark_duration: int = Field(description="Benchmark duration in seconds") + concurrency: int = Field(description="Number of concurrent requests") + request_rate: Optional[float] = Field( + default=None, + description="Request rate (requests per second, auto-calculated if not provided)", + ) + request_rate_mode: Optional[Literal["constant", "poisson"]] = Field( + default="constant", + description="Request rate mode (constant, poisson, etc.)", + ) + + # Synthetic data generation + random_seed: Optional[int] = Field(default=None, description="Random seed for reproducibility") + prompt_input_tokens_mean: Optional[int] = Field( + default=None, + description="Mean number of input tokens", + ) + prompt_input_tokens_stddev: Optional[int] = Field( + default=None, + description="Standard deviation of input tokens", + ) + prompt_output_tokens_mean: Optional[int] = Field( + default=None, + description="Mean number of output tokens", + ) + prompt_output_tokens_stddev: Optional[int] = Field( + default=None, + description="Standard deviation of output tokens", + ) + + +class AIPerfConfig(BaseModel): + """Main configuration model for AIPerf benchmark runner.""" + + batch_name: str = Field(default="benchmark", description="Name for this batch of benchmarks") + output_base_dir: str = Field( + default="aiperf_results", + description="Base 
directory for benchmark results", + ) + base_config: BaseConfig = Field(..., description="Base configuration applied to all benchmark runs") + sweeps: Optional[Dict[str, List[Union[int, str]]]] = Field( + default=None, + description="Parameter sweeps. Key is the parameter to change, value is a list of values to use", + ) + + @field_validator("sweeps") + @classmethod + def validate_sweeps(cls, v: Optional[Dict[str, List[Any]]]) -> Optional[Dict[str, List[Any]]]: + """Validate that sweep values are lists of ints or strings.""" + if v is None: + return v + + for param_name, values in v.items(): + if len(values) == 0: + raise ValueError(f"Sweep parameter '{param_name}' cannot be empty") + + return v + + @model_validator(mode="after") + def validate_sweep_keys(self): + """Validate that sweep keys exist in base_config.""" + sweeps = self.sweeps + if sweeps is None: + return self + + # Get all valid field names from BaseConfig + valid_keys = set(BaseConfig.model_fields.keys()) + + # Check each sweep parameter + for param_name in sweeps: + if param_name not in valid_keys: + valid_fields = sorted(valid_keys) + raise ValueError( + f"Sweep parameter '{param_name}' is not a valid BaseConfig field. " + f"Valid fields are: {', '.join(valid_fields)}" + ) + + return self + + def get_output_base_path(self) -> Path: + """Get the base output directory as a Path object.""" + return Path(self.output_base_dir) diff --git a/benchmark/aiperf/configs/single_concurrency.yaml b/benchmark/aiperf/configs/single_concurrency.yaml new file mode 100644 index 000000000..ce8227795 --- /dev/null +++ b/benchmark/aiperf/configs/single_concurrency.yaml @@ -0,0 +1,33 @@ +# Single-run AIPerf benchmark configuration + +# Name for this batch of benchmarks (will be part of output directory name) +batch_name: single_concurrency + +# Base directory where all benchmark results will be stored. +# Actual name is // +output_base_dir: aiperf_results + +# Base configuration applied to all benchmark runs +# These parameters can be overridden by sweep parameters +base_config: + # Model details + model: meta/llama-3.3-70b-instruct + tokenizer: meta-llama/Llama-3.3-70B-Instruct + url: "https://integrate.api.nvidia.com" + endpoint: "/v1/chat/completions" + endpoint_type: chat + api_key_env_var: NVIDIA_API_KEY + streaming: True + + # Load generation settings. + warmup_request_count: 20 + benchmark_duration: 60 + concurrency: 1 + request_rate_mode: "constant" + + # Synthetic data generation + random_seed: 12345 + prompt_input_tokens_mean: 100 + prompt_input_tokens_stddev: 10 + prompt_output_tokens_mean: 50 + prompt_output_tokens_stddev: 5 diff --git a/benchmark/aiperf/configs/sweep_concurrency.yaml b/benchmark/aiperf/configs/sweep_concurrency.yaml new file mode 100644 index 000000000..35e2cbc53 --- /dev/null +++ b/benchmark/aiperf/configs/sweep_concurrency.yaml @@ -0,0 +1,38 @@ +# Concurrency sweep. One-minute tests at log-spaced concurrencies + +# Name for this batch of benchmarks (will be part of output directory name) +batch_name: sweep_concurrency + +# Base directory where all benchmark results will be stored. 
+# Actual name is // for sweeps +output_base_dir: aiperf_results + +# Base configuration applied to all benchmark runs +# These parameters can be overridden by sweep parameters +base_config: + # Model details + model: meta/llama-3.3-70b-instruct + tokenizer: meta-llama/Llama-3.3-70B-Instruct + url: "https://integrate.api.nvidia.com" + endpoint: "/v1/chat/completions" + endpoint_type: chat + api_key_env_var: NVIDIA_API_KEY + + # Load generation settings. + warmup_request_count: 10 + benchmark_duration: 60 + concurrency: 0 # Overridden by the concurrency sweep below + request_rate_mode: "constant" + + # Synthetic data generation + random_seed: 12345 + prompt_input_tokens_mean: 100 + prompt_input_tokens_stddev: 10 + prompt_output_tokens_mean: 50 + prompt_output_tokens_stddev: 5 + +# Parameter sweeps. Each parameter can have multiple values +# The script will run all combinations (Cartesian product) +sweeps: + # Sweep over the following concurrency values + concurrency: [1, 2, 4] diff --git a/benchmark/aiperf/run_aiperf.py b/benchmark/aiperf/run_aiperf.py new file mode 100755 index 000000000..614e2aadf --- /dev/null +++ b/benchmark/aiperf/run_aiperf.py @@ -0,0 +1,480 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
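+
+"""Batch runner for AIPerf benchmarks.
+
+Loads a YAML configuration, optionally expands parameter sweeps into a
+Cartesian product of runs, launches ``aiperf profile`` for each run, and
+stores results and metadata in timestamped output directories.
+"""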
+ +import itertools +import json +import logging +import os +import subprocess +import sys +import urllib.parse +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from subprocess import CompletedProcess +from typing import Any, Dict, List, Optional, Union + +import httpx +import typer +import yaml +from pydantic import ValidationError + +from benchmark.aiperf.aiperf_models import AIPerfConfig + +log = logging.getLogger(__name__) +log.setLevel(logging.INFO) + +formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S") +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(formatter) + +log.addHandler(console_handler) + +app = typer.Typer() + + +@dataclass +class AIPerfSummary: + total: int + completed: int + failed: int + + +class AIPerfRunner: + """Run batches of AIPerf benchmarks using YAML config and optional parameter sweeps""" + + def __init__(self, config_path: Path): + self.config_path = config_path + self.config = self._load_config() + + def _load_config(self) -> AIPerfConfig: + """Load and validate the YAML configuration file using Pydantic.""" + try: + with open(self.config_path, "r", encoding="utf-8") as f: + config_data = yaml.safe_load(f) + + # Validate with Pydantic model + config = AIPerfConfig(**config_data) + return config + + except FileNotFoundError: + log.error("Configuration file not found: %s", self.config_path) + sys.exit(1) + except yaml.YAMLError as e: + log.error("Error parsing YAML configuration: %s", e) + sys.exit(1) + except ValidationError as e: + log.error("Configuration validation error:\n%s", e) + sys.exit(1) + except Exception as e: + log.error("Unexpected error loading configuration: %s", e) + sys.exit(1) + + def _get_sweep_combinations(self) -> Optional[List[Dict[str, Union[int, str]]]]: + """Create cartesian-product of parameter sweep values for benchmarks""" + + if not self.config.sweeps: + # No sweeps, return single empty combination + return None + + # Extract parameter names and their values + param_names = list(self.config.sweeps.keys()) + param_values = [self.config.sweeps[name] for name in param_names] + + num_runs = 1 + for _, sweep_values in self.config.sweeps.items(): + num_runs *= len(sweep_values) + + max_runs = 100 + if num_runs > max_runs: + raise RuntimeError(f"Requested {num_runs} runs, max is {max_runs}") + + # Generate all combinations + combinations = [] + for combination in itertools.product(*param_values): + combinations.append(dict(zip(param_names, combination))) + + return combinations + + @staticmethod + def _sanitize_command_for_logging(cmd: List[str]) -> str: + """Convert command list to string with API key redacted. 
+ + Args: + cmd: List of command-line arguments + + Returns: + String with --api-key value replaced with * apart from last N chars + """ + last_n_chars = 6 # Show the last 6 characters + + sanitized = [] + i = 0 + while i < len(cmd): + current = cmd[i] + sanitized.append(current) + + # If this is --api-key, replace the next value with + if current == "--api-key" and i + 1 < len(cmd): + api_key = cmd[i + 1] + len_api_key = len(api_key) + sanitized_api_key = "*" * (len_api_key - last_n_chars) + sanitized_api_key += api_key[-last_n_chars:] + sanitized.append(sanitized_api_key) + i += 2 # Skip the actual API key value + else: + i += 1 + + return " ".join(sanitized) + + def _build_command(self, sweep_params: Optional[Dict[str, Union[str, int]]], output_dir: Path) -> List[str]: + """Create a list of strings with the aiperf command and arguments to execute""" + + # Run aiperf in profile mode: `aiperf profile` + cmd = ["aiperf", "profile"] + + # Get base config as dictionary + base_params = self.config.base_config.model_dump() + + # Merge base config with sweep params (sweep params override base) + params = base_params if not sweep_params else {**base_params, **sweep_params} + log.debug("Building command-line with params: %s", params) + + # Add output directory + params["output-artifact-dir"] = str(output_dir) + + # Use the --verbose CLI option (which changes log.level to debug) to enable more debugging + params["ui_type"] = "simple" if log.level == logging.DEBUG else "none" + + # Convert parameters to command line arguments + for key, value in params.items(): + # If an optional field isn't provided, don't pass that argument to aiperf + if value is None: + continue + + # If `api_key_env_var` is provided, get the value of the env var and add it + # to the command + if key == "api_key_env_var": + api_key = os.environ.get(value) + if not api_key: + raise RuntimeError( + f"Environment variable '{value}' is not set. 
Please set it: export {value}='your-api-key'" + ) + cmd.extend(["--api-key", str(api_key)]) + continue + + # Convert underscores to hyphens for CLI arguments + arg_name = key.replace("_", "-") + + # Handle different value types + if isinstance(value, bool): + if value: + cmd.append(f"--{arg_name}") + elif isinstance(value, list): + # For list values, add multiple arguments + for item in value: + cmd.extend([f"--{arg_name}", str(item)]) + elif value is not None: + cmd.extend([f"--{arg_name}", str(value)]) + + log.debug("Final command-line: %s", self._sanitize_command_for_logging(cmd)) + return cmd + + @staticmethod + def _create_output_dir( + base_dir: Path, + sweep_params: Optional[Dict[str, Union[str, int]]], + ) -> Path: + """Create directory in which to store AIPerf outputs.""" + + # Early-out if we're not sweeping anything + if not sweep_params: + base_dir.mkdir(parents=True, exist_ok=True) + return base_dir + + param_parts = [f"{key}{value}" for key, value in sorted(sweep_params.items())] + param_dir = "_".join(param_parts) + + output_dir = base_dir / param_dir + output_dir.mkdir(parents=True, exist_ok=True) + return output_dir + + def _save_run_metadata( + self, + output_dir: Path, + sweep_params: Optional[Dict[str, Any]], + command: List[str], + run_index: int, + ): + """Save metadata about the run for future reruns or analysis""" + metadata = { + "run_index": run_index, + "timestamp": datetime.now().isoformat(), + "config_file": str(self.config_path), + "sweep_params": sweep_params, + "base_config": self.config.base_config.model_dump(), + "command": self._sanitize_command_for_logging(command), + } + + metadata_file = output_dir / "run_metadata.json" + with open(metadata_file, "w", encoding="utf-8") as f: + json.dump(metadata, f, indent=2) + + @staticmethod + def _save_subprocess_result_json(output_dir: Path, result: CompletedProcess) -> None: + """Save the subprocess result to the given filename""" + + process_result_file = output_dir / "process_result.json" + save_data = result.__dict__ + + try: + with open(process_result_file, "w", encoding="utf-8") as f: + json.dump(save_data, f, indent=2) + + except (IOError, OSError) as e: + log.error("Could not write %s to file %s: %s", save_data, process_result_file, e) + raise + + except TypeError as e: + log.error("Couldn't serialize %s to %s: %s", save_data, process_result_file, e) + raise + + def _check_service(self, endpoint: Optional[str] = "/v1/models") -> None: + """Check if the service is up before we run the benchmarks""" + url = urllib.parse.urljoin(self.config.base_config.url, endpoint) + log.debug("Checking service is up using endpoint %s", url) + + # If the user has an API Key stored in an env var, use that in the /v1/models call + api_key_env_var = self.config.base_config.api_key_env_var + api_key = None + if api_key_env_var: + api_key = os.environ.get(api_key_env_var) + headers = {"Authorization": f"Bearer {api_key}"} if api_key else None + + try: + response = httpx.get(url, timeout=5, headers=headers) + except httpx.ConnectError as e: + raise RuntimeError(f"Can't connect to {url}: {e}") + + if response.status_code != 200: + raise RuntimeError(f"Can't access {url}: {response}") + + def run(self, dry_run: bool = False) -> int: + """Run benchmarks with AIPerf""" + + # Check the service is up before running anything + self._check_service() + + # Get the directory under which all benchmarks will store results + batch_dir = self._get_batch_dir() + + log.info("Running AIPerf with configuration: %s", self.config_path) + 
log.info("Results root directory: %s", batch_dir) + log.info("Sweeping parameters: %s", self.config.sweeps) + + benchmark_result: AIPerfSummary = ( + self.run_batch_benchmarks(batch_dir, dry_run) + if self.config.sweeps + else self.run_single_benchmark(batch_dir, dry_run) + ) + + # Log summary + log.info("SUMMARY") + log.info("Total runs : %s", benchmark_result.total) + log.info("Completed : %s", benchmark_result.completed) + log.info("Failed : %s", benchmark_result.failed) + + return 1 if benchmark_result.failed > 0 else 0 + + def _get_batch_dir(self) -> Path: + # Get base output directory + base_output_dir = self.config.get_output_base_path() + batch_name = self.config.batch_name + + # Create timestamped batch directory + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + batch_dir = base_output_dir / batch_name / timestamp + return batch_dir + + def run_single_benchmark( + self, + run_directory: Path, + dry_run: bool, + ) -> AIPerfSummary: + """Run a single benchmark. Return OS exit code.""" + + run_output_dir = self._create_output_dir(run_directory, sweep_params=None) + + log.info("Running AIPerf with configuration: %s", self.config_path) + log.info("Output directory: %s", run_output_dir) + + # Build command + command = self._build_command(sweep_params=None, output_dir=run_output_dir) + + # Save metadata + self._save_run_metadata(run_output_dir, None, command, 0) + + log.info("Single Run") + log.debug("Output directory: %s", run_output_dir) + log.debug("Command: %s", self._sanitize_command_for_logging(command)) + if dry_run: + log.info("Dry-run mode. Commands will not be executed") + return AIPerfSummary(total=0, completed=0, failed=0) + + try: + capture_output = log.level != logging.DEBUG + # Execute the command + result = subprocess.run( + command, + check=True, + capture_output=capture_output, + text=True, + ) + log.info("Run completed successfully") + self._save_subprocess_result_json(run_output_dir, result) + run_completed = 1 if result.returncode == 0 else 0 + return AIPerfSummary(total=1, completed=run_completed, failed=1 - run_completed) + + except subprocess.CalledProcessError as e: + log.error("Run failed with exit code %s", e.returncode) + return AIPerfSummary(total=1, completed=0, failed=1) + + except KeyboardInterrupt: + log.warning("Interrupted by user") + raise + + def run_batch_benchmarks( + self, + run_directory: Path, + dry_run: bool, + ) -> AIPerfSummary: + """Run a batch of benchmarks using sweeps values. Return OS exit code.""" + + # Generate all sweep combinations + combinations = self._get_sweep_combinations() + if not combinations: + raise RuntimeError(f"Can't generate sweep combinations from {self.config.sweeps}") + + num_combinations = len(combinations) + log.info("Running %s benchmarks", num_combinations) + + # Early-out if it's a dry-run + if dry_run: + log.info("Dry-run mode. 
Commands will not be executed") + return AIPerfSummary(total=0, completed=0, failed=0) + + # If logging isn't set to DEBUG, we'll capture the AIPerf stdout and stderr to a file + capture_output = log.level != logging.DEBUG + + # Execute each combination + failed_runs = 0 + + # Iterate over the sweep combinations, saving out results in separate directories + for i, sweep_params in enumerate(combinations): + run_num = i + 1 # 1-indexed for run status printouts + + # Create output directory for this run + run_output_dir = self._create_output_dir(run_directory, sweep_params) + + # Create the command-line for this sweep param + command = self._build_command(sweep_params, run_output_dir) + + # Save metadata to reproduce benchmark results later if needed + self._save_run_metadata(run_output_dir, sweep_params, command, i) + + log.info("Run %s/%s", run_num, num_combinations) + log.info("Sweep parameters: %s", sweep_params) + log.debug("Output directory: %s", run_output_dir) + log.debug("Command: %s", " ".join(command)) + + try: + # Execute the command + result = subprocess.run( + command, + check=True, + capture_output=capture_output, + text=True, + ) + log.info("Run %s completed successfully", run_num) + + self._save_subprocess_result_json(run_output_dir, result) + if result.returncode != 0: + failed_runs += 1 + + except subprocess.CalledProcessError as e: + log.error( + "Run %s with sweep params %s failed with exit code %s", + i, + sweep_params, + e.returncode, + ) + failed_runs += 1 + + except KeyboardInterrupt: + log.warning("Interrupted by user") + raise + + return AIPerfSummary( + total=num_combinations, + completed=num_combinations - failed_runs, + failed=failed_runs, + ) + + +# Create typer app +app = typer.Typer( + help="AIPerf application to run, analyze, and compare benchmarks", + add_completion=False, +) + + +@app.command() +def run( + config_file: Path = typer.Option( + ..., + "--config-file", + help="Path to YAML configuration file", + exists=True, + file_okay=True, + dir_okay=False, + readable=True, + ), + dry_run: bool = typer.Option( + False, + "--dry-run", + help="Print commands without executing them", + ), + verbose: bool = typer.Option( + False, + "--verbose", + help="Print additional debugging information during run", + ), +): + """Run AIPerf benchmark using the provided YAML config file""" + + if verbose: + log.setLevel(logging.DEBUG) + + # Create and run the benchmark runner + runner = AIPerfRunner(config_file) + exit_code = runner.run(dry_run=dry_run) + + raise typer.Exit(code=exit_code) + + +if __name__ == "__main__": + app() diff --git a/pyproject.toml b/pyproject.toml index 03ebc905a..f3452a964 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -163,6 +163,7 @@ include = [ "nemoguardrails/tracing/**", "nemoguardrails/server/**", "tests/test_callbacks.py", + "nemoguardrails/benchmark/**" ] exclude = [ "nemoguardrails/llm/providers/trtllm/**", diff --git a/tests/benchmark/test_aiperf_models.py b/tests/benchmark/test_aiperf_models.py new file mode 100644 index 000000000..70973e6b6 --- /dev/null +++ b/tests/benchmark/test_aiperf_models.py @@ -0,0 +1,373 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for AIPerf configuration models. +""" + +from pathlib import Path + +import pytest +from pydantic import ValidationError + +from benchmark.aiperf.aiperf_models import AIPerfConfig, BaseConfig + + +class TestBaseConfig: + """Test the BaseConfig model.""" + + def test_base_config_minimal_valid(self): + """Test creating BaseConfig with minimal required fields.""" + config = BaseConfig( + model="test-model", + tokenizer="test-tokenizer", + url="http://localhost:8000", + warmup_request_count=10, + benchmark_duration=60, + concurrency=5, + ) + assert config.model == "test-model" + assert config.tokenizer == "test-tokenizer" + assert config.url == "http://localhost:8000" + assert config.endpoint == "/v1/chat/completions" # Default + assert config.endpoint_type == "chat" # Default + assert config.warmup_request_count == 10 + assert config.benchmark_duration == 60 + assert config.concurrency == 5 + assert config.request_rate_mode == "constant" # Default + + def test_base_config_with_all_fields(self): + """Test creating BaseConfig with all fields specified.""" + config = BaseConfig( + model="test-model", + tokenizer="test-tokenizer", + url="http://localhost:8000", + endpoint="/v1/completions", + endpoint_type="completions", + api_key_env_var="AIPERF_API_KEY", + warmup_request_count=10, + benchmark_duration=60, + concurrency=5, + request_rate=2.5, + request_rate_mode="poisson", + random_seed=42, + prompt_input_tokens_mean=100, + prompt_input_tokens_stddev=10, + prompt_output_tokens_mean=50, + prompt_output_tokens_stddev=5, + ) + assert config.model == "test-model" + assert config.tokenizer == "test-tokenizer" + assert config.endpoint == "/v1/completions" + assert config.endpoint_type == "completions" + assert config.api_key_env_var == "AIPERF_API_KEY" + assert config.request_rate == 2.5 + assert config.request_rate_mode == "poisson" + assert config.random_seed == 42 + assert config.prompt_input_tokens_mean == 100 + assert config.prompt_input_tokens_stddev == 10 + assert config.prompt_output_tokens_mean == 50 + assert config.prompt_output_tokens_stddev == 5 + + def test_base_config_missing_required_fields(self): + """Test that missing required fields raise validation error.""" + with pytest.raises(ValidationError) as exc_info: + BaseConfig( + model="test-model", + url="http://localhost:8000", + # Missing warmup_request_count, benchmark_duration, concurrency + ) + errors = exc_info.value.errors() + error_fields = {err["loc"][0] for err in errors} + assert "warmup_request_count" in error_fields + assert "benchmark_duration" in error_fields + assert "concurrency" in error_fields + + def test_base_config_invalid_endpoint_type(self): + """Test that invalid endpoint_type raises validation error.""" + with pytest.raises(ValidationError) as exc_info: + BaseConfig( + model="test-model", + url="http://localhost:8000", + endpoint_type="invalid", # Must be "chat" or "completions" + warmup_request_count=10, + benchmark_duration=60, + concurrency=5, + ) + errors = exc_info.value.errors() + assert any("endpoint_type" in str(err["loc"]) for err in errors) + + def 
test_base_config_invalid_request_rate_mode(self): + """Test that invalid request_rate_mode raises validation error.""" + with pytest.raises(ValidationError) as exc_info: + BaseConfig( + model="test-model", + url="http://localhost:8000", + request_rate_mode="invalid", # Must be "constant" or "poisson" + warmup_request_count=10, + benchmark_duration=60, + concurrency=5, + ) + errors = exc_info.value.errors() + assert any("request_rate_mode" in str(err["loc"]) for err in errors) + + +class TestAIPerfConfig: + """Test the AIPerfConfig model.""" + + @pytest.fixture(autouse=True) + def valid_base_config(self) -> BaseConfig: + """Helper to get a valid base config dictionary.""" + return BaseConfig( + model="test-model", + url="http://localhost:8000", + warmup_request_count=10, + benchmark_duration=60, + concurrency=5, + ) + + def test_aiperf_config_minimal_valid(self, valid_base_config): + """Test creating AIPerfConfig with minimal required fields.""" + config = AIPerfConfig(base_config=valid_base_config) + + assert config.batch_name == "benchmark" # Default + assert config.output_base_dir == "aiperf_results" # Default + assert config.base_config.model == "test-model" + assert config.sweeps is None + + def test_aiperf_config_with_custom_fields(self, valid_base_config): + """Test creating AIPerfConfig with custom batch_name and output_dir.""" + config = AIPerfConfig( + batch_name="my_benchmark", + output_base_dir="custom_results", + base_config=valid_base_config, + ) + assert config.batch_name == "my_benchmark" + assert config.output_base_dir == "custom_results" + assert config.base_config.model == "test-model" + + def test_aiperf_config_with_valid_sweeps_int(self, valid_base_config): + """Test creating AIPerfConfig with valid integer sweeps.""" + + sweeps: dict[str, list[int]] = { + "concurrency": [10, 20, 30], + "warmup_request_count": [5, 10, 15], + } + config = AIPerfConfig( + base_config=valid_base_config, + sweeps=sweeps, + ) + assert config.sweeps == sweeps + + def test_aiperf_config_with_valid_sweeps_str(self, valid_base_config): + """Test creating AIPerfConfig with valid string sweeps.""" + config = AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "model": ["model-a", "model-b", "model-c"], + "endpoint": ["/v1/chat", "/v1/completions"], + }, + ) + assert config.sweeps == { + "model": ["model-a", "model-b", "model-c"], + "endpoint": ["/v1/chat", "/v1/completions"], + } + + def test_aiperf_config_with_valid_sweeps_mixed(self, valid_base_config): + """Test creating AIPerfConfig with mixed int and string sweeps.""" + sweeps = { + "concurrency": [10, 20], + "model": ["model-a", "model-b"], + } + config = AIPerfConfig( + base_config=valid_base_config, + sweeps=sweeps, + ) + assert config.sweeps == sweeps + + def test_aiperf_config_sweep_invalid_key(self, valid_base_config): + """Test that invalid sweep key raises validation error.""" + with pytest.raises(ValidationError) as exc_info: + AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "invalid_field": [1, 2, 3], + }, + ) + error_msg = str(exc_info.value) + assert "invalid_field" in error_msg + assert "not a valid BaseConfig field" in error_msg + + def test_aiperf_config_sweep_invalid_value_type_float(self, valid_base_config): + """Test that float values in sweeps raise validation error.""" + with pytest.raises(ValidationError) as exc_info: + AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "concurrency": [10, 20.5, 30], # Float not allowed + }, + ) + error_msg = str(exc_info.value) + # Pydantic catches this 
during type validation + assert "sweeps.concurrency" in error_msg + assert "must be int or str" in error_msg or "int_from_float" in error_msg + + def test_aiperf_config_sweep_invalid_value_type_dict(self, valid_base_config): + """Test that dict values in sweeps raise validation error.""" + with pytest.raises(ValidationError) as exc_info: + AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "concurrency": [10, {"value": 20}, 30], # Dict not allowed + }, + ) + error_msg = str(exc_info.value) + # Pydantic catches this during type validation + assert "sweeps.concurrency" in error_msg + assert "must be int or str" in error_msg or "int_type" in error_msg or "string_type" in error_msg + + def test_aiperf_config_sweep_invalid_value_type_list(self, valid_base_config): + """Test that list values in sweeps raise validation error.""" + with pytest.raises(ValidationError) as exc_info: + AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "concurrency": [10, [20, 30]], # List not allowed + }, + ) + error_msg = str(exc_info.value) + # Pydantic catches this during type validation + assert "sweeps.concurrency" in error_msg + assert "must be int or str" in error_msg or "int_type" in error_msg or "string_type" in error_msg + + def test_aiperf_config_sweep_empty_list(self, valid_base_config): + """Test that empty sweep list raises validation error.""" + with pytest.raises(ValidationError) as exc_info: + AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "concurrency": [], + }, + ) + error_msg = str(exc_info.value) + assert "cannot be empty" in error_msg + + def test_aiperf_config_sweep_not_list(self, valid_base_config): + """Test that non-list sweep value raises validation error.""" + with pytest.raises(ValidationError) as exc_info: + AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "concurrency": 10, # Should be a list + }, + ) + error_msg = str(exc_info.value) + # Pydantic catches this during type validation + assert "sweeps.concurrency" in error_msg + assert "must be a list" in error_msg or "list_type" in error_msg + + def test_aiperf_config_multiple_invalid_sweep_keys(self, valid_base_config): + """Test that multiple invalid sweep keys are all reported.""" + with pytest.raises(ValidationError) as exc_info: + AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "invalid_field_1": [1, 2], + "invalid_field_2": [3, 4], + }, + ) + error_msg = str(exc_info.value) + # At least one of the invalid fields should be mentioned + assert "invalid_field" in error_msg + + def test_aiperf_config_get_output_base_path(self, valid_base_config): + """Test get_output_base_path method.""" + config = AIPerfConfig(output_base_dir="custom_results", base_config=valid_base_config) + path = config.get_output_base_path() + assert isinstance(path, Path) + assert str(path) == "custom_results" + + def test_aiperf_config_get_output_base_path_default(self, valid_base_config): + """Test get_output_base_path method with default output_base_dir.""" + config = AIPerfConfig(base_config=valid_base_config) + path = config.get_output_base_path() + assert isinstance(path, Path) + assert str(path) == "aiperf_results" + + def test_aiperf_config_missing_base_config(self): + """Test that missing base_config raises validation error.""" + with pytest.raises(ValidationError) as exc_info: + AIPerfConfig() # Missing required base_config + errors = exc_info.value.errors() + assert any("base_config" in str(err["loc"]) for err in errors) + + def test_aiperf_config_invalid_base_config(self): + """Test that invalid 
base_config raises validation error.""" + with pytest.raises(ValidationError): + AIPerfConfig( + base_config={ + "model": "test-model", + # Missing required fields + }, + ) + + def test_aiperf_config_all_valid_sweep_keys(self, valid_base_config): + """Test sweeps with all valid BaseConfig field names.""" + config = AIPerfConfig( + base_config=valid_base_config, + sweeps={ + "model": ["model-a", "model-b"], + "url": ["http://localhost:8000", "http://localhost:8001"], + "endpoint": ["/v1/chat", "/v1/completions"], + "endpoint_type": ["chat", "completions"], + "warmup_request_count": [5, 10], + "benchmark_duration": [30, 60], + "concurrency": [5, 10], + "request_rate_mode": ["constant", "poisson"], + "random_seed": [42, 123], + "prompt_input_tokens_mean": [100, 200], + "prompt_input_tokens_stddev": [10, 20], + "prompt_output_tokens_mean": [50, 100], + "prompt_output_tokens_stddev": [5, 10], + }, + ) + # All sweeps should be accepted + assert len(config.sweeps) == 13 + + def test_sweeps_none(self, valid_base_config): + """Test sweeps set to None don't raise Exception.""" + config = AIPerfConfig( + base_config=valid_base_config, + sweeps=None, + ) + + def test_sweeps_not_list_raises(self, valid_base_config): + """Test sweeps set to None don't raise Exception.""" + with pytest.raises(ValueError, match="Input should be a valid list"): + config = AIPerfConfig( + base_config=valid_base_config, + sweeps={"benchmark_duration": 1}, + ) + + def test_sweeps_empty_list_raises(self, valid_base_config): + """Test sweeps set to None don't raise Exception.""" + with pytest.raises( + ValueError, + match="Sweep parameter 'concurrency' cannot be empty", + ): + config = AIPerfConfig( + base_config=valid_base_config, + sweeps={"concurrency": []}, + ) diff --git a/tests/benchmark/test_run_aiperf.py b/tests/benchmark/test_run_aiperf.py new file mode 100644 index 000000000..3ee4f1f5b --- /dev/null +++ b/tests/benchmark/test_run_aiperf.py @@ -0,0 +1,1170 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for AIPerf run_aiperf module. 
+""" + +import json +import logging +import subprocess +from pathlib import Path +from typing import Any, Dict, Optional +from unittest.mock import Mock, patch + +import httpx +import pytest +import yaml +from typer.testing import CliRunner + +from benchmark.aiperf.aiperf_models import AIPerfConfig, BaseConfig +from benchmark.aiperf.run_aiperf import AIPerfRunner, AIPerfSummary + + +@pytest.fixture +def create_config_data(): + """Returns a function with sample basic config, and allows mutation of fields to cover + more cases or add extra fields""" + + def _create_config( + batch_name="test_batch", + output_base_dir="test_output", + model="test-model", + tokenizer="test-tokenizer", + url="http://localhost:8000", + warmup_request_count=10, + benchmark_duration=60, + concurrency=5, + sweeps=None, + **extra_base_config, + ): + base_config = { + "model": model, + "tokenizer": tokenizer, + "url": url, + "warmup_request_count": warmup_request_count, + "benchmark_duration": benchmark_duration, + "concurrency": concurrency, + } + + config_data = { + "batch_name": batch_name, + "output_base_dir": output_base_dir, + "base_config": base_config, + } + + # Add sweeps if provided + if sweeps: + config_data["sweeps"] = sweeps + + # Merge any extra base_config parameters + if extra_base_config: + base_config.update(extra_base_config) + + return config_data + + return _create_config + + +@pytest.fixture +def create_config_file(tmp_path, create_config_data): + """Fixture to write config data to a file and return the path.""" + + def _write_config_file( + extra_base_config: Optional[Dict[str, Any]] = None, + filename: Optional[str] = "config.yml", + sweeps: Optional[Dict[str, Any]] = None, + ) -> Path: + """Apply extra base config to config data, write to file and return the path.""" + + # Unpack extra_base_config as kwargs if provided + if extra_base_config: + config_data = create_config_data(sweeps=sweeps, **extra_base_config) + else: + config_data = create_config_data(sweeps=sweeps) + + config_file = tmp_path / filename + config_file.write_text(yaml.dump(config_data)) + return config_file + + return _write_config_file + + +class TestAIPerfSummary: + """Test the AIPerfSummary dataclass.""" + + def test_aiperf_summary_creation(self): + """Test creating an AIPerfSummary instance.""" + summary = AIPerfSummary(total=10, completed=8, failed=2) + assert summary.total == 10 + assert summary.completed == 8 + assert summary.failed == 2 + + +class TestAIPerfRunnerInit: + """Test AIPerfRunner initialization and config loading.""" + + def test_init_with_valid_config(self, create_config_file): + """Test initialization with a valid config file.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + + assert runner.config_path == config_file + assert isinstance(runner.config, AIPerfConfig) + assert runner.config.batch_name == "test_batch" + assert runner.config.output_base_dir == "test_output" + assert runner.config.base_config.model == "test-model" + assert runner.config.base_config.tokenizer == "test-tokenizer" + assert runner.config.base_config.url == "http://localhost:8000" + assert runner.config.base_config.warmup_request_count == 10 + assert runner.config.base_config.benchmark_duration == 60 + assert runner.config.base_config.concurrency == 5 + assert runner.config.sweeps is None + + def test_init_with_nonexistent_config(self, tmp_path): + """Test initialization with a nonexistent config file.""" + config_file = tmp_path / "nonexistent.yaml" + + with pytest.raises(SystemExit): + 
AIPerfRunner(config_file) + + def test_init_with_invalid_yaml(self, tmp_path): + """Test initialization with invalid YAML syntax.""" + config_file = tmp_path / "invalid.yaml" + config_file.write_text("invalid: yaml: syntax: [") + + with pytest.raises(SystemExit): + AIPerfRunner(config_file) + + def test_init_with_validation_error(self, tmp_path): + """Test initialization with config that fails Pydantic validation.""" + + config_file = tmp_path / "invalid.yaml" + config_file.write_text( + yaml.dump( + { + "batch_name": "test_batch", + "base_config": { + "model": "test-model", + # Missing required fields + }, + } + ) + ) + + with pytest.raises(SystemExit): + AIPerfRunner(config_file) + + def test_init_with_unexpected_error(self, create_config_file): + """Test initialization with an unexpected error.""" + config_file = create_config_file() + + # Mock yaml.safe_load to raise an unexpected exception + with patch("yaml.safe_load", side_effect=RuntimeError("Unexpected error")): + with pytest.raises(SystemExit): + AIPerfRunner(config_file) + + +class TestGetSweepCombinations: + """Test the _get_sweep_combinations method.""" + + def test_no_sweeps_returns_none(self, create_config_file): + """Test that no sweeps returns None.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + combinations = runner._get_sweep_combinations() + + assert combinations is None + + def test_single_sweep_parameter(self, create_config_file): + """Test sweep with a single parameter.""" + config_file = create_config_file(sweeps={"concurrency": [1, 2, 4]}) + + runner = AIPerfRunner(config_file) + combinations = runner._get_sweep_combinations() + + assert len(combinations) == 3 + assert combinations == [ + {"concurrency": 1}, + {"concurrency": 2}, + {"concurrency": 4}, + ] + + def test_multiple_sweep_parameters(self, create_config_file): + """Test sweep with multiple parameters (Cartesian product).""" + config_file = create_config_file( + sweeps={ + "concurrency": [1, 2], + "benchmark_duration": [30, 60], + } + ) + + runner = AIPerfRunner(config_file) + combinations = runner._get_sweep_combinations() + + assert len(combinations) == 4 + assert {"concurrency": 1, "benchmark_duration": 30} in combinations + assert {"concurrency": 1, "benchmark_duration": 60} in combinations + assert {"concurrency": 2, "benchmark_duration": 30} in combinations + assert {"concurrency": 2, "benchmark_duration": 60} in combinations + + def test_too_many_runs_raises(self, create_config_file): + """Test sweeps with more than 100 runs to make sure Excpetion is raised""" + + # Create a config with two parameter sweeps of 100 each + # This has a total of 10,000, greater than 100 limit + config_file = create_config_file( + sweeps={ + "concurrency": list(range(100)), + "benchmark_duration": list(range(100)), + } + ) + + runner = AIPerfRunner(config_file) + with pytest.raises(RuntimeError, match="Requested 10000 runs, max is 100"): + _ = runner._get_sweep_combinations() + + +class TestSanitizeCommandForLogging: + """Test the _sanitize_command_for_logging static method.""" + + def test_sanitize_command_with_api_key(self): + """Test sanitizing command with API key showing last 6 chars.""" + cmd = [ + "aiperf", + "profile", + "--model", + "test-model", + "--api-key", + "secret-key-123", + "--url", + "http://localhost:8000", + ] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + # "secret-key-123" has 14 chars, so 8 asterisks + last 6 chars "ey-123" + assert result == ("aiperf profile --model test-model --api-key 
********ey-123 --url http://localhost:8000") + + def test_sanitize_command_without_api_key(self): + """Test sanitizing command without API key.""" + cmd = [ + "aiperf", + "profile", + "--model", + "test-model", + "--url", + "http://localhost:8000", + ] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + assert result == " ".join(cmd) + + def test_sanitize_command_api_key_at_end_no_value(self): + """Test sanitizing command where --api-key is at the end with no value.""" + cmd = ["aiperf", "profile", "--model", "test-model", "--api-key"] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + # Should just include --api-key without sanitizing since there's no value + assert result == "aiperf profile --model test-model --api-key" + + def test_sanitize_command_empty_list(self): + """Test sanitizing an empty command list.""" + cmd = [] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + assert result == "" + + def test_sanitize_command_single_element(self): + """Test sanitizing command with a single element.""" + cmd = ["aiperf"] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + assert result == "aiperf" + + def test_sanitize_command_multiple_api_keys(self): + """Test sanitizing command with multiple API key occurrences.""" + cmd = [ + "aiperf", + "profile", + "--api-key", + "first-key", + "--model", + "test-model", + "--api-key", + "second-key", + ] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + assert result == ("aiperf profile --api-key ***st-key --model test-model --api-key ****nd-key") + + def test_sanitize_command_preserves_other_values(self): + """Test that other command values are preserved exactly.""" + cmd = [ + "aiperf", + "profile", + "--api-key", + "my-secret-key", + "--concurrency", + "10", + "--benchmark-duration", + "60", + "--streaming", + ] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + # "my-secret-key" has 13 chars, so 7 asterisks + "et-key" (last 6 chars) + assert result == ("aiperf profile --api-key *******et-key --concurrency 10 --benchmark-duration 60 --streaming") + + def test_sanitize_command_short_api_key(self): + """Test sanitizing command with API key shorter than or equal to 6 chars.""" + cmd = ["aiperf", "profile", "--api-key", "abc123"] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + # "abc123" has exactly 6 chars, so 0 asterisks + all 6 chars + assert result == "aiperf profile --api-key abc123" + + def test_sanitize_command_very_short_api_key(self): + """Test sanitizing command with API key shorter than 6 chars.""" + cmd = ["aiperf", "profile", "--api-key", "abc"] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + # "abc" has 3 chars, so shows all of them (no asterisks due to negative masking) + assert result == "aiperf profile --api-key abc" + + def test_sanitize_command_long_api_key(self): + """Test sanitizing command with a long API key.""" + cmd = [ + "aiperf", + "profile", + "--api-key", + "sk-proj-1234567890abcdefghijklmnopqrstuvwxyz", + ] + result = AIPerfRunner._sanitize_command_for_logging(cmd) + + # API key has 44 chars, so 38 asterisks + last 6 chars "uvwxyz" + expected_masked = "*" * 38 + "uvwxyz" + assert result == f"aiperf profile --api-key {expected_masked}" + + +class TestBuildCommand: + """Test the _build_command method.""" + + def test_build_command_basic(self, create_config_file, tmp_path): + """Test building a basic command.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + output_dir = tmp_path / "output" + cmd = 
runner._build_command(None, output_dir) + + assert cmd[0] == "aiperf" + assert cmd[1] == "profile" + assert "--model" in cmd + assert "test-model" in cmd + assert "--url" in cmd + assert "http://localhost:8000" in cmd + assert "--output-artifact-dir" in cmd + assert str(output_dir) in cmd + + def test_build_command_with_sweep_params(self, create_config_file, tmp_path): + """Test building command with sweep parameters that override base config.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + output_dir = tmp_path / "output" + sweep_params = {"concurrency": 10, "benchmark_duration": 30} + cmd = runner._build_command(sweep_params, output_dir) + + assert "--concurrency" in cmd + concurrency_idx = cmd.index("--concurrency") + assert cmd[concurrency_idx + 1] == "10" + + assert "--benchmark-duration" in cmd + duration_idx = cmd.index("--benchmark-duration") + assert cmd[duration_idx + 1] == "30" + + def test_build_command_with_api_key_env_var(self, create_config_file, tmp_path, monkeypatch): + """Test building command with API key from environment variable.""" + config_file = create_config_file(extra_base_config={"api_key_env_var": "TEST_API_KEY"}) + + # Set the environment variable + monkeypatch.setenv("TEST_API_KEY", "secret-key-123") + + runner = AIPerfRunner(config_file) + output_dir = tmp_path / "output" + cmd = runner._build_command(None, output_dir) + + assert "--api-key" in cmd + api_key_idx = cmd.index("--api-key") + assert cmd[api_key_idx + 1] == "secret-key-123" + + def test_build_command_with_missing_api_key_env_var(self, create_config_file, tmp_path): + """Test building command when API key environment variable is not set.""" + config_file = create_config_file(extra_base_config={"api_key_env_var": "MISSING_API_KEY"}) + + runner = AIPerfRunner(config_file) + output_dir = tmp_path / "output" + + with pytest.raises( + RuntimeError, + match="Environment variable 'MISSING_API_KEY' is not set. 
Please set it: export MISSING_API_KEY='your-api-key'",
+        ):
+            runner._build_command(None, output_dir)
+
+    def test_build_command_with_streaming_true(self, create_config_file, tmp_path):
+        """Test building command with streaming enabled."""
+        config_file = create_config_file(extra_base_config={"streaming": True})
+
+        runner = AIPerfRunner(config_file)
+        output_dir = tmp_path / "output"
+        cmd = runner._build_command(None, output_dir)
+
+        assert "--streaming" in cmd
+
+    def test_build_command_with_streaming_false(self, create_config_file, tmp_path):
+        """Test building command with streaming disabled; the flag should not be in the command."""
+        config_file = create_config_file(extra_base_config={"streaming": False})
+
+        runner = AIPerfRunner(config_file)
+        output_dir = tmp_path / "output"
+        cmd = runner._build_command(None, output_dir)
+
+        assert "--streaming" not in cmd
+
+    def test_build_command_default_streaming(self, create_config_file, tmp_path):
+        """Test building command with the default streaming value of False."""
+        config_file = create_config_file()
+
+        runner = AIPerfRunner(config_file)
+        output_dir = tmp_path / "output"
+        cmd = runner._build_command(None, output_dir)
+
+        assert "--streaming" not in cmd
+
+    def test_build_command_default_api_key(self, create_config_file, tmp_path):
+        """Test building command with no API key configured; None-valued fields should be skipped."""
+        config_file = create_config_file()
+
+        runner = AIPerfRunner(config_file)
+        output_dir = tmp_path / "output"
+        cmd = runner._build_command(None, output_dir)
+
+        # Optional fields with None should not appear
+        assert "--api-key-env-var" not in cmd
+
+    def test_build_command_ui_type_debug(self, create_config_file, tmp_path):
+        """Test that ui_type is 'simple' when log level is DEBUG."""
+        config_file = create_config_file()
+
+        runner = AIPerfRunner(config_file)
+        output_dir = tmp_path / "output"
+
+        # Patch log.level to be DEBUG
+        with patch("benchmark.aiperf.run_aiperf.log.level", logging.DEBUG):
+            cmd = runner._build_command(None, output_dir)
+
+        assert "--ui-type" in cmd
+        ui_type_idx = cmd.index("--ui-type")
+        assert cmd[ui_type_idx + 1] == "simple"
+
+    def test_build_command_ui_type_non_debug(self, create_config_file, tmp_path):
+        """Test that ui_type is 'none' when log level is not DEBUG."""
+        config_file = create_config_file()
+
+        runner = AIPerfRunner(config_file)
+        output_dir = tmp_path / "output"
+
+        # Patch log.level to be INFO
+        with patch("benchmark.aiperf.run_aiperf.log.level", logging.INFO):
+            cmd = runner._build_command(None, output_dir)
+
+        assert "--ui-type" in cmd
+        ui_type_idx = cmd.index("--ui-type")
+        assert cmd[ui_type_idx + 1] == "none"
+
+    def test_build_command_with_list_in_sweep_params(self, create_config_file, tmp_path):
+        """Test building command when a config value is a list; each element should become
+        its own flag/value pair (the list is injected via a patched model_dump, not sweep params)."""
+        config_file = create_config_file()
+
+        runner = AIPerfRunner(config_file)
+        output_dir = tmp_path / "output"
+
+        # Patch model_dump at the class level to inject a list-valued config entry
+        original_model_dump = BaseConfig.model_dump
+
+        def mock_model_dump(self):
+            result = original_model_dump(self)
+            result["extra_param"] = ["value1", "value2"]
+            return result
+
+        with patch.object(BaseConfig, "model_dump", mock_model_dump):
+            cmd = runner._build_command(None, output_dir)
+
+        # List values should appear multiple times in the command
+        assert "--extra-param" in cmd
+        assert cmd.count("--extra-param") == 2
+        value1_idx = cmd.index("value1")
+        value2_idx = cmd.index("value2")
+        assert value1_idx > 0
+        assert value2_idx > 0
+
+
+class TestCreateOutputDir:
+ """Test the _create_output_dir static method.""" + + def test_create_output_dir_no_sweep(self, tmp_path): + """Test creating output directory without sweep parameters.""" + base_dir = tmp_path / "output" + result = AIPerfRunner._create_output_dir(base_dir, None) + + assert result == base_dir + assert result.exists() + assert result.is_dir() + + def test_create_output_dir_with_sweep(self, tmp_path): + """Test creating output directory with sweep parameters.""" + base_dir = tmp_path / "output" + sweep_params = {"concurrency": 10, "benchmark_duration": 30} + result = AIPerfRunner._create_output_dir(base_dir, sweep_params) + + # Directory should contain sweep parameter values + assert result == base_dir / "benchmark_duration30_concurrency10" + assert result.exists() + assert result.is_dir() + + def test_create_output_dir_creates_parent(self, tmp_path): + """Test that parent directories are created if they don't exist.""" + base_dir = tmp_path / "parent" / "child" / "output" + result = AIPerfRunner._create_output_dir(base_dir, None) + + assert result.exists() + assert result.is_dir() + + +class TestSaveRunMetadata: + """Test the _save_run_metadata method.""" + + def test_save_run_metadata_without_sweep(self, create_config_file, tmp_path): + """Test saving run metadata without sweep parameters.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + output_dir = tmp_path / "output" + output_dir.mkdir() + + command = ["aiperf", "profile", "--model", "test-model"] + runner._save_run_metadata(output_dir, None, command, 0) + + metadata_file = output_dir / "run_metadata.json" + assert metadata_file.exists() + + with open(metadata_file) as f: + metadata = json.load(f) + + assert metadata["run_index"] == 0 + assert metadata["config_file"] == str(config_file) + assert metadata["sweep_params"] is None + assert metadata["command"] == " ".join(command) + assert "timestamp" in metadata + assert "base_config" in metadata + + def test_save_run_metadata_with_sweep(self, create_config_file, tmp_path): + """Test saving run metadata with sweep parameters.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + output_dir = tmp_path / "output" + output_dir.mkdir() + + sweep_params = {"concurrency": 10} + command = ["aiperf", "profile", "--concurrency", "10"] + runner._save_run_metadata(output_dir, sweep_params, command, 1) + + metadata_file = output_dir / "run_metadata.json" + assert metadata_file.exists() + + with open(metadata_file) as f: + metadata = json.load(f) + + assert metadata["run_index"] == 1 + assert metadata["sweep_params"] == sweep_params + + +class TestSaveSubprocessResultJson: + """Test the _save_subprocess_result_json static method.""" + + def test_save_subprocess_result_success(self, tmp_path): + """Test saving successful subprocess result.""" + output_dir = tmp_path / "output" + output_dir.mkdir() + + # Create a mock CompletedProcess + result = subprocess.CompletedProcess( + args=["aiperf", "profile"], + returncode=0, + stdout="Success output", + stderr="", + ) + + AIPerfRunner._save_subprocess_result_json(output_dir, result) + + process_result_file = output_dir / "process_result.json" + assert process_result_file.exists() + + with open(process_result_file) as f: + saved_data = json.load(f) + + assert saved_data["returncode"] == 0 + assert saved_data["stdout"] == "Success output" + + def test_save_subprocess_result_failure(self, tmp_path): + """Test saving failed subprocess result.""" + output_dir = tmp_path / "output" + output_dir.mkdir() + + 
result = subprocess.CompletedProcess( + args=["aiperf", "profile"], + returncode=1, + stdout="", + stderr="Error message", + ) + + AIPerfRunner._save_subprocess_result_json(output_dir, result) + + process_result_file = output_dir / "process_result.json" + assert process_result_file.exists() + + with open(process_result_file) as f: + saved_data = json.load(f) + + assert saved_data["returncode"] == 1 + assert saved_data["stderr"] == "Error message" + + def test_save_subprocess_result_io_error(self, tmp_path): + """Test saving subprocess result when IOError occurs.""" + output_dir = tmp_path / "output" + output_dir.mkdir() + + result = subprocess.CompletedProcess( + args=["aiperf", "profile"], + returncode=0, + stdout="Success", + stderr="", + ) + + # Mock open to raise IOError + with patch("builtins.open", side_effect=IOError("Disk full")): + with pytest.raises(IOError): + AIPerfRunner._save_subprocess_result_json(output_dir, result) + + def test_save_subprocess_result_type_error(self, tmp_path): + """Test saving subprocess result when TypeError occurs during serialization.""" + output_dir = tmp_path / "output" + output_dir.mkdir() + + result = subprocess.CompletedProcess( + args=["aiperf", "profile"], + returncode=0, + stdout="Success", + stderr="", + ) + + # Mock json.dump to raise TypeError + with patch("json.dump", side_effect=TypeError("Cannot serialize")): + with pytest.raises(TypeError): + AIPerfRunner._save_subprocess_result_json(output_dir, result) + + +class TestCheckService: + """Test the _check_service method.""" + + def test_check_service_success(self, create_config_file): + """Test checking service when it's available.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + + # Mock httpx.get to return success + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + # Should not raise any exception + runner._check_service() + + def test_check_service_connect_error(self, create_config_file): + """Test checking service when connection fails.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + + # Mock httpx.get to raise ConnectError + with patch("httpx.get", side_effect=httpx.ConnectError("Connection refused")): + with pytest.raises(RuntimeError, match="Can't connect to"): + runner._check_service() + + def test_check_service_non_200_response(self, create_config_file): + """Test checking service when it returns non-200 status.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + + # Mock httpx.get to return 404 + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + + with pytest.raises(RuntimeError, match="Can't access"): + runner._check_service() + + def test_check_service_custom_endpoint(self, create_config_file): + """Test checking service with custom endpoint.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + + # Mock httpx.get + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + runner._check_service("/custom/endpoint") + + # Verify the URL was constructed correctly + mock_get.assert_called_once() + call_args = mock_get.call_args + assert "/custom/endpoint" in call_args[0][0] + + def test_check_service_no_api_key_env_var(self, create_config_file): + """Test checking service when api_key_env_var is not 
configured (None).""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + + # Mock httpx.get + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + runner._check_service() + + # Verify headers=None was passed + mock_get.assert_called_once() + call_args = mock_get.call_args + assert call_args[1]["headers"] is None + + def test_check_service_api_key_env_var_not_set(self, create_config_file): + """Test checking service when api_key_env_var is configured but env var doesn't exist.""" + config_file = create_config_file(extra_base_config={"api_key_env_var": "NONEXISTENT_API_KEY"}) + + runner = AIPerfRunner(config_file) + + # Mock httpx.get + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + runner._check_service() + + # Verify headers=None was passed (since env var doesn't exist) + mock_get.assert_called_once() + call_args = mock_get.call_args + assert call_args[1]["headers"] is None + + def test_check_service_api_key_env_var_set(self, create_config_file, monkeypatch): + """Test checking service when api_key_env_var is configured and env var exists.""" + config_file = create_config_file(extra_base_config={"api_key_env_var": "TEST_API_KEY"}) + + # Set the environment variable + monkeypatch.setenv("TEST_API_KEY", "test-secret-key-123") + + runner = AIPerfRunner(config_file) + + # Mock httpx.get + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + runner._check_service() + + # Verify headers with Authorization Bearer token was passed + mock_get.assert_called_once() + call_args = mock_get.call_args + assert call_args[1]["headers"] is not None + assert call_args[1]["headers"]["Authorization"] == "Bearer test-secret-key-123" + + +class TestGetBatchDir: + """Test the _get_batch_dir method.""" + + def test_get_batch_dir(self, create_config_file, tmp_path): + """Test getting the batch directory with timestamp.""" + config_file = create_config_file( + extra_base_config={ + "batch_name": "test_batch", + "output_base_dir": str(tmp_path / "output"), + } + ) + + runner = AIPerfRunner(config_file) + batch_dir = runner._get_batch_dir() + + # Check that the path contains the expected components + assert "test_batch" in str(batch_dir) + assert str(tmp_path / "output") in str(batch_dir) + # Check that there's a timestamp-like pattern (YYYYMMDD_HHMMSS) + assert len(batch_dir.name) == 15 # Timestamp format + + +class TestRunSingleBenchmark: + """Test the run_single_benchmark method.""" + + def test_run_single_benchmark_success(self, create_config_file, tmp_path): + """Test running a single benchmark successfully.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + run_directory = tmp_path / "runs" + + # Mock subprocess.run to return success + with patch("subprocess.run") as mock_run: + mock_result = subprocess.CompletedProcess( + args=["aiperf", "profile"], + returncode=0, + stdout="Success", + stderr="", + ) + mock_run.return_value = mock_result + + summary = runner.run_single_benchmark(run_directory, dry_run=False) + + assert summary.total == 1 + assert summary.completed == 1 + assert summary.failed == 0 + + def test_run_single_benchmark_dry_run(self, create_config_file, tmp_path): + """Test running a single benchmark in dry-run mode.""" + config_file = create_config_file() + + runner = 
AIPerfRunner(config_file)
+        run_directory = tmp_path / "runs"
+
+        summary = runner.run_single_benchmark(run_directory, dry_run=True)
+
+        assert summary.total == 0
+        assert summary.completed == 0
+        assert summary.failed == 0
+
+    def test_run_single_benchmark_failure(self, create_config_file, tmp_path):
+        """Test running a single benchmark that fails."""
+        config_file = create_config_file()
+
+        runner = AIPerfRunner(config_file)
+        run_directory = tmp_path / "runs"
+
+        # Mock subprocess.run to raise CalledProcessError
+        with patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "aiperf")):
+            summary = runner.run_single_benchmark(run_directory, dry_run=False)
+
+        assert summary.total == 1
+        assert summary.completed == 0
+        assert summary.failed == 1
+
+    def test_run_single_benchmark_keyboard_interrupt(self, create_config_file, tmp_path):
+        """Test that KeyboardInterrupt is re-raised."""
+        config_file = create_config_file()
+
+        runner = AIPerfRunner(config_file)
+        run_directory = tmp_path / "runs"
+
+        # Mock subprocess.run to raise KeyboardInterrupt
+        with patch("subprocess.run", side_effect=KeyboardInterrupt):
+            with pytest.raises(KeyboardInterrupt):
+                runner.run_single_benchmark(run_directory, dry_run=False)
+
+
+class TestRunBatchBenchmarks:
+    """Test the run_batch_benchmarks method."""
+
+    def test_run_batch_benchmarks_success(self, create_config_file, tmp_path):
+        """Test running batch benchmarks and aggregating one successful and one failed run."""
+        config_file = create_config_file(sweeps={"concurrency": [1, 2]})
+
+        runner = AIPerfRunner(config_file)
+        run_directory = tmp_path / "runs"
+
+        # Mock subprocess.run to return success first, then failure
+        with patch("subprocess.run") as mock_run:
+            mock_run.side_effect = [
+                subprocess.CompletedProcess(
+                    args=["aiperf", "profile"],
+                    returncode=0,
+                    stdout="Success",
+                    stderr="",
+                ),
+                subprocess.CompletedProcess(
+                    args=["aiperf", "profile"],
+                    returncode=1,
+                    stdout="",
+                    stderr="Error",
+                ),
+            ]
+
+            summary = runner.run_batch_benchmarks(run_directory, dry_run=False)
+
+        assert summary.total == 2
+        assert summary.completed == 1
+        assert summary.failed == 1
+        assert mock_run.call_count == 2
+
+    def test_run_batch_benchmarks_dry_run(self, create_config_file, tmp_path):
+        """Test running batch benchmarks in dry-run mode."""
+        config_file = create_config_file(sweeps={"concurrency": [1, 2]})
+
+        runner = AIPerfRunner(config_file)
+        run_directory = tmp_path / "runs"
+
+        summary = runner.run_batch_benchmarks(run_directory, dry_run=True)
+
+        assert summary.total == 0
+        assert summary.completed == 0
+        assert summary.failed == 0
+
+    def test_run_batch_benchmarks_partial_failure(self, create_config_file, tmp_path):
+        """Test running batch benchmarks with some failures."""
+        config_file = create_config_file(sweeps={"concurrency": [1, 2, 4]})
+
+        runner = AIPerfRunner(config_file)
+        run_directory = tmp_path / "runs"
+
+        # Mock subprocess.run to fail on second call
+        call_count = 0
+
+        def side_effect(*args, **kwargs):
+            nonlocal call_count
+            call_count += 1
+            if call_count == 2:
+                raise subprocess.CalledProcessError(1, "aiperf")
+            return subprocess.CompletedProcess(
+                args=["aiperf", "profile"],
+                returncode=0,
+                stdout="Success",
+                stderr="",
+            )
+
+        with patch("subprocess.run", side_effect=side_effect):
+            summary = runner.run_batch_benchmarks(run_directory, dry_run=False)
+
+        assert summary.total == 3
+        assert summary.completed == 2
+        assert summary.failed == 1
+
+    def test_run_batch_benchmarks_no_combinations(self, create_config_file, tmp_path):
+        """Test running batch 
benchmarks with no sweep combinations raises error.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + # Manually set sweeps to empty dict to trigger error + runner.config.sweeps = {} + + run_directory = tmp_path / "runs" + + with pytest.raises(RuntimeError, match="Can't generate sweep combinations"): + runner.run_batch_benchmarks(run_directory, dry_run=False) + + def test_run_batch_benchmarks_keyboard_interrupt(self, create_config_file, tmp_path): + """Test that KeyboardInterrupt is re-raised in batch benchmarks.""" + config_file = create_config_file(sweeps={"concurrency": [1, 2]}) + + runner = AIPerfRunner(config_file) + run_directory = tmp_path / "runs" + + # Mock subprocess.run to raise KeyboardInterrupt on first call + with patch("subprocess.run", side_effect=KeyboardInterrupt): + with pytest.raises(KeyboardInterrupt): + runner.run_batch_benchmarks(run_directory, dry_run=False) + + def test_run_batch_benchmarks_non_zero_returncode(self, create_config_file, tmp_path): + """Test running batch benchmarks when subprocess returns non-zero but doesn't raise.""" + config_file = create_config_file(sweeps={"concurrency": [1, 2]}) + + runner = AIPerfRunner(config_file) + run_directory = tmp_path / "runs" + + # Mock subprocess.run to return non-zero returncode without raising + with patch("subprocess.run") as mock_run: + mock_result = subprocess.CompletedProcess( + args=["aiperf", "profile"], + returncode=1, # Non-zero return code + stdout="", + stderr="Error", + ) + mock_run.return_value = mock_result + + summary = runner.run_batch_benchmarks(run_directory, dry_run=False) + + assert summary.total == 2 + assert summary.completed == 0 + assert summary.failed == 2 + + +class TestRun: + """Test the main run method.""" + + def test_run_single_benchmark(self, create_config_file): + """Test main run method with single benchmark (no sweeps).""" + config_file = create_config_file() + runner = AIPerfRunner(config_file) + + # Mock _check_service and subprocess.run + with patch.object(runner, "_check_service"): + with patch("subprocess.run") as mock_run: + mock_result = subprocess.CompletedProcess( + args=["aiperf", "profile"], + returncode=0, + stdout="Success", + stderr="", + ) + mock_run.return_value = mock_result + + exit_code = runner.run(dry_run=False) + + assert exit_code == 0 + + def test_run_batch_benchmarks(self, create_config_file): + """Test main run method with batch benchmarks (with sweeps).""" + config_file = create_config_file(sweeps={"concurrency": [1, 2]}) + runner = AIPerfRunner(config_file) + + # Mock _check_service and subprocess.run + with patch.object(runner, "_check_service"): + with patch("subprocess.run") as mock_run: + mock_result = subprocess.CompletedProcess( + args=["aiperf", "profile"], + returncode=0, + stdout="Success", + stderr="", + ) + mock_run.return_value = mock_result + + exit_code = runner.run(dry_run=False) + + assert exit_code == 0 + assert mock_run.call_count == 2 + + def test_run_with_failures(self, create_config_file): + """Test main run method returns non-zero exit code on failures.""" + config_file = create_config_file() + + runner = AIPerfRunner(config_file) + + # Mock _check_service and subprocess.run to fail + with patch.object(runner, "_check_service"): + with patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "aiperf")): + exit_code = runner.run(dry_run=False) + assert exit_code == 1 + + def test_run_service_check_failure(self, create_config_file): + """Test that service check failure raises error.""" + 
config_file = create_config_file() + runner = AIPerfRunner(config_file) + + # Mock _check_service to raise error + with patch.object(runner, "_check_service", side_effect=RuntimeError("Service unavailable")): + with pytest.raises(RuntimeError, match="Service unavailable"): + runner.run(dry_run=False) + + +class TestCLICommand: + """Test the CLI command function.""" + + def test_cli_run_command_basic(self, create_config_file): + """Test CLI run command with basic options.""" + config_file = create_config_file() + runner = CliRunner() + + from benchmark.aiperf.run_aiperf import app + + # Mock the runner and service check + with patch("benchmark.aiperf.run_aiperf.AIPerfRunner") as mock_runner_class: + mock_runner = Mock() + mock_runner.run.return_value = 0 + mock_runner_class.return_value = mock_runner + + result = runner.invoke(app, ["--config-file", str(config_file)]) + + assert result.exit_code == 0 + mock_runner.run.assert_called_once_with(dry_run=False) + + def test_cli_run_command_with_verbose(self, create_config_file): + """Test CLI run command with verbose flag.""" + config_file = create_config_file() + runner = CliRunner() + + from benchmark.aiperf.run_aiperf import app + + # Mock the runner and service check + with patch("benchmark.aiperf.run_aiperf.AIPerfRunner") as mock_runner_class: + mock_runner = Mock() + mock_runner.run.return_value = 0 + mock_runner_class.return_value = mock_runner + + result = runner.invoke(app, ["--config-file", str(config_file), "--verbose"]) + + assert result.exit_code == 0 + mock_runner.run.assert_called_once_with(dry_run=False) + + def test_cli_run_command_with_dry_run(self, create_config_file): + """Test CLI run command with dry-run flag.""" + config_file = create_config_file() + runner = CliRunner() + + from benchmark.aiperf.run_aiperf import app + + # Mock the runner and service check + with patch("benchmark.aiperf.run_aiperf.AIPerfRunner") as mock_runner_class: + mock_runner = Mock() + mock_runner.run.return_value = 0 + mock_runner_class.return_value = mock_runner + + result = runner.invoke(app, ["--config-file", str(config_file), "--dry-run"]) + + assert result.exit_code == 0 + mock_runner.run.assert_called_once_with(dry_run=True) + + def test_cli_run_command_with_failure(self, create_config_file): + """Test CLI run command when benchmark fails.""" + config_file = create_config_file() + runner = CliRunner() + + from benchmark.aiperf.run_aiperf import app + + # Mock the runner to return failure + with patch("benchmark.aiperf.run_aiperf.AIPerfRunner") as mock_runner_class: + mock_runner = Mock() + mock_runner.run.return_value = 1 # Failure + mock_runner_class.return_value = mock_runner + + result = runner.invoke(app, ["--config-file", str(config_file)]) + + assert result.exit_code == 1 + mock_runner.run.assert_called_once_with(dry_run=False)