Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2650,6 +2650,40 @@ definitions:
type:
type: string
enum: [JsonDecoder]
JsonItemsDecoder:
title: JSON Items (Streaming)
description: >-
Select 'JSON Items (Streaming)' to stream-decode a single JSON document
by yielding each element of a nested array, one at a time. Use this for
very large single-document JSON responses (e.g. a wrapping object
containing a multi-GB array) where buffering the whole document into
memory would cause out-of-memory errors. Powered by the `ijson`
streaming parser.
type: object
required:
- type
- items_path
properties:
type:
type: string
enum: [JsonItemsDecoder]
items_path:
title: Items Path
description: >-
Dot-separated path to the JSON array whose elements should be
yielded as records. Uses `ijson` path syntax (e.g. `data.users`),
not JSONPath syntax — do not include leading `$.` or trailing
`[*]`.
type: string
examples:
- dataByDepartmentAndSearchTerm
- dataByAsin
- data.users
encoding:
title: Encoding
description: Text encoding used to decode the streamed bytes before JSON parsing.
type: string
default: utf-8
JsonlDecoder:
title: JSON Lines
description: Select 'JSON Lines' if the response consists of JSON objects separated by new lines ('\n') in JSONL format.
Expand Down Expand Up @@ -2891,6 +2925,7 @@ definitions:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/JsonlDecoder"
ListPartitionRouter:
title: List Partition Router
Expand Down Expand Up @@ -3931,6 +3966,7 @@ definitions:
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/JsonlDecoder"
Expand Down Expand Up @@ -4019,6 +4055,7 @@ definitions:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/JsonlDecoder"
CsvDecoder:
title: CSV
Expand Down Expand Up @@ -4185,6 +4222,7 @@ definitions:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
Expand All @@ -4197,6 +4235,7 @@ definitions:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
Expand Down
54 changes: 54 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import codecs
import csv
import gzip
import io
Expand All @@ -11,6 +12,7 @@
from io import BufferedIOBase, TextIOWrapper
from typing import Any, List, Optional

import ijson
import orjson
import requests

Expand Down Expand Up @@ -98,6 +100,58 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}")


class _Utf8Recoder:
"""Lazily transcode a byte stream from a source encoding into UTF-8 bytes.

Lets a non-UTF-8 byte stream be fed to ijson while keeping it on the native byte
backend (ijson deprecates text-mode inputs). Bytes are read from the underlying
stream and decoded incrementally, so multi-byte characters split across read
boundaries are handled and memory stays bounded regardless of document size.
"""

def __init__(self, stream: BufferedIOBase, encoding: str) -> None:
self._stream = stream
self._decoder = codecs.getincrementaldecoder(encoding)()

def read(self, size: int = -1) -> bytes:
chunk = self._stream.read(size)
# `final` once the underlying stream is exhausted so a trailing partial sequence flushes.
return self._decoder.decode(chunk, final=not chunk).encode("utf-8")


@dataclass
class JsonItemsParser(Parser):
"""Streaming JSON parser that yields each element of a nested array.

Use this for very large single-document JSON responses where the records
of interest live under a nested array (e.g. `dataByDepartmentAndSearchTerm`,
`data.users`). Powered by `ijson`, this parser does not materialize the
full document — peak memory is bounded by a single record plus ijson's
internal parse buffers, regardless of document size.

`items_path` uses `ijson` dotted path syntax (e.g. `data.users`), not
JSONPath syntax (`$.data.users[*]`). Internally we append `.item`, which
is the `ijson` convention for "iterate elements of this array".
"""

items_path: str = ""
encoding: Optional[str] = "utf-8"
Comment thread
tolik0 marked this conversation as resolved.
Comment thread
tolik0 marked this conversation as resolved.

