From d3db143606aa161bb08fa83218800589241256b0 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 10 Jun 2026 22:00:41 +0000 Subject: [PATCH 1/3] fix: treat 429 retry budget exhaustion as terminal in async job orchestrator Introduce RateLimitBudgetExhaustedException subclass of AirbyteTracedException. HttpClient raises this specific type when 429 retry budget is exhausted. AsyncJobOrchestrator treats it as a breaking exception, preventing cascading retries at orchestrator and platform levels. Co-Authored-By: bot_apk --- .../declarative/async_job/job_orchestrator.py | 15 +++-- .../sources/streams/http/http_client.py | 9 ++- airbyte_cdk/utils/__init__.py | 10 +++- airbyte_cdk/utils/traced_exception.py | 9 +++ .../async_job/test_job_orchestrator.py | 57 +++++++++++++++++++ unit_tests/sources/streams/http/test_http.py | 5 +- .../sources/streams/http/test_http_client.py | 25 +++++++- 7 files changed, 118 insertions(+), 12 deletions(-) diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 3607ad026..67cb19500 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -32,7 +32,10 @@ from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.types import StreamSlice from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets -from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import ( + AirbyteTracedException, + RateLimitBudgetExhaustedException, +) LOGGER = logging.getLogger("airbyte") _NO_TIMEOUT = timedelta.max @@ -561,9 +564,13 @@ def _abort_all_running_jobs(self) -> None: self._running_partitions = [] def _is_breaking_exception(self, exception: Exception) -> bool: - return isinstance(exception, self._exceptions_to_break_on) or ( - isinstance(exception, AirbyteTracedException) - and exception.failure_type == FailureType.config_error + return ( + isinstance(exception, self._exceptions_to_break_on) + or ( + isinstance(exception, AirbyteTracedException) + and exception.failure_type == FailureType.config_error + ) + or isinstance(exception, RateLimitBudgetExhaustedException) ) def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]: diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index c1d0eabd6..200261696 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -56,7 +56,10 @@ from airbyte_cdk.utils.stream_status_utils import ( as_airbyte_message as stream_status_as_airbyte_message, ) -from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import ( + AirbyteTracedException, + RateLimitBudgetExhaustedException, +) # Backward-compatible deprecated alias. This class was removed in PR #927 but is still # imported by connectors in the airbyte monorepo. Keep as a simple alias to @@ -306,9 +309,9 @@ def _send_with_retry( ) if is_rate_limited: - raise AirbyteTracedException( + raise RateLimitBudgetExhaustedException( internal_message=f"Rate limit retry budget exhausted. Last exception: {e}", - message="API rate limit exceeded.", + message="Rate limit retry budget exhausted.", failure_type=FailureType.transient_error, exception=e, stream_descriptor=StreamDescriptor(name=self._name), diff --git a/airbyte_cdk/utils/__init__.py b/airbyte_cdk/utils/__init__.py index dbfb641dd..f0fb8db93 100644 --- a/airbyte_cdk/utils/__init__.py +++ b/airbyte_cdk/utils/__init__.py @@ -5,6 +5,12 @@ from .is_cloud_environment import is_cloud_environment from .print_buffer import PrintBuffer from .schema_inferrer import SchemaInferrer -from .traced_exception import AirbyteTracedException +from .traced_exception import AirbyteTracedException, RateLimitBudgetExhaustedException -__all__ = ["AirbyteTracedException", "SchemaInferrer", "is_cloud_environment", "PrintBuffer"] +__all__ = [ + "AirbyteTracedException", + "RateLimitBudgetExhaustedException", + "SchemaInferrer", + "is_cloud_environment", + "PrintBuffer", +] diff --git a/airbyte_cdk/utils/traced_exception.py b/airbyte_cdk/utils/traced_exception.py index cb4072b53..8d3f2eb84 100644 --- a/airbyte_cdk/utils/traced_exception.py +++ b/airbyte_cdk/utils/traced_exception.py @@ -158,3 +158,12 @@ def as_sanitized_airbyte_message( error_message.trace.error.stack_trace # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage ) return error_message + + +class RateLimitBudgetExhaustedException(AirbyteTracedException): + """Raised when the HTTP client exhausts its configured 429 retry budget. + + Higher-level retry loops (e.g. the async-job orchestrator) treat this as a + terminal failure and do not retry, preventing cascading retries across + orchestrator and platform layers. + """ diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index dce40e624..d5cb16566 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -22,6 +22,7 @@ from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.types import StreamSlice from airbyte_cdk.utils import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import RateLimitBudgetExhaustedException _ANY_STREAM_SLICE = Mock() _A_STREAM_SLICE = Mock() @@ -288,6 +289,62 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a assert job_tracker.try_to_get_intent() + def test_given_rate_limit_budget_exhausted_when_start_job_then_break_immediately( + self, + ) -> None: + """Rate limit budget exhaustion during job creation must be treated as a + breaking exception so the orchestrator does not retry and cascade into + hundreds of API calls.""" + job_tracker = JobTracker(1) + self._job_repository.start.side_effect = RateLimitBudgetExhaustedException( + internal_message="Rate limit retry budget exhausted.", + message="Rate limit retry budget exhausted.", + failure_type=FailureType.transient_error, + ) + + orchestrator = AsyncJobOrchestrator( + self._job_repository, + [_A_STREAM_SLICE], + job_tracker, + self._message_repository, + ) + + with pytest.raises(RateLimitBudgetExhaustedException): + list(orchestrator.create_and_get_completed_partitions()) + + # Only one attempt — no orchestrator retry + assert self._job_repository.start.call_count == 1 + # Budget was freed + assert job_tracker.try_to_get_intent() + + @mock.patch(sleep_mock_target) + def test_given_rate_limit_budget_exhausted_with_running_jobs_then_abort_and_break( + self, mock_sleep: MagicMock + ) -> None: + """When rate limit budget exhaustion occurs while other jobs are running, + all running jobs should be aborted and the exception should propagate.""" + self._job_repository.start.side_effect = [ + self._job_for_a_slice, + RateLimitBudgetExhaustedException( + internal_message="Rate limit retry budget exhausted.", + message="Rate limit retry budget exhausted.", + failure_type=FailureType.transient_error, + ), + ] + + orchestrator = AsyncJobOrchestrator( + self._job_repository, + [_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE], + JobTracker(_NO_JOB_LIMIT), + self._message_repository, + ) + + with pytest.raises(RateLimitBudgetExhaustedException): + list(orchestrator.create_and_get_completed_partitions()) + + assert len(orchestrator._job_tracker._jobs) == 0 + self._job_repository.abort.assert_called_once_with(self._job_for_a_slice) + @mock.patch(sleep_mock_target) def test_given_exception_on_single_job_when_create_and_get_completed_partitions_then_return( self, mock_sleep: MagicMock diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 7f4f9ac41..5a44d828b 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -34,6 +34,7 @@ from airbyte_cdk.sources.streams.http.http_client import HttpClient from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator from airbyte_cdk.utils import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import RateLimitBudgetExhaustedException from airbyte_cdk.utils.airbyte_secrets_utils import update_secrets @@ -316,8 +317,8 @@ def test_raise_on_http_errors_off_429(mocker): mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( - AirbyteTracedException, - match="API rate limit exceeded.", + RateLimitBudgetExhaustedException, + match="Rate limit retry budget exhausted.", ): stream.exit_on_rate_limit = True list(stream.read_records(SyncMode.full_refresh)) diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 48d396cb6..6bded1a8f 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -27,7 +27,7 @@ ) from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator -from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import AirbyteTracedException, RateLimitBudgetExhaustedException def test_http_client(): @@ -721,6 +721,29 @@ def test_backoff_strategy_endless( assert mocked_send.call_count == expected_call_count +@pytest.mark.usefixtures("mock_sleep") +def test_given_429_budget_exhausted_then_raises_rate_limit_budget_exhausted_exception(): + http_client = HttpClient( + name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock()) + ) + + mocked_response = MagicMock(spec=requests.Response) + mocked_response.status_code = 429 + mocked_response.headers = {} + mocked_response.ok = False + + with patch.object(requests.Session, "send", return_value=mocked_response): + with pytest.raises(RateLimitBudgetExhaustedException) as exc_info: + http_client.send_request( + http_method="get", + url="https://test_base_url.com/v1/endpoint", + request_kwargs={}, + exit_on_rate_limit=True, + ) + assert exc_info.value.failure_type == FailureType.transient_error + assert "Rate limit retry budget exhausted" in str(exc_info.value.internal_message) + + def test_given_different_headers_then_response_is_not_cached(requests_mock): http_client = HttpClient(name="test", logger=MagicMock(), use_cache=True) first_request_headers = {"header_key": "first"} From e894d0907170d0f2d8b4ce4671f03e0ad4cca90a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 10 Jun 2026 22:02:10 +0000 Subject: [PATCH 2/3] fix: resolve ruff lint and format issues in test files Co-Authored-By: bot_apk --- unit_tests/sources/streams/http/test_http.py | 2 +- unit_tests/sources/streams/http/test_http_client.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 5a44d828b..8732de0a5 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -34,8 +34,8 @@ from airbyte_cdk.sources.streams.http.http_client import HttpClient from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator from airbyte_cdk.utils import AirbyteTracedException -from airbyte_cdk.utils.traced_exception import RateLimitBudgetExhaustedException from airbyte_cdk.utils.airbyte_secrets_utils import update_secrets +from airbyte_cdk.utils.traced_exception import RateLimitBudgetExhaustedException class StubBasicReadHttpStream(HttpStream): diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 6bded1a8f..068450b13 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -27,7 +27,10 @@ ) from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator -from airbyte_cdk.utils.traced_exception import AirbyteTracedException, RateLimitBudgetExhaustedException +from airbyte_cdk.utils.traced_exception import ( + AirbyteTracedException, + RateLimitBudgetExhaustedException, +) def test_http_client(): From bec34b9ecd7bd2d0b59c507cebe3d9fe0ca7c9c5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 10 Jun 2026 22:11:33 +0000 Subject: [PATCH 3/3] fix: update test_stub_custom_backoff_http_stream_retries to expect RateLimitBudgetExhaustedException Co-Authored-By: bot_apk --- unit_tests/sources/streams/http/test_http.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 8732de0a5..3446c6cea 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -223,8 +223,8 @@ def get_error_handler(self) -> Optional[ErrorHandler]: send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( - AirbyteTracedException, - match="API rate limit exceeded.", + RateLimitBudgetExhaustedException, + match="Rate limit retry budget exhausted.", ): list(stream.read_records(SyncMode.full_refresh)) if retries <= 0: