From fcd1d222eb19c61124a8b81014880c7c7b8e9d71 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 25 May 2026 15:49:17 +0000 Subject: [PATCH 1/2] fix(file-based): respect _concurrency_level for thread pool sizing FileBasedSource.__init__ always created the ConcurrentSource with MAX_CONCURRENCY (100) workers regardless of the subclass's _concurrency_level setting. This meant connectors that set _concurrency_level to a lower value (e.g. 20) still got 100 concurrent file readers, causing OOM on large S3 streams within 2 Gi container limits. Use _concurrency_level (capped at MAX_CONCURRENCY) to size the thread pool and initial partition count. When _concurrency_level is None (default), the existing MAX_CONCURRENCY is used. Co-Authored-By: bot_apk --- .../sources/file_based/file_based_source.py | 11 ++- .../test_file_based_source_concurrency.py | 67 +++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 unit_tests/sources/file_based/test_file_based_source_concurrency.py diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index df469e834..62ad1abb5 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -124,9 +124,16 @@ def __init__( self.logger = init_logger(f"airbyte.{self.name}") self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector() self._message_repository: Optional[MessageRepository] = None + configured_concurrency: int | None = self._concurrency_level + concurrency = ( + min(configured_concurrency, MAX_CONCURRENCY) + if configured_concurrency is not None + else MAX_CONCURRENCY + ) + initial_n_partitions = max(concurrency // 2, 1) concurrent_source = ConcurrentSource.create( - MAX_CONCURRENCY, - INITIAL_N_PARTITIONS, + concurrency, + initial_n_partitions, self.logger, self._slice_logger, self.message_repository, diff --git a/unit_tests/sources/file_based/test_file_based_source_concurrency.py b/unit_tests/sources/file_based/test_file_based_source_concurrency.py new file mode 100644 index 000000000..cc3eebb01 --- /dev/null +++ b/unit_tests/sources/file_based/test_file_based_source_concurrency.py @@ -0,0 +1,67 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock, patch + +import pytest + +from airbyte_cdk.sources.file_based.file_based_source import ( + DEFAULT_CONCURRENCY, + MAX_CONCURRENCY, + FileBasedSource, +) + + +class _ConcreteFBSource(FileBasedSource): + """Minimal concrete subclass so we can instantiate FileBasedSource.""" + + _concurrency_level = None + + @property + def name(self) -> str: + return "test-source" + + def check_connection(self, logger, config): + return True, None + + def streams(self, config): + return [] + + +@pytest.mark.parametrize( + "concurrency_level, expected_num_workers, expected_initial_partitions", + [ + pytest.param(None, MAX_CONCURRENCY, MAX_CONCURRENCY // 2, id="none_uses_max"), + pytest.param(100, 100, 50, id="default_concurrency"), + pytest.param(20, 20, 10, id="reduced_concurrency"), + pytest.param(2, 2, 1, id="minimal_concurrency"), + pytest.param(200, MAX_CONCURRENCY, MAX_CONCURRENCY // 2, id="capped_at_max"), + ], +) +def test_concurrency_level_controls_thread_pool_size( + concurrency_level, expected_num_workers, expected_initial_partitions +): + _ConcreteFBSource._concurrency_level = concurrency_level + + with patch( + "airbyte_cdk.sources.file_based.file_based_source.ConcurrentSource.create" + ) as mock_create: + mock_create.return_value = MagicMock() + try: + _ConcreteFBSource( + stream_reader=MagicMock(), + spec_class=MagicMock(), + catalog=None, + config=None, + state=None, + ) + except Exception: + pass # Other init errors are fine; we only care about the ConcurrentSource.create call + + mock_create.assert_called_once() + call_args = mock_create.call_args + actual_num_workers = call_args[0][0] + actual_initial_partitions = call_args[0][1] + assert actual_num_workers == expected_num_workers + assert actual_initial_partitions == expected_initial_partitions From eb8f9100ec95bfc385134cb9230b36bd590e35bc Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 25 May 2026 15:51:58 +0000 Subject: [PATCH 2/2] chore: remove unused DEFAULT_CONCURRENCY import Co-Authored-By: bot_apk --- .../sources/file_based/test_file_based_source_concurrency.py | 1 - 1 file changed, 1 deletion(-) diff --git a/unit_tests/sources/file_based/test_file_based_source_concurrency.py b/unit_tests/sources/file_based/test_file_based_source_concurrency.py index cc3eebb01..bb6d4a7fe 100644 --- a/unit_tests/sources/file_based/test_file_based_source_concurrency.py +++ b/unit_tests/sources/file_based/test_file_based_source_concurrency.py @@ -7,7 +7,6 @@ import pytest from airbyte_cdk.sources.file_based.file_based_source import ( - DEFAULT_CONCURRENCY, MAX_CONCURRENCY, FileBasedSource, )