Skip to content
38 changes: 36 additions & 2 deletions airbyte_cdk/sources/file_based/config/file_based_stream_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#

from enum import Enum
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Dict, List, Mapping, Optional, Union

from pydantic.v1 import BaseModel, Field, validator
from pydantic.v1 import BaseModel, Field, root_validator, validator

from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
Expand Down Expand Up @@ -74,6 +74,11 @@ class FileBasedStreamConfig(BaseModel):
default=None,
gt=0,
)
use_first_found_file_for_schema_discovery: bool = Field(
title="Use First Found File For Schema Discover",
description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
default=False,
)

@validator("input_schema", pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
Expand All @@ -84,6 +89,35 @@ def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
return None

@root_validator
def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validate that mutually exclusive schema-discovery options are not combined.

    Please update this validation when a new schema-discovery-related field is added.
    Note that ``recent_n_files_to_read_for_schema_discovery`` was originally added
    without validation against ``schemaless`` or ``input_schema``, so those
    combinations are deliberately NOT rejected here to avoid breaking already-created
    connections; in that case ``recent_n_files_to_read_for_schema_discovery`` is
    ignored and the other option is used by default.

    Raises:
        ConfigValidationError: if ``recent_n_files_to_read_for_schema_discovery`` is
            combined with ``use_first_found_file_for_schema_discovery``, or if more
            than one of ``schemaless`` / ``input_schema`` /
            ``use_first_found_file_for_schema_discovery`` is set.
    """
    # Use .get() rather than direct indexing: in pydantic v1, a field that failed
    # its own validator is absent from `values`, and indexing would raise a
    # KeyError here that masks the real validation error.
    has_input_schema = values.get("input_schema") is not None
    schemaless = bool(values.get("schemaless"))
    has_recent_n_files = values.get("recent_n_files_to_read_for_schema_discovery") is not None
    use_first_found_file = bool(values.get("use_first_found_file_for_schema_discovery"))

    if (has_recent_n_files and use_first_found_file) or [
        schemaless,
        has_input_schema,
        use_first_found_file,
    ].count(True) > 1:
        raise ConfigValidationError(
            FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
        )

    return values

def get_input_schema(self) -> Optional[Mapping[str, Any]]:
"""
User defined input_schema is defined as a string in the config. This method takes the string representation
Expand Down
1 change: 1 addition & 0 deletions airbyte_cdk/sources/file_based/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class FileBasedSourceError(Enum):
"The provided schema could not be transformed into valid JSON Schema."
)
ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
Expand Down
19 changes: 9 additions & 10 deletions airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,20 @@ def check_connection(
"""
try:
streams = self.streams(config)
except Exception as config_exception:
except ConfigValidationError as config_exception:
raise AirbyteTracedException(
internal_message="Please check the logged errors for more information.",
message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
message=str(config_exception),
exception=AirbyteTracedException(exception=config_exception),
failure_type=FailureType.config_error,
)
except Exception as exp:
raise AirbyteTracedException(
internal_message="Please check the logged errors for more information.",
message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
exception=AirbyteTracedException(exception=exp),
failure_type=FailureType.config_error,
)
if len(streams) == 0:
return (
False,
Expand Down Expand Up @@ -250,7 +257,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
if (state_manager and catalog_stream)
else None
)
self._validate_input_schema(stream_config)

sync_mode = self._get_sync_mode_from_catalog(stream_config.name)

Expand Down Expand Up @@ -457,10 +463,3 @@ def _validate_and_get_validation_policy(
model=FileBasedStreamConfig,
)
return self.validation_policies[stream_config.validation_policy]

def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None:
    """Reject stream configs that set both `schemaless` and `input_schema`."""
    # A user-provided schema is meaningless for a schemaless stream, so the two
    # options are mutually exclusive.
    if not (stream_config.schemaless and stream_config.input_schema):
        return
    raise ValidationError(
        "`input_schema` and `schemaless` options cannot both be set",
        model=FileBasedStreamConfig,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ def _get_raw_json_schema(self) -> JsonSchema:
return self.config.get_input_schema() # type: ignore
elif self.config.schemaless:
return schemaless_schema
elif self.config.use_first_found_file_for_schema_discovery:
self.logger.info(
msg=f"Using only first found file for schema discovery for stream {self.name} due to limitation in config."
)
files = list(itertools.islice(self.get_files(), 1))
first_n_files = len(files)
else:
files = self.list_files()
first_n_files = len(files)
Expand Down
237 changes: 231 additions & 6 deletions unit_tests/sources/file_based/scenarios/csv_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,12 @@
"exclusiveMinimum": 0,
"type": "integer",
},
"use_first_found_file_for_schema_discovery": {
"default": False,
"description": "When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
"title": "Use First Found File For Schema Discover",
"type": "boolean",
},
},
"required": ["name", "format"],
},
Expand Down Expand Up @@ -2114,12 +2120,14 @@
}
)
.set_expected_check_status("FAILED")
.set_expected_check_error(None, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
.set_expected_check_error(
None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
)
.set_expected_discover_error(
ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
)
.set_expected_read_error(
ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
)
).build()

Expand Down Expand Up @@ -2217,12 +2225,229 @@
}
)
.set_expected_check_status("FAILED")
.set_expected_check_error(None, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
.set_expected_check_error(
None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
)
.set_expected_discover_error(
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
)
.set_expected_read_error(
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
)
).build()

# Scenario: "stream1" sets both `recent_n_files_to_read_for_schema_discovery` and
# `use_first_found_file_for_schema_discovery`. These discovery options are mutually
# exclusive, so `check` is expected to FAIL and `discover`/`read` are expected to raise
# ConfigValidationError with the ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS message,
# even though a second, validly-configured stream ("stream2") is also present.
recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario: TestScenario[
    InMemoryFilesSource
] = (
    TestScenarioBuilder[InMemoryFilesSource]()
    .set_name(
        "recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario"
    )
    .set_config(
        {
            "streams": [
                {
                    "name": "stream1",
                    "format": {"filetype": "csv"},
                    "globs": ["a.csv"],
                    "validation_policy": "Skip Record",
                    "recent_n_files_to_read_for_schema_discovery": 5,
                    "use_first_found_file_for_schema_discovery": True,
                },
                {
                    "name": "stream2",
                    "format": {"filetype": "csv"},
                    "globs": ["b.csv"],
                    "validation_policy": "Skip Record",
                },
            ]
        }
    )
    .set_source_builder(
        FileBasedSourceBuilder()
        .set_files(
            {
                "a.csv": {
                    "contents": [
                        ("col1", "col2"),
                        ("val11a", "val12a"),
                        ("val21a", "val22a"),
                    ],
                    "last_modified": "2023-06-05T03:54:07.000Z",
                },
                "b.csv": {
                    "contents": [
                        ("col3",),
                        ("val13b",),
                        ("val23b",),
                    ],
                    "last_modified": "2023-06-05T03:54:07.000Z",
                },
            }
        )
        .set_file_type("csv")
    )
    .set_catalog(
        CatalogBuilder()
        .with_stream("stream1", SyncMode.full_refresh)
        .with_stream("stream2", SyncMode.full_refresh)
        .build()
    )
    .set_expected_catalog(
        {
            "streams": [
                {
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "data": {"type": "object"},
                            "_ab_source_file_last_modified": {"type": "string"},
                            "_ab_source_file_url": {"type": "string"},
                        },
                    },
                    "name": "stream1",
                    "supported_sync_modes": ["full_refresh", "incremental"],
                    "is_resumable": True,
                    "is_file_based": False,
                    "source_defined_cursor": True,
                    "default_cursor_field": ["_ab_source_file_last_modified"],
                },
                {
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "col3": {"type": ["null", "string"]},
                            "_ab_source_file_last_modified": {"type": "string"},
                            "_ab_source_file_url": {"type": "string"},
                        },
                    },
                    "name": "stream2",
                    "source_defined_cursor": True,
                    "default_cursor_field": ["_ab_source_file_last_modified"],
                    "supported_sync_modes": ["full_refresh", "incremental"],
                    "is_resumable": True,
                    "is_file_based": False,
                },
            ]
        }
    )
    .set_expected_check_status("FAILED")
    .set_expected_check_error(
        None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
    )
    .set_expected_discover_error(
        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
    )
    .set_expected_read_error(
        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
    )
).build()


# Scenario: "stream1" sets both `schemaless` and `use_first_found_file_for_schema_discovery`.
# These discovery options are mutually exclusive, so `check` is expected to FAIL and
# `discover`/`read` are expected to raise ConfigValidationError with the
# ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS message, even though a second,
# validly-configured stream ("stream2") is also present.
# NOTE: the stale duplicate argument lines (old CONFIG_VALIDATION_ERROR values left over
# from the diff) inside set_expected_discover_error/set_expected_read_error are removed;
# only the ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS expectation is kept.
schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario: TestScenario[
    InMemoryFilesSource
] = (
    TestScenarioBuilder[InMemoryFilesSource]()
    .set_name(
        "schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario"
    )
    .set_config(
        {
            "streams": [
                {
                    "name": "stream1",
                    "format": {"filetype": "csv"},
                    "globs": ["a.csv"],
                    "validation_policy": "Skip Record",
                    "schemaless": True,
                    "use_first_found_file_for_schema_discovery": True,
                },
                {
                    "name": "stream2",
                    "format": {"filetype": "csv"},
                    "globs": ["b.csv"],
                    "validation_policy": "Skip Record",
                },
            ]
        }
    )
    .set_source_builder(
        FileBasedSourceBuilder()
        .set_files(
            {
                "a.csv": {
                    "contents": [
                        ("col1", "col2"),
                        ("val11a", "val12a"),
                        ("val21a", "val22a"),
                    ],
                    "last_modified": "2023-06-05T03:54:07.000Z",
                },
                "b.csv": {
                    "contents": [
                        ("col3",),
                        ("val13b",),
                        ("val23b",),
                    ],
                    "last_modified": "2023-06-05T03:54:07.000Z",
                },
            }
        )
        .set_file_type("csv")
    )
    .set_catalog(
        CatalogBuilder()
        .with_stream("stream1", SyncMode.full_refresh)
        .with_stream("stream2", SyncMode.full_refresh)
        .build()
    )
    .set_expected_catalog(
        {
            "streams": [
                {
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "data": {"type": "object"},
                            "_ab_source_file_last_modified": {"type": "string"},
                            "_ab_source_file_url": {"type": "string"},
                        },
                    },
                    "name": "stream1",
                    "supported_sync_modes": ["full_refresh", "incremental"],
                    "is_resumable": True,
                    "is_file_based": False,
                    "source_defined_cursor": True,
                    "default_cursor_field": ["_ab_source_file_last_modified"],
                },
                {
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "col3": {"type": ["null", "string"]},
                            "_ab_source_file_last_modified": {"type": "string"},
                            "_ab_source_file_url": {"type": "string"},
                        },
                    },
                    "name": "stream2",
                    "source_defined_cursor": True,
                    "default_cursor_field": ["_ab_source_file_last_modified"],
                    "supported_sync_modes": ["full_refresh", "incremental"],
                    "is_resumable": True,
                    "is_file_based": False,
                },
            ]
        }
    )
    .set_expected_check_status("FAILED")
    .set_expected_check_error(
        None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
    )
    .set_expected_discover_error(
        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
    )
    .set_expected_read_error(
        ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
    )
).build()

Expand Down
Loading
Loading