Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>software.amazon.payloadoffloading</groupId>
<artifactId>payloadoffloading-common</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
<packaging>jar</packaging>
<name>Payload offloading common library for AWS</name>
<description>Common library between extended Amazon AWS clients to save payloads up to 2GB on Amazon S3.</description>
Expand Down Expand Up @@ -36,13 +36,13 @@
</developers>

<properties>
<aws-java-sdk.version>2.20.130</aws-java-sdk.version>
<aws-java-sdk.version>2.27.14</aws-java-sdk.version>
</properties>

<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<artifactId>s3-transfer-manager</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package software.amazon.payloadoffloading;

import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;

public class AwsManagedCmk implements ServerSideEncryptionStrategy {
@Override
public void decorate(PutObjectRequest.Builder putObjectRequestBuilder) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
}

@Override
public void decorate(CreateMultipartUploadRequest.Builder createStreamUploadRequestBuilder) {
createStreamUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package software.amazon.payloadoffloading;

import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;

public class CustomerKey implements ServerSideEncryptionStrategy {
Expand All @@ -15,4 +16,10 @@ public void decorate(PutObjectRequest.Builder putObjectRequestBuilder) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
putObjectRequestBuilder.ssekmsKeyId(awsKmsKeyId);
}

@Override
public void decorate(CreateMultipartUploadRequest.Builder createStreamUploadRequestBuilder) {
createStreamUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
createStreamUploadRequestBuilder.ssekmsKeyId(awsKmsKeyId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,14 @@ public PayloadStorageAsyncConfiguration withObjectCannedACL(ObjectCannedACL obje
setObjectCannedACL(objectCannedACL);
return this;
}

/**
* Enables or disables stream upload support.
* @param enabled true to enable stream uploads when threshold exceeded.
* @return updated configuration
*/
public PayloadStorageAsyncConfiguration withStreamUploadEnabled(boolean enabled) {
setStreamUploadEnabled(enabled);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class PayloadStorageConfiguration extends PayloadStorageConfigurationBase
private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageConfiguration.class);

private S3Client s3;
private int streamUploadPartSize = 5 * 1024 * 1024;
private int streamUploadThreshold = 5 * 1024 * 1024;

public PayloadStorageConfiguration() {
s3 = null;
Expand All @@ -43,6 +45,8 @@ public PayloadStorageConfiguration() {
public PayloadStorageConfiguration(PayloadStorageConfiguration other) {
super(other);
this.s3 = other.getS3Client();
this.streamUploadThreshold = other.getStreamUploadThreshold();
this.streamUploadPartSize = other.getStreamUploadPartSize();
}

/**
Expand Down Expand Up @@ -148,4 +152,68 @@ public PayloadStorageConfiguration withObjectCannedACL(ObjectCannedACL objectCan
setObjectCannedACL(objectCannedACL);
return this;
}

/**
* Enables or disables stream upload support.
* @param enabled true to enable stream uploads when threshold exceeded.
* @return updated configuration
*/
public PayloadStorageConfiguration withStreamUploadEnabled(boolean enabled) {
setStreamUploadEnabled(enabled);
return this;
}

/**
* Sets the stream upload threshold (in bytes). Only used when stream upload is enabled.
* @param threshold threshold in bytes (must be >0) otherwise default (5MB) is applied.
* @return updated configuration
*/
public PayloadStorageConfiguration withStreamUploadThreshold(int threshold) {
setStreamUploadThreshold(threshold);
return this;
}

public PayloadStorageConfiguration withStreamUploadPartSize(int partSize) {
setStreamUploadPartSize(partSize);
return this;
}


/**
* Gets the stream upload threshold in bytes. Default 5MB.
*
* @return threshold in bytes.
*/
public int getStreamUploadThreshold() { return streamUploadThreshold; }

/**
* Sets the stream upload threshold in bytes. Values less than or equal to zero will reset to default (5MB).
*
* @param streamUploadThreshold threshold in bytes
*/
public void setStreamUploadThreshold(int streamUploadThreshold) {
int min = 5 * 1024 * 1024;
if (streamUploadThreshold <= min) {
this.streamUploadThreshold = min;
} else {
this.streamUploadThreshold = streamUploadThreshold;
}
}

/**
* Gets the configured stream upload part size (bytes). Default 5MB.
*/
public int getStreamUploadPartSize() { return streamUploadPartSize; }

/**
* Sets the stream upload part size (bytes). Values < 5MB are rounded up to 5MB.
*/
public void setStreamUploadPartSize(int partSize) {
int min = 5 * 1024 * 1024;
if (partSize < min) {
this.streamUploadPartSize = min;
} else {
this.streamUploadPartSize = partSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.NotThreadSafe;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;

/**
Expand All @@ -22,6 +21,8 @@ public abstract class PayloadStorageConfigurationBase {
private int payloadSizeThreshold = 0;
private boolean alwaysThroughS3 = false;
private boolean payloadSupport = false;
// Enable stream upload support (opt-in, default false)
private boolean streamUploadEnabled = false;
/**
* This field is optional, it is set only when we want to configure S3 Server Side Encryption with KMS.
*/
Expand All @@ -44,6 +45,7 @@ public PayloadStorageConfigurationBase(PayloadStorageConfigurationBase other) {
this.payloadSizeThreshold = other.getPayloadSizeThreshold();
this.serverSideEncryptionStrategy = other.getServerSideEncryptionStrategy();
this.objectCannedACL = other.getObjectCannedACL();
this.streamUploadEnabled = other.isStreamUploadEnabled();
}

/**
Expand Down Expand Up @@ -175,4 +177,19 @@ public boolean isObjectCannedACLDefined() {
public ObjectCannedACL getObjectCannedACL() {
return objectCannedACL;
}

/**
* Checks whether stream upload support is enabled. Default: false.
*
* @return true if stream upload support is enabled.
*/
public boolean isStreamUploadEnabled() { return streamUploadEnabled; }

/**
* Enable or disable stream upload support. When enabling, callers should ensure they provide a PayloadStore
* implementation capable of stream operations; otherwise normal single PUT behavior will be used.
*
* @param streamUploadEnabled flag to enable/disable stream support.
*/
public void setStreamUploadEnabled(boolean streamUploadEnabled) { this.streamUploadEnabled = streamUploadEnabled; }
}
80 changes: 80 additions & 0 deletions src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package software.amazon.payloadoffloading;

import java.io.UncheckedIOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Dao layer to access S3.
Expand Down Expand Up @@ -65,6 +73,28 @@ public CompletableFuture<String> getTextFromS3(String s3BucketName, String s3Key
});
}

public CompletableFuture<ResponseInputStream<GetObjectResponse>> getTextStreamFromS3(String s3BucketName, String s3Key) {
GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(s3BucketName)
.key(s3Key)
.build();

return s3Client.getObject(getObjectRequest, AsyncResponseTransformer.toBlockingInputStream())
.handle((v, tIn) -> {
if (tIn != null) {
Throwable t = Util.unwrapFutureException(tIn);
if (t instanceof SdkException) {
String errorMessage = "Failed to get the S3 object stream which contains the payload.";
LOG.error(errorMessage, t);
throw SdkException.create(errorMessage, t);
}
throw new CompletionException(t);
}
LOG.info("S3 object stream retrieved, Bucket name: " + s3BucketName + ", Object key: " + s3Key);
return v;
});
}

public CompletableFuture<Void> storeTextInS3(String s3BucketName, String s3Key, String payloadContentStr) {
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(s3BucketName)
Expand Down Expand Up @@ -115,4 +145,54 @@ public CompletableFuture<Void> deletePayloadFromS3(String s3BucketName, String s
return null;
});
}


/**
* Stores a stream of data to S3 using TransferManager for efficient multipart uploads.
*
* @param bucket The S3 bucket name
* @param key The S3 object key
* @param payloadStream The input stream to upload
* @return CompletableFuture that completes when upload is finished
*/
public CompletableFuture<Void> storeTextStreamInS3(String bucket, String key, InputStream payloadStream) {
S3TransferManager transferManager = S3TransferManager.create();
ExecutorService executor = Executors.newSingleThreadExecutor();

try {
UploadRequest.Builder uploadBuilder = UploadRequest.builder()
.putObjectRequest(b -> {
b.bucket(bucket).key(key);
if (objectCannedACL != null) {
b.acl(objectCannedACL);
}
// https://docs.aws.amazon.com/AmazonS3/latest/dev/kms-using-sdks.html
if (serverSideEncryptionStrategy != null) {
serverSideEncryptionStrategy.decorate(b);
}
})
.requestBody(AsyncRequestBody.fromInputStream(payloadStream, null, executor));

return transferManager.upload(uploadBuilder.build()).completionFuture()
.thenApply(completedUpload -> {
LOG.info("S3 stream object created from InputStream, Bucket name: " + bucket + ", Object key: " + key + ".");
return (Void) null;
})
.exceptionally(t -> {
Throwable cause = Util.unwrapFutureException(t);
if (cause instanceof SdkException) {
String errorMessage = "Failed to store the message content from InputStream in an S3 stream object.";
LOG.error(errorMessage, cause);
throw SdkException.create(errorMessage, cause);
}
throw new CompletionException(cause);
});
} catch (UnsupportedOperationException e) {
LOG.warn("TransferManager creation disabled, cannot perform streaming upload: " + e.getMessage());
throw new CompletionException(e);
} finally {
transferManager.close();
executor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package software.amazon.payloadoffloading;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import java.io.InputStream;

public class S3BackedStreamPayloadStore extends S3BackedPayloadStore implements StreamPayloadStore {
private static final Logger LOG = LoggerFactory.getLogger(S3BackedStreamPayloadStore.class);
private final S3Dao s3Dao;
private final String s3BucketName;

public S3BackedStreamPayloadStore(S3Dao s3Dao, String s3BucketName) {
super(s3Dao, s3BucketName);
this.s3Dao = s3Dao;
this.s3BucketName = s3BucketName;
}

@Override
public String storeOriginalPayloadStream(InputStream payloadStream, String s3Key) {
s3Dao.storeTextStreamInS3(s3BucketName, s3Key, payloadStream);
LOG.info("S3 stream object created from InputStream, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
return new PayloadS3Pointer(s3BucketName, s3Key).toJson();
}

@Override
public ResponseInputStream<GetObjectResponse> getOriginalPayloadStreamStream(String payloadPointer) {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer);
String s3BucketName = s3Pointer.getS3BucketName();
String s3Key = s3Pointer.getS3Key();

ResponseInputStream<GetObjectResponse> stream = s3Dao.getTextStreamFromS3(s3BucketName, s3Key);
LOG.info("S3 object stream retrieved, Bucket name: " + s3BucketName + ", Object key: " + s3Key);
return stream;
}
}
Loading