Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_abort_registry import StreamAbortRegistry
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
Expand Down Expand Up @@ -46,6 +47,7 @@ def __init__(
message_repository: MessageRepository,
partition_reader: PartitionReader,
max_concurrent_partition_generators: Optional[int] = None,
stream_abort_registry: Optional[StreamAbortRegistry] = None,
):
"""
This class is responsible for handling items from a concurrent stream read process.
Expand Down Expand Up @@ -87,6 +89,7 @@ def __init__(
self._partition_reader = partition_reader
self._streams_done: Set[str] = set()
self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
self._stream_abort_registry = stream_abort_registry

# Track which streams (by name) are currently active
# A stream is "active" if it's generating partitions or has partitions being read
Expand Down Expand Up @@ -234,14 +237,24 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
"""
This method is called when an exception is raised.
1. Stop all running streams
2. Raise the exception
1. Record the exception for the stream
2. If the failure is stream-wide (a config error, e.g. an authorization failure that
affects every partition), abort the stream so its remaining partitions are skipped
instead of repeating the same doomed request.
3. Emit a trace message for the exception
"""
self._flag_exception(exception.stream_name, exception.exception)
self._logger.exception(
f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
)

if self._stream_abort_registry and self._is_stream_wide_failure(exception.exception):
self._logger.info(
f"Stream {exception.stream_name} failed with a stream-wide error. "
f"Skipping its remaining partitions."
)
self._stream_abort_registry.abort(exception.stream_name)

stream_descriptor = StreamDescriptor(name=exception.stream_name)
if isinstance(exception.exception, AirbyteTracedException):
yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
Expand All @@ -255,6 +268,19 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess
def _flag_exception(self, stream_name: str, exception: Exception) -> None:
self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

@staticmethod
def _is_stream_wide_failure(exception: Exception) -> bool:
"""Whether an exception should abort the whole stream rather than just the partition.

A `config_error` (e.g. 401/403 authorization failures) affects every partition of the
stream, so there is no point reading the rest. Other failures (transient server errors,
timeouts, etc.) are left as best-effort: the remaining partitions still get a chance.
"""
return (
isinstance(exception, AirbyteTracedException)
and exception.failure_type == FailureType.config_error
)

def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
"""
Submits the next partition generator to the thread pool.
Expand Down
10 changes: 9 additions & 1 deletion airbyte_cdk/sources/concurrent_source/concurrent_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_abort_registry import StreamAbortRegistry
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
Expand Down Expand Up @@ -110,18 +111,25 @@ def read(
streams: List[AbstractStream],
) -> Iterator[AirbyteMessage]:
self._logger.info("Starting syncing")
# Shared across the generator, reader, and processor so that a stream-wide failure on one
# partition stops the stream's remaining partitions instead of repeating a doomed request.
stream_abort_registry = StreamAbortRegistry()
concurrent_stream_processor = ConcurrentReadProcessor(
streams,
PartitionEnqueuer(self._queue, self._threadpool),
PartitionEnqueuer(
self._queue, self._threadpool, stream_abort_registry=stream_abort_registry
),
self._threadpool,
self._logger,
self._slice_logger,
self._message_repository,
PartitionReader(
self._queue,
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
stream_abort_registry=stream_abort_registry,
),
max_concurrent_partition_generators=self._initial_number_partitions_to_generate,
stream_abort_registry=stream_abort_registry,
)

# Enqueue initial partition generation tasks
Expand Down
31 changes: 31 additions & 0 deletions airbyte_cdk/sources/concurrent_source/stream_abort_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.

import threading
from typing import Set


class StreamAbortRegistry:
"""Thread-safe registry of streams whose remaining partitions should be skipped.

When a partition raises a fatal, stream-wide error (e.g. an authorization failure
that affects every partition of the stream), the concurrent reader records the
stream here. Partitions that are still queued or not yet started then short-circuit
instead of repeating the same doomed request, so the stream fails fast rather than
exhausting every remaining partition.

The registry is shared between the main thread (which marks streams as aborted) and
the worker threads generating and reading partitions (which check it), so all access
is guarded by a lock.
"""

def __init__(self) -> None:
self._aborted: Set[str] = set()
self._lock = threading.Lock()

def abort(self, stream_name: str) -> None:
with self._lock:
self._aborted.add(stream_name)

