Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_cdk.utils.traced_exception import (
AirbyteTracedException,
RateLimitBudgetExhaustedException,
)

LOGGER = logging.getLogger("airbyte")
_NO_TIMEOUT = timedelta.max
Expand Down Expand Up @@ -561,9 +564,13 @@ def _abort_all_running_jobs(self) -> None:
self._running_partitions = []

def _is_breaking_exception(self, exception: Exception) -> bool:
return isinstance(exception, self._exceptions_to_break_on) or (
isinstance(exception, AirbyteTracedException)
and exception.failure_type == FailureType.config_error
return (
isinstance(exception, self._exceptions_to_break_on)
or (
isinstance(exception, AirbyteTracedException)
and exception.failure_type == FailureType.config_error
)
or isinstance(exception, RateLimitBudgetExhaustedException)
)

def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
Expand Down
9 changes: 6 additions & 3 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
from airbyte_cdk.utils.stream_status_utils import (
as_airbyte_message as stream_status_as_airbyte_message,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_cdk.utils.traced_exception import (
AirbyteTracedException,
RateLimitBudgetExhaustedException,
)

# Backward-compatible deprecated alias. This class was removed in PR #927 but is still
# imported by connectors in the airbyte monorepo. Keep as a simple alias to
Expand Down Expand Up @@ -306,9 +309,9 @@ def _send_with_retry(
)

if is_rate_limited:
raise AirbyteTracedException(
raise RateLimitBudgetExhaustedException(
internal_message=f"Rate limit retry budget exhausted. Last exception: {e}",
message="API rate limit exceeded.",
message="Rate limit retry budget exhausted.",
failure_type=FailureType.transient_error,
exception=e,
stream_descriptor=StreamDescriptor(name=self._name),
Expand Down
10 changes: 8 additions & 2 deletions airbyte_cdk/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
from .is_cloud_environment import is_cloud_environment
from .print_buffer import PrintBuffer
from .schema_inferrer import SchemaInferrer
from .traced_exception import AirbyteTracedException
from .traced_exception import AirbyteTracedException, RateLimitBudgetExhaustedException

__all__ = ["AirbyteTracedException", "SchemaInferrer", "is_cloud_environment", "PrintBuffer"]
__all__ = [
"AirbyteTracedException",
"RateLimitBudgetExhaustedException",
"SchemaInferrer",
"is_cloud_environment",
"PrintBuffer",
]
9 changes: 9 additions & 0 deletions airbyte_cdk/utils/traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,12 @@ def as_sanitized_airbyte_message(
error_message.trace.error.stack_trace # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
)
return error_message


class RateLimitBudgetExhaustedException(AirbyteTracedException):
"""Raised when the HTTP client exhausts its configured 429 retry budget.

Higher-level retry loops (e.g. the async-job orchestrator) treat this as a
terminal failure and do not retry, preventing cascading retries across
orchestrator and platform layers.
"""
57 changes: 57 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.traced_exception import RateLimitBudgetExhaustedException

_ANY_STREAM_SLICE = Mock()
_A_STREAM_SLICE = Mock()
Expand Down Expand Up @@ -288,6 +289,62 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a

assert job_tracker.try_to_get_intent()

def test_given_rate_limit_budget_exhausted_when_start_job_then_break_immediately(
self,
) -> None:
"""Rate limit budget exhaustion during job creation must be treated as a
breaking exception so the orchestrator does not retry and cascade into
hundreds of API calls."""
job_tracker = JobTracker(1)
self._job_repository.start.side_effect = RateLimitBudgetExhaustedException(
internal_message="Rate limit retry budget exhausted.",
message="Rate limit retry budget exhausted.",
failure_type=FailureType.transient_error,
)

orchestrator = AsyncJobOrchestrator(
self._job_repository,
[_A_STREAM_SLICE],
job_tracker,
self._message_repository,
)

with pytest.raises(RateLimitBudgetExhaustedException):
list(orchestrator.create_and_get_completed_partitions())

# Only one attempt — no orchestrator retry
assert self._job_repository.start.call_count == 1
# Budget was freed
assert job_tracker.try_to_get_intent()

@mock.patch(sleep_mock_target)
def test_given_rate_limit_budget_exhausted_with_running_jobs_then_abort_and_break(
self, mock_sleep: MagicMock
) -> None:
"""When rate limit budget exhaustion occurs while other jobs are running,
all running jobs should be aborted and the exception should propagate."""
self._job_repository.start.side_effect = [
self._job_for_a_slice,
RateLimitBudgetExhaustedException(
internal_message="Rate limit retry budget exhausted.",
message="Rate limit retry budget exhausted.",
failure_type=FailureType.transient_error,
),
]

orchestrator = AsyncJobOrchestrator(
self._job_repository,
[_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE],
JobTracker(_NO_JOB_LIMIT),
self._message_repository,
)

with pytest.raises(RateLimitBudgetExhaustedException):
list(orchestrator.create_and_get_completed_partitions())

assert len(orchestrator._job_tracker._jobs) == 0
self._job_repository.abort.assert_called_once_with(self._job_for_a_slice)

@mock.patch(sleep_mock_target)
def test_given_exception_on_single_job_when_create_and_get_completed_partitions_then_return(
self, mock_sleep: MagicMock
Expand Down
9 changes: 5 additions & 4 deletions unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.airbyte_secrets_utils import update_secrets
from airbyte_cdk.utils.traced_exception import RateLimitBudgetExhaustedException


class StubBasicReadHttpStream(HttpStream):
Expand Down Expand Up @@ -222,8 +223,8 @@ def get_error_handler(self) -> Optional[ErrorHandler]:
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)

with pytest.raises(
AirbyteTracedException,
match="API rate limit exceeded.",
RateLimitBudgetExhaustedException,
match="Rate limit retry budget exhausted.",
):
list(stream.read_records(SyncMode.full_refresh))
if retries <= 0:
Expand Down Expand Up @@ -316,8 +317,8 @@ def test_raise_on_http_errors_off_429(mocker):

mocker.patch.object(requests.Session, "send", return_value=req)
with pytest.raises(
AirbyteTracedException,
match="API rate limit exceeded.",
RateLimitBudgetExhaustedException,
match="Rate limit retry budget exhausted.",
):
stream.exit_on_rate_limit = True
list(stream.read_records(SyncMode.full_refresh))
Expand Down
28 changes: 27 additions & 1 deletion unit_tests/sources/streams/http/test_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
)
from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_cdk.utils.traced_exception import (
AirbyteTracedException,
RateLimitBudgetExhaustedException,
)


def test_http_client():
Expand Down Expand Up @@ -721,6 +724,29 @@ def test_backoff_strategy_endless(
assert mocked_send.call_count == expected_call_count


@pytest.mark.usefixtures("mock_sleep")
def test_given_429_budget_exhausted_then_raises_rate_limit_budget_exhausted_exception():
http_client = HttpClient(
name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock())
)

mocked_response = MagicMock(spec=requests.Response)
mocked_response.status_code = 429
mocked_response.headers = {}
mocked_response.ok = False

with patch.object(requests.Session, "send", return_value=mocked_response):
with pytest.raises(RateLimitBudgetExhaustedException) as exc_info:
http_client.send_request(
http_method="get",
url="https://test_base_url.com/v1/endpoint",
request_kwargs={},
exit_on_rate_limit=True,
)
assert exc_info.value.failure_type == FailureType.transient_error
assert "Rate limit retry budget exhausted" in str(exc_info.value.internal_message)


def test_given_different_headers_then_response_is_not_cached(requests_mock):
http_client = HttpClient(name="test", logger=MagicMock(), use_cache=True)
first_request_headers = {"header_key": "first"}
Expand Down
Loading