diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 3f72bcfb..4d2886f4 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -1418,3 +1418,53 @@ def get_connector_builder_project_for_definition_id( client_secret=client_secret, ) return json_result.get("builderProjectId") + + +def get_definition_id_for_connector_builder_project( + *, + workspace_id: str, + builder_project_id: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> str: + """Get the source definition ID for a connector builder project. + + Uses the Config API endpoint: + /v1/connector_builder_projects/get_with_manifest + + See: https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L11900 + + Args: + workspace_id: The workspace ID + builder_project_id: The connector builder project ID + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + The source definition ID + + Raises: + AirbyteError: If no definition is found for the given builder project ID + """ + json_result = _make_config_api_request( + path="/connector_builder_projects/get_with_manifest", + json={ + "builderProjectId": builder_project_id, + "workspaceId": workspace_id, + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + definition_id = json_result.get("sourceDefinitionId") + if definition_id is None: + raise AirbyteError( + message="No source definition found for the given connector builder project", + context={ + "workspace_id": workspace_id, + "builder_project_id": builder_project_id, + }, + ) + return definition_id diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index af201e4c..ad58cdad 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -583,30 +583,94 @@ def list_custom_source_definitions( def get_custom_source_definition( self, - definition_id: str, + definition_id: str | None = None, *, definition_type: Literal["yaml", "docker"], + connector_builder_project_id: str | None = None, ) -> CustomCloudSourceDefinition: - """Get a specific custom source definition by ID. + """Get a specific custom source definition by ID or builder project ID. Args: - definition_id: The definition ID + definition_id: The definition ID. Mutually exclusive with + `connector_builder_project_id`. definition_type: Connector type ("yaml" or "docker"). Required. + connector_builder_project_id: The connector builder project ID. + Mutually exclusive with `definition_id` Returns: CustomCloudSourceDefinition object + + Raises: + PyAirbyteInputError: If both or neither parameters are provided, or if + connector_builder_project_id is used with non-yaml definition_type """ - if definition_type == "yaml": - result = api_util.get_custom_yaml_source_definition( + if definition_type != "yaml": + raise NotImplementedError( + "Docker custom source definitions are not yet supported. " + "Only YAML manifest-based custom sources are currently available." + ) + + if (definition_id is None) == (connector_builder_project_id is None): + raise exc.PyAirbyteInputError( + message=( + "Must specify EXACTLY ONE of definition_id or connector_builder_project_id" + ), + context={ + "definition_id": definition_id, + "connector_builder_project_id": connector_builder_project_id, + }, + ) + + if connector_builder_project_id: + if definition_type != "yaml": + raise exc.PyAirbyteInputError( + message="connector_builder_project_id is only valid for yaml definition_type", + context={ + "definition_type": definition_type, + "connector_builder_project_id": connector_builder_project_id, + }, + ) + definition_id = api_util.get_definition_id_for_connector_builder_project( workspace_id=self.workspace_id, - definition_id=definition_id, + builder_project_id=connector_builder_project_id, api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, ) - return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 - raise NotImplementedError( - "Docker custom source definitions are not yet supported. " - "Only YAML manifest-based custom sources are currently available." + # Definition ID is guaranteed to be set by here + assert definition_id is not None + result = api_util.get_custom_yaml_source_definition( + workspace_id=self.workspace_id, + definition_id=definition_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, ) + return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 + + def permanently_delete_custom_source_definition( + self, + definition_id: str, + *, + definition_type: Literal["yaml", "docker"], + ) -> None: + """Permanently delete a custom source definition. + + Args: + definition_id: The definition ID to delete + definition_type: Connector type ("yaml" or "docker"). Required. + """ + if definition_type == "yaml": + api_util.delete_custom_yaml_source_definition( + workspace_id=self.workspace_id, + definition_id=definition_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + else: + raise NotImplementedError( + "Docker custom source definitions are not yet supported. " + "Only YAML manifest-based custom sources are currently available." + ) diff --git a/tests/integration_tests/cloud/test_custom_definitions.py b/tests/integration_tests/cloud/test_custom_definitions.py index fbabf469..89a79405 100644 --- a/tests/integration_tests/cloud/test_custom_definitions.py +++ b/tests/integration_tests/cloud/test_custom_definitions.py @@ -77,7 +77,7 @@ def test_publish_custom_yaml_source( assert definitions[0].definition_id == definition_id fetched = cloud_workspace.get_custom_source_definition( - definition_id, + definition_id=definition_id, definition_type="yaml", ) assert fetched.definition_id == definition_id @@ -85,6 +85,15 @@ def test_publish_custom_yaml_source( assert fetched.definition_type == "yaml" assert fetched.connector_type == "source" + builder_project_id = fetched.connector_builder_project_id + if builder_project_id: + fetched_by_project_id = cloud_workspace.get_custom_source_definition( + connector_builder_project_id=builder_project_id, + definition_type="yaml", + ) + assert fetched_by_project_id.definition_id == definition_id + assert fetched_by_project_id.name == name + updated_manifest = TEST_YAML_MANIFEST.copy() updated_manifest["version"] = "0.2.0" updated = fetched.update_definition( @@ -168,3 +177,36 @@ def test_safe_mode_deletion( finally: definition.permanently_delete(safe_mode=False) + + +def test_get_custom_source_definition_validation() -> None: + """Test that get_custom_source_definition validates input parameters correctly.""" + from airbyte.exceptions import PyAirbyteInputError + from unittest.mock import Mock + + workspace = Mock(spec=CloudWorkspace) + workspace.workspace_id = "test-workspace" + + with pytest.raises(PyAirbyteInputError) as exc_info: + CloudWorkspace.get_custom_source_definition( + workspace, + definition_type="yaml", + ) + assert "EXACTLY ONE" in str(exc_info.value) + + with pytest.raises(PyAirbyteInputError) as exc_info: + CloudWorkspace.get_custom_source_definition( + workspace, + definition_id="test-id", + definition_type="yaml", + connector_builder_project_id="test-project-id", + ) + assert "EXACTLY ONE" in str(exc_info.value) + + with pytest.raises(PyAirbyteInputError) as exc_info: + CloudWorkspace.get_custom_source_definition( + workspace, + definition_type="docker", + connector_builder_project_id="test-project-id", + ) + assert "only valid for yaml" in str(exc_info.value)