Skip to content

Commit e00a201

Browse files
author
zhixiangli
committed
perf: update microbenchmarks and use concurrent download_ranges
1 parent 14abfd5 commit e00a201

File tree

3 files changed

+48
-35
lines changed

3 files changed

+48
-35
lines changed

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]:
8383
num_files = num_processes * num_coros
8484

8585
# Create a descriptive name for the parameter set
86-
name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
86+
name = f"{pattern}_{bucket_type}_{num_processes}p_{num_coros}c_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
8787

8888
params[workload_name].append(
8989
TimeBasedReadParameters(

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ workload:
2323
coros: [1]
2424
processes: [1]
2525

26+
- name: "read_rand_multi_coro"
27+
pattern: "rand"
28+
coros: [16]
29+
processes: [1]
30+
2631
defaults:
2732
DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb"
2833
DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb"

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/test_reads.py

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -115,47 +115,53 @@ def _download_time_based_json(client, filename, params):
115115

116116

117117
async def _download_time_based_async(client, filename, params):
118-
total_bytes_downloaded = 0
119-
120118
mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename)
121119
await mrd.open()
122120

123-
offset = 0
124-
is_warming_up = True
125-
start_time = time.monotonic()
126-
warmup_end_time = start_time + params.warmup_duration
127-
test_end_time = warmup_end_time + params.duration
121+
shared_lock = asyncio.Lock()
128122

129-
while time.monotonic() < test_end_time:
130-
current_time = time.monotonic()
131-
if is_warming_up and current_time >= warmup_end_time:
132-
is_warming_up = False
133-
total_bytes_downloaded = 0 # Reset counter after warmup
123+
async def _worker_coro():
124+
total_bytes_downloaded = 0
125+
offset = 0
126+
is_warming_up = True
127+
start_time = time.monotonic()
128+
warmup_end_time = start_time + params.warmup_duration
129+
test_end_time = warmup_end_time + params.duration
134130

135-
ranges = []
136-
if params.pattern == "rand":
137-
for _ in range(params.num_ranges):
138-
offset = random.randint(
139-
0, params.file_size_bytes - params.chunk_size_bytes
140-
)
141-
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
142-
else: # seq
143-
for _ in range(params.num_ranges):
144-
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
145-
offset += params.chunk_size_bytes
146-
if offset + params.chunk_size_bytes > params.file_size_bytes:
147-
offset = 0 # Reset offset if end of file is reached
148-
149-
await mrd.download_ranges(ranges)
131+
while time.monotonic() < test_end_time:
132+
current_time = time.monotonic()
133+
if is_warming_up and current_time >= warmup_end_time:
134+
is_warming_up = False
135+
total_bytes_downloaded = 0 # Reset counter after warmup
150136

151-
bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
152-
assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges
153-
154-
if not is_warming_up:
155-
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
137+
ranges = []
138+
if params.pattern == "rand":
139+
for _ in range(params.num_ranges):
140+
offset = random.randint(
141+
0, params.file_size_bytes - params.chunk_size_bytes
142+
)
143+
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
144+
else: # seq
145+
for _ in range(params.num_ranges):
146+
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
147+
offset += params.chunk_size_bytes
148+
if offset + params.chunk_size_bytes > params.file_size_bytes:
149+
offset = 0 # Reset offset if end of file is reached
150+
151+
await mrd.download_ranges(ranges, lock=shared_lock)
152+
153+
bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
154+
assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges
155+
156+
if not is_warming_up:
157+
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
158+
return total_bytes_downloaded
159+
160+
tasks = [asyncio.create_task(_worker_coro()) for _ in range(params.num_coros)]
161+
results = await asyncio.gather(*tasks)
156162

157163
await mrd.close()
158-
return total_bytes_downloaded
164+
return sum(results)
159165

160166

161167
def _download_files_worker(process_idx, filename, params, bucket_type):
@@ -176,7 +182,9 @@ def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type):
176182

177183
@pytest.mark.parametrize(
178184
"workload_params",
179-
all_params["read_seq_multi_process"] + all_params["read_rand_multi_process"],
185+
all_params["read_seq_multi_process"]
186+
+ all_params["read_rand_multi_process"]
187+
+ all_params.get("read_rand_multi_coro", []),
180188
indirect=True,
181189
ids=lambda p: p.name,
182190
)

0 commit comments

Comments
 (0)