diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java new file mode 100644 index 000000000000..d50d27e81456 --- /dev/null +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.s3; + +import org.apache.paimon.fs.StorageType; + +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.GlacierJobParameters; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.MetadataDirective; +import software.amazon.awssdk.services.s3.model.ObjectAlreadyInActiveTierErrorException; +import software.amazon.awssdk.services.s3.model.ObjectNotInActiveTierErrorException; +import software.amazon.awssdk.services.s3.model.RestoreObjectRequest; +import software.amazon.awssdk.services.s3.model.RestoreRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.StorageClass; +import software.amazon.awssdk.services.s3.model.TaggingDirective; +import software.amazon.awssdk.services.s3.model.Tier; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** S3 archive operation helpers. */ +class S3ArchiveOperations { + + private static final long SECONDS_PER_DAY = 24 * 60 * 60; + + private static final String RESTORE_ALREADY_IN_PROGRESS = "RestoreAlreadyInProgress"; + + private S3ArchiveOperations() {} + + static StorageClass archiveStorageClass(StorageType type) { + switch (checkNotNull(type, "Storage type must not be null.")) { + case ARCHIVE: + return StorageClass.GLACIER; + case COLD_ARCHIVE: + return StorageClass.DEEP_ARCHIVE; + default: + throw new IllegalArgumentException( + "Unsupported S3 archive storage type: " + type + ". Use unarchive."); + } + } + + static StorageClass unarchiveStorageClass(StorageType type) { + if (checkNotNull(type, "Storage type must not be null.") == StorageType.STANDARD) { + return StorageClass.STANDARD; + } + throw new IllegalArgumentException("Unsupported S3 unarchive storage type: " + type); + } + + static int restoreDays(Duration duration) { + checkNotNull(duration, "Restore duration must not be null."); + checkArgument( + !duration.isZero() && !duration.isNegative(), + "Restore duration must be greater than zero."); + + long seconds = duration.getSeconds(); + long days = seconds / SECONDS_PER_DAY; + if (seconds % SECONDS_PER_DAY != 0 || duration.getNano() > 0) { + days++; + } + + checkArgument(days <= Integer.MAX_VALUE, "Restore duration is too large: %s", duration); + return (int) days; + } + + static void changeStorageClass( + S3AFileSystem fileSystem, + org.apache.hadoop.fs.Path path, + StorageClass storageClass, + String operation) + throws IOException { + String key = fileSystem.pathToKey(path); + CopyObjectRequest request = + copyObjectRequest( + fileSystem.getRequestFactory()::newCopyObjectRequestBuilder, + key, + fileSystem.getObjectMetadata(path), + storageClass); + + try { + s3Client(fileSystem).copyObject(request); + } catch (ObjectNotInActiveTierErrorException e) { + throw new IOException( + "S3 object " + path + " is not in active tier. Restore it before unarchiving.", + e); + } catch (S3Exception e) { + throw new IOException( + "Failed to " + operation + " S3 object " + path + " to " + storageClass + ".", + e); + } catch (SdkException e) { + throw new IOException( + "Failed to " + operation + " S3 object " + path + " to " + storageClass + ".", + e); + } + } + + static CopyObjectRequest copyObjectRequest( + CopyObjectRequestBuilderFactory factory, + String key, + HeadObjectResponse metadata, + StorageClass storageClass) { + return factory.create(key, key, metadata) + .storageClass(storageClass) + .metadataDirective(MetadataDirective.COPY) + .taggingDirective(TaggingDirective.COPY) + .build(); + } + + static void restoreArchive(S3AFileSystem fileSystem, org.apache.hadoop.fs.Path path, int days) + throws IOException { + RestoreObjectRequest request = + restoreObjectRequest(fileSystem.getBucket(), fileSystem.pathToKey(path), days); + + try { + s3Client(fileSystem).restoreObject(request); + } catch (ObjectAlreadyInActiveTierErrorException e) { + throw new IOException("S3 object " + path + " is already in active tier.", e); + } catch (S3Exception e) { + if (isRestoreAlreadyInProgress(e)) { + return; + } + throw new IOException("Failed to restore archived S3 object " + path + ".", e); + } catch (SdkException e) { + throw new IOException("Failed to restore archived S3 object " + path + ".", e); + } + } + + static RestoreObjectRequest restoreObjectRequest(String bucket, String key, int days) { + return RestoreObjectRequest.builder() + .bucket(bucket) + .key(key) + .restoreRequest( + RestoreRequest.builder() + .days(days) + .glacierJobParameters( + GlacierJobParameters.builder().tier(Tier.STANDARD).build()) + .build()) + .build(); + } + + static boolean isRestoreAlreadyInProgress(S3Exception exception) { + return exception.awsErrorDetails() != null + && RESTORE_ALREADY_IN_PROGRESS.equals(exception.awsErrorDetails().errorCode()); + } + + private static S3Client s3Client(S3AFileSystem fileSystem) throws IOException { + try { + Method method = S3AFileSystem.class.getDeclaredMethod("getS3Client"); + method.setAccessible(true); + return (S3Client) method.invoke(fileSystem); + } catch (NoSuchMethodException e) { + throw new IOException("S3AFileSystem does not expose an S3 client.", e); + } catch (IllegalAccessException e) { + throw new IOException("Failed to access S3 client from S3AFileSystem.", e); + } catch (InvocationTargetException e) { + throw new IOException("Failed to get S3 client from S3AFileSystem.", e); + } + } + + interface CopyObjectRequestBuilderFactory { + CopyObjectRequest.Builder create( + String sourceKey, String destinationKey, HeadObjectResponse metadata); + } +} diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java index 827251837342..103a14fb0a53 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.StorageType; import org.apache.paimon.fs.TwoPhaseOutputStream; import org.apache.paimon.options.Options; @@ -33,8 +34,10 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.time.Duration; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** S3 {@link FileIO}. */ @@ -85,6 +88,37 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path); } + @Override + public Optional archive(Path path, StorageType type) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3ArchiveOperations.changeStorageClass( + (S3AFileSystem) getFileSystem(hadoopPath), + hadoopPath, + S3ArchiveOperations.archiveStorageClass(type), + "archive"); + return Optional.empty(); + } + + @Override + public void restoreArchive(Path path, Duration duration) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3ArchiveOperations.restoreArchive( + (S3AFileSystem) getFileSystem(hadoopPath), + hadoopPath, + S3ArchiveOperations.restoreDays(duration)); + } + + @Override + public Optional unarchive(Path path, StorageType type) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3ArchiveOperations.changeStorageClass( + (S3AFileSystem) getFileSystem(hadoopPath), + hadoopPath, + S3ArchiveOperations.unarchiveStorageClass(type), + "unarchive"); + return Optional.empty(); + } + // add additional config entries from the IO config to the Hadoop config private Options loadHadoopConfigFromContext(CatalogContext context) { Options hadoopConfig = new Options(); diff --git a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java new file mode 100644 index 000000000000..46f8ff9785bf --- /dev/null +++ b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.s3; + +import org.apache.paimon.fs.StorageType; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.MetadataDirective; +import software.amazon.awssdk.services.s3.model.RestoreObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.StorageClass; +import software.amazon.awssdk.services.s3.model.TaggingDirective; +import software.amazon.awssdk.services.s3.model.Tier; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link S3ArchiveOperations}. */ +class S3ArchiveOperationsTest { + + @Test + void testArchiveStorageClassMapping() { + assertThat(S3ArchiveOperations.archiveStorageClass(StorageType.ARCHIVE)) + .isEqualTo(StorageClass.GLACIER); + assertThat(S3ArchiveOperations.archiveStorageClass(StorageType.COLD_ARCHIVE)) + .isEqualTo(StorageClass.DEEP_ARCHIVE); + + assertThatThrownBy(() -> S3ArchiveOperations.archiveStorageClass(StorageType.STANDARD)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Use unarchive"); + } + + @Test + void testUnarchiveStorageClassMapping() { + assertThat(S3ArchiveOperations.unarchiveStorageClass(StorageType.STANDARD)) + .isEqualTo(StorageClass.STANDARD); + + assertThatThrownBy(() -> S3ArchiveOperations.unarchiveStorageClass(StorageType.ARCHIVE)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported S3 unarchive storage type"); + assertThatThrownBy( + () -> S3ArchiveOperations.unarchiveStorageClass(StorageType.COLD_ARCHIVE)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported S3 unarchive storage type"); + } + + @Test + void testRestoreDays() { + assertThat(S3ArchiveOperations.restoreDays(Duration.ofNanos(1))).isEqualTo(1); + assertThat(S3ArchiveOperations.restoreDays(Duration.ofHours(24))).isEqualTo(1); + assertThat(S3ArchiveOperations.restoreDays(Duration.ofHours(25))).isEqualTo(2); + assertThat(S3ArchiveOperations.restoreDays(Duration.ofDays(7))).isEqualTo(7); + + assertThatThrownBy(() -> S3ArchiveOperations.restoreDays(Duration.ZERO)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("greater than zero"); + assertThatThrownBy(() -> S3ArchiveOperations.restoreDays(Duration.ofSeconds(-1))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("greater than zero"); + } + + @Test + void testRestoreObjectRequest() { + RestoreObjectRequest request = + S3ArchiveOperations.restoreObjectRequest("bucket", "partition/data.orc", 7); + + assertThat(request.bucket()).isEqualTo("bucket"); + assertThat(request.key()).isEqualTo("partition/data.orc"); + assertThat(request.restoreRequest().days()).isEqualTo(7); + assertThat(request.restoreRequest().glacierJobParameters().tier()).isEqualTo(Tier.STANDARD); + } + + @Test + void testCopyObjectRequest() { + AtomicReference source = new AtomicReference<>(); + AtomicReference destination = new AtomicReference<>(); + + CopyObjectRequest request = + S3ArchiveOperations.copyObjectRequest( + (sourceKey, destinationKey, metadata) -> { + source.set(sourceKey); + destination.set(destinationKey); + return CopyObjectRequest.builder() + .sourceKey(sourceKey) + .destinationKey(destinationKey); + }, + "partition/data.orc", + HeadObjectResponse.builder().build(), + StorageClass.GLACIER); + + assertThat(source).hasValue("partition/data.orc"); + assertThat(destination).hasValue("partition/data.orc"); + assertThat(request.sourceKey()).isEqualTo("partition/data.orc"); + assertThat(request.destinationKey()).isEqualTo("partition/data.orc"); + assertThat(request.storageClass()).isEqualTo(StorageClass.GLACIER); + assertThat(request.metadataDirective()).isEqualTo(MetadataDirective.COPY); + assertThat(request.taggingDirective()).isEqualTo(TaggingDirective.COPY); + } + + @Test + void testRestoreAlreadyInProgress() { + assertThat( + S3ArchiveOperations.isRestoreAlreadyInProgress( + s3Exception("RestoreAlreadyInProgress"))) + .isTrue(); + assertThat(S3ArchiveOperations.isRestoreAlreadyInProgress(s3Exception("OtherError"))) + .isFalse(); + } + + private static S3Exception s3Exception(String errorCode) { + return (S3Exception) + S3Exception.builder() + .awsErrorDetails(AwsErrorDetails.builder().errorCode(errorCode).build()) + .build(); + } +}