diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index df469e834..b62645d33 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -22,6 +22,8 @@ SyncMode, ) from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource +from queue import Queue + from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.file_based.availability_strategy import ( @@ -87,6 +89,13 @@ class FileBasedSource(ConcurrentSourceAdapter, ABC): # We make each source override the concurrency level to give control over when they are upgraded. _concurrency_level = None + # Override on a per-connector basis to bound the in-memory inter-worker + # record queue. Smaller values reduce peak source-pod RSS at a small cost + # in throughput slack; larger values give more buffering between fast + # producers and a slow main thread. Default matches the original + # ConcurrentSource behaviour. + _concurrent_record_queue_maxsize: int = 10_000 + def __init__( self, stream_reader: AbstractFileBasedStreamReader, @@ -130,6 +139,7 @@ def __init__( self.logger, self._slice_logger, self.message_repository, + queue=Queue(maxsize=self._concurrent_record_queue_maxsize), ) self._state = None super().__init__(concurrent_source) @@ -313,7 +323,24 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: cursor=cursor, ) else: - cursor = self.cursor_cls(stream_config) + # This branch fires when sync_mode is None (stream is in the + # connector config but not the selected catalog — also during + # check / discover). For concurrent cursors we cannot use the + # single-arg constructor; dispatch to the full signature so + # connectors with cursor_cls=FileBasedConcurrentCursor (or a + # subclass) don't crash before read even begins. + if issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor): + cursor = self.cursor_cls( + stream_config, + stream_config.name, + None, + stream_state, + self.message_repository, + state_manager, + CursorField(DefaultFileBasedStream.ab_last_mod_col), + ) + else: + cursor = self.cursor_cls(stream_config) stream = self._make_file_based_stream( stream_config=stream_config, cursor=cursor, diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py index 0ae9178c0..3d81601fb 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py @@ -3,6 +3,7 @@ # import logging +import time from datetime import datetime, timedelta from threading import RLock from typing import TYPE_CHECKING, Any, Dict, Iterable, List, MutableMapping, Optional, Tuple @@ -34,6 +35,13 @@ class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor): ) DEFAULT_MAX_HISTORY_SIZE = 10_000 DATE_TIME_FORMAT = DefaultFileBasedCursor.DATE_TIME_FORMAT + # Minimum interval between successive state-message emissions during a sync. + # Each state message contains the full file history dict, which grows linearly + # with files synced (O(N) per message, O(N^2) over the sync). Emitting one per + # file pressures destination input buffers and platform state storage; throttle + # so the host platform/destination is not overwhelmed by huge, redundant states. + # The final state is always force-emitted via ensure_at_least_one_state_emitted. + DEFAULT_STATE_EMISSION_INTERVAL_SECONDS = 600 zero_value = datetime.min zero_cursor_value = f"0001-01-01T00:00:00.000000Z_{_NULL_FILE}" @@ -64,6 +72,8 @@ def __init__( self._file_to_datetime_history = stream_state.get("history", {}) if stream_state else {} self._prev_cursor_value = self._compute_prev_sync_cursor(stream_state) self._sync_start = self._compute_start_time() + # Track the last time a state message was emitted, used by the throttle. + self._last_emission_time: float = 0.0 @property def state(self) -> MutableMapping[str, Any]: @@ -168,6 +178,21 @@ def add_file(self, file: RemoteFile) -> None: self.emit_state_message() def emit_state_message(self) -> None: + """Emit a state message, throttled to one per DEFAULT_STATE_EMISSION_INTERVAL_SECONDS.""" + self._emit_state_message(throttle=True) + + def _throttle_state_message(self) -> Optional[float]: + current_time = time.time() + if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS: + return None + return current_time + + def _emit_state_message(self, throttle: bool = True) -> None: + if throttle: + current_time = self._throttle_state_message() + if current_time is None: + return + self._last_emission_time = current_time with self._state_lock: new_state = self.get_state() self._connector_state_manager.update_state_for_stream( @@ -310,7 +335,9 @@ def set_initial_state(self, value: StreamState) -> None: pass def ensure_at_least_one_state_emitted(self) -> None: - self.emit_state_message() + # Bypass the throttle so the platform always receives a final state message, + # even when the sync completes within the throttle window. + self._emit_state_message(throttle=False) def should_be_synced(self, record: Record) -> bool: return True diff --git a/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py b/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py index 2c8b74ea5..d6838b88a 100644 --- a/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py +++ b/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py @@ -587,3 +587,62 @@ def test_compute_start_time(input_history, is_history_full, expected_start_time, cursor._file_to_datetime_history = input_history cursor._is_history_full = MagicMock(return_value=is_history_full) assert cursor._compute_start_time() == expected_start_time + + +def test_state_throttling(mocker): + """ + emit_state_message must skip emission if less than DEFAULT_STATE_EMISSION_INTERVAL_SECONDS + have passed since the previous emission, and must emit once the interval has elapsed. + """ + cursor = _make_cursor({"history": {}}) + mock_connector_manager = cursor._connector_state_manager = MagicMock() + mock_repo = cursor._message_repository = MagicMock() + cursor._last_emission_time = 0.0 + + mock_time = mocker.patch("time.time") + + # 100s elapsed: under threshold, no emission + mock_time.return_value = 100 + cursor.emit_state_message() + mock_connector_manager.update_state_for_stream.assert_not_called() + mock_repo.emit_message.assert_not_called() + + # 300s elapsed: still under threshold + mock_time.return_value = 300 + cursor.emit_state_message() + mock_connector_manager.update_state_for_stream.assert_not_called() + mock_repo.emit_message.assert_not_called() + + # 700s elapsed: over the 600s threshold, must emit once + mock_time.return_value = 700 + cursor.emit_state_message() + mock_connector_manager.update_state_for_stream.assert_called_once() + mock_repo.emit_message.assert_called_once() + + # 800s elapsed (only 100s since last emit): throttled again + mock_time.return_value = 800 + cursor.emit_state_message() + mock_connector_manager.update_state_for_stream.assert_called_once() + mock_repo.emit_message.assert_called_once() + + +def test_ensure_at_least_one_state_emitted_bypasses_throttle(mocker): + """ + The final state at end-of-stream must always be emitted, even within the throttle + window. Otherwise the platform could see a sync finish with no final state. + """ + cursor = _make_cursor({"history": {}}) + mock_connector_manager = cursor._connector_state_manager = MagicMock() + mock_repo = cursor._message_repository = MagicMock() + + # Pretend a throttled emission just happened. + mock_time = mocker.patch("time.time") + mock_time.return_value = 10 + cursor._last_emission_time = 10.0 + + # Only 1s later — well within the throttle window. + mock_time.return_value = 11 + cursor.ensure_at_least_one_state_emitted() + + mock_connector_manager.update_state_for_stream.assert_called_once() + mock_repo.emit_message.assert_called_once()