@aaronsteers aaronsteers commented Nov 12, 2025

Summary

Adds the ability to fetch individual records by primary key from declarative concurrent connectors. This implements Phase 1a of #833 for PyAirbyte's use case with ConcurrentDeclarativeSource.

What's being added to main:

  • ConcurrentDeclarativeSource.fetch_record(stream_name, pk_value, config) - Primary API for fetching single records
  • SimpleRetriever.fetch_one(pk_value, records_schema) - Underlying HTTP GET mechanism
  • AbstractStream.fetch_record(pk_value) - Interface method (raises NotImplementedError by default)
  • RecordNotFoundException - Exception raised when record is not found
  • 6 unit tests covering happy path, 404s, server errors, invalid inputs, and empty responses

Key implementation details:

  • Accepts string primary keys only (e.g., "123")
  • Uses convention-based path construction: appends /{pk_value} to base path
  • Raises RecordNotFoundException for empty/ignored responses
  • HTTP errors (including 404) propagate from requester's error handling
  • Only supports streams with SimpleRetriever (raises NotImplementedError for others)
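
The behavior listed above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: `fetch_one_sketch`, the `send_request` callable, and the local `RecordNotFoundException` class are hypothetical stand-ins. The real `SimpleRetriever.fetch_one` also takes a `records_schema` argument and goes through the requester's own parsing and error handling.

```python
from typing import Any, Callable, Mapping, Optional


class RecordNotFoundException(Exception):
    """Local stand-in for the exception added by this PR."""


def fetch_one_sketch(
    send_request: Callable[[str], Optional[Any]],  # hypothetical: path -> parsed JSON or None
    base_path: str,
    pk_value: str,
) -> Mapping[str, Any]:
    # Only string primary keys are accepted.
    if not isinstance(pk_value, str):
        raise ValueError(f"pk_value must be a string, got {type(pk_value).__name__}")

    # Convention-based path: append /{pk_value} to the base path.
    fetch_path = f"{base_path.rstrip('/')}/{pk_value}"

    # Exceptions raised by send_request (for example HTTP errors) are not
    # caught here, so they propagate to the caller.
    body = send_request(fetch_path)

    # An empty or ignored response is treated as "record not found".
    if not body:
        raise RecordNotFoundException(f"No record found at {fetch_path}")

    # Accept either a single-object body or a list that contains the record.
    return body[0] if isinstance(body, list) else body
```

Passing a dictionary's `get` method as `send_request` is enough to try the happy path and the not-found path without any network access.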

Review & Testing Checklist for Human

⚠️ CRITICAL - This PR has NOT been tested end-to-end with a real connector:

  • Test with a real declarative connector - The ConcurrentDeclarativeSource.fetch_record() method reconstructs components from the manifest but has never been tested with an actual connector. Test with a simple declarative connector (e.g., JSONPlaceholder, PokeAPI) to verify it works.

  • Verify component reconstruction - The logic that creates declarative_stream from stream_config (lines 552-557 in concurrent_declarative_source.py) has never been exercised. Verify it correctly handles:

    • Standard streams with SimpleRetriever
    • Dynamic streams
    • Streams with custom schema loaders
    • Streams without SimpleRetriever (should raise NotImplementedError)
  • Test error handling - Verify that:

    • 404 responses propagate as exceptions (not caught as RecordNotFoundException)
    • Empty responses raise RecordNotFoundException
    • Server errors (5xx) propagate correctly
    • Invalid stream names raise ValueError with helpful message
  • Verify path construction - Test that the simple base_path + "/" + pk_value pattern works for your target use cases. This may not work for:

    • APIs that use query parameters for IDs (e.g., ?id=123)
    • Nested resources (e.g., /companies/{id}/properties/{prop})
    • APIs with non-standard path patterns

Recommended Test Plan

from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource

# Load a simple declarative manifest (manifest_dict is the connector's parsed manifest)
config = {"api_key": "..."}
source = ConcurrentDeclarativeSource(
    config=config,
    source_config=manifest_dict,
)

# Test happy path (compare as strings, since some APIs return numeric IDs)
record = source.fetch_record("posts", "1", config)
assert str(record["id"]) == "1"

# Test 404 (should propagate as exception, not RecordNotFoundException)
try:
    source.fetch_record("posts", "999999", config)
except Exception as e:
    print(f"404 error ({type(e).__name__}): {e}")
else:
    raise AssertionError("expected an exception for a missing record")

# Test invalid stream
try:
    source.fetch_record("nonexistent", "1", config)
except ValueError as e:
    assert "not found in source" in str(e)
else:
    raise AssertionError("expected ValueError for an unknown stream")

Notes

  • All 6 unit tests pass (covering SimpleRetriever.fetch_one scenarios)
  • Ruff linter and formatter pass
  • No integration tests exist for ConcurrentDeclarativeSource.fetch_record
  • Only supports SimpleRetriever - other retriever types will raise NotImplementedError

Requested by: AJ Steers (aj@airbyte.io, @aaronsteers)
Devin Session: https://app.devin.ai/sessions/e0d627ac7e89447a9ebddd515104e2c3

Summary by CodeRabbit

  • New Features
    • Streams and declarative sources can fetch individual records by primary key; declarative implementations return single-object responses or surface a clear "record not found" error when absent. Other stream types will explicitly indicate if per-record fetch is unsupported.
  • Tests
    • Added unit tests for successful fetches, not-found cases, server errors, invalid inputs, and empty/no-response scenarios.

Implements Phase 1 of GitHub issue #833

Changes:
- Add fetch_one() method to SimpleRetriever for fetching single records by PK
- Add fetch_record() base method to Stream class
- Implement fetch_record() in DeclarativeStream to delegate to retriever
- Add fetch_record() helper method to AbstractSource
- Add comprehensive unit tests for fetch_one functionality

The implementation uses convention-based path construction (appending PK value
to base path) and supports both simple string PKs and composite dict PKs.
Handles 404 responses gracefully by returning None.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Copilot AI review requested due to automatic review settings November 12, 2025 19:46
@devin-ai-integration

Original prompt from AJ Steers
Received message in Slack channel #ask-devin-ai:

@Devin - Let's create a plan of action for the first phase of the project defined here: <https://github.com/airbytehq/airbyte-python-cdk/issues/833>

Specifically, for now we ONLY want to add a "get_one" implementation to the existing SimpleRetriever implementation - ignoring all other requirements listed there and not (yet) implementing a new retriever class.
We don't care yet about CLI-based invocation but we need a "fetch_record(pk_value: Any) -> dict:" API method on the Source or the Stream.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1762969856048879

@github-actions github-actions bot added the enhancement New feature or request label Nov 12, 2025
@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1762976446-add-fetch-one-to-simple-retriever#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1762976446-add-fetch-one-to-simple-retriever

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


Copilot AI left a comment

Pull Request Overview

This PR implements Phase 1 of the "get_one" functionality to fetch individual records by primary key from declarative streams using SimpleRetriever. The implementation adds a convention-based approach where the primary key value is appended to the base path (e.g., /posts/123 for simple PKs, /companies/123/status for composite PKs sorted alphabetically).

Key changes:

  • Added fetch_one() method to SimpleRetriever with convention-based path construction
  • Added fetch_record() methods to Stream and DeclarativeStream classes to expose the functionality
  • Added helper method in AbstractSource for convenient access across streams

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

File | Description
unit_tests/sources/declarative/retrievers/test_simple_retriever.py | Comprehensive test coverage for fetch_one() including simple PKs, composite PKs, 404 handling, error propagation, and edge cases
airbyte_cdk/sources/streams/core.py | Base fetch_record() method added to Stream class that raises NotImplementedError by default
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py | Core fetch_one() implementation with convention-based path construction and error handling
airbyte_cdk/sources/abstract_source.py | Convenience method for fetching records by stream name
airbyte_cdk/legacy/sources/declarative/declarative_stream.py | DeclarativeStream implementation that delegates to SimpleRetriever's fetch_one()

@coderabbitai

coderabbitai bot commented Nov 12, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds per-record fetch support: a new SimpleRetriever.fetch_one method, RecordNotFoundException, ConcurrentDeclarativeSource.fetch_record delegating to the retriever, an AbstractStream.fetch_record interface (default NotImplementedError), and unit tests for fetch_one.

Changes

Cohort | File(s) | Summary
Exception Definition | airbyte_cdk/sources/declarative/exceptions.py | Adds RecordNotFoundException (subclass of ReadException) used when a requested record is not found.
Retriever Implementation | airbyte_cdk/sources/declarative/retrievers/simple_retriever.py | Adds fetch_one(self, pk_value: str, records_schema: Mapping[str, Any]) -> Mapping[str, Any]: builds base path via requester.get_path (empty slice), appends PK to form fetch path, sends request, parses response, returns first record mapping or single-object JSON body; raises RecordNotFoundException for 404/None/empty results and propagates other HTTP errors.
Public API / Source | airbyte_cdk/sources/declarative/concurrent_declarative_source.py | Adds ConcurrentDeclarativeSource.fetch_record(stream_name, pk_value, config=None) which resolves a manifest stream, requires a SimpleRetriever retriever (else raises NotImplementedError), and delegates to SimpleRetriever.fetch_one.
Stream Interface | airbyte_cdk/sources/streams/concurrent/abstract_stream.py | Adds AbstractStream.fetch_record(self, pk_value: str) -> Mapping[str, Any] with default implementation raising NotImplementedError indicating fetch-by-PK support only for declarative/simple retriever streams.
Unit Tests | unit_tests/sources/declarative/retrievers/test_simple_retriever.py | Adds tests for fetch_one: successful fetch, 404 -> RecordNotFoundException, non-404 error propagation, invalid PK type, None response, empty/falsey data handling, and single-object response handling.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant CDS as ConcurrentDeclarativeSource
    participant SR as SimpleRetriever
    participant HTTP as HTTP Client

    Caller->>CDS: fetch_record(stream_name, pk_value)
    activate CDS
    CDS->>CDS: resolve stream config, retriever & schema
    CDS->>SR: fetch_one(pk_value, schema)
    deactivate CDS

    activate SR
    SR->>SR: base_path = requester.get_path(slice=empty)
    SR->>SR: fetch_path = base_path + "/" + pk_value
    SR->>HTTP: send_request(GET, fetch_path)
    activate HTTP
    HTTP-->>SR: response (200 / 404 / error / None)
    deactivate HTTP

    alt 404 / None / empty list
        SR->>SR: raise RecordNotFoundException
    else 200 with records
        SR->>SR: parse response -> records
        SR->>SR: return first record mapping
    else other HTTP error
        SR->>SR: propagate error
    end
    deactivate SR

    SR-->>CDS: record or exception
    CDS-->>Caller: record or exception

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Areas to inspect closely:
    • Path construction and PK escaping when concatenating pk_value.
    • Consistency that 404 / None / empty-list map to RecordNotFoundException while other HTTP errors propagate.
    • Parsing logic in _parse_response and tests for single-object vs. list responses — are edge cases covered?
    • ConcurrentDeclarativeSource.fetch_record behavior when retriever is not SimpleRetriever and manifest resolution.

Would you like me to suggest a small utility to safely join URL path segments to avoid manual concatenation? wdyt?
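
One possible shape for such a utility, offered as a sketch rather than as anything in the PR: `join_url_path` is a hypothetical helper name. It leaves the base path untouched apart from trailing slashes and percent-encodes each primary-key segment with the standard library's `urllib.parse.quote`, so a key containing `/` or a space cannot change the path structure.

```python
from urllib.parse import quote


def join_url_path(base_path: str, *pk_segments: str) -> str:
    """Join a base path and PK segments with exactly one slash between them."""
    # safe="" makes quote() encode "/" as well, so a PK cannot add path levels.
    encoded = [quote(segment, safe="") for segment in pk_segments]
    return "/".join([base_path.rstrip("/"), *encoded])


print(join_url_path("/v1/posts/", "123"))   # /v1/posts/123
print(join_url_path("posts", "a/b c"))      # posts/a%2Fb%20c
```

Whether the target API expects encoded slashes in IDs is an assumption that would need checking per connector.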

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 75.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title check | ✅ Passed | The title accurately and concisely summarizes the main changes: adding fetch_record() methods to Source and Stream backed by SimpleRetriever's fetch_one() implementation.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)

205-227: Consider validating pk_value format against the stream's primary_key definition?

The method accepts both string and dict pk_value but doesn't validate whether the format matches this stream's actual self.primary_key. For instance, if a stream has primary_key = ["company_id", "property"] but a caller passes a simple string "123", this will proceed to call retriever.fetch_one() which might construct an incorrect path.

Should we add a check here to ensure pk_value format aligns with the primary_key definition? For example:

  • If self.primary_key is a string or single-element list, require pk_value to be a string
  • If self.primary_key is a multi-element list, require pk_value to be a dict with matching keys

This would catch mismatches early with a clearer error message. Wdyt?
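
A minimal sketch of the check described above, assuming the primary key is either a single string or a flat list of field names (nested key paths are not handled). `validate_pk_value` is a hypothetical helper, not part of the CDK.

```python
from typing import Any, List, Mapping, Optional, Union


def validate_pk_value(
    primary_key: Optional[Union[str, List[str]]],
    pk_value: Union[str, Mapping[str, Any]],
) -> None:
    """Raise ValueError if pk_value does not match the shape of primary_key."""
    if isinstance(primary_key, str):
        key_names = [primary_key]
    else:
        key_names = list(primary_key or [])

    if not key_names:
        raise ValueError("Stream defines no primary key, so it cannot be fetched by PK")

    # Single-field key: require a plain string value.
    if len(key_names) == 1:
        if not isinstance(pk_value, str):
            raise ValueError(
                f"Primary key {key_names[0]!r} expects a string, got {type(pk_value).__name__}"
            )
        return

    # Composite key: require a mapping whose keys match the key fields exactly.
    if not isinstance(pk_value, Mapping):
        raise ValueError(
            f"Composite primary key {key_names} expects a mapping, got {type(pk_value).__name__}"
        )
    if set(pk_value) != set(key_names):
        raise ValueError(
            f"pk_value keys {sorted(pk_value)} do not match primary key {sorted(key_names)}"
        )
```

Calling this before delegating to the retriever would turn a wrongly constructed path into an immediate, descriptive error.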

Additionally, consider adding debug logging when fetching records to aid troubleshooting:

 def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
     """..."""
     if not isinstance(self.retriever, SimpleRetriever):
         raise NotImplementedError(...)
     
+    self.logger.debug(f"Fetching record from stream {self.name} with pk_value: {pk_value}")
     return self.retriever.fetch_one(pk_value=pk_value, records_schema=self.get_json_schema())
airbyte_cdk/sources/abstract_source.py (1)

328-357: Consider caching stream instances to avoid repeated initialization?

The method calls self.streams(config) on every invocation (line 348), which could be expensive if stream initialization involves API calls or heavy computation. Since this is a new "fetch by PK" use case that might be called frequently (e.g., in a loop fetching related records), this could become a performance bottleneck.

Would it make sense to cache the stream instances, perhaps using an LRU cache keyed by a hash of the config? Something like:

@lru_cache(maxsize=8)
def _get_stream_instances(self, config_hash: str) -> Dict[str, Stream]:
    # config_hash would be a frozen/hashable representation of config
    return {s.name: s for s in self.streams(config)}

This would maintain the existing behavior for the first call but speed up subsequent calls. Wdyt?

Alternatively, if the current behavior is intentional (always get fresh stream instances), that's fine too - just wanted to flag the potential performance consideration for Phase 2 optimizations.
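
To make the caching idea concrete, here is one hedged sketch. It sidesteps `lru_cache` because a hash string alone does not carry the config needed to build the streams. `config_fingerprint` and `StreamCache` are hypothetical names, and `build_streams` stands in for `self.streams`.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List, Mapping


def config_fingerprint(config: Mapping[str, Any]) -> str:
    """Return a stable hash of a config, independent of key order."""
    # default=str is a fallback for values json cannot serialize directly.
    serialized = json.dumps(config, sort_keys=True, default=str)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()


class StreamCache:
    """Cache name -> stream mappings per distinct config."""

    def __init__(self, build_streams: Callable[[Mapping[str, Any]], List[Any]]) -> None:
        self._build_streams = build_streams
        self._by_config: Dict[str, Dict[str, Any]] = {}

    def get(self, config: Mapping[str, Any]) -> Dict[str, Any]:
        key = config_fingerprint(config)
        if key not in self._by_config:
            # Streams are built only on the first lookup for a given config.
            self._by_config[key] = {s.name: s for s in self._build_streams(config)}
        return self._by_config[key]
```

This sketch has no eviction, so a bounded structure would be needed if many distinct configs are expected.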

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

676-683: Path construction for composite PKs: Document the key ordering assumption?

For composite primary keys (lines 679-681), the implementation sorts keys alphabetically and joins their values with "/". For example, {"company_id": "123", "property": "status"} becomes the path segment "123/status" (because "company_id" < "property" alphabetically).

This convention-based approach is pragmatic for Phase 1, and the inline comments (lines 659-667) appropriately acknowledge that future phases might need configurable path templates. However, I'm wondering if we should make the key ordering even more explicit in the docstring examples?

Currently the docstring shows:

record = retriever.fetch_one({"company_id": "123", "property": "status"}, schema)

Could we add a note like: "Note: For composite PKs, values are ordered alphabetically by key name" to make this behavior crystal clear for API developers? Wdyt?
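
The ordering rule is easy to miss, so a tiny standalone illustration may help. `composite_pk_segment` is a hypothetical function that mirrors the sorting described in this comment and is not the retriever code itself.

```python
from typing import Any, Mapping


def composite_pk_segment(pk_value: Mapping[str, Any]) -> str:
    """Join composite PK values with "/", ordered alphabetically by key name."""
    return "/".join(str(pk_value[key]) for key in sorted(pk_value))


# The order in which the caller writes the keys does not matter:
print(composite_pk_segment({"property": "status", "company_id": "123"}))  # 123/status
```

The consequence is that the path order is fixed by the key names, not by the order the API expects, which is the limitation the suggested docstring note would spell out.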

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0443aa and 38d13d3.

📒 Files selected for processing (5)
  • airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1 hunks)
  • airbyte_cdk/sources/abstract_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1 hunks)
  • airbyte_cdk/sources/streams/core.py (1 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • get_path (62-71)
  • send_request (138-156)
airbyte_cdk/sources/types.py (3)
  • Record (21-72)
  • data (35-36)
  • associated_slice (39-40)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
  • name (118-126)
  • name (129-131)
  • fetch_one (629-744)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
airbyte_cdk/sources/abstract_source.py (1)
  • fetch_record (328-357)
airbyte_cdk/sources/streams/core.py (2)
  • fetch_record (466-484)
  • name (139-143)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
  • SimpleRetriever (59-744)
  • name (118-126)
  • name (129-131)
  • fetch_one (629-744)
airbyte_cdk/sources/abstract_source.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
  • fetch_record (205-226)
  • name (93-97)
  • name (100-102)
airbyte_cdk/sources/streams/core.py (2)
  • fetch_record (466-484)
  • name (139-143)
airbyte_cdk/sources/streams/core.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
  • fetch_record (205-226)
  • name (93-97)
  • name (100-102)
airbyte_cdk/sources/abstract_source.py (1)
  • fetch_record (328-357)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
  • get_path (195-208)
  • send_request (446-484)
  • _request_headers (304-325)
  • _request_params (327-356)
  • _request_body_data (358-382)
  • _request_body_json (384-407)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • get_path (62-71)
  • send_request (138-156)
airbyte_cdk/sources/types.py (6)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • keys (137-138)
  • Record (21-72)
  • data (35-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

1649-1843: Excellent test coverage!

The seven test cases provide comprehensive coverage of the fetch_one() functionality:

  • Simple and composite primary keys with proper path construction validation
  • 404 handling (returns None as expected)
  • Non-404 error propagation
  • Type validation for pk_value
  • Edge cases (None response, empty records)

The tests clearly document the expected behavior and will catch regressions. Nice work!

airbyte_cdk/sources/streams/core.py (1)

466-484: LGTM - Clean base implementation!

The default fetch_record() implementation in the Stream base class is well-designed:

  • Clear NotImplementedError with an informative message indicating which stream types support this operation
  • Comprehensive docstring with examples for both simple and composite primary keys
  • Appropriate signature accepting Any for pk_value to accommodate different PK formats

This establishes a good foundation for the new functionality.

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

715-720: Verify 404 detection robustness across different exception types?

The 404 detection logic (lines 717-719) checks hasattr(e, "response") and hasattr(e.response, "status_code") before accessing e.response.status_code. This works well for requests library exceptions, but could potentially miss 404s from other HTTP client implementations or custom error handlers.

Have you verified this works correctly with the requester's error handling? Specifically:

  1. Does self.requester.send_request() always wrap HTTP errors in exceptions with a .response attribute?
  2. Are there any custom error handlers in the declarative framework that might transform exceptions?

If we're confident the requester always uses requests library exceptions (or compatible ones), this is fine. Otherwise, we might want to be more defensive:

except Exception as e:
    # Check if this is a 404 (record not found) - return None
    status_code = None
    if hasattr(e, "response"):
        status_code = getattr(e.response, "status_code", None)
    elif hasattr(e, "status_code"):
        status_code = e.status_code
    
    if status_code == 404:
        return None
    raise

Wdyt?
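
For reference, the defensive lookup above can be isolated into a small helper that is testable without any HTTP client. `status_code_of` is a hypothetical name. Unlike the snippet above, this variant also falls back to `exc.status_code` when `exc.response` exists but is `None`.

```python
from typing import Optional


def status_code_of(exc: BaseException) -> Optional[int]:
    """Best-effort extraction of an HTTP status code from an exception."""
    # Exceptions that carry a response object, in the style of requests.
    response = getattr(exc, "response", None)
    if response is not None:
        return getattr(response, "status_code", None)
    # Exceptions that expose the status code directly.
    return getattr(exc, "status_code", None)
```

A caller could then compare the result with 404 and re-raise everything else unchanged.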


629-744: Overall: Solid Phase 1 implementation with clear documentation!

The fetch_one() implementation is well-structured and includes:

  • ✅ Clear path construction logic for simple and composite PKs
  • ✅ Appropriate 404 handling (returns None) vs. error propagation
  • ✅ Reuse of existing request options and response parsing machinery
  • ✅ Good type checking with ValueError for invalid pk_value types
  • ✅ Comprehensive docstring with examples and future considerations
  • ✅ Inline comments documenting alternative approaches for future phases

The convention-based approach (appending PK values to base path) is reasonable for Phase 1, and you've appropriately documented the limitations and future enhancement possibilities.

One additional thought: Since this is Phase 1 and you mention testing against real declarative connectors in the PR description, it would be valuable to document any connectors you've tested this with (either in the PR description or as a comment). This helps future developers understand which path construction patterns have been validated in practice.

@github-actions

github-actions bot commented Nov 12, 2025

PyTest Results (Fast)

3 818 tests  +7   3 806 ✅ +6   6m 28s ⏱️ -44s
    1 suites ±0      12 💤 +1 
    1 files   ±0       0 ❌ ±0 

Results for commit a2e8e55. ± Comparison against base commit f0443aa.

This pull request skips 1 test.
unit_tests.sources.declarative.test_concurrent_declarative_source ‑ test_read_with_concurrent_and_synchronous_streams

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Nov 12, 2025

PyTest Results (Full)

3 821 tests  +7   3 809 ✅ +7   11m 9s ⏱️ +5s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit a2e8e55. ± Comparison against base commit f0443aa.

♻️ This comment has been updated with latest results.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)

205-208: Type hint could be more precise and consistent, wdyt?

The type hint Any | Mapping[str, Any] is redundant since Any already encompasses Mapping[str, Any]. Additionally:

  • Your docstring indicates pk_value should be str for simple PKs (line 214), but the type hint says Any
  • SimpleRetriever.fetch_one expects Union[str, Mapping[str, Any]], which more accurately reflects the actual constraint
  • The rest of this file uses Union syntax (see line 50), not the | operator

Consider aligning the type hint with both the documentation and fetch_one's signature:

     def fetch_record(
         self,
-        pk_value: Any | Mapping[str, Any],
+        pk_value: Union[str, Mapping[str, Any]],
     ) -> Optional[Mapping[str, Any]]:

This would match the docstring, improve type safety, and maintain consistency with the rest of the file.

Note: The same pattern appears in AbstractSource.fetch_record. If you update here, consider updating there as well for consistency across the codebase.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 38d13d3 and 052eb81.

📒 Files selected for processing (2)
  • airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1 hunks)
  • airbyte_cdk/sources/abstract_source.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/abstract_source.py
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
airbyte_cdk/sources/abstract_source.py (1)
  • fetch_record (328-360)
airbyte_cdk/sources/streams/core.py (3)
  • fetch_record (466-484)
  • name (139-143)
  • get_json_schema (296-304)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
  • SimpleRetriever (59-744)
  • name (118-126)
  • name (129-131)
  • fetch_one (629-744)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)

223-229: Implementation looks solid!

The runtime type check appropriately guards the delegation, the error message is clear and consistent with the base class, and passing self.get_json_schema() to fetch_one ensures the retriever has the schema context it needs for parsing.

devin-ai-integration bot and others added 2 commits November 12, 2025 20:33
…g None for 404s

Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

677-684: Path construction should normalize slashes more robustly, wdyt?

The current implementation uses .replace("//", "/") to handle double slashes, but this approach has limitations as noted in past review comments:

  • Doesn't handle triple slashes or more: "///" becomes "//"
  • Fails if base_path has trailing slash AND pk_value starts with slash
  • Example: "posts/" + "/123" → "posts//123" → "posts/123" ✓ (works)
  • Example: "posts//" + "//123" → "posts////123" → "posts///123" ✗ (broken)

As suggested in previous reviews, normalizing with rstrip('/') is more reliable:

     if isinstance(pk_value, str):
-        fetch_path = f"{base_path}/{pk_value}".replace("//", "/")
+        fetch_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"
     elif isinstance(pk_value, Mapping):
         sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())]
         pk_path_segment = "/".join(sorted_values)
-        fetch_path = f"{base_path}/{pk_path_segment}".replace("//", "/")
+        fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"

This ensures exactly one slash between segments regardless of how base_path and pk_value are formatted.
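
The difference between the two approaches can be checked in isolation. The two functions below are hypothetical wrappers around the expressions in the diff above, written only to compare their output.

```python
def join_with_replace(base_path: str, pk_value: str) -> str:
    """Current approach: concatenate, then collapse "//" once."""
    return f"{base_path}/{pk_value}".replace("//", "/")


def join_with_strip(base_path: str, pk_value: str) -> str:
    """Suggested approach: strip slashes at the seam before joining."""
    return f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"


for base, pk in [("posts", "123"), ("posts/", "123"), ("posts/", "/123")]:
    print(repr(base), repr(pk), join_with_replace(base, pk), join_with_strip(base, pk))
# 'posts'  '123'   posts/123   posts/123
# 'posts/' '123'   posts/123   posts/123
# 'posts/' '/123'  posts//123  posts/123
```

`str.replace` makes a single left-to-right pass, so a run of three slashes collapses to two rather than one, which is why the last row differs.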

🧹 Nitpick comments (6)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

1683-1720: Consider verifying alphabetical key ordering explicitly, wdyt?

The test uses a composite PK with {"company_id": "123", "property": "status"} and expects path companies/123/status. While this works because "company_id" < "property" alphabetically, it would be clearer to test the sorting behavior explicitly.

Perhaps add an assertion or comment that makes the alphabetical ordering expectation explicit:

# Test verifies alphabetical key ordering: company_id comes before property
result = retriever.fetch_one({"company_id": "123", "property": "status"}, records_schema={})

Or test with keys intentionally out of order to prove sorting works:

# Keys intentionally reversed to verify sorting
result = retriever.fetch_one({"property": "status", "company_id": "123"}, records_schema={})
# Should still produce: companies/123/status (sorted order)
airbyte_cdk/sources/abstract_source.py (1)

328-360: Consider the performance implications of calling self.streams(config) on each fetch, wdyt?

The implementation looks correct and the error handling is good. However, calling self.streams(config) on every fetch_record invocation could be expensive if stream initialization involves API calls or heavy computation.

A couple of options to consider:

  1. If this method is called frequently in a loop, you might want to cache the stream instances or allow the caller to pass them:

    def fetch_record(
        self,
        stream_name: str,
        pk_value: Any | Mapping[str, Any],
        config: Mapping[str, Any],
        stream_instances: Optional[Dict[str, Stream]] = None,
    ) -> Optional[Mapping[str, Any]]:
        if stream_instances is None:
            stream_instances = {s.name: s for s in self.streams(config)}
        # ... rest of logic
  2. Add debug logging for observability when fetching individual records:

    logger.debug(f"Fetching record from stream '{stream_name}' with pk_value: {pk_value}")

That said, if fetch_record is used infrequently (e.g., for one-off lookups), the current implementation is fine as-is.

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)

683-684: Consider validating that pk_value is not empty, wdyt?

The code raises a clear ValueError for invalid types, but doesn't check for empty strings or empty dicts. This could lead to malformed paths like "posts/" or unexpected behavior.

You might want to add validation:

if isinstance(pk_value, str):
    if not pk_value.strip():
        raise ValueError("pk_value cannot be an empty string")
    fetch_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"
elif isinstance(pk_value, Mapping):
    if not pk_value:
        raise ValueError("pk_value cannot be an empty dict")
    sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())]
    pk_path_segment = "/".join(sorted_values)
    fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"

688-715: Consider extracting request options into a helper to reduce repetition, wdyt?

Lines 694-713 repeat the same pattern four times: calling self._request_* with identical arguments. While not incorrect, this repetition could be reduced for maintainability.

You could extract this into a helper:

def _get_request_options_for_fetch(
    self, stream_slice: StreamSlice
) -> dict:
    """Get all request options for a fetch_one operation."""
    return {
        "request_headers": self._request_headers(
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
        ),
        "request_params": self._request_params(
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
        ),
        "request_body_data": self._request_body_data(
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
        ),
        "request_body_json": self._request_body_json(
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
        ),
    }

Then use it:

response = self.requester.send_request(
    path=fetch_path,
    stream_state={},
    stream_slice=stream_slice,
    next_page_token=None,
    log_formatter=self.log_formatter,
    **self._get_request_options_for_fetch(stream_slice),
)

716-726: 404 detection via string search could match false positives, wdyt?

The condition "404" in str(e) might match error messages that contain "404" but aren't actually 404 responses (e.g., "Failed after 404 retries" or "Connection timeout after 404ms").

Consider making the detection more specific:

except Exception as e:
    # Check if this is a 404 (record not found)
    is_404 = False
    
    # Check response status code if available
    if hasattr(e, "response") and hasattr(e.response, "status_code"):
        is_404 = e.response.status_code == 404
    # Fallback to checking error message more carefully
    elif "404" in str(e):
        # Only match if 404 appears with typical HTTP context
        error_str = str(e).lower()
        is_404 = any(pattern in error_str for pattern in [
            "404 not found",
            "404 client error",
            "http error 404",
            "status code 404",
        ])
    
    if is_404:
        raise RecordNotFoundException(
            f"Record with primary key {pk_value} not found"
        ) from e
    raise

Or, if you're confident the current implementation works for your error handling, at least add a comment explaining the string check is intentional.
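
The false-positive risk can be reproduced without any HTTP client. The sketch below uses `FakeHTTPError`, a hypothetical exception class invented for this example, and assumes only that real HTTP errors expose a `response.status_code` attribute, as the suggestion above does.

```python
from types import SimpleNamespace
from typing import Optional


class FakeHTTPError(Exception):
    """Hypothetical stand-in for an HTTP client error; not a CDK class."""

    def __init__(self, message: str, status_code: Optional[int] = None) -> None:
        super().__init__(message)
        # Only attach a response object when a status code is known.
        self.response = (
            None if status_code is None else SimpleNamespace(status_code=status_code)
        )


def is_not_found(error: Exception) -> bool:
    """Return True only when the error carries an actual 404 status code."""
    response = getattr(error, "response", None)
    return getattr(response, "status_code", None) == 404


timeout = FakeHTTPError("Connection timeout after 404ms")
missing = FakeHTTPError("Not Found", status_code=404)

# The substring check matches the timeout message, the status check does not.
assert "404" in str(timeout)
assert not is_not_found(timeout)
assert is_not_found(missing)
```
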


630-756: Consider adding logging for debugging and observability, wdyt?

The method performs a critical operation (fetching a single record by PK) but doesn't log anything. Adding debug-level logging would help troubleshoot issues in production.

Consider adding:

def fetch_one(
    self,
    pk_value: Union[str, Mapping[str, Any]],
    records_schema: Mapping[str, Any],
) -> Optional[Mapping[str, Any]]:
    # ... existing docstring ...
    
    LOGGER.debug(f"Fetching single record for stream '{self.name}' with pk_value: {pk_value}")
    
    # ... existing path construction ...
    
    LOGGER.debug(f"Constructed fetch path: {fetch_path}")
    
    try:
        response = self.requester.send_request(...)
    except Exception as e:
        LOGGER.debug(f"Error fetching record with pk_value {pk_value}: {e}")
        # ... existing exception handling ...
    
    # ... rest of method ...
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 052eb81 and bab2781.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/abstract_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/exceptions.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (4)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • get_path (62-71)
  • send_request (138-156)
airbyte_cdk/sources/types.py (3)
  • Record (21-72)
  • data (35-36)
  • associated_slice (39-40)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
  • name (119-127)
  • name (130-132)
  • fetch_one (630-756)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
  • get_path (195-208)
  • send_request (446-484)
  • _request_headers (304-325)
  • _request_params (327-356)
  • _request_body_data (358-382)
  • _request_body_json (384-407)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • get_path (62-71)
  • send_request (138-156)
airbyte_cdk/sources/types.py (6)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • keys (137-138)
  • Record (21-72)
  • data (35-36)
airbyte_cdk/sources/abstract_source.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
  • fetch_record (205-229)
  • name (93-97)
  • name (100-102)
airbyte_cdk/sources/streams/core.py (2)
  • fetch_record (466-484)
  • name (139-143)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (12)
airbyte_cdk/sources/declarative/exceptions.py (1)

12-13: LGTM! Clean exception class design.

The new RecordNotFoundException follows Python exception conventions and inherits appropriately from ReadException. The docstring clearly indicates its purpose for 404-like scenarios.

unit_tests/sources/declarative/retrievers/test_simple_retriever.py (7)

27-27: LGTM! Import added for new exception.

The import is correctly placed and used throughout the test suite.


1650-1681: LGTM! Simple PK test covers path construction and response handling.

The test verifies both the returned record data and the constructed path (posts/123). Good coverage.


1722-1747: LGTM! 404 handling test verifies exception and message.

The test confirms that a 404 response raises RecordNotFoundException and includes the PK value in the error message.


1749-1774: LGTM! Non-404 error propagation test ensures errors aren't swallowed.

Good coverage of the differentiation between 404 (converted to RecordNotFoundException) and other errors (propagated as-is).


1776-1796: LGTM! Type validation test catches invalid PK types.

The test verifies that passing an integer (instead of string or dict) raises a ValueError with a clear message.


1798-1819: LGTM! None response handling test.

The test confirms that when send_request returns None, a RecordNotFoundException is raised appropriately.


1821-1847: LGTM! Empty records test handles the case where API returns 200 but no data.

This is a good edge case to cover—APIs that return success but empty results should be treated as "not found".

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)

29-29: LGTM! Import added for new exception.

The import is correctly placed and used in the fetch_one method.


630-669: LGTM! Excellent documentation with examples and future considerations.

The docstring is comprehensive, includes examples for both simple and composite PKs, and documents the convention-based approach with references to alternative implementations. This will help future maintainers understand the design decisions.


670-675: LGTM! Correct use of requester to get base path.

The method properly delegates to requester.get_path() with empty state/slice/token, which is appropriate for fetching a specific record.


733-756: LGTM! Record extraction logic is thorough and handles multiple types.

The code properly handles both Record objects and plain Mapping types, and raises clear exceptions when records are empty or have unexpected types. The use of dict() ensures a clean dict is returned.

- Use rstrip('/') on base_path and lstrip('/') on pk segments
- More robust than replace('//', '/') for handling edge cases
- Addresses Copilot review feedback

Co-Authored-By: AJ Steers <aj@airbyte.io>
@aaronsteers changed the title from "feat: Add fetch_one/fetch_record functionality to SimpleRetriever" to "feat: Add Stream.fetch_record() and SimpleRetriever.fetch_one()" on Nov 12, 2025
@aaronsteers changed the title from "feat: Add Stream.fetch_record() and SimpleRetriever.fetch_one()" to "feat: Add Stream.fetch_record() backed by SimpleRetriever.fetch_one()" on Nov 12, 2025
@devin-ai-integration

Thanks for the feedback! I've updated the path construction to use rstrip('/') and lstrip('/') instead of .replace("//", "/") for more robust handling of edge cases. This change was pushed in commit a2e8e55.

The new implementation:

if isinstance(pk_value, str):
    fetch_path = f"{base_path.rstrip('/')}/{str(pk_value).lstrip('/')}"
elif isinstance(pk_value, Mapping):
    sorted_values = [str(pk_value[key]).lstrip('/') for key in sorted(pk_value.keys())]
    pk_path_segment = "/".join(sorted_values)
    fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}"

All unit tests continue to pass with this change.
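
For readers comparing the two approaches, the following sketch shows where they diverge. Both function names are hypothetical and the inputs are illustrative; in particular, a base path containing a URL scheme is an assumption used only to show the failure mode, not something the requester is known to return.

```python
def join_with_replace(base_path: str, pk_value: str) -> str:
    """Earlier approach: concatenate, then collapse double slashes once."""
    return f"{base_path}/{pk_value}".replace("//", "/")


def join_with_strip(base_path: str, pk_value: str) -> str:
    """Current approach: trim slashes at the seam only."""
    return f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"


# A run of slashes is only partially collapsed by a single replace pass.
assert join_with_replace("posts///", "123") == "posts//123"
assert join_with_strip("posts///", "123") == "posts/123"

# replace() also rewrites slashes that are not at the seam.
assert join_with_replace("https://api.example.com/posts", "123") == "https:/api.example.com/posts/123"
assert join_with_strip("https://api.example.com/posts", "123") == "https://api.example.com/posts/123"
```
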

@aaronsteers changed the title from "feat: Add Stream.fetch_record() backed by SimpleRetriever.fetch_one()" to "feat: Add Source.fetch_record() backed by Stream.fetch_record() and SimpleRetriever.fetch_one()" on Nov 12, 2025
- Remove fetch_record from AbstractSource (legacy, not supported)
- Remove fetch_record from Stream base class (legacy, not supported)
- Remove fetch_record from DeclarativeStream (legacy, not supported)
- Add fetch_record to ConcurrentDeclarativeSource (primary implementation)
- Add fetch_record to AbstractStream as concrete method (raises NotImplementedError)
- Keep SimpleRetriever.fetch_one as underlying mechanism

This refactor ensures fetch_record is ONLY available for declarative
concurrent connectors (PyAirbyte use case), not legacy sources.

Co-Authored-By: AJ Steers <aj@airbyte.io>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

557-568: Consider validating primary key existence, wdyt?

The retriever type check is good, but should we also verify that the stream has a primary key defined before attempting the fetch? Without a PK, the fetch_one call might fail in unclear ways.

Something like:

if not stream_config.get("primary_key"):
    raise ValueError(
        f"Stream '{stream_name}' does not define a primary key, "
        "which is required for fetching individual records."
    )

This would provide clearer error messages upfront. What do you think?

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

677-684: Clarify composite key ordering in docstring; URL encoding can remain optional.

The docstring already documents the behavior at line 662 ("in key order"), but it could be more explicit: would you consider rewording it to say "in alphabetically sorted key order" to match the sorted(pk_value.keys()) implementation? That removes any ambiguity for users.

On URL encoding—there's no existing pattern for encoding path parameters in the retriever/requester codebase, and no tests demonstrate issues with special characters. If pk_values are typically sourced from database records or structured data, this might not be a concern. If that changes or edge cases emerge, URL encoding can be added then. What are the typical sources of pk_values in practice, wdyt?
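
If encoding is added later, one possible shape is sketched below using the standard library's `urllib.parse.quote`. The helper name `encoded_fetch_path` is hypothetical, and passing `safe=""` (so that `/` inside a key is also escaped) is an assumption about the desired behavior rather than something the PR specifies.

```python
from urllib.parse import quote


def encoded_fetch_path(base_path: str, pk_value: str) -> str:
    """Hypothetical helper: percent-encode the PK before appending it."""
    # safe="" escapes "/" too, so a key can never add extra path segments.
    return f"{base_path.rstrip('/')}/{quote(pk_value, safe='')}"


assert encoded_fetch_path("posts", "123") == "posts/123"
assert encoded_fetch_path("users", "user@example.com") == "users/user%40example.com"
assert encoded_fetch_path("files", "a/b c") == "files/a%2Fb%20c"
```

Purely numeric or alphanumeric keys pass through unchanged, so this would only alter requests for keys that contain reserved characters.
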

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between bab2781 and c0719e2.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
  • DeclarativeStream (2482-2549)
  • SimpleRetriever (2867-2925)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
  • SimpleRetriever (60-756)
  • fetch_one (630-756)
airbyte_cdk/sources/declarative/schema/default_schema_loader.py (1)
  • DefaultSchemaLoader (15-47)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • create_component (831-864)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
  • get_path (195-208)
  • send_request (446-484)
  • _request_headers (304-325)
  • _request_params (327-356)
  • _request_body_data (358-382)
  • _request_body_json (384-407)
airbyte_cdk/sources/types.py (6)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • keys (137-138)
  • Record (21-72)
  • data (35-36)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
  • fetch_record (510-585)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Analyze (python)
🔇 Additional comments (6)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)

94-113: LGTM! Clean interface addition.

The new fetch_record method provides a clear extension point for streams that support single-record fetching. The default NotImplementedError with an informative message is appropriate for a base class.
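
The extension-point pattern described here can be sketched in isolation. `BaseStream` and `InMemoryStream` are hypothetical classes for illustration and do not mirror the CDK's `AbstractStream` beyond the idea of a concrete default that raises.

```python
from typing import Any, Dict, Mapping


class BaseStream:
    """Hypothetical base class with an opt-in single-record fetch."""

    def __init__(self, name: str) -> None:
        self.name = name

    def fetch_record(self, pk_value: str) -> Mapping[str, Any]:
        # Concrete default: subclasses opt in by overriding this method.
        raise NotImplementedError(
            f"Stream '{self.name}' does not support fetching individual records."
        )


class InMemoryStream(BaseStream):
    """Hypothetical subclass that supports lookups by primary key."""

    def __init__(self, name: str, rows: Dict[str, Mapping[str, Any]]) -> None:
        super().__init__(name)
        self._rows = rows

    def fetch_record(self, pk_value: str) -> Mapping[str, Any]:
        return self._rows[pk_value]


posts = InMemoryStream("posts", {"123": {"id": "123", "title": "hello"}})
assert posts.fetch_record("123")["title"] == "hello"
```

Because the default is a concrete method rather than an abstract one, existing subclasses keep working without changes and only fail when a caller actually requests a single record.
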

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

540-555: Stream resolution looks good!

The linear search for the stream config is appropriate given the typical number of streams, and the error message helpfully lists available streams when lookup fails.
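
A minimal sketch of that lookup pattern follows, assuming stream configs are plain mappings with a `name` key. `find_stream_config` is a hypothetical function and the error wording is illustrative, not copied from the PR.

```python
from typing import Any, List, Mapping


def find_stream_config(
    stream_configs: List[Mapping[str, Any]], stream_name: str
) -> Mapping[str, Any]:
    """Hypothetical helper: linear search with a descriptive failure."""
    for stream_config in stream_configs:
        if stream_config.get("name") == stream_name:
            return stream_config
    available = [config.get("name") for config in stream_configs]
    raise ValueError(
        f"Stream '{stream_name}' not found. Available streams: {available}"
    )


configs = [{"name": "posts"}, {"name": "users"}]
assert find_stream_config(configs, "users") == {"name": "users"}
```
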


583-585: Clean delegation to retriever!

The final call properly passes through both the pk_value and the resolved schema to the retriever's fetch_one method.


570-581: Verify the type(schema_loader_config) pattern safety.

The use of type(schema_loader_config) on line 573 assumes the config is compatible with create_component's expected model type parameter. Can you confirm this pattern is safe across all possible schema loader configurations in manifests?

Run this to check how schema_loader is typically structured in manifests:

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

686-715: Request construction looks solid!

The use of empty stream state and minimal StreamSlice is appropriate for fetching a single record. All necessary request options (headers, params, body) are properly included.


728-756: Response parsing and error handling are thorough!

The implementation properly handles various edge cases:

  • No response → RecordNotFoundException
  • Empty records list → RecordNotFoundException
  • Type checking with clear error messages
  • Consistent dict return type

The reuse of _parse_response is good for maintaining consistency with the streaming logic.

devin-ai-integration bot and others added 2 commits November 13, 2025 00:54
- Update SimpleRetriever.fetch_one docstring to reflect RecordNotFoundException behavior
- Move inline imports to top-level in ConcurrentDeclarativeSource
- Simplify schema building logic using declarative_stream.get_json_schema()
- Remove unused DefaultSchemaLoader import

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <aj@airbyte.io>
Remove manual 404 checking since send_request() already handles errors
internally. If send_request() returns without raising, we have valid data.
Errors are now propagated naturally from the requester's error handler.

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <aj@airbyte.io>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

511-534: Consider tightening the pk_value type annotation?

The docstring clarifies that pk_value can be either str or Mapping[str, Any], but the signature uses Any. Would it be clearer to use Union[str, Mapping[str, Any]] to match the documented behavior, wdyt?

     def fetch_record(
         self,
         stream_name: str,
-        pk_value: Any,
+        pk_value: Union[str, Mapping[str, Any]],
         config: Optional[Mapping[str, Any]] = None,
     ) -> Mapping[str, Any]:
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

678-685: Path construction: Consider documenting composite key ordering assumptions?

The implementation sorts composite keys alphabetically (line 681), which is deterministic. However, some APIs might expect keys in a specific order (e.g., parent before child IDs).

Also curious about edge cases:

  • Empty string pk_value: "".lstrip('/') would still concatenate to base/
  • Composite keys with None/null values: str(None) becomes "None"
  • Nested values in composite keys: Only the top level is handled

These might be fine for the intended use cases, but worth documenting the assumptions? If APIs need custom ordering, they'd need a different approach (like the path template option mentioned in the docstring), wdyt?
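
The first two edge cases can be reproduced with a standalone sketch. `naive_fetch_path` mirrors the path construction quoted earlier in this thread, and `checked_fetch_path` is a hypothetical stricter variant; neither is the retriever's actual code.

```python
from typing import Any, Mapping, Union

PkValue = Union[str, Mapping[str, Any]]


def naive_fetch_path(base_path: str, pk_value: PkValue) -> str:
    """Mirror the path construction discussed above, with no validation."""
    if isinstance(pk_value, str):
        return f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"
    values = [str(pk_value[key]).lstrip("/") for key in sorted(pk_value)]
    return f"{base_path.rstrip('/')}/{'/'.join(values)}"


def checked_fetch_path(base_path: str, pk_value: PkValue) -> str:
    """Hypothetical stricter variant: reject empty and null key parts."""
    if isinstance(pk_value, str):
        parts = [pk_value]
    else:
        parts = [pk_value[key] for key in sorted(pk_value)]
    if not parts or any(part is None or not str(part).strip() for part in parts):
        raise ValueError(f"pk_value has an empty or null component: {pk_value!r}")
    return naive_fetch_path(base_path, pk_value)


# Empty string: the path silently degrades to the collection endpoint.
assert naive_fetch_path("posts", "") == "posts/"
# None inside a composite key is stringified rather than rejected.
composite = {"company_id": None, "property": "status"}
assert naive_fetch_path("companies", composite) == "companies/None/status"
```
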


722-745: Record extraction logic is solid, consider adding observability?

The implementation correctly handles both Record and Mapping types and extracts the data properly. Three suggestions:

  1. Multiple records: If the API unexpectedly returns multiple records, taking the first one silently (line 734) could mask API issues. Consider logging a warning if len(records) > 1? This would help catch configuration issues where the PK path returns a collection instead of a single item, wdyt?

  2. Success logging: Adding debug-level logging when a record is successfully fetched would help with troubleshooting. Something like LOGGER.debug(f"Successfully fetched record with PK {pk_value} from {fetch_path}")?

  3. Error message clarity: Line 740's "invalid record type" might be confusing, since it's really about unexpected API behavior rather than "not found". Perhaps "Record with primary key {pk_value} returned unexpected type {type(first_record).__name__}" would be clearer?

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between c0719e2 and 0a71543.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
  • SimpleRetriever (60-745)
  • fetch_one (630-745)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
  • fetch_record (94-113)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • create_component (831-864)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/http_requester.py (6)
  • get_path (195-208)
  • send_request (446-484)
  • _request_headers (304-325)
  • _request_params (327-356)
  • _request_body_data (358-382)
  • _request_body_json (384-407)
airbyte_cdk/sources/types.py (6)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • keys (137-138)
  • Record (21-72)
  • data (35-36)
🔇 Additional comments (8)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

80-80: LGTM: Import properly placed at module level.

Good to see this as a top-level import rather than inline.


535-550: Solid stream lookup with helpful error message.

The fallback to instance config and the clear ValueError with available stream names will make debugging much easier. Nice work!


552-567: Code already implements proper error handling and type safety; review concerns are addressed

After verification, your concerns appear to be either already addressed or handled implicitly by design:

  1. Stream construction (lines 545-550): Validation exists checking stream_config is found before create_component() call
  2. Schema retrieval: All get_json_schema() implementations return Mapping[str, Any] with no None return path—the type contract guarantees this
  3. PK validation: fetch_one() validates pk_value type at lines 678-685, raising ValueError for invalid types; tests cover this scenario

The existing pre-validation and type contracts mean these edge cases are already handled. The code demonstrates good defensive design with clear error messages where needed (e.g., SimpleRetriever type check at lines 559-563).

Wdyt—does this align with your analysis, or do you see additional edge cases the verification might have missed?

Likely an incorrect or invalid review comment.

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5)

29-29: LGTM: Exception import properly placed.


630-670: Excellent docstring with clear design rationale.

The comprehensive documentation including examples, the convention-based approach explanation, and future alternatives is very helpful. The type hints using Union[str, Mapping[str, Any]] instead of Any make the interface clearer too.


672-676: Base path retrieval looks good.

Using empty state/slice for the single-record fetch context makes sense. The requester's get_path() should handle any edge cases like None or empty paths.


687-715: Good reuse of existing request machinery.

Properly delegates to the requester's error handling and reuses the existing helper methods for headers/params/body. This maintains consistency with the rest of the retriever's behavior.


717-720: Response validation is clear and simple.

Good that this was simplified from the previous version. The requester's error handler takes care of HTTP status codes, so checking for a falsy response here is appropriate.

- Add type annotation to response variable: requests.Response | None
- Add comment explaining response can be None when error handler returns IGNORE
- Add comment explaining empty StreamSlice for single-record fetch
- Update docstring to clarify 404s propagate as exceptions (not RecordNotFoundException)
- RecordNotFoundException is only raised for empty/ignored responses or no parsed records

Addresses comments from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <aj@airbyte.io>
- Add type hints: records_iter: Iterable[Record] and first_record: Record | None
- Replace list(...) with next(iter(...), None) to avoid materializing entire iterable
- Remove unnecessary Mapping branch since _parse_response returns Iterable[Record]
- Simplify logic from 27 lines to 18 lines

This is more efficient (only fetches first record) and has clearer type hints.

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <aj@airbyte.io>
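
The efficiency claim in the commit above can be checked with a small sketch. `first_record`, `parse_lazily`, and the `RecordNotFound` class are hypothetical names for illustration; the only assumption is that the parser yields records lazily, as a generator would.

```python
from typing import Any, Iterable, Iterator, List, Mapping


class RecordNotFound(Exception):
    """Hypothetical stand-in for RecordNotFoundException."""


def first_record(records: Iterable[Mapping[str, Any]], pk_value: str) -> Mapping[str, Any]:
    """Take only the first record instead of materializing the iterable."""
    first = next(iter(records), None)
    if first is None:
        raise RecordNotFound(f"Record with primary key {pk_value} not found")
    return dict(first)


parsed_ids: List[int] = []


def parse_lazily() -> Iterator[Mapping[str, Any]]:
    """Generator that records which items were actually parsed."""
    for record_id in (1, 2, 3):
        parsed_ids.append(record_id)
        yield {"id": record_id}


record = first_record(parse_lazily(), "1")
assert record == {"id": 1}
# Only the first item was parsed; list(...) would have parsed all three.
assert parsed_ids == [1]
```

One trade-off of this approach is that extra records are never seen, so a warning about an endpoint returning a collection instead of a single item would require pulling at least one more element.
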
devin-ai-integration bot and others added 2 commits November 13, 2025 01:46
…alue

- Update SimpleRetriever.fetch_one signature: pk_value now str only (not Union[str, Mapping])
- Rename stream_slice to empty_stream_slice for clarity
- Remove composite key handling logic (isinstance checks, dict path construction)
- Update ConcurrentDeclarativeSource.fetch_record signature to accept str only
- Update AbstractStream.fetch_record signature to accept str only
- Remove test_fetch_one_composite_pk test (no longer supported)
- Update test_fetch_one_invalid_pk_type to reflect new behavior
- Simplify docstrings to remove composite key documentation

This simplification makes the implementation cleaner and easier to maintain.
Composite key support can be added later when there's a concrete use case.

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <aj@airbyte.io>
The _fetch_one method on SimpleRetriever is now prefixed with underscore
to indicate it's not part of the public API. External callers should use
Stream.fetch_record() or Source.fetch_record() instead.

Addresses feedback from @aaronsteers on PR #846

Co-Authored-By: AJ Steers <aj@airbyte.io>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

563-565: Consider making _fetch_one part of the public API?

The method calls declarative_stream.retriever._fetch_one(), which is a private method (indicated by the leading underscore). While this works, calling private methods across module boundaries can be fragile if the implementation changes.

Since _fetch_one is intended to be called from ConcurrentDeclarativeSource.fetch_record, would it make sense to either:

  1. Make it a public method (rename to fetch_one), or
  2. Document it as part of the internal API contract between these classes?

This is more of a design preference, so wdyt?

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 691927b and 8a52390.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
  • SimpleRetriever (60-724)
  • _fetch_one (630-724)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
  • fetch_record (94-111)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • create_component (831-864)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

80-80: LGTM on the import placement!

The SimpleRetriever import is properly placed at the module level and is used for the type check in fetch_record. This addresses the previous feedback about avoiding inline imports.


511-565: Solid implementation! Please verify end-to-end with a real connector.

The method correctly:

  • Resolves the stream from the manifest
  • Validates the stream uses SimpleRetriever
  • Delegates to the retriever's fetch logic
  • Provides clear error messages for missing streams and unsupported operations

However, as noted in the PR objectives, this hasn't been tested end-to-end with a real declarative connector. Given that the method reconstructs components from the manifest (lines 550-555), it's important to verify the path construction and component reconstruction work correctly in practice.

Could you test this with a real declarative connector to ensure:

  1. The path construction works correctly (e.g., /posts/123 for a posts stream)
  2. Component reconstruction from the manifest succeeds
  3. Error handling works for 404s, empty responses, and server errors
  4. The schema retrieval via get_json_schema() doesn't fail unexpectedly

wdyt?
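
To make item 1 concrete, here is a minimal sketch of convention-based path construction, assuming the record path is formed by appending `/{pk_value}` to the stream's base path as the PR summary describes. `build_fetch_path` is a hypothetical helper for illustration, not the CDK's actual code.

```python
def build_fetch_path(base_path: str, pk_value: str) -> str:
    """Hypothetical helper: join a stream's base path and a primary key value."""
    # Strip slashes at the seam so "posts/" + "123" does not become "posts//123".
    return f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}"


print(build_fetch_path("posts", "123"))       # posts/123
print(build_fetch_path("/v1/posts/", "123"))  # /v1/posts/123
```

In this sketch a non-string `pk_value` fails with `AttributeError` because of the `lstrip` call; whether the real implementation behaves the same way is something the end-to-end test should confirm.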

@aaronsteers aaronsteers changed the title from "feat: Add Source.fetch_record() backed by Stream.fetch_record() and SimpleRetriever.fetch_one()" to "feat: Add Source.fetch_record() and Stream.fetch_record() backed by SimpleRetriever._fetch_one()" on Nov 13, 2025
devin-ai-integration bot added a commit to airbytehq/PyAirbyte that referenced this pull request Nov 13, 2025
- Add LookupValue type alias for primary key values
- Add get_record() method to Source class (keyword-only pk_value arg)
- Add helper methods _get_stream_primary_key() and _normalize_and_validate_pk()
- Add fetch_record() method to DeclarativeExecutor
- Support both direct PK values and dict input with validation
- Dict validation ensures single entry and key matches stream's primary key
- Only supported for declarative (YAML-based) sources
- Add comprehensive unit tests (16 tests, all passing)

This builds on airbytehq/airbyte-python-cdk#846

Co-Authored-By: AJ Steers <aj@airbyte.io>

@brianjlai brianjlai left a comment


See my initial comment on SimpleRetriever._fetch_one().

@aaronsteers per our convo yesterday, I was under the impression that we were going to have a separate runtime component to handle single fetch requests. It's not a hard blocker if we want the simple retriever to define the behavior, but right now it doesn't seem like we're actually extracting an individual record correctly. Let's discuss this a bit; that might help me understand what you envision the overall design to be, since this looks to be just the first part of things.

# separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

def _fetch_one(

Given that this is presumably going to be called from somewhere outside of SimpleRetriever, we should make this public. While I understand this is not meant to be public facing or used by the protocol interface, it still feels like bad practice for this to be a private method.

f"Record with primary key {pk_value} not found (no response)"
)

records_iter: Iterable[Record] = self._parse_response(

This section doesn't seem like it will work for a lot of cases. A common pattern is that when retrieving all objects (prices, for example), we get back a response with a top-level array containing an object for each price. When we hit the individual API endpoint, we get back a single top-level object representing the price.

The way this is written assumes that the response from the single-record fetch_path will contain a top-level list, and extracts each record into records_iter. With a single-object response we'll be unable to extract anything, so there won't be a first_record to return.

Unless you think I'm missing something, it seems like we shouldn't be using the underlying extractor, which is no longer suitable for individual records.
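
The mismatch described above can be sketched as follows. This is an illustration only: `extract_records` is a hypothetical stand-in for a list-oriented extractor configured with a field path, and the two response bodies are made-up shapes, not actual API payloads.

```python
from typing import Any


def extract_records(body: Any, field_path: list[str]) -> list[dict]:
    """Hypothetical list-oriented extractor: walk field_path, return a list of dicts."""
    node = body
    for key in field_path:
        if not isinstance(node, dict) or key not in node:
            return []  # the configured path does not exist in this response
        node = node[key]
    if isinstance(node, list):
        return [item for item in node if isinstance(item, dict)]
    return [node] if isinstance(node, dict) else []


# Assumed shape of a list endpoint response, e.g. GET /prices
list_response = {"data": [{"id": "price_1"}, {"id": "price_2"}]}
# Assumed shape of a single-record endpoint response, e.g. GET /prices/price_1
single_response = {"id": "price_1", "amount": 500}

print(extract_records(list_response, ["data"]))    # [{'id': 'price_1'}, {'id': 'price_2'}]
print(extract_records(single_response, ["data"]))  # []
```

The same field path that works for the list endpoint yields nothing for the single-object body, so there is no first record to return.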

record_selector = MagicMock()
record_selector.select_records.return_value = [
    Record(data={"id": "123", "title": "Test Post"}, stream_name="posts", associated_slice=None)
]

The way this test is written, we aren't actually testing the selector/extractor logic. Most extractors/selectors are defined to extract a list of objects (Stripe almost certainly does it this way).

Here we just mock the single-record response and mock the call to select_records(). In a real-world use case this would fail, since we'd attempt to extract response.data, which doesn't exist. All of that is skipped over because of the mocking.
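
A small sketch of why the mocking hides the problem, using stand-ins rather than the real CDK classes. `DataFieldSelector` and `first_record` are hypothetical names introduced only for this illustration; the selector is assumed to read records from a top-level "data" list.

```python
from typing import Optional
from unittest.mock import MagicMock


class DataFieldSelector:
    """Hypothetical selector that only reads records from a top-level "data" list."""

    def select_records(self, body: dict) -> list:
        data = body.get("data", [])
        return data if isinstance(data, list) else []


def first_record(body: dict, selector) -> Optional[dict]:
    """Stand-in for the step under test: return the first selected record, if any."""
    records = selector.select_records(body)
    return records[0] if records else None


single_object_body = {"id": "123", "title": "Test Post"}

# With a mocked selector, the result is whatever the mock was told to return.
mocked = MagicMock()
mocked.select_records.return_value = [{"id": "123", "title": "Test Post"}]
mocked_result = first_record(single_object_body, mocked)

# With a concrete selector, the same body yields nothing: there is no "data" key.
real_result = first_record(single_object_body, DataFieldSelector())

print(mocked_result)  # {'id': '123', 'title': 'Test Post'}
print(real_result)    # None
```

The mocked variant passes regardless of the response shape, which is the gap the comment points at.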

- Make fetch_one public by removing underscore prefix (was _fetch_one)
- Fix extractor/selector logic to handle single-object responses
  - Most REST APIs return single objects for GET /resource/{id}
  - Now tries extractor first, then falls back to response.json()
- Add test for single-object response pattern
- Update all test references from _fetch_one to fetch_one

Addresses comments from #846

Co-Authored-By: AJ Steers <aj@airbyte.io>
devin-ai-integration bot added a commit to airbytehq/PyAirbyte that referenced this pull request Nov 13, 2025
Updated CDK dependency to latest commit that addresses PR feedback:
- Made fetch_one public (removed underscore prefix)
- Fixed extractor/selector logic to handle single-object responses
- Added test for single-object response pattern

This commit includes the fixes from airbytehq/airbyte-python-cdk#846

Co-Authored-By: AJ Steers <aj@airbyte.io>
    response_body = response.json()
    if isinstance(response_body, dict) and response_body:
        return response_body
except Exception:

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (1)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

1650-1681: Fix test naming convention and consider reducing mocking.

Two concerns:

  1. Test name should follow Python convention: test_fetch_one_simple_pk (with underscore after "test")

  2. This test has the same over-mocking issue mentioned in the past review comment. By mocking record_selector.select_records to return the expected record, you're bypassing the actual selector/extractor logic. The test would pass even if the selector couldn't properly extract data from the response. Consider testing with a real RecordSelector and mocking just the HTTP layer, wdyt?

Apply this diff to fix the naming:

-def testfetch_one_simple_pk():
+def test_fetch_one_simple_pk():
🧹 Nitpick comments (1)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

1809-1838: Fix test naming convention and verify fallback logic.

  1. Test name should follow Python convention: test_fetch_one_single_object_response (with underscore after "test")

  2. This test verifies the fallback logic when select_records returns empty but the response contains a single object. However, with the current mocking, it's unclear how the result is extracted. Consider either:

    • Using real decoder/extractor components to verify the JSON parsing works
    • Adding assertions that specific methods on the response are called

This would make the test more robust and actually verify the fallback mechanism, wdyt?

Apply this diff for the naming:

-def testfetch_one_single_object_response():
+def test_fetch_one_single_object_response():
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8a52390 and 4386a35.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🔇 Additional comments (3)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

27-27: LGTM on the import.

The RecordNotFoundException import is properly placed and necessary for the new test cases.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

80-80: LGTM on the import.

The SimpleRetriever import is correctly placed at the module level. Good fix from the previous inline import issue!


511-565: Handle StateDelegatingStream like the streams() method does?

The streams() method (lines 409-422) conditionally creates either StateDelegatingStreamModel or DeclarativeStreamModel based on the stream config type. However, fetch_record always uses DeclarativeStreamModel (line 550).

Should we handle StateDelegatingStream the same way here? Something like:

model_type = (
    StateDelegatingStreamModel
    if stream_config.get("type") == StateDelegatingStreamModel.__name__
    else DeclarativeStreamModel
)
declarative_stream = self._constructor.create_component(
    model_type,
    stream_config,
    config,
    emit_connector_builder_messages=self._emit_connector_builder_messages,
)

Or is there a reason to exclude StateDelegatingStream from fetch operations, wdyt?

Comment on lines +1737 to +1755
def testfetch_one_invalid_pk_type():
    """Test fetch_one with non-string pk_value (should fail type checking but test runtime behavior)."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    record_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    with pytest.raises(AttributeError):
        retriever.fetch_one(123, records_schema={})  # type: ignore


⚠️ Potential issue | 🟡 Minor

Fix test naming convention and clarify test intent.

  1. Test name should follow Python convention: test_fetch_one_invalid_pk_type (with underscore after "test")

  2. This test verifies runtime behavior when type hints are violated. The expectation of AttributeError suggests the code tries to call a string method on pk_value. Consider adding a docstring comment explaining what specific string method is being called, wdyt?

Apply this diff for the naming:

-def testfetch_one_invalid_pk_type():
+def test_fetch_one_invalid_pk_type():
🤖 Prompt for AI Agents
In unit_tests/sources/declarative/retrievers/test_simple_retriever.py around
lines 1737 to 1755, rename the test function from testfetch_one_invalid_pk_type
to test_fetch_one_invalid_pk_type to follow Python test naming conventions, and
update its docstring to explicitly state the intent (that this verifies runtime
behavior when a non-string pk_value is passed and that an AttributeError is
expected because the implementation calls a string method on pk_value — e.g.,
.strip() or .startswith()). Ensure the function name change is consistent with
any test references/imports and keep the pytest.raises(AttributeError) assertion
as-is.

aaronsteers and others added 3 commits November 13, 2025 15:45
…r.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
…r.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
…r.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2)

1735-1753: Fix test naming convention.

The function name should be test_fetch_one_invalid_pk_type (with underscore after "test") to follow Python/pytest conventions.

Apply this diff:

-def testfetch_one_invalid_pk_type():
+def test_fetch_one_invalid_pk_type():

1755-1776: Fix test naming convention (still present despite past fix).

The function name should be test_fetch_one_no_response (with underscore after "test"). Past review comments indicated this was addressed in commit dfeeec2, but the issue persists in the current code.

The test logic itself is correct—when send_request returns None, RecordNotFoundException should be raised.

Apply this diff:

-def testfetch_one_no_response():
+def test_fetch_one_no_response():
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dfeeec2 and 82add1b.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (4)
airbyte_cdk/sources/declarative/exceptions.py (1)
  • RecordNotFoundException (12-13)
airbyte_cdk/sources/declarative/requesters/http_requester.py (2)
  • get_path (195-208)
  • send_request (446-484)
airbyte_cdk/sources/types.py (3)
  • Record (21-72)
  • data (35-36)
  • associated_slice (39-40)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
  • name (119-127)
  • name (130-132)
  • fetch_one (630-731)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (3)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3)

27-27: LGTM!

The import of RecordNotFoundException is correctly added to support the new fetch_one tests.


1709-1734: LGTM with a note on consistency.

This test correctly expects non-404 errors to propagate from the requester, which aligns with the implementation's documented behavior. However, note that this uses the same mocking pattern as test_fetch_one_not_found (making send_request raise an exception), so if that pattern needs adjustment, this test would too.


1778-1805: LGTM!

This test correctly validates the empty response scenario. The implementation properly handles the case where select_records returns no records and the response body is an empty dict (which is falsy in Python), raising RecordNotFoundException as expected.

Comment on lines +1650 to +1681
def testfetch_one_simple_pk():
    """Test fetch_one with a simple string primary key."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    response = requests.Response()
    response.status_code = 200
    response._content = json.dumps({"id": "123", "title": "Test Post"}).encode("utf-8")

    requester.send_request.return_value = response

    record_selector = MagicMock()
    record_selector.select_records.return_value = [
        Record(data={"id": "123", "title": "Test Post"}, stream_name="posts", associated_slice=None)
    ]

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    result = retriever.fetch_one("123", records_schema={})

    assert result == {"id": "123", "title": "Test Post"}
    requester.send_request.assert_called_once()
    call_kwargs = requester.send_request.call_args[1]
    assert call_kwargs["path"] == "posts/123"


⚠️ Potential issue | 🟡 Minor

Fix test naming and consider testing the real selector/extractor path.

Two concerns here:

  1. Naming convention: The function name should be test_fetch_one_simple_pk (with underscore after "test") to follow Python/pytest conventions.

  2. Test coverage: As brianjlai noted in the past review comments, this test mocks select_records to return the record directly, which doesn't test the actual selector/extractor logic. In real-world scenarios (especially for GET /resource/{id} endpoints), the response is typically a single object, not a list. The fetch_one implementation has fallback logic to handle this via response.json() when select_records returns empty.

Would you consider adding a test case that exercises the actual selector path without mocking select_records, or at least verifying that the fallback logic is tested elsewhere, wdyt?

Apply this diff for the naming fix:

-def testfetch_one_simple_pk():
+def test_fetch_one_simple_pk():
🤖 Prompt for AI Agents
unit_tests/sources/declarative/retrievers/test_simple_retriever.py lines
1650-1681: rename the test function from testfetch_one_simple_pk to
test_fetch_one_simple_pk to follow pytest naming conventions, and update any
references; additionally add (or modify) a second test that does not mock
record_selector.select_records but instead returns the raw requests.Response (or
uses the real selector/extractor) to exercise the selector/extractor path and
verify the fallback behavior when select_records returns empty or when the
endpoint returns a single object, ensuring fetch_one handles both list and
single-object responses.

Comment on lines +1683 to +1708
def test_fetch_one_not_found():
    """Test fetch_one raises RecordNotFoundException when record is not found (404)."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    error = Exception("Not found")
    error.response = MagicMock()
    error.response.status_code = 404
    requester.send_request.side_effect = error

    record_selector = MagicMock()

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    with pytest.raises(RecordNotFoundException) as exc_info:
        retriever.fetch_one("999", records_schema={})

    assert "999" in str(exc_info.value)


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that this test matches the actual error handling behavior.

The test mocks send_request to raise an Exception with a 404 status code, but according to the fetch_one implementation and its docstring, HTTP errors (including 404) are propagated from the requester's error handling, not caught and converted to RecordNotFoundException.

RecordNotFoundException is only raised when:

  1. send_request returns None (response ignored by error handler), or
  2. The response parsing yields no records

If send_request raises an exception (like a 404), shouldn't that exception propagate directly rather than being caught? Could you confirm that this test accurately reflects the intended behavior, wdyt?

Run this script to check how the requester's error handling works with 404 responses:


🏁 Script executed:

#!/bin/bash
# Search for error handling patterns in the requester implementation
rg -nP -A5 -B5 'def send_request' airbyte_cdk/sources/declarative/requesters/

Length of output: 2115



🏁 Script executed:

#!/bin/bash
# Get the full send_request implementation from HttpRequester
rg -nP -A30 'def send_request' airbyte_cdk/sources/declarative/requesters/http_requester.py | head -50

Length of output: 1556


🏁 Script executed:

#!/bin/bash
# Find the fetch_one method and its exception handling
rg -nP -A40 'def fetch_one' airbyte_cdk/sources/declarative/retrievers/simple_retriever.py | head -60

Length of output: 2123


🏁 Script executed:

#!/bin/bash
# Search for where RecordNotFoundException is raised in SimpleRetriever
rg -nP 'RecordNotFoundException' airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Length of output: 275


🏁 Script executed:

#!/bin/bash
# Look for how send_request is called in fetch_one and error handling around it
rg -nP -B5 -A10 'send_request' airbyte_cdk/sources/declarative/retrievers/simple_retriever.py | grep -A15 -B5 'def fetch_one'

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check error handling logic in http_requester.py around status codes
rg -nP -C5 '404\|status_code\|error' airbyte_cdk/sources/declarative/requesters/http_requester.py | head -80

Length of output: 0



🏁 Script executed:

#!/bin/bash
# Get the complete fetch_one method implementation
rg -nP -A70 'def fetch_one' airbyte_cdk/sources/declarative/retrievers/simple_retriever.py | head -100

Length of output: 3399


🏁 Script executed:

#!/bin/bash
# Get context around RecordNotFoundException raises at lines 706 and 729
sed -n '700,735p' airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Length of output: 1099


🏁 Script executed:

#!/bin/bash
# Check if there's any try-except around send_request in fetch_one
sed -n '630,750p' airbyte_cdk/sources/declarative/retrievers/simple_retriever.py | rg -nP 'try|except|send_request|raise RecordNotFoundException'

Length of output: 306


Fix test to match the documented and implemented error handling behavior.

The test is incorrect. According to the fetch_one docstring (lines 649-650), "Exception: HTTP errors (including 404) are propagated from requester's error handling." The implementation has no try-except around the send_request() call (line 677), so exceptions propagate directly.

RecordNotFoundException is only raised when:

  1. send_request() returns None (line 706: "no response"), or
  2. Response parsing yields no records (line 729: "empty response")

This test should either:

  • Mock send_request to return None (to trigger the line 706 RecordNotFoundException), or
  • Mock send_request to return an empty response (to trigger the line 729 RecordNotFoundException), or
  • Expect the original exception to propagate instead of RecordNotFoundException

Which approach matches your intended behavior, wdyt?
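
The options above can be sketched with stand-ins. This is not the CDK code: `fetch_one` below is a simplified hypothetical function that mirrors only the behavior described in this comment (no try/except around `send_request`, `RecordNotFoundException` for a `None` or empty response), it skips the selector step entirely, and the exception class is a local placeholder.

```python
from unittest.mock import MagicMock


class RecordNotFoundException(Exception):
    """Local stand-in for the CDK exception of the same name."""


def fetch_one(requester, pk_value: str) -> dict:
    """Simplified stand-in: errors raised by send_request are not caught here."""
    response = requester.send_request(path=f"{requester.get_path()}/{pk_value}")
    if response is None:
        raise RecordNotFoundException(f"Record with primary key {pk_value} not found (no response)")
    body = response.json()
    if not isinstance(body, dict) or not body:
        raise RecordNotFoundException(f"Record with primary key {pk_value} not found (empty response)")
    return body


def outcome(requester) -> str:
    """Return the name of the exception fetch_one raises for this requester."""
    try:
        fetch_one(requester, "999")
    except Exception as exc:
        return type(exc).__name__
    return "no exception"


# Option 1: the error handler ignored the response, so send_request returns None.
ignored = MagicMock()
ignored.get_path.return_value = "posts"
ignored.send_request.return_value = None

# Option 3: send_request raises, and the original exception propagates unchanged.
failing = MagicMock()
failing.get_path.return_value = "posts"
failing.send_request.side_effect = RuntimeError("404 Not Found")

print(outcome(ignored))  # RecordNotFoundException
print(outcome(failing))  # RuntimeError
```

Under these assumptions, a test that makes `send_request` raise cannot also expect `RecordNotFoundException` unless the retriever adds its own exception handling.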

🤖 Prompt for AI Agents
In unit_tests/sources/declarative/retrievers/test_simple_retriever.py around
lines 1683-1708, the test currently simulates send_request raising an exception
with a 404 but asserts that fetch_one raises RecordNotFoundException; update the
test to match the implementation by either (preferred) mocking
requester.send_request to return None (to trigger the "no response" branch that
raises RecordNotFoundException) or to return an empty/parsable response that
leads to an empty record list (to trigger the "empty response"
RecordNotFoundException), or alternatively change the assertion to expect the
original exception to propagate if you want to keep the current send_request
side_effect behavior—pick one approach and adjust the request mock and the
pytest.raises assertion accordingly.

Comment on lines +1806 to +1834
def testfetch_one_single_object_response():
    """Test fetch_one handles single object responses (most common pattern for GET /resource/{id})."""
    requester = MagicMock()
    requester.get_path.return_value = "posts"

    response = requests.Response()
    response.status_code = 200
    response._content = json.dumps({"id": "123", "title": "Test Post"}).encode("utf-8")

    requester.send_request.return_value = response

    record_selector = MagicMock()
    record_selector.select_records.return_value = []

    retriever = SimpleRetriever(
        name="posts",
        primary_key="id",
        requester=requester,
        record_selector=record_selector,
        parameters={},
        config={},
    )

    result = retriever.fetch_one("123", records_schema={})

    assert result == {"id": "123", "title": "Test Post"}
    requester.send_request.assert_called_once()
    call_kwargs = requester.send_request.call_args[1]
    assert call_kwargs["path"] == "posts/123"

⚠️ Potential issue | 🟡 Minor

Fix test naming; good coverage of the fallback path.

  1. Naming convention: The function name should be test_fetch_one_single_object_response (with underscore after "test").

  2. Good test coverage: This test validates the important fallback behavior where select_records returns empty (because the selector is configured to extract from a list field like response.data), but the actual response is a single object. The fetch_one implementation correctly falls back to response.json() in this case, which is the typical pattern for GET /resource/{id} endpoints. This addresses the scenario brianjlai mentioned in past comments, wdyt?

Apply this diff for the naming fix:

-def testfetch_one_single_object_response():
+def test_fetch_one_single_object_response():
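
The fallback described in point 2 can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the actual `fetch_one` code: `select_or_fallback` and `data_list_selector` are hypothetical names, and the selector is assumed to read from a top-level "data" list.

```python
import json
from typing import Callable, Optional


def select_or_fallback(raw_body: bytes, select_records: Callable[[dict], list]) -> Optional[dict]:
    """Try the configured selector first; fall back to the body if it is a non-empty dict."""
    try:
        body = json.loads(raw_body)
    except ValueError:
        return None  # body is not valid JSON
    records = select_records(body) if isinstance(body, dict) else []
    if records:
        return records[0]
    if isinstance(body, dict) and body:
        return body
    return None


def data_list_selector(body: dict) -> list:
    """Hypothetical selector configured for list endpoints that wrap records in "data"."""
    data = body.get("data")
    return data if isinstance(data, list) else []


single = select_or_fallback(b'{"id": "123", "title": "Test Post"}', data_list_selector)
wrapped = select_or_fallback(b'{"data": [{"id": "123"}]}', data_list_selector)
empty = select_or_fallback(b"{}", data_list_selector)

print(single)   # {'id': '123', 'title': 'Test Post'}
print(wrapped)  # {'id': '123'}
print(empty)    # None
```

One caveat of this shape is that any non-empty JSON object, including an error envelope, would be returned as the record, so a test with a real selector and a realistic response body is still worthwhile.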
🤖 Prompt for AI Agents
In unit_tests/sources/declarative/retrievers/test_simple_retriever.py around
lines 1806 to 1834, the test function name is missing the underscore after
"test" (testfetch_one_single_object_response), which breaks the test naming
convention; rename the function to test_fetch_one_single_object_response and
keep the body unchanged.


Labels

enhancement New feature or request


3 participants