Skip to content

Commit f8df182

Browse files
committed
add support for s3 multipart upload
1 parent d9c73b6 commit f8df182

20 files changed

+1082
-7
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
package software.amazon.payloadoffloading;
22

33
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
4+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
45
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
56

67
public class AwsManagedCmk implements ServerSideEncryptionStrategy {
78
@Override
89
public void decorate(PutObjectRequest.Builder putObjectRequestBuilder) {
910
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
1011
}
12+
13+
@Override
14+
public void decorate(CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder) {
15+
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
16+
}
1117
}

src/main/java/software/amazon/payloadoffloading/CustomerKey.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package software.amazon.payloadoffloading;
22

33
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
4+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
45
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
56

67
public class CustomerKey implements ServerSideEncryptionStrategy {
@@ -15,4 +16,10 @@ public void decorate(PutObjectRequest.Builder putObjectRequestBuilder) {
1516
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
1617
putObjectRequestBuilder.ssekmsKeyId(awsKmsKeyId);
1718
}
19+
20+
@Override
21+
public void decorate(CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder) {
22+
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
23+
createMultipartUploadRequestBuilder.ssekmsKeyId(awsKmsKeyId);
24+
}
1825
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package software.amazon.payloadoffloading;
2+
3+
/**
4+
* Optional extension interface for {@link PayloadStore} implementations that can perform multipart
5+
* upload and streaming retrieval for very large payloads to reduce memory pressure.
6+
*/
7+
public interface MultipartPayloadStore extends PayloadStore {
8+
9+
/**
10+
* Store a payload using streaming/multipart-aware logic. Default falls back to normal storage.
11+
* @param payload UTF-8 text payload
12+
* @param s3Key pre-generated S3 key (or equivalent object key) to use
13+
* @return pointer string (JSON) referencing stored payload
14+
*/
15+
default String storeOriginalPayloadMultipart(String payload, String s3Key) {
16+
return storeOriginalPayload(payload, s3Key);
17+
}
18+
19+
/**
20+
* Retrieve a payload potentially using streaming/multipart-aware logic. Default falls back to normal retrieval.
21+
* @param payloadPointer pointer JSON string
22+
* @return original payload content
23+
*/
24+
default String getOriginalPayloadMultipart(String payloadPointer) {
25+
return getOriginalPayload(payloadPointer);
26+
}
27+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package software.amazon.payloadoffloading;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
5+
/**
6+
* Optional extension interface for {@link PayloadStoreAsync} implementations that can perform multipart
7+
* upload and streaming retrieval for very large payloads to reduce memory pressure.
8+
*/
9+
public interface MultipartPayloadStoreAsync extends PayloadStoreAsync {
10+
11+
/**
12+
* Store payload content using multipart semantics when possible.
13+
* @param payload UTF-8 text payload
14+
* @param s3Key object key to use
15+
* @return future pointer string (JSON) referencing stored payload
16+
*/
17+
default CompletableFuture<String> storeOriginalPayloadMultipart(String payload, String s3Key) {
18+
return storeOriginalPayload(payload, s3Key);
19+
}
20+
21+
/**
22+
* Retrieve payload using streaming/multipart logic when available. Default delegates to normal method.
23+
* @param payloadPointer pointer JSON string
24+
* @return future original payload content
25+
*/
26+
default CompletableFuture<String> getOriginalPayloadMultipart(String payloadPointer) {
27+
return getOriginalPayload(payloadPointer);
28+
}
29+
}

src/main/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,24 @@ public PayloadStorageAsyncConfiguration withObjectCannedACL(ObjectCannedACL obje
150150
setObjectCannedACL(objectCannedACL);
151151
return this;
152152
}
153+
154+
/**
155+
* Enables or disables multipart upload support.
156+
* @param enabled true to enable multipart uploads when threshold exceeded.
157+
* @return updated configuration
158+
*/
159+
public PayloadStorageAsyncConfiguration withMultipartUploadEnabled(boolean enabled) {
160+
setMultipartUploadEnabled(enabled);
161+
return this;
162+
}
163+
164+
/**
165+
* Sets the multipart upload threshold (in bytes). Only used when multipart upload is enabled.
166+
* @param threshold threshold in bytes (must be >0) otherwise default (5MB) is applied.
167+
* @return updated configuration
168+
*/
169+
public PayloadStorageAsyncConfiguration withMultipartUploadThreshold(int threshold) {
170+
setMultipartUploadThreshold(threshold);
171+
return this;
172+
}
153173
}

src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,24 @@ public PayloadStorageConfiguration withObjectCannedACL(ObjectCannedACL objectCan
148148
setObjectCannedACL(objectCannedACL);
149149
return this;
150150
}
151+
152+
/**
153+
* Enables or disables multipart upload support.
154+
* @param enabled true to enable multipart uploads when threshold exceeded.
155+
* @return updated configuration
156+
*/
157+
public PayloadStorageConfiguration withMultipartUploadEnabled(boolean enabled) {
158+
setMultipartUploadEnabled(enabled);
159+
return this;
160+
}
161+
162+
/**
163+
* Sets the multipart upload threshold (in bytes). Only used when multipart upload is enabled.
164+
* @param threshold threshold in bytes (must be >0) otherwise default (5MB) is applied.
165+
* @return updated configuration
166+
*/
167+
public PayloadStorageConfiguration withMultipartUploadThreshold(int threshold) {
168+
setMultipartUploadThreshold(threshold);
169+
return this;
170+
}
151171
}

