Skip to content

Commit 5d9125f

Browse files
darynaishchenkooctavia-squidington-iii
andauthored
feat(file-based): option to use only first found file for discover (#841)
Co-authored-by: octavia-squidington-iii <contact@airbyte.com>
1 parent c9a5cf9 commit 5d9125f

File tree

7 files changed

+314
-18
lines changed

7 files changed

+314
-18
lines changed

airbyte_cdk/sources/file_based/config/file_based_stream_config.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
#
44

55
from enum import Enum
6-
from typing import Any, List, Mapping, Optional, Union
6+
from typing import Any, Dict, List, Mapping, Optional, Union
77

8-
from pydantic.v1 import BaseModel, Field, validator
8+
from pydantic.v1 import BaseModel, Field, root_validator, validator
99

1010
from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
1111
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
@@ -74,6 +74,11 @@ class FileBasedStreamConfig(BaseModel):
7474
default=None,
7575
gt=0,
7676
)
77+
use_first_found_file_for_schema_discovery: bool = Field(
78+
title="Use First Found File For Schema Discover",
79+
description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
80+
default=False,
81+
)
7782

7883
@validator("input_schema", pre=True)
7984
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
@@ -84,6 +89,35 @@ def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
8489
raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
8590
return None
8691

92+
@root_validator
93+
def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
94+
"""
95+
Please update this validation when new related to schema discovery field is added.
96+
Validates schema discovery options compatibility.
97+
Note, that initially the recent_n_files_to_read_for_schema_discovery was added without a validation if schemaless or input_schema were provided.
98+
So this method doesn't check it to do not break already created connections.
99+
If recent_n_files_to_read_for_schema_discovery and schemaless or recent_n_files_to_read_for_schema_discovery and input_schema were provided,
100+
recent_n_files_to_read_for_schema_discovery will be ignored and second option will be used by default.
101+
"""
102+
input_schema = values["input_schema"] is not None
103+
schemaless = values["schemaless"]
104+
recent_n_files_to_read_for_schema_discovery = (
105+
values["recent_n_files_to_read_for_schema_discovery"] is not None
106+
)
107+
use_first_found_file_for_schema_discovery = values[
108+
"use_first_found_file_for_schema_discovery"
109+
]
110+
111+
if (
112+
recent_n_files_to_read_for_schema_discovery
113+
and use_first_found_file_for_schema_discovery
114+
) or [schemaless, input_schema, use_first_found_file_for_schema_discovery].count(True) > 1:
115+
raise ConfigValidationError(
116+
FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
117+
)
118+
119+
return values
120+
87121
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
88122
"""
89123
User defined input_schema is defined as a string in the config. This method takes the string representation

airbyte_cdk/sources/file_based/exceptions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class FileBasedSourceError(Enum):
2323
"The provided schema could not be transformed into valid JSON Schema."
2424
)
2525
ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
26+
ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
2627
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
2728
ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
2829
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."

airbyte_cdk/sources/file_based/file_based_source.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,20 @@ def check_connection(
156156
"""
157157
try:
158158
streams = self.streams(config)
159-
except Exception as config_exception:
159+
except ConfigValidationError as config_exception:
160160
raise AirbyteTracedException(
161161
internal_message="Please check the logged errors for more information.",
162-
message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
162+
message=str(config_exception),
163163
exception=AirbyteTracedException(exception=config_exception),
164164
failure_type=FailureType.config_error,
165165
)
166+
except Exception as exp:
167+
raise AirbyteTracedException(
168+
internal_message="Please check the logged errors for more information.",
169+
message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
170+
exception=AirbyteTracedException(exception=exp),
171+
failure_type=FailureType.config_error,
172+
)
166173
if len(streams) == 0:
167174
return (
168175
False,
@@ -250,7 +257,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
250257
if (state_manager and catalog_stream)
251258
else None
252259
)
253-
self._validate_input_schema(stream_config)
254260

255261
sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
256262

@@ -457,10 +463,3 @@ def _validate_and_get_validation_policy(
457463
model=FileBasedStreamConfig,
458464
)
459465
return self.validation_policies[stream_config.validation_policy]
460-
461-
def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None:
462-
if stream_config.schemaless and stream_config.input_schema:
463-
raise ValidationError(
464-
"`input_schema` and `schemaless` options cannot both be set",
465-
model=FileBasedStreamConfig,
466-
)

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,12 @@ def _get_raw_json_schema(self) -> JsonSchema:
273273
return self.config.get_input_schema() # type: ignore
274274
elif self.config.schemaless:
275275
return schemaless_schema
276+
elif self.config.use_first_found_file_for_schema_discovery:
277+
self.logger.info(
278+
msg=f"Using only first found file for schema discovery for stream {self.name} due to limitation in config."
279+
)
280+
files = list(itertools.islice(self.get_files(), 1))
281+
first_n_files = len(files)
276282
else:
277283
files = self.list_files()
278284
first_n_files = len(files)

