From 8615bb9376a35a72dc403d77a8c8689fa9cce986 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 12 Nov 2025 15:56:23 +0200 Subject: [PATCH 01/12] use only first found file for discover --- .../config/file_based_stream_config.py | 5 +++++ .../stream/default_file_based_stream.py | 4 ++++ .../stream/test_default_file_based_stream.py | 22 +++++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index eb592a4aa..a9ebe7227 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -74,6 +74,11 @@ class FileBasedStreamConfig(BaseModel): default=None, gt=0, ) + use_first_found_file_for_schema_discovery: Optional[bool] = Field( + title="Use first found file for schema discovery", + description="When enable, the source will use the first found file for schema discovery. Helps to avoid long discovery step", + default=False, + ) @validator("input_schema", pre=True) def validate_input_schema(cls, v: Optional[str]) -> Optional[str]: diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 3053a74d2..a9d3108b7 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -273,6 +273,10 @@ def _get_raw_json_schema(self) -> JsonSchema: return self.config.get_input_schema() # type: ignore elif self.config.schemaless: return schemaless_schema + elif self.config.use_first_found_file_for_schema_discovery: + self.logger.info(msg=f"Using only first found file for schema discovery.") + files = [next(iter(self.get_files()))] + first_n_files = len(files) else: files = self.list_files() first_n_files = len(files) diff --git a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py index 60716b771..b8e4247c8 100644 --- a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py +++ b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -226,6 +226,28 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None: } assert self._parser.infer_schema.call_count == 3 + def test_use_first_found_file_for_schema_discovery(self) -> None: + self._stream.config.use_first_found_file_for_schema_discovery = True + + self._discovery_policy.get_max_n_files_for_schema_inference.return_value = 3 + self._discovery_policy.n_concurrent_requests = 1 + self._stream.config.input_schema = None + self._stream.config.schemaless = None + self._stream.config.recent_n_files_to_read_for_schema_discovery = None + self._parser.infer_schema.return_value = {"data": {"type": "string"}} + files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)] + self._stream_reader.get_matching_files.return_value = files + + schema = self._stream.get_json_schema() + assert schema == { + "properties": { + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + "data": {"type": ["null", "string"]}, + }, + "type": "object", + } + def _iter(self, x: Iterable[Any]) -> Iterator[Any]: for item in x: if isinstance(item, Exception): From 377884cf87ea667e72d9057f7b357a224313708b Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 12 Nov 2025 16:29:54 +0200 Subject: [PATCH 02/12] refactor code --- .../sources/file_based/config/file_based_stream_config.py | 4 ++-- .../sources/file_based/stream/default_file_based_stream.py | 6 ++++-- .../file_based/stream/test_default_file_based_stream.py | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index a9ebe7227..0938d989f 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -74,9 +74,9 @@ class FileBasedStreamConfig(BaseModel): default=None, gt=0, ) - use_first_found_file_for_schema_discovery: Optional[bool] = Field( + use_first_found_file_for_schema_discovery: bool = Field( title="Use first found file for schema discovery", - description="When enable, the source will use the first found file for schema discovery. Helps to avoid long discovery step", + description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step", default=False, ) diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index a9d3108b7..588c4a18e 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -274,8 +274,10 @@ def _get_raw_json_schema(self) -> JsonSchema: elif self.config.schemaless: return schemaless_schema elif self.config.use_first_found_file_for_schema_discovery: - self.logger.info(msg=f"Using only first found file for schema discovery.") - files = [next(iter(self.get_files()))] + self.logger.info( + msg=f"Using only first found file for schema discovery for stream {self.name} due to limitation in config." + ) + files = list(itertools.islice(self.get_files(), 1)) first_n_files = len(files) else: files = self.list_files() diff --git a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py index b8e4247c8..11a96d6fb 100644 --- a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py +++ b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -239,6 +239,8 @@ def test_use_first_found_file_for_schema_discovery(self) -> None: self._stream_reader.get_matching_files.return_value = files schema = self._stream.get_json_schema() + assert self._parser.infer_schema.call_count == 1 + assert self._parser.infer_schema.call_args[0][1].uri == "file0" assert schema == { "properties": { "_ab_source_file_last_modified": {"type": "string"}, From 10a3b4e70b21c95f1bab26fbe3dced8575ab0c6b Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 12 Nov 2025 16:44:14 +0200 Subject: [PATCH 03/12] update expected spec --- unit_tests/sources/file_based/scenarios/csv_scenarios.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index e67365ae3..0233122b1 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -485,6 +485,12 @@ "exclusiveMinimum": 0, "type": "integer", }, + "use_first_found_file_for_schema_discovery": { + "default": False, + "description": "When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step", + "title": "Use first found file for schema discovery", + "type": "boolean", + }, }, "required": ["name", "format"], }, From 6b28a5c7c41274a1ea417f22719df82e5dfd8ed3 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 13 Nov 2025 15:16:22 +0200 Subject: [PATCH 04/12] add schema discovery options validation --- .../config/file_based_stream_config.py | 33 +++++++++++++++++-- airbyte_cdk/sources/file_based/exceptions.py | 1 + .../sources/file_based/file_based_source.py | 19 +++++------ 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index 0938d989f..3cc6c7f96 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -5,7 +5,7 @@ from enum import Enum from typing import Any, List, Mapping, Optional, Union -from pydantic.v1 import BaseModel, Field, validator +from pydantic.v1 import BaseModel, Field, validator, root_validator from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat @@ -75,7 +75,7 @@ class FileBasedStreamConfig(BaseModel): gt=0, ) use_first_found_file_for_schema_discovery: bool = Field( - title="Use first found file for schema discovery", + title="Use First Found File For Schema Discover", description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step", default=False, ) @@ -89,6 +89,35 @@ def validate_input_schema(cls, v: Optional[str]) -> Optional[str]: raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA) return None + @root_validator + def validate_discovery_related_fields(cls, values): + """ + Please update this validation when new related to schema discovery field is added. + Validates schema discovery options compatability. + Note, that initially the recent_n_files_to_read_for_schema_discovery was added without a validation if schemaless or input_schema were provided. + So this method doesn't check it to do not break already created connections. + If recent_n_files_to_read_for_schema_discovery and schemaless or recent_n_files_to_read_for_schema_discovery and input_schema were provided, + recent_n_files_to_read_for_schema_discovery will be ignored and second option will be used by default. + """ + input_schema = values["input_schema"] is not None + schemaless = values["schemaless"] + recent_n_files_to_read_for_schema_discovery = ( + values["recent_n_files_to_read_for_schema_discovery"] is not None + ) + use_first_found_file_for_schema_discovery = values[ + "use_first_found_file_for_schema_discovery" + ] + + if ( + recent_n_files_to_read_for_schema_discovery + and use_first_found_file_for_schema_discovery + ) or [schemaless, input_schema, use_first_found_file_for_schema_discovery].count(True) > 1: + raise ConfigValidationError( + FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS + ) + + return values + def get_input_schema(self) -> Optional[Mapping[str, Any]]: """ User defined input_schema is defined as a string in the config. This method takes the string representation diff --git a/airbyte_cdk/sources/file_based/exceptions.py b/airbyte_cdk/sources/file_based/exceptions.py index 75f7d3f83..c75f3257f 100644 --- a/airbyte_cdk/sources/file_based/exceptions.py +++ b/airbyte_cdk/sources/file_based/exceptions.py @@ -23,6 +23,7 @@ class FileBasedSourceError(Enum): "The provided schema could not be transformed into valid JSON Schema." ) ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy." + ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time." ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows." ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows." STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema." diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 2d34fe5dc..df469e834 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -156,13 +156,20 @@ def check_connection( """ try: streams = self.streams(config) - except Exception as config_exception: + except ConfigValidationError as config_exception: raise AirbyteTracedException( internal_message="Please check the logged errors for more information.", - message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value, + message=str(config_exception), exception=AirbyteTracedException(exception=config_exception), failure_type=FailureType.config_error, ) + except Exception as exp: + raise AirbyteTracedException( + internal_message="Please check the logged errors for more information.", + message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value, + exception=AirbyteTracedException(exception=exp), + failure_type=FailureType.config_error, + ) if len(streams) == 0: return ( False, @@ -250,7 +257,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: if (state_manager and catalog_stream) else None ) - self._validate_input_schema(stream_config) sync_mode = self._get_sync_mode_from_catalog(stream_config.name) @@ -457,10 +463,3 @@ def _validate_and_get_validation_policy( model=FileBasedStreamConfig, ) return self.validation_policies[stream_config.validation_policy] - - def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None: - if stream_config.schemaless and stream_config.input_schema: - raise ValidationError( - "`input_schema` and `schemaless` options cannot both be set", - model=FileBasedStreamConfig, - ) From 2aaeb4964b6b938efd94fa38cb1ef61889fd3e80 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 13 Nov 2025 15:17:06 +0200 Subject: [PATCH 05/12] add unit tests --- .../file_based/scenarios/csv_scenarios.py | 231 +++++++++++++++++- .../file_based/test_file_based_scenarios.py | 6 + 2 files changed, 231 insertions(+), 6 deletions(-) diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index 0233122b1..5b4c74742 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -2120,12 +2120,14 @@ } ) .set_expected_check_status("FAILED") - .set_expected_check_error(None, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value) + .set_expected_check_error( + None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) .set_expected_discover_error( - ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value ) .set_expected_read_error( - ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value ) ).build() @@ -2223,12 +2225,229 @@ } ) .set_expected_check_status("FAILED") - .set_expected_check_error(None, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value) + .set_expected_check_error( + None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) + .set_expected_discover_error( + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) + .set_expected_read_error( + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) +).build() + +recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario: TestScenario[ + InMemoryFilesSource +] = ( + TestScenarioBuilder[InMemoryFilesSource]() + .set_name( + "recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario" + ) + .set_config( + { + "streams": [ + { + "name": "stream1", + "format": {"filetype": "csv"}, + "globs": ["a.csv"], + "validation_policy": "Skip Record", + "recent_n_files_to_read_for_schema_discovery": 5, + "use_first_found_file_for_schema_discovery": True, + }, + { + "name": "stream2", + "format": {"filetype": "csv"}, + "globs": ["b.csv"], + "validation_policy": "Skip Record", + }, + ] + } + ) + .set_source_builder( + FileBasedSourceBuilder() + .set_files( + { + "a.csv": { + "contents": [ + ("col1", "col2"), + ("val11a", "val12a"), + ("val21a", "val22a"), + ], + "last_modified": "2023-06-05T03:54:07.000Z", + }, + "b.csv": { + "contents": [ + ("col3",), + ("val13b",), + ("val23b",), + ], + "last_modified": "2023-06-05T03:54:07.000Z", + }, + } + ) + .set_file_type("csv") + ) + .set_catalog( + CatalogBuilder() + .with_stream("stream1", SyncMode.full_refresh) + .with_stream("stream2", SyncMode.full_refresh) + .build() + ) + .set_expected_catalog( + { + "streams": [ + { + "json_schema": { + "type": "object", + "properties": { + "data": {"type": "object"}, + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + }, + }, + "name": "stream1", + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + "is_file_based": False, + "source_defined_cursor": True, + "default_cursor_field": ["_ab_source_file_last_modified"], + }, + { + "json_schema": { + "type": "object", + "properties": { + "col3": {"type": ["null", "string"]}, + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + }, + }, + "name": "stream2", + "source_defined_cursor": True, + "default_cursor_field": ["_ab_source_file_last_modified"], + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + "is_file_based": False, + }, + ] + } + ) + .set_expected_check_status("FAILED") + .set_expected_check_error( + None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) + .set_expected_discover_error( + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) + .set_expected_read_error( + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) +).build() + + +schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario: TestScenario[ + InMemoryFilesSource +] = ( + TestScenarioBuilder[InMemoryFilesSource]() + .set_name( + "schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario" + ) + .set_config( + { + "streams": [ + { + "name": "stream1", + "format": {"filetype": "csv"}, + "globs": ["a.csv"], + "validation_policy": "Skip Record", + "schemaless": True, + "use_first_found_file_for_schema_discovery": True, + }, + { + "name": "stream2", + "format": {"filetype": "csv"}, + "globs": ["b.csv"], + "validation_policy": "Skip Record", + }, + ] + } + ) + .set_source_builder( + FileBasedSourceBuilder() + .set_files( + { + "a.csv": { + "contents": [ + ("col1", "col2"), + ("val11a", "val12a"), + ("val21a", "val22a"), + ], + "last_modified": "2023-06-05T03:54:07.000Z", + }, + "b.csv": { + "contents": [ + ("col3",), + ("val13b",), + ("val23b",), + ], + "last_modified": "2023-06-05T03:54:07.000Z", + }, + } + ) + .set_file_type("csv") + ) + .set_catalog( + CatalogBuilder() + .with_stream("stream1", SyncMode.full_refresh) + .with_stream("stream2", SyncMode.full_refresh) + .build() + ) + .set_expected_catalog( + { + "streams": [ + { + "json_schema": { + "type": "object", + "properties": { + "data": {"type": "object"}, + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + }, + }, + "name": "stream1", + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + "is_file_based": False, + "source_defined_cursor": True, + "default_cursor_field": ["_ab_source_file_last_modified"], + }, + { + "json_schema": { + "type": "object", + "properties": { + "col3": {"type": ["null", "string"]}, + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + }, + }, + "name": "stream2", + "source_defined_cursor": True, + "default_cursor_field": ["_ab_source_file_last_modified"], + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + "is_file_based": False, + }, + ] + } + ) + .set_expected_check_status("FAILED") + .set_expected_check_error( + None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value + ) .set_expected_discover_error( - ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value ) .set_expected_read_error( - ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value + ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value ) ).build() diff --git a/unit_tests/sources/file_based/test_file_based_scenarios.py b/unit_tests/sources/file_based/test_file_based_scenarios.py index 278dcf1ac..91075eebf 100644 --- a/unit_tests/sources/file_based/test_file_based_scenarios.py +++ b/unit_tests/sources/file_based/test_file_based_scenarios.py @@ -84,6 +84,8 @@ schemaless_csv_multi_stream_scenario, schemaless_csv_scenario, schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario, + recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, + schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, schemaless_with_user_input_schema_fails_connection_check_scenario, single_csv_scenario, ) @@ -207,6 +209,8 @@ schemaless_csv_scenario, schemaless_csv_multi_stream_scenario, schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario, + recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, + schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, schemaless_with_user_input_schema_fails_connection_check_scenario, single_stream_user_input_schema_scenario_schema_is_invalid, single_stream_user_input_schema_scenario_emit_nonconforming_records, @@ -312,6 +316,8 @@ success_multi_stream_scenario, success_user_provided_schema_scenario, schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario, + recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, + schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, schemaless_with_user_input_schema_fails_connection_check_scenario, valid_single_stream_user_input_schema_scenario, single_avro_scenario, From 6cb415341ff95cf7f60c5c2498cebf9740a4153f Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 13 Nov 2025 13:20:15 +0000 Subject: [PATCH 06/12] Auto-fix lint and format issues --- .../sources/file_based/config/file_based_stream_config.py | 2 +- unit_tests/sources/file_based/test_file_based_scenarios.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index 3cc6c7f96..c59261fd1 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -5,7 +5,7 @@ from enum import Enum from typing import Any, List, Mapping, Optional, Union -from pydantic.v1 import BaseModel, Field, validator, root_validator +from pydantic.v1 import BaseModel, Field, root_validator, validator from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat diff --git a/unit_tests/sources/file_based/test_file_based_scenarios.py b/unit_tests/sources/file_based/test_file_based_scenarios.py index 91075eebf..36c9f42fc 100644 --- a/unit_tests/sources/file_based/test_file_based_scenarios.py +++ b/unit_tests/sources/file_based/test_file_based_scenarios.py @@ -81,11 +81,11 @@ multi_csv_stream_n_file_exceeds_config_limit_for_inference, multi_csv_stream_n_file_exceeds_limit_for_inference, multi_stream_custom_format, + recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, schemaless_csv_multi_stream_scenario, schemaless_csv_scenario, - schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario, - recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario, + schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario, schemaless_with_user_input_schema_fails_connection_check_scenario, single_csv_scenario, ) From 01af8ff5b18f154ac718548fc9a8b6fed0023e03 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 13 Nov 2025 15:42:25 +0200 Subject: [PATCH 07/12] fix unit tests --- .../sources/file_based/config/file_based_stream_config.py | 2 +- unit_tests/sources/file_based/scenarios/csv_scenarios.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index c59261fd1..2c6ca3d94 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -90,7 +90,7 @@ def validate_input_schema(cls, v: Optional[str]) -> Optional[str]: return None @root_validator - def validate_discovery_related_fields(cls, values): + def validate_discovery_related_fields(cls, values: dict) -> dict: """ Please update this validation when new related to schema discovery field is added. Validates schema discovery options compatability. diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index 5b4c74742..a288eb40a 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -488,7 +488,7 @@ "use_first_found_file_for_schema_discovery": { "default": False, "description": "When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step", - "title": "Use first found file for schema discovery", + "title": "Use First Found File For Schema Discover", "type": "boolean", }, }, From f7237646808a760fb5ae89db6f4d2c8e3493a2f4 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 13 Nov 2025 15:46:47 +0200 Subject: [PATCH 08/12] fix typing --- .../sources/file_based/config/file_based_stream_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index 2c6ca3d94..79c374b69 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -3,7 +3,7 @@ # from enum import Enum -from typing import Any, List, Mapping, Optional, Union +from typing import Any, List, Mapping, Optional, Union, Dict from pydantic.v1 import BaseModel, Field, root_validator, validator @@ -90,7 +90,7 @@ def validate_input_schema(cls, v: Optional[str]) -> Optional[str]: return None @root_validator - def validate_discovery_related_fields(cls, values: dict) -> dict: + def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]: """ Please update this validation when new related to schema discovery field is added. Validates schema discovery options compatability. From 0d5e443968455fad71917783a6ff8a1f986e76d2 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 13 Nov 2025 13:49:51 +0000 Subject: [PATCH 09/12] Auto-fix lint and format issues --- .../sources/file_based/config/file_based_stream_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index 79c374b69..3c0b3aa5a 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -3,7 +3,7 @@ # from enum import Enum -from typing import Any, List, Mapping, Optional, Union, Dict +from typing import Any, Dict, List, Mapping, Optional, Union from pydantic.v1 import BaseModel, Field, root_validator, validator From ad750a17e7c2a7af7f8d259bf6f035b388b701bc Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 13 Nov 2025 15:58:50 +0200 Subject: [PATCH 10/12] update description --- .../sources/file_based/config/file_based_stream_config.py | 2 +- unit_tests/sources/file_based/scenarios/csv_scenarios.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index 3c0b3aa5a..186c9e575 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -76,7 +76,7 @@ class FileBasedStreamConfig(BaseModel): ) use_first_found_file_for_schema_discovery: bool = Field( title="Use First Found File For Schema Discover", - description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step", + description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.", default=False, ) diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index a288eb40a..f31585412 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -487,7 +487,7 @@ }, "use_first_found_file_for_schema_discovery": { "default": False, - "description": "When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step", + "description": "When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.", "title": "Use First Found File For Schema Discover", "type": "boolean", }, From d312c9c485eaffd840a6ff9db65b1e31df1ca6dc Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 13 Nov 2025 16:02:53 +0200 Subject: [PATCH 11/12] fix unit test --- .../sources/file_based/stream/test_default_file_based_stream.py | 1 + 1 file changed, 1 insertion(+) diff --git a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py index 11a96d6fb..54394a36d 100644 --- a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py +++ b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -209,6 +209,7 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None: self._discovery_policy.get_max_n_files_for_schema_inference.return_value = 3 self._stream.config.input_schema = None self._stream.config.schemaless = None + self._stream.config.use_first_found_file_for_schema_discovery = False self._parser.infer_schema.return_value = {"data": {"type": "string"}} files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)] self._stream_reader.get_matching_files.return_value = files From 31b6be0a76d684cea2aee97669afc5d05eaef73c Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 13 Nov 2025 16:06:48 +0200 Subject: [PATCH 12/12] fix typo --- .../sources/file_based/config/file_based_stream_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index 186c9e575..4946fb626 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -93,7 +93,7 @@ def validate_input_schema(cls, v: Optional[str]) -> Optional[str]: def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]: """ Please update this validation when new related to schema discovery field is added. - Validates schema discovery options compatability. + Validates schema discovery options compatibility. Note, that initially the recent_n_files_to_read_for_schema_discovery was added without a validation if schemaless or input_schema were provided. So this method doesn't check it to do not break already created connections. If recent_n_files_to_read_for_schema_discovery and schemaless or recent_n_files_to_read_for_schema_discovery and input_schema were provided,