diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 781bb64d1..e146cacc3 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -77,6 +77,7 @@ ModelToComponentFactory, ) from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING +from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever from airbyte_cdk.sources.declarative.spec.spec import Spec from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository @@ -507,6 +508,62 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) return AirbyteConnectionStatus(status=Status.SUCCEEDED) + def fetch_record( + self, + stream_name: str, + pk_value: str, + config: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + """ + Fetch a single record from a stream by primary key. + + Args: + stream_name: Name of the stream to fetch from + pk_value: Primary key value to fetch as a string (e.g., "123") + config: Source configuration (optional, uses instance config if not provided) + + Returns: + The fetched record as a dict + + Raises: + ValueError: If the stream name is not found in the source + NotImplementedError: If the stream doesn't support fetching individual records + RecordNotFoundException: If the record is not found + """ + config = config or self._config + + stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams + + stream_config = None + for config_item in stream_configs: + if config_item.get("name") == stream_name: + stream_config = config_item + break + + if not stream_config: + available_streams = [c.get("name") for c in stream_configs] + raise ValueError( + f"Stream '{stream_name}' not found in source. " + f"Available streams: {', '.join(available_streams)}" + ) + + declarative_stream = self._constructor.create_component( + DeclarativeStreamModel, + stream_config, + config, + emit_connector_builder_messages=self._emit_connector_builder_messages, + ) + + if not isinstance(declarative_stream.retriever, SimpleRetriever): + raise NotImplementedError( + f"Stream '{stream_name}' does not support fetching individual records. " + "Only streams with SimpleRetriever currently support this operation." + ) + + return declarative_stream.retriever.fetch_one( + pk_value=pk_value, records_schema=declarative_stream.get_json_schema() + ) + @property def dynamic_streams(self) -> List[Dict[str, Any]]: return self._dynamic_stream_configs( diff --git a/airbyte_cdk/sources/declarative/exceptions.py b/airbyte_cdk/sources/declarative/exceptions.py index ca67c6a55..0d4ad6a59 100644 --- a/airbyte_cdk/sources/declarative/exceptions.py +++ b/airbyte_cdk/sources/declarative/exceptions.py @@ -7,3 +7,7 @@ class ReadException(Exception): """ Raise when there is an error reading data from an API Source """ + + +class RecordNotFoundException(ReadException): + """Raised when a requested record is not found (e.g., 404 response).""" diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index a30574107..24de7439b 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -26,6 +26,7 @@ from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.models import AirbyteMessage +from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( @@ -626,6 +627,109 @@ def _to_partition_key(to_serialize: Any) -> str: # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True) + def fetch_one( + self, + pk_value: str, + records_schema: Mapping[str, Any], + ) -> Mapping[str, Any]: + """Fetch a single record by primary key value. + + This method constructs a path by appending the primary key value to the base path + and sends a GET request to fetch a single record. It's designed for REST APIs that + follow the convention: GET /resource/{id} + + Args: + pk_value: The primary key value to fetch as a string (e.g., "123") + records_schema: JSON schema describing the record structure + + Returns: + The fetched record as a dict. + + Raises: + RecordNotFoundException: If the response is empty/ignored or parsing yields no records. + Exception: HTTP errors (including 404) are propagated from requester's error handling. + + Example: + record = retriever.fetch_one("123", schema) + + Note: + This implementation uses convention-based path construction, appending /{pk_value} to the base path. (important-comment) + + Alternative approaches that could be implemented in the future: (important-comment) + - Path template interpolation: Use a configurable template like "{base_path}/{id}" (important-comment) + See: https://github.com/airbytehq/airbyte-python-cdk/issues/833#phase-1a (important-comment) + - Field path configuration: Allow specifying which response field contains the record (important-comment) + for APIs that wrap single records in envelopes like {"data": {...}} (important-comment) + """ + # Single-record fetch doesn't involve partitioning, so we pass an empty StreamSlice + empty_stream_slice = StreamSlice(partition={}, cursor_slice={}) + + # Get the base path from the requester + base_path = self.requester.get_path( + stream_state={}, + stream_slice=empty_stream_slice, + next_page_token=None, + ) + + fetch_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}" + + # send_request() may return None when the error handler chooses to IGNORE a response + response: requests.Response | None = self.requester.send_request( + path=fetch_path, + stream_state={}, + stream_slice=empty_stream_slice, + next_page_token=None, + request_headers=self._request_headers( + stream_state={}, + stream_slice=empty_stream_slice, + next_page_token=None, + ), + request_params=self._request_params( + stream_state={}, + stream_slice=empty_stream_slice, + next_page_token=None, + ), + request_body_data=self._request_body_data( + stream_state={}, + stream_slice=empty_stream_slice, + next_page_token=None, + ), + request_body_json=self._request_body_json( + stream_state={}, + stream_slice=empty_stream_slice, + next_page_token=None, + ), + log_formatter=self.log_formatter, + ) + + if not response: + raise RecordNotFoundException( + f"Record with primary key {pk_value} not found (no response)" + ) + + records_iter: Iterable[Record] = self._parse_response( + response=response, + stream_state={}, + records_schema=records_schema, + stream_slice=empty_stream_slice, + next_page_token=None, + ) + + first_record: Record | None = next(iter(records_iter), None) + if first_record: + return dict(first_record.data) + + try: + response_body = response.json() + if isinstance(response_body, dict) and response_body: + return response_body + except Exception: + pass + + raise RecordNotFoundException( + f"Record with primary key {pk_value} not found (empty response)" + ) + def _deep_merge( target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]] diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 667d088ab..c2d633a4a 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -90,3 +90,22 @@ def check_availability(self) -> StreamAvailability: """ :return: If the stream is available and if not, why """ + + def fetch_record(self, pk_value: str) -> Mapping[str, Any]: + """ + Fetch a single record by primary key value. + + Args: + pk_value: The primary key value as a string (e.g., "123") + + Returns: + The fetched record as a dict + + Raises: + NotImplementedError: If the stream doesn't support fetching individual records + RecordNotFoundException: If the record is not found + """ + raise NotImplementedError( + f"Stream {self.name} does not support fetching individual records. " + "Only declarative streams with SimpleRetriever currently support this operation." + ) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 5caec9a34..12d72ad9b 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -24,6 +24,7 @@ ) from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth from airbyte_cdk.sources.declarative.decoders import JsonDecoder +from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException from airbyte_cdk.sources.declarative.extractors import DpathExtractor, HttpSelector, RecordSelector from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, Paginator @@ -1644,3 +1645,190 @@ def _mock_paginator(): paginator.get_request_body_data.__name__ = "get_request_body_data" paginator.get_request_body_json.__name__ = "get_request_body_json" return paginator + + +def testfetch_one_simple_pk(): + """Test fetch_one with a simple string primary key.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"id": "123", "title": "Test Post"}).encode("utf-8") + + requester.send_request.return_value = response + + record_selector = MagicMock() + record_selector.select_records.return_value = [ + Record(data={"id": "123", "title": "Test Post"}, stream_name="posts", associated_slice=None) + ] + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + result = retriever.fetch_one("123", records_schema={}) + + assert result == {"id": "123", "title": "Test Post"} + requester.send_request.assert_called_once() + call_kwargs = requester.send_request.call_args[1] + assert call_kwargs["path"] == "posts/123" + + +def test_fetch_one_not_found(): + """Test fetch_one raises RecordNotFoundException when record is not found (404).""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + error = Exception("Not found") + error.response = MagicMock() + error.response.status_code = 404 + requester.send_request.side_effect = error + + record_selector = MagicMock() + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + with pytest.raises(RecordNotFoundException) as exc_info: + retriever.fetch_one("999", records_schema={}) + + assert "999" in str(exc_info.value) + +def test_fetch_one_server_error(): + """Test fetch_one propagates non-404 errors.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + error = Exception("Server error") + error.response = MagicMock() + error.response.status_code = 500 + requester.send_request.side_effect = error + + record_selector = MagicMock() + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + with pytest.raises(Exception) as exc_info: + retriever.fetch_one("123", records_schema={}) + + assert "Server error" in str(exc_info.value) + +def testfetch_one_invalid_pk_type(): + """Test fetch_one with non-string pk_value (should fail type checking but test runtime behavior).""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + record_selector = MagicMock() + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + with pytest.raises(AttributeError): + retriever.fetch_one(123, records_schema={}) # type: ignore + + +def testfetch_one_no_response(): + """Test fetch_one raises RecordNotFoundException when response is None.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + requester.send_request.return_value = None + + record_selector = MagicMock() + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + with pytest.raises(RecordNotFoundException) as exc_info: + retriever.fetch_one("123", records_schema={}) + + assert "123" in str(exc_info.value) + + +def test_fetch_one_empty_records(): + """Test fetch_one raises RecordNotFoundException when response is truly empty.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({}).encode("utf-8") + + requester.send_request.return_value = response + + record_selector = MagicMock() + record_selector.select_records.return_value = [] + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + with pytest.raises(RecordNotFoundException) as exc_info: + retriever.fetch_one("123", records_schema={}) + + assert "123" in str(exc_info.value) + +def testfetch_one_single_object_response(): + """Test fetch_one handles single object responses (most common pattern for GET /resource/{id}).""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"id": "123", "title": "Test Post"}).encode("utf-8") + + requester.send_request.return_value = response + + record_selector = MagicMock() + record_selector.select_records.return_value = [] + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + result = retriever.fetch_one("123", records_schema={}) + + assert result == {"id": "123", "title": "Test Post"} + requester.send_request.assert_called_once() + call_kwargs = requester.send_request.call_args[1] + assert call_kwargs["path"] == "posts/123"