Skip to content

Commit 4328f5f

Browse files
fix: Handle missing CDK fetch_record method gracefully with runtime detection
Co-Authored-By: AJ Steers <aj@airbyte.io>
1 parent 8dabe3d commit 4328f5f

File tree

4 files changed

+107
-75
lines changed

4 files changed

+107
-75
lines changed

airbyte/_executors/declarative.py

Lines changed: 18 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -148,25 +148,37 @@ def fetch_record(
148148
config: dict[str, Any] | None = None,
149149
) -> dict[str, Any]:
150150
"""Fetch a single record from a stream by primary key.
151-
151+
152+
This method requires airbyte-python-cdk with fetch_record support
153+
(airbytehq/airbyte-python-cdk#846).
154+
152155
Args:
153156
stream_name: Name of the stream to fetch from
154157
pk_value: Primary key value as a string
155158
config: Source configuration (optional, uses instance config if not provided)
156-
159+
157160
Returns:
158161
The fetched record as a dict
159-
162+
160163
Raises:
164+
NotImplementedError: If the installed CDK doesn't support fetch_record
161165
ValueError: If the stream name is not found
162-
NotImplementedError: If the stream doesn't support fetching individual records
163166
RecordNotFoundException: If the record is not found
164167
"""
165168
merged_config = {**self._config_dict}
166169
if config:
167170
merged_config.update(config)
168-
169-
return self.declarative_source.fetch_record(
171+
172+
source = self.declarative_source
173+
fetch_record_method = getattr(source, "fetch_record", None)
174+
175+
if fetch_record_method is None:
176+
raise NotImplementedError(
177+
"The installed airbyte-python-cdk does not support fetch_record. "
178+
"This requires airbytehq/airbyte-python-cdk#846 to be merged and installed."
179+
)
180+
181+
return fetch_record_method(
170182
stream_name=stream_name,
171183
pk_value=pk_value,
172184
config=merged_config,

airbyte/sources/base.py

Lines changed: 31 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030

3131
from airbyte import exceptions as exc
3232
from airbyte._connector_base import ConnectorBase
33+
from airbyte._executors.declarative import DeclarativeExecutor
3334
from airbyte._message_iterators import AirbyteMessageIterator
3435
from airbyte._util.temp_files import as_temp_files
3536
from airbyte.caches.util import get_default_cache
@@ -582,31 +583,31 @@ def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]
582583

583584
def _get_stream_primary_key(self, stream_name: str) -> list[str]:
584585
"""Get the primary key for a stream.
585-
586+
586587
Returns the primary key from the configured catalog, which includes user overrides.
587588
The Airbyte protocol represents primary keys as a list of field paths (list of lists).
588589
This method flattens single-field paths to simple strings.
589-
590+
590591
Args:
591592
stream_name: Name of the stream
592-
593+
593594
Returns:
594595
List of primary key field names (flattened from field paths)
595-
596+
596597
Raises:
597598
PyAirbyteInputError: If the stream doesn't exist
598599
"""
599600
configured_catalog = self.get_configured_catalog(streams=[stream_name])
600-
601+
601602
if len(configured_catalog.streams) == 0:
602603
raise exc.PyAirbyteInputError(
603604
message="Stream name does not exist in catalog.",
604605
input_value=stream_name,
605606
)
606-
607+
607608
configured_stream = configured_catalog.streams[0]
608609
pk_field_paths = configured_stream.primary_key or []
609-
610+
610611
return [field_path[0] for field_path in pk_field_paths if field_path]
611612

612613
def _normalize_and_validate_pk(
@@ -615,52 +616,53 @@ def _normalize_and_validate_pk(
615616
pk_value: LookupValue | dict[str, LookupValue],
616617
) -> str:
617618
"""Normalize and validate primary key input.
618-
619+
619620
Accepts either a direct primary key value or a dict with a single entry
620621
where the key matches the stream's primary key field name.
621-
622+
622623
Args:
623624
stream_name: Name of the stream
624625
pk_value: Either a direct PK value or a dict with single entry
625-
626+
626627
Returns:
627628
The primary key value as a string
628-
629+
629630
Raises:
630631
PyAirbyteInputError: If validation fails
631632
NotImplementedError: If the stream has composite or no primary key
632633
"""
633634
primary_key = self._get_stream_primary_key(stream_name)
634-
635+
635636
if len(primary_key) == 0:
636637
raise NotImplementedError(
637638
f"Stream '{stream_name}' does not have a primary key defined. "
638639
"Cannot fetch individual records without a primary key."
639640
)
640-
641+
641642
if len(primary_key) > 1:
642643
raise NotImplementedError(
643644
f"Stream '{stream_name}' has a composite primary key {primary_key}. "
644645
"Fetching by composite primary key is not yet supported."
645646
)
646-
647+
647648
pk_field = primary_key[0]
648-
649+
649650
if isinstance(pk_value, dict):
650651
if len(pk_value) != 1:
651652
raise exc.PyAirbyteInputError(
652653
message="When passing a dict for pk_value, it must contain exactly one entry.",
653-
input_value=pk_value,
654+
input_value=str(pk_value),
654655
context={
655656
"stream_name": stream_name,
656657
"expected_entries": 1,
657658
"actual_entries": len(pk_value),
659+
"pk_value": pk_value,
658660
},
659661
)
660-
662+
661663
dict_key = next(iter(pk_value.keys()))
662664
dict_value = pk_value[dict_key]
663-
665+
664666
if dict_key != pk_field:
665667
raise exc.PyAirbyteInputError(
666668
message="The key in the pk_value dict does not match the stream's primary key.",
@@ -671,9 +673,9 @@ def _normalize_and_validate_pk(
671673
"actual_key": dict_key,
672674
},
673675
)
674-
676+
675677
return str(dict_value)
676-
678+
677679
return str(pk_value)
678680

679681
def get_record(
@@ -683,45 +685,43 @@ def get_record(
683685
pk_value: LookupValue | dict[str, LookupValue],
684686
) -> dict[str, Any]:
685687
"""Fetch a single record from a stream by primary key.
686-
688+
687689
This method is only supported for declarative (YAML-based) sources.
688-
690+
689691
Args:
690692
stream_name: Name of the stream to fetch from
691693
pk_value: Either a direct primary key value (e.g., "123") or a dict
692694
with a single entry where the key is the primary key field name
693695
and the value is the primary key value (e.g., {"id": "123"}).
694696
The dict form provides explicit validation that you're using
695697
the correct primary key field.
696-
698+
697699
Returns:
698700
The fetched record as a dict
699-
701+
700702
Raises:
701703
NotImplementedError: If the source is not a declarative source, or if
702704
the stream has a composite primary key or no primary key
703705
PyAirbyteInputError: If the stream doesn't exist or pk_value validation fails
704706
RecordNotFoundException: If the record is not found (from CDK)
705-
707+
706708
Example:
707709
```python
708710
source = get_source("source-rest-api-tutorial", config=config)
709-
711+
710712
record = source.get_record("users", pk_value="123")
711-
713+
712714
record = source.get_record("users", pk_value={"id": "123"})
713715
```
714716
"""
715-
from airbyte._executors.declarative import DeclarativeExecutor
716-
717717
if not isinstance(self.executor, DeclarativeExecutor):
718718
raise NotImplementedError(
719719
f"get_record() is only supported for declarative sources. "
720720
f"This source uses {type(self.executor).__name__}."
721721
)
722-
722+
723723
validated_pk_str = self._normalize_and_validate_pk(stream_name, pk_value)
724-
724+
725725
return self.executor.fetch_record(
726726
stream_name=stream_name,
727727
pk_value=validated_pk_str,

poetry.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)