Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
SyncMode,
)
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from queue import Queue

from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
Expand Down Expand Up @@ -87,6 +89,13 @@ class FileBasedSource(ConcurrentSourceAdapter, ABC):
# We make each source override the concurrency level to give control over when they are upgraded.
_concurrency_level = None

# Override on a per-connector basis to bound the in-memory inter-worker
# record queue. Smaller values reduce peak source-pod RSS at a small cost
# in throughput slack; larger values give more buffering between fast
# producers and a slow main thread. Default matches the original
# ConcurrentSource behaviour.
_concurrent_record_queue_maxsize: int = 10_000

def __init__(
self,
stream_reader: AbstractFileBasedStreamReader,
Expand Down Expand Up @@ -130,6 +139,7 @@ def __init__(
self.logger,
self._slice_logger,
self.message_repository,
queue=Queue(maxsize=self._concurrent_record_queue_maxsize),
)
self._state = None
super().__init__(concurrent_source)
Expand Down Expand Up @@ -313,7 +323,24 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
cursor=cursor,
)
else:
cursor = self.cursor_cls(stream_config)
# This branch fires when sync_mode is None (stream is in the
# connector config but not the selected catalog — also during
# check / discover). For concurrent cursors we cannot use the
# single-arg constructor; dispatch to the full signature so
# connectors with cursor_cls=FileBasedConcurrentCursor (or a
# subclass) don't crash before read even begins.
if issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor):
cursor = self.cursor_cls(
stream_config,
stream_config.name,
None,
stream_state,
self.message_repository,
state_manager,
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
else:
cursor = self.cursor_cls(stream_config)
stream = self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import logging
import time
from datetime import datetime, timedelta
from threading import RLock
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, MutableMapping, Optional, Tuple
Expand Down Expand Up @@ -34,6 +35,13 @@ class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor):
)
DEFAULT_MAX_HISTORY_SIZE = 10_000
DATE_TIME_FORMAT = DefaultFileBasedCursor.DATE_TIME_FORMAT
# Minimum interval between successive state-message emissions during a sync.
# Each state message contains the full file history dict, which grows linearly
# with files synced (O(N) per message, O(N^2) over the sync). Emitting one per
# file pressures destination input buffers and platform state storage; throttle
# so the host platform/destination is not overwhelmed by huge, redundant states.
# The final state is always force-emitted via ensure_at_least_one_state_emitted.
DEFAULT_STATE_EMISSION_INTERVAL_SECONDS = 600
zero_value = datetime.min
zero_cursor_value = f"0001-01-01T00:00:00.000000Z_{_NULL_FILE}"

Expand Down Expand Up @@ -64,6 +72,8 @@ def __init__(
self._file_to_datetime_history = stream_state.get("history", {}) if stream_state else {}
self._prev_cursor_value = self._compute_prev_sync_cursor(stream_state)
self._sync_start = self._compute_start_time()
# Track the last time a state message was emitted, used by the throttle.
self._last_emission_time: float = 0.0

@property
def state(self) -> MutableMapping[str, Any]:
Expand Down Expand Up @@ -168,6 +178,21 @@ def add_file(self, file: RemoteFile) -> None:
self.emit_state_message()

def emit_state_message(self) -> None:
    """Emit a state message, throttled to one per DEFAULT_STATE_EMISSION_INTERVAL_SECONDS.

    Delegates to ``_emit_state_message(throttle=True)``; when the throttle
    window has not yet elapsed since the last throttled emission, the call
    returns without emitting anything.
    """
    self._emit_state_message(throttle=True)

def _throttle_state_message(self) -> Optional[float]:
current_time = time.time()
if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

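This should use time.monotonic() instead of time.time() for elapsed-time throttling. time.time() can jump forward or backward due to wall-clock adjustments, which can either bypass the throttle early or suppress state emission far longer than 600 seconds. Since _last_emission_time only measures elapsed duration within this process, a monotonic clock is the safer fit. Note that this is not a one-line swap: the reference point of time.monotonic() is undefined, so the 0.0 initial value of _last_emission_time no longer guarantees that the first call emits (use None or float("-inf") as the "never emitted" sentinel), and the new tests that patch time.time must patch time.monotonic instead.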


Devin session

return None
return current_time

def _emit_state_message(self, throttle: bool = True) -> None:
if throttle:
current_time = self._throttle_state_message()
if current_time is None:
return
self._last_emission_time = current_time
with self._state_lock:
new_state = self.get_state()
self._connector_state_manager.update_state_for_stream(
Expand Down Expand Up @@ -310,7 +335,9 @@ def set_initial_state(self, value: StreamState) -> None:
pass

def ensure_at_least_one_state_emitted(self) -> None:
    """Force-emit one final state message, ignoring the emission throttle.

    Calls ``_emit_state_message(throttle=False)`` exactly once, so a state
    message is produced regardless of how recently the last throttled
    emission happened.
    """
    # Bypass the throttle so the platform always receives a final state message,
    # even when the sync completes within the throttle window. The throttled
    # emit_state_message() must not be called here as well: once the throttle
    # window has elapsed it would emit too, yielding a second, redundant final
    # state message.
    self._emit_state_message(throttle=False)

def should_be_synced(self, record: Record) -> bool:
    """Report whether ``record`` should be synced; this implementation always says yes."""
    # The record itself is never inspected.
    return True
Original file line number Diff line number Diff line change
Expand Up @@ -587,3 +587,62 @@ def test_compute_start_time(input_history, is_history_full, expected_start_time,
cursor._file_to_datetime_history = input_history
cursor._is_history_full = MagicMock(return_value=is_history_full)
assert cursor._compute_start_time() == expected_start_time


def test_state_throttling(mocker):
    """
    emit_state_message must skip emission while no more than
    DEFAULT_STATE_EMISSION_INTERVAL_SECONDS have passed since the previous emission
    (the boundary itself is still throttled), and must emit once the interval has
    been exceeded.
    """
    cursor = _make_cursor({"history": {}})
    mock_connector_manager = cursor._connector_state_manager = MagicMock()
    mock_repo = cursor._message_repository = MagicMock()
    cursor._last_emission_time = 0.0
    # Derive every timestamp from the class constant so the test keeps tracking
    # the throttle window if the default interval is ever tuned.
    interval = cursor.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS

    mock_time = mocker.patch("time.time")

    # Under the threshold, and exactly on it (the comparison is inclusive):
    # no emission in any of these cases.
    for elapsed in (interval // 6, interval // 2, interval):
        mock_time.return_value = elapsed
        cursor.emit_state_message()
        mock_connector_manager.update_state_for_stream.assert_not_called()
        mock_repo.emit_message.assert_not_called()

    # Past the threshold: must emit exactly once and record the emission time.
    mock_time.return_value = interval + 100
    cursor.emit_state_message()
    mock_connector_manager.update_state_for_stream.assert_called_once()
    mock_repo.emit_message.assert_called_once()
    assert cursor._last_emission_time == interval + 100

    # Only 100s since the last emit: throttled again, counts unchanged.
    mock_time.return_value = interval + 200
    cursor.emit_state_message()
    mock_connector_manager.update_state_for_stream.assert_called_once()
    mock_repo.emit_message.assert_called_once()
    assert cursor._last_emission_time == interval + 100


def test_ensure_at_least_one_state_emitted_bypasses_throttle(mocker):
    """
    The final state at end-of-stream must always be emitted, even within the throttle
    window. Otherwise the platform could see a sync finish with no final state.
    """
    cursor = _make_cursor({"history": {}})
    mock_connector_manager = cursor._connector_state_manager = MagicMock()
    mock_repo = cursor._message_repository = MagicMock()

    # Pretend an emission happened at t=10 and the clock now reads t=11:
    # only 1s later, well within the throttle window.
    cursor._last_emission_time = 10.0
    mocker.patch("time.time", return_value=11)

    # Sanity check: a regular, throttled emission is suppressed right now, so the
    # assertions below really do prove that the throttle was bypassed.
    cursor.emit_state_message()
    mock_connector_manager.update_state_for_stream.assert_not_called()
    mock_repo.emit_message.assert_not_called()

    cursor.ensure_at_least_one_state_emitted()

    mock_connector_manager.update_state_for_stream.assert_called_once()
    mock_repo.emit_message.assert_called_once()
Loading