src/main/java/software/amazon/payloadoffloading/PayloadStorageConfigurationBase.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import org.slf4j.LoggerFactory;
55
import software.amazon.awssdk.annotations.NotThreadSafe;
66
import software.amazon.awssdk.core.exception.SdkClientException;
7-
import software.amazon.awssdk.services.s3.S3Client;
87
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
98

109
/**
@@ -22,6 +21,12 @@ public abstract class PayloadStorageConfigurationBase {
2221
private int payloadSizeThreshold = 0;
2322
private boolean alwaysThroughS3 = false;
2423
private boolean payloadSupport = false;
24+
// Enable multipart upload support (opt-in, default false)
25+
private boolean multipartUploadEnabled = false;
26+
// Threshold (bytes) above which multipart should be attempted when enabled (default 5MB)
27+
private int multipartUploadThreshold = 5 * 1024 * 1024;
28+
// Multipart part size (bytes). Each part (except last) must be >=5MB. Default 5MB.
29+
private int multipartUploadPartSize = 5 * 1024 * 1024;
2530
/**
2631
* This field is optional, it is set only when we want to configure S3 Server Side Encryption with KMS.
2732
*/
@@ -44,6 +49,9 @@ public PayloadStorageConfigurationBase(PayloadStorageConfigurationBase other) {
4449
this.payloadSizeThreshold = other.getPayloadSizeThreshold();
4550
this.serverSideEncryptionStrategy = other.getServerSideEncryptionStrategy();
4651
this.objectCannedACL = other.getObjectCannedACL();
52+
this.multipartUploadEnabled = other.isMultipartUploadEnabled();
53+
this.multipartUploadThreshold = other.getMultipartUploadThreshold();
54+
this.multipartUploadPartSize = other.getMultipartUploadPartSize();
4755
}
4856

