diff --git a/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json b/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json new file mode 100644 index 000000000000..f0cc6298ec3c --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT HTTP Client", + "contributor": "", + "description": "Fixed a potential deadlock in `AwsCrtHttpClient` that could occur when the request body `InputStream` blocked waiting for data on the CRT event loop thread. This could happen when a blocking stream (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body and the read depended on the same event loop thread to deliver data. Request body writing now happens on the caller thread." +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java index 9c6d769e48fa..adbe3dcf4462 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java @@ -18,13 +18,16 @@ import static software.amazon.awssdk.http.HttpMetric.HTTP_CLIENT_NAME; import java.io.IOException; +import java.io.InputStream; import java.time.Duration; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpStreamManager; +import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.ExecutableHttpRequest; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; @@ -35,6 +38,8 @@ import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase; import software.amazon.awssdk.http.crt.internal.CrtRequestContext; import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; +import software.amazon.awssdk.http.crt.internal.CrtUtils; import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.CompletableFutureUtils; @@ -64,7 +69,7 @@ private AwsCrtHttpClient(DefaultBuilder builder, AttributeMap config) { } } - public static AwsCrtHttpClient.Builder builder() { + public static Builder builder() { return new DefaultBuilder(); } @@ -107,6 +112,8 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { } private static final class CrtHttpRequest implements ExecutableHttpRequest { + private static final int WRITE_BUFFER_SIZE = 16 * 1024; + private final CrtRequestContext context; private volatile CompletableFuture responseFuture; @@ -117,37 +124,70 @@ private CrtHttpRequest(CrtRequestContext context) { @Override public HttpExecuteResponse call() throws IOException { HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder(); + CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context); + responseFuture = result.responseFuture(); try { - responseFuture = new CrtRequestExecutor().execute(context); + writeRequestBody(result.streamHandler()); + SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture); builder.response(response); builder.responseBody(response.content().orElse(null)); return builder.build(); - } catch (CompletionException e) { - Throwable cause = e.getCause(); - - // Complete the future exceptionally to trigger connection cleanup in the response handler. - // Handles thread-interrupt case where joinInterruptibly throws due to - // InterruptedException. Without this, the - // Ensures that closeConnection() is invoked to prevent leaking the connection from the pool. - if (responseFuture != null) { - responseFuture.completeExceptionally(cause != null ? cause : e); - } + } catch (Throwable t) { + // CompletionException is the wrapper from joinInterruptibly; direct throws + // (e.g., IOException from inputStream.read in writeRequestBody) arrive unwrapped. + Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t; + + // Tear down the stream so the connection is not leaked back to the pool. + // closeConnection() is idempotent and a no-op if the stream is not yet acquired + // or is already closed. + result.streamHandler().closeConnection(); + responseFuture.completeExceptionally(cause); + + throw mapToIoExceptionOrRethrow(cause); + } + } - if (cause instanceof IOException) { - throw (IOException) cause; + private static IOException mapToIoExceptionOrRethrow(Throwable cause) { + if (cause instanceof IOException) { + return (IOException) cause; + } + if (cause instanceof HttpException) { + Throwable wrapped = CrtUtils.wrapCrtException(cause); + if (wrapped instanceof IOException) { + return (IOException) wrapped; } + throw (HttpException) cause; + } + if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + return new IOException("Request was cancelled", cause); + } + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + if (cause instanceof Error) { + throw (Error) cause; + } + return new IOException(cause); + } - if (cause instanceof HttpException) { - throw (HttpException) cause; - } + private void writeRequestBody(CrtStreamHandler streamHandler) throws IOException { + ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null); + if (provider == null) { + return; + } - if (cause instanceof InterruptedException) { - Thread.currentThread().interrupt(); - throw new IOException("Request was cancelled", cause); + streamHandler.waitForStream(); + try (InputStream inputStream = provider.newStream()) { + byte[] buf = new byte[WRITE_BUFFER_SIZE]; + int read; + while ((read = inputStream.read(buf, 0, buf.length)) >= 0) { + byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read); + CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(chunk, false)); } - throw new RuntimeException(e.getCause()); + CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(null, true)); } } @@ -162,14 +202,14 @@ public void abort() { /** * Builder that allows configuration of the AWS CRT HTTP implementation. */ - public interface Builder extends SdkHttpClient.Builder { + public interface Builder extends SdkHttpClient.Builder { /** * The Maximum number of allowed concurrent requests. For HTTP/1.1 this is the same as max connections. * @param maxConcurrency maximum concurrency per endpoint * @return The builder of the method chaining. */ - AwsCrtHttpClient.Builder maxConcurrency(Integer maxConcurrency); + Builder maxConcurrency(Integer maxConcurrency); /** * Configures the number of unread bytes that can be buffered in the @@ -179,14 +219,14 @@ public interface Builder extends SdkHttpClient.Builder * @param readBufferSize The number of bytes that can be buffered. * @return The builder of the method chaining. */ - AwsCrtHttpClient.Builder readBufferSizeInBytes(Long readBufferSize); + Builder readBufferSizeInBytes(Long readBufferSize); /** * Sets the http proxy configuration to use for this client. * @param proxyConfiguration The http proxy configuration to use * @return The builder of the method chaining. */ - AwsCrtHttpClient.Builder proxyConfiguration(ProxyConfiguration proxyConfiguration); + Builder proxyConfiguration(ProxyConfiguration proxyConfiguration); /** * Sets the http proxy configuration to use for this client. @@ -194,7 +234,7 @@ public interface Builder extends SdkHttpClient.Builder * @param proxyConfigurationBuilderConsumer The consumer of the proxy configuration builder object. * @return the builder for method chaining. */ - AwsCrtHttpClient.Builder proxyConfiguration(Consumer proxyConfigurationBuilderConsumer); + Builder proxyConfiguration(Consumer proxyConfigurationBuilderConsumer); /** * Configure the health checks for all connections established by this client. @@ -214,7 +254,7 @@ public interface Builder extends SdkHttpClient.Builder * @param healthChecksConfiguration The health checks config to use * @return The builder of the method chaining. */ - AwsCrtHttpClient.Builder connectionHealthConfiguration(ConnectionHealthConfiguration healthChecksConfiguration); + Builder connectionHealthConfiguration(ConnectionHealthConfiguration healthChecksConfiguration); /** * A convenience method that creates an instance of the {@link ConnectionHealthConfiguration} builder, avoiding the @@ -224,7 +264,7 @@ public interface Builder extends SdkHttpClient.Builder * @return The builder of the method chaining. * @see #connectionHealthConfiguration(ConnectionHealthConfiguration) */ - AwsCrtHttpClient.Builder connectionHealthConfiguration(Consumer + Builder connectionHealthConfiguration(Consumer healthChecksConfigurationBuilder); /** @@ -232,21 +272,21 @@ AwsCrtHttpClient.Builder connectionHealthConfiguration(Consumer + Builder tcpKeepAliveConfiguration(Consumer tcpKeepAliveConfigurationBuilder); /** @@ -293,7 +333,7 @@ AwsCrtHttpClient.Builder tcpKeepAliveConfiguration(Consumer implements AwsCrtHttpClient.Builder { + extends AwsCrtClientBuilderBase implements Builder { @Override diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java index 7629683899a5..b4901c6d9cb8 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java @@ -67,24 +67,28 @@ private void doExecute(CrtAsyncRequestContext executionContext, HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext); - HttpStreamBaseResponseHandler crtResponseHandler = - CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler()); + CompletableFuture streamFuture = new CompletableFuture<>(); + CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture); - CompletableFuture streamFuture = - executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); + HttpStreamBaseResponseHandler crtResponseHandler = + CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler); long finalAcquireStartTime = acquireStartTime; - streamFuture.whenComplete((stream, throwable) -> { - if (shouldPublishMetrics) { - reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); - } - - if (throwable != null) { - Throwable toThrow = wrapCrtException(throwable); - reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler()); - } - }); + executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler) + .whenComplete((stream, throwable) -> { + if (shouldPublishMetrics) { + reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); + } + + if (throwable != null) { + Throwable toThrow = wrapCrtException(throwable); + streamFuture.completeExceptionally(toThrow); + reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler()); + } else { + streamFuture.complete(stream); + } + }); } /** diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index 7165696435e1..0f34c31967f3 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -22,7 +22,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpStreamBase; -import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; @@ -32,49 +31,97 @@ @SdkInternalApi public final class CrtRequestExecutor { - public CompletableFuture execute(CrtRequestContext executionContext) { - CompletableFuture requestFuture = new CompletableFuture<>(); + public Result execute(CrtRequestContext executionContext) { + CompletableFuture responseFuture = new CompletableFuture<>(); + CompletableFuture streamFuture = new CompletableFuture<>(); + CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture); try { - doExecute(executionContext, requestFuture); + doExecute(executionContext, responseFuture, streamHandler, streamFuture); } catch (Throwable t) { - requestFuture.completeExceptionally(t); + // Fail streamFuture too so any caller blocked in waitForStream() unblocks. + streamFuture.completeExceptionally(t); + responseFuture.completeExceptionally(t); } - return requestFuture; + return new Result(responseFuture, streamHandler); } - private void doExecute(CrtRequestContext executionContext, CompletableFuture requestFuture) { + private void doExecute(CrtRequestContext executionContext, + CompletableFuture responseFuture, + CrtStreamHandler streamHandler, + CompletableFuture streamFuture) { MetricCollector metricCollector = executionContext.metricCollector(); boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); long acquireStartTime = 0; if (shouldPublishMetrics) { - // go ahead and get acquireStartTime for the concurrency timer as early as possible, - // so it's as accurate as possible, but only do it in a branch since clock_gettime() - // results in a full sys call barrier (multiple mutexes and a hw interrupt). acquireStartTime = System.nanoTime(); } - HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler = + new InputStreamAdaptingHttpStreamResponseHandler(responseFuture, streamHandler); HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext); - CompletableFuture streamFuture = - executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); + boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent(); long finalAcquireStartTime = acquireStartTime; - streamFuture.whenComplete((streamBase, throwable) -> { - if (shouldPublishMetrics) { - reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); - } + executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody) + .handle((streamBase, throwable) -> { + if (shouldPublishMetrics) { + reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); + } + + if (throwable != null) { + handleAcquireFailure(throwable, streamFuture, responseFuture); + return null; + } + try { + streamBase.activate(); + } catch (Throwable t) { + // Stream is acquired but not activated and not yet published via + // streamFuture. No other path can reach it, so clean it up directly. + streamBase.cancel(); + streamBase.close(); + handleAcquireFailure(t, streamFuture, responseFuture); + return null; + } + streamFuture.complete(streamBase); + return null; + }).exceptionally(t -> { + // Defensive: only reached if the handle lambda itself throws. + handleAcquireFailure(t, streamFuture, responseFuture); + return null; + }); + } + + private void handleAcquireFailure(Throwable t, + CompletableFuture streamFuture, + CompletableFuture responseFuture) { + Throwable toThrow = wrapCrtException(t); + streamFuture.completeExceptionally(toThrow); + responseFuture.completeExceptionally(toThrow); + } - if (throwable != null) { - Throwable toThrow = wrapCrtException(throwable); - requestFuture.completeExceptionally(toThrow); - } - }); + @SdkInternalApi + public static final class Result { + private final CompletableFuture responseFuture; + private final CrtStreamHandler streamHandler; + + private Result(CompletableFuture responseFuture, CrtStreamHandler streamHandler) { + this.responseFuture = responseFuture; + this.streamHandler = streamHandler; + } + + public CompletableFuture responseFuture() { + return responseFuture; + } + + public CrtStreamHandler streamHandler() { + return streamHandler; + } } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java new file mode 100644 index 000000000000..f1fc932303fd --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java @@ -0,0 +1,157 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.utils.CompletableFutureUtils; + +/** + * Manages the lifecycle of a CRT HTTP stream, providing thread-safe access to stream operations. + * Shared between the request executor (for writing body data) and the response handler (for + * incrementing the window and releasing/closing the connection). + * + *

The handler is constructed with a {@link CompletableFuture} representing stream acquisition. + * The caller (request executor) completes that future once the underlying CRT stream manager has + * either acquired the stream or failed. All operations on this handler chain off that future, so + * writes issued before acquisition completes are queued. + */ +@SdkInternalApi +public final class CrtStreamHandler { + + private final Object streamLock = new Object(); + private final CompletableFuture streamFuture; + private boolean streamClosed; + + public CrtStreamHandler(CompletableFuture streamFuture) { + this.streamFuture = streamFuture; + } + + /** + * Blocks until the stream has been acquired or acquisition has failed. Returns the acquired + * stream on success. If acquisition failed, the failure cause is rethrown wrapped in a + * {@link CompletionException} so callers can use the same handling as for response futures. + */ + public HttpStreamBase waitForStream() { + return CompletableFutureUtils.joinInterruptibly(streamFuture); + } + + /** + * Write data to the stream. The returned future chains on stream acquisition: if the stream + * is not yet ready, the write is queued until the {@code streamFuture} passed to the + * constructor completes. Failures from either stream acquisition or the underlying CRT write + * are propagated as the original cause (not wrapped in {@link CompletionException}) so callers + * see the same exception type whether the failure happens before or after {@code thenCompose}- + * style chaining. + */ + public CompletableFuture writeData(byte[] data, boolean endStream) { + CompletableFuture result = new CompletableFuture<>(); + streamFuture.handle((s, t) -> { + if (t != null) { + result.completeExceptionally(unwrap(t)); + return null; + } + doWrite(s, data, endStream, result); + return null; + }).exceptionally(t -> { + result.completeExceptionally(t); + closeConnection(); + return null; + }); + return result; + } + + private void doWrite(HttpStreamBase s, byte[] data, boolean endStream, CompletableFuture result) { + try { + CompletableFuture writeFuture; + synchronized (streamLock) { + if (streamClosed) { + result.completeExceptionally(new IOException("Stream is already closed, cannot write data.")); + return; + } + writeFuture = s.writeData(data, endStream); + } + writeFuture.whenComplete((v, err) -> { + if (err != null) { + result.completeExceptionally(unwrap(err)); + } else { + result.complete(null); + } + }); + } catch (Throwable th) { + result.completeExceptionally(th); + closeConnection(); + } + } + + private static Throwable unwrap(Throwable t) { + return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t; + } + + public void incrementWindow(int windowSize) { + synchronized (streamLock) { + HttpStreamBase s = streamIfAvailable(); + if (!streamClosed && s != null) { + s.incrementWindow(windowSize); + } + } + } + + /** + * Release the connection back to the pool so that it may be reused. This should be called when the request + * completes successfully and the response has been fully consumed. + */ + public void releaseConnection() { + synchronized (streamLock) { + HttpStreamBase s = streamIfAvailable(); + if (!streamClosed && s != null) { + streamClosed = true; + s.close(); + } + } + } + + /** + * Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the + * connection pool. This should be called on error paths or when the stream is aborted before the response is + * fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract. + */ + public void closeConnection() { + synchronized (streamLock) { + HttpStreamBase s = streamIfAvailable(); + if (!streamClosed && s != null) { + streamClosed = true; + s.cancel(); + s.close(); + } + } + } + + /** + * Returns the acquired stream if {@link #streamFuture} completed normally, otherwise {@code null}. + * Tolerates exceptional or pending completion (in contrast to {@link CompletableFuture#getNow}, which + * throws {@link CompletionException} when the future is exceptional). + */ + private HttpStreamBase streamIfAvailable() { + if (!streamFuture.isDone() || streamFuture.isCompletedExceptionally()) { + return null; + } + return streamFuture.getNow(null); + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java index 8672d80b0d1b..e97b1adb514e 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java @@ -78,14 +78,7 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) { HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest)); String finalEncodedPath = encodedPath + encodedQueryString; - return sdkExecuteRequest.contentStreamProvider() - .map(provider -> new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, - new CrtRequestInputStreamAdapter(provider))) - .orElse(new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, null)); + return new HttpRequest(method, finalEncodedPath, crtHeaderArray, null); } private static HttpHeader[] asArray(List crtHeaderList) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java deleted file mode 100644 index 68f418b9e1df..000000000000 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.http.crt.internal.request; - -import static java.lang.Math.min; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpRequestBodyStream; -import software.amazon.awssdk.http.ContentStreamProvider; - -@SdkInternalApi -final class CrtRequestInputStreamAdapter implements HttpRequestBodyStream { - private static final int READ_BUFFER_SIZE = 16 * 1024; - - private final ContentStreamProvider provider; - private volatile InputStream providerStream; - private final byte[] readBuffer = new byte[READ_BUFFER_SIZE]; - - CrtRequestInputStreamAdapter(ContentStreamProvider provider) { - this.provider = provider; - } - - @Override - public boolean sendRequestBody(ByteBuffer bodyBytesOut) { - int read; - - try { - if (providerStream == null) { - createNewStream(); - } - - int toRead = min(READ_BUFFER_SIZE, bodyBytesOut.remaining()); - read = providerStream.read(readBuffer, 0, toRead); - - if (read > 0) { - bodyBytesOut.put(readBuffer, 0, read); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return read < 0; - } - - @Override - public boolean resetPosition() { - try { - createNewStream(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return true; - } - - private void createNewStream() throws IOException { - if (providerStream != null) { - providerStream.close(); - } - providerStream = provider.newStream(); - } -} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java index 1beaa872b5f4..250edb9733f5 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -48,27 +49,32 @@ public final class CrtResponseAdapter implements HttpStreamBaseResponseHandler { private final SimplePublisher responsePublisher; private final SdkHttpResponse.Builder responseBuilder; private final ResponseHandlerHelper responseHandlerHelper; + private final CrtStreamHandler streamHandler; private CrtResponseAdapter(CompletableFuture completionFuture, - SdkAsyncHttpResponseHandler responseHandler) { - this(completionFuture, responseHandler, new SimplePublisher<>()); + SdkAsyncHttpResponseHandler responseHandler, + CrtStreamHandler streamHandler) { + this(completionFuture, responseHandler, new SimplePublisher<>(), streamHandler); } @SdkTestInternalApi public CrtResponseAdapter(CompletableFuture completionFuture, SdkAsyncHttpResponseHandler responseHandler, - SimplePublisher simplePublisher) { + SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture"); this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler"); this.responseBuilder = SdkHttpResponse.builder(); this.responsePublisher = simplePublisher; + this.streamHandler = streamHandler; this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } public static HttpStreamBaseResponseHandler toCrtResponseHandler( CompletableFuture requestFuture, - SdkAsyncHttpResponseHandler responseHandler) { - return new CrtResponseAdapter(requestFuture, responseHandler); + SdkAsyncHttpResponseHandler responseHandler, + CrtStreamHandler streamHandler) { + return new CrtResponseAdapter(requestFuture, responseHandler, streamHandler); } @Override @@ -96,10 +102,10 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { writeFuture.whenComplete((result, failure) -> { if (failure != null) { failResponseHandlerAndFuture(failure); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return; } - responseHandlerHelper.incrementWindow(bodyBytesIn.length); + streamHandler.incrementWindow(bodyBytesIn.length); }); return 0; @@ -122,7 +128,7 @@ private void onSuccessfulResponseComplete() { } completionFuture.complete(null); }); - responseHandlerHelper.releaseConnection(); + streamHandler.releaseConnection(); } private void onFailedResponseComplete(HttpException error) { @@ -130,7 +136,7 @@ private void onFailedResponseComplete(HttpException error) { Throwable toThrow = wrapWithIoExceptionIfRetryable(error); responsePublisher.error(toThrow); failResponseHandlerAndFuture(toThrow); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); } private void failResponseHandlerAndFuture(Throwable error) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java index bb04d71fdb2e..6275f14ae51d 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.AbortableInputStreamSubscriber; import software.amazon.awssdk.http.crt.AwsCrtHttpClient; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -45,17 +46,21 @@ public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpS private final CompletableFuture requestCompletionFuture; private final SdkHttpFullResponse.Builder responseBuilder; private final ResponseHandlerHelper responseHandlerHelper; + private final CrtStreamHandler streamHandler; - public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture) { - this(requestCompletionFuture, new SimplePublisher<>()); + public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture, + CrtStreamHandler streamHandler) { + this(requestCompletionFuture, new SimplePublisher<>(), streamHandler); } @SdkTestInternalApi public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture, - SimplePublisher simplePublisher) { + SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { this.requestCompletionFuture = requestCompletionFuture; this.responseBuilder = SdkHttpResponse.builder(); this.simplePublisher = simplePublisher; + this.streamHandler = streamHandler; this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } @@ -66,7 +71,7 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int // Propagate cancellation requestCompletionFuture.exceptionally(t -> { - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return null; }); } @@ -76,7 +81,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { if (inputStreamSubscriber == null) { inputStreamSubscriber = AbortableInputStreamSubscriber.builder() - .doAfterClose(() -> responseHandlerHelper.closeConnection()) + .doAfterClose(() -> streamHandler.closeConnection()) .build(); simplePublisher.subscribe(inputStreamSubscriber); // For response with a payload, we need to complete the future here to allow downstream to retrieve the data from @@ -97,10 +102,10 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future", failure); requestCompletionFuture.completeExceptionally(failure); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return; } - responseHandlerHelper.incrementWindow(bodyBytesIn.length); + streamHandler.incrementWindow(bodyBytesIn.length); }); // Window will be incremented after the subscriber consumes the data, returning 0 here to disable it. @@ -120,16 +125,15 @@ private void onFailedResponseComplete(int errorCode) { Throwable toThrow = wrapWithIoExceptionIfRetryable(new HttpException(errorCode)); simplePublisher.error(toThrow); requestCompletionFuture.completeExceptionally(toThrow); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); } private void onSuccessfulResponseComplete() { // For response without a payload, for example, S3 PutObjectResponse, we need to complete the future // in onResponseComplete callback since onResponseBody will never be invoked. - requestCompletionFuture.complete(responseBuilder.build()); // requestCompletionFuture has been completed at this point, no need to notify the future simplePublisher.complete(); - responseHandlerHelper.releaseConnection(); + streamHandler.releaseConnection(); } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java index b90b43210472..4c9d4813f5cf 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java @@ -30,20 +30,12 @@ public class ResponseHandlerHelper { private final SdkHttpResponse.Builder responseBuilder; - private HttpStreamBase stream; - private boolean streamClosed; - private final Object streamLock = new Object(); public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) { this.responseBuilder = responseBuilder; } public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) { - synchronized (streamLock) { - if (this.stream == null) { - this.stream = stream; - } - } if (headerType == HttpHeaderBlock.MAIN.getValue()) { for (HttpHeader h : nextHeaders) { responseBuilder.appendHeader(h.getName(), h.getValue()); @@ -51,40 +43,4 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int responseBuilder.statusCode(responseStatusCode); } } - - public void incrementWindow(int windowSize) { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - stream.incrementWindow(windowSize); - } - } - } - - /** - * Release the connection back to the pool so that it may be reused. This should be called when the request - * completes successfully and the response has been fully consumed. - */ - public void releaseConnection() { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - streamClosed = true; - stream.close(); - } - } - } - - /** - * Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the - * connection pool. This should be called on error paths or when the stream is aborted before the response is - * fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract. - */ - public void closeConnection() { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - streamClosed = true; - stream.cancel(); - stream.close(); - } - } - } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java index ce5d778f06a1..7d45b7ef2aed 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java @@ -113,6 +113,36 @@ public void sharedEventLoopGroup_closeOneClient_shouldNotAffectOtherClients() th } } + @Test + public void contentStreamReadThrows_propagatesIoExceptionAndDoesNotLeakConnection() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder().maxConcurrency(1).build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withBody(randomAlphabetic(10)))); + SdkHttpRequest request = createRequest(uri); + + IOException readError = new IOException("simulated read failure"); + ExecutableHttpRequest failing = client.prepareRequest( + HttpExecuteRequest.builder() + .request(request) + .contentStreamProvider(() -> new java.io.InputStream() { + @Override + public int read() throws IOException { + throw readError; + } + }) + .build()); + + assertThatThrownBy(failing::call) + .isInstanceOf(IOException.class) + .isSameAs(readError); + + // If the connection leaked, this second request would hang since maxConcurrency=1. + // Use a short overall test timeout to fail fast if the leak regresses. + HttpExecuteResponse second = makeSimpleRequest(client, null); + assertThat(second.httpResponse().statusCode()).isEqualTo(200); + } + } + @Test public void abortRequest_shouldFailTheExceptionWithIOException() throws Exception { try (SdkHttpClient client = AwsCrtHttpClient.create()) { diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java index 9318f6af228a..110ad2c97cc5 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java @@ -31,7 +31,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -54,14 +53,18 @@ public abstract class BaseHttpStreamResponseHandlerTest { HttpStreamBaseResponseHandler responseHandler; - abstract HttpStreamBaseResponseHandler responseHandler(); + CrtStreamHandler streamHandler; - abstract HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher); + abstract HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler); + + abstract HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler); @BeforeEach public void setUp() { requestFuture = new CompletableFuture<>(); - responseHandler = responseHandler(); + streamHandler = new CrtStreamHandler(CompletableFuture.completedFuture(httpStream)); + responseHandler = responseHandler(streamHandler); } @Test @@ -98,9 +101,8 @@ void failedToGetResponse_shouldCancelAndCloseStream() { responseHandler.onResponseComplete(httpStream, 1); assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream).close(); } @Test @@ -123,7 +125,7 @@ void publisherWritesFutureFails_shouldCancelAndCloseStream() { CompletableFuture future = new CompletableFuture<>(); when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); handler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), @@ -140,9 +142,8 @@ void publisherWritesFutureFails_shouldCancelAndCloseStream() { // we don't verify here because it behaves differently in async and sync } - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream).close(); verify(httpStream, never()).incrementWindow(anyInt()); } @@ -152,7 +153,7 @@ void publisherWritesFutureCompletesAfterStreamClosed_shouldNotInvokeIncrementWin when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); when(simplePublisher.complete()).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); @@ -176,7 +177,7 @@ void publisherWritesFutureCompletesBeforeStreamClosed_shouldInvokeIncrementWindo when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); when(simplePublisher.complete()).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java index 456000ac1150..3ca81a08b245 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java @@ -24,6 +24,7 @@ import java.net.URI; import javax.net.ssl.SSLHandshakeException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.stream.Stream; import java.util.AbstractMap.SimpleEntry; import java.util.Map.Entry; @@ -87,7 +88,7 @@ public void execute_requestConversionFails_failsFuture() { .request(HttpExecuteRequest.builder().build()) .build(); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); } @@ -98,11 +99,11 @@ public void execute_acquireStreamFails_wrapsWithIOException() { CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = new CompletableFuture<>(); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); completableFuture.completeExceptionally(exception); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } @@ -113,10 +114,10 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable) CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(throwable); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class); } @@ -130,23 +131,63 @@ public void execute_httpException_mapsToCorrectException(Entry completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass); } + @Test + public void execute_doExecuteThrowsSynchronously_failsBothStreamFutureAndResponseFuture() { + CrtRequestContext context = CrtRequestContext.builder() + .streamManager(streamManager) + .request(HttpExecuteRequest.builder().build()) + .build(); + + CrtRequestExecutor.Result result = requestExecutor.execute(context); + + assertThat(result.responseFuture()).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> result.streamHandler().waitForStream()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(NullPointerException.class); + } + + @Test + public void execute_streamActivateThrows_failsBothFuturesWithIoExceptionWrappingCause() { + CrtRuntimeException activateError = new CrtRuntimeException("activate failed"); + CrtRequestContext context = crtRequestContext(); + + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), + Mockito.any(HttpStreamBaseResponseHandler.class), + Mockito.anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(httpStream)); + Mockito.doThrow(activateError).when(httpStream).activate(); + + CrtRequestExecutor.Result result = requestExecutor.execute(context); + + assertThat(result.responseFuture()).hasFailedWithThrowableThat() + .isInstanceOf(IOException.class) + .hasCauseInstanceOf(CrtRuntimeException.class); + assertThatThrownBy(() -> result.streamHandler().waitForStream()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(IOException.class) + .hasRootCauseInstanceOf(CrtRuntimeException.class); + // Verify the acquired stream was cancelled and closed so the connection is not leaked. + Mockito.verify(httpStream).cancel(); + Mockito.verify(httpStream).close(); + } + @Test public void execute_nonRetryableHttpException_doesNotWrapWithIOException() { HttpException exception = new HttpException(0x0801); // AWS_ERROR_HTTP_HEADER_NOT_FOUND CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThatThrownBy(executeFuture::join).hasCause(exception); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java index e1caaeb021a9..aba67504c5a0 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java @@ -33,34 +33,36 @@ import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; import software.amazon.awssdk.utils.async.SimplePublisher; public class CrtResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @Override - HttpStreamBaseResponseHandler responseHandler() { + HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler) { AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response, executionAttributes) -> null, Function.identity(), new ExecutionAttributes()); responseHandler.prepare(); - return CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler); + return CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler, streamHandler); } @Override - HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { + HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response, executionAttributes) -> null, Function.identity(), new ExecutionAttributes()); responseHandler.prepare(); - return new CrtResponseAdapter(requestFuture, responseHandler, simplePublisher); + return new CrtResponseAdapter(requestFuture, responseHandler, simplePublisher, streamHandler); } @Test void onResponseComplete_publisherCancelled_closesStream() { SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler(); - HttpStreamBaseResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler); + HttpStreamBaseResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); crtResponseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java new file mode 100644 index 000000000000..bc6b38296e01 --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java @@ -0,0 +1,160 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.utils.CompletableFutureUtils; + +@ExtendWith(MockitoExtension.class) +class CrtStreamHandlerTest { + + @Mock + private HttpStreamBase stream; + + private CrtStreamHandler streamHandler; + + @BeforeEach + void setUp() { + streamHandler = new CrtStreamHandler(CompletableFuture.completedFuture(stream)); + } + + @Test + void releaseConnection_shouldCallClose() { + streamHandler.releaseConnection(); + + verify(stream, never()).cancel(); + verify(stream).close(); + } + + @Test + void closeConnection_shouldCallCancelAndClose() { + streamHandler.closeConnection(); + + verify(stream).cancel(); + verify(stream, Mockito.atLeastOnce()).close(); + } + + @Test + void incrementWindow_afterReleaseConnection_shouldBeNoOp() { + streamHandler.releaseConnection(); + streamHandler.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void incrementWindow_afterCloseConnection_shouldBeNoOp() { + streamHandler.closeConnection(); + streamHandler.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void incrementWindow_beforeClose_shouldWork() { + streamHandler.incrementWindow(1024); + + verify(stream).incrementWindow(1024); + } + + @Test + void incrementWindow_beforeStreamReady_shouldBeNoOp() { + CrtStreamHandler handler = new CrtStreamHandler(new CompletableFuture<>()); + + handler.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void waitForStream_afterStreamFutureFailed_throwsCompletionExceptionWrappingCause() { + IOException cause = new IOException("acquire failed"); + CrtStreamHandler handler = new CrtStreamHandler(CompletableFutureUtils.failedFuture(cause)); + + assertThatThrownBy(handler::waitForStream) + .isInstanceOf(CompletionException.class) + .hasCause(cause); + } + + @Test + void writeData_underlyingWriteFails_propagatesOriginalCauseUnwrapped() { + RuntimeException writeError = new RuntimeException("write failure"); + Mockito.when(stream.writeData(Mockito.any(byte[].class), Mockito.eq(false))) + .thenReturn(CompletableFutureUtils.failedFuture(writeError)); + + CompletableFuture writeFuture = streamHandler.writeData(new byte[] {1, 2, 3}, false); + + assertThatThrownBy(writeFuture::get) + .isInstanceOf(ExecutionException.class) + .hasCauseReference(writeError); + } + + @Test + void writeData_streamAcquisitionFailed_propagatesOriginalCauseUnwrapped() { + IOException acquireFailure = new IOException("acquire failed"); + CrtStreamHandler handler = new CrtStreamHandler(CompletableFutureUtils.failedFuture(acquireFailure)); + + CompletableFuture writeFuture = handler.writeData(new byte[] {1, 2, 3}, false); + + assertThatThrownBy(writeFuture::get) + .isInstanceOf(ExecutionException.class) + .hasCauseReference(acquireFailure); + } + + @Test + void writeData_afterCloseConnection_failsWithIoException() { + streamHandler.closeConnection(); + + CompletableFuture writeFuture = streamHandler.writeData(new byte[] {1, 2, 3}, false); + + assertThat(writeFuture).isCompletedExceptionally(); + assertThatThrownBy(writeFuture::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(IOException.class); + } + + @Test + void writeData_underlyingWriteThrowsSynchronously_failsFutureAndClosesConnection() { + CrtRuntimeException syncError = new CrtRuntimeException("AWS_ERROR_HTTP_STREAM_NOT_ACTIVATED"); + Mockito.when(stream.writeData(Mockito.any(byte[].class), Mockito.eq(false))) + .thenThrow(syncError); + + CompletableFuture writeFuture = streamHandler.writeData(new byte[] {1, 2, 3}, false); + + assertThat(writeFuture).isCompletedExceptionally(); + assertThatThrownBy(writeFuture::get) + .isInstanceOf(ExecutionException.class) + .hasCauseReference(syncError); + verify(stream).cancel(); + verify(stream).close(); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java index 8c786624426b..2321ee0c8809 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java @@ -31,19 +31,21 @@ import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; import software.amazon.awssdk.utils.async.SimplePublisher; public class InputStreamAdaptingHttpStreamResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @Override - HttpStreamBaseResponseHandler responseHandler() { - return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler) { + return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, streamHandler); } @Override - HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { - return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, simplePublisher); + HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { + return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, simplePublisher, streamHandler); } @Test @@ -63,9 +65,8 @@ void abortStream_shouldCancelAndCloseStream() throws IOException { abortableInputStream.read(); abortableInputStream.abort(); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); } @Test @@ -85,7 +86,7 @@ void closeStream_shouldCloseStreamWithoutCancel() throws IOException { abortableInputStream.read(); abortableInputStream.close(); - verify(httpStream).close(); + verify(httpStream, Mockito.atLeastOnce()).close(); } @Test @@ -97,8 +98,7 @@ void cancelFuture_shouldCancelAndCloseStream() { requestFuture.completeExceptionally(new RuntimeException()); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java deleted file mode 100644 index 1714b9930ec4..000000000000 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.http.crt.internal; - -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import software.amazon.awssdk.crt.http.HttpHeader; -import software.amazon.awssdk.crt.http.HttpHeaderBlock; -import software.amazon.awssdk.crt.http.HttpStreamBase; -import software.amazon.awssdk.http.SdkHttpResponse; -import software.amazon.awssdk.http.crt.internal.response.ResponseHandlerHelper; - -@ExtendWith(MockitoExtension.class) -class ResponseHandlerHelperTest { - - @Mock - private HttpStreamBase stream; - - private ResponseHandlerHelper helper; - - @BeforeEach - void setUp() { - helper = new ResponseHandlerHelper(SdkHttpResponse.builder()); - // Register the stream via onResponseHeaders - HttpHeader[] headers = { new HttpHeader("Content-Length", "1") }; - helper.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.getValue(), headers); - } - - @Test - void releaseConnection_shouldOnlyCallClose() { - helper.releaseConnection(); - - verify(stream, never()).cancel(); - verify(stream).close(); - } - - @Test - void closeConnection_shouldCallCancelThenClose() { - helper.closeConnection(); - - InOrder inOrder = Mockito.inOrder(stream); - inOrder.verify(stream).cancel(); - inOrder.verify(stream).close(); - } - - @Test - void releaseConnection_calledTwice_shouldOnlyCloseOnce() { - helper.releaseConnection(); - helper.releaseConnection(); - - verify(stream, Mockito.times(1)).close(); - } - - @Test - void closeConnection_calledTwice_shouldOnlyCloseOnce() { - helper.closeConnection(); - helper.closeConnection(); - - verify(stream, Mockito.times(1)).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void releaseConnection_afterCloseConnection_shouldBeNoOp() { - helper.closeConnection(); - helper.releaseConnection(); - - verify(stream, Mockito.times(1)).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void closeConnection_afterReleaseConnection_shouldBeNoOp() { - helper.releaseConnection(); - helper.closeConnection(); - - verify(stream, never()).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void incrementWindow_afterReleaseConnection_shouldBeNoOp() { - helper.releaseConnection(); - helper.incrementWindow(1024); - - verify(stream, never()).incrementWindow(1024); - } - - @Test - void incrementWindow_afterCloseConnection_shouldBeNoOp() { - helper.closeConnection(); - helper.incrementWindow(1024); - - verify(stream, never()).incrementWindow(1024); - } - - @Test - void incrementWindow_beforeClose_shouldWork() { - helper.incrementWindow(1024); - - verify(stream).incrementWindow(1024); - } -} diff --git a/pom.xml b/pom.xml index ab0ee75e8afa..35605a9ba553 100644 --- a/pom.xml +++ b/pom.xml @@ -131,7 +131,7 @@ 3.1.5 1.17.1 1.37 - 0.45.1 + 0.46.1 5.10.3