unit_tests/sources/file_based/scenarios/csv_scenarios.py

Lines changed: 231 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,12 @@
485485
"exclusiveMinimum": 0,
486486
"type": "integer",
487487
},
488+
"use_first_found_file_for_schema_discovery": {
489+
"default": False,
490+
"description": "When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
491+
"title": "Use First Found File For Schema Discover",
492+
"type": "boolean",
493+
},
488494
},
489495
"required": ["name", "format"],
490496
},
@@ -2114,12 +2120,14 @@
21142120
}
21152121
)
21162122
.set_expected_check_status("FAILED")
2117-
.set_expected_check_error(None, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
2123+
.set_expected_check_error(
2124+
None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
2125+
)
21182126
.set_expected_discover_error(
2119-
ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
2127+
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
21202128
)
21212129
.set_expected_read_error(
2122-
ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
2130+
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
21232131
)
21242132
).build()
21252133

@@ -2217,12 +2225,229 @@
22172225
}
22182226
)
22192227
.set_expected_check_status("FAILED")
2220-
.set_expected_check_error(None, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
2228+
.set_expected_check_error(
2229+
None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
2230+
)
2231+
.set_expected_discover_error(
2232+
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
2233+
)
2234+
.set_expected_read_error(
2235+
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
2236+
)
2237+
).build()
2238+
2239+
recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario: TestScenario[
2240+
InMemoryFilesSource
2241+
] = (
2242+
TestScenarioBuilder[InMemoryFilesSource]()
2243+
.set_name(
2244+
"recent_n_files_to_read_for_schema_discovery_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario"
2245+
)
2246+
.set_config(
2247+
{
2248+
"streams": [
2249+
{
2250+
"name": "stream1",
2251+
"format": {"filetype": "csv"},
2252+
"globs": ["a.csv"],
2253+
"validation_policy": "Skip Record",
2254+
"recent_n_files_to_read_for_schema_discovery": 5,
2255+
"use_first_found_file_for_schema_discovery": True,
2256+
},
2257+
{
2258+
"name": "stream2",
2259+
"format": {"filetype": "csv"},
2260+
"globs": ["b.csv"],
2261+
"validation_policy": "Skip Record",
2262+
},
2263+
]
2264+
}
2265+
)
2266+
.set_source_builder(
2267+
FileBasedSourceBuilder()
2268+
.set_files(
2269+
{
2270+
"a.csv": {
2271+
"contents": [
2272+
("col1", "col2"),
2273+
("val11a", "val12a"),
2274+
("val21a", "val22a"),
2275+
],
2276+
"last_modified": "2023-06-05T03:54:07.000Z",
2277+
},
2278+
"b.csv": {
2279+
"contents": [
2280+
("col3",),
2281+
("val13b",),
2282+
("val23b",),
2283+
],
2284+
"last_modified": "2023-06-05T03:54:07.000Z",
2285+
},
2286+
}
2287+
)
2288+
.set_file_type("csv")
2289+
)
2290+
.set_catalog(
2291+
CatalogBuilder()
2292+
.with_stream("stream1", SyncMode.full_refresh)
2293+
.with_stream("stream2", SyncMode.full_refresh)
2294+
.build()
2295+
)
2296+
.set_expected_catalog(
2297+
{
2298+
"streams": [
2299+
{
2300+
"json_schema": {
2301+
"type": "object",
2302+
"properties": {
2303+
"data": {"type": "object"},
2304+
"_ab_source_file_last_modified": {"type": "string"},
2305+
"_ab_source_file_url": {"type": "string"},
2306+
},
2307+
},
2308+
"name": "stream1",
2309+
"supported_sync_modes": ["full_refresh", "incremental"],
2310+
"is_resumable": True,
2311+
"is_file_based": False,
2312+
"source_defined_cursor": True,
2313+
"default_cursor_field": ["_ab_source_file_last_modified"],
2314+
},
2315+
{
2316+
"json_schema": {
2317+
"type": "object",
2318+
"properties": {
2319+
"col3": {"type": ["null", "string"]},
2320+
"_ab_source_file_last_modified": {"type": "string"},
2321+
"_ab_source_file_url": {"type": "string"},
2322+
},
2323+
},
2324+
"name": "stream2",
2325+
"source_defined_cursor": True,
2326+
"default_cursor_field": ["_ab_source_file_last_modified"],
2327+
"supported_sync_modes": ["full_refresh", "incremental"],
2328+
"is_resumable": True,
2329+
"is_file_based": False,
2330+
},
2331+
]
2332+
}
2333+
)
2334+
.set_expected_check_status("FAILED")
2335+
.set_expected_check_error(
2336+
None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
2337+
)
2338+
.set_expected_discover_error(
2339+
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
2340+
)
2341+
.set_expected_read_error(
2342+
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
2343+
)
2344+
).build()
2345+
2346+
2347+
schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario: TestScenario[
2348+
InMemoryFilesSource
2349+
] = (
2350+
TestScenarioBuilder[InMemoryFilesSource]()
2351+
.set_name(
2352+
"schemaless_with_use_first_found_file_for_schema_discovery_fails_connection_check_multi_stream_scenario"
2353+
)
2354+
.set_config(
2355+
{
2356+
"streams": [
2357+
{
2358+
"name": "stream1",
2359+
"format": {"filetype": "csv"},
2360+
"globs": ["a.csv"],
2361+
"validation_policy": "Skip Record",
2362+
"schemaless": True,
2363+
"use_first_found_file_for_schema_discovery": True,
2364+
},
2365+
{
2366+
"name": "stream2",
2367+
"format": {"filetype": "csv"},
2368+
"globs": ["b.csv"],
2369+
"validation_policy": "Skip Record",
2370+
},
2371+
]
2372+
}
2373+
)
2374+
.set_source_builder(
2375+
FileBasedSourceBuilder()
2376+
.set_files(
2377+
{
2378+
"a.csv": {
2379+
"contents": [
2380+
("col1", "col2"),
2381+
("val11a", "val12a"),
2382+
("val21a", "val22a"),
2383+
],
2384+
"last_modified": "2023-06-05T03:54:07.000Z",
2385+
},
2386+
"b.csv": {
2387+
"contents": [
2388+
("col3",),
2389+
("val13b",),
2390+
("val23b",),
2391+
],
2392+
"last_modified": "2023-06-05T03:54:07.000Z",
2393+
},
2394+
}
2395+
)
2396+
.set_file_type("csv")
2397+
)
2398+
.set_catalog(
2399+
CatalogBuilder()
2400+
.with_stream("stream1", SyncMode.full_refresh)
2401+
.with_stream("stream2", SyncMode.full_refresh)
2402+
.build()
2403+
)
2404+
.set_expected_catalog(
2405+
{
2406+
"streams": [
2407+
{
2408+
"json_schema": {
2409+
"type": "object",
2410+
"properties": {
2411+
"data": {"type": "object"},
2412+
"_ab_source_file_last_modified": {"type": "string"},
2413+
"_ab_source_file_url": {"type": "string"},
2414+
},
2415+
},
2416+
"name": "stream1",
2417+
"supported_sync_modes": ["full_refresh", "incremental"],
2418+
"is_resumable": True,
2419+
"is_file_based": False,
2420+
"source_defined_cursor": True,
2421+
"default_cursor_field": ["_ab_source_file_last_modified"],
2422+
},
2423+
{
2424+
"json_schema": {
2425+
"type": "object",
2426+
"properties": {
2427+
"col3": {"type": ["null", "string"]},
2428+
"_ab_source_file_last_modified": {"type": "string"},
2429+
"_ab_source_file_url": {"type": "string"},
2430+
},
2431+
},
2432+
"name": "stream2",
2433+
"source_defined_cursor": True,
2434+
"default_cursor_field": ["_ab_source_file_last_modified"],
2435+
"supported_sync_modes": ["full_refresh", "incremental"],
2436+
"is_resumable": True,
2437+
"is_file_based": False,
2438+
},
2439+
]
2440+
}
2441+
)
2442+
.set_expected_check_status("FAILED")
2443+
.set_expected_check_error(
2444+
None, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
2445+
)
22212446
.set_expected_discover_error(
2222-
ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
2447+
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
22232448
)
22242449
.set_expected_read_error(
2225-
ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value
2450+
ConfigValidationError, FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS.value
22262451
)
22272452
).build()
22282453

0 commit comments

Comments
 (0)