diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 3607ad026..67cb19500 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -32,7 +32,10 @@ from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.types import StreamSlice from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets -from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import ( + AirbyteTracedException, + RateLimitBudgetExhaustedException, +) LOGGER = logging.getLogger("airbyte") _NO_TIMEOUT = timedelta.max @@ -561,9 +564,13 @@ def _abort_all_running_jobs(self) -> None: self._running_partitions = [] def _is_breaking_exception(self, exception: Exception) -> bool: - return isinstance(exception, self._exceptions_to_break_on) or ( - isinstance(exception, AirbyteTracedException) - and exception.failure_type == FailureType.config_error + return ( + isinstance(exception, self._exceptions_to_break_on) + or ( + isinstance(exception, AirbyteTracedException) + and exception.failure_type == FailureType.config_error + ) + or isinstance(exception, RateLimitBudgetExhaustedException) ) def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]: diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index c1d0eabd6..200261696 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -56,7 +56,10 @@ from airbyte_cdk.utils.stream_status_utils import ( as_airbyte_message as stream_status_as_airbyte_message, ) -from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import ( + AirbyteTracedException, + RateLimitBudgetExhaustedException, +) # Backward-compatible deprecated alias. This class was removed in PR #927 but is still # imported by connectors in the airbyte monorepo. Keep as a simple alias to @@ -306,9 +309,9 @@ def _send_with_retry( ) if is_rate_limited: - raise AirbyteTracedException( + raise RateLimitBudgetExhaustedException( internal_message=f"Rate limit retry budget exhausted. Last exception: {e}", - message="API rate limit exceeded.", + message="Rate limit retry budget exhausted.", failure_type=FailureType.transient_error, exception=e, stream_descriptor=StreamDescriptor(name=self._name), diff --git a/airbyte_cdk/utils/__init__.py b/airbyte_cdk/utils/__init__.py index dbfb641dd..f0fb8db93 100644 --- a/airbyte_cdk/utils/__init__.py +++ b/airbyte_cdk/utils/__init__.py @@ -5,6 +5,12 @@ from .is_cloud_environment import is_cloud_environment from .print_buffer import PrintBuffer from .schema_inferrer import SchemaInferrer -from .traced_exception import AirbyteTracedException +from .traced_exception import AirbyteTracedException, RateLimitBudgetExhaustedException -__all__ = ["AirbyteTracedException", "SchemaInferrer", "is_cloud_environment", "PrintBuffer"] +__all__ = [ + "AirbyteTracedException", + "RateLimitBudgetExhaustedException", + "SchemaInferrer", + "is_cloud_environment", + "PrintBuffer", +] diff --git a/airbyte_cdk/utils/traced_exception.py b/airbyte_cdk/utils/traced_exception.py index cb4072b53..8d3f2eb84 100644 --- a/airbyte_cdk/utils/traced_exception.py +++ b/airbyte_cdk/utils/traced_exception.py @@ -158,3 +158,12 @@ def as_sanitized_airbyte_message( error_message.trace.error.stack_trace # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage ) return error_message + + +class RateLimitBudgetExhaustedException(AirbyteTracedException): + """Raised when the HTTP client exhausts its configured 429 retry budget. + + Higher-level retry loops (e.g. the async-job orchestrator) treat this as a + terminal failure and do not retry, preventing cascading retries across + orchestrator and platform layers. + """ diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index dce40e624..d5cb16566 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -22,6 +22,7 @@ from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.types import StreamSlice from airbyte_cdk.utils import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import RateLimitBudgetExhaustedException _ANY_STREAM_SLICE = Mock() _A_STREAM_SLICE = Mock() @@ -288,6 +289,62 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a assert job_tracker.try_to_get_intent() + def test_given_rate_limit_budget_exhausted_when_start_job_then_break_immediately( + self, + ) -> None: + """Rate limit budget exhaustion during job creation must be treated as a + breaking exception so the orchestrator does not retry and cascade into + hundreds of API calls.""" + job_tracker = JobTracker(1) + self._job_repository.start.side_effect = RateLimitBudgetExhaustedException( + internal_message="Rate limit retry budget exhausted.", + message="Rate limit retry budget exhausted.", + failure_type=FailureType.transient_error, + ) + + orchestrator = AsyncJobOrchestrator( + self._job_repository, + [_A_STREAM_SLICE], + job_tracker, + self._message_repository, + ) + + with pytest.raises(RateLimitBudgetExhaustedException): + list(orchestrator.create_and_get_completed_partitions()) + + # Only one attempt — no orchestrator retry + assert self._job_repository.start.call_count == 1 + # Budget was freed + assert job_tracker.try_to_get_intent() + + @mock.patch(sleep_mock_target) + def test_given_rate_limit_budget_exhausted_with_running_jobs_then_abort_and_break( + self, mock_sleep: MagicMock + ) -> None: + """When rate limit budget exhaustion occurs while other jobs are running, + all running jobs should be aborted and the exception should propagate.""" + self._job_repository.start.side_effect = [ + self._job_for_a_slice, + RateLimitBudgetExhaustedException( + internal_message="Rate limit retry budget exhausted.", + message="Rate limit retry budget exhausted.", + failure_type=FailureType.transient_error, + ), + ] + + orchestrator = AsyncJobOrchestrator( + self._job_repository, + [_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE], + JobTracker(_NO_JOB_LIMIT), + self._message_repository, + ) + + with pytest.raises(RateLimitBudgetExhaustedException): + list(orchestrator.create_and_get_completed_partitions()) + + assert len(orchestrator._job_tracker._jobs) == 0 + self._job_repository.abort.assert_called_once_with(self._job_for_a_slice) + @mock.patch(sleep_mock_target) def test_given_exception_on_single_job_when_create_and_get_completed_partitions_then_return( self, mock_sleep: MagicMock diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 7f4f9ac41..3446c6cea 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -35,6 +35,7 @@ from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator from airbyte_cdk.utils import AirbyteTracedException from airbyte_cdk.utils.airbyte_secrets_utils import update_secrets +from airbyte_cdk.utils.traced_exception import RateLimitBudgetExhaustedException class StubBasicReadHttpStream(HttpStream): @@ -222,8 +223,8 @@ def get_error_handler(self) -> Optional[ErrorHandler]: send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( - AirbyteTracedException, - match="API rate limit exceeded.", + RateLimitBudgetExhaustedException, + match="Rate limit retry budget exhausted.", ): list(stream.read_records(SyncMode.full_refresh)) if retries <= 0: @@ -316,8 +317,8 @@ def test_raise_on_http_errors_off_429(mocker): mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises( - AirbyteTracedException, - match="API rate limit exceeded.", + RateLimitBudgetExhaustedException, + match="Rate limit retry budget exhausted.", ): stream.exit_on_rate_limit = True list(stream.read_records(SyncMode.full_refresh)) diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 48d396cb6..068450b13 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -27,7 +27,10 @@ ) from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator -from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from airbyte_cdk.utils.traced_exception import ( + AirbyteTracedException, + RateLimitBudgetExhaustedException, +) def test_http_client(): @@ -721,6 +724,29 @@ def test_backoff_strategy_endless( assert mocked_send.call_count == expected_call_count +@pytest.mark.usefixtures("mock_sleep") +def test_given_429_budget_exhausted_then_raises_rate_limit_budget_exhausted_exception(): + http_client = HttpClient( + name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock()) + ) + + mocked_response = MagicMock(spec=requests.Response) + mocked_response.status_code = 429 + mocked_response.headers = {} + mocked_response.ok = False + + with patch.object(requests.Session, "send", return_value=mocked_response): + with pytest.raises(RateLimitBudgetExhaustedException) as exc_info: + http_client.send_request( + http_method="get", + url="https://test_base_url.com/v1/endpoint", + request_kwargs={}, + exit_on_rate_limit=True, + ) + assert exc_info.value.failure_type == FailureType.transient_error + assert "Rate limit retry budget exhausted" in str(exc_info.value.internal_message) + + def test_given_different_headers_then_response_is_not_cached(requests_mock): http_client = HttpClient(name="test", logger=MagicMock(), use_cache=True) first_request_headers = {"header_key": "first"}