Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
Comparing source compatibility of opentelemetry-sdk-common-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.60.1.jar
No changes.
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.GrpcSenderConfig (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize()
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.HttpSenderConfig (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize()
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Comparing source compatibility of opentelemetry-sdk-extension-jaeger-remote-sampler-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-jaeger-remote-sampler-1.60.1.jar
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long)
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ public GrpcExporter build() {
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
executorService,
grpcChannel));
grpcChannel,
// 4mb to align with spec guidance - even though we don't do anything with the
// response today, we will so better to have future-looking memory profile
4 * 1024L * 1024L));
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());

return new GrpcExporter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public static ImmutableGrpcSenderConfig create(
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService,
@Nullable Object managedChannel) {
@Nullable Object managedChannel,
long maxResponseBodySize) {
return new AutoValue_ImmutableGrpcSenderConfig(
endpoint,
fullMethodName,
Expand All @@ -49,6 +50,10 @@ public static ImmutableGrpcSenderConfig create(
sslContext,
trustManager,
executorService,
managedChannel);
managedChannel,
maxResponseBodySize);
}

@Override
public abstract long getMaxResponseBodySize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ public HttpExporter build() {
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
executorService));
executorService,
// 4mb to align with spec guidance - even though we don't do anything with the
// response today, we will so better to have future-looking memory profile
4 * 1024L * 1024L));
LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName());

return new HttpExporter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ static HttpSenderConfig create(
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
return new AutoValue_ImmutableHttpSenderConfig(
endpoint,
contentType,
Expand All @@ -47,6 +48,10 @@ static HttpSenderConfig create(
retryPolicy,
sslContext,
trustManager,
executorService);
executorService,
maxResponseBodySize);
}