4957
/**
@@ -175,4 +183,56 @@ public boolean isObjectCannedACLDefined() {
175183
public ObjectCannedACL getObjectCannedACL() {
176184
return objectCannedACL;
177185
}
186+
187+
/**
188+
* Checks whether multipart upload support is enabled. Default: false.
189+
*
190+
* @return true if multipart upload support is enabled.
191+
*/
192+
public boolean isMultipartUploadEnabled() { return multipartUploadEnabled; }
193+
194+
/**
195+
* Enable or disable multipart upload support. When enabling, callers should ensure they provide a PayloadStore
196+
* implementation capable of multipart operations; otherwise normal single PUT behavior will be used.
197+
*
198+
* @param multipartUploadEnabled flag to enable/disable multipart support.
199+
*/
200+
public void setMultipartUploadEnabled(boolean multipartUploadEnabled) { this.multipartUploadEnabled = multipartUploadEnabled; }
201+
202+
/**
203+
* Gets the multipart upload threshold in bytes. Default 5MB.
204+
*
205+
* @return threshold in bytes.
206+
*/
207+
public int getMultipartUploadThreshold() { return multipartUploadThreshold; }
208+
209+
/**
210+
* Sets the multipart upload threshold in bytes. Values less than or equal to zero will reset to default (5MB).
211+
*
212+
* @param multipartUploadThreshold threshold in bytes
213+
*/
214+
public void setMultipartUploadThreshold(int multipartUploadThreshold) {
215+
if (multipartUploadThreshold <= 0) {
216+
this.multipartUploadThreshold = 5 * 1024 * 1024;
217+
} else {
218+
this.multipartUploadThreshold = multipartUploadThreshold;
219+
}
220+
}
221+
222+
/**
223+
* Gets the configured multipart upload part size (bytes). Default 5MB.
224+
*/
225+
public int getMultipartUploadPartSize() { return multipartUploadPartSize; }
226+
227+
/**
228+
* Sets the multipart upload part size (bytes). Values < 5MB are rounded up to 5MB.
229+
*/
230+
public void setMultipartUploadPartSize(int partSize) {
231+
int min = 5 * 1024 * 1024;
232+
if (partSize < min) {
233+
this.multipartUploadPartSize = min;
234+
} else {
235+
this.multipartUploadPartSize = partSize;
236+
}
237+
}
178238
}

src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@
1515
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
1616
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
1717
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
18+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
19+
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
20+
import software.amazon.awssdk.services.s3.model.CompletedPart;
21+
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
22+
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
23+
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
24+
25+
import java.util.Arrays;
26+
import java.util.List;
27+
import java.util.ArrayList;
28+
import java.nio.charset.StandardCharsets;
29+
1830

