feat: add JsonItemsDecoder for streaming large JSON responses#1026
feat: add JsonItemsDecoder for streaming large JSON responses#1026devin-ai-integration[bot] wants to merge 9 commits into
Conversation
Adds the ijson streaming JSON parser as a direct dependency so connectors that ship inside the source-declarative-manifest base image can stream-parse very large JSON response bodies without materializing the full document in memory. Motivation: source-amazon-seller-partner currently OOMs while reading GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT documents that can exceed 3 GB uncompressed. See airbytehq/oncall#12143.
Adds a new declarative decoder, JsonItemsDecoder, that streams elements of a nested array out of a single JSON document one at a time using the ijson library. This lets manifest-only connectors decode multi-GB JSON responses (e.g. Amazon Seller Partner Brand Analytics reports) without loading the full document into memory. - New `JsonItemsParser` in composite_raw_decoder.py (wraps ijson.items) - New `JsonItemsDecoder` schema entry, wired into GzipDecoder / ZipfileDecoder / top-level decoder unions so it composes with the existing decoder hierarchy - Pydantic models regenerated from schema - Factory: create_json_items_decoder + JsonItemsDecoderModel handling in _get_parser - Drop ijson from deptry DEP002 ignore list now that the CDK imports it directly; update pyproject.toml comment to reflect first-class use - Unit tests covering top-level, nested, empty, encoding, gzip composition, missing path validation, and lazy streaming behavior
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1778790048-streaming-json-items-decoder#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1778790048-streaming-json-items-decoderPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
…msDecoder The earlier regeneration via `poe assemble` produced datamodel-code-generator drift unrelated to this PR (Optional[conint(ge=1)] instead of Optional[int] + ge=1 kwarg, removed ScopesJoinStrategy, reordered classes, whitespace in descriptions). That drift broke mypy on Python 3.13. Reset the generated file to match main and add only the new `JsonItemsDecoder` Pydantic class manually, mirroring the style of `JsonDecoder` / `JsonlDecoder`.
Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
…ecoder in all Union types Ran `poe assemble` (datamodel-codegen) against the YAML schema to regenerate the Pydantic model. This adds JsonItemsDecoder to: - GzipDecoder.decoder - ZipfileDecoder.decoder - SimpleRetriever.decoder - AsyncRetriever.decoder - AsyncRetriever.download_decoder The YAML schema already had JsonItemsDecoder in these anyOf unions, but the previously generated Python model was stale and missing it. This fixes manifest validation for GzipDecoder -> JsonItemsDecoder pipelines (needed by airbytehq/airbyte#78360). Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Regenerated
|
|
I reran the cross-PR smoke test against |
Instead of a full regeneration (which introduced conint/confloat drift that broke MyPy), this takes the main branch model and makes only the minimal edits needed: 1. Add JsonItemsDecoder class definition (after JsonDecoder) 2. Add JsonItemsDecoder to GzipDecoder.decoder Union 3. Add JsonItemsDecoder to ZipfileDecoder.decoder Union 4. Add JsonItemsDecoder to SimpleRetriever.decoder Union 5. Add JsonItemsDecoder to AsyncRetriever.decoder Union 6. Add JsonItemsDecoder to AsyncRetriever.download_decoder Union Verified: MyPy clean (454 source files), ruff lint+format clean, all decoder (60) and parser (165) tests pass. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
The previous poetry lock resolved mypy 2.1.0 (was 1.14.1 on main), which introduced new type errors in unrelated files. Using --no-update keeps existing versions pinned and only adds the new ijson dependency. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
|
/prerelease
|
|
❌ Cannot revive Devin session - the session is too old. Please start a new session instead. |
Adds a streaming JSON decoder for very large single-document JSON responses where the records live under a nested array. JsonItemsParser yields each array element via ijson, so peak memory is bounded by a single record rather than the whole document. Composes with the existing CompositeRawDecoder hierarchy (gzip/zip) and is wired into the decoder unions + factory. Adds ijson as a first-class CDK dependency. JsonItemsParser also honors a configured non-UTF-8 encoding by transcoding to UTF-8 bytes via a lazy streaming recoder, keeping ijson on its native byte backend. Adopts and supersedes #1026. Co-Authored-By: devin-ai-integration[bot] <devin-ai-integration[bot]@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds a streaming JSON decoder for very large single-document JSON responses where the records live under a nested array. JsonItemsParser yields each array element via ijson, so peak memory is bounded by a single record rather than the whole document. Composes with the existing CompositeRawDecoder hierarchy (gzip/zip) and is wired into the decoder unions + factory. Adds ijson as a first-class CDK dependency. JsonItemsParser also honors a configured non-UTF-8 encoding by transcoding to UTF-8 bytes via a lazy streaming recoder, keeping ijson on its native byte backend. Adopts and supersedes #1026. Co-Authored-By: devin-ai-integration[bot] <devin-ai-integration[bot]@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds a streaming JSON decoder for very large single-document JSON responses where the records live under a nested array. JsonItemsParser yields each array element via ijson, so peak memory is bounded by a single record rather than the whole document. Composes with the existing CompositeRawDecoder hierarchy (gzip/zip) and is wired into the decoder unions + factory. Adds ijson as a first-class CDK dependency. JsonItemsParser also honors a configured non-UTF-8 encoding by transcoding to UTF-8 bytes via a lazy streaming recoder, keeping ijson on its native byte backend. Adopts and supersedes #1026. Co-Authored-By: devin-ai-integration[bot] <devin-ai-integration[bot]@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
Adds a new declarative
JsonItemsDecoderthat streams elements of a nested array out of a single JSON document one at a time, so manifest-only connectors can decode multi-GB JSON responses without OOMing.Related to https://github.com/airbytehq/oncall/issues/12143:
That issue surfaced 8 GB-cap OOM (
exit code 137) on source-amazon-seller-partner's Brand Analytics streams. Today the only ways to JSON-decode a single large document in the declarative CDK areJsonDecoder(fullresponse.content→orjson.loads) andGzipDecoderwrappingJsonDecoder(full decompress → full parse). Both materialize the entire payload in memory. The closed connector-side fix (airbytehq/airbyte#77709) added a custom Python component for this, but per maintainer feedback we want it as a first-class CDK component so any connector can opt in via YAML.What's in this PR
CDK-side only:
JsonItemsParserinairbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py, alongsideJsonParser/JsonLineParser/CsvParser/GzipParser. Usesijson.items(stream, f"{items_path}.item")to lazily yield each element of the configured array.JsonItemsDecoderindeclarative_component_schema.yamlwithitems_path(required) andencoding(defaultutf-8). Added to theanyOfunions forGzipDecoder.decoder,ZipfileDecoder.decoder, and the top-leveldecoder/download_decoderslots.poe assemble.model_to_component_factory.py:create_json_items_decoder+ a new branch in_get_parserthat buildsJsonItemsParser(items_path=..., encoding=...).ijson = "^3.3.0"added to[tool.poetry.dependencies](this is what was in the now-closed build: add ijson as a runtime dependency #1011). Since the CDK now importsijsondirectly, dropped theDEP002ignore entry that PR added.unit_tests/sources/declarative/decoders/test_composite_decoder.pycovering: top-level / nested / empty array paths, encoding, gzip composition, required-field validation, and a_CountingStream-based test confirming the parser yields the first item before consuming the full document (lazy streaming).Example connector manifest after this lands:
Declarative-First Evaluation
This is the declarative approach. The previous attempt used a custom Python component in the connector; the new component is a generic, reusable building block that any manifest-only connector can opt into via YAML, no custom code required. Existing declarative decoders (
JsonDecoder,GzipDecoderwrappingJsonDecoder) cannot stream a single large document — they buffer the full payload — so neither can solve the OOM on their own.Local verification
poetry run pytest unit_tests/sources/declarative/decoders/ -x→ 60 passedpoetry run pytest unit_tests/sources/declarative/parsers/ -x→ 157 passedpoetry run ruff check+ruff format --checkon changed files → cleanpoetry run mypy --config-file mypy.inion the two modifiedairbyte_cdk/files → cleanReview & Testing Checklist for Human
items_pathsyntax (ijsondotted path with implicit.itemsuffix, not JSONPath) is the right ergonomics for connector authors. The schema description and parser docstring spell this out, but it differs from howDpathExtractorusesfield_path.anyOfwiring:JsonItemsDecoderis now valid whereverJsonDecoderis valid (top-leveldecoder/download_decoder, insideGzipDecoder, insideZipfileDecoder). Confirm that's the desired scope.ijson = "^3.3.0"is acceptable (3.3.0 is the floor whereijson.items(stream, "path.item")behaves consistently across backends; 3.5.0 is what poetry currently resolves to).ijsonships, the follow-up connector PR replacessource-amazon-seller-partner's customGzipJsonStreamingItemsDecoderwith a manifest-onlyJsonItemsDecoder(5 Brand Analytics streams). End-to-end memory metrics will be captured then; the CDK unit test only asserts the parser is lazy (does not read the full document before yielding the first item) and that ijson is wired correctly.Notes
JsonItemsDecoderis a sibling ofJsonDecoderrather than a flag on it — JSON path semantics and encoding are first-class enough that overloadingJsonDecoderwould muddy the schema for the common case.JsonItemsDecoder+items_path. Avoided "streaming" in the name because streaming is implicit forCompositeRawDecoder-backed parsers.ijsonis published, I'll open a connector-side PR that:GzipJsonStreamingItemsDecoderfromsource-amazon-seller-partner/components.pymanifest.yamltoJsonItemsDecoder+DpathExtractorbaseImageand the connector PATCH versionLink to Devin session: https://app.devin.ai/sessions/e31a7df6ebe54ce4a68e0eecc7117555