2 changes: 2 additions & 0 deletions .gitignore
@@ -1,2 +1,4 @@
__pycache__
.pytest_cache
logs
.vscode
9 changes: 8 additions & 1 deletion README.md
@@ -33,7 +33,7 @@ $ docker run azure-openai-benchmarking load --help
Consider the following guidelines when creating your benchmark tests

1. **Ensure call characteristics match your production expectations**. The number of calls per minute and the total tokens you are able to process vary depending on the prompt size, generation size and call rate.
1. **Run your test long enough to reach a stable state**. Throttling is based on the total compute you have deployed and are utilizing. The utilization includes active calls. As a result, you will see a higher call rate when ramping up on an unloaded deployment because there are no existing active calls being processed. Once your deployment is fully loaded with a utilization near 100%, throttling will increase as calls can only be processed as earlier ones are completed. To ensure an accurate measure, set the duration long enough for the throughput to stabilize, especially when running at or close to 100% utilization.
1. **Run your test long enough to reach a stable state**. Throttling is based on the total compute you have deployed and are utilizing. The utilization includes active calls. As a result, you will see a higher call rate when ramping up on an unloaded deployment because there are no existing active calls being processed. Once your deployment is fully loaded with a utilization near 100%, throttling will increase as calls can only be processed as earlier ones are completed. To ensure an accurate measure, set the duration long enough for the throughput to stabilize, especially when running at or close to 100% utilization. Also note that once the test ends (either by termination, or by reaching the maximum duration or number of requests), any pending requests will continue to drain, which can result in lower throughput values as the load on the endpoint gradually decreases to 0.
Contributor:
Great clarification!



## Usage examples
@@ -101,6 +101,13 @@ $ cat mychatcontext.json | python -m benchmark.bench tokenize \
tokens: 65
```

**Analyse JSON logs from multiple runs**

The `combine_logs` subcommand can be used to load and combine the logs from multiple runs into a single CSV, ready for comparison. This tool extracts the run arguments as well as the final set of stats prior to the run ending (either by termination or hitting the request/duration limit).
```
$ python -m benchmark.bench combine_logs logs/ combined_logs.csv --load-recursive
```
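
For a quick look at the combined results, the CSV can be read back with pandas. This is only a minimal sketch: the `filename` index comes from the log file names, while the remaining columns depend on the run arguments and stats actually found in your logs.
```
import pandas as pd

# Each row is one run, indexed by the log filename
df = pd.read_csv("combined_logs.csv", index_col="filename")
print(df.head())  # inspect the extracted run args and final stats side by side
```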

## Configuration Option Details
### Shape profiles

28 changes: 27 additions & 1 deletion benchmark/bench.py
Expand Up @@ -3,9 +3,13 @@

import argparse
import logging
import os
from datetime import datetime

from .tokenizecmd import tokenize
from .jsonloganalysis import combine_logs_to_csv
from .loadcmd import load
from .tokenizecmd import tokenize


def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
@@ -30,6 +34,7 @@ def main():
load_parser.add_argument("--temperature", type=float, help="Request temperature.")
load_parser.add_argument("--top-p", type=float, help="Request top_p.")
load_parser.add_argument("-f", "--output-format", type=str, default="human", help="Output format.", choices=["jsonl", "human"])
load_parser.add_argument("--log-save-dir", type=str, help="If provided, will save stddout to this directory. Filename will include important run parameters.")
load_parser.add_argument("-t", "--retry", type=str, default="none", help="Request retry strategy.", choices=["none", "exponential"])
load_parser.add_argument("-e", "--deployment", type=str, help="Azure OpenAI deployment name.", required=True)
load_parser.add_argument("api_base_endpoint", help="Azure OpenAI deployment base endpoint.", nargs=1)
@@ -45,7 +50,28 @@ def main():
tokenizer_parser.add_argument("text", help="Input text or chat messages json to tokenize. Default to stdin.", nargs="?")
tokenizer_parser.set_defaults(func=tokenize)

combine_logs_parser = sub_parsers.add_parser("combine_logs", help="Combine JSON logs from previous runs into a single CSV.")
combine_logs_parser.add_argument("logdir", type=str, help="Directory containing the log files.")
combine_logs_parser.add_argument("savepath", type=str, help="Path to save the output output CSV.")
combine_logs_parser.add_argument("--load-recursive", action="store_true", help="Whether to load logs in all subdirectories of log_dir.")
combine_logs_parser.set_defaults(func=combine_logs_to_csv)

args = parser.parse_args()

if args.func is load and args.log_save_dir is not None:
Contributor:
I'm wondering how this is different from using shell redirection of stderr? I could do something like this:

$ python -m benchmark.bench load ... 2> my/output/dir/output.log

Because this is only stderr, I will only get the logger output in output.log.

now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
shape_str = f"context={args.context_tokens}_max_tokens={args.max_tokens}" if args.shape_profile == "custom" else args.shape_profile
rate_str = str(int(args.rate)) if (args.rate is not None) else 'none'
output_path = os.path.join(args.log_save_dir, f"{now}_{args.deployment}_shape-{shape_str}_clients={int(args.clients)}_rate={rate_str}.log")
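# For illustration, the resulting path follows this pattern (placeholder values):
#   <log_save_dir>/<YYYY-mm-dd-HH-MM-SS>_<deployment>_shape-<shape_str>_clients=<clients>_rate=<rate>.log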
os.makedirs(args.log_save_dir, exist_ok=True)
try:
os.remove(output_path)
except FileNotFoundError:
pass
fh = logging.FileHandler(output_path)
logger = logging.getLogger()
logger.addHandler(fh)

if "func" in args:
args.func(args)
else:
97 changes: 97 additions & 0 deletions benchmark/jsonloganalysis.py
@@ -0,0 +1,97 @@
import argparse
Contributor:
I think this feature is too specific for this tool. In addition, by outputting stats in jsonl, you can use simple command line tools such as jq to aggregate.

Perhaps you should take this feature out into a separate PR for better discussion.
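
To illustrate the jq aggregation mentioned above, a hypothetical one-liner (the `.rpm` field name is an assumption and should be matched to the actual keys in the jsonl output):

$ python -m benchmark.bench load ... -f jsonl | jq -s 'map(.rpm) | max'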

import json
import logging
from pathlib import Path
from typing import Optional

import pandas as pd


def combine_logs_to_csv(
args: argparse.Namespace,
) -> None:
"""
Combines all logs in a directory into a single csv file.
Args:
log_dir: Directory containing the log files.
save_path: Path to save the output output CSV.
load_recursive: Whether to load logs in all subdirectories of log_dir.
Defaults to True.
"""
log_dir = args.logdir
save_path = args.savepath
load_recursive = args.load_recursive

log_dir = Path(log_dir)
log_files = log_dir.rglob("*.log") if load_recursive else log_dir.glob("*.log")
log_files = sorted(log_files)
num_files = len(log_files)
# Extract run info from each log file
run_summaries = [extract_run_info_from_log_path(log_file) for log_file in log_files]
run_summaries = [summary for summary in run_summaries if isinstance(summary, dict)]
# Convert to dataframe and save to csv
if run_summaries:
df = pd.DataFrame(run_summaries)
df.set_index("filename", inplace=True)
df.to_csv(save_path, index=True)
logging.info(f"Saved {len(df)} runs to {save_path}")
else:
logging.error(f"No valid runs found in {log_dir}")
return

def extract_run_info_from_log_path(log_file: str) -> Optional[dict]:
"""Extracts run info from log file path"""
run_args = None
last_logged_stats = None
early_terminated = False
# Process lines, including only info before early termination or when requests start to drain
with open(log_file) as f:
for line in f.readlines():
if "got terminate signal" in line:
early_terminated = True
if "got terminate signal" in line or "requests to drain" in line:
# Ignore any stats after termination or draining of requests (since RPM, TPM, rate etc will start to decline as requests gradually finish)
break
# Save most recent line prior to termination/draining
if "Load" in line:
run_args = json.loads(line.split("Load test args: ")[-1])
if "run_seconds" in line:
last_logged_stats = line
if not run_args:
logging.error(f"Could not extract run args from log file {log_file} - missing run info (it might have been generated with a previous code version).")
return None
run_args["early-terminated"] = early_terminated
run_args["filename"] = Path(log_file).name
# Extract last line of valid stats from log if available
if last_logged_stats:
last_logged_stats = flatten_dict(json.loads(last_logged_stats))
run_args.update(last_logged_stats)
return run_args

def flatten_dict(input: dict) -> dict:
"""
Flattens dictionary of nested dictionaries/lists into a single level dictionary
Taken from https://www.geeksforgeeks.org/flattening-json-objects-in-python/
"""
out = {}

def flatten(x, name=''):
# If the nested value is a dict, recurse into each key
if isinstance(x, dict):
for a in x:
flatten(x[a], name + a + '_')

# If the nested value is a list, recurse into each element
elif isinstance(x, list):
i = 0
for a in x:
flatten(a, name + str(i) + '_')
i += 1
else:
out[name[:-1]] = x

flatten(input)
return out
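
As an illustration of `flatten_dict`, a hypothetical nested stats record (the keys are invented for the example) flattens like this:
```
# Hypothetical input; the keys are illustrative only
nested = {"rpm": 120, "ttft": {"avg": 0.8, "95th": 1.4}}
print(flatten_dict(nested))
# -> {'rpm': 120, 'ttft_avg': 0.8, 'ttft_95th': 1.4}
```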
42 changes: 34 additions & 8 deletions benchmark/loadcmd.py
@@ -1,20 +1,23 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import time
import json
import logging
import math
import os
import sys
import time
from typing import Iterable, Iterator

import aiohttp
import wonderwords
import math
import logging
from typing import Iterable, Iterator
from .statsaggregator import _StatsAggregator
from .oairequester import OAIRequester

from .asynchttpexecuter import AsyncHTTPExecuter
from .ratelimiting import RateLimiter, NoRateLimiter
from .oairequester import OAIRequester
from .oaitokenizer import num_tokens_from_messages
from .ratelimiting import NoRateLimiter, RateLimiter
from .statsaggregator import _StatsAggregator

import sys

class _RequestBuilder:
"""
@@ -66,6 +69,29 @@ def load(args):
print(f"invalid argument(s): {e}")
sys.exit(1)

run_args = {
"api_base_endpoint": args.api_base_endpoint[0],
"deployment": args.deployment,
"clients": args.clients,
"requests": args.requests,
"duration": args.duration,
"rate": args.rate,
"aggregation_window": args.aggregation_window,
"shape_profile": args.shape_profile,
"context_tokens": args.context_tokens,
"max_tokens": args.max_tokens,
"completions": args.completions,
"retry": args.retry,
"api_version": args.api_version,
"frequency_penalty": args.frequency_penalty,
"presence_penalty": args.presence_penalty,
"temperature": args.temperature,
"top_p": args.top_p,
"output_format": args.output_format,
}
converted = json.dumps(run_args)
logging.info("Load test args: " + converted)

api_key = os.getenv(args.api_key_env)
url = args.api_base_endpoint[0] + "/openai/deployments/" + args.deployment + "/chat/completions"
url += "?api-version=" + args.api_version
13 changes: 9 additions & 4 deletions benchmark/statsaggregator.py
@@ -1,13 +1,18 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import threading
import datetime
import time
import json
import logging
import threading
import time

import numpy as np

from .oairequester import RequestStats

logger = logging.getLogger()

class _Samples:
def __init__(self):
# [0] timestamp, [1] value
@@ -153,9 +158,9 @@ def _dump(self):
"95th": util_95th,
},
}
print(json.dumps(j), flush=True)
logger.info(json.dumps(j))
else:
print(f"{timestamp} rpm: {rpm:<5} requests: {self.requests_count:<5} failures: {self.failed_count:<4} throttled: {self.throttled_count:<4} tpm: {tokens_per_minute:<6} ttft_avg: {ttft_avg:<6} ttft_95th: {ttft_95th:<6} tbt_avg: {tbt_avg:<6} tbt_95th: {tbt_95th:<6} e2e_avg: {e2e_latency_avg:<6} e2e_95th: {e2e_latency_95th:<6} util_avg: {util_avg:<6} util_95th: {util_95th:<6}", flush=True)
logger.info(f"rpm: {rpm:<5} requests: {self.requests_count:<5} failures: {self.failed_count:<4} throttled: {self.throttled_count:<4} tpm: {tokens_per_minute:<6} ttft_avg: {ttft_avg:<6} ttft_95th: {ttft_95th:<6} tbt_avg: {tbt_avg:<6} tbt_95th: {tbt_95th:<6} e2e_avg: {e2e_latency_avg:<6} e2e_95th: {e2e_latency_95th:<6} util_avg: {util_avg:<6} util_95th: {util_95th:<6}")
Contributor:
The idea behind using print and not logger is to redirect to stdout vs stderr, such that you can use shell redirection to only get the stats or jsonl. You will need to add conditionals here to make sure that this behavior doesn't break.
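
One possible shape for such a conditional, purely as a sketch (`log_file_enabled` is a hypothetical flag that would need to be threaded through from the CLI; it is not something the current code provides):
```
payload = json.dumps(j)
if log_file_enabled:
    # a log file was requested: route through the logger so the FileHandler captures it
    logger.info(payload)
else:
    # preserve the original behaviour: stats/jsonl go to stdout for shell redirection
    print(payload, flush=True)
```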


def _slide_window(self):
with self.lock:
3 changes: 2 additions & 1 deletion requirements.txt
@@ -5,4 +5,5 @@ numpy
backoff
wonderwords
asyncio
aiohttp
aiohttp
pandas