feat: add JsonItemsDecoder for streaming large JSON responses#1049
Conversation
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/cdk/streaming-json-items-decoder#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/cdk/streaming-json-items-decoderPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
|
Warning Review limit reached
More reviews will be available in 35 minutes and 35 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more credits in the billing tab to continue. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (6)
📝 WalkthroughWalkthroughAdds JsonItemsDecoder schema/model, an ijson-backed JsonItemsParser with optional lazy UTF‑8 recoder, factory wiring to produce a streaming CompositeRawDecoder, ijson runtime dependency, and unit tests for extraction, encoding, gzip composition, validation, and laziness. wdyt? ChangesJsonItemsDecoder Streaming JSON Parser
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
2646-2680: ⚡ Quick winClarify ijson path format in
items_pathdescription.The description mentions "Uses
ijsonpath syntax (e.g.data.users)" but doesn't explicitly state that the implementation will append.itemto the provided path when callingijson.items(data, "{items_path}.item"). From the PR objectives, I see the implementation does this automatically, which is great—but users configuring this in manifests might be confused about what to specify.The current description warns against trailing
[*](JSONPath syntax), which is helpful. But it might be clearer to explicitly state: "Specify the path to the parent object/array containing the items, without the final.itemsuffix (which is added automatically)." For example, if your JSON is{"data": {"users": [...]}}, specify"data.users", not"data.users.item".Also, consider adding an example for a top-level array case, such as specifying
"item"for[{...}, {...}], to help users understand the simplest scenario. Wdyt?Suggested description refinement
description: >- Dot-separated path to the JSON array whose elements should be - yielded as records. Uses `ijson` path syntax (e.g. `data.users`), - not JSONPath syntax — do not include leading `$.` or trailing - `[*]`. + yielded as records. Specify the path to the parent object/array + containing the items (e.g., 'data.users' for nested arrays, or + 'item' for a top-level array). The implementation automatically + appends '.item' when calling ijson.items(). Do not use JSONPath + syntax (no leading '$.' or trailing '[*]').🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around lines 2646 - 2680, Update the JsonItemsDecoder items_path description to explicitly state that the implementation appends the `.item` suffix when calling ijson (i.e., users should provide the path to the parent array/object without the final `.item`), mention that trailing JSONPath syntax like `[*]` should not be used, and add a top-level-array example (e.g., use "item" for a top-level JSON array like [{...}, {...}]); reference the items_path property and ijson usage so readers know the `.item` suffix is added automatically by the decoder.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py`:
- Around line 117-118: The field items_path in JsonItemsParser is declared with
a default empty string which contradicts the schema
(declarative_component_schema.py defines it as required); change the attribute
to a required str (remove the default: items_path: str) so instantiating
JsonItemsParser without items_path fails fast and matches the schema, and keep
or tighten the existing runtime validation that previously checked for empty
strings to validate non-empty values where parsing occurs.
- Around line 118-126: The parse method of JsonItemsParser declares an encoding
attribute but never uses it; update parse(self, data: BufferedIOBase) so that if
self.encoding is set and not a UTF-8/utf8 variant it wraps the byte stream with
io.TextIOWrapper(data, encoding=self.encoding) before calling ijson.items(data,
f"{self.items_path}.item") (ensure you import io and perform case-insensitive
comparison like self.encoding.lower() not in ('utf-8','utf8')). Keep the
existing items_path validation and yield-from behavior; this ensures
user-specified encodings (e.g., ISO-8859-1) are honored by ijson.items.
In `@unit_tests/sources/declarative/decoders/test_composite_decoder.py`:
- Around line 521-535: Replace the broad pytest.raises(Exception) in
test_gzip_decoder_without_always_decompress_skips_mislabeled_gzip with a
specific expectation for the inner JSON decode failure (e.g.
pytest.raises(json.JSONDecodeError)) so the test proves the header-based
fallback route is failing during decoding; ensure the
list(decoder.decode(response)) call remains inside the pytest.raises context and
add/import json if not present so json.JSONDecodeError can be referenced; target
the assertion around the decoder produced by _build_gzip_decoder and the
decode() call to narrow the failure.
---
Nitpick comments:
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 2646-2680: Update the JsonItemsDecoder items_path description to
explicitly state that the implementation appends the `.item` suffix when calling
ijson (i.e., users should provide the path to the parent array/object without
the final `.item`), mention that trailing JSONPath syntax like `[*]` should not
be used, and add a top-level-array example (e.g., use "item" for a top-level
JSON array like [{...}, {...}]); reference the items_path property and ijson
usage so readers know the `.item` suffix is added automatically by the decoder.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 9b37a828-b488-4152-98c1-36f67c65c09d
⛔ Files ignored due to path filters (1)
poetry.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
airbyte_cdk/sources/declarative/declarative_component_schema.yamlairbyte_cdk/sources/declarative/decoders/composite_raw_decoder.pyairbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.pypyproject.tomlunit_tests/sources/declarative/decoders/test_composite_decoder.py
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a streaming JSON decoder/parser to yield items from a nested array without loading the full response into memory, and extends gzip decoding to optionally force decompression even when headers are misleading.
Changes:
- Introduces
JsonItemsDecoder/JsonItemsParserbacked byijsonfor streaming nested-array extraction. - Adds
always_decompresstoGzipDecoderto handle mis-labeled gzip responses. - Expands unit tests and schema/model wiring to support the new decoder types.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| unit_tests/sources/declarative/decoders/test_composite_decoder.py | Adds tests for JsonItemsParser behavior and gzip always_decompress behavior. |
| pyproject.toml | Adds ijson dependency to support streaming JSON parsing. |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Wires JsonItemsDecoder into the factory and adds always_decompress behavior in gzip decoder creation. |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Adds JsonItemsDecoder model and extends decoder unions + GzipDecoder.always_decompress. |
| airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py | Implements JsonItemsParser using ijson. |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Updates declarative component JSON schema to include JsonItemsDecoder and always_decompress. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- JsonItemsParser now honors a configured non-UTF-8 encoding by transcoding to UTF-8 bytes via a lazy streaming recoder (keeps ijson on its byte backend; bounded memory). - create_gzip_decoder: respect always_decompress in connector-builder mode too (previously the builder early-return skipped it). - tests: exercise non-ASCII bytes in the encoding test (ensure_ascii=False); narrow the always_decompress negative test to ijson.JSONError. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Thanks for the reviews — addressed in e687ea1:
Left as-is (happy to change if you prefer): |
52fed33 to
288cf47
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
4036-4053:⚠️ Potential issue | 🟠 Major | ⚡ Quick winCould we add
always_decompresstoGzipDecoderschema to keep config/runtime contracts aligned, wdyt?
GzipDecodercurrently exposes onlytypeanddecoder, but this PR’s objective includes schema support foralways_decompress. Without it, manifests using the new flag can fail schema validation even if runtime/factory supports it.Proposed schema patch
GzipDecoder: title: gzip description: Select 'gzip' for response data that is compressed with gzip. Requires specifying an inner data type/decoder to parse the decompressed data. type: object required: - type - decoder properties: type: type: string enum: [GzipDecoder] + always_decompress: + title: Always Decompress + description: Force gzip decompression regardless of Content-Encoding/Content-Type headers. + type: boolean + default: false decoder: anyOf: - "$ref": "`#/definitions/CsvDecoder`" - "$ref": "`#/definitions/GzipDecoder`" - "$ref": "`#/definitions/JsonDecoder`" - "$ref": "`#/definitions/JsonItemsDecoder`" - "$ref": "`#/definitions/JsonlDecoder`"🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around lines 4036 - 4053, Add an optional boolean property named always_decompress to the GzipDecoder schema (the object with title "gzip" / symbol GzipDecoder) so manifests using that flag validate; specifically, under the properties of the gzip/GzipDecoder object add always_decompress: type: boolean with a short description and a default (false) and do not add it to required. Ensure the new property appears alongside existing properties (type, decoder) so the schema and runtime/factory remain aligned.airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2703-2713:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
always_decompressis not applied in the gzip factory path — could we wire it here, wdyt?
create_gzip_decodercurrently ignoresmodel.always_decompress, so mislabeled gzip payloads can still skip decompression (especially in builder mode whereinner_parseris forced). This breaks the new decoder contract.Proposed patch
def create_gzip_decoder( self, model: GzipDecoderModel, config: Config, **kwargs: Any ) -> Decoder: @@ gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser + if getattr(model, "always_decompress", False): + return CompositeRawDecoder( + parser=gzip_parser, + stream_response=False if self._emit_connector_builder_messages else True, + ) + if self._emit_connector_builder_messages: @@ return CompositeRawDecoder(gzip_parser.inner_parser, False)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` around lines 2703 - 2713, The gzip decoder factory path fails to honor model.always_decompress, so update create_gzip_decoder to wire that flag into both branches: when returning CompositeRawDecoder(gzip_parser.inner_parser, False) (the builder/message path) pass model.always_decompress instead of False (or set the corresponding always_decompress kwarg), and when calling CompositeRawDecoder.by_headers(...) include the always_decompress parameter set from model.always_decompress (instead of relying on fallback behavior). Locate the factory function (create_gzip_decoder / the block referencing gzip_parser and self._emit_connector_builder_messages) and propagate model.always_decompress into the CompositeRawDecoder construction in both places.
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
2663-2670: ⚡ Quick winWould you consider enforcing non-empty
items_pathin schema (minLength: 1) to fail fast at validation time, wdyt?This shifts the current runtime fast-fail into schema validation and improves manifest UX.
Suggested tweak
items_path: title: Items Path description: >- Dot-separated path to the JSON array whose elements should be yielded as records. Uses `ijson` path syntax (e.g. `data.users`), not JSONPath syntax — do not include leading `$.` or trailing `[*]`. type: string + minLength: 1🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around lines 2663 - 2670, The schema property "items_path" currently allows empty strings which delays failure to runtime; update the declarative_component_schema.yaml entry for items_path to enforce non-empty values by adding "minLength: 1" alongside "type: string" (the title "Items Path" and description stay the same) so manifest validation fails fast; update any unit tests or schema consumers that assume empty values if present.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py`:
- Around line 2210-2213: GzipDecoder model is missing the always_decompress
field, so add an attribute always_decompress: bool = False to the GzipDecoder
BaseModel to match the manifest and factory contract; update any places that
construct or validate GzipDecoder instances (the decoder union that references
GzipDecoder and any factory/deserialize code handling GzipDecoder) to accept and
propagate this new flag. Ensure the Pydantic default is False and that
serialization/deserialization and factory creation paths preserve this field.
---
Outside diff comments:
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 4036-4053: Add an optional boolean property named
always_decompress to the GzipDecoder schema (the object with title "gzip" /
symbol GzipDecoder) so manifests using that flag validate; specifically, under
the properties of the gzip/GzipDecoder object add always_decompress: type:
boolean with a short description and a default (false) and do not add it to
required. Ensure the new property appears alongside existing properties (type,
decoder) so the schema and runtime/factory remain aligned.
In `@airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py`:
- Around line 2703-2713: The gzip decoder factory path fails to honor
model.always_decompress, so update create_gzip_decoder to wire that flag into
both branches: when returning CompositeRawDecoder(gzip_parser.inner_parser,
False) (the builder/message path) pass model.always_decompress instead of False
(or set the corresponding always_decompress kwarg), and when calling
CompositeRawDecoder.by_headers(...) include the always_decompress parameter set
from model.always_decompress (instead of relying on fallback behavior). Locate
the factory function (create_gzip_decoder / the block referencing gzip_parser
and self._emit_connector_builder_messages) and propagate model.always_decompress
into the CompositeRawDecoder construction in both places.
---
Nitpick comments:
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 2663-2670: The schema property "items_path" currently allows empty
strings which delays failure to runtime; update the
declarative_component_schema.yaml entry for items_path to enforce non-empty
values by adding "minLength: 1" alongside "type: string" (the title "Items Path"
and description stay the same) so manifest validation fails fast; update any
unit tests or schema consumers that assume empty values if present.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 7f324512-f39d-4931-abb1-26a88b1d491b
⛔ Files ignored due to path filters (1)
poetry.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
airbyte_cdk/sources/declarative/declarative_component_schema.yamlairbyte_cdk/sources/declarative/decoders/composite_raw_decoder.pyairbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.pypyproject.tomlunit_tests/sources/declarative/decoders/test_composite_decoder.py
✅ Files skipped from review due to trivial changes (1)
- pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
|
/prerelease
|
…s reports to prevent OOM
The Brand Analytics report streams buffered the entire gzip-decompressed JSON document
(multi-GB) into memory, causing OOM kills (exit 137) on large sellers. Switch their
download_decoder to the streaming GzipDecoder{JsonItemsDecoder}, which yields records
one at a time via ijson.
Amazon serves these report documents gzip-compressed but labels them
'Content-Encoding: identity' by default, so the header-based GzipDecoder would skip
decompression. Request enableContentEncodingUrlHeader=true on getReportDocument so Amazon
sets an honest Content-Encoding (gzip when compressed, identity when not).
Validated end-to-end on real reports: 3.2GB search-terms report peak memory ~10GB -> ~150MB
with identical records; repeat-purchase stream reads 475 records cleanly.
Uses prerelease CDK 7.21.2.post1.dev27346905256 (JsonItemsDecoder, airbytehq/airbyte-python-cdk#1049);
baseImage to be finalized to the official SDM release once #1049 merges.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
288cf47 to
c0e2647
Compare
|
Addressed the latest review feedback and rebased onto current main (v7.22.0):
41 decoder tests pass. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py`:
- Around line 2735-2739: The JsonItemsDecoderModel->JsonItemsParser construction
currently coerces explicit null to "utf-8" via encoding=model.encoding or
"utf-8"; change it to pass encoding=model.encoding directly so an explicit None
is preserved (JsonItemsParser should then receive Optional[str] as intended)
while relying on the model's default when the field is omitted; update the
JsonItemsParser call site in the block handling JsonItemsDecoderModel to use
encoding=model.encoding instead of the 'or' fallback.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 4bcc0d5b-a794-4f4b-95f7-b825ffbeca1d
⛔ Files ignored due to path filters (1)
poetry.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
airbyte_cdk/sources/declarative/declarative_component_schema.yamlairbyte_cdk/sources/declarative/decoders/composite_raw_decoder.pyairbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.pypyproject.tomlunit_tests/sources/declarative/decoders/test_composite_decoder.py
🚧 Files skipped from review as they are similar to previous changes (4)
- pyproject.toml
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml
- airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Adds a streaming JSON decoder for very large single-document JSON responses where the records live under a nested array. JsonItemsParser yields each array element via ijson, so peak memory is bounded by a single record rather than the whole document. Composes with the existing CompositeRawDecoder hierarchy (gzip/zip) and is wired into the decoder unions + factory. Adds ijson as a first-class CDK dependency. JsonItemsParser also honors a configured non-UTF-8 encoding by transcoding to UTF-8 bytes via a lazy streaming recoder, keeping ijson on its native byte backend. Adopts and supersedes #1026. Co-Authored-By: devin-ai-integration[bot] <devin-ai-integration[bot]@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
c0e2647 to
81c5d8f
Compare
|
Addressed: |
What
Adds a streaming JSON decoder (
JsonItemsDecoder) for very large single-document JSON responses where the records of interest live under a nested array.JsonItemsParseryields each array element viaijson, so peak memory is bounded by a single record instead of the whole document.Adopts and supersedes #1026 (whose authoring Devin session can no longer respond to review).
How
JsonItemsParserincomposite_raw_decoder.py—ijson.items(data, f"{items_path}.item"). Composes with the existingCompositeRawDecoderhierarchy (gzip/zip).JsonItemsDecoderadded todeclarative_component_schema.yamland wired into the decoder unions; pydantic models regenerated; factorycreate_json_items_decoderregistered.ijsonadded as a first-class CDK dependency (dropped from the deptry ignore list).JsonItemsParserhonors a configured non-UTF-8encodingby transcoding to UTF-8 bytes through a lazy streaming recoder, keeping ijson on its fast byte backend (text-mode input is deprecated in ijson) — verified to stay bounded-memory.Scope note (changed from the original revision)
An earlier revision of this PR also added an
always_decompressflag toGzipDecoder, to handle Amazon Seller Partner reports that return gzip bodies mislabeled asContent-Encoding: identity. That turned out to be unnecessary: the consuming connector can instead requestenableContentEncodingUrlHeader=trueongetReportDocument, which makes Amazon set an honestContent-Encoding: gzip(when compressed) /identity(when not), so the stock header-basedGzipDecoderalready handles both cases. The gzip change has been dropped to keep this PR focused onJsonItemsDecoder.Validation
Validated against a real 3.45 GB Amazon Brand Analytics report (
GzipDecoder{JsonItemsDecoder}, with honest gzip headers): 8,533,151 records, ~150 MB peak RSS (vs ~10 GB for the buffering decoder), correct UTF-8.Tests
unit_tests/.../decoders/test_composite_decoder.py— JsonItems cases (top-level/nested/empty/encoding/gzip-composition/missing-path/lazy). 41 decoder tests pass.🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Tests