@pytest.fixture
def workload_params(request):
    """Pair the parametrized workload settings with the object names to read.

    Returns:
        A ``(params, names)`` tuple: ``params`` is ``request.param`` unchanged
        and ``names`` holds one ``fio-go_storage_fio.0.<i>`` entry for every
        ``i`` below ``params.num_files``.
    """
    workload = request.param
    # One object name per file in the workload, numbered from zero.
    object_names = [
        f"fio-go_storage_fio.0.{index}" for index in range(workload.num_files)
    ]
    return workload, object_names
async def _download_time_based_async(client, filename, params):
    """Download ranges from one object concurrently until the time budget ends.

    One ``AsyncMultiRangeDownloader`` is opened for ``filename`` in
    ``params.bucket_name`` and shared by ``params.num_coros`` worker
    coroutines. Each worker repeatedly requests ``params.num_ranges`` ranges of
    ``params.chunk_size_bytes`` bytes: first for ``params.warmup_duration``
    seconds (not counted), then for ``params.duration`` seconds (counted).

    Args:
        client: Client object handed to ``AsyncMultiRangeDownloader``.
        filename: Name of the object to read.
        params: Workload settings; this function reads ``bucket_name``,
            ``num_coros``, ``num_ranges``, ``chunk_size_bytes``,
            ``file_size_bytes``, ``pattern``, ``warmup_duration`` and
            ``duration``.

    Returns:
        Total number of bytes counted by all workers after the warmup phase.

    Raises:
        AssertionError: If a batch fills its buffers with an unexpected number
            of bytes.
        Exception: Anything raised by a worker is re-raised once the remaining
            workers have been cancelled and the downloader has been closed.
    """
    mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename)
    await mrd.open()

    # Handed to every download_ranges() call as ``lock``.
    # NOTE(review): presumably this serializes access to the shared stream
    # between coroutines -- confirm against the downloader's documentation.
    shared_lock = asyncio.Lock()

    # Invariants shared by all workers: one measurement window and one batch
    # size, so the summed byte count refers to a single ``params.duration``.
    bytes_per_batch = params.chunk_size_bytes * params.num_ranges
    start_time = time.monotonic()
    warmup_end_time = start_time + params.warmup_duration
    test_end_time = warmup_end_time + params.duration

    async def _worker_coro():
        # Issue download batches until the deadline; return the bytes counted
        # after the warmup phase.
        total_bytes_downloaded = 0
        offset = 0
        is_warming_up = True

        while time.monotonic() < test_end_time:
            current_time = time.monotonic()
            if is_warming_up and current_time >= warmup_end_time:
                is_warming_up = False
                total_bytes_downloaded = 0  # Reset counter after warmup

            if params.pattern == "rand":
                max_offset = params.file_size_bytes - params.chunk_size_bytes
                ranges = [
                    (
                        random.randint(0, max_offset),
                        params.chunk_size_bytes,
                        BytesIO(),
                    )
                    for _ in range(params.num_ranges)
                ]
            else:  # seq
                ranges = []
                for _ in range(params.num_ranges):
                    ranges.append((offset, params.chunk_size_bytes, BytesIO()))
                    offset += params.chunk_size_bytes
                    if offset + params.chunk_size_bytes > params.file_size_bytes:
                        offset = 0  # Reset offset if end of file is reached

            await mrd.download_ranges(ranges, lock=shared_lock)

            bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
            assert bytes_in_buffers == bytes_per_batch

            if not is_warming_up:
                total_bytes_downloaded += bytes_per_batch
        return total_bytes_downloaded

    tasks = [asyncio.create_task(_worker_coro()) for _ in range(params.num_coros)]
    try:
        results = await asyncio.gather(*tasks)
    except BaseException:
        # gather() leaves sibling tasks running after the first failure; stop
        # them and wait for them to finish so none touches the downloader
        # while it is being closed.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    finally:
        # Close the downloader on success and on failure alike.
        await mrd.close()

    return sum(results)