Improve HTTP response handling and retry logic #520
base: master
Changes from all commits
```diff
@@ -1,10 +1,10 @@
 import logging
 import time
+import random
 from threading import Thread
-import backoff
 import json

-from segment.analytics.request import post, APIError, DatetimeSerializer
+from segment.analytics.request import post, APIError, DatetimeSerializer, parse_retry_after

 from queue import Empty

@@ -29,7 +29,7 @@ class Consumer(Thread):
     log = logging.getLogger('segment')

     def __init__(self, queue, write_key, upload_size=100, host=None,
-                 on_error=None, upload_interval=0.5, gzip=False, retries=10,
+                 on_error=None, upload_interval=0.5, gzip=False, retries=1000,
                  timeout=15, proxies=None, oauth_manager=None):
         """Create a consumer thread."""
         Thread.__init__(self)

@@ -120,40 +120,108 @@ def next(self):
         return items

     def request(self, batch):
-        """Attempt to upload the batch and retry before raising an error """
-
-        def fatal_exception(exc):
-            if isinstance(exc, APIError):
-                # retry on server errors and client errors
-                # with 429 status code (rate limited),
-                # don't retry on other client errors
-                return (400 <= exc.status < 500) and exc.status != 429
-            elif isinstance(exc, FatalError):
-                return True
-            else:
-                # retry on all other errors (eg. network)
-                return False
-
-        attempt_count = 0
-
-        @backoff.on_exception(
-            backoff.expo,
-            Exception,
-            max_tries=self.retries + 1,
-            giveup=fatal_exception,
-            on_backoff=lambda details: self.log.debug(
-                f"Retry attempt {details['tries']}/{self.retries + 1} after {details['elapsed']:.2f}s"
-            ))
-        def send_request():
-            nonlocal attempt_count
-            attempt_count += 1
-            try:
-                return post(self.write_key, self.host, gzip=self.gzip,
-                            timeout=self.timeout, batch=batch, proxies=self.proxies,
-                            oauth_manager=self.oauth_manager)
-            except Exception as e:
-                if attempt_count >= self.retries + 1:
-                    self.log.error(f"All {self.retries} retries exhausted. Final error: {e}")
-                raise
-
-        send_request()
+        """Attempt to upload the batch and retry before raising an error"""
+
+        def is_retryable_status(status):
+            """
+            Determine if a status code is retryable.
+            Retryable 4xx: 408, 410, 429, 460
+            Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx
+            Retryable 5xx: All except 501, 505
+            Non-retryable 5xx: 501, 505
+            """
+            if 400 <= status < 500:
+                return status in (408, 410, 429, 460)
+            elif 500 <= status < 600:
+                return status not in (501, 505)
+            return False
+
+        def should_use_retry_after(status):
+            """Check if status code should respect Retry-After header"""
+            return status in (408, 429, 503)
+
+        total_attempts = 0
+        backoff_attempts = 0
+        max_backoff_attempts = self.retries + 1
+
+        while True:
+            try:
+                # Make the request with current retry count
+                response = post(
+                    self.write_key,
+                    self.host,
+                    gzip=self.gzip,
+                    timeout=self.timeout,
+                    batch=batch,
+                    proxies=self.proxies,
+                    oauth_manager=self.oauth_manager,
+                    retry_count=total_attempts
+                )
+                # Success
+                return response
+
+            except FatalError as e:
+                # Non-retryable error
+                self.log.error(f"Fatal error after {total_attempts} attempts: {e}")
+                raise
+
+            except APIError as e:
+                total_attempts += 1
+
+                # Check if we should use Retry-After header
+                if should_use_retry_after(e.status) and e.response:
+                    retry_after = parse_retry_after(e.response)
+                    if retry_after:
+                        self.log.debug(
+                            f"Retry-After header present: waiting {retry_after}s (attempt {total_attempts})"
+                        )
+                        time.sleep(retry_after)
+                        continue  # Does not count against backoff budget
+
+                # Check if status is retryable
+                if not is_retryable_status(e.status):
+                    self.log.error(
+                        f"Non-retryable error {e.status} after {total_attempts} attempts: {e}"
+                    )
+                    raise
+
+                # Count this against backoff attempts
+                backoff_attempts += 1
+                if backoff_attempts >= max_backoff_attempts:
+                    self.log.error(
+                        f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}"
+                    )
+                    raise
+
+                # Calculate exponential backoff delay with jitter
+                base_delay = 0.5 * (2 ** (backoff_attempts - 1))
+                jitter = random.uniform(0, 0.1 * base_delay)
+                delay = min(base_delay + jitter, 60)  # Cap at 60 seconds
+
+                self.log.debug(
+                    f"Retry attempt {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) "
+                    f"after {delay:.2f}s for status {e.status}"
+                )
+                time.sleep(delay)
+
+            except Exception as e:
+                # Network errors or other exceptions - retry with backoff
+                total_attempts += 1
+                backoff_attempts += 1
+
+                if backoff_attempts >= max_backoff_attempts:
+                    self.log.error(
+                        f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}"
+                    )
+                    raise
+
+                # Calculate exponential backoff delay with jitter
+                base_delay = 0.5 * (2 ** (backoff_attempts - 1))
+                jitter = random.uniform(0, 0.1 * base_delay)
+                delay = min(base_delay + jitter, 60)  # Cap at 60 seconds
+
+                self.log.debug(
+                    f"Network error retry {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) "
+                    f"after {delay:.2f}s: {e}"
+                )
+                time.sleep(delay)
```
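Since the PR drops the backoff library in favour of the inline loop above, the delay schedule is now fixed entirely by the formula in the diff: a base of 0.5 s that doubles per backoff attempt, up to 10% jitter, capped at 60 s. Below is a small standalone sketch, not part of the diff, that reproduces that schedule for a quick sanity check; the helper name `backoff_delay` is purely illustrative.

```python
import random

def backoff_delay(backoff_attempts: int) -> float:
    """Delay (seconds) after the given 1-based backoff attempt, per the new loop."""
    base_delay = 0.5 * (2 ** (backoff_attempts - 1))  # 0.5s, 1s, 2s, 4s, ...
    jitter = random.uniform(0, 0.1 * base_delay)      # up to +10% jitter
    return min(base_delay + jitter, 60)               # capped at 60 seconds

if __name__ == "__main__":
    for attempt in range(1, 11):
        print(f"attempt {attempt}: ~{backoff_delay(attempt):.2f}s")
```

With the cap, the wait per attempt never exceeds 60 s from roughly the eighth backoff attempt onward, even though `retries` now defaults to 1000.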
@@ -3,6 +3,7 @@ | |||||||
| from gzip import GzipFile | ||||||||
| import logging | ||||||||
| import json | ||||||||
| import base64 | ||||||||
| from dateutil.tz import tzutc | ||||||||
| from requests.auth import HTTPBasicAuth | ||||||||
|
||||||||
| from requests.auth import HTTPBasicAuth |
Copilot AI · Feb 12, 2026
The parse_retry_after function doesn't validate that the parsed delay is non-negative. If a server sends a negative Retry-After value (e.g., "Retry-After: -5"), this will cause time.sleep() to raise a ValueError. Consider adding validation to ensure the delay is non-negative: return min(max(delay, 0), MAX_RETRY_AFTER_SECONDS) or return None for negative values.
```diff
-    return min(delay, MAX_RETRY_AFTER_SECONDS)
+    # Ensure delay is non-negative before applying upper bound
+    return min(max(delay, 0), MAX_RETRY_AFTER_SECONDS)
```
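For context, a defensive version of the parser could look roughly like the sketch below. The PR's actual `parse_retry_after` is not reproduced in this excerpt, so the `MAX_RETRY_AFTER_SECONDS` value, the HTTP-date branch, and the clamping shown here are assumptions made for illustration only.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

MAX_RETRY_AFTER_SECONDS = 300  # assumed cap; the PR's real constant and value may differ

def parse_retry_after(response):
    """Return a bounded, non-negative delay from a Retry-After header, or None."""
    value = response.headers.get("Retry-After")
    if not value:
        return None
    try:
        delay = float(value)                       # delta-seconds form, e.g. "30"
    except ValueError:
        try:
            target = parsedate_to_datetime(value)  # HTTP-date form, e.g. "Wed, 21 Oct 2026 07:28:00 GMT"
        except (TypeError, ValueError):
            return None
        delay = (target - datetime.now(timezone.utc)).total_seconds()
    # Clamp negatives (e.g. "Retry-After: -5" or a past date) to 0 so the
    # caller never passes a negative value to time.sleep().
    return min(max(delay, 0), MAX_RETRY_AFTER_SECONDS)
```

Note that a clamped value of 0 is falsy, so under the consumer loop's `if retry_after:` check it falls through to the normal backoff path; returning None for negative values would behave the same way.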
The retry logic does not impose an upper bound on Retry-After attempts. If the server continuously responds with Retry-After headers (for status codes 408, 429, or 503), the client could retry indefinitely. Consider adding a maximum total attempt limit to prevent infinite retry loops, even when Retry-After is present. For example, you could add a check like `if total_attempts >= some_large_limit: raise` before processing Retry-After.
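One way the suggested ceiling could look, shown as a self-contained sketch rather than a patch to this PR; the cap value, the function, and the callbacks are hypothetical and only mirror the shape of the loop in the diff.

```python
import time

MAX_TOTAL_ATTEMPTS = 2000  # hypothetical ceiling that also covers Retry-After waits

def send_with_attempt_ceiling(send_once, retry_after_for):
    """send_once() -> HTTP status; retry_after_for(status) -> seconds or None."""
    total_attempts = 0
    while True:
        total_attempts += 1
        status = send_once()
        if status < 400:
            return status
        retry_after = retry_after_for(status)
        if retry_after and status in (408, 429, 503):
            # Honor Retry-After, but never beyond the overall attempt ceiling.
            if total_attempts >= MAX_TOTAL_ATTEMPTS:
                raise RuntimeError(
                    f"gave up after {total_attempts} attempts (status {status})"
                )
            time.sleep(retry_after)
            continue
        raise RuntimeError(f"non-retryable status {status}")
```

In the PR itself the equivalent check would sit just before the `time.sleep(retry_after)` call, so Retry-After waits would still skip the backoff budget but could not recur forever.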