diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index fa6a568bc..ce06df266 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -11,6 +11,7 @@ from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import ( PartitionGenerationCompletedSentinel, ) +from airbyte_cdk.sources.concurrent_source.stream_abort_registry import StreamAbortRegistry from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( @@ -46,6 +47,7 @@ def __init__( message_repository: MessageRepository, partition_reader: PartitionReader, max_concurrent_partition_generators: Optional[int] = None, + stream_abort_registry: Optional[StreamAbortRegistry] = None, ): """ This class is responsible for handling items from a concurrent stream read process. @@ -87,6 +89,7 @@ def __init__( self._partition_reader = partition_reader self._streams_done: Set[str] = set() self._exceptions_per_stream_name: dict[str, List[Exception]] = {} + self._stream_abort_registry = stream_abort_registry # Track which streams (by name) are currently active # A stream is "active" if it's generating partitions or has partitions being read @@ -234,14 +237,24 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]: def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]: """ This method is called when an exception is raised. - 1. Stop all running streams - 2. Raise the exception + 1. Record the exception for the stream + 2. If the failure is stream-wide (a config error, e.g. an authorization failure that + affects every partition), abort the stream so its remaining partitions are skipped + instead of repeating the same doomed request. + 3. Emit a trace message for the exception """ self._flag_exception(exception.stream_name, exception.exception) self._logger.exception( f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception ) + if self._stream_abort_registry and self._is_stream_wide_failure(exception.exception): + self._logger.info( + f"Stream {exception.stream_name} failed with a stream-wide error. " + f"Skipping its remaining partitions." + ) + self._stream_abort_registry.abort(exception.stream_name) + stream_descriptor = StreamDescriptor(name=exception.stream_name) if isinstance(exception.exception, AirbyteTracedException): yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor) @@ -255,6 +268,19 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess def _flag_exception(self, stream_name: str, exception: Exception) -> None: self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception) + @staticmethod + def _is_stream_wide_failure(exception: Exception) -> bool: + """Whether an exception should abort the whole stream rather than just the partition. + + A `config_error` (e.g. 401/403 authorization failures) affects every partition of the + stream, so there is no point reading the rest. Other failures (transient server errors, + timeouts, etc.) are left as best-effort: the remaining partitions still get a chance. + """ + return ( + isinstance(exception, AirbyteTracedException) + and exception.failure_type == FailureType.config_error + ) + def start_next_partition_generator(self) -> Optional[AirbyteMessage]: """ Submits the next partition generator to the thread pool. diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index 474780bcc..2585cb5d9 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -12,6 +12,7 @@ from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import ( PartitionGenerationCompletedSentinel, ) +from airbyte_cdk.sources.concurrent_source.stream_abort_registry import StreamAbortRegistry from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository @@ -110,9 +111,14 @@ def read( streams: List[AbstractStream], ) -> Iterator[AirbyteMessage]: self._logger.info("Starting syncing") + # Shared across the generator, reader, and processor so that a stream-wide failure on one + # partition stops the stream's remaining partitions instead of repeating a doomed request. + stream_abort_registry = StreamAbortRegistry() concurrent_stream_processor = ConcurrentReadProcessor( streams, - PartitionEnqueuer(self._queue, self._threadpool), + PartitionEnqueuer( + self._queue, self._threadpool, stream_abort_registry=stream_abort_registry + ), self._threadpool, self._logger, self._slice_logger, @@ -120,8 +126,10 @@ def read( PartitionReader( self._queue, PartitionLogger(self._slice_logger, self._logger, self._message_repository), + stream_abort_registry=stream_abort_registry, ), max_concurrent_partition_generators=self._initial_number_partitions_to_generate, + stream_abort_registry=stream_abort_registry, ) # Enqueue initial partition generation tasks diff --git a/airbyte_cdk/sources/concurrent_source/stream_abort_registry.py b/airbyte_cdk/sources/concurrent_source/stream_abort_registry.py new file mode 100644 index 000000000..ab38e8e0b --- /dev/null +++ b/airbyte_cdk/sources/concurrent_source/stream_abort_registry.py @@ -0,0 +1,31 @@ +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. + +import threading +from typing import Set + + +class StreamAbortRegistry: + """Thread-safe registry of streams whose remaining partitions should be skipped. + + When a partition raises a fatal, stream-wide error (e.g. an authorization failure + that affects every partition of the stream), the concurrent reader records the + stream here. Partitions that are still queued or not yet started then short-circuit + instead of repeating the same doomed request, so the stream fails fast rather than + exhausting every remaining partition. + + The registry is shared between the main thread (which marks streams as aborted) and + the worker threads generating and reading partitions (which check it), so all access + is guarded by a lock. + """ + + def __init__(self) -> None: + self._aborted: Set[str] = set() + self._lock = threading.Lock() + + def abort(self, stream_name: str) -> None: + with self._lock: + self._aborted.add(stream_name) + + def is_aborted(self, stream_name: str) -> bool: + with self._lock: + return stream_name in self._aborted diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index b78f09883..4e75c6a25 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2555,6 +2555,14 @@ definitions: type: array items: "$ref": "#/definitions/TypesMap" + default_type: + title: Default Type + description: The default Airbyte type to use when no type mapping matches the source field type. + type: string + examples: + - "string" + - "number" + - "integer" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 4c22d38df..bd3b50bec 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -646,6 +646,11 @@ class SchemaTypeIdentifier(BaseModel): title="Type Path", ) types_mapping: Optional[List[TypesMap]] = None + default_type: Optional[str] = Field( + None, + description="The default Airbyte type to use when no type mapping matches the source field type.", + title="Default Type", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 82798e02e..16ebeac21 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2600,6 +2600,7 @@ def create_schema_type_identifier( key_pointer=model_key_pointer, type_pointer=model_type_pointer, types_mapping=types_mapping, + default_type=model.default_type, parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index 7d5946a12..fa46f5648 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -89,6 +89,7 @@ class SchemaTypeIdentifier: type_pointer: Optional[List[Union[InterpolatedString, str]]] = None types_mapping: Optional[List[TypesMap]] = None schema_pointer: Optional[List[Union[InterpolatedString, str]]] = None + default_type: Optional[str] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.schema_pointer = ( @@ -215,6 +216,7 @@ def _get_type( else "string" ) mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema) + type_was_mapped = mapped_field_type is not raw_field_type if ( isinstance(mapped_field_type, list) and len(mapped_field_type) == 2 @@ -225,7 +227,9 @@ def _get_type( return {"oneOf": [first_type, second_type]} elif isinstance(mapped_field_type, str): - return self._get_airbyte_type(mapped_field_type) + return self._get_airbyte_type( + mapped_field_type, allow_default_fallback=not type_was_mapped + ) elif isinstance(mapped_field_type, ComplexFieldType): return self._resolve_complex_type(mapped_field_type) @@ -269,12 +273,19 @@ def _replace_type_if_not_valid( return types_map.target_type return field_type - @staticmethod - def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]: + def _get_airbyte_type( + self, field_type: str, *, allow_default_fallback: bool = False + ) -> MutableMapping[str, Any]: """ Maps a field type to its corresponding Airbyte type definition. + Falls back to `default_type` when `allow_default_fallback` is True and `field_type` is not recognized. """ if field_type not in AIRBYTE_DATA_TYPES: + default_type = self.schema_type_identifier.default_type + if allow_default_fallback and default_type is not None: + if default_type not in AIRBYTE_DATA_TYPES: + raise ValueError(f"Invalid default Airbyte data type: {default_type}") + return deepcopy(AIRBYTE_DATA_TYPES[default_type]) raise ValueError(f"Invalid Airbyte data type: {field_type}") return deepcopy(AIRBYTE_DATA_TYPES[field_type]) diff --git a/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py b/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py index a4dd81f29..0541daa02 100644 --- a/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py +++ b/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py @@ -3,10 +3,12 @@ # import time from queue import Queue +from typing import Optional from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import ( PartitionGenerationCompletedSentinel, ) +from airbyte_cdk.sources.concurrent_source.stream_abort_registry import StreamAbortRegistry from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream @@ -23,14 +25,18 @@ def __init__( queue: Queue[QueueItem], thread_pool_manager: ThreadPoolManager, sleep_time_in_seconds: float = 0.1, + stream_abort_registry: Optional[StreamAbortRegistry] = None, ) -> None: """ :param queue: The queue to put the partitions in. :param throttler: The throttler to use to throttle the partition generation. + :param stream_abort_registry: Optional registry of streams that failed fatally. Partition + generation stops early for a stream once it has been aborted. """ self._queue = queue self._thread_pool_manager = thread_pool_manager self._sleep_time_in_seconds = sleep_time_in_seconds + self._stream_abort_registry = stream_abort_registry def generate_partitions(self, stream: AbstractStream) -> None: """ @@ -44,6 +50,12 @@ def generate_partitions(self, stream: AbstractStream) -> None: """ try: for partition in stream.generate_partitions(): + # If the stream already failed fatally on another partition, stop generating new + # ones. Remaining partitions would only repeat the same doomed request. + if self._stream_abort_registry and self._stream_abort_registry.is_aborted( + stream.name + ): + break # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that # we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the # future but we expect the delta between the max futures length and the actual to be small enough that it would not be an diff --git a/airbyte_cdk/sources/streams/concurrent/partition_reader.py b/airbyte_cdk/sources/streams/concurrent/partition_reader.py index 0edc5056a..5dded6524 100644 --- a/airbyte_cdk/sources/streams/concurrent/partition_reader.py +++ b/airbyte_cdk/sources/streams/concurrent/partition_reader.py @@ -4,6 +4,7 @@ from queue import Queue from typing import Optional +from airbyte_cdk.sources.concurrent_source.stream_abort_registry import StreamAbortRegistry from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.message.repository import MessageRepository from airbyte_cdk.sources.streams.concurrent.cursor import Cursor @@ -53,12 +54,17 @@ def __init__( self, queue: Queue[QueueItem], partition_logger: Optional[PartitionLogger] = None, + stream_abort_registry: Optional[StreamAbortRegistry] = None, ) -> None: """ :param queue: The queue to put the records in. + :param partition_logger: Optional logger used to emit a slice log message per partition. + :param stream_abort_registry: Optional registry of streams that failed fatally. Partitions + belonging to an aborted stream are skipped instead of being read. """ self._queue = queue self._partition_logger = partition_logger + self._stream_abort_registry = stream_abort_registry def process_partition(self, partition: Partition, cursor: Cursor) -> None: """ @@ -72,6 +78,15 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None: :param partition: The partition to read data from :return: None """ + # If the stream already failed fatally on another partition, skip this one instead of + # repeating the same doomed request. We do not close the partition on the cursor so that + # state is not advanced past the failure. + if self._stream_abort_registry and self._stream_abort_registry.is_aborted( + partition.stream_name() + ): + self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL)) + return + try: if self._partition_logger: self._partition_logger.log(partition) diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 20147465f..c89a39517 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -13,6 +13,7 @@ TestLimits, ) from airbyte_cdk.sources.declarative.schema import DynamicSchemaLoader, SchemaTypeIdentifier +from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import TypesMap from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse _CONFIG = { @@ -309,6 +310,153 @@ def test_dynamic_schema_loader_manifest_flow(): assert actual_catalog.streams[0].json_schema == expected_schema +@pytest.mark.parametrize( + "retriever_data, default_type, expected_schema", + [ + pytest.param( + iter( + [ + { + "schema": [ + {"key": "name", "type": "string"}, + {"key": "custom_field", "type": "unknown_source_type"}, + ] + } + ] + ), + "string", + { + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "type": "object", + "properties": { + "name": {"type": ["null", "string"]}, + "custom_field": {"type": ["null", "string"]}, + }, + }, + id="unmapped_type_falls_back_to_default", + ), + pytest.param( + iter( + [ + { + "schema": [ + {"key": "count", "type": "integer"}, + {"key": "blob", "type": "exotic_plugin_type"}, + ] + } + ] + ), + "string", + { + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "type": "object", + "properties": { + "count": {"type": ["null", "integer"]}, + "blob": {"type": ["null", "string"]}, + }, + }, + id="valid_type_unchanged_unknown_type_uses_default", + ), + ], +) +def test_dynamic_schema_loader_default_type(retriever_data, default_type, expected_schema): + schema_type_identifier = SchemaTypeIdentifier( + schema_pointer=["schema"], + key_pointer=["key"], + type_pointer=["type"], + types_mapping=[], + default_type=default_type, + parameters={}, + ) + loader = DynamicSchemaLoader( + retriever=MagicMock(), + config=MagicMock(), + parameters={}, + schema_type_identifier=schema_type_identifier, + ) + loader.retriever.read_records = MagicMock(return_value=retriever_data) + + schema = loader.get_json_schema() + assert schema == expected_schema + + +def test_dynamic_schema_loader_no_default_type_raises_on_unknown(): + schema_type_identifier = SchemaTypeIdentifier( + schema_pointer=["schema"], + key_pointer=["key"], + type_pointer=["type"], + types_mapping=[], + parameters={}, + ) + loader = DynamicSchemaLoader( + retriever=MagicMock(), + config=MagicMock(), + parameters={}, + schema_type_identifier=schema_type_identifier, + ) + loader.retriever.read_records = MagicMock( + return_value=iter([{"schema": [{"key": "field", "type": "totally_unknown"}]}]) + ) + + with pytest.raises(ValueError, match="Invalid Airbyte data type: totally_unknown"): + loader.get_json_schema() + + +def test_dynamic_schema_loader_invalid_default_type_raises(): + schema_type_identifier = SchemaTypeIdentifier( + schema_pointer=["schema"], + key_pointer=["key"], + type_pointer=["type"], + types_mapping=[], + default_type="not_a_real_airbyte_type", + parameters={}, + ) + loader = DynamicSchemaLoader( + retriever=MagicMock(), + config=MagicMock(), + parameters={}, + schema_type_identifier=schema_type_identifier, + ) + loader.retriever.read_records = MagicMock( + return_value=iter([{"schema": [{"key": "field", "type": "unknown"}]}]) + ) + + with pytest.raises( + ValueError, + match="Invalid default Airbyte data type: not_a_real_airbyte_type", + ): + loader.get_json_schema() + + +def test_dynamic_schema_loader_bad_mapping_target_not_masked_by_default_type(): + schema_type_identifier = SchemaTypeIdentifier( + schema_pointer=["schema"], + key_pointer=["key"], + type_pointer=["type"], + types_mapping=[ + TypesMap( + target_type="not_a_real_airbyte_type", current_type="bad_source", condition=None + ), + ], + default_type="string", + parameters={}, + ) + loader = DynamicSchemaLoader( + retriever=MagicMock(), + config=MagicMock(), + parameters={}, + schema_type_identifier=schema_type_identifier, + ) + loader.retriever.read_records = MagicMock( + return_value=iter([{"schema": [{"key": "field", "type": "bad_source"}]}]) + ) + + with pytest.raises(ValueError, match="Invalid Airbyte data type: not_a_real_airbyte_type"): + loader.get_json_schema() + + def test_dynamic_schema_loader_with_type_conditions(): _MANIFEST_WITH_TYPE_CONDITIONS = deepcopy(_MANIFEST) _MANIFEST_WITH_TYPE_CONDITIONS["definitions"]["party_members_stream"]["schema_loader"][