From 0170fdd0055ec086624ca15bbe0de9faae828588 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 21 May 2026 18:42:23 +0300 Subject: [PATCH 1/4] fix(file-based): throttle state-message emission to prevent platform OOM on large file streams FileBasedConcurrentCursor emits a state message after every processed file. Each state contains the full file-history dict, so on a sync of N files the cursor emits N states whose sizes grow with N (O(N^2) total bytes). The platform/orchestrator buffers each state until the destination ACKs the matching record batch; destinations flush on time/size, so state messages pile up in orchestrator memory and OOM the replication pod on large file streams. The downstream symptom is the source pod being torn down without emitting terminal stream status, which surfaces at the destination as TransientErrorException("Input was fully read, but some streams did not receive a terminal stream status message"). Mirror the throttle pattern from ConcurrentPerPartitionCursor (fix for oncall #7856) on the file-based concurrent cursor: emit at most one state per 600 seconds during the sync, and force-emit a final state via ensure_at_least_one_state_emitted so the platform always receives the closing state regardless of timing. --- .../cursor/file_based_concurrent_cursor.py | 32 +++++++++- .../test_file_based_concurrent_cursor.py | 59 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py index 0ae9178c0..ed872e707 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py @@ -3,6 +3,7 @@ # import logging +import time from datetime import datetime, timedelta from threading import RLock from typing import TYPE_CHECKING, Any, Dict, Iterable, List, MutableMapping, Optional, Tuple @@ -34,6 +35,13 @@ class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor): ) DEFAULT_MAX_HISTORY_SIZE = 10_000 DATE_TIME_FORMAT = DefaultFileBasedCursor.DATE_TIME_FORMAT + # Minimum interval between successive state-message emissions during a sync. + # Each state message contains the full file history dict, which grows linearly + # with files synced (O(N) per message, O(N^2) over the sync). Emitting one per + # file pressures destination input buffers and platform state storage; throttle + # so the host platform/destination is not overwhelmed by huge, redundant states. + # The final state is always force-emitted via ensure_at_least_one_state_emitted. + DEFAULT_STATE_EMISSION_INTERVAL_SECONDS = 600 zero_value = datetime.min zero_cursor_value = f"0001-01-01T00:00:00.000000Z_{_NULL_FILE}" @@ -64,6 +72,8 @@ def __init__( self._file_to_datetime_history = stream_state.get("history", {}) if stream_state else {} self._prev_cursor_value = self._compute_prev_sync_cursor(stream_state) self._sync_start = self._compute_start_time() + # Track the last time a state message was emitted, used by the throttle. + self._last_emission_time: float = 0.0 @property def state(self) -> MutableMapping[str, Any]: @@ -168,6 +178,24 @@ def add_file(self, file: RemoteFile) -> None: self.emit_state_message() def emit_state_message(self) -> None: + """Emit a state message, throttled to one per DEFAULT_STATE_EMISSION_INTERVAL_SECONDS.""" + self._emit_state_message(throttle=True) + + def _throttle_state_message(self) -> Optional[float]: + current_time = time.time() + if ( + current_time - self._last_emission_time + <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS + ): + return None + return current_time + + def _emit_state_message(self, throttle: bool = True) -> None: + if throttle: + current_time = self._throttle_state_message() + if current_time is None: + return + self._last_emission_time = current_time with self._state_lock: new_state = self.get_state() self._connector_state_manager.update_state_for_stream( @@ -310,7 +338,9 @@ def set_initial_state(self, value: StreamState) -> None: pass def ensure_at_least_one_state_emitted(self) -> None: - self.emit_state_message() + # Bypass the throttle so the platform always receives a final state message, + # even when the sync completes within the throttle window. + self._emit_state_message(throttle=False) def should_be_synced(self, record: Record) -> bool: return True diff --git a/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py b/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py index 2c8b74ea5..d6838b88a 100644 --- a/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py +++ b/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py @@ -587,3 +587,62 @@ def test_compute_start_time(input_history, is_history_full, expected_start_time, cursor._file_to_datetime_history = input_history cursor._is_history_full = MagicMock(return_value=is_history_full) assert cursor._compute_start_time() == expected_start_time + + +def test_state_throttling(mocker): + """ + emit_state_message must skip emission if less than DEFAULT_STATE_EMISSION_INTERVAL_SECONDS + have passed since the previous emission, and must emit once the interval has elapsed. + """ + cursor = _make_cursor({"history": {}}) + mock_connector_manager = cursor._connector_state_manager = MagicMock() + mock_repo = cursor._message_repository = MagicMock() + cursor._last_emission_time = 0.0 + + mock_time = mocker.patch("time.time") + + # 100s elapsed: under threshold, no emission + mock_time.return_value = 100 + cursor.emit_state_message() + mock_connector_manager.update_state_for_stream.assert_not_called() + mock_repo.emit_message.assert_not_called() + + # 300s elapsed: still under threshold + mock_time.return_value = 300 + cursor.emit_state_message() + mock_connector_manager.update_state_for_stream.assert_not_called() + mock_repo.emit_message.assert_not_called() + + # 700s elapsed: over the 600s threshold, must emit once + mock_time.return_value = 700 + cursor.emit_state_message() + mock_connector_manager.update_state_for_stream.assert_called_once() + mock_repo.emit_message.assert_called_once() + + # 800s elapsed (only 100s since last emit): throttled again + mock_time.return_value = 800 + cursor.emit_state_message() + mock_connector_manager.update_state_for_stream.assert_called_once() + mock_repo.emit_message.assert_called_once() + + +def test_ensure_at_least_one_state_emitted_bypasses_throttle(mocker): + """ + The final state at end-of-stream must always be emitted, even within the throttle + window. Otherwise the platform could see a sync finish with no final state. + """ + cursor = _make_cursor({"history": {}}) + mock_connector_manager = cursor._connector_state_manager = MagicMock() + mock_repo = cursor._message_repository = MagicMock() + + # Pretend a throttled emission just happened. + mock_time = mocker.patch("time.time") + mock_time.return_value = 10 + cursor._last_emission_time = 10.0 + + # Only 1s later — well within the throttle window. + mock_time.return_value = 11 + cursor.ensure_at_least_one_state_emitted() + + mock_connector_manager.update_state_for_stream.assert_called_once() + mock_repo.emit_message.assert_called_once() From 1d3d679dccf4830ea54adf971fad47b42e6fb641 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 21 May 2026 15:47:00 +0000 Subject: [PATCH 2/4] Auto-fix lint and format issues --- .../stream/concurrent/cursor/file_based_concurrent_cursor.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py index ed872e707..3d81601fb 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py @@ -183,10 +183,7 @@ def emit_state_message(self) -> None: def _throttle_state_message(self) -> Optional[float]: current_time = time.time() - if ( - current_time - self._last_emission_time - <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS - ): + if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS: return None return current_time From 0fa23c9c67f608d5798b1f561590ebd63f0d65e9 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 21 May 2026 21:14:27 +0300 Subject: [PATCH 3/4] fix(file-based): dispatch cursor_cls to multi-arg signature in fallback branch FileBasedSource.streams() iterates every stream declared in the connector config (not just the catalog). For streams without a catalog entry sync_mode is None and the fallback branch calls `self.cursor_cls(stream_config)`. That single-arg form works for DefaultFileBasedCursor, but FileBasedConcurrentCursor requires 7 args, so the moment a connector switches to cursor_cls=FileBasedConcurrentCursor it crashes during read on any partial-catalog selection, plus on every check/discover (no catalog at all). Detect the cursor flavor and pass the full constructor args when it's a concurrent cursor. Unblocks file-based connectors (e.g. source-s3) from migrating onto the concurrent cursor path and picking up the state emission throttle added earlier in this PR. --- .../sources/file_based/file_based_source.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index df469e834..88827d147 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -313,7 +313,24 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: cursor=cursor, ) else: - cursor = self.cursor_cls(stream_config) + # This branch fires when sync_mode is None (stream is in the + # connector config but not the selected catalog — also during + # check / discover). For concurrent cursors we cannot use the + # single-arg constructor; dispatch to the full signature so + # connectors with cursor_cls=FileBasedConcurrentCursor (or a + # subclass) don't crash before read even begins. + if issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor): + cursor = self.cursor_cls( + stream_config, + stream_config.name, + None, + stream_state, + self.message_repository, + state_manager, + CursorField(DefaultFileBasedStream.ab_last_mod_col), + ) + else: + cursor = self.cursor_cls(stream_config) stream = self._make_file_based_stream( stream_config=stream_config, cursor=cursor, From f0ba35f1ecfff754f57adbcbc8ceb056376069af Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 25 May 2026 23:16:21 +0300 Subject: [PATCH 4/4] feat(file-based): expose a per-connector override for the concurrent record queue size MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ConcurrentSource defaults its inter-worker record queue to maxsize=10_000. On file-based connectors with large per-record Python representations (e.g. multi-KB JSONL blobs), the queue contribution alone can dominate peak source-pod RSS — the queue at full holds tens of thousands of dict objects in flight. Add a class-level `_concurrent_record_queue_maxsize` (default 10_000, unchanged behaviour) that subclasses can override to bound the queue more tightly when their per-record memory cost warrants it. Pass the result through to ConcurrentSource via the existing optional `queue` arg. No CDK-default change; opt-in per connector. Used by source-s3 with maxsize=1_000 to keep peak RSS under the 2 Gi connector pod limit on streams with many or large file records. --- airbyte_cdk/sources/file_based/file_based_source.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 88827d147..b62645d33 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -22,6 +22,8 @@ SyncMode, ) from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource +from queue import Queue + from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.file_based.availability_strategy import ( @@ -87,6 +89,13 @@ class FileBasedSource(ConcurrentSourceAdapter, ABC): # We make each source override the concurrency level to give control over when they are upgraded. _concurrency_level = None + # Override on a per-connector basis to bound the in-memory inter-worker + # record queue. Smaller values reduce peak source-pod RSS at a small cost + # in throughput slack; larger values give more buffering between fast + # producers and a slow main thread. Default matches the original + # ConcurrentSource behaviour. + _concurrent_record_queue_maxsize: int = 10_000 + def __init__( self, stream_reader: AbstractFileBasedStreamReader, @@ -130,6 +139,7 @@ def __init__( self.logger, self._slice_logger, self.message_repository, + queue=Queue(maxsize=self._concurrent_record_queue_maxsize), ) self._state = None super().__init__(concurrent_source)