
Add prefetcher reader for standard buckets. #795

Open

googlyrahman wants to merge 6 commits into fsspec:main from ankitaluthra1:regional

Conversation

@googlyrahman (Contributor)

Description generated by AI

Asynchronous Background Prefetcher

A new BackgroundPrefetcher class has been implemented in gcsfs/prefetcher.py. This component is designed to:

  • Proactively Fetch Data: It spawns a background producer task that fetches sequential blocks of data before they are explicitly requested.
  • Adaptive Blocksize: The engine dynamically adjusts its blocksize based on the history of requested read sizes.
  • Sequential Streak Detection: Prefetching is triggered after detecting a "streak" of sequential reads.
  • Optimized Slicing: Uses ctypes for a fast, low-overhead slice implementation (_fast_slice) to manage internal buffers.
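The streak-detection and adaptive-blocksize behavior described above can be sketched roughly as follows. This is an illustrative reconstruction from the bullet points, not the actual gcsfs/prefetcher.py code; the class name, threshold, and fields are assumptions:

```python
# Hypothetical sketch of sequential-streak detection: a read that starts
# where the previous one ended extends the streak; prefetching would be
# triggered once the streak reaches a configurable threshold.
class StreakDetector:
    def __init__(self, streak_threshold=2):
        self.streak_threshold = streak_threshold
        self._last_end = None
        self._streak = 0

    def record(self, start, end):
        """Return True once enough sequential reads have been observed."""
        if start == self._last_end:
            self._streak += 1
        else:
            self._streak = 0  # non-sequential read resets the streak
        self._last_end = end
        return self._streak >= self.streak_threshold
```

A detector like this stays out of the way for random access patterns, which is the behavior claimed later in the discussion.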

Core Refactoring for Concurrency

The file-fetching logic in gcsfs/core.py has been refactored to enable parallel downloads:

  • _cat_file Decomposition: _cat_file is now split into _cat_file_sequential and _cat_file_concurrent.
  • Threshold-Based Routing: Concurrent fetching is automatically utilized when the requested data size exceeds MIN_CHUNK_SIZE_FOR_CONCURRENCY (defaulting to 5MB) and multiple concurrency slots are requested.
  • Integration: The GCSFile object now optionally initializes the _prefetch_engine when the use_prefetch_reader flag is provided.
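The threshold-based routing in the second bullet can be condensed to a small decision function. The constant value and the two path names follow the PR description, but the body is a simplified sketch, not the actual gcsfs code:

```python
# Illustrative routing: only large reads with more than one concurrency
# slot take the concurrent path; everything else stays sequential.
MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 2**20  # 5 MiB, per the description

def choose_path(size, concurrency):
    """Pick the fetch strategy for a read of `size` bytes."""
    if concurrency > 1 and size > MIN_CHUNK_SIZE_FOR_CONCURRENCY:
        return "concurrent"
    return "sequential"
```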

@googlyrahman googlyrahman changed the title Add prefetcher engine for regional buckets. Add prefetcher reader for regional buckets. Mar 30, 2026
@codecov

codecov bot commented Mar 30, 2026

Codecov Report

❌ Patch coverage is 98.36512% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.73%. Comparing base (e70bc65) to head (d6ae2e1).

Files with missing lines Patch % Lines
gcsfs/core.py 93.75% 3 Missing ⚠️
gcsfs/prefetcher.py 99.05% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #795      +/-   ##
==========================================
+ Coverage   75.98%   79.73%   +3.75%     
==========================================
  Files          14       15       +1     
  Lines        2665     3030     +365     
==========================================
+ Hits         2025     2416     +391     
+ Misses        640      614      -26     


@googlyrahman googlyrahman marked this pull request as ready for review March 30, 2026 07:38
@googlyrahman googlyrahman changed the title Add prefetcher reader for regional buckets. Add prefetcher reader for standard buckets. Mar 30, 2026
gcsfs/core.py Outdated
) or os.environ.get("use_prefetch_reader", False)
if use_prefetch_reader:
max_prefetch_size = kwargs.get("max_prefetch_size", None)
concurrency = kwargs.get("concurrency", 4)
Contributor:

let's call this prefetcher_concurrency

Contributor Author:

I think concurrency is the better term here, because _cat_file also uses this concurrency parameter when the prefetcher engine is disabled, so it is unrelated to the prefetcher.

gcsfs/core.py Outdated
await asyncio.gather(*tasks, return_exceptions=True)
raise e

async def _cat_file(self, path, start=None, end=None, concurrency=4, **kwargs):
Contributor:

let's also move this to constant namely DEFAULT_PREFETCHER_CONCURRENCY

Member:

Aren't we mixing concerns here? _cat_file does not necessarily use the prefetcher at all. Indeed, why is prefetcher an option, when this is a single blob read, not sequential?

Contributor Author:

let's also move this to constant namely DEFAULT_PREFETCHER_CONCURRENCY

Introduced a default in zb_hns_util.py named as DEFAULT_CONCURRENCY

Aren't we mixing concerns here? _cat_file does not necessarily use the prefetcher at all. Indeed, why is prefetcher an option, when this is a single blob read, not sequential?

_cat_file doesn't actually do any prefetching. We are just completing the existing call path (GCSFile -> cache -> GCSFile._fetch -> GCSFileSystem._cat_file). While it does not prefetch, it will now fetch chunks concurrently if the requested size is over 5MB.

Collaborator:

Resonating with what Martin is saying, I also feel the prefetching and concurrency logics are being integrated directly into the GCSFile core functionality (_cat_file), I think adding custom arguments and environment variables actually is a side effect of mixing these concerns. Should we consider implementing this as an fsspec cache implementation rather than modifying the core _cat_file logic, wdyt @martindurant ?

Contributor Author:

I disagree that this should be implemented as a cache! Prefetching and caching should be two different components. I think moving forward we should have both caching and prefetching, just like the kernel.

Repetitive access should be cached (this will help significantly when reading header/footer-based files), while prefetching is where the engine fetches, in the background, the data the user will request next.

There's a switch to turn the prefetching off as per the user's wish.

Regarding change in _cat_file, The current code path still points to previously written code, so i don't see a problem here, and we need concurrency in _cat_file to support large file non-streaming downloads.

@googlyrahman googlyrahman force-pushed the regional branch 5 times, most recently from 82240a5 to 44d34f3 Compare March 31, 2026 21:48
gcsfs/core.py Outdated
use_prefetch_reader = kwargs.get(
"use_experimental_adaptive_prefetching", False
) or os.environ.get(
"use_experimental_adaptive_prefetching", "false"
Contributor:

let's capitalize this, to be consistent with other env variable naming convention

Member:

Can someone please explain why we cannot use normal cache_type= and cache_options= alone rather than having to invent a set of new environment variables (not to mention the extra kwargs)?

@jasha26 (Contributor) commented Apr 2, 2026:

Thanks for the feedback, Martin! I definitely see the logic in wanting to keep the cache_options namespace clean.

This was initially implemented as a cache, but we later chose to keep the prefetcher separate because it addresses a different concern than our standard caches. While cache_type defines how we persist data locally, this prefetcher is focused on when we request data from the network, based on sequential read streaks.

If we merge this into a cache option, we lose the ability to use it in tandem with the existing, robust fsspec caches. We wanted a solution where a customer could get 'smarter' fetches without having to re-architect their current caching strategy.

Regarding the environment variables and kwargs: I completely agree it's a bit of 'noise.' We introduced the feature flag (use_experimental_adaptive_prefetching) mainly to keep it safe and opt-in while it's in this experimental stage.

Additionally, the concurrent downloading logic in _cat_file is a fundamental speed improvement that we felt should benefit the core file-fetching process regardless of the caching or prefetching state.

Open to your thoughts on if there's a better way to expose this 'fetch-level' optimization without cluttering the API!

Member:

Additionally, the concurrent downloading logic in _cat_file is a fundamental speed improvement that we felt should benefit the core file-fetching process regardless of the caching or prefetching state.

Agreed on this - it's completely decoupled functionality, could be split into a separate PR (essentially the same as fsspec/s3fs#1007 )

@martindurant (Member) commented Apr 2, 2026:

I think you may be fixing the wrong problem. I think the source of trouble is that the File API is sync/blocking, and so all the cache implementations are too (_fetch() calls sync on the filesystem for async impls), but you want to run async code.
I think it might be wrong, though - the cache API does see all the reads of the file, and you need to cross the sync/async bridge eventually either way.

Contributor Author:

@martindurant I didn't quite catch the meaning of your last comment - could you please elaborate?

Regarding why we need a separate environment variable, and why this can't come in cache_options: the reason it isn't included there yet is that the existing fsspec caches don't accept *args or **kwargs.

The cache gets initialised with cache_options here. If a user passes an unsupported option like enable_prefetch_engine in cache_options, the cache raises an unexpected-argument error. I plan to open a separate PR to allow caches to accept arbitrary arguments, and I've already left a comment in this PR tracking that.
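The constraint being described can be reproduced with a tiny stand-in. The cache class and helper below are hypothetical names for illustration, not fsspec's actual implementation; they only mirror the fixed-constructor-signature behavior that causes the error:

```python
# fsspec-style cache classes take a fixed set of constructor arguments,
# so an unknown key forwarded from cache_options surfaces as a TypeError.
class ReadAheadLikeCache:
    def __init__(self, blocksize, fetcher, size):  # note: no **kwargs
        self.blocksize, self.fetcher, self.size = blocksize, fetcher, size

def open_with_cache(cache_cls, cache_options):
    # mirrors how a file object forwards cache_options into the cache
    return cache_cls(blocksize=2**20, fetcher=lambda a, b: b"", size=100,
                     **cache_options)

try:
    open_with_cache(ReadAheadLikeCache, {"enable_prefetch_engine": True})
except TypeError as exc:
    print("cache rejected unknown option:", exc)
```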

Finally, as for why this is implemented as a prefetch engine rather than a cache, that architectural decision is already in discussion in another thread.

# there currently causes instantiation errors. We are holding off on introducing
# them as explicit keyword arguments to ensure existing user workloads are not
# disrupted. This will be refactored once the upstream `fsspec` changes are merged.
use_prefetch_reader = kwargs.get(
Contributor:

let's only do env variable for flag and not kwargs

Member:

I disagree! This means when we want to surface the arguments, we'll have to support both and decide precedence.

Contributor Author:

+1, I would also like to keep both the argument and the environment variable so that users can disable the prefetch engine if they wish.

for t in tasks:
if not t.done():
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
Collaborator:

My understanding is in case there is any exceptions in coroutines, it would be added in results, so we also need to iterate through results to catch the exception

Member:

I suppose we already caught an exception in the gather() above at this point - so this is just running the coroutines rather than leaving them pending. Cancel might be better.

Contributor Author:

Martin is right. I don't think we need any change here - we do not need to iterate through the results and raise an exception. Here is what happens:

  • We spawn four tasks.
  • If any of them fail, we cancel the rest.
  • Simply calling t.cancel() does not destroy the task. We need to explicitly await t for that, which is why we use a gather statement.
  • Since the other tasks are destroyed abruptly by the initial error, we only raise the original error that caused the cancellations.
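The steps above can be condensed into a runnable sketch. This is an illustration of the pattern under discussion, not the PR's exact code; `fetch_all` is a hypothetical name:

```python
# On the first failure, cancel the still-pending tasks, await them so
# none are left dangling, then re-raise the original error.
import asyncio

async def fetch_all(coros):
    tasks = [asyncio.create_task(c) for c in coros]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        for t in tasks:
            if not t.done():
                t.cancel()
        # awaiting here runs the cancellations to completion;
        # return_exceptions=True swallows the CancelledErrors
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
```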

Member:

The first gather() will not cancel pending tasks when it returns with the first exception; we still need to do that, otherwise we wait for all IO to complete.

Contributor Author:

@martindurant, we're already cancelling the task, See t.cancel() in the image
[Screenshot of the code showing the t.cancel() call]

Member:

You are right, apologies

@martindurant (Member) left a comment:

The main and I think overriding reason to have the prefetcher be a cache_type, is that we want to make it accessible from other (async) filesystem implementations eventually. This will show that this collaboration isn't just good for GCS, but for the community at large.
In addition, keeping to the established pattern will help long-term maintainability.

):
"""Simple one-shot, or concurrent get of file data"""
if concurrency > 1:
return await self._cat_file_concurrent(
Member:

Should work for concurrency==1 too, instead of having two separate methods

Contributor Author:

That is correct. However, in a follow-up CL, the concurrent method will use zero-copy. Therefore, this call is necessary because _cat_file_concurrent will fetch data differently moving forward, and we want to avoid shifting our entire workload to that new path all at once.

Member:

we want to avoid shifting our entire workload to that new path all at once.

Why? Having one code path to maintain should be better, unless you anticipate some problem.

Furthermore, join() does not copy when not necessary:

>>> x is b"".join([x])
True

@googlyrahman (Contributor, author) commented Apr 4, 2026:

Personally, I don't have any issue pointing this to the same code path, and I do not anticipate any problems with the new implementation.

However, from an organizational standpoint, we need to keep this feature strictly behind a flag to ensure the merge has no immediate impact, and hence want to keep these changes isolated so that when we later introduce zero-copy to the concurrent path, users who opt out of the flag will still safely default to the old behavior.

Once we make the new behavior the default, we will consolidate the code and remove this method. I have already added a comment in the code for the same.

Comment on lines +1206 to +1220
for i in range(concurrency):
offset = start + (i * part_size)
actual_size = (
part_size if i < concurrency - 1 else total_size - (i * part_size)
)
tasks.append(
asyncio.create_task(
self._cat_file_sequential(
path, start=offset, end=offset + actual_size, **kwargs
)
)
)

try:
results = await asyncio.gather(*tasks)
Member:

This is just gather(*[...]); I don't think you need to write out the loop. Also, you don't need create_task(), gather() does that automatically if given coroutines.

@googlyrahman (Contributor, author) commented Apr 3, 2026:

The reason to keep asyncio.create_task() is that we need explicit Task objects to manually cancel them in the except block if a failure occurs. If we were on Python 3.11+, we could definitely drop this and use asyncio.TaskGroup to handle the cancellation automatically, but gather doesn't do that natively.

I'm also going to stick with the explicit for loop. Packing the start/end offset calculations into a list comprehension makes that block too dense, so the explicit loop is necessary here for readability.

The code if I remove the loop:

tasks = [
    asyncio.create_task(
         self._cat_file_sequential(
              path,
              start=start + (i * part_size),
              end=start + (i * part_size) + (part_size if i < concurrency - 1 else total_size - (i * part_size)),
              **kwargs
         )
    )
    for i in range(concurrency)
]

try:
return self.gcsfs.cat_file(self.path, start=start, end=end)
if self._prefetch_engine:
return self._prefetch_engine._fetch(start=start, end=end)
Member:

I really don't see why you would have a standard fsspec cacher overlaid on the prefetcher. The only one it might work with is "readahead", but actually the prefetcher does all of that functionality and more, no?

@googlyrahman (Contributor, author) commented Apr 3, 2026:

The primary reason to position the prefetcher below cache rather than integrating it directly is to allow other caches to benefit from the prefetched data. Furthermore, we will always retain the ability to enable or disable the prefetch logic as needed.

This approach mirrors standard OS kernel architecture, which maintains the page cache (which can be bypassed) and the read-ahead prefetching mechanism as distinct, decoupled entities.

Member:

Well we can do some experiments if you like, read patterns like:

Some backtracking: 1,2,3,4,2,5,6,7,5...
Frequent visits home: 1,2,1,3,1,4,1,5...

but I strongly suspect that the first one would behave just like readahead (but better because of prefetching) and the second would be better with type "first" and the prefetcher doesn't help at all.

Contributor Author:

You're exactly right that the prefetcher doesn't actively help with the 'frequent visits home' pattern, but crucially, it doesn't backfire either. Because the prefetcher only triggers after detecting a threshold of sequential reads (which we can configure to be 2, 3, or more), it simply stays out of the way during non-sequential access. This is why purely random workloads perform exactly the same whether prefetching is enabled or disabled, as reflected in the benchmarks.

Regarding the 'frequent visits home' pattern specifically: identifying and serving repeatedly accessed blocks (like block 1) is entirely the job of a cache, not a prefetcher. This pattern is actually a perfect example of why decoupling the two is so valuable. Layering them allows the cache to handle the repetitive hits, while the prefetcher handles the sequential scans.
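The layering argued for here can be shown with a toy model: a small cache that absorbs repeated reads, sitting on top of a fetch callable that a prefetch engine could back. The names below are hypothetical illustrations, not gcsfs code:

```python
# A minimal read-through cache layered over a fetcher. Repeated hits
# (the "frequent visits home" pattern) are served from the cache and
# never reach the underlying fetcher, which a prefetcher could back.
class CachedReader:
    def __init__(self, fetch):
        self._fetch = fetch   # e.g. a prefetch engine's fetch method
        self._store = {}

    def read(self, start, end):
        key = (start, end)
        if key not in self._store:
            self._store[key] = self._fetch(start, end)
        return self._store[key]
```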

@googlyrahman (Contributor, author) commented Apr 3, 2026:

Please see https://github.com/ankitaluthra1/gcsfs/blob/regional/docs/source/prefetcher.rst. I would also like to highlight that going forward, we'll have cache_type="none" as the default.

@martindurant (Member):

Thanks for the writeup. I am reading through, but today is a holiday here, and I will have limited time.

@martindurant (Member):

we'll have cache_type="none" as default

I agree that this is exactly what makes sense, and seems to me to back up my suggestion that this is really a cache type. :)

else:
# Native Python slicing was GIL bound in my experiments.
chunk = await asyncio.to_thread(
_fast_slice, self._current_block, self._current_block_idx, take
@martindurant (Member) commented Apr 3, 2026:

I would need heavy convincing that this is worth it. Consider, the new thread needs to spin up, sync with the interpreter, and then call into C, while this thread waits for the message to come back. The time spent copying is very small, probably small compared to the time to reacquire the GIL (depending on read size).

Naive timings:

[ ] data = b"0" * 512 * 2**20
[ ] %timeit data[3*2**20:14*2**20]
244 μs ± 1.25 μs

[ ] %timeit memoryview(data)[3*2**20:14*2**20]
145 ns ± 0.0863 ns  ## <--- note change of unit!

[ ] %timeit _fast_slice(data, 3*2**20, 14*2**20 - 3*2**20)
247 μs ± 3.28 μs

[ ] %timeit th = threading.Thread(target=lambda:True); th.start(); th.join()
31.1 μs ± 1.04 μs

[ ] pool = concurrent.futures.ThreadPoolExecutor(1)
[ ] %timeit pool.submit(lambda: True).result()
12.9 μs ± 431 ns

[ ] pool = ThreadPoolExecutor(16)
[ ] %timeit list(pool.map(lambda _:_fast_slice(data, 3*2**20, 14*2**20 - 3*2**20), range(16)))
7.36 ms ± 193 μs. ## <--- note change of unit!

Contributor Author:

I think your comparison data is wrong. Here is why, and why _fast_slice is absolutely worth it:

First, you're comparing memoryview(data)[a:b] with _fast_slice(a, b). This is the wrong comparison in the first place. The former returns a memoryview (a reference), while the latter returns bytes (a memory copy). We strictly need bytes, not a memoryview. To evaluate this properly, you have to compare bytes(memoryview(data)[a:b]). Once you do that to force the required memory copy, the performance drops to the exact same range.

Second, regarding single-threaded execution: your benchmarks noted that _fast_slice and native Python slicing run in a very similar time range. I agree. (Note: I am talking strictly about raw slicing here, not wrapping it in asyncio.to_thread - we'll get to that.) However, similar execution time does not mean zero benefit. The true advantage of _fast_slice over native or memoryview slicing is that it is GIL-free. During my experiments, the native methods held the GIL for 30s out of a 30s runtime. By contrast, _fast_slice only held the GIL for about 300ms or less. This is what makes it vastly superior even if the execution time is the same in a single-threaded environment.

Because it releases the GIL, _fast_slice scales massively under concurrency. Yes, overall performance depends on the slice size. In my experiments, _fast_slice is slightly slower (by about ~20-30%) for micro-slicing (< 512KB) due to FFI overhead though it still frees the GIL during that time. But when we slice 10MB payloads with concurrent tasks enabled, it pulls ahead and becomes up to 12-15x faster than raw native or memoryview slicing.

Why call await asyncio.to_thread(_fast_slice) rather than _fast_slice directly? It is common industry practice to wrap heavy synchronous methods in asyncio.to_thread so that the event loop doesn't get blocked, allowing it to process other tasks. If we are testing concurrent workloads, this exact same practice should be applied to native slicing. And regarding thread spin-up cost: asyncio.to_thread doesn't create a new thread from scratch every time; it maintains an optimized thread pool, the size of which depends on the machine specs. What we can do better here is check the slice size before passing it to asyncio.to_thread. This ensures we don't pay the thread-routing overhead for micro-slices in the low KBs, while keeping the massive performance gains for large payloads.

In my benchmarks, _fast_slice outperforms every scenario you mentioned under concurrent load, while providing similar execution time (but GIL-free!) in a single-threaded environment. When comparing asyncio.to_thread(_fast_slice), I expect the exact same threading practice to be applied to native slicing for a real comparison.

margubur@perf-test:~/scripts/rahman$ python3 check.py 
Starting Asyncio Slicing Benchmark...
=======================================================

--- Micro-slicing - 1 Task(s) ---
Slice Size: 512 KB | Concurrency: 1 task(s) | 10000 slices per task
Native Slicing           : 0.5533 seconds
Memoryview Slicing       : 0.5706 seconds
Fast Slice (ctypes)      : 0.5866 seconds

Results vs Native Slicing Baseline:
-> Memoryview Slicing  is 1.03x SLOWER
-> Fast Slice (ctypes) is 1.06x SLOWER
-------------------------------------------------------

--- Micro-slicing - 4 Task(s) ---
Slice Size: 512 KB | Concurrency: 4 task(s) | 10000 slices per task
Native Slicing           : 1.8398 seconds
Memoryview Slicing       : 1.8453 seconds
Fast Slice (ctypes)      : 1.8970 seconds

Results vs Native Slicing Baseline:
-> Memoryview Slicing  is 1.00x SLOWER
-> Fast Slice (ctypes) is 1.03x SLOWER
-------------------------------------------------------

--- Micro-slicing - 10 Task(s) ---
Slice Size: 512 KB | Concurrency: 10 task(s) | 10000 slices per task
Native Slicing           : 4.2347 seconds
Memoryview Slicing       : 4.2817 seconds
Fast Slice (ctypes)      : 3.9721 seconds

Results vs Native Slicing Baseline:
-> Memoryview Slicing  is 1.01x SLOWER
-> Fast Slice (ctypes) is 1.07x FASTER
-------------------------------------------------------

--- Macro-slicing - 1 Task(s) ---
Slice Size: 10240 KB | Concurrency: 1 task(s) | 1000 slices per task
Native Slicing           : 0.7649 seconds
Memoryview Slicing       : 0.7508 seconds
Fast Slice (ctypes)      : 0.7488 seconds

Results vs Native Slicing Baseline:
-> Memoryview Slicing  is 1.02x FASTER
-> Fast Slice (ctypes) is 1.02x FASTER
-------------------------------------------------------

--- Macro-slicing - 4 Task(s) ---
Slice Size: 10240 KB | Concurrency: 4 task(s) | 1000 slices per task
Native Slicing           : 7.2861 seconds
Memoryview Slicing       : 7.2601 seconds
Fast Slice (ctypes)      : 0.6805 seconds

Results vs Native Slicing Baseline:
-> Memoryview Slicing  is 1.00x FASTER
-> Fast Slice (ctypes) is 10.71x FASTER
-------------------------------------------------------

--- Macro-slicing - 10 Task(s) ---
Slice Size: 10240 KB | Concurrency: 10 task(s) | 1000 slices per task
Native Slicing           : 11.6029 seconds
Memoryview Slicing       : 11.0525 seconds
Fast Slice (ctypes)      : 0.9628 seconds

Results vs Native Slicing Baseline:
-> Memoryview Slicing  is 1.05x FASTER
-> Fast Slice (ctypes) is 12.05x FASTER
-------------------------------------------------------

Script for your reference:

import asyncio
import ctypes
import os
import time

PyBytes_FromStringAndSize = ctypes.pythonapi.PyBytes_FromStringAndSize
PyBytes_FromStringAndSize.restype = ctypes.py_object
PyBytes_FromStringAndSize.argtypes = [ctypes.c_void_p, ctypes.c_ssize_t]

PyBytes_AsString = ctypes.pythonapi.PyBytes_AsString
PyBytes_AsString.restype = ctypes.c_void_p
PyBytes_AsString.argtypes = [ctypes.py_object]

def _fast_slice(src_bytes, offset, read_size):
    if read_size == 0:
        return b""
    dest_bytes = PyBytes_FromStringAndSize(None, read_size)
    src_ptr = PyBytes_AsString(src_bytes)
    dest_ptr = PyBytes_AsString(dest_bytes)
    ctypes.memmove(dest_ptr, src_ptr + offset, read_size)
    return dest_bytes

async def native_worker(data: bytes, offset: int, size: int, iterations: int):
    """Worker using native python slicing inside a thread."""
    def worker():
        return data[offset : offset + size]

    for _ in range(iterations):
        _ = await asyncio.to_thread(worker)

async def memoryview_worker(data: bytes, offset: int, size: int, iterations: int):
    """Worker using memoryview slicing inside a thread."""
    def worker():
        return bytes(memoryview(data)[offset : offset + size])

    for _ in range(iterations):
        _ = await asyncio.to_thread(worker)

async def fast_slice_worker(data: bytes, offset: int, size: int, iterations: int):
    """Worker delegating _fast_slice to a background thread."""
    def worker():
        return _fast_slice(data, offset, size)

    for _ in range(iterations):
        _ = await asyncio.to_thread(worker)


async def run_scenario(scenario_name: str, payload_mb: int, slice_kb: int, tasks: int, iterations_per_task: int):
    print(f"\n--- {scenario_name} ---")
    print(f"Slice Size: {slice_kb} KB | Concurrency: {tasks} task(s) | {iterations_per_task} slices per task")
    
    data = os.urandom(payload_mb * 1024 * 1024)
    offset = 1024 * 1024
    size = int(slice_kb * 1024)

    methods = {
        "Native Slicing": native_worker,
        "Memoryview Slicing": memoryview_worker,
        "Fast Slice (ctypes)": fast_slice_worker
    }

    results = {}

    for name, worker_func in methods.items():
        start_time = time.perf_counter()
        async_tasks = [
            asyncio.create_task(worker_func(data, offset, size, iterations_per_task)) 
            for _ in range(tasks)
        ]
        await asyncio.gather(*async_tasks)
        elapsed_time = time.perf_counter() - start_time
        
        results[name] = elapsed_time
        print(f"{name: <25}: {elapsed_time:.4f} seconds")

    print("\nResults vs Native Slicing Baseline:")
    baseline = results["Native Slicing"]
    for name, elapsed in results.items():
        if name == "Native Slicing":
            continue
        speedup = baseline / elapsed
        direction = "FASTER" if speedup >= 1 else "SLOWER"
        display_ratio = speedup if speedup >= 1 else (1 / speedup) 
        print(f"-> {name: <19} is {display_ratio:.2f}x {direction}")
    print("-" * 55)


async def main():
    print("Starting Asyncio Slicing Benchmark...")
    print("=" * 55)
    
    scenarios = [
        {
            "name": "Micro-slicing",
            "payload_mb": 10,
            "slice_kb": 512,
            "iterations_per_task": 10000
        },
        {
            "name": "Macro-slicing",
            "payload_mb": 100,
            "slice_kb": 10 * 1024,
            "iterations_per_task": 1000
        }
    ]

    for scenario in scenarios:
        for task_count in [1, 4, 10]:
            await run_scenario(
                scenario_name=f"{scenario['name']} - {task_count} Task(s)",
                payload_mb=scenario["payload_mb"],
                slice_kb=scenario["slice_kb"],
                tasks=task_count,
                iterations_per_task=scenario["iterations_per_task"]
            )

if __name__ == "__main__":
    asyncio.run(main())

Let me know if you have any questions!

Member:

One correction on memoryview:
the end use of the sliced chunks will be inside b"".join(...), which a) does a copy of the inputs into the output and b) works on memoryview objects just like bytes objects, so casting to bytes is unnecessary.
(Indeed, memoryviews are the perfect data container; in case we wish to pursue subinterpreters in the future, memoryviews are the only thing that can be passed with zero overhead.)

I will see if I can amend your benchmarks to show this. Thank you for providing.
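Both points can be verified with a few lines (a quick sketch, independent of the benchmark above):

```python
# b"".join accepts memoryview chunks directly, and joining a single
# bytes object returns the same object without a copy (CPython).
data = b"0" * 1024
mv = memoryview(data)[100:200]

joined = b"".join([mv, mv])      # memoryviews join like bytes
assert joined == bytes(mv) * 2

x = b"abc"
assert b"".join([x]) is x        # single-element join is zero-copy
```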
