Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,3 +1418,53 @@ def get_connector_builder_project_for_definition_id(
client_secret=client_secret,
)
return json_result.get("builderProjectId")


def get_definition_id_for_connector_builder_project(
*,
workspace_id: str,
builder_project_id: str,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> str:
"""Get the source definition ID for a connector builder project.

Uses the Config API endpoint:
/v1/connector_builder_projects/get_with_manifest

See: https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L11900

Args:
workspace_id: The workspace ID
builder_project_id: The connector builder project ID
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret

Returns:
The source definition ID

Raises:
AirbyteError: If no definition is found for the given builder project ID
"""
json_result = _make_config_api_request(
path="/connector_builder_projects/get_with_manifest",
json={
"builderProjectId": builder_project_id,
"workspaceId": workspace_id,
},
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)
definition_id = json_result.get("sourceDefinitionId")
if definition_id is None:
raise AirbyteError(
message="No source definition found for the given connector builder project",
context={
"workspace_id": workspace_id,
"builder_project_id": builder_project_id,
},
)
return definition_id
84 changes: 74 additions & 10 deletions airbyte/cloud/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,30 +583,94 @@ def list_custom_source_definitions(

def get_custom_source_definition(
self,
definition_id: str,
definition_id: str | None = None,
*,
definition_type: Literal["yaml", "docker"],
connector_builder_project_id: str | None = None,
) -> CustomCloudSourceDefinition:
"""Get a specific custom source definition by ID.
"""Get a specific custom source definition by ID or builder project ID.

Args:
definition_id: The definition ID
definition_id: The definition ID. Mutually exclusive with
`connector_builder_project_id`.
definition_type: Connector type ("yaml" or "docker"). Required.
connector_builder_project_id: The connector builder project ID.
Mutually exclusive with `definition_id`

Returns:
CustomCloudSourceDefinition object

Raises:
PyAirbyteInputError: If both or neither parameters are provided, or if
connector_builder_project_id is used with non-yaml definition_type
"""
if definition_type == "yaml":
result = api_util.get_custom_yaml_source_definition(
if definition_type != "yaml":
raise NotImplementedError(
"Docker custom source definitions are not yet supported. "
"Only YAML manifest-based custom sources are currently available."
)

if (definition_id is None) == (connector_builder_project_id is None):
raise exc.PyAirbyteInputError(
message=(
"Must specify EXACTLY ONE of definition_id or connector_builder_project_id"
),
context={
"definition_id": definition_id,
"connector_builder_project_id": connector_builder_project_id,
},
)

if connector_builder_project_id:
if definition_type != "yaml":
raise exc.PyAirbyteInputError(
message="connector_builder_project_id is only valid for yaml definition_type",
context={
"definition_type": definition_type,
"connector_builder_project_id": connector_builder_project_id,
},
)
definition_id = api_util.get_definition_id_for_connector_builder_project(
Comment on lines +607 to +633
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reorder validation so Docker calls hit the intended PyAirbyteInputError

Right now the definition_type != "yaml" guard fires before we evaluate the mutually exclusive parameter check, so calling with definition_type="docker" and a connector_builder_project_id raises NotImplementedError instead of the documented PyAirbyteInputError. Could we move the XOR validation (and the builder-project/type check) ahead of the NotImplementedError so we emit the expected input error? Something along these lines might help, wdyt?

-        if definition_type != "yaml":
-            raise NotImplementedError(
-                "Docker custom source definitions are not yet supported. "
-                "Only YAML manifest-based custom sources are currently available."
-            )
-
-        if (definition_id is None) == (connector_builder_project_id is None):
+        if (definition_id is None) == (connector_builder_project_id is None):
             raise exc.PyAirbyteInputError(
                 message=(
                     "Must specify EXACTLY ONE of definition_id or connector_builder_project_id"
                 ),
                 context={
                     "definition_id": definition_id,
                     "connector_builder_project_id": connector_builder_project_id,
                 },
             )

-        if connector_builder_project_id:
-            if definition_type != "yaml":
-                raise exc.PyAirbyteInputError(
-                    message="connector_builder_project_id is only valid for yaml definition_type",
-                    context={
-                        "definition_type": definition_type,
-                        "connector_builder_project_id": connector_builder_project_id,
-                    },
-                )
+        if connector_builder_project_id and definition_type != "yaml":
+            raise exc.PyAirbyteInputError(
+                message="connector_builder_project_id is only valid for yaml definition_type",
+                context={
+                    "definition_type": definition_type,
+                    "connector_builder_project_id": connector_builder_project_id,
+                },
+            )
+
+        if definition_type != "yaml":
+            raise NotImplementedError(
+                "Docker custom source definitions are not yet supported. "
+                "Only YAML manifest-based custom sources are currently available."
+            )
+
+        if connector_builder_project_id:
             definition_id = api_util.get_definition_id_for_connector_builder_project(
                 workspace_id=self.workspace_id,
                 builder_project_id=connector_builder_project_id,
                 api_root=self.api_root,
                 client_id=self.client_id,
                 client_secret=self.client_secret,
             )
🤖 Prompt for AI Agents
In airbyte/cloud/workspaces.py between lines 607 and 633, reorder the validation
logic so that the XOR check for mutually exclusive definition_id and
connector_builder_project_id parameters happens before the definition_type
"yaml" check. This means first validate that exactly one of definition_id or
connector_builder_project_id is provided and that connector_builder_project_id
is only valid with "yaml" definition_type. Only after these validations pass,
check if definition_type is not "yaml" to raise NotImplementedError. This way,
calls with definition_type "docker" and connector_builder_project_id correctly
raise PyAirbyteInputError as expected.

workspace_id=self.workspace_id,
definition_id=definition_id,
builder_project_id=connector_builder_project_id,
api_root=self.api_root,
client_id=self.client_id,
client_secret=self.client_secret,
)
return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001

raise NotImplementedError(
"Docker custom source definitions are not yet supported. "
"Only YAML manifest-based custom sources are currently available."
# Definition ID is guaranteed to be set by here
assert definition_id is not None
result = api_util.get_custom_yaml_source_definition(
workspace_id=self.workspace_id,
definition_id=definition_id,
api_root=self.api_root,
client_id=self.client_id,
client_secret=self.client_secret,
)
return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001

def permanently_delete_custom_source_definition(
self,
definition_id: str,
*,
definition_type: Literal["yaml", "docker"],
) -> None:
"""Permanently delete a custom source definition.

Args:
definition_id: The definition ID to delete
definition_type: Connector type ("yaml" or "docker"). Required.
"""
if definition_type == "yaml":
api_util.delete_custom_yaml_source_definition(
workspace_id=self.workspace_id,
definition_id=definition_id,
api_root=self.api_root,
client_id=self.client_id,
client_secret=self.client_secret,
)
else:
raise NotImplementedError(
"Docker custom source definitions are not yet supported. "
"Only YAML manifest-based custom sources are currently available."
)
44 changes: 43 additions & 1 deletion tests/integration_tests/cloud/test_custom_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,23 @@ def test_publish_custom_yaml_source(
assert definitions[0].definition_id == definition_id

fetched = cloud_workspace.get_custom_source_definition(
definition_id,
definition_id=definition_id,
definition_type="yaml",
)
assert fetched.definition_id == definition_id
assert fetched.name == name
assert fetched.definition_type == "yaml"
assert fetched.connector_type == "source"

builder_project_id = fetched.connector_builder_project_id
if builder_project_id:
fetched_by_project_id = cloud_workspace.get_custom_source_definition(
connector_builder_project_id=builder_project_id,
definition_type="yaml",
)
assert fetched_by_project_id.definition_id == definition_id
assert fetched_by_project_id.name == name

updated_manifest = TEST_YAML_MANIFEST.copy()
updated_manifest["version"] = "0.2.0"
updated = fetched.update_definition(
Expand Down Expand Up @@ -168,3 +177,36 @@ def test_safe_mode_deletion(

finally:
definition.permanently_delete(safe_mode=False)


def test_get_custom_source_definition_validation() -> None:
"""Test that get_custom_source_definition validates input parameters correctly."""
from airbyte.exceptions import PyAirbyteInputError
from unittest.mock import Mock

workspace = Mock(spec=CloudWorkspace)
workspace.workspace_id = "test-workspace"

with pytest.raises(PyAirbyteInputError) as exc_info:
CloudWorkspace.get_custom_source_definition(
workspace,
definition_type="yaml",
)
assert "EXACTLY ONE" in str(exc_info.value)

with pytest.raises(PyAirbyteInputError) as exc_info:
CloudWorkspace.get_custom_source_definition(
workspace,
definition_id="test-id",
definition_type="yaml",
connector_builder_project_id="test-project-id",
)
assert "EXACTLY ONE" in str(exc_info.value)

with pytest.raises(PyAirbyteInputError) as exc_info:
CloudWorkspace.get_custom_source_definition(
workspace,
definition_type="docker",
connector_builder_project_id="test-project-id",
)
assert "only valid for yaml" in str(exc_info.value)
Loading