diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt index 6f0e244ea08..11f2ba69c13 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt @@ -1,2 +1,7 @@ Comparing source compatibility of opentelemetry-sdk-common-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.60.1.jar -No changes. \ No newline at end of file +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.GrpcSenderConfig (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize() +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.HttpSenderConfig (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize() diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt index f0ba1adf8a7..c641fffcf40 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt @@ -1,2 +1,4 @@ Comparing source compatibility of opentelemetry-sdk-extension-jaeger-remote-sampler-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-jaeger-remote-sampler-1.60.1.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index 53a3d5c7c5e..867cf897c3f 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -230,7 +230,10 @@ public GrpcExporter build() { isPlainHttp ? null : tlsConfigHelper.getSslContext(), isPlainHttp ? null : tlsConfigHelper.getTrustManager(), executorService, - grpcChannel)); + grpcChannel, + // 4mb to align with spec guidance - even though we don't do anything with the + // response today, we will so better to have future-looking memory profile + 4 * 1024L * 1024L)); LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName()); return new GrpcExporter( diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java index 66a3a2ad5f0..33a430ae920 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java @@ -37,7 +37,8 @@ public static ImmutableGrpcSenderConfig create( @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, @Nullable ExecutorService executorService, - @Nullable Object managedChannel) { + @Nullable Object managedChannel, + long maxResponseBodySize) { return new AutoValue_ImmutableGrpcSenderConfig( endpoint, fullMethodName, @@ -49,6 +50,10 @@ public static ImmutableGrpcSenderConfig create( sslContext, trustManager, executorService, - managedChannel); + managedChannel, + maxResponseBodySize); } + + @Override + public abstract long getMaxResponseBodySize(); } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index 2e06f8caa26..8937acbb474 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -245,7 +245,10 @@ public HttpExporter build() { retryPolicy, isPlainHttp ? null : tlsConfigHelper.getSslContext(), isPlainHttp ? null : tlsConfigHelper.getTrustManager(), - executorService)); + executorService, + // 4mb to align with spec guidance - even though we don't do anything with the + // response today, we will so better to have future-looking memory profile + 4 * 1024L * 1024L)); LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName()); return new HttpExporter( diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java index 24e5a9bb1bc..805d48e2638 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java @@ -35,7 +35,8 @@ static HttpSenderConfig create( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { return new AutoValue_ImmutableHttpSenderConfig( endpoint, contentType, @@ -47,6 +48,10 @@ static HttpSenderConfig create( retryPolicy, sslContext, trustManager, - executorService); + executorService, + maxResponseBodySize); } + + @Override + public abstract long getMaxResponseBodySize(); } diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index 22271bc271e..5dbc8f42ba9 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -110,7 +110,8 @@ public void setUp() { null, null, null, - null), + null, + Long.MAX_VALUE), InternalTelemetryVersion.LATEST, ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER), MeterProvider::noop, diff --git a/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java b/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java index 2ac958b47ab..3af11db2d6b 100644 --- a/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java +++ b/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java @@ -42,7 +42,7 @@ void usingOkHttp() throws Exception { @Override // whilst profile signal type is in development it uses a different error message @SuppressLogger(GrpcExporter.class) protected void testExport_Unimplemented() { - addGrpcError(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); + addGrpcResponse(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); TelemetryExporter exporter = nonRetryingExporter(); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java index 664e839314e..139baf09850 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java @@ -7,6 +7,7 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -15,7 +16,9 @@ import static org.junit.jupiter.api.Named.named; import static org.junit.jupiter.params.provider.Arguments.arguments; +import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.Message; +import com.google.protobuf.Parser; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.TlsKeyPair; @@ -111,9 +114,11 @@ public abstract class AbstractGrpcTelemetryExporterTest { private static final ConcurrentLinkedQueue exportedResourceTelemetry = new ConcurrentLinkedQueue<>(); - private static final ConcurrentLinkedQueue grpcErrors = + private static final ConcurrentLinkedQueue grpcResponses = new ConcurrentLinkedQueue<>(); + private static volatile byte[] defaultResponseBytes = new byte[0]; + private static final AtomicInteger attempts = new AtomicInteger(); private static final ConcurrentLinkedQueue httpRequests = @@ -143,13 +148,7 @@ public void export( ExportLogsServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceLogsList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportLogsServiceResponse.parser()); } }) .addService( @@ -159,14 +158,7 @@ public void export( ExportMetricsServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceMetricsList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportMetricsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportMetricsServiceResponse.parser()); } }) .addService( @@ -176,14 +168,7 @@ public void export( ExportTraceServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceSpansList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportTraceServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportTraceServiceResponse.parser()); } }) .addService( @@ -193,14 +178,7 @@ public void export( ExportProfilesServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceProfilesList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportProfilesServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportProfilesServiceResponse.parser()); } }) .decompressorRegistry( @@ -239,6 +217,25 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) } }; + private static void handleExport( + StreamObserver responseObserver, Parser parser) { + GrpcServerResponse grpcResponse = grpcResponses.poll(); + if (grpcResponse != null && grpcResponse.error != null) { + responseObserver.onError(grpcResponse.error); + } else { + try { + byte[] responseBytes = + grpcResponse != null && grpcResponse.responseBytes != null + ? grpcResponse.responseBytes + : defaultResponseBytes; + responseObserver.onNext(parser.parseFrom(responseBytes)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + responseObserver.onCompleted(); + } + } + @RegisterExtension protected LogCapturer logs = LogCapturer.create().captureForType(GrpcExporter.class); @@ -265,6 +262,7 @@ void setUp() { .setInitialBackoff(Duration.ofMillis(1)) .build()) .build(); + defaultResponseBytes = exporter.exportResponse(0).toByteArray(); // Sanity check that TLS files are in PEM format. assertThat(certificate.certificateFile()) @@ -293,7 +291,7 @@ void tearDown() { @AfterEach void reset() { exportedResourceTelemetry.clear(); - grpcErrors.clear(); + grpcResponses.clear(); attempts.set(0); httpRequests.clear(); } @@ -332,6 +330,40 @@ void export() { .matches("OTel-OTLP-Exporter-Java/1\\..*")); } + @Test + // @SuppressLogger(HttpExporter.class) + void responseBodyBounds() { + // We have a 4mb hardcoded response body limit. Responses <= 4mb succeed. Responses >= 4mb + // succeed. We can't test payloads exactly at 4mb because protobuf message lengths are finicky - + // its hard to create a message with an exact size. + + // Body below the limit, succeeds + addGrpcResponse(GrpcStatusCode.OK, null, exporter.exportResponse(100)); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + + // Body over the limit, fails with RESOURCE_EXHAUSTED + addGrpcResponse(GrpcStatusCode.OK, null, exporter.exportResponse(4 * 1024 * 1024 + 1)); + result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.GrpcExportException.class, + ex -> + assertThat(requireNonNull(ex.getResponse())) + .isNotNull() + .satisfies( + grpcResponse -> + assertThat(grpcResponse.getStatusCode()) + .isEqualTo(GrpcStatusCode.RESOURCE_EXHAUSTED))); + } + @Test void multipleItems() { List telemetry = new ArrayList<>(); @@ -603,7 +635,7 @@ void doubleShutdown() { @SuppressLogger(GrpcExporter.class) void error() { GrpcStatusCode statusCode = GrpcStatusCode.INTERNAL; - addGrpcError(statusCode, null); + addGrpcResponse(statusCode, null); try (TelemetryExporter exporter = nonRetryingExporter()) { CompletableResultCode result = @@ -639,7 +671,7 @@ void error() { @Test @SuppressLogger(GrpcExporter.class) void errorWithUnknownError() { - addGrpcError(GrpcStatusCode.UNKNOWN, null); + addGrpcResponse(GrpcStatusCode.UNKNOWN, null); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -662,7 +694,7 @@ void errorWithUnknownError() { @Test @SuppressLogger(GrpcExporter.class) void errorWithMessage() { - addGrpcError(GrpcStatusCode.RESOURCE_EXHAUSTED, "out of quota"); + addGrpcResponse(GrpcStatusCode.RESOURCE_EXHAUSTED, "out of quota"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -683,7 +715,7 @@ void errorWithMessage() { @Test @SuppressLogger(GrpcExporter.class) void errorWithEscapedMessage() { - addGrpcError(GrpcStatusCode.NOT_FOUND, "クマ🐻"); + addGrpcResponse(GrpcStatusCode.NOT_FOUND, "クマ🐻"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -704,7 +736,7 @@ void errorWithEscapedMessage() { @Test @SuppressLogger(GrpcExporter.class) void testExport_Unavailable() { - addGrpcError(GrpcStatusCode.UNAVAILABLE, null); + addGrpcResponse(GrpcStatusCode.UNAVAILABLE, null); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -726,7 +758,7 @@ void testExport_Unavailable() { @Test @SuppressLogger(GrpcExporter.class) protected void testExport_Unimplemented() { - addGrpcError(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); + addGrpcResponse(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -772,7 +804,7 @@ protected void testExport_Unimplemented() { @ValueSource(ints = {1, 4, 8, 10, 11, 14, 15}) @SuppressLogger(GrpcExporter.class) void retryableError(int code) { - addGrpcError(GrpcStatusCode.fromValue(code), null); + addGrpcResponse(GrpcStatusCode.fromValue(code), null); assertThat( exporter @@ -787,8 +819,8 @@ void retryableError(int code) { @Test @SuppressLogger(GrpcExporter.class) void retryableError_tooManyAttempts() { - addGrpcError(GrpcStatusCode.CANCELLED, null); - addGrpcError(GrpcStatusCode.CANCELLED, null); + addGrpcResponse(GrpcStatusCode.CANCELLED, null); + addGrpcResponse(GrpcStatusCode.CANCELLED, null); assertThat( exporter @@ -804,7 +836,7 @@ void retryableError_tooManyAttempts() { @ValueSource(ints = {2, 3, 5, 6, 7, 9, 12, 13, 16}) @SuppressLogger(GrpcExporter.class) void nonRetryableError(int code) { - addGrpcError(GrpcStatusCode.fromValue(code), null); + addGrpcResponse(GrpcStatusCode.fromValue(code), null); assertThat( exporter @@ -1273,7 +1305,27 @@ protected TelemetryExporter nonRetryingExporter() { return exporterBuilder().setEndpoint(server.httpUri().toString()).setRetryPolicy(null).build(); } - protected static void addGrpcError(GrpcStatusCode code, @Nullable String message) { - grpcErrors.add(new ArmeriaStatusException(code.getValue(), message)); + protected static void addGrpcResponse(GrpcStatusCode code, @Nullable String message) { + addGrpcResponse(code, message, null); + } + + protected static void addGrpcResponse( + GrpcStatusCode code, + @Nullable String message, + @Nullable AbstractMessageLite bodyMessage) { + grpcResponses.add( + new GrpcServerResponse( + code == GrpcStatusCode.OK ? null : new ArmeriaStatusException(code.getValue(), message), + bodyMessage == null ? null : bodyMessage.toByteArray())); + } + + private static final class GrpcServerResponse { + @Nullable final ArmeriaStatusException error; + @Nullable final byte[] responseBytes; + + GrpcServerResponse(@Nullable ArmeriaStatusException error, @Nullable byte[] responseBytes) { + this.error = error; + this.responseBytes = responseBytes; + } } } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java index 79927098035..ecacdd48dcb 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java @@ -13,6 +13,7 @@ import static org.junit.jupiter.api.Named.named; import static org.junit.jupiter.params.provider.Arguments.arguments; +import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.linecorp.armeria.common.HttpRequest; @@ -34,11 +35,8 @@ import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; -import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; -import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InternalTelemetryVersion; import io.opentelemetry.sdk.common.export.ProxyOptions; @@ -116,6 +114,8 @@ public abstract class AbstractHttpTelemetryExporterTest { private static final ConcurrentLinkedQueue httpRequests = new ConcurrentLinkedQueue<>(); + private static volatile byte[] successResponseBytes = new byte[0]; + @RegisterExtension @Order(1) static final SelfSignedCertificateExtension certificate = new SelfSignedCertificateExtension(); @@ -135,20 +135,17 @@ protected void configure(ServerBuilder sb) { "/v1/traces", new CollectorService<>( ExportTraceServiceRequest::parseFrom, - ExportTraceServiceRequest::getResourceSpansList, - ExportTraceServiceResponse.getDefaultInstance().toByteArray())); + ExportTraceServiceRequest::getResourceSpansList)); sb.service( "/v1/metrics", new CollectorService<>( ExportMetricsServiceRequest::parseFrom, - ExportMetricsServiceRequest::getResourceMetricsList, - ExportMetricsServiceResponse.getDefaultInstance().toByteArray())); + ExportMetricsServiceRequest::getResourceMetricsList)); sb.service( "/v1/logs", new CollectorService<>( ExportLogsServiceRequest::parseFrom, - ExportLogsServiceRequest::getResourceLogsList, - ExportLogsServiceResponse.getDefaultInstance().toByteArray())); + ExportLogsServiceRequest::getResourceLogsList)); sb.http(0); sb.https(0); @@ -162,15 +159,12 @@ protected void configure(ServerBuilder sb) { private static class CollectorService implements HttpService { private final ThrowingExtractor parse; private final Function> getResourceTelemetry; - private final byte[] successResponse; private CollectorService( ThrowingExtractor parse, - Function> getResourceTelemetry, - byte[] successResponse) { + Function> getResourceTelemetry) { this.parse = parse; this.getResourceTelemetry = getResourceTelemetry; - this.successResponse = successResponse; } @Override @@ -195,7 +189,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) { : HttpResponse.of( HttpStatus.OK, MediaType.parse("application/x-protobuf"), - successResponse); + successResponseBytes); }); return HttpResponse.of(responseFuture); } @@ -236,7 +230,6 @@ protected AbstractHttpTelemetryExporterTest( @BeforeAll void setUp() { - // exporter = exporterBuilder() .setEndpoint(server.httpUri() + path) @@ -248,6 +241,7 @@ void setUp() { .setInitialBackoff(Duration.ofMillis(1)) .build()) .build(); + successResponseBytes = exporter.exportResponse(0).toByteArray(); // Sanity check that TLS files are in PEM format. assertThat(certificate.certificateFile()) @@ -553,7 +547,7 @@ void doubleShutdown() { @SuppressLogger(HttpExporter.class) void error() { int statusCode = 500; - addHttpError(statusCode); + addHttpResponse(statusCode); CompletableResultCode result = exporter .export(Collections.singletonList(generateFakeTelemetry())) @@ -590,10 +584,41 @@ void error() { assertThat(log.getLevel()).isEqualTo(Level.WARN); } + @Test + @SuppressLogger(HttpExporter.class) + void responseBodyBounds() { + // We have a 4mb hardcoded response body limit. Responses <= 4mb succeed. Responses >= 4mb + // succeed. We can't test payloads exactly at 4mb because protobuf message lengths are finicky - + // its hard to create a message with an exact size. + + // Body below the limit, succeeds + addHttpResponse(200, exporter.exportResponse(100)); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + + // Body over the limit, fails with RESOURCE_EXHAUSTED + addHttpResponse(200, exporter.exportResponse(4 * 1024 * 1024 + 1)); + result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.HttpExportException.class, + ex -> + assertThat(ex.getCause()) + .isNotNull() + .hasMessageContaining("HTTP response body exceeded limit of")); + } + @ParameterizedTest @ValueSource(ints = {429, 502, 503, 504}) void retryableError(int code) { - addHttpError(code); + addHttpResponse(code); assertThat( exporter @@ -608,8 +633,8 @@ void retryableError(int code) { @Test @SuppressLogger(HttpExporter.class) void retryableError_tooManyAttempts() { - addHttpError(502); - addHttpError(502); + addHttpResponse(502); + addHttpResponse(502); assertThat( exporter @@ -625,7 +650,7 @@ void retryableError_tooManyAttempts() { @SuppressLogger(HttpExporter.class) @ValueSource(ints = {400, 401, 403, 500, 501}) void nonRetryableError(int code) { - addHttpError(code); + addHttpResponse(code); assertThat( exporter @@ -1096,7 +1121,13 @@ private List toProto(List telemetry) { .collect(Collectors.toList()); } - private static void addHttpError(int code) { + private static void addHttpResponse(int code) { httpErrors.add(HttpResponse.of(code)); } + + private static void addHttpResponse(int code, AbstractMessageLite bodyMessage) { + httpErrors.add( + HttpResponse.of( + HttpStatus.valueOf(code), MediaType.PLAIN_TEXT_UTF_8, bodyMessage.toByteArray())); + } } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java index cbd0168789b..ab9b4611844 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; +import com.google.protobuf.AbstractMessageLite; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.GrpcSslContexts; @@ -68,6 +69,8 @@ public TelemetryExporterBuilder setEndpoint(String endpoint) { // the User-Agent to be spec compliant they must manually set the user agent when building // their channel. channelBuilder.userAgent(OtlpUserAgent.getUserAgent()); + // Its user's responsibility to set the max inbound message size when building their channel. + channelBuilder.maxInboundMessageSize(4 * 1024 * 1024); return this; } @@ -230,6 +233,11 @@ public CompletableResultCode shutdown() { shutdownCallback.run(); return delegateExporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + return delegateExporter.exportResponse(minimumSize); + } }; } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java index 65a37f2650f..6bb4c3132cc 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java @@ -5,8 +5,18 @@ package io.opentelemetry.exporter.otlp.testing.internal; +import com.google.common.base.Strings; +import com.google.protobuf.AbstractMessageLite; import io.opentelemetry.exporter.otlp.profiles.ProfileData; import io.opentelemetry.exporter.otlp.profiles.ProfileExporter; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.proto.collector.profiles.v1development.ExportProfilesPartialSuccess; +import io.opentelemetry.proto.collector.profiles.v1development.ExportProfilesServiceResponse; +import io.opentelemetry.proto.collector.trace.v1.ExportTracePartialSuccess; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; @@ -36,6 +46,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportTraceServiceResponse.getDefaultInstance(); + } + return ExportTraceServiceResponse.newBuilder() + .setPartialSuccess( + ExportTracePartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -56,6 +79,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportMetricsServiceResponse.getDefaultInstance(); + } + return ExportMetricsServiceResponse.newBuilder() + .setPartialSuccess( + ExportMetricsPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -76,6 +112,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportLogsServiceResponse.getDefaultInstance(); + } + return ExportLogsServiceResponse.newBuilder() + .setPartialSuccess( + ExportLogsPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -96,6 +145,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportProfilesServiceResponse.getDefaultInstance(); + } + return ExportProfilesServiceResponse.newBuilder() + .setPartialSuccess( + ExportProfilesPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -105,6 +167,8 @@ public CompletableResultCode shutdown() { CompletableResultCode shutdown(); + AbstractMessageLite exportResponse(int minimumSize); + @Override default void close() { shutdown().join(10, TimeUnit.SECONDS); diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index aee4990c651..03c0cf2fa2f 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -5,10 +5,9 @@ package io.opentelemetry.exporter.sender.jdk.internal; -import static java.util.stream.Collectors.joining; - import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.Compressor; +import io.opentelemetry.sdk.common.export.HttpResponse; import io.opentelemetry.sdk.common.export.HttpSender; import io.opentelemetry.sdk.common.export.MessageWriter; import io.opentelemetry.sdk.common.export.ProxyOptions; @@ -16,19 +15,19 @@ import io.opentelemetry.sdk.common.internal.DaemonThreadFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; -import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -72,6 +71,7 @@ public final class JdkHttpSender implements HttpSender { private final Supplier>> headerSupplier; @Nullable private final RetryPolicy retryPolicy; private final Predicate retryExceptionPredicate; + private final long maxResponseBodySize; // Visible for testing JdkHttpSender( @@ -82,7 +82,8 @@ public final class JdkHttpSender implements HttpSender { Duration timeout, Supplier>> headerSupplier, @Nullable RetryPolicy retryPolicy, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { this.client = client; this.endpoint = endpoint; this.contentType = contentType; @@ -101,6 +102,7 @@ public final class JdkHttpSender implements HttpSender { this.executorService = executorService; this.managedExecutor = false; } + this.maxResponseBodySize = maxResponseBodySize; } JdkHttpSender( @@ -113,7 +115,8 @@ public final class JdkHttpSender implements HttpSender { @Nullable RetryPolicy retryPolicy, @Nullable ProxyOptions proxyOptions, @Nullable SSLContext sslContext, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { this( configureClient(sslContext, connectTimeout, proxyOptions), endpoint, @@ -122,7 +125,8 @@ public final class JdkHttpSender implements HttpSender { timeout, headerSupplier, retryPolicy, - executorService); + executorService, + maxResponseBodySize); } private static ExecutorService newExecutor() { @@ -151,10 +155,8 @@ private static HttpClient configureClient( @Override public void send( - MessageWriter messageWriter, - Consumer onResponse, - Consumer onError) { - CompletableFuture> unused = + MessageWriter messageWriter, Consumer onResponse, Consumer onError) { + CompletableFuture unused = CompletableFuture.supplyAsync( () -> { try { @@ -170,12 +172,12 @@ public void send( onError.accept(throwable); return; } - onResponse.accept(toHttpResponse(httpResponse)); + onResponse.accept(httpResponse); }); } // Visible for testing - HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOException { + HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOException { long startTimeNanos = System.nanoTime(); HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(endpoint).timeout(timeout); Map> headers = headerSupplier.get(); @@ -207,7 +209,7 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce long attempt = 0; long nextBackoffNanos = retryPolicy.getInitialBackoff().toNanos(); - HttpResponse httpResponse = null; + HttpResponse httpResponse = null; IOException exception = null; do { if (attempt > 0) { @@ -234,7 +236,7 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce requestBuilder.timeout(timeout.minusNanos(System.nanoTime() - startTimeNanos)); try { httpResponse = sendRequest(requestBuilder, byteBufferPool); - boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode()); + boolean retryable = retryableStatusCodes.contains(httpResponse.getStatusCode()); if (logger.isLoggable(Level.FINER)) { logger.log( Level.FINER, @@ -273,21 +275,16 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce throw exception; } - private static String responseStringRepresentation(HttpResponse response) { - StringJoiner joiner = new StringJoiner(",", "HttpResponse{", "}"); - joiner.add("code=" + response.statusCode()); - joiner.add( - "headers=" - + response.headers().map().entrySet().stream() - .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue())) - .collect(joining(",", "[", "]"))); - return joiner.toString(); + private static String responseStringRepresentation(HttpResponse response) { + return "HttpResponse{code=" + response.getStatusCode() + "}"; } - private HttpResponse sendRequest( + private HttpResponse sendRequest( HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException { try { - return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray()); + java.net.http.HttpResponse response = + client.send(requestBuilder.build(), BodyHandlers.ofInputStream()); + return toHttpResponse(response); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); @@ -306,7 +303,14 @@ private static boolean isRetryableException(IOException throwable) { // Known retryable HttpTimeoutException messages: "request timed out" // Known retryable HttpConnectTimeoutException messages: "HTTP connect timed // out" - return !(throwable instanceof SSLException); + return !(throwable instanceof SSLException) + && !(throwable instanceof ResponseBodyTooLargeException); + } + + private static final class ResponseBodyTooLargeException extends IOException { + ResponseBodyTooLargeException(String message) { + super(message); + } } private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream { @@ -319,22 +323,45 @@ private byte[] buf() { } } - private static io.opentelemetry.sdk.common.export.HttpResponse toHttpResponse( - HttpResponse response) { - return new io.opentelemetry.sdk.common.export.HttpResponse() { + private HttpResponse toHttpResponse(java.net.http.HttpResponse response) + throws IOException { + int statusCode = response.statusCode(); + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if we read more than maxResponseBodySize bytes, the body exceeded + // the limit. A body exactly at the limit will read no further (EOF is reached first). + // If maxResponseBodySize is >= Integer.MAX_VALUE, adding 1 would overflow (long) or exceed + // what readNBytes accepts (int). In that case read Integer.MAX_VALUE bytes — the overflow + // check can never trigger for such a large limit. + int readUpTo = + maxResponseBodySize >= Integer.MAX_VALUE + ? Integer.MAX_VALUE + : (int) (maxResponseBodySize + 1); + byte[] bodyBytes; + try (InputStream is = response.body()) { + bodyBytes = is.readNBytes(readUpTo); + } catch (IOException e) { + bodyBytes = new byte[0]; + logger.log(Level.WARNING, "Failed to read response body", e); + } + if (bodyBytes.length > maxResponseBodySize) { + throw new ResponseBodyTooLargeException( + "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes"); + } + byte[] body = bodyBytes; + return new HttpResponse() { @Override public int getStatusCode() { - return response.statusCode(); + return statusCode; } @Override public String getStatusMessage() { - return String.valueOf(response.statusCode()); + return String.valueOf(statusCode); } @Override public byte[] getResponseBody() { - return response.body(); + return body; } }; } diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java index 0ece6a9978d..fcd9d64e0fe 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java @@ -29,6 +29,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) { httpSenderConfig.getRetryPolicy(), httpSenderConfig.getProxyOptions(), httpSenderConfig.getSslContext(), - httpSenderConfig.getExecutorService()); + httpSenderConfig.getExecutorService(), + httpSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index c5bddc79fb1..24f764f3857 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -66,7 +66,8 @@ void setup() throws IOException, InterruptedException { Duration.ofSeconds(10), Collections::emptyMap, RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(), - null); + null, + Long.MAX_VALUE); } @Test @@ -121,7 +122,8 @@ void sendInternal_RetryableConnectException() throws IOException, InterruptedExc Duration.ofSeconds(10), Collections::emptyMap, RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(), - null); + null, + Long.MAX_VALUE); assertThatThrownBy(() -> sender.sendInternal(new NoOpRequestBodyWriter())) .satisfies( @@ -177,7 +179,8 @@ void connectTimeout() { null, null, null, - null); + null, + Long.MAX_VALUE); assertThat(sender) .extracting("client", as(InstanceOfAssertFactories.type(HttpClient.class))) diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 82b4a169270..a3410432339 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -83,6 +83,7 @@ public final class OkHttpGrpcSender implements GrpcSender { private final HttpUrl url; @Nullable private final Compressor compressor; private final Supplier>> headersSupplier; + private final long maxResponseBodySize; /** Creates a new {@link OkHttpGrpcSender}. */ @SuppressWarnings("TooManyParameters") @@ -95,7 +96,8 @@ public OkHttpGrpcSender( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE); int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE); @@ -133,6 +135,7 @@ public OkHttpGrpcSender( this.compressor = compressor; this.headersSupplier = headersSupplier; this.url = HttpUrl.get(endpoint); + this.maxResponseBodySize = maxResponseBodySize; } @Override @@ -165,40 +168,88 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(Call call, Response response) { - try (ResponseBody body = response.body()) { - // Must consume body before accessing trailers - byte[] bodyBytes = null; - try { - bodyBytes = getResponseMessageBytes(body.bytes()); - } catch (IOException e) { - bodyBytes = new byte[0]; - logger.log(Level.FINE, "Failed to read response body", e); - } - byte[] resolvedBodyBytes = bodyBytes; - GrpcStatusCode status = grpcStatus(response); - String description = grpcMessage(response); - onResponse.accept( - new GrpcResponse() { - @Override - public GrpcStatusCode getStatusCode() { - return status; - } - - @Override - public String getStatusDescription() { - return description; - } - - @Override - public byte[] getResponseMessage() { - return resolvedBodyBytes; - } - }); - } + handleResponse(response, onResponse); } })); } + private void handleResponse(Response response, Consumer onResponse) { + try (ResponseBody body = response.body()) { + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if the buffer ends up larger than maxResponseBodySize, the body + // exceeded the limit. A body exactly at the limit will only fill the buffer to + // maxResponseBodySize (EOF is reached before the extra byte is read). + // If maxResponseBodySize is Long.MAX_VALUE, adding 1 would overflow. In that case use + // Long.MAX_VALUE directly — the overflow check can never trigger for such a large limit. + long readUpTo = + maxResponseBodySize == Long.MAX_VALUE ? Long.MAX_VALUE : maxResponseBodySize + 1; + Buffer buffer = new Buffer(); + try { + while (buffer.size() <= maxResponseBodySize) { + long n = body.source().read(buffer, readUpTo - buffer.size()); + if (n == -1L) { + break; + } + } + } catch (IOException e) { + logger.log(Level.FINE, "Failed to read response body", e); + } + + if (buffer.size() > maxResponseBodySize) { + onResponse.accept(responseMessageTooLarge(maxResponseBodySize)); + return; + } + + // Must consume body before accessing trailers + byte[] bodyBytes; + try { + bodyBytes = getResponseMessageBytes(buffer.readByteArray()); + } catch (IOException e) { + bodyBytes = new byte[0]; + logger.log(Level.FINE, "Failed to read response body", e); + } + byte[] resolvedBodyBytes = bodyBytes; + GrpcStatusCode status = grpcStatus(response); + String description = grpcMessage(response); + onResponse.accept( + new GrpcResponse() { + @Override + public GrpcStatusCode getStatusCode() { + return status; + } + + @Override + public String getStatusDescription() { + return description; + } + + @Override + public byte[] getResponseMessage() { + return resolvedBodyBytes; + } + }); + } + } + + private static GrpcResponse responseMessageTooLarge(long maxResponseBodySize) { + return new GrpcResponse() { + @Override + public GrpcStatusCode getStatusCode() { + return GrpcStatusCode.RESOURCE_EXHAUSTED; + } + + @Override + public String getStatusDescription() { + return "gRPC response body exceeded limit of " + maxResponseBodySize + " bytes"; + } + + @Override + public byte[] getResponseMessage() { + return new byte[0]; + } + }; + } + private static byte[] getResponseMessageBytes(byte[] bodyBytes) throws IOException { if (bodyBytes.length >= 5) { ByteArrayInputStream bodyStream = new ByteArrayInputStream(bodyBytes); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java index 09bb6eefe4f..fb4b275444f 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java @@ -31,6 +31,7 @@ public GrpcSender createSender(GrpcSenderConfig grpcSenderConfig) { grpcSenderConfig.getRetryPolicy(), grpcSenderConfig.getSslContext(), grpcSenderConfig.getTrustManager(), - grpcSenderConfig.getExecutorService()); + grpcSenderConfig.getExecutorService(), + grpcSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 16b5d90b7f4..9305814b1ee 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -39,6 +39,7 @@ import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; +import okio.Buffer; import okio.BufferedSink; import okio.Okio; @@ -58,6 +59,7 @@ public final class OkHttpHttpSender implements HttpSender { private final Supplier>> headerSupplier; private final MediaType mediaType; @Nullable private final Compressor compressor; + private final long maxResponseBodySize; /** Create a sender. */ @SuppressWarnings("TooManyParameters") @@ -72,7 +74,8 @@ public OkHttpHttpSender( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE); int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE); @@ -111,6 +114,7 @@ public OkHttpHttpSender( this.mediaType = MediaType.parse(contentType); this.compressor = compressor; this.headerSupplier = headerSupplier; + this.maxResponseBodySize = maxResponseBodySize; } @Override @@ -141,39 +145,62 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(Call call, Response response) { - try (ResponseBody body = response.body()) { - onResponse.accept( - new HttpResponse() { - @Nullable private byte[] bodyBytes; - - @Override - public int getStatusCode() { - return response.code(); - } - - @Override - public String getStatusMessage() { - return response.message(); - } - - @Override - public byte[] getResponseBody() { - if (bodyBytes == null) { - try { - bodyBytes = body.bytes(); - } catch (IOException e) { - bodyBytes = new byte[0]; - logger.log(Level.WARNING, "Failed to read response body", e); - } - } - return bodyBytes; - } - }); - } + handleResponse(response, onResponse, onError); } })); } + private void handleResponse( + Response response, Consumer onResponse, Consumer onError) { + try (ResponseBody body = response.body()) { + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if the buffer ends up larger than maxResponseBodySize, the body + // exceeded the limit. A body exactly at the limit will only fill the buffer to + // maxResponseBodySize (EOF is reached before the extra byte is read). + // If maxResponseBodySize is Long.MAX_VALUE, adding 1 would overflow. In that case use + // Long.MAX_VALUE directly — the overflow check can never trigger for such a large limit. + long readUpTo = + maxResponseBodySize == Long.MAX_VALUE ? Long.MAX_VALUE : maxResponseBodySize + 1; + Buffer buffer = new Buffer(); + try { + while (buffer.size() <= maxResponseBodySize) { + long n = body.source().read(buffer, readUpTo - buffer.size()); + if (n == -1L) { + break; + } + } + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to read response body", e); + } + + if (buffer.size() > maxResponseBodySize) { + onError.accept( + new IOException( + "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes")); + return; + } + + byte[] bodyBytes = buffer.readByteArray(); + onResponse.accept( + new HttpResponse() { + @Override + public int getStatusCode() { + return response.code(); + } + + @Override + public String getStatusMessage() { + return response.message(); + } + + @Override + public byte[] getResponseBody() { + return bodyBytes; + } + }); + } + } + @Override public CompletableResultCode shutdown() { client.dispatcher().cancelAll(); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java index 0fabe6e3bdb..581cf9bb549 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java @@ -30,6 +30,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) { httpSenderConfig.getRetryPolicy(), httpSenderConfig.getSslContext(), httpSenderConfig.getTrustManager(), - httpSenderConfig.getExecutorService()); + httpSenderConfig.getExecutorService(), + httpSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java index 2127f92a072..a7f314c93b5 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java @@ -89,7 +89,8 @@ void shutdown_CompletableResultCodeShouldWaitForThreads() throws Exception { null, null, null, - null); + null, + Long.MAX_VALUE); CompletableResultCode sendResult = new CompletableResultCode(); sender.send( @@ -132,7 +133,8 @@ void shutdown_NonManagedExecutor_ReturnsImmediately() { null, null, null, - customExecutor); // Pass custom executor -> managedExecutor = false + customExecutor, // Pass custom executor -> managedExecutor = false + Long.MAX_VALUE); CompletableResultCode shutdownResult = sender.shutdown(); @@ -169,7 +171,8 @@ void shutdown_ExecutorDoesNotTerminateInTime_LogsWarningButSucceeds() throws Exc null, null, null, - null); // null executor = managed + null, // null executor = managed + Long.MAX_VALUE); // Start multiple requests to ensure threads are busy CountDownLatch blockCallbacks = new CountDownLatch(1); @@ -231,7 +234,8 @@ void shutdown_InterruptedWhileWaiting_StillSucceeds() throws Exception { null, null, null, - null); + null, + Long.MAX_VALUE); // Trigger some activity sender.send(new TestMessageWriter(), response -> {}, error -> {}); diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java index 705f96b601d..3bc73283db7 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java @@ -39,6 +39,7 @@ OkHttpGrpcSender createSender(String endpoint) { null, null, null, - null); + null, + Long.MAX_VALUE); } } diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java index 95699b5aa51..c64247a9b36 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java @@ -46,6 +46,7 @@ OkHttpHttpSender createSender(String endpoint) { null, null, null, - null); + null, + Long.MAX_VALUE); } } diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java index 66bdb787cdb..ba5429d827a 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java @@ -42,12 +42,14 @@ public final class JaegerRemoteSamplerBuilder { private static final Sampler INITIAL_SAMPLER = Sampler.parentBased(Sampler.traceIdRatioBased(0.001)); private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10); + private static final long DEFAULT_MAX_RESPONSE_BODY_SIZE = 4 * 1024L * 1024L; private URI endpoint = DEFAULT_ENDPOINT; private Sampler initialSampler = INITIAL_SAMPLER; private int pollingIntervalMillis = DEFAULT_POLLING_INTERVAL_MILLIS; private final TlsConfigHelper tlsConfigHelper = new TlsConfigHelper(); private Supplier> headerSupplier = Collections::emptyMap; + private long maxResponseBodySize = DEFAULT_MAX_RESPONSE_BODY_SIZE; @Nullable private String serviceName; @@ -147,6 +149,16 @@ public JaegerRemoteSamplerBuilder setInitialSampler(Sampler initialSampler) { return this; } + /** + * Sets the maximum number of bytes to read from a sampling strategy response body. If unset, + * defaults to 4 MiB. Must be positive. + */ + public JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long bytes) { + Utils.checkArgument(bytes > 0, "maxSamplingStrategyResponseBodySize must be positive"); + this.maxResponseBodySize = bytes; + return this; + } + /** * Sets the managed channel to use when communicating with the backend. Takes precedence over * {@link #setEndpoint(String)} if both are called. @@ -195,7 +207,8 @@ private GrpcSender resolveGrpcSender() { tlsConfigHelper.getSslContext(), tlsConfigHelper.getTrustManager(), null, - grpcChannel); + grpcChannel, + maxResponseBodySize); return grpcSenderProvider.createSender(grpcSenderConfig); } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java index 500b12ad147..047d99eccec 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java @@ -27,6 +27,9 @@ public interface GrpcSender { * called when the call could not be executed due to cancellation, connectivity problems, or * timeout. * + *

The byte array returned by {@link GrpcResponse#getResponseMessage()} should contain at most + * {@link GrpcSenderConfig#getMaxResponseBodySize()} bytes. + * * @param messageWriter the message writer * @param onResponse the callback to invoke with the gRPC response * @param onError the callback to invoke when the gRPC call could not be executed diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java index 53fd2eb556f..35e02e09b4c 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java @@ -88,4 +88,14 @@ public interface GrpcSenderConfig { */ @Nullable ExecutorService getExecutorService(); + + /** + * The maximum number of bytes to read from a response body. Defaults to 4 MiB. + * + *

Warning: setting a high or unbounded limit allows a malicious or misconfigured server to + * cause unbounded heap allocation, potentially leading to out-of-memory errors. + */ + default long getMaxResponseBodySize() { + return 4 * 1024L * 1024L; + } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java index a039d4ad6d0..bf0d75524f1 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java @@ -27,6 +27,9 @@ public interface HttpSender { * called when the request could not be executed due to cancellation, connectivity problems, or * timeout. * + *

The byte array returned by {@link HttpResponse#getResponseBody()} should contain at most + * {@link HttpSenderConfig#getMaxResponseBodySize()} bytes. + * * @param messageWriter the request body message writer * @param onResponse the callback to invoke with the HTTP response * @param onError the callback to invoke when the HTTP request could not be executed diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java index d67a71968a7..83d83b9f7b0 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java @@ -86,4 +86,14 @@ public interface HttpSenderConfig { */ @Nullable ExecutorService getExecutorService(); + + /** + * The maximum number of bytes to read from a response body. Defaults to 4 MiB. + * + *

Warning: setting a high or unbounded limit allows a malicious or misconfigured server to + * cause unbounded heap allocation, potentially leading to out-of-memory errors. + */ + default long getMaxResponseBodySize() { + return 4 * 1024L * 1024L; + } }