2 changes: 2 additions & 0 deletions test/common/capture_utils.py
@@ -1,3 +1,4 @@
import functools
from typing import Any, Dict, List

from common.db_utils import write_to_db
@@ -44,6 +45,7 @@ def post_process(table_name: str, **kwargs) -> List[Dict[str, Any]]:

# ---------------- decorator ----------------
def export_vars(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
# If the function returns a dict containing '_data' or 'data', post-process it
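The only functional change in this file is wrapping `wrapper` with `functools.wraps`, which preserves the decorated function's metadata. A minimal, self-contained sketch of that effect (the `collect` decorator and `sample_case` function below are hypothetical, not part of this PR):

```python
import functools

def collect(func):
    @functools.wraps(func)  # keep func.__name__, __doc__, etc. on the wrapper
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@collect
def sample_case():
    """Example docstring."""
    return {"data": []}

print(sample_case.__name__)  # "sample_case" (would be "wrapper" without functools.wraps)
print(sample_case.__doc__)   # "Example docstring."
```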
Empty file.
185 changes: 185 additions & 0 deletions test/common/llmperf/run_inference.py
@@ -0,0 +1,185 @@
import json
import os
import random
from pathlib import Path
from typing import Any, Dict, List

import yaml
from common.llmperf.utils.token_benchmark import run_token_benchmark
from common.llmperf.utils.utils import reset_prefill_cache


def run_test_cases(
llm_api,
model,
timeout,
max_num_completed_requests,
concurrent_requests,
mean_input_tokens,
stddev_input,
mean_output_tokens,
stddev_output,
additional_sampling_params,
timestamp_dir,
server_url,
tokenizer_path,
hit_rate,
):
print(f"[INFO] Total {len(mean_input_tokens)} test cases to be executed")
all_summaries = []
failed_case = []

# Clear proxy environment variables
env = os.environ.copy()
env.pop("http_proxy", None)
env.pop("https_proxy", None)

for i, (
mean_input,
mean_output,
max_completed,
concurrent,
sampling_params_val,
hit_rate_val,
) in enumerate(
zip(
mean_input_tokens,
mean_output_tokens,
max_num_completed_requests,
concurrent_requests,
additional_sampling_params,
hit_rate,
),
start=1,
):
print(f"\n>>> Executing test case {i} <<<")
reset_prefill_cache(env, server_url)
# Use one random_seed per case so the prefill and normal runs build identical prompts (this is what controls the PC hit_rate)
random_seed = random.randint(1, 100000)

try:
# Determine if two runs are needed (PC hit_rate test)
if hit_rate_val == 0:
summary = run_token_benchmark(
llm_api=llm_api,
model=model,
test_timeout_s=timeout,
max_num_completed_requests=max_completed,
concurrent_requests=concurrent,
mean_input_tokens=mean_input,
stddev_input_tokens=stddev_input,
mean_output_tokens=mean_output,
stddev_output_tokens=stddev_output,
additional_sampling_params=sampling_params_val,
results_dir=str(timestamp_dir),
random_seed=random_seed,
openai_api_base=server_url + "/v1",
tokenizer_path=tokenizer_path,
user_metadata={"case_idx": i, "phase": "normal"},
)
else:
print(
f"[INFO] hit_rate > 0 detected, entering prefill mode, PC hit rate: {hit_rate_val} %"
)
# hit_rate > 0: first prefill mode
prefill_mean_input = int(mean_input * hit_rate_val / 100)
print(
f"[INFO] Prefill execution: mean_input_tokens={prefill_mean_input}"
)
run_token_benchmark(
llm_api=llm_api,
model=model,
test_timeout_s=timeout,
max_num_completed_requests=max_completed,
concurrent_requests=concurrent,
mean_input_tokens=prefill_mean_input,
stddev_input_tokens=stddev_input,
mean_output_tokens=2,
stddev_output_tokens=stddev_output,
additional_sampling_params=sampling_params_val,
results_dir=str(timestamp_dir),
random_seed=random_seed,
openai_api_base=server_url + "/v1",
tokenizer_path=tokenizer_path,
user_metadata={"case_idx": i, "phase": "prefill"},
)
reset_prefill_cache(env, server_url)
# Then run normal mode
print("[INFO] Prefill completed, switching to normal mode execution")
summary = run_token_benchmark(
llm_api=llm_api,
model=model,
test_timeout_s=timeout,
max_num_completed_requests=max_completed,
concurrent_requests=concurrent,
mean_input_tokens=mean_input,
stddev_input_tokens=stddev_input,
mean_output_tokens=mean_output,
stddev_output_tokens=stddev_output,
additional_sampling_params=sampling_params_val,
results_dir=str(timestamp_dir),
random_seed=random_seed,
openai_api_base=server_url + "/v1",
tokenizer_path=tokenizer_path,
user_metadata={"case_idx": i, "phase": "normal"},
)
all_summaries.append(summary)
except Exception as e:
print(f"[Warning] {e}")
failed_case.append(i)

return all_summaries, failed_case


def inference_results(
mean_input_tokens,
mean_output_tokens,
max_num_completed_requests,
concurrent_requests,
additional_sampling_params,
hit_rate,
):
config_file = Path(__file__).parent.parent.parent / "config.yaml"
print("[INFO] Initialization complete, starting main process")
print(f"[INFO] Reading configuration file: {config_file}")
with open(config_file, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
llm_api = config.get("llm_connection", {}).get("llm_api", "openai")
model = config.get("llm_connection", {}).get("model", "")
test_timeout_s = config.get("llm_connection", {}).get("test_timeout_s", 60000)
stddev_input_tokens = config.get("llm_connection", {}).get(
"stddev_input_tokens", 0
)
stddev_output_tokens = config.get("llm_connection", {}).get(
"stddev_output_tokens", 0
)
timestamp_dir = Path("results")
timestamp_dir.mkdir(parents=True, exist_ok=True)
server_url = config.get("llm_connection", {}).get("server_url", "")
tokenizer_path = config.get("llm_connection", {}).get("tokenizer_path", "")
print(f"[INFO] Created results directory: {timestamp_dir}")

all_summaries, failed_cases = run_test_cases(
llm_api,
model,
test_timeout_s,
max_num_completed_requests,
concurrent_requests,
mean_input_tokens,
stddev_input_tokens,
mean_output_tokens,
stddev_output_tokens,
additional_sampling_params,
timestamp_dir,
server_url,
tokenizer_path,
hit_rate,
)
total = len(mean_input_tokens)
print(
f"\n[INFO] All tests completed! Success: {total - len(failed_cases)}/{total}"
)
if failed_cases:
print(f"[WARN] Failed case indices: {failed_cases}")
return all_summaries
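In `run_test_cases`, a non-zero `hit_rate` triggers a two-pass flow: a warm-up run is issued first with the same `random_seed` but a prompt truncated to `hit_rate` percent of the case's mean input length and only 2 output tokens, and then the case is measured normally. A minimal sketch of that parameter derivation (the helper name is illustrative, not part of this PR):

```python
def prefill_params(mean_input_tokens: int, hit_rate_pct: int) -> dict:
    """Derive the warm-up (prefill) run parameters for a target PC hit rate.

    hit_rate_pct is a percentage (0-100); the warm-up run replays only the
    shared prefix and keeps generation negligible.
    """
    prefill_input = int(mean_input_tokens * hit_rate_pct / 100)
    return {
        "mean_input_tokens": prefill_input,  # shared prefix length
        "mean_output_tokens": 2,             # minimal generation for the warm-up pass
    }

# Example: a 4096-token prompt with a 50% target hit rate warms a 2048-token prefix.
assert prefill_params(4096, 50)["mean_input_tokens"] == 2048
```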
Empty file.
17 changes: 17 additions & 0 deletions test/common/llmperf/utils/common_metrics.py
@@ -0,0 +1,17 @@
# TODO (Avnishn): compute metrics in class
INTER_TOKEN_LAT = "inter_token_latency_s"
TTFT = "ttft_s"
E2E_LAT = "end_to_end_latency_s"
NUM_INPUT_TOKENS = "number_input_tokens"
NUM_OUTPUT_TOKENS = "number_output_tokens"
NUM_TOTAL_TOKENS = "number_total_tokens"
REQ_OUTPUT_THROUGHPUT = "request_output_throughput_token_per_s"
ERROR_MSG = "error_msg"
ERROR_CODE = "error_code"
ERROR_CODE_FREQ = "error_code_frequency"
NUM_ERRORS = "number_errors"
OUTPUT_THROUGHPUT = "mean_output_throughput_token_per_s"
NUM_COMPLETED_REQUESTS = "num_completed_requests"
COMPLETED_REQUESTS_PER_MIN = "num_completed_requests_per_min"
ERROR_RATE = "error_rate"
NUM_REQ_STARTED = "num_requests_started"
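These constants act as the shared keys for the per-request metrics dictionaries produced by the client added in this PR. An illustrative record keyed by them (values are made up; assumes `test/` is on `PYTHONPATH` so `common` is importable):

```python
from common.llmperf.utils import common_metrics

# Illustrative per-request metrics record keyed by the shared constants.
metrics = {
    common_metrics.TTFT: 0.042,                      # seconds to first token
    common_metrics.E2E_LAT: 1.310,                   # total request latency in seconds
    common_metrics.NUM_INPUT_TOKENS: 512,
    common_metrics.NUM_OUTPUT_TOKENS: 128,
    common_metrics.NUM_TOTAL_TOKENS: 512 + 128,
    common_metrics.REQ_OUTPUT_THROUGHPUT: 128 / 1.310,
}
```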
23 changes: 23 additions & 0 deletions test/common/llmperf/utils/models.py
@@ -0,0 +1,23 @@
from typing import Any, Dict, Optional, Tuple

from pydantic import BaseModel


class RequestConfig(BaseModel):
"""The configuration for a request to the LLM API.

Args:
model: The model to use.
prompt: The prompt to provide to the LLM API.
sampling_params: Additional sampling parameters to send with the request.
For more information see the Router app's documentation for the completions endpoint.
llm_api: The name of the LLM API to send the request to.
metadata: Additional metadata to attach to the request for logging or validation purposes.
openai_api_base: Base URL of the OpenAI-compatible API to send the request to.
"""

model: str
prompt: Tuple[str, int]
sampling_params: Optional[Dict[str, Any]] = None
llm_api: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
openai_api_base: Optional[str] = ""
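For illustration, a `RequestConfig` for a single benchmark request might be built like this (all values below are placeholders, not defaults from this PR; assumes `test/` is on the import path):

```python
from common.llmperf.utils.models import RequestConfig

# prompt is a (text, token_count) tuple; the token count feeds the input-token metrics.
request_config = RequestConfig(
    model="my-model",                        # placeholder model name
    prompt=("Summarize prefix caching.", 5),
    sampling_params={"max_tokens": 128, "temperature": 0.0},
    llm_api="openai",
    metadata={"case_idx": 1, "phase": "normal"},
    openai_api_base="http://localhost:8000/v1",
)
```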
136 changes: 136 additions & 0 deletions test/common/llmperf/utils/openai_chat_completions_client.py
@@ -0,0 +1,136 @@
import json
import os
import time
from pathlib import Path
from typing import Any, Dict, Tuple

import requests
import yaml
from common.llmperf.utils import common_metrics
from common.llmperf.utils.models import RequestConfig

config_file = Path(__file__).parent.parent.parent.parent / "config.yaml"
with open(config_file, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
stream = config.get("llm_connection", {}).get("stream", True)
ignore_eos = config.get("llm_connection", {}).get("ignore_eos", True)
timeout = config.get("llm_connection", {}).get("timeout", 180)


class OpenAIChatCompletionsClient:
"""
Client for sending HTTP requests to an OpenAI-compatible chat completions endpoint, consuming the streamed response, and measuring latency metrics (TTFT, inter-token latency, throughput).
"""

def llm_request(
self, request_config: RequestConfig
) -> Tuple[Dict[str, Any], str, RequestConfig]:
prompt, prompt_len = request_config.prompt

message = [
{"role": "user", "content": prompt},
]
model = request_config.model
body = {
"model": model,
"messages": message,
"stream": stream,
"ignore_eos": ignore_eos,
}
sampling_params = request_config.sampling_params
body.update(sampling_params or {})

time_to_next_token = []
tokens_received = 0
ttft = 0.0
error_response_code = None
generated_text = ""
error_msg = ""
output_throughput = 0.0
total_request_time = 0.0
ttft_recorded = False

metrics: Dict[str, Any] = {}

metrics[common_metrics.ERROR_CODE] = None
metrics[common_metrics.ERROR_MSG] = ""

start_time = time.monotonic()
most_recent_received_token_time = start_time

address = request_config.openai_api_base

if not address:
raise ValueError("the environment variable OPENAI_API_BASE must be set.")
key = os.environ.get("OPENAI_API_KEY", "secret_abcdefg")
if not key:
raise ValueError("the environment variable OPENAI_API_KEY must be set.")
headers = {"Authorization": f"Bearer {key}"}
if not address.endswith("/"):
address = address + "/"
address += "chat/completions"
try:
with requests.post(
address,
json=body,
stream=stream,
timeout=timeout,
headers=headers,
) as response:
if response.status_code != 200:
error_msg = response.text
error_response_code = response.status_code
response.raise_for_status()

for chunk in response.iter_lines(chunk_size=None):
if not chunk:
continue
stem = b"data: "
if chunk.startswith(stem):
chunk = chunk[len(stem) :]
# Data might already be bytes or str
if isinstance(chunk, bytes):
chunk = chunk.decode("utf-8", errors="ignore")
if chunk.strip() == "[DONE]":
continue
tokens_received += 1
data = json.loads(chunk)
if "error" in data:
error_msg = data["error"]["message"]
error_response_code = data["error"]["code"]
raise RuntimeError(error_msg)
delta = data["choices"][0]["delta"]
content = delta.get("content", None) or delta.get(
"reasoning_content", ""
)
if content:
if not ttft_recorded:  # first content chunk: record time to first token
ttft = time.monotonic() - start_time
ttft_recorded = True
else:
time_to_next_token.append(
time.monotonic() - most_recent_received_token_time
)
most_recent_received_token_time = time.monotonic()
generated_text += content

total_request_time = time.monotonic() - start_time
if total_request_time > 0:
output_throughput = tokens_received / total_request_time

except Exception as e:
metrics[common_metrics.ERROR_MSG] = error_msg
metrics[common_metrics.ERROR_CODE] = error_response_code
print(f"Warning Or Error: {e}")
print(error_response_code)

metrics[common_metrics.INTER_TOKEN_LAT] = sum(time_to_next_token)
metrics[common_metrics.TTFT] = ttft
metrics[common_metrics.E2E_LAT] = total_request_time
metrics[common_metrics.REQ_OUTPUT_THROUGHPUT] = output_throughput
metrics[common_metrics.NUM_TOTAL_TOKENS] = tokens_received + prompt_len
metrics[common_metrics.NUM_OUTPUT_TOKENS] = tokens_received
metrics[common_metrics.NUM_INPUT_TOKENS] = prompt_len

return metrics, generated_text, request_config
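As a usage sketch, a single request/measurement cycle with this client could look like the following (the endpoint URL, model name, and token count are placeholders; the module reads the repo's config.yaml at import time, so that file must be present):

```python
from common.llmperf.utils import common_metrics
from common.llmperf.utils.models import RequestConfig
from common.llmperf.utils.openai_chat_completions_client import OpenAIChatCompletionsClient

client = OpenAIChatCompletionsClient()
cfg = RequestConfig(
    model="my-model",                                            # placeholder
    prompt=("Explain time-to-first-token in one sentence.", 9),  # (text, token_count)
    sampling_params={"max_tokens": 64},
    openai_api_base="http://localhost:8000/v1",                  # placeholder
)

metrics, generated_text, _ = client.llm_request(cfg)
print(f"TTFT: {metrics[common_metrics.TTFT]:.3f}s, "
      f"output tokens: {metrics[common_metrics.NUM_OUTPUT_TOKENS]}")
```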