Implement retries for Storage control client calls#787
Mahalaxmibejugam wants to merge 10 commits into fsspec:main from
Conversation
…ge control client folder operations.
Instead of the custom asyncio.wait_for logic, using a library like tenacity would look like this. Also, did we evaluate google.api_core's AsyncRetry?
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

@retry(
    wait=wait_exponential(multiplier=1, min=2, max=32),
    stop=stop_after_attempt(6),
    retry=retry_if_exception_type((api_exceptions.ServiceUnavailable, asyncio.TimeoutError)),
    reraise=True,
)
async def call_with_retry(func, *args, **kwargs):
    return await func(*args, **kwargs)
We need the asyncio.wait_for logic on the client side to make sure we handle request stalls and don't wait indefinitely for the call to return. I replaced the custom logic with tenacity, as it provides built-in support for retries. AsyncRetry provides the same functionality, but would still need the asyncio.wait_for logic on the client side.
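For illustration, here is a stdlib-only sketch of that combination — retries around a per-attempt asyncio.wait_for bound — with a stub call that stalls twice and then succeeds. All names and timings are illustrative, not the PR's actual API:

```python
import asyncio


async def call_with_timeout_retry(func, *args, attempt_timeout=1.0,
                                  max_attempts=3, **kwargs):
    """Retry func, bounding each attempt with asyncio.wait_for so a
    stalled request cannot hang the event loop indefinitely."""
    last_exc = None
    for attempt in range(max_attempts):
        try:
            return await asyncio.wait_for(func(*args, **kwargs),
                                          timeout=attempt_timeout)
        except asyncio.TimeoutError as exc:
            last_exc = exc
    raise last_exc


# Demo: a call that stalls twice, then succeeds on the third attempt.
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        await asyncio.sleep(10)  # simulated stall; cut off by wait_for
    return "ok"

result = asyncio.run(call_with_timeout_retry(flaky, attempt_timeout=0.05))
```

In the real change, tenacity would supply the retry loop and backoff; the key point is only that each individual attempt is wrapped in wait_for.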
Do we really need another dependency?
I had a similar opinion, but @jasha26 recommended using tenacity as it might help with future integrations like client-side throttling.
AsyncRetry from google.api_core supports retries based on max_timeout instead of max_retries (which is what gcsfs follows for its other JSON API retries). To keep the same retry behaviour for JSON APIs and storage control client calls (i.e., limiting the number of retries), I implemented the custom logic. To use AsyncRetry with a constraint on the number of attempts, we would have to maintain a wrapper that tracks the attempt count, which would be almost the same as the initial version of this PR without gaining much from AsyncRetry.
So we can either have an entirely custom implementation, or use tenacity if we want to maintain the max_attempts behaviour across gcsfs.
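To make that trade-off concrete, the attempt-counting wrapper AsyncRetry would require can be sketched as a stateful retry predicate (a stdlib-only sketch; with google.api_core this object would be passed as AsyncRetry's `predicate=` argument — names here are illustrative):

```python
class MaxAttemptsPredicate:
    """Retry predicate that approves retries only until an attempt cap is hit.

    AsyncRetry-style APIs call the predicate with the raised exception;
    returning False stops further retries even if deadline time remains.
    """

    def __init__(self, is_retryable, max_attempts=6):
        self.is_retryable = is_retryable
        self.max_attempts = max_attempts
        self.failures = 0

    def __call__(self, exc):
        self.failures += 1
        # Retry while fewer than max_attempts failures have occurred,
        # so at most max_attempts total tries are made.
        return self.failures < self.max_attempts and self.is_retryable(exc)


pred = MaxAttemptsPredicate(lambda e: isinstance(e, TimeoutError), max_attempts=3)
decisions = [pred(TimeoutError()) for _ in range(4)]
# first two failures are retried, then the cap stops further attempts
```

This is exactly the bookkeeping the comment above refers to: it has to live outside AsyncRetry, which is why it offers little over a fully custom loop.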
gcsfs/retry.py
async def execute_with_timebound_retry(
    func, *args, retry_deadline=30.0, max_retries=6, **kwargs
We can't really hard-code these values; we need the ability for these to be overridden via multiple mechanisms: call-site overrides, fsspec config overrides, etc.
So I'd recommend we do something like the below:
import asyncio
from dataclasses import dataclass

from fsspec.config import conf
from google.api_core import exceptions as api_exceptions
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


@dataclass
class RetryConfig:
    max_retries: int = 6
    min_delay: float = 2.0
    max_delay: float = 32.0
    retry_deadline: float = 30.0


def get_resolved_retry_config(call_kwargs) -> RetryConfig:
    """
    Resolves retry configuration with a clear hierarchy of overrides:
    1. Explicit call-site arguments (e.g., max_retries=10)
    2. fsspec.config settings (e.g., ~/.config/fsspec/conf.json)
    3. Hardcoded defaults from the RetryConfig template
    """
    # 1. Start with the default template
    default = RetryConfig()
    # 2. Resolve parameters from fsspec.config, or fall back to the defaults
    resolved_max_retries = int(
        call_kwargs.get("max_retries")
        or conf.get("gcsfs.retry.max_retries", default.max_retries)
    )
    resolved_deadline = float(
        call_kwargs.get("retry_deadline")
        or conf.get("gcsfs.retry.deadline", default.retry_deadline)
    )
    return RetryConfig(
        max_retries=resolved_max_retries,
        retry_deadline=resolved_deadline,
        min_delay=default.min_delay,
        max_delay=default.max_delay,
    )


async def with_retry(func, *args, **kwargs):
    config = get_resolved_retry_config(kwargs)

    # Define transient errors consistent with GCS client best practices.
    RETRYABLE_ERRORS = (
        api_exceptions.ServiceUnavailable,
        api_exceptions.DeadlineExceeded,
        api_exceptions.InternalServerError,
        api_exceptions.TooManyRequests,
        asyncio.TimeoutError,
    )

    # Replaces the custom loop with a declarative tenacity decorator.
    @retry(
        stop=stop_after_attempt(config.max_retries),
        wait=wait_exponential(multiplier=1, min=config.min_delay, max=config.max_delay),
        retry=retry_if_exception_type(RETRYABLE_ERRORS),
        reraise=True,
    )
    async def _wrapped_call():
        return await func(*args, **kwargs)

    return await _wrapped_call()
I am not a fan of adding all these environment variables. fsspec already has a way to specify instantiation kwargs using specially formatted environment variables or files ( https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration ).
Since retries are useful in multiple backends and have the same concepts (number of times, backoff factor, max wait, etc.), we could even make a general fsspec class for this.
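A minimal sketch of what such a general, backend-agnostic retry config could look like — call-site kwargs take precedence over a conf dict, which takes precedence over the dataclass defaults. Explicit `is not None` checks (rather than `or`) keep zero-valued overrides like `max_retries=0` working. All names are hypothetical:

```python
from dataclasses import dataclass, fields


@dataclass
class RetryConfig:
    max_retries: int = 6
    min_delay: float = 2.0
    max_delay: float = 32.0
    retry_deadline: float = 30.0

    @classmethod
    def resolve(cls, call_kwargs, conf):
        """Resolve each field: call-site kwarg > conf entry > default."""
        values = {}
        for f in fields(cls):
            if call_kwargs.get(f.name) is not None:
                values[f.name] = call_kwargs[f.name]
            else:
                values[f.name] = conf.get(f.name, f.default)
        return cls(**values)


# e.g. conf could come from fsspec.config.conf.get("gcs", {})
conf = {"max_retries": 4}
cfg = RetryConfig.resolve({"retry_deadline": 10.0, "max_retries": None}, conf)
```

Because the resolution is driven by the dataclass fields, any backend could subclass it with its own defaults without re-implementing the precedence logic.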
Force-pushed from 6c6461e to 5a19937
Codecov Report ❌ Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
##             main     #787      +/-   ##
==========================================
+ Coverage   75.96%   76.79%   +0.82%
==========================================
  Files          14       14
  Lines        2663     2693      +30
==========================================
+ Hits         2023     2068      +45
+ Misses        640      625      -15

View full report in Codecov by Sentry.
martindurant left a comment
Is there anything about the retry logic here that's specific to gRPC? It doesn't appear so to me.
def _get_retry_config(
    retry_deadline=None, max_retries=None
) -> StorageControlRetryConfig:
    conf = fsspec.config.conf.get("gcs", {})
The purpose of the conf is to pass default values to the class constructor at instantiation, so there is not normally any need to query the conf directly; the values should be passed in.
) -> StorageControlRetryConfig:
    conf = fsspec.config.conf.get("gcs", {})
    return StorageControlRetryConfig(
As a dataclass, this already has these defaults.
I think you would be better off passing kwargs around, as in other places in the codebase. The Config class isn't doing anything for you, except obscuring where the values are being set.
    It passes timeout to the function itself and uses timeout + 1.0 for wait_for.
    """
    return await asyncio.wait_for(
        func(*args, timeout=timeout, **kwargs), timeout=timeout + 1.0
Where does the arbitrary + 1.0 come from?
How do we know that func takes a timeout parameter (and if it does, why do we need another one)?
The + 1.0 buffer: It gives the underlying gRPC library a 1-second grace period to cleanly abort and raise its own specific timeout error (e.g., DeadlineExceeded) before Python's asyncio aggressively kills the task.
How we know func takes timeout: This wrapper is specifically built for Google Cloud GAPIC (gRPC) client methods (like the ones in storage_control_v2 - create_folder, rename_folder). By standard design, all of these methods accept a timeout kwarg to set the RPC deadline.
Why we need both: The inner timeout is the actual RPC deadline sent to the server and gRPC core. The outer asyncio.wait_for is a hard fail-safe to ensure the Python event loop doesn't hang forever if the network layer freezes and fails to respect the inner deadline.
This was added specifically because of the request-stalling issues observed in GCSFuse.
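A stdlib-only sketch of this dual-deadline arrangement, with a stub standing in for a GAPIC method (names and values are illustrative): the stub ignores its RPC deadline, so the outer fail-safe has to fire.

```python
import asyncio


async def execute_with_timeout(func, *args, timeout=30.0, grace=1.0, **kwargs):
    """Pass the RPC deadline to the call itself, and add an outer
    asyncio.wait_for fail-safe slightly longer than that deadline, so
    the event loop cannot hang if the transport ignores its timeout."""
    return await asyncio.wait_for(
        func(*args, timeout=timeout, **kwargs), timeout=timeout + grace
    )


# Demo stub: a "frozen network layer" that never honours its deadline.
async def stalled_rpc(*, timeout):
    await asyncio.sleep(timeout + 60)
    return "never"


try:
    asyncio.run(execute_with_timeout(stalled_rpc, timeout=0.05, grace=0.05))
    outcome = "returned"
except asyncio.TimeoutError:
    outcome = "outer timeout fired"
```

In the healthy case the inner deadline fires first and the library raises its own error (e.g. DeadlineExceeded); the outer bound only matters in the stalled case shown here.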
        reraise=True,
    )

    return await retryer(
I don't see that we need an extra library for this one thing. One might argue that this retry logic belongs in fsspec, but it would be fine to write it out here.
Are you suggesting we remove the tenacity dependency entirely and go back to the custom implementation?
Hi @martindurant,
I completely understand the hesitation around adding new dependencies—I’m usually of the same mind when it comes to keeping the project's footprint light. You're absolutely right that this logic might eventually be better suited for fsspec globally.
The main reason I proposed tenacity is that it has become an industry standard for production-grade retries. It handles the nuances of exponential backoff with jitter and async/sync compatibility right out of the box, which can be surprisingly tricky to get 100% right in a custom implementation.
While we can certainly roll a simpler custom version as you suggested, I thought using a battle-tested library might save us from 'reinventing the wheel' and maintenance overhead later. However, if the priority is keeping the dependency list short, we're happy to strip it back.
What do you think is the best balance for the project?
The execute_with_timeout method explicitly injects timeout=timeout into the function call (func(*args, timeout=timeout, **kwargs)). This is specific to the signature of the GAPIC gRPC storage_control_v2 methods.
Implement a robust, time-bound, and count-bounded idempotent retry strategy for Google Cloud Storage Control (HNS) folder operations.
Implementation Details
- Time-bound execution: each call is bounded by retry_deadline. It includes a 1.0-second grace window to allow native GCS server-side timeout errors to bubble up automatically.
- Count-bounded retries: attempts are capped (max_retries=6).
- Idempotency: the request ID (request_id) is generated outside the retry loop so that the same request ID is carried to all transport retries, allowing GCS to safely deduplicate requests.
- Exponential backoff with jitter: min(random.random() + 2**(attempt-1), 32) for standard transient exceptions.
Verification
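As a quick numerical check, the jittered backoff formula above can be evaluated per attempt (a small illustrative script, not part of the PR; attempts are assumed 1-indexed):

```python
import random


def backoff_delay(attempt, max_delay=32.0):
    """Exponential backoff with jitter, as described: a random fraction
    of a second on top of 2**(attempt-1), capped at max_delay."""
    return min(random.random() + 2 ** (attempt - 1), max_delay)


delays = [backoff_delay(a) for a in range(1, 7)]
# attempt 1 -> [1, 2) s, attempt 2 -> [2, 3) s, ..., attempt 6 -> capped at 32 s
```

So with max_retries=6 the worst-case cumulative wait stays bounded at well under a minute plus the per-attempt retry_deadline.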