Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
@pytest.fixture
def workload_params(request):
params = request.param
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_files)]
return params, files_names
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]:
chunk_size_bytes = chunk_size_kib * 1024
bucket_name = bucket_map[bucket_type]

num_files = num_processes * num_coros
num_files = num_processes

# Create a descriptive name for the parameter set
name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
name = f"{pattern}_{bucket_type}_{num_processes}p_{num_coros}c_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"

params[workload_name].append(
TimeBasedReadParameters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ workload:

- name: "read_rand_multi_process"
pattern: "rand"
coros: [1]
coros: [1, 16]
processes: [1]


defaults:
DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb"
DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb"
Original file line number Diff line number Diff line change
Expand Up @@ -115,47 +115,53 @@ def _download_time_based_json(client, filename, params):


async def _download_time_based_async(client, filename, params):
total_bytes_downloaded = 0

mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename)
await mrd.open()

offset = 0
is_warming_up = True
start_time = time.monotonic()
warmup_end_time = start_time + params.warmup_duration
test_end_time = warmup_end_time + params.duration
shared_lock = asyncio.Lock()

while time.monotonic() < test_end_time:
current_time = time.monotonic()
if is_warming_up and current_time >= warmup_end_time:
is_warming_up = False
total_bytes_downloaded = 0 # Reset counter after warmup
async def _worker_coro():
total_bytes_downloaded = 0
offset = 0
is_warming_up = True
start_time = time.monotonic()
warmup_end_time = start_time + params.warmup_duration
test_end_time = warmup_end_time + params.duration

ranges = []
if params.pattern == "rand":
for _ in range(params.num_ranges):
offset = random.randint(
0, params.file_size_bytes - params.chunk_size_bytes
)
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
else: # seq
for _ in range(params.num_ranges):
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
offset += params.chunk_size_bytes
if offset + params.chunk_size_bytes > params.file_size_bytes:
offset = 0 # Reset offset if end of file is reached

await mrd.download_ranges(ranges)
while time.monotonic() < test_end_time:
current_time = time.monotonic()
if is_warming_up and current_time >= warmup_end_time:
is_warming_up = False
total_bytes_downloaded = 0 # Reset counter after warmup

bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges

if not is_warming_up:
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
ranges = []
if params.pattern == "rand":
for _ in range(params.num_ranges):
offset = random.randint(
0, params.file_size_bytes - params.chunk_size_bytes
)
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
else: # seq
for _ in range(params.num_ranges):
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
offset += params.chunk_size_bytes
if offset + params.chunk_size_bytes > params.file_size_bytes:
offset = 0 # Reset offset if end of file is reached

await mrd.download_ranges(ranges, lock=shared_lock)

bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges

if not is_warming_up:
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
return total_bytes_downloaded

tasks = [asyncio.create_task(_worker_coro()) for _ in range(params.num_coros)]
results = await asyncio.gather(*tasks)

await mrd.close()
return total_bytes_downloaded
return sum(results)


def _download_files_worker(process_idx, filename, params, bucket_type):
Expand Down
Loading