def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
if not self.items_path:
raise ValueError("JsonItemsParser requires a non-empty items_path.")
if self.encoding and codecs.lookup(self.encoding).name != "utf-8":
# ijson reads bytes natively (auto-detecting UTF-8/16/32). For an explicitly
# configured non-UTF-8 encoding (e.g. iso-8859-1) we transcode to UTF-8 bytes
# so ijson keeps using its fast byte backend rather than a (deprecated) text
# stream. The recoder decodes lazily in chunks, preserving bounded memory.
data = _Utf8Recoder(data, self.encoding) # type: ignore[assignment]
# ijson auto-selects the best available backend (yajl2_c when present)
# and reads from `data` lazily — it does not call `.read()` on the
# whole stream up front.
yield from ijson.items(data, f"{self.items_path}.item")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
tolik0 marked this conversation as resolved.


@dataclass
class CsvParser(Parser):
# TODO: migrate implementation to re-use file-base classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,20 @@ class JsonDecoder(BaseModel):
type: Literal["JsonDecoder"]


class JsonItemsDecoder(BaseModel):
type: Literal["JsonItemsDecoder"]
items_path: str = Field(
...,
description="Dot-separated path to the JSON array whose elements should be yielded as records. Uses `ijson` path syntax (e.g. `data.users`), not JSONPath syntax \u2014 do not include leading `$.` or trailing `[*]`.",
title="Items Path",
)
encoding: Optional[str] = Field(
"utf-8",
description="The character encoding of the JSON data. Defaults to UTF-8.",
title="Encoding",
)


class JsonlDecoder(BaseModel):
type: Literal["JsonlDecoder"]

Expand Down Expand Up @@ -2201,7 +2215,7 @@ class PaginationReset(BaseModel):

class GzipDecoder(BaseModel):
type: Literal["GzipDecoder"]
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder]
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder]

Comment thread
tolik0 marked this conversation as resolved.

class RequestBodyGraphQL(BaseModel):
Expand Down Expand Up @@ -2339,7 +2353,7 @@ class Config:
extra = Extra.allow

type: Literal["ZipfileDecoder"]
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] = Field(
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder] = Field(
...,
description="Parser to parse the decompressed data from the zipfile(s).",
title="Parser",
Expand Down Expand Up @@ -3011,6 +3025,7 @@ class SimpleRetriever(BaseModel):
decoder: Optional[
Union[
JsonDecoder,
JsonItemsDecoder,
XmlDecoder,
CsvDecoder,
JsonlDecoder,
Expand Down Expand Up @@ -3144,6 +3159,7 @@ class AsyncRetriever(BaseModel):
CsvDecoder,
GzipDecoder,
JsonDecoder,
JsonItemsDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
Expand All @@ -3160,6 +3176,7 @@ class AsyncRetriever(BaseModel):
CsvDecoder,
GzipDecoder,
JsonDecoder,
JsonItemsDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
CompositeRawDecoder,
CsvParser,
GzipParser,
JsonItemsParser,
JsonLineParser,
JsonParser,
Parser,
Expand Down Expand Up @@ -321,6 +322,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonFileSchemaLoader as JsonFileSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonItemsDecoder as JsonItemsDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonlDecoder as JsonlDecoderModel,
)
Expand Down Expand Up @@ -763,6 +767,7 @@ def _init_mappings(self) -> None:
HttpResponseFilterModel: self.create_http_response_filter,
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonItemsDecoderModel: self.create_json_items_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
GzipDecoderModel: self.create_gzip_decoder,
Expand Down Expand Up @@ -2671,6 +2676,14 @@ def create_jsonl_decoder(
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_json_items_decoder(
self, model: JsonItemsDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
return CompositeRawDecoder(
parser=ModelToComponentFactory._get_parser(model, config),
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_gzip_decoder(
self, model: GzipDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
Expand Down Expand Up @@ -2719,6 +2732,11 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
if isinstance(model, JsonDecoderModel):
# Note that the logic is a bit different from the JsonDecoder as there is some legacy that is maintained to return {} on error cases
return JsonParser()
elif isinstance(model, JsonItemsDecoderModel):
return JsonItemsParser(
items_path=model.items_path,
encoding=model.encoding,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
elif isinstance(model, JsonlDecoderModel):
return JsonLineParser()
elif isinstance(model, CsvDecoderModel):
Expand Down
Loading
Loading