Skip to content

Commit c90eea1

Browse files
authored
feat: add download_decoder + download_extractor (#50)
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
1 parent 9587d4e commit c90eea1

File tree

8 files changed

+117
-8
lines changed

8 files changed

+117
-8
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,6 +1331,19 @@ definitions:
13311331
$parameters:
13321332
type: object
13331333
additionalProperties: true
1334+
ResponseToFileExtractor:
1335+
title: CSV To File Extractor
1336+
description: A record extractor designed for handling large responses that may exceed memory limits (to prevent OOM issues). It downloads a CSV file to disk, reads the data from disk, and deletes the file once it has been fully processed.
1337+
type: object
1338+
required:
1339+
- type
1340+
properties:
1341+
type:
1342+
type: string
1343+
enum: [ResponseToFileExtractor]
1344+
$parameters:
1345+
type: object
1346+
additionalProperties: true
13341347
ExponentialBackoffStrategy:
13351348
title: Exponential Backoff
13361349
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
@@ -2680,6 +2693,12 @@ definitions:
26802693
anyOf:
26812694
- "$ref": "#/definitions/CustomRecordExtractor"
26822695
- "$ref": "#/definitions/DpathExtractor"
2696+
download_extractor:
2697+
description: Responsible for extracting the records from the responses downloaded from the provided URLs.
2698+
anyOf:
2699+
- "$ref": "#/definitions/CustomRecordExtractor"
2700+
- "$ref": "#/definitions/DpathExtractor"
2701+
- "$ref": "#/definitions/ResponseToFileExtractor"
26832702
creation_requester:
26842703
description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.
26852704
anyOf:
@@ -2734,6 +2753,16 @@ definitions:
27342753
- "$ref": "#/definitions/IterableDecoder"
27352754
- "$ref": "#/definitions/XmlDecoder"
27362755
- "$ref": "#/definitions/GzipJsonDecoder"
2756+
download_decoder:
2757+
title: Download Decoder
2758+
description: Component decoding the download response so records can be extracted.
2759+
anyOf:
2760+
- "$ref": "#/definitions/CustomDecoder"
2761+
- "$ref": "#/definitions/JsonDecoder"
2762+
- "$ref": "#/definitions/JsonlDecoder"
2763+
- "$ref": "#/definitions/IterableDecoder"
2764+
- "$ref": "#/definitions/XmlDecoder"
2765+
- "$ref": "#/definitions/GzipJsonDecoder"
27372766
$parameters:
27382767
type: object
27392768
additionalProperties: true

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import uuid
77
import zlib
88
from contextlib import closing
9+
from dataclasses import InitVar, dataclass
910
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple
1011

1112
import pandas as pd
@@ -19,6 +20,7 @@
1920
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10
2021

2122

23+
@dataclass
2224
class ResponseToFileExtractor(RecordExtractor):
2325
"""
2426
This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as
@@ -28,7 +30,9 @@ class ResponseToFileExtractor(RecordExtractor):
2830
a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing.
2931
"""
3032

31-
def __init__(self) -> None:
33+
parameters: InitVar[Mapping[str, Any]]
34+
35+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3236
self.logger = logging.getLogger("airbyte")
3337

3438
def _get_response_encoding(self, headers: Dict[str, Any]) -> str:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,11 @@ class DpathExtractor(BaseModel):
567567
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
568568

569569

570+
class ResponseToFileExtractor(BaseModel):
571+
type: Literal["ResponseToFileExtractor"]
572+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
573+
574+
570575
class ExponentialBackoffStrategy(BaseModel):
571576
type: Literal["ExponentialBackoffStrategy"]
572577
factor: Optional[Union[float, str]] = Field(
@@ -1798,6 +1803,9 @@ class AsyncRetriever(BaseModel):
17981803
...,
17991804
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
18001805
)
1806+
download_extractor: Optional[
1807+
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
1808+
] = Field(None, description="Responsible for fetching the records from provided urls.")
18011809
creation_requester: Union[CustomRequester, HttpRequester] = Field(
18021810
...,
18031811
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -1848,6 +1856,20 @@ class AsyncRetriever(BaseModel):
18481856
description="Component decoding the response so records can be extracted.",
18491857
title="Decoder",
18501858
)
1859+
download_decoder: Optional[
1860+
Union[
1861+
CustomDecoder,
1862+
JsonDecoder,
1863+
JsonlDecoder,
1864+
IterableDecoder,
1865+
XmlDecoder,
1866+
GzipJsonDecoder,
1867+
]
1868+
] = Field(
1869+
None,
1870+
description="Component decoding the download response so records can be extracted.",
1871+
title="Download Decoder",
1872+
)
18511873
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
18521874

18531875

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,9 @@
270270
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
271271
RequestPath as RequestPathModel,
272272
)
273+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
274+
ResponseToFileExtractor as ResponseToFileExtractorModel,
275+
)
273276
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
274277
SelectiveAuthenticator as SelectiveAuthenticatorModel,
275278
)
@@ -427,6 +430,7 @@ def _init_mappings(self) -> None:
427430
DefaultErrorHandlerModel: self.create_default_error_handler,
428431
DefaultPaginatorModel: self.create_default_paginator,
429432
DpathExtractorModel: self.create_dpath_extractor,
433+
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
430434
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
431435
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
432436
HttpRequesterModel: self.create_http_requester,
@@ -1447,6 +1451,13 @@ def create_dpath_extractor(
14471451
parameters=model.parameters or {},
14481452
)
14491453

