Skip to content

Commit 16d5e80

Browse files
feat: Add scanning capability to get_record() and new fetch_source_stream_record MCP tool
- Add allow_scanning and scan_timeout_seconds parameters to get_record() - When allow_scanning=True, iterate through get_records() to find matching records - Allow searching by non-primary-key fields when scanning is enabled - Implement time-based timeout with configurable scan_timeout_seconds - Update _normalize_and_validate_pk() to support strict_pk_field parameter - Add fetch_source_stream_record MCP tool for single record fetching - Add comprehensive unit tests for scanning feature (8 new tests) - All tests passing (23/23) Co-Authored-By: AJ Steers <aj@airbyte.io>
1 parent ff8fe70 commit 16d5e80

File tree

3 files changed

+379
-43
lines changed

3 files changed

+379
-43
lines changed

airbyte/mcp/local_ops.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,124 @@ def read_source_stream_records(
461461
return records
462462

463463

464+
@mcp_tool(
465+
domain="local",
466+
read_only=True,
467+
extra_help_text=_CONFIG_HELP,
468+
)
469+
def fetch_source_stream_record( # noqa: PLR0913
470+
source_connector_name: Annotated[
471+
str,
472+
Field(description="The name of the source connector."),
473+
],
474+
config: Annotated[
475+
dict | str | None,
476+
Field(
477+
description="The configuration for the source connector as a dict or JSON string.",
478+
default=None,
479+
),
480+
],
481+
config_file: Annotated[
482+
str | Path | None,
483+
Field(
484+
description="Path to a YAML or JSON file containing the source connector config.",
485+
default=None,
486+
),
487+
],
488+
config_secret_name: Annotated[
489+
str | None,
490+
Field(
491+
description="The name of the secret containing the configuration.",
492+
default=None,
493+
),
494+
],
495+
*,
496+
stream_name: Annotated[
497+
str,
498+
Field(description="The name of the stream to fetch the record from."),
499+
],
500+
pk_value: Annotated[
501+
str | dict[str, Any],
502+
Field(
503+
description="Either a primary key value as a string (e.g., '123') or a dict "
504+
"with a single entry where the key is the field name and the value is the field value "
505+
"(e.g., {'id': '123'} or {'email': 'user@example.com'}). "
506+
"When allow_scanning=False, dict keys must match the primary key. "
507+
"When allow_scanning=True, dict keys can be any field name.",
508+
),
509+
],
510+
allow_scanning: Annotated[
511+
bool,
512+
Field(
513+
description="When True, enables scanning through records to find a match. "
514+
"This allows searching by non-primary-key fields but is slower and may not find "
515+
"the record if it's not in the first scan_timeout_seconds of data. Default: False.",
516+
default=False,
517+
),
518+
],
519+
scan_timeout_seconds: Annotated[
520+
int,
521+
Field(
522+
description="Maximum time in seconds to scan for a record when allow_scanning=True. "
523+
"Default: 5 seconds.",
524+
default=5,
525+
),
526+
],
527+
override_execution_mode: Annotated[
528+
Literal["docker", "python", "yaml", "auto"],
529+
Field(
530+
description="Optionally override the execution method to use for the connector. "
531+
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
532+
default="auto",
533+
),
534+
],
535+
manifest_path: Annotated[
536+
str | Path | None,
537+
Field(
538+
description="Path to a local YAML manifest file for declarative connectors.",
539+
default=None,
540+
),
541+
],
542+
) -> dict[str, Any] | str:
543+
"""Fetch a single record from a source connector by primary key or field value.
544+
545+
This tool is only supported for declarative (YAML-based) sources.
546+
"""
547+
try:
548+
source: Source = _get_mcp_source(
549+
connector_name=source_connector_name,
550+
override_execution_mode=override_execution_mode,
551+
manifest_path=manifest_path,
552+
)
553+
config_dict = resolve_config(
554+
config=config,
555+
config_file=config_file,
556+
config_secret_name=config_secret_name,
557+
config_spec_jsonschema=source.config_spec,
558+
)
559+
source.set_config(config_dict)
560+
561+
record = source.get_record(
562+
stream_name=stream_name,
563+
pk_value=pk_value,
564+
allow_scanning=allow_scanning,
565+
scan_timeout_seconds=scan_timeout_seconds,
566+
)
567+
568+
print(
569+
f"Retrieved record with {pk_value} from stream '{stream_name}'",
570+
file=sys.stderr,
571+
)
572+
573+
except Exception as ex:
574+
tb_str = traceback.format_exc()
575+
return (
576+
f"Error fetching record from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
577+
)
578+
else:
579+
return record
580+
581+
464582
@mcp_tool(
465583
domain="local",
466584
read_only=True,

airbyte/sources/base.py

Lines changed: 113 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -614,38 +614,42 @@ def _normalize_and_validate_pk(
614614
self,
615615
stream_name: str,
616616
pk_value: LookupValue | dict[str, LookupValue],
617-
) -> str:
617+
*,
618+
strict_pk_field: bool = True,
619+
) -> tuple[str, str]:
618620
"""Normalize and validate primary key input.
619621
620622
Accepts either a direct primary key value or a dict with a single entry
621-
where the key matches the stream's primary key field name.
623+
where the key matches the stream's primary key field name (when strict_pk_field=True).
622624
623625
Args:
624626
stream_name: Name of the stream
625627
pk_value: Either a direct PK value or a dict with single entry
628+
strict_pk_field: When True, dict keys must match the stream's primary key.
629+
When False (for scanning), dict keys can be any field name.
626630
627631
Returns:
628-
The primary key value as a string
632+
A tuple of (field_name, field_value) as strings
629633
630634
Raises:
631635
PyAirbyteInputError: If validation fails
632636
NotImplementedError: If the stream has composite or no primary key
637+
(when strict_pk_field=True)
633638
"""
634639
primary_key = self._get_stream_primary_key(stream_name)
635640

636-
if len(primary_key) == 0:
637-
raise NotImplementedError(
638-
f"Stream '{stream_name}' does not have a primary key defined. "
639-
"Cannot fetch individual records without a primary key."
640-
)
641-
642-
if len(primary_key) > 1:
643-
raise NotImplementedError(
644-
f"Stream '{stream_name}' has a composite primary key {primary_key}. "
645-
"Fetching by composite primary key is not yet supported."
646-
)
641+
if strict_pk_field:
642+
if len(primary_key) == 0:
643+
raise NotImplementedError(
644+
f"Stream '{stream_name}' does not have a primary key defined. "
645+
"Cannot fetch individual records without a primary key."
646+
)
647647

648-
pk_field = primary_key[0]
648+
if len(primary_key) > 1:
649+
raise NotImplementedError(
650+
f"Stream '{stream_name}' has a composite primary key {primary_key}. "
651+
"Fetching by composite primary key is not yet supported."
652+
)
649653

650654
if isinstance(pk_value, dict):
651655
if len(pk_value) != 1:
@@ -663,55 +667,88 @@ def _normalize_and_validate_pk(
663667
dict_key = next(iter(pk_value.keys()))
664668
dict_value = pk_value[dict_key]
665669

666-
if dict_key != pk_field:
667-
raise exc.PyAirbyteInputError(
668-
message="The key in the pk_value dict does not match the stream's primary key.",
669-
input_value=dict_key,
670-
context={
671-
"stream_name": stream_name,
672-
"expected_key": pk_field,
673-
"actual_key": dict_key,
674-
},
675-
)
670+
if strict_pk_field:
671+
pk_field = primary_key[0]
672+
if dict_key != pk_field:
673+
raise exc.PyAirbyteInputError(
674+
message=(
675+
"The key in the pk_value dict does not match "
676+
"the stream's primary key."
677+
),
678+
input_value=dict_key,
679+
context={
680+
"stream_name": stream_name,
681+
"expected_key": pk_field,
682+
"actual_key": dict_key,
683+
},
684+
)
676685

677-
return str(dict_value)
686+
return dict_key, str(dict_value)
678687

679-
return str(pk_value)
688+
if len(primary_key) == 0:
689+
raise exc.PyAirbyteInputError(
690+
message=(
691+
"When passing a scalar pk_value, the stream must have a primary key defined. "
692+
"Use a dict to specify the field name explicitly."
693+
),
694+
input_value=str(pk_value),
695+
context={
696+
"stream_name": stream_name,
697+
},
698+
)
699+
700+
return primary_key[0], str(pk_value)
680701

681702
def get_record(
682703
self,
683704
stream_name: str,
684705
*,
685706
pk_value: LookupValue | dict[str, LookupValue],
707+
allow_scanning: bool = False,
708+
scan_timeout_seconds: int = 5,
686709
) -> dict[str, Any]:
687-
"""Fetch a single record from a stream by primary key.
710+
"""Fetch a single record from a stream by primary key or field value.
688711
689712
This method is only supported for declarative (YAML-based) sources.
690713
691714
Args:
692715
stream_name: Name of the stream to fetch from
693716
pk_value: Either a direct primary key value (e.g., "123") or a dict
694-
with a single entry where the key is the primary key field name
695-
and the value is the primary key value (e.g., {"id": "123"}).
696-
The dict form provides explicit validation that you're using
697-
the correct primary key field.
717+
with a single entry where the key is the field name and the value
718+
is the field value (e.g., {"id": "123"} or {"email": "user@example.com"}).
719+
When allow_scanning=False, dict keys must match the primary key.
720+
When allow_scanning=True, dict keys can be any field name.
721+
allow_scanning: When True, enables scanning through records to find a match.
722+
This allows searching by non-primary-key fields but is slower and may
723+
not find the record if it's not in the first scan_timeout_seconds of data.
724+
Default: False (use fast primary key lookup only).
725+
scan_timeout_seconds: Maximum time in seconds to scan for a record when
726+
allow_scanning=True. Default: 5 seconds.
698727
699728
Returns:
700729
The fetched record as a dict
701730
702731
Raises:
703732
NotImplementedError: If the source is not a declarative source, or if
704733
the stream has a composite primary key or no primary key
734+
(when allow_scanning=False)
705735
PyAirbyteInputError: If the stream doesn't exist or pk_value validation fails
706-
RecordNotFoundException: If the record is not found (from CDK)
736+
ValueError: If the record is not found within the timeout period
737+
(when allow_scanning=True)
707738
708739
Example:
709740
```python
710741
source = get_source("source-rest-api-tutorial", config=config)
711742
712743
record = source.get_record("users", pk_value="123")
713-
714744
record = source.get_record("users", pk_value={"id": "123"})
745+
746+
record = source.get_record(
747+
"users",
748+
pk_value={"email": "user@example.com"},
749+
allow_scanning=True,
750+
scan_timeout_seconds=10,
751+
)
715752
```
716753
"""
717754
if not isinstance(self.executor, DeclarativeExecutor):
@@ -720,12 +757,49 @@ def get_record(
720757
f"This source uses {type(self.executor).__name__}."
721758
)
722759

723-
validated_pk_str = self._normalize_and_validate_pk(stream_name, pk_value)
760+
field_name, field_value = self._normalize_and_validate_pk(
761+
stream_name, pk_value, strict_pk_field=not allow_scanning
762+
)
763+
764+
# Try fast path first if the field is the primary key
765+
primary_key = self._get_stream_primary_key(stream_name)
766+
if len(primary_key) == 1 and field_name == primary_key[0]:
767+
try:
768+
return self.executor.fetch_record(
769+
stream_name=stream_name,
770+
pk_value=field_value,
771+
config=self._config_dict,
772+
)
773+
except Exception:
774+
if not allow_scanning:
775+
raise
776+
777+
if not allow_scanning:
778+
raise ValueError(
779+
f"Record with {field_name}={field_value} not found in stream '{stream_name}'. "
780+
"Consider using allow_scanning=True to search by non-primary-key fields."
781+
)
782+
783+
start_time = time.time()
784+
records_checked = 0
785+
786+
for record in self.get_records(stream_name):
787+
elapsed = time.time() - start_time
788+
if elapsed > scan_timeout_seconds:
789+
raise ValueError(
790+
f"Record with {field_name}={field_value} not found in stream '{stream_name}' "
791+
f"within {scan_timeout_seconds} seconds (checked {records_checked} records). "
792+
"Consider increasing scan_timeout_seconds or using a primary key lookup."
793+
)
794+
795+
records_checked += 1
796+
797+
if field_name in record and str(record[field_name]) == field_value:
798+
return record
724799

725-
return self.executor.fetch_record(
726-
stream_name=stream_name,
727-
pk_value=validated_pk_str,
728-
config=self._config_dict,
800+
raise ValueError(
801+
f"Record with {field_name}={field_value} not found in stream '{stream_name}' "
802+
f"(checked {records_checked} records in {time.time() - start_time:.1f} seconds)."
729803
)
730804

731805
def get_documents(

0 commit comments

Comments
 (0)