Skip to content

Commit 5eb25d1

Browse files
authored
Handle file upload future completed exceptionally for Java-based TM (#5543)
* Handle file upload future completed exceptionally for Java-based TransferManager and add logging * Fix Checkstyle * Fix logging for CrtFileUpload * Add wiremock tests * Remove redundant test code * Remove redundant test code * Update logging
1 parent 3bc3427 commit 5eb25d1

File tree

4 files changed

+96
-31
lines changed

4 files changed

+96
-31
lines changed

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/CrtFileUpload.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
2929
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
3030
import software.amazon.awssdk.utils.Lazy;
31+
import software.amazon.awssdk.utils.Logger;
3132
import software.amazon.awssdk.utils.ToString;
3233
import software.amazon.awssdk.utils.Validate;
3334

3435
@SdkInternalApi
3536
public final class CrtFileUpload implements FileUpload {
37+
private static final Logger log = Logger.loggerFor(CrtFileUpload.class);
38+
3639
private final Lazy<ResumableFileUpload> resumableFileUpload;
3740
private final CompletableFuture<CompletedFileUpload> completionFuture;
3841
private final TransferProgress progress;
@@ -57,7 +60,11 @@ public ResumableFileUpload pause() {
5760

5861
private ResumableFileUpload doPause() {
5962
File sourceFile = request.source().toFile();
60-
if (completionFuture.isDone()) {
63+
64+
boolean futureCompletedExceptionally = completionFuture.isCompletedExceptionally();
65+
if (completionFuture.isDone() && !futureCompletedExceptionally) {
66+
log.debug(() -> "The upload future was completed. There will be no ResumeToken returned.");
67+
6168
Instant fileLastModified = Instant.ofEpochMilli(sourceFile.lastModified());
6269
return ResumableFileUpload.builder()
6370
.fileLastModified(fileLastModified)
@@ -80,8 +87,16 @@ private ResumableFileUpload doPause() {
8087
}
8188

8289
completionFuture.cancel(true);
83-
// Upload hasn't started yet, or it's a single object upload
8490
if (token == null) {
91+
// TODO - remove once CRT handles future completed exceptionally to return ResumeToken
92+
if (futureCompletedExceptionally) {
93+
log.debug(() -> "The upload future was completed exceptionally and the ResumeToken returned by the "
94+
+ "S3 MetaRequest was null.");
95+
} else {
96+
log.debug(() -> "The upload hasn't started yet or it's a single object upload. There will be no ResumeToken "
97+
+ "returned");
98+
}
99+
85100
return ResumableFileUpload.builder()
86101
.fileLastModified(fileLastModified)
87102
.fileLength(sourceFile.length())

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileUpload.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@
2727
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
2828
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
2929
import software.amazon.awssdk.utils.Lazy;
30+
import software.amazon.awssdk.utils.Logger;
3031
import software.amazon.awssdk.utils.ToString;
3132
import software.amazon.awssdk.utils.Validate;
3233

3334
@SdkInternalApi
3435
public final class DefaultFileUpload implements FileUpload {
36+
private static final Logger log = Logger.loggerFor(DefaultFileUpload.class);
37+
3538
private final Lazy<ResumableFileUpload> resumableFileUpload;
3639
private final CompletableFuture<CompletedFileUpload> completionFuture;
3740
private final TransferProgress progress;
@@ -70,17 +73,29 @@ private ResumableFileUpload doPause() {
7073
.fileLength(sourceFile.length())
7174
.uploadFileRequest(request);
7275

73-
if (completionFuture.isDone()) {
76+
boolean futureCompletedExceptionally = completionFuture.isCompletedExceptionally();
77+
if (completionFuture.isDone() && !futureCompletedExceptionally) {
78+
log.debug(() -> "The upload future was finished and was not completed exceptionally. There will be no S3ResumeToken "
79+
+ "returned.");
80+
7481
return resumableFileBuilder.build();
7582
}
7683

7784
S3ResumeToken token = pauseObservable.pause();
7885

79-
// Upload hasn't started yet, or it's a single object upload
8086
if (token == null) {
87+
log.debug(() -> "The upload hasn't started yet, or it's a single object upload. There will be no S3ResumeToken "
88+
+ "returned.");
8189
return resumableFileBuilder.build();
8290
}
8391

92+
if (futureCompletedExceptionally) {
93+
log.debug(() -> "The upload future was completed exceptionally but has been successfully paused and a S3ResumeToken "
94+
+ "was returned.");
95+
} else {
96+
log.debug(() -> "The upload was successfully paused and a S3ResumeToken was returned.");
97+
}
98+
8499
return resumableFileBuilder.multipartUploadId(token.uploadId())
85100
.totalParts(token.totalNumParts())
86101
.transferredParts(token.numPartsCompleted())

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private ResumableRequestConverter() {
7777
}
7878

7979
if (hasRemainingParts(getObjectRequest)) {
80-
// multipart GET for the remaining parts
80+
log.debug(() -> "The paused download was performed with part GET, now resuming download of remaining parts");
8181
Long positionToWriteFrom =
8282
MultipartDownloadUtils.multipartDownloadResumeContext(originalDownloadRequest.getObjectRequest())
8383
.map(MultipartDownloadResumeContext::bytesToLastCompletedParts)
@@ -92,7 +92,7 @@ private ResumableRequestConverter() {
9292
return Pair.of(originalDownloadRequest, responseTransformer);
9393
}
9494

95-
// ranged GET for the remaining bytes.
95+
log.debug(() -> "The paused download was performed with range GET, now resuming download of remaining bytes.");
9696
newDownloadFileRequest = resumedDownloadFileRequest(resumableFileDownload,
9797
originalDownloadRequest,
9898
getObjectRequest,

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileUploadWireMockTest.java

Lines changed: 60 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@
2525
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
2626
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
2727
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
28+
import static org.assertj.core.api.Assertions.assertThat;
2829
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2930

3031
import com.github.tomakehurst.wiremock.WireMockServer;
31-
import com.google.common.jimfs.Configuration;
32-
import com.google.common.jimfs.Jimfs;
32+
import com.github.tomakehurst.wiremock.http.Fault;
3333
import java.io.IOException;
3434
import java.io.OutputStream;
3535
import java.io.UncheckedIOException;
3636
import java.net.URI;
37-
import java.nio.file.FileSystem;
3837
import java.nio.file.Files;
3938
import java.nio.file.Path;
4039
import java.nio.file.StandardOpenOption;
40+
import java.util.concurrent.CompletionException;
4141
import org.junit.jupiter.api.AfterAll;
4242
import org.junit.jupiter.api.AfterEach;
4343
import org.junit.jupiter.api.BeforeAll;
@@ -50,27 +50,28 @@
5050
import software.amazon.awssdk.services.s3.S3AsyncClient;
5151
import software.amazon.awssdk.services.s3.model.S3Exception;
5252
import software.amazon.awssdk.transfer.s3.S3TransferManager;
53+
import software.amazon.awssdk.transfer.s3.model.FileUpload;
54+
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
5355
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
5456

5557
/**
5658
* WireMock test for verifying the DefaultFileUpload codepath.
5759
*/
5860
public class DefaultFileUploadWireMockTest {
5961
private static final WireMockServer wireMock = new WireMockServer(wireMockConfig().dynamicPort());
60-
private static final FileSystem testFs = Jimfs.newFileSystem(Configuration.unix());
6162
private static Path testFile;
6263
private static S3AsyncClient s3;
6364

6465
@BeforeAll
65-
public static void setup() {
66-
testFile = testFs.getPath("/32mib.dat");
66+
public static void setup() throws IOException {
67+
testFile = Files.createTempFile("32mib", ".dat");
6768
writeTestFile(testFile, 32 * 1024 * 1024);
6869
wireMock.start();
6970
}
7071

7172
@AfterAll
7273
public static void teardown() throws IOException {
73-
testFs.close();
74+
Files.deleteIfExists(testFile);
7475
wireMock.stop();
7576
}
7677

@@ -98,29 +99,14 @@ public void methodTeardown() {
9899
@Test
99100
void retryableErrorDuringUpload_shouldSupportRetries() {
100101
S3TransferManager tm = S3TransferManager.builder().s3Client(s3).build();
101-
102-
String mpuInitBody = ""
103-
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
104-
+ "<InitiateMultipartUploadResult>\n"
105-
+ " <Bucket>bucket</Bucket>\n"
106-
+ " <Key>key</Key>\n"
107-
+ " <UploadId>uploadId</UploadId>\n"
108-
+ "</InitiateMultipartUploadResult>";
109-
110-
wireMock.stubFor(post(urlEqualTo("/bucket/key?uploads"))
111-
.willReturn(aResponse()
112-
.withStatus(200)
113-
.withBody(mpuInitBody)));
102+
stubCreateMpuSuccessfulResponse();
114103

115104
wireMock.stubFor(put(anyUrl())
116105
.willReturn(aResponse()
117106
.withStatus(500)
118107
.withBody("Internal Error")));
119108

120-
UploadFileRequest request = UploadFileRequest.builder()
121-
.source(testFile)
122-
.putObjectRequest(put -> put.bucket("bucket").key("key"))
123-
.build();
109+
UploadFileRequest request = createUploadFileRequest();
124110

125111
assertThatThrownBy(() -> tm.uploadFile(request).completionFuture().join())
126112
.hasCauseInstanceOf(S3Exception.class);
@@ -131,8 +117,57 @@ void retryableErrorDuringUpload_shouldSupportRetries() {
131117
.withQueryParam("partNumber", matching("1")));
132118
}
133119

120+
@Test
121+
void connectionFaultDuringUpload_shouldSaveStateOfUpload() {
122+
S3TransferManager tm = S3TransferManager.builder().s3Client(s3).build();
123+
124+
stubCreateMpuSuccessfulResponse();
125+
126+
wireMock.stubFor(put(urlPathMatching("/bucket/key?partNumber=1&uploadId=uploadId"))
127+
.willReturn(aResponse()
128+
.withStatus(200)
129+
.withBody("<Part><PartNumber>1</PartNumber><ETag>\"etag1\"</ETag></Part>")));
130+
131+
wireMock.stubFor(put(urlPathMatching("/bucket/key?partNumber=2&uploadId=uploadId"))
132+
.willReturn(aResponse()
133+
.withFault(Fault.CONNECTION_RESET_BY_PEER)));
134+
135+
UploadFileRequest request = createUploadFileRequest();
136+
137+
FileUpload fileUpload = null;
138+
try {
139+
tm.uploadFile(request);
140+
} catch (Exception e) {
141+
assertThat(e).isInstanceOf(CompletionException.class);
142+
ResumableFileUpload resumableFileUpload = fileUpload.pause();
143+
assertThat(resumableFileUpload.multipartUploadId()).isPresent();
144+
assertThat(resumableFileUpload.multipartUploadId().get()).isEqualTo("uploadId");
145+
}
146+
}
147+
148+
private void stubCreateMpuSuccessfulResponse() {
149+
String mpuInitBody = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
150+
+ "<InitiateMultipartUploadResult>\n"
151+
+ " <Bucket>bucket</Bucket>\n"
152+
+ " <Key>key</Key>\n"
153+
+ " <UploadId>uploadId</UploadId>\n"
154+
+ "</InitiateMultipartUploadResult>";
155+
156+
wireMock.stubFor(post(urlEqualTo("/bucket/key?uploads"))
157+
.willReturn(aResponse()
158+
.withStatus(200)
159+
.withBody(mpuInitBody)));
160+
}
161+
162+
private UploadFileRequest createUploadFileRequest() {
163+
return UploadFileRequest.builder()
164+
.source(testFile)
165+
.putObjectRequest(put -> put.bucket("bucket").key("key"))
166+
.build();
167+
}
168+
134169
private static void writeTestFile(Path file, long size) {
135-
try (OutputStream os = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
170+
try (OutputStream os = Files.newOutputStream(file, StandardOpenOption.CREATE)) {
136171
byte[] buff = new byte[4096];
137172
long remaining = size;
138173
while (remaining != 0) {

0 commit comments

Comments
 (0)