1454+
def create_response_to_file_extractor(
1455+
self,
1456+
model: ResponseToFileExtractorModel,
1457+
**kwargs: Any,
1458+
) -> ResponseToFileExtractor:
1459+
return ResponseToFileExtractor(parameters=model.parameters or {})
1460+
14501461
@staticmethod
14511462
def create_exponential_backoff_strategy(
14521463
model: ExponentialBackoffStrategyModel, config: Config
@@ -2011,6 +2022,7 @@ def create_async_retriever(
20112022
model=model.record_selector,
20122023
config=config,
20132024
decoder=decoder,
2025+
name=name,
20142026
transformations=transformations,
20152027
client_side_incremental_sync=client_side_incremental_sync,
20162028
)
@@ -2028,16 +2040,36 @@ def create_async_retriever(
20282040
name=f"job polling - {name}",
20292041
)
20302042
job_download_components_name = f"job download - {name}"
2043+
download_decoder = (
2044+
self._create_component_from_model(model=model.download_decoder, config=config)
2045+
if model.download_decoder
2046+
else JsonDecoder(parameters={})
2047+
)
2048+
download_extractor = (
2049+
self._create_component_from_model(
2050+
model=model.download_extractor,
2051+
config=config,
2052+
decoder=download_decoder,
2053+
parameters=model.parameters,
2054+
)
2055+
if model.download_extractor
2056+
else DpathExtractor(
2057+
[],
2058+
config=config,
2059+
decoder=download_decoder,
2060+
parameters=model.parameters or {},
2061+
)
2062+
)
20312063
download_requester = self._create_component_from_model(
20322064
model=model.download_requester,
2033-
decoder=decoder,
2065+
decoder=download_decoder,
20342066
config=config,
20352067
name=job_download_components_name,
20362068
)
20372069
download_retriever = SimpleRetriever(
20382070
requester=download_requester,
20392071
record_selector=RecordSelector(
2040-
extractor=ResponseToFileExtractor(),
2072+
extractor=download_extractor,
20412073
name=name,
20422074
record_filter=None,
20432075
transformations=[],

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):
4242

4343
job_timeout: Optional[timedelta] = None
4444
record_extractor: RecordExtractor = field(
45-
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor()
45+
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
4646
)
4747

4848
def __post_init__(self) -> None:

unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
class ResponseToFileExtractorTest(TestCase):
1616
def setUp(self) -> None:
17-
self._extractor = ResponseToFileExtractor()
17+
self._extractor = ResponseToFileExtractor({})
1818
self._http_mocker = requests_mock.Mocker()
1919
self._http_mocker.__enter__()
2020

@@ -76,7 +76,7 @@ def large_event_response_fixture():
7676
@pytest.mark.limit_memory("20 MB")
7777
def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response):
7878
lines_in_response, file_path = large_events_response
79-
extractor = ResponseToFileExtractor()
79+
extractor = ResponseToFileExtractor({})
8080

8181
url = "https://for-all-mankind.nasa.com/api/v1/users/users1"
8282
requests_mock.get(url, body=open(file_path, "rb"))

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44

55
# mypy: ignore-errors
66
import datetime
7-
from typing import Any, Mapping
7+
from typing import Any, Iterable, Mapping
88

99
import freezegun
1010
import pendulum
1111
import pytest
12+
import requests
1213

1314
from airbyte_cdk import AirbyteTracedException
1415
from airbyte_cdk.models import FailureType, Level
@@ -27,6 +28,7 @@
2728
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
2829
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator
2930
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
31+
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
3032
from airbyte_cdk.sources.declarative.extractors.record_filter import (
3133
ClientSideIncrementalRecordFilterDecorator,
3234
)
@@ -47,6 +49,9 @@
4749
from airbyte_cdk.sources.declarative.models import (
4850
CustomPartitionRouter as CustomPartitionRouterModel,
4951
)
52+
from airbyte_cdk.sources.declarative.models import (
53+
CustomRecordExtractor as CustomRecordExtractorModel,
54+
)
5055
from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel
5156
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
5257
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
@@ -3271,3 +3276,20 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
32713276
"state_type": "date-range",
32723277
"legacy": {},
32733278
}
3279+
3280+
3281+
class CustomRecordExtractor(RecordExtractor):
3282+
def extract_records(
3283+
self,
3284+
response: requests.Response,
3285+
) -> Iterable[Mapping[str, Any]]:
3286+
yield from response.json()
3287+
3288+
3289+
def test_create_custom_record_extractor():
3290+
definition = {
3291+
"type": "CustomRecordExtractor",
3292+
"class_name": "unit_tests.sources.declarative.parsers.test_model_to_component_factory.CustomRecordExtractor",
3293+
}
3294+
component = factory.create_component(CustomRecordExtractorModel, definition, {})
3295+
assert isinstance(component, CustomRecordExtractor)

unit_tests/sources/declarative/requesters/test_http_job_repository.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def setUp(self) -> None:
9595
stream_response=True,
9696
),
9797
record_selector=RecordSelector(
98-
extractor=ResponseToFileExtractor(),
98+
extractor=ResponseToFileExtractor({}),
9999
record_filter=None,
100100
transformations=[],
101101
schema_normalization=TypeTransformer(TransformConfig.NoTransform),

0 commit comments

Comments
 (0)