feat: implement AsyncMultiRangeDownloader with multiplexed bidi-gRPC stream support #16528

Open
zhixiangli wants to merge 3 commits into googleapis:main from zhixiangli:zhixiangli/multiplexing-downloader

zhixiangli commented Apr 2, 2026

This PR implements AsyncMultiRangeDownloader with a new _StreamMultiplexer, enabling multiple concurrent range downloads to share a single bidirectional gRPC stream.

Before vs. After

| Feature | Before | After (This PR) |
| --- | --- | --- |
| Concurrency | Sequential or multiple connections | Concurrent over one connection |
| Overhead | High (multiple gRPC streams) | Low (multiplexed single stream) |
| Reliability | Per-stream retry logic | Unified generation-gated reopening |

How it works

The system uses a background _StreamMultiplexer to manage the shared bidirectional stream:

  1. Requests: Concurrent tasks send range requests (BidiReadObjectRequest) directly to the shared stream.
  2. Multiplexing: A background receive loop (_recv_loop) listens for all responses and uses the read_id in each response to route the data to the matching task-specific asyncio.Queue.
  3. Error Handling: If the stream breaks, a generation-gated lock ensures the stream is reopened only once. All active tasks receive a _StreamError and automatically retry using the new stream generation.
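The routing step above can be sketched roughly as follows. This is an illustration only, not the PR's actual _StreamMultiplexer: FakeResponse, TinyMultiplexer, register, and fake_stream are hypothetical names, and an in-memory async generator stands in for the bidi-gRPC stream.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Hypothetical stand-in for a stream response carrying a read_id."""
    read_id: int
    data: bytes


class TinyMultiplexer:
    """Routes responses from one shared stream to per-read_id queues."""

    def __init__(self, stream):
        self._stream = stream  # any async iterator of FakeResponse
        self._queues = {}      # read_id -> asyncio.Queue

    def register(self, read_id):
        # Each task registers its read_id and gets its own queue back.
        queue = asyncio.Queue()
        self._queues[read_id] = queue
        return queue

    async def recv_loop(self):
        # Single reader: dispatch each response by read_id.
        async for response in self._stream:
            queue = self._queues.get(response.read_id)
            if queue is not None:
                await queue.put(response)
        # Stream finished: send a None sentinel so consumers stop waiting.
        for queue in self._queues.values():
            await queue.put(None)


async def fake_stream():
    # Interleaved responses for two concurrent reads.
    for read_id, data in [(1, b"aa"), (2, b"xx"), (1, b"bb"), (2, b"yy")]:
        yield FakeResponse(read_id, data)
        await asyncio.sleep(0)


async def collect(queue):
    # Drain one task's queue until the sentinel arrives.
    chunks = []
    while (item := await queue.get()) is not None:
        chunks.append(item.data)
    return b"".join(chunks)


async def demo():
    mux = TinyMultiplexer(fake_stream())
    q1, q2 = mux.register(1), mux.register(2)
    loop_task = asyncio.create_task(mux.recv_loop())
    results = await asyncio.gather(collect(q1), collect(q2))
    await loop_task
    return results
```

Running `asyncio.run(demo())` returns `[b"aabb", b"xxyy"]`: the interleaved responses are separated again by read_id. The sketch uses unbounded queues, so put never blocks here; bounded queues would add backpressure.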

Key Changes:

  • _StreamMultiplexer: Background receiver loop for routing responses.
  • Generation-Gated Reopening: Coordinates stream recovery across concurrent tasks.
  • AsyncMultiRangeDownloader Integration: Full support for concurrent download_ranges calls.
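One way to picture generation-gated reopening is the sketch below. It assumes a simple integer generation counter guarded by an asyncio.Lock; GenerationGatedStream, reopen_if_stale, and reopen_count are hypothetical names, and the PR's real implementation may differ.

```python
import asyncio


class GenerationGatedStream:
    """Reopens a (simulated) stream at most once per failed generation."""

    def __init__(self):
        self.generation = 0    # bumped every time the stream is reopened
        self.reopen_count = 0  # counts simulated reopen operations
        self._lock = asyncio.Lock()

    async def _open(self):
        await asyncio.sleep(0.01)  # stand-in for real stream setup
        self.reopen_count += 1

    async def reopen_if_stale(self, seen_generation):
        # seen_generation is the generation the caller was using when it failed.
        async with self._lock:
            if self.generation != seen_generation:
                # Another task already reopened the stream; reuse its result.
                return self.generation
            await self._open()
            self.generation += 1
            return self.generation


async def demo():
    stream = GenerationGatedStream()
    failed_generation = stream.generation
    # Five tasks observe the same failure and all ask for a reopen.
    new_generations = await asyncio.gather(
        *[stream.reopen_if_stale(failed_generation) for _ in range(5)]
    )
    return stream.reopen_count, new_generations
```

In this sketch `asyncio.run(demo())` returns `(1, [1, 1, 1, 1, 1])`: only the first task to take the lock reopens the stream, and the other four see a newer generation and retry against it.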

zhixiangli requested review from a team as code owners April 2, 2026 07:22

gemini-code-assist bot left a comment

Code Review

This pull request introduces a _StreamMultiplexer to handle concurrent download tasks over a single bidirectional gRPC stream, replacing the previous locking mechanism in AsyncMultiRangeDownloader. The multiplexer routes responses to per-task asyncio queues based on read_id, allowing for better resource utilization. The lock parameter in download_ranges is now deprecated. Feedback focuses on improving the multiplexer's reliability and performance, specifically by using asyncio.gather to prevent head-of-line blocking during response broadcasting, ensuring the background receive loop terminates when no tasks are active, and adding error logging for observability.

Comment on lines +123 to +124

    for queue in queues_to_notify:
        await queue.put(response)

high

Broadcasting responses to multiple queues sequentially using await queue.put(response) can lead to head-of-line blocking. If one consumer is slow or its queue is full, it will delay delivery to all other concurrent tasks. Using asyncio.gather allows all put operations to be initiated concurrently, improving throughput and reducing the impact of a single slow consumer.

Furthermore, if a task is cancelled, its queue will no longer be drained. If the _recv_loop is blocked on await queue.put for that specific queue, the entire multiplexer will hang until the stream is reopened or closed. This sequential broadcast pattern is also used on lines 112-113 and 126-127 and should be updated there as well.

Suggested change

    - for queue in queues_to_notify:
    -     await queue.put(response)
    + if queues_to_notify:
    +     await asyncio.gather(*[queue.put(response) for queue in queues_to_notify])
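The difference between the two broadcast patterns can be checked with a small experiment. This is a sketch under assumptions: the queues are bounded (maxsize=1), and broadcast_sequential, broadcast_concurrent, and fast_queue_got_item are hypothetical helper names, not code from the PR.

```python
import asyncio


async def broadcast_sequential(queues, item):
    # Original pattern: a full queue blocks delivery to every later queue.
    for queue in queues:
        await queue.put(item)


async def broadcast_concurrent(queues, item):
    # Suggested pattern: start all puts at once.
    if queues:
        await asyncio.gather(*[queue.put(item) for queue in queues])


async def fast_queue_got_item(broadcast):
    slow, fast = asyncio.Queue(maxsize=1), asyncio.Queue(maxsize=1)
    slow.put_nowait("backlog")  # the slow consumer's queue is already full
    task = asyncio.create_task(broadcast([slow, fast], "response"))
    await asyncio.sleep(0.05)   # give the broadcast time to run
    delivered = not fast.empty()     # did the fast consumer get its item?
    still_waiting = not task.done()  # is the broadcast itself still blocked?
    slow.get_nowait()           # drain the slow queue so the broadcast can finish
    await task
    return delivered, still_waiting
```

With `broadcast_sequential` the helper returns `(False, True)`; with `broadcast_concurrent` it returns `(True, True)`. So gather removes the head-of-line delay for the fast consumer, but the gather call still waits until every put completes. A queue that is never drained (for example, one owned by a cancelled task) would presumably still stall the receive loop unless it is unregistered or the put is bounded some other way.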

zhixiangli changed the title from "feat: implement AsyncMultiRangeDownloader and integrate _StreamMultiplexer" to "feat: implement AsyncMultiRangeDownloader with multiplexed bidi-gRPC stream support" Apr 2, 2026
zhixiangli force-pushed the zhixiangli/multiplexing-downloader branch from 13d5d08 to 774d691 April 2, 2026 08:22
zhixiangli force-pushed the zhixiangli/multiplexing-downloader branch from acfab40 to aed8682 April 2, 2026 09:47