1931
/**
2032
* Dao layer to access S3.
@@ -115,4 +127,83 @@ public CompletableFuture<Void> deletePayloadFromS3(String s3BucketName, String s
115127
return null;
116128
});
117129
}
130+
131+
132+
public CompletableFuture<Void> storeTextMultipartInS3(String bucket, String key, String payloadContentStr, int partSize, int multipartThreshold) {
133+
byte[] data = payloadContentStr.getBytes(StandardCharsets.UTF_8);
134+
if (data.length < multipartThreshold) {
135+
return storeTextInS3(bucket, key, payloadContentStr);
136+
}
137+
138+
CreateMultipartUploadRequest.Builder createBuilder = CreateMultipartUploadRequest.builder()
139+
.bucket(bucket)
140+
.key(key);
141+
if (objectCannedACL != null) {
142+
createBuilder.acl(objectCannedACL);
143+
}
144+
// https://docs.aws.amazon.com/AmazonS3/latest/dev/kms-using-sdks.html
145+
if (serverSideEncryptionStrategy != null) {
146+
serverSideEncryptionStrategy.decorate(createBuilder);
147+
}
148+
149+
return s3Client.createMultipartUpload(createBuilder.build())
150+
.thenCompose(createResp -> {
151+
String uploadId = createResp.uploadId();
152+
int partCount = (int) Math.ceil((double) data.length / partSize);
153+
List<CompletableFuture<CompletedPart>> partFutures = new ArrayList<>(partCount);
154+
155+
int offset = 0;
156+
int partNumber = 1;
157+
while (offset < data.length) {
158+
int currSize = Math.min(partSize, data.length - offset);
159+
byte[] slice = Arrays.copyOfRange(data, offset, offset + currSize);
160+
offset += currSize;
161+
final int thisPartNumber = partNumber++;
162+
163+
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
164+
.bucket(bucket)
165+
.key(key)
166+
.uploadId(uploadId)
167+
.partNumber(thisPartNumber)
168+
.contentLength((long) slice.length)
169+
.build();
170+
171+
CompletableFuture<CompletedPart> fut = s3Client.uploadPart(uploadPartRequest, AsyncRequestBody.fromBytes(slice))
172+
.thenApply(resp -> CompletedPart.builder().partNumber(thisPartNumber).eTag(resp.eTag()).build());
173+
partFutures.add(fut);
174+
}
175+
176+
CompletableFuture<Void> all = CompletableFuture.allOf(partFutures.toArray(new CompletableFuture[0]));
177+
return all.thenCompose(v -> {
178+
List<CompletedPart> completed = new ArrayList<>(partFutures.size());
179+
for (CompletableFuture<CompletedPart> f : partFutures) {
180+
completed.add(f.join());
181+
}
182+
CompletedMultipartUpload cmu = CompletedMultipartUpload.builder().parts(completed).build();
183+
CompleteMultipartUploadRequest completeReq = CompleteMultipartUploadRequest.builder()
184+
.bucket(bucket)
185+
.key(key)
186+
.uploadId(uploadId)
187+
.multipartUpload(cmu)
188+
.build();
189+
return s3Client.completeMultipartUpload(completeReq)
190+
.handle((vv, t) -> {
191+
if (t != null) {
192+
s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(bucket).key(key).uploadId(uploadId).build());
193+
throw new CompletionException(t);
194+
}
195+
LOG.info("S3 multipart object created, Bucket name: " + bucket + ", Object key: " + key + ".");
196+
return (Void) null;
197+
});
198+
});
199+
}).exceptionally(t -> {
200+
Throwable cause = Util.unwrapFutureException(t);
201+
if (cause instanceof SdkException) {
202+
String errorMessage = "Failed to store the message content in an S3 multipart object.";
203+
LOG.error(errorMessage, cause);
204+
throw SdkException.create(errorMessage, cause);
205+
}
206+
throw new CompletionException(cause);
207+
});
208+
}
118209
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package software.amazon.payloadoffloading;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
public class S3BackedMultipartPayloadStore extends S3BackedPayloadStore implements MultipartPayloadStore {
7+
private static final Logger LOG = LoggerFactory.getLogger(S3BackedMultipartPayloadStore.class);
8+
private final S3Dao multipartDao;
9+
private final int partSize;
10+
private final int threshold;
11+
private final String s3BucketName;
12+
13+
public S3BackedMultipartPayloadStore(S3Dao s3Dao, String s3BucketName, int partSize, int threshold) {
14+
super(s3Dao, s3BucketName);
15+
this.multipartDao = s3Dao;
16+
this.partSize = partSize;
17+
this.threshold = threshold;
18+
this.s3BucketName = s3BucketName;
19+
}
20+
21+
@Override
22+
public String storeOriginalPayloadMultipart(String payload, String s3Key) {
23+
multipartDao.storeTextMultipartInS3(s3BucketName, s3Key, payload, partSize, threshold);
24+
LOG.info("S3 multipart object created, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
25+
return new PayloadS3Pointer(s3BucketName, s3Key).toJson();
26+
}
27+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package software.amazon.payloadoffloading;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
public class S3BackedMultipartPayloadStoreAsync extends S3BackedPayloadStoreAsync implements MultipartPayloadStoreAsync {
8+
private static final Logger LOG = LoggerFactory.getLogger(S3BackedMultipartPayloadStoreAsync.class);
9+
private final S3AsyncDao multipartDao;
10+
private final String s3BucketName;
11+
private final int partSize;
12+
private final int threshold;
13+
14+
public S3BackedMultipartPayloadStoreAsync(S3AsyncDao s3Dao, String s3BucketName, int partSize, int threshold) {
15+
super(s3Dao, s3BucketName);
16+
this.multipartDao = s3Dao;
17+
this.s3BucketName = s3BucketName;
18+
this.partSize = partSize;
19+
this.threshold = threshold;
20+
}
21+
22+
@Override
23+
public CompletableFuture<String> storeOriginalPayloadMultipart(String payload, String s3Key) {
24+
return multipartDao.storeTextMultipartInS3(s3BucketName, s3Key, payload, partSize, threshold)
25+
.thenApply(v -> {
26+
LOG.info("S3 multipart object created, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
27+
return new PayloadS3Pointer(s3BucketName, s3Key).toJson();
28+
});
29+
}
30+
}

0 commit comments

Comments
 (0)