Skip to content

Commit ac23073

Browse files
feat(integrations): Add Sonar connector support
- Add new Integration class that inherits from ConnectorBase - Create SonarExecutor for YAML-based connector execution - Add get_integration() factory function - Support Sonar verbs (get, list, create, update, delete, retrieve, search) - Add connector-sdk as optional dependency in integrations group - Include comprehensive unit tests for Integration and SonarExecutor - Update ConnectorBase to support 'integration' connector type - Add asyncio pytest marker to configuration This enables PyAirbyte to run Sonar YAML-only connectors as a new connector type called 'integrations', which use different verbs than traditional sources/destinations and leverage the Sonar connector-sdk for dynamic execution. Co-Authored-By: AJ Steers <aj@airbyte.io>
1 parent 2981b3d commit ac23073

File tree

11 files changed

+1213
-31
lines changed

11 files changed

+1213
-31
lines changed

airbyte/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@
132132
from airbyte.datasets import CachedDataset
133133
from airbyte.destinations.base import Destination
134134
from airbyte.destinations.util import get_destination
135+
from airbyte.integrations.base import Integration
136+
from airbyte.integrations.util import get_integration
135137
from airbyte.records import StreamRecord
136138
from airbyte.registry import get_available_connectors
137139
from airbyte.results import ReadResult, WriteResult
@@ -154,6 +156,7 @@
154156
documents,
155157
exceptions, # noqa: ICN001 # No 'exc' alias for top-level module
156158
experimental,
159+
integrations,
157160
logs,
158161
mcp,
159162
records,
@@ -175,6 +178,7 @@
175178
"documents",
176179
"exceptions",
177180
"experimental",
181+
"integrations",
178182
"logs",
179183
"mcp",
180184
"records",
@@ -187,6 +191,7 @@
187191
"get_colab_cache",
188192
"get_default_cache",
189193
"get_destination",
194+
"get_integration",
190195
"get_secret",
191196
"get_source",
192197
"new_local_cache",
@@ -195,6 +200,7 @@
195200
"CachedDataset",
196201
"Destination",
197202
"DuckDBCache",
203+
"Integration",
198204
"ReadResult",
199205
"SecretSourceEnum",
200206
"Source",

airbyte/_connector_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
class ConnectorBase(abc.ABC):
5555
"""A class representing a destination that can be called."""
5656

57-
connector_type: Literal["destination", "source"]
57+
connector_type: Literal["destination", "source", "integration"]
5858