def is_aborted(self, stream_name: str) -> bool:
with self._lock:
return stream_name in self._aborted
Original file line number Diff line number Diff line change
Expand Up @@ -2555,6 +2555,14 @@ definitions:
type: array
items:
"$ref": "#/definitions/TypesMap"
default_type:
title: Default Type
description: The default Airbyte type to use when no type mapping matches the source field type.
type: string
examples:
- "string"
- "number"
- "integer"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,11 @@ class SchemaTypeIdentifier(BaseModel):
title="Type Path",
)
types_mapping: Optional[List[TypesMap]] = None
default_type: Optional[str] = Field(
None,
description="The default Airbyte type to use when no type mapping matches the source field type.",
title="Default Type",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2600,6 +2600,7 @@ def create_schema_type_identifier(
key_pointer=model_key_pointer,
type_pointer=model_type_pointer,
types_mapping=types_mapping,
default_type=model.default_type,
parameters=model.parameters or {},
)

Expand Down
17 changes: 14 additions & 3 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class SchemaTypeIdentifier:
type_pointer: Optional[List[Union[InterpolatedString, str]]] = None
types_mapping: Optional[List[TypesMap]] = None
schema_pointer: Optional[List[Union[InterpolatedString, str]]] = None
default_type: Optional[str] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.schema_pointer = (
Expand Down Expand Up @@ -215,6 +216,7 @@ def _get_type(
else "string"
)
mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema)
type_was_mapped = mapped_field_type is not raw_field_type
if (
isinstance(mapped_field_type, list)
and len(mapped_field_type) == 2
Expand All @@ -225,7 +227,9 @@ def _get_type(
return {"oneOf": [first_type, second_type]}

elif isinstance(mapped_field_type, str):
return self._get_airbyte_type(mapped_field_type)
return self._get_airbyte_type(
mapped_field_type, allow_default_fallback=not type_was_mapped
)

elif isinstance(mapped_field_type, ComplexFieldType):
return self._resolve_complex_type(mapped_field_type)
Expand Down Expand Up @@ -269,12 +273,19 @@ def _replace_type_if_not_valid(
return types_map.target_type
return field_type

@staticmethod
def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
def _get_airbyte_type(
self, field_type: str, *, allow_default_fallback: bool = False
) -> MutableMapping[str, Any]:
"""
Maps a field type to its corresponding Airbyte type definition.
Falls back to `default_type` when `allow_default_fallback` is True and `field_type` is not recognized.
"""
if field_type not in AIRBYTE_DATA_TYPES:
default_type = self.schema_type_identifier.default_type
if allow_default_fallback and default_type is not None:
if default_type not in AIRBYTE_DATA_TYPES:
raise ValueError(f"Invalid default Airbyte data type: {default_type}")
return deepcopy(AIRBYTE_DATA_TYPES[default_type])
raise ValueError(f"Invalid Airbyte data type: {field_type}")

return deepcopy(AIRBYTE_DATA_TYPES[field_type])
Expand Down
12 changes: 12 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
#
import time
from queue import Queue
from typing import Optional

from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_abort_registry import StreamAbortRegistry
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
Expand All @@ -23,14 +25,18 @@ def __init__(
queue: Queue[QueueItem],
thread_pool_manager: ThreadPoolManager,
sleep_time_in_seconds: float = 0.1,
stream_abort_registry: Optional[StreamAbortRegistry] = None,
) -> None:
"""
:param queue: The queue to put the partitions in.
:param throttler: The throttler to use to throttle the partition generation.
:param stream_abort_registry: Optional registry of streams that failed fatally. Partition
generation stops early for a stream once it has been aborted.
"""
self._queue = queue
self._thread_pool_manager = thread_pool_manager
self._sleep_time_in_seconds = sleep_time_in_seconds
self._stream_abort_registry = stream_abort_registry

def generate_partitions(self, stream: AbstractStream) -> None:
"""
Expand All @@ -44,6 +50,12 @@ def generate_partitions(self, stream: AbstractStream) -> None:
"""
try:
for partition in stream.generate_partitions():
# If the stream already failed fatally on another partition, stop generating new
# ones. Remaining partitions would only repeat the same doomed request.
if self._stream_abort_registry and self._stream_abort_registry.is_aborted(
stream.name
):
break
# Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that
# we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the
# future but we expect the delta between the max futures length and the actual to be small enough that it would not be an
Expand Down
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/partition_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from queue import Queue
from typing import Optional

from airbyte_cdk.sources.concurrent_source.stream_abort_registry import StreamAbortRegistry
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.message.repository import MessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
Expand Down Expand Up @@ -53,12 +54,17 @@ def __init__(
self,
queue: Queue[QueueItem],
partition_logger: Optional[PartitionLogger] = None,
stream_abort_registry: Optional[StreamAbortRegistry] = None,
) -> None:
"""
:param queue: The queue to put the records in.
:param partition_logger: Optional logger used to emit a slice log message per partition.
:param stream_abort_registry: Optional registry of streams that failed fatally. Partitions
belonging to an aborted stream are skipped instead of being read.
"""
self._queue = queue
self._partition_logger = partition_logger
self._stream_abort_registry = stream_abort_registry

def process_partition(self, partition: Partition, cursor: Cursor) -> None:
"""
Expand All @@ -72,6 +78,15 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None:
:param partition: The partition to read data from
:return: None
"""
# If the stream already failed fatally on another partition, skip this one instead of
# repeating the same doomed request. We do not close the partition on the cursor so that
# state is not advanced past the failure.
if self._stream_abort_registry and self._stream_abort_registry.is_aborted(
partition.stream_name()
):
self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
return

try:
if self._partition_logger:
self._partition_logger.log(partition)
Expand Down
Loading
Loading