@Override
public abstract long getMaxResponseBodySize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public void setUp() {
null,
null,
null,
null),
null,
Long.MAX_VALUE),
InternalTelemetryVersion.LATEST,
ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER),
MeterProvider::noop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void usingOkHttp() throws Exception {
@Override // whilst profile signal type is in development it uses a different error message
@SuppressLogger(GrpcExporter.class)
protected void testExport_Unimplemented() {
addGrpcError(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED");
addGrpcResponse(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED");

TelemetryExporter<ProfileData> exporter = nonRetryingExporter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
Expand All @@ -15,7 +16,9 @@
import static org.junit.jupiter.api.Named.named;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.TlsKeyPair;
Expand Down Expand Up @@ -111,9 +114,11 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
private static final ConcurrentLinkedQueue<Object> exportedResourceTelemetry =
new ConcurrentLinkedQueue<>();

private static final ConcurrentLinkedQueue<ArmeriaStatusException> grpcErrors =
private static final ConcurrentLinkedQueue<GrpcServerResponse> grpcResponses =
new ConcurrentLinkedQueue<>();

private static volatile byte[] defaultResponseBytes = new byte[0];

private static final AtomicInteger attempts = new AtomicInteger();

private static final ConcurrentLinkedQueue<HttpRequest> httpRequests =
Expand Down Expand Up @@ -143,13 +148,7 @@ public void export(
ExportLogsServiceRequest request,
StreamObserver<ExportLogsServiceResponse> responseObserver) {
exportedResourceTelemetry.addAll(request.getResourceLogsList());
ArmeriaStatusException grpcError = grpcErrors.poll();
if (grpcError != null) {
responseObserver.onError(grpcError);
} else {
responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
handleExport(responseObserver, ExportLogsServiceResponse.parser());
}
})
.addService(
Expand All @@ -159,14 +158,7 @@ public void export(
ExportMetricsServiceRequest request,
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
exportedResourceTelemetry.addAll(request.getResourceMetricsList());
ArmeriaStatusException grpcError = grpcErrors.poll();
if (grpcError != null) {
responseObserver.onError(grpcError);
} else {
responseObserver.onNext(
ExportMetricsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
handleExport(responseObserver, ExportMetricsServiceResponse.parser());
}
})
.addService(
Expand All @@ -176,14 +168,7 @@ public void export(
ExportTraceServiceRequest request,
StreamObserver<ExportTraceServiceResponse> responseObserver) {
exportedResourceTelemetry.addAll(request.getResourceSpansList());
ArmeriaStatusException grpcError = grpcErrors.poll();
if (grpcError != null) {
responseObserver.onError(grpcError);
} else {
responseObserver.onNext(
ExportTraceServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
handleExport(responseObserver, ExportTraceServiceResponse.parser());
}
})
.addService(
Expand All @@ -193,14 +178,7 @@ public void export(
ExportProfilesServiceRequest request,
StreamObserver<ExportProfilesServiceResponse> responseObserver) {
exportedResourceTelemetry.addAll(request.getResourceProfilesList());
ArmeriaStatusException grpcError = grpcErrors.poll();
if (grpcError != null) {
responseObserver.onError(grpcError);
} else {
responseObserver.onNext(
ExportProfilesServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
handleExport(responseObserver, ExportProfilesServiceResponse.parser());
}
})
.decompressorRegistry(
Expand Down Expand Up @@ -239,6 +217,25 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req)
}
};

private static <R extends Message> void handleExport(
StreamObserver<R> responseObserver, Parser<R> parser) {
GrpcServerResponse grpcResponse = grpcResponses.poll();
if (grpcResponse != null && grpcResponse.error != null) {
responseObserver.onError(grpcResponse.error);
} else {
try {
byte[] responseBytes =
grpcResponse != null && grpcResponse.responseBytes != null
? grpcResponse.responseBytes
: defaultResponseBytes;
responseObserver.onNext(parser.parseFrom(responseBytes));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
responseObserver.onCompleted();
}
}

@RegisterExtension
protected LogCapturer logs = LogCapturer.create().captureForType(GrpcExporter.class);

Expand All @@ -265,6 +262,7 @@ void setUp() {
.setInitialBackoff(Duration.ofMillis(1))
.build())
.build();
defaultResponseBytes = exporter.exportResponse(0).toByteArray();

// Sanity check that TLS files are in PEM format.
assertThat(certificate.certificateFile())
Expand Down Expand Up @@ -293,7 +291,7 @@ void tearDown() {
@AfterEach
void reset() {
exportedResourceTelemetry.clear();
grpcErrors.clear();
grpcResponses.clear();
attempts.set(0);
httpRequests.clear();
}
Expand Down Expand Up @@ -332,6 +330,40 @@ void export() {
.matches("OTel-OTLP-Exporter-Java/1\\..*"));
}

@Test
// @SuppressLogger(HttpExporter.class)
void responseBodyBounds() {
// We have a 4mb hardcoded response body limit. Responses <= 4mb succeed. Responses >= 4mb
// succeed. We can't test payloads exactly at 4mb because protobuf message lengths are finicky -
// its hard to create a message with an exact size.

// Body below the limit, succeeds
addGrpcResponse(GrpcStatusCode.OK, null, exporter.exportResponse(100));
CompletableResultCode result =
exporter
.export(Collections.singletonList(generateFakeTelemetry()))
.join(10, TimeUnit.SECONDS);
assertThat(result.isSuccess()).isTrue();

// Body over the limit, fails with RESOURCE_EXHAUSTED
addGrpcResponse(GrpcStatusCode.OK, null, exporter.exportResponse(4 * 1024 * 1024 + 1));
result =
exporter
.export(Collections.singletonList(generateFakeTelemetry()))
.join(10, TimeUnit.SECONDS);
assertThat(result.isSuccess()).isFalse();
assertThat(result.getFailureThrowable())
.isInstanceOfSatisfying(
FailedExportException.GrpcExportException.class,
ex ->
assertThat(requireNonNull(ex.getResponse()))
.isNotNull()
.satisfies(
grpcResponse ->
assertThat(grpcResponse.getStatusCode())
.isEqualTo(GrpcStatusCode.RESOURCE_EXHAUSTED)));
}

@Test
void multipleItems() {
List<T> telemetry = new ArrayList<>();
Expand Down Expand Up @@ -603,7 +635,7 @@ void doubleShutdown() {
@SuppressLogger(GrpcExporter.class)
void error() {
GrpcStatusCode statusCode = GrpcStatusCode.INTERNAL;
addGrpcError(statusCode, null);
addGrpcResponse(statusCode, null);

try (TelemetryExporter<T> exporter = nonRetryingExporter()) {
CompletableResultCode result =
Expand Down Expand Up @@ -639,7 +671,7 @@ void error() {
@Test
@SuppressLogger(GrpcExporter.class)
void errorWithUnknownError() {
addGrpcError(GrpcStatusCode.UNKNOWN, null);
addGrpcResponse(GrpcStatusCode.UNKNOWN, null);

try (TelemetryExporter<T> exporter = nonRetryingExporter()) {
assertThat(
Expand All @@ -662,7 +694,7 @@ void errorWithUnknownError() {
@Test
@SuppressLogger(GrpcExporter.class)
void errorWithMessage() {
addGrpcError(GrpcStatusCode.RESOURCE_EXHAUSTED, "out of quota");
addGrpcResponse(GrpcStatusCode.RESOURCE_EXHAUSTED, "out of quota");

try (TelemetryExporter<T> exporter = nonRetryingExporter()) {
assertThat(
Expand All @@ -683,7 +715,7 @@ void errorWithMessage() {
@Test
@SuppressLogger(GrpcExporter.class)
void errorWithEscapedMessage() {
addGrpcError(GrpcStatusCode.NOT_FOUND, "クマ🐻");
addGrpcResponse(GrpcStatusCode.NOT_FOUND, "クマ🐻");

try (TelemetryExporter<T> exporter = nonRetryingExporter()) {
assertThat(
Expand All @@ -704,7 +736,7 @@ void errorWithEscapedMessage() {
@Test
@SuppressLogger(GrpcExporter.class)
void testExport_Unavailable() {
addGrpcError(GrpcStatusCode.UNAVAILABLE, null);
addGrpcResponse(GrpcStatusCode.UNAVAILABLE, null);

try (TelemetryExporter<T> exporter = nonRetryingExporter()) {
assertThat(
Expand All @@ -726,7 +758,7 @@ void testExport_Unavailable() {
@Test
@SuppressLogger(GrpcExporter.class)
protected void testExport_Unimplemented() {
addGrpcError(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED");
addGrpcResponse(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED");

try (TelemetryExporter<T> exporter = nonRetryingExporter()) {
assertThat(
Expand Down Expand Up @@ -772,7 +804,7 @@ protected void testExport_Unimplemented() {
@ValueSource(ints = {1, 4, 8, 10, 11, 14, 15})
@SuppressLogger(GrpcExporter.class)
void retryableError(int code) {
addGrpcError(GrpcStatusCode.fromValue(code), null);
addGrpcResponse(GrpcStatusCode.fromValue(code), null);

assertThat(
exporter
Expand All @@ -787,8 +819,8 @@ void retryableError(int code) {
@Test
@SuppressLogger(GrpcExporter.class)
void retryableError_tooManyAttempts() {
addGrpcError(GrpcStatusCode.CANCELLED, null);
addGrpcError(GrpcStatusCode.CANCELLED, null);
addGrpcResponse(GrpcStatusCode.CANCELLED, null);
addGrpcResponse(GrpcStatusCode.CANCELLED, null);

assertThat(
exporter
Expand All @@ -804,7 +836,7 @@ void retryableError_tooManyAttempts() {
@ValueSource(ints = {2, 3, 5, 6, 7, 9, 12, 13, 16})
@SuppressLogger(GrpcExporter.class)
void nonRetryableError(int code) {
addGrpcError(GrpcStatusCode.fromValue(code), null);
addGrpcResponse(GrpcStatusCode.fromValue(code), null);

assertThat(
exporter
Expand Down Expand Up @@ -1273,7 +1305,27 @@ protected TelemetryExporter<T> nonRetryingExporter() {
return exporterBuilder().setEndpoint(server.httpUri().toString()).setRetryPolicy(null).build();
}

protected static void addGrpcError(GrpcStatusCode code, @Nullable String message) {
grpcErrors.add(new ArmeriaStatusException(code.getValue(), message));
protected static void addGrpcResponse(GrpcStatusCode code, @Nullable String message) {
addGrpcResponse(code, message, null);
}

protected static void addGrpcResponse(
GrpcStatusCode code,
@Nullable String message,
@Nullable AbstractMessageLite<?, ?> bodyMessage) {
grpcResponses.add(
new GrpcServerResponse(
code == GrpcStatusCode.OK ? null : new ArmeriaStatusException(code.getValue(), message),
bodyMessage == null ? null : bodyMessage.toByteArray()));
}

private static final class GrpcServerResponse {
@Nullable final ArmeriaStatusException error;
@Nullable final byte[] responseBytes;

GrpcServerResponse(@Nullable ArmeriaStatusException error, @Nullable byte[] responseBytes) {
this.error = error;
this.responseBytes = responseBytes;
}
}
}
Loading
Loading