Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,16 @@ under the License.
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-gcp</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>2.67.0</version>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class IcebergRestProperties extends AbstractIcebergProperties {
@ConnectorProperty(names = {"iceberg.rest.security.type"},
required = false,
description = "The security type of the iceberg rest catalog service,"
+ "optional: (none, oauth2), default: none.")
+ "optional: (none, oauth2, google), default: none.")
private String icebergRestSecurityType = "none";

@ConnectorProperty(names = {"iceberg.rest.session"},
Expand Down Expand Up @@ -159,6 +159,22 @@ public class IcebergRestProperties extends AbstractIcebergProperties {
description = "Socket timeout in milliseconds for the REST catalog HTTP client. Default: 60000 (60s).")
private String icebergRestSocketTimeoutMs = "60000";

@ConnectorProperty(names = {"iceberg.rest.io-impl"},
required = false,
description = "The FileIO implementation for the iceberg rest catalog service.")
private String icebergRestIoImpl;

@ConnectorProperty(names = {"iceberg.rest.google.user-project"},
required = false,
description = "The Google project to be billed for using the iceberg rest catalog service.")
private String icebergRestGoogleUserProject;

@ConnectorProperty(names = {"iceberg.gcs.oauth2.token"},
required = false,
sensitive = true,
description = "The OAuth2 token for GCS storage access when using GCS FileIO.")
private String icebergGcsOauth2Token;

protected IcebergRestProperties(Map<String, String> props) {
super(props);
}
Expand Down Expand Up @@ -195,7 +211,7 @@ private void validateSecurityType() {
Security.valueOf(icebergRestSecurityType.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid security type: " + icebergRestSecurityType
+ ". Supported values are: none, oauth2");
+ ". Supported values are: none, oauth2, google");
}
}

Expand Down Expand Up @@ -267,12 +283,26 @@ private void addOptionalProperties() {
if (Strings.isNotBlank(icebergRestSocketTimeoutMs)) {
icebergRestCatalogProperties.put("rest.client.socket-timeout-ms", icebergRestSocketTimeoutMs);
}

if (Strings.isNotBlank(icebergRestIoImpl)) {
icebergRestCatalogProperties.put("io-impl", icebergRestIoImpl);
}

if (Strings.isNotBlank(icebergRestGoogleUserProject)) {
icebergRestCatalogProperties.put("header.x-goog-user-project", icebergRestGoogleUserProject);
}

if (Strings.isNotBlank(icebergGcsOauth2Token)) {
icebergRestCatalogProperties.put("gcs.oauth2.token", icebergGcsOauth2Token);
}
}

private void addAuthenticationProperties() {
Security security = Security.valueOf(icebergRestSecurityType.toUpperCase());
if (security == Security.OAUTH2) {
addOAuth2Properties();
} else if (security == Security.GOOGLE) {
addGoogleProperties();
}
}

Expand All @@ -294,6 +324,11 @@ private void addOAuth2Properties() {
}
}

private void addGoogleProperties() {
icebergRestCatalogProperties.put("rest.auth.type",
"org.apache.iceberg.gcp.auth.GoogleAuthManager");
}

private void addGlueRestCatalogProperties() {
if (Strings.isNotBlank(icebergRestSigningName)) {
// signing-name is case sensible, do not use lowercase()
Expand All @@ -320,5 +355,6 @@ public boolean isIcebergRestNestedNamespaceEnabled() {
public enum Security {
NONE,
OAUTH2,
GOOGLE,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,86 @@ public void testSecurityTypeNone() {
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.TOKEN));
}

@Test
public void testGoogleSecurityType() {
Map<String, String> props = new HashMap<>();
props.put("iceberg.rest.uri", "http://localhost:8080");
props.put("iceberg.rest.security.type", "google");

IcebergRestProperties restProps = new IcebergRestProperties(props);
restProps.initNormalizeAndCheckProps();

Map<String, String> catalogProps = restProps.getIcebergRestCatalogProperties();
Assertions.assertEquals("org.apache.iceberg.gcp.auth.GoogleAuthManager",
catalogProps.get("rest.auth.type"));
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.CREDENTIAL));
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.TOKEN));
}

@Test
public void testGoogleWithIoImpl() {
Map<String, String> props = new HashMap<>();
props.put("iceberg.rest.uri", "http://localhost:8080");
props.put("iceberg.rest.security.type", "google");
props.put("iceberg.rest.io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO");

IcebergRestProperties restProps = new IcebergRestProperties(props);
restProps.initNormalizeAndCheckProps();

Map<String, String> catalogProps = restProps.getIcebergRestCatalogProperties();
Assertions.assertEquals("org.apache.iceberg.gcp.auth.GoogleAuthManager",
catalogProps.get("rest.auth.type"));
Assertions.assertEquals("org.apache.iceberg.gcp.gcs.GCSFileIO",
catalogProps.get("io-impl"));
}

@Test
public void testGoogleWithUserProject() {
Map<String, String> props = new HashMap<>();
props.put("iceberg.rest.uri", "http://localhost:8080");
props.put("iceberg.rest.security.type", "google");
props.put("iceberg.rest.google.user-project", "my-billing-project");

IcebergRestProperties restProps = new IcebergRestProperties(props);
restProps.initNormalizeAndCheckProps();

Map<String, String> catalogProps = restProps.getIcebergRestCatalogProperties();
Assertions.assertEquals("org.apache.iceberg.gcp.auth.GoogleAuthManager",
catalogProps.get("rest.auth.type"));
Assertions.assertEquals("my-billing-project",
catalogProps.get("header.x-goog-user-project"));
}

@Test
public void testGoogleWithGcsToken() {
Map<String, String> props = new HashMap<>();
props.put("iceberg.rest.uri", "http://localhost:8080");
props.put("iceberg.rest.security.type", "google");
props.put("iceberg.gcs.oauth2.token", "my-gcs-token");

IcebergRestProperties restProps = new IcebergRestProperties(props);
restProps.initNormalizeAndCheckProps();

Map<String, String> catalogProps = restProps.getIcebergRestCatalogProperties();
Assertions.assertEquals("org.apache.iceberg.gcp.auth.GoogleAuthManager",
catalogProps.get("rest.auth.type"));
Assertions.assertEquals("my-gcs-token", catalogProps.get("gcs.oauth2.token"));
}

@Test
public void testGoogleSecurityTypeCaseInsensitive() {
Map<String, String> props = new HashMap<>();
props.put("iceberg.rest.uri", "http://localhost:8080");
props.put("iceberg.rest.security.type", "GOOGLE");

IcebergRestProperties restProps = new IcebergRestProperties(props);
Assertions.assertDoesNotThrow(restProps::initNormalizeAndCheckProps);

Map<String, String> catalogProps = restProps.getIcebergRestCatalogProperties();
Assertions.assertEquals("org.apache.iceberg.gcp.auth.GoogleAuthManager",
catalogProps.get("rest.auth.type"));
}

@Test
public void testUriAliases() {
// Test different URI property names
Expand Down
Loading