5959
def __init__(
6060
self,

airbyte/_executors/sonar.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
"""Sonar Integration Executor for YAML-based connectors."""
3+
4+
from __future__ import annotations
5+
6+
from pathlib import Path
7+
from typing import TYPE_CHECKING
8+
9+
from airbyte import exceptions as exc
10+
from airbyte._executors.base import Executor
11+
12+
13+
if TYPE_CHECKING:
14+
from collections.abc import Iterator
15+
from typing import Any
16+
17+
from airbyte.registry import ConnectorMetadata
18+
19+
20+
class SonarExecutor(Executor):
21+
"""Executor for Sonar YAML-based integration connectors.
22+
23+
This executor wraps the connector-sdk's ConnectorExecutor to run
24+
YAML-defined connectors without subprocess execution.
25+
"""
26+
27+
def __init__(
28+
self,
29+
*,
30+
name: str,
31+
yaml_path: Path | str,
32+
secrets: dict[str, Any] | None = None,
33+
enable_logging: bool = False,
34+
log_file: str | None = None,
35+
metadata: ConnectorMetadata | None = None,
36+
target_version: str | None = None,
37+
) -> None:
38+
"""Initialize Sonar executor.
39+
40+
Args:
41+
name: Connector name
42+
yaml_path: Path to connector YAML definition
43+
secrets: Secret credentials (will be converted to SecretStr)
44+
enable_logging: Enable request/response logging
45+
log_file: Path to log file
46+
metadata: Connector metadata (optional)
47+
target_version: Target version (not used for YAML connectors)
48+
"""
49+
super().__init__(name=name, metadata=metadata, target_version=target_version)
50+
51+
self.yaml_path = Path(yaml_path)
52+
self.secrets = secrets or {}
53+
self.enable_logging = enable_logging
54+
self.log_file = log_file
55+
self._executor: Any = None # connector_sdk.ConnectorExecutor
56+
57+
if not self.yaml_path.exists():
58+
raise exc.PyAirbyteInputError(
59+
message=f"Connector YAML file not found: {yaml_path}",
60+
input_value=str(yaml_path),
61+
)
62+
63+
def _get_executor(self) -> Any: # noqa: ANN401
64+
"""Lazily create connector-sdk executor.
65+
66+
Returns:
67+
ConnectorExecutor instance
68+
69+
Raises:
70+
PyAirbyteInputError: If connector-sdk is not installed
71+
"""
72+
if self._executor is None:
73+
try:
74+
from connector_sdk import ConnectorExecutor # noqa: PLC0415
75+
from connector_sdk.secrets import SecretStr # noqa: PLC0415
76+
except ImportError as ex:
77+
raise exc.PyAirbyteInputError(
78+
message=(
79+
"connector-sdk is required for Sonar integrations. "
80+
"Install it with: poetry install --with integrations"
81+
),
82+
guidance="Run: poetry install --with integrations",
83+
) from ex
84+
85+
# Convert secrets to SecretStr
86+
secret_dict = {
87+
key: SecretStr(value) if not isinstance(value, SecretStr) else value
88+
for key, value in self.secrets.items()
89+
}
90+
91+
self._executor = ConnectorExecutor(
92+
config_path=str(self.yaml_path),
93+
secrets=secret_dict,
94+
enable_logging=self.enable_logging,
95+
log_file=self.log_file,
96+
)
97+
98+
return self._executor
99+
100+
@property
101+
def _cli(self) -> list[str]:
102+
"""Get CLI args (not used for Sonar executor).
103+
104+
This property is required by the Executor base class but is not
105+
used for YAML-based connectors.
106+
"""
107+
return []
108+
109+
def execute(
110+
self,
111+
args: list[str],
112+
*,
113+
stdin: Any = None, # noqa: ANN401
114+
suppress_stderr: bool = False,
115+
) -> Iterator[str]:
116+
"""Execute is not supported for Sonar connectors.
117+
118+
Sonar connectors use async execute methods instead of subprocess execution.
119+
Use Integration.execute() or Integration.aexecute() instead.
120+
121+
Raises:
122+
NotImplementedError: Always raised
123+
"""
124+
raise NotImplementedError(
125+
"Sonar connectors do not support subprocess execution. "
126+
"Use Integration.execute() or Integration.aexecute() instead."
127+
)
128+
129+
async def aexecute(
130+
self,
131+
resource: str,
132+
verb: str,
133+
params: dict[str, Any] | None = None,
134+
) -> dict[str, Any]:
135+
"""Execute a verb on a resource asynchronously.
136+
137+
Args:
138+
resource: Resource name (e.g., "customers")
139+
verb: Verb to execute (e.g., "list", "get", "create")
140+
params: Parameters for the operation
141+
142+
Returns:
143+
API response as dictionary
144+
"""
145+
executor = self._get_executor()
146+
return await executor.execute(resource, verb, params)
147+
148+
async def aexecute_batch(
149+
self,
150+
operations: list[tuple[str, str, dict[str, Any] | None]],
151+
) -> list[dict[str, Any]]:
152+
"""Execute multiple operations concurrently.
153+
154+
Args:
155+
operations: List of (resource, verb, params) tuples
156+
157+
Returns:
158+
List of responses in the same order as operations
159+
"""
160+
executor = self._get_executor()
161+
return await executor.execute_batch(operations)
162+
163+
def ensure_installation(self, *, auto_fix: bool = True) -> None:
164+
"""Ensure connector is available (no-op for YAML connectors)."""
165+
pass
166+
167+
def install(self) -> None:
168+
"""Install connector (no-op for YAML connectors)."""
169+
pass
170+
171+
def uninstall(self) -> None:
172+
"""Uninstall connector (no-op for YAML connectors)."""
173+
pass
174+
175+
def get_installed_version(
176+
self,
177+
*,
178+
raise_on_error: bool = False,
179+
recheck: bool = False, # noqa: ARG002
180+
) -> str | None:
181+
"""Get connector version from YAML metadata.
182+
183+
Returns:
184+
Version string if available in YAML, None otherwise
185+
"""
186+
try:
187+
from connector_sdk.config_loader import load_connector_config # noqa: PLC0415
188+
189+
config = load_connector_config(str(self.yaml_path))
190+
except Exception:
191+
if raise_on_error:
192+
raise
193+
return None
194+
else:
195+
return config.connector.version if config.connector else None
196+
197+
198+
__all__ = [
199+
"SonarExecutor",
200+
]

airbyte/integrations/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
"""Sonar Integration support for PyAirbyte."""
3+
4+
from airbyte.integrations.base import Integration
5+
from airbyte.integrations.util import get_integration
6+
7+
8+
__all__ = [
9+
"Integration",
10+
"get_integration",
11+
]

0 commit comments

Comments
 (0)