Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
38d13d3
feat: Add fetch_one/fetch_record functionality to SimpleRetriever
devin-ai-integration[bot] Nov 12, 2025
052eb81
update type hints
aaronsteers Nov 12, 2025
c581827
Change fetch_one to raise RecordNotFoundException instead of returnin…
devin-ai-integration[bot] Nov 12, 2025
bab2781
Fix ruff format issues
devin-ai-integration[bot] Nov 12, 2025
a2e8e55
Improve path construction robustness in fetch_one
devin-ai-integration[bot] Nov 12, 2025
c0719e2
refactor: Move fetch_record to concurrent declarative classes only
devin-ai-integration[bot] Nov 12, 2025
38f6dc3
fix: Address PR review comments
devin-ai-integration[bot] Nov 13, 2025
0a71543
fix: Simplify error handling in fetch_one
devin-ai-integration[bot] Nov 13, 2025
626848b
fix: Address PR review comments - add type hints and clarifying comments
devin-ai-integration[bot] Nov 13, 2025
bf83bf1
refactor: Use iterator instead of list for fetch_one record parsing
devin-ai-integration[bot] Nov 13, 2025
691927b
refactor: Simplify fetch_one/fetch_record to only accept str for pk_v…
devin-ai-integration[bot] Nov 13, 2025
8a52390
refactor: Rename fetch_one to _fetch_one to mark as internal API
devin-ai-integration[bot] Nov 13, 2025
4386a35
fix: Address PR feedback from brianjlai on fetch_one implementation
devin-ai-integration[bot] Nov 13, 2025
28b9286
Update unit_tests/sources/declarative/retrievers/test_simple_retrieve…
aaronsteers Nov 13, 2025
dfeeec2
Update unit_tests/sources/declarative/retrievers/test_simple_retrieve…
aaronsteers Nov 13, 2025
82add1b
Update unit_tests/sources/declarative/retrievers/test_simple_retrieve…
aaronsteers Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
Expand Down Expand Up @@ -507,6 +508,62 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon
return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def fetch_record(
    self,
    stream_name: str,
    pk_value: str,
    config: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Fetch a single record from a stream by primary key.

    Args:
        stream_name: Name of the stream to fetch from
        pk_value: Primary key value to fetch as a string (e.g., "123")
        config: Source configuration (optional, uses instance config if not provided)

    Returns:
        The fetched record as a dict

    Raises:
        ValueError: If the stream name is not found in the source
        NotImplementedError: If the stream doesn't support fetching individual records
        RecordNotFoundException: If the record is not found
    """
    config = config or self._config

    # Candidate stream definitions: whatever `_stream_configs` returns for the
    # source config, followed by the entries exposed by `dynamic_streams`.
    stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

    # First definition whose "name" matches; None when there is no match.
    stream_config = next(
        (candidate for candidate in stream_configs if candidate.get("name") == stream_name),
        None,
    )

    if not stream_config:
        # Definitions without a "name" are left out of the listing: joining a
        # None would raise TypeError and hide the ValueError callers expect.
        available_streams = [
            str(candidate["name"])
            for candidate in stream_configs
            if candidate.get("name") is not None
        ]
        raise ValueError(
            f"Stream '{stream_name}' not found in source. "
            f"Available streams: {', '.join(available_streams)}"
        )

    declarative_stream = self._constructor.create_component(
        DeclarativeStreamModel,
        stream_config,
        config,
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )

    # Only SimpleRetriever is accepted for single-record fetches; any other
    # retriever type is rejected here.
    if not isinstance(declarative_stream.retriever, SimpleRetriever):
        raise NotImplementedError(
            f"Stream '{stream_name}' does not support fetching individual records. "
            "Only streams with SimpleRetriever currently support this operation."
        )

    return declarative_stream.retriever.fetch_one(
        pk_value=pk_value, records_schema=declarative_stream.get_json_schema()
    )

@property
def dynamic_streams(self) -> List[Dict[str, Any]]:
return self._dynamic_stream_configs(
Expand Down
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ class ReadException(Exception):
"""
Raise when there is an error reading data from an API Source
"""


class RecordNotFoundException(ReadException):
    """
    Raise when a single requested record cannot be found (e.g., a 404 response).
    """
104 changes: 104 additions & 0 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor
from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
Expand Down Expand Up @@ -626,6 +627,109 @@ def _to_partition_key(to_serialize: Any) -> str:
# separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

def fetch_one(
    self,
    pk_value: str,
    records_schema: Mapping[str, Any],
) -> Mapping[str, Any]:
    """Fetch a single record by primary key value.

    This method constructs a path by appending the primary key value to the base path
    and sends a GET request to fetch a single record. It's designed for REST APIs that
    follow the convention: GET /resource/{id}

    Args:
        pk_value: The primary key value to fetch as a string (e.g., "123")
        records_schema: JSON schema describing the record structure

    Returns:
        The fetched record as a dict.

    Raises:
        ValueError: If pk_value is empty (or consists only of "/" characters).
        RecordNotFoundException: If the response is empty/ignored or parsing yields no records.
        Exception: HTTP errors (including 404) are propagated from requester's error handling.

    Example:
        record = retriever.fetch_one("123", schema)

    Note:
        This implementation uses convention-based path construction, appending /{pk_value}
        to the base path.

        Alternative approaches that could be implemented in the future:
        - Path template interpolation: Use a configurable template like "{base_path}/{id}"
          See: https://github.com/airbytehq/airbyte-python-cdk/issues/833#phase-1a
        - Field path configuration: Allow specifying which response field contains the record
          for APIs that wrap single records in envelopes like {"data": {...}}
    """
    # Without an id segment the request path would be "<base_path>/", which is
    # not a single-record path; reject it before any request is built.
    record_id = pk_value.lstrip("/")
    if not record_id:
        raise ValueError("pk_value must be a non-empty primary key value")

    # Single-record fetch doesn't involve partitioning, so we pass an empty StreamSlice
    empty_stream_slice = StreamSlice(partition={}, cursor_slice={})

    # Get the base path from the requester
    base_path = self.requester.get_path(
        stream_state={},
        stream_slice=empty_stream_slice,
        next_page_token=None,
    )

    fetch_path = f"{base_path.rstrip('/')}/{record_id}"

    # send_request() may return None when the error handler chooses to IGNORE a response
    response: requests.Response | None = self.requester.send_request(
        path=fetch_path,
        stream_state={},
        stream_slice=empty_stream_slice,
        next_page_token=None,
        request_headers=self._request_headers(
            stream_state={},
            stream_slice=empty_stream_slice,
            next_page_token=None,
        ),
        request_params=self._request_params(
            stream_state={},
            stream_slice=empty_stream_slice,
            next_page_token=None,
        ),
        request_body_data=self._request_body_data(
            stream_state={},
            stream_slice=empty_stream_slice,
            next_page_token=None,
        ),
        request_body_json=self._request_body_json(
            stream_state={},
            stream_slice=empty_stream_slice,
            next_page_token=None,
        ),
        log_formatter=self.log_formatter,
    )

    # Truthiness (not `is None`) is intentional: a requests.Response is falsy
    # when its status is 4xx/5xx, so an error response that was not raised
    # upstream is treated the same as a missing one.
    if not response:
        raise RecordNotFoundException(
            f"Record with primary key {pk_value} not found (no response)"
        )

    records_iter: Iterable[Record] = self._parse_response(
        response=response,
        stream_state={},
        records_schema=records_schema,
        stream_slice=empty_stream_slice,
        next_page_token=None,
    )

    first_record: Record | None = next(iter(records_iter), None)
    if first_record:
        return dict(first_record.data)

    # NOTE(review): a reviewer pointed out that list endpoints commonly return
    # a top-level array (one object per record) while the single-record
    # endpoint returns one top-level object, so the stream's extractor may
    # yield nothing here. The raw-JSON fallback below covers that case by
    # returning a non-empty top-level object as the record.
    try:
        response_body = response.json()
    except ValueError:
        # Body is not decodable JSON; fall through to "not found".
        response_body = None

    if isinstance(response_body, dict) and response_body:
        return response_body

    raise RecordNotFoundException(
        f"Record with primary key {pk_value} not found (empty response)"
    )


def _deep_merge(
target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]]
Expand Down
19 changes: 19 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/abstract_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,22 @@ def check_availability(self) -> StreamAvailability:
"""
:return: If the stream is available and if not, why
"""

def fetch_record(self, pk_value: str) -> Mapping[str, Any]:
    """
    Look up one record of this stream by its primary key.

    This default implementation never returns: it always raises
    NotImplementedError, naming the stream via ``self.name``.

    Args:
        pk_value: The primary key value as a string (e.g., "123")

    Returns:
        The fetched record as a dict

    Raises:
        NotImplementedError: If the stream doesn't support fetching individual records
        RecordNotFoundException: If the record is not found
    """
    unsupported_message = (
        f"Stream {self.name} does not support fetching individual records. "
        "Only declarative streams with SimpleRetriever currently support this operation."
    )
    raise NotImplementedError(unsupported_message)
Loading
Loading