diff --git a/buildSrc/src/main/kotlin/otel.japicmp-conventions.gradle.kts b/buildSrc/src/main/kotlin/otel.japicmp-conventions.gradle.kts index 48ce7c56d70..81e07422ecc 100644 --- a/buildSrc/src/main/kotlin/otel.japicmp-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/otel.japicmp-conventions.gradle.kts @@ -81,15 +81,20 @@ fun getClasspathForVersion(version: String): List { group = "virtual_group" try { return getAllPublishedModules().map { - val depModule = "io.opentelemetry:${it.base.archivesName.get()}:$version@jar" - val depJar = "${it.base.archivesName.get()}-$version.jar" - val configuration: Configuration = configurations.detachedConfiguration( - dependencies.create(depModule), - ) - files(configuration.files).filter { file -> - file.name.equals(depJar) - }.singleFile - }.toList() + try { + val depModule = "io.opentelemetry:${it.base.archivesName.get()}:$version@jar" + val depJar = "${it.base.archivesName.get()}-$version.jar" + val configuration: Configuration = configurations.detachedConfiguration( + dependencies.create(depModule), + ) + files(configuration.files).filter { file -> + file.name.equals(depJar) + }.singleFile + } catch (e: Exception) { + println("Failed to fetch artifact for version ${it.base.archivesName.get()}:$version. If this artifact is has not yet been published, ignore.") + null + } + }.toList().filterNotNull() } finally { group = existingGroup } diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index debe9f7f40f..f4c31a44c6d 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -18,7 +18,6 @@ val opencensusVersion = "0.31.1" val prometheusServerVersion = "1.3.10" val armeriaVersion = "1.36.0" val junitVersion = "5.14.2" -val okhttpVersion = "5.3.2" val DEPENDENCY_BOMS = listOf( // for some reason boms show up as runtime dependencies in license and vulnerability scans @@ -29,7 +28,6 @@ val DEPENDENCY_BOMS = listOf( "com.fasterxml.jackson:jackson-bom:2.21.0", "com.google.guava:guava-bom:33.5.0-jre", "com.google.protobuf:protobuf-bom:4.33.5", - "com.squareup.okhttp3:okhttp-bom:$okhttpVersion", "com.squareup.okio:okio-bom:3.16.4", // applies to transitive dependencies of okhttp "io.grpc:grpc-bom:1.79.0", "io.netty:netty-bom:4.2.10.Final", @@ -73,7 +71,6 @@ val DEPENDENCIES = listOf( "com.google.code.findbugs:jsr305:3.0.2", "com.google.guava:guava-beta-checker:1.0", "com.sun.net.httpserver:http:20070405", - "com.squareup.okhttp3:okhttp:$okhttpVersion", "com.tngtech.archunit:archunit-junit5:1.4.1", "com.uber.nullaway:nullaway:0.13.1", "edu.berkeley.cs.jqf:jqf-fuzz:1.7", // jqf-fuzz version 1.8+ requires Java 11+ diff --git a/exporters/otlp/all/build.gradle.kts b/exporters/otlp/all/build.gradle.kts index 78616d46933..2e1fbf4c43d 100644 --- a/exporters/otlp/all/build.gradle.kts +++ b/exporters/otlp/all/build.gradle.kts @@ -42,45 +42,38 @@ val testJavaVersion: String? by project testing { suites { - listOf( - "LATEST", - "4.11.0" - ).forEach { - register("testOkHttpVersion$it") { - sources { - java { - setSrcDirs(listOf("src/testDefaultSender/java")) - } - } - dependencies { - implementation(project(":exporters:sender:okhttp")) - implementation(project(":exporters:otlp:testing-internal")) - - implementation(platform("com.squareup.okhttp3:okhttp-bom")) { - // Only impose dependency constraint if not testing the LATEST version, which is defined in /dependencyManagement/build.gradle.kts - if (!it.equals("LATEST")) { - version { - strictly(it) - } - } - } + register("testOkHttp5") { + dependencies { + implementation(project(":exporters:sender:okhttp")) + implementation(project(":exporters:otlp:testing-internal")) - implementation("com.squareup.okhttp3:okhttp") - implementation("io.grpc:grpc-stub") - } + implementation("com.squareup.okhttp3:okhttp:5.3.2") + implementation("io.grpc:grpc-stub") + } + } + register("testOkHttp4") { + dependencies { + implementation(project(":exporters:sender:okhttp4")) + implementation(project(":exporters:otlp:testing-internal")) - targets { - all { - testTask { - // Only enable test suite for non-LATEST in GitHub CI (CI=true) - enabled = it.equals("LATEST") || "true".equals(System.getenv("CI")) - systemProperty("expected.okhttp.version", it) - } + implementation("com.squareup.okhttp3:okhttp:4.12.0") + implementation("io.grpc:grpc-stub") + } + targets { + all { + testTask { + systemProperty( + "io.opentelemetry.sdk.common.export.GrpcSenderProvider", + "io.opentelemetry.exporter.sender.okhttp4.internal.OkHttpGrpcSenderProvider" + ) + systemProperty( + "io.opentelemetry.sdk.common.export.HttpSenderProvider", + "io.opentelemetry.exporter.sender.okhttp4.internal.OkHttpHttpSenderProvider" + ) } } } } - register("testGrpcNetty") { dependencies { implementation(project(":exporters:sender:grpc-managed-channel")) diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/OkHttpVersionTest.java b/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/OkHttpVersionTest.java deleted file mode 100644 index 8798781bb3f..00000000000 --- a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/OkHttpVersionTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.otlp; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assumptions.assumeThat; - -import java.util.logging.Level; -import java.util.logging.Logger; -import okhttp3.OkHttp; -import org.junit.jupiter.api.Test; - -class OkHttpVersionTest { - - private static final Logger LOGGER = Logger.getLogger(OkHttpVersionTest.class.getName()); - - @Test - void expectedOkHttpVersion() { - String expectedVersion = System.getProperty("expected.okhttp.version"); - LOGGER.log(Level.WARNING, "Testing okhttp version " + expectedVersion); - assumeThat(expectedVersion.equals("LATEST")).isFalse(); - assertThat(OkHttp.VERSION).isEqualTo(expectedVersion); - } -} diff --git a/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/OkHttp4VersionTest.java b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/OkHttp4VersionTest.java new file mode 100644 index 00000000000..6dad1d7ec5f --- /dev/null +++ b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/OkHttp4VersionTest.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp; + +import static org.assertj.core.api.Assertions.assertThat; + +import okhttp3.OkHttp; +import org.junit.jupiter.api.Test; + +class OkHttp4VersionTest { + @Test + void expectedOkHttpVersion() { + assertThat(OkHttp.VERSION).startsWith("4"); + } +} diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterOkHttpSenderTest.java b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterOkHttpSenderTest.java similarity index 100% rename from exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterOkHttpSenderTest.java rename to exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterOkHttpSenderTest.java diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterOkHttpSenderTest.java b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterOkHttpSenderTest.java similarity index 100% rename from exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterOkHttpSenderTest.java rename to exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterOkHttpSenderTest.java diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java similarity index 100% rename from exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java rename to exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java diff --git a/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java new file mode 100644 index 00000000000..bbd1882ed19 --- /dev/null +++ b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java @@ -0,0 +1,58 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.logs; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler; +import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest; +import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.exporter.sender.okhttp4.internal.OkHttpGrpcSender; +import io.opentelemetry.proto.logs.v1.ResourceLogs; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.io.Closeable; +import java.util.List; +import org.junit.jupiter.api.Test; + +class OtlpGrpcLogRecordExporterTest + extends AbstractGrpcTelemetryExporterTest { + + OtlpGrpcLogRecordExporterTest() { + super("log", ResourceLogs.getDefaultInstance()); + } + + @Test + void usingOkHttp() throws Exception { + try (Closeable exporter = OtlpGrpcLogRecordExporter.builder().build()) { + assertThat(exporter).extracting("delegate.grpcSender").isInstanceOf(OkHttpGrpcSender.class); + } + } + + @Override + protected TelemetryExporterBuilder exporterBuilder() { + return TelemetryExporterBuilder.wrap(OtlpGrpcLogRecordExporter.builder()); + } + + @Override + protected TelemetryExporterBuilder toBuilder( + TelemetryExporter exporter) { + return TelemetryExporterBuilder.wrap( + ((OtlpGrpcLogRecordExporter) exporter.unwrap()).toBuilder()); + } + + @Override + protected LogRecordData generateFakeTelemetry() { + return FakeTelemetryUtil.generateFakeLogRecordData(); + } + + @Override + protected Marshaler[] toMarshalers(List telemetry) { + return ResourceLogsMarshaler.create(telemetry); + } +} diff --git a/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java new file mode 100644 index 00000000000..eccedbf0f8f --- /dev/null +++ b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java @@ -0,0 +1,137 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.metrics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler; +import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest; +import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.exporter.sender.okhttp4.internal.OkHttpGrpcSender; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; +import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +class OtlpGrpcMetricExporterTest + extends AbstractGrpcTelemetryExporterTest { + + OtlpGrpcMetricExporterTest() { + super("metric", ResourceMetrics.getDefaultInstance()); + } + + /** Test configuration specific to metric exporter. */ + @Test + void validMetricConfig() { + assertThatCode( + () -> + OtlpGrpcMetricExporter.builder() + .setAggregationTemporalitySelector( + AggregationTemporalitySelector.deltaPreferred())) + .doesNotThrowAnyException(); + assertThat( + OtlpGrpcMetricExporter.builder() + .setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred()) + .build() + .getAggregationTemporality(InstrumentType.COUNTER)) + .isEqualTo(AggregationTemporality.DELTA); + assertThat( + OtlpGrpcMetricExporter.builder() + .build() + .getAggregationTemporality(InstrumentType.COUNTER)) + .isEqualTo(AggregationTemporality.CUMULATIVE); + + assertThat( + OtlpGrpcMetricExporter.builder() + .setDefaultAggregationSelector( + DefaultAggregationSelector.getDefault() + .with(InstrumentType.HISTOGRAM, Aggregation.drop())) + .build() + .getDefaultAggregation(InstrumentType.HISTOGRAM)) + .isEqualTo(Aggregation.drop()); + } + + /** Test configuration specific to metric exporter. */ + @Test + void invalidMetricConfig() { + assertThatThrownBy( + () -> OtlpGrpcMetricExporter.builder().setAggregationTemporalitySelector(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("aggregationTemporalitySelector"); + + assertThatThrownBy(() -> OtlpGrpcMetricExporter.builder().setDefaultAggregationSelector(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("defaultAggregationSelector"); + } + + /** Test configuration specific to metric exporter. */ + @Test + void stringRepresentation() { + try (MetricExporter metricExporter = OtlpGrpcMetricExporter.builder().build()) { + assertThat(metricExporter.toString()) + .matches( + "OtlpGrpcMetricExporter\\{" + + "endpoint=http://localhost:4317, " + + "fullMethodName=.*, " + + "timeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "connectTimeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "compressorEncoding=null, " + + "headers=Headers\\{User-Agent=OBFUSCATED\\}, " + + "retryPolicy=RetryPolicy\\{.*\\}, " + + "componentLoader=.*, " + + "exporterType=OTLP_GRPC_METRIC_EXPORTER, " + + "internalTelemetrySchemaVersion=LEGACY, " + + "aggregationTemporalitySelector=AggregationTemporalitySelector\\{.*\\}, " + + "defaultAggregationSelector=DefaultAggregationSelector\\{.*\\}, " + + "memoryMode=REUSABLE_DATA" + + "\\}"); + } + } + + @Test + void usingOkHttp() throws Exception { + try (Closeable exporter = OtlpGrpcMetricExporter.builder().build()) { + assertThat(exporter).extracting("delegate.grpcSender").isInstanceOf(OkHttpGrpcSender.class); + } + } + + @Override + protected TelemetryExporterBuilder exporterBuilder() { + return TelemetryExporterBuilder.wrap(OtlpGrpcMetricExporter.builder()); + } + + @Override + protected TelemetryExporterBuilder toBuilder(TelemetryExporter exporter) { + return TelemetryExporterBuilder.wrap(((OtlpGrpcMetricExporter) exporter.unwrap()).toBuilder()); + } + + @Override + protected MetricData generateFakeTelemetry() { + return FakeTelemetryUtil.generateFakeMetricData(); + } + + @Override + protected Marshaler[] toMarshalers(List telemetry) { + return ResourceMetricsMarshaler.create(telemetry); + } +} diff --git a/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java new file mode 100644 index 00000000000..0e6828c114e --- /dev/null +++ b/exporters/otlp/all/src/testOkhttp4/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java @@ -0,0 +1,84 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.traces; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler; +import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest; +import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.exporter.sender.okhttp4.internal.OkHttpGrpcSender; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +class OtlpGrpcSpanExporterTest extends AbstractGrpcTelemetryExporterTest { + + OtlpGrpcSpanExporterTest() { + super("span", ResourceSpans.getDefaultInstance()); + } + + /** Test configuration specific to metric exporter. */ + @Test + void stringRepresentation() { + try (SpanExporter spanExporter = OtlpGrpcSpanExporter.builder().build()) { + assertThat(spanExporter.toString()) + .matches( + "OtlpGrpcSpanExporter\\{" + + "endpoint=http://localhost:4317, " + + "fullMethodName=.*, " + + "timeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "connectTimeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "compressorEncoding=null, " + + "headers=Headers\\{User-Agent=OBFUSCATED\\}, " + + "retryPolicy=RetryPolicy\\{.*\\}, " + + "componentLoader=.*, " + + "exporterType=OTLP_GRPC_SPAN_EXPORTER, " + + "internalTelemetrySchemaVersion=LEGACY, " + + "memoryMode=REUSABLE_DATA" + + "\\}"); + } + } + + @Test + void usingOkHttp() throws Exception { + try (Closeable exporter = OtlpGrpcSpanExporter.builder().build()) { + assertThat(exporter).extracting("delegate.grpcSender").isInstanceOf(OkHttpGrpcSender.class); + } + } + + @Override + protected TelemetryExporterBuilder exporterBuilder() { + return TelemetryExporterBuilder.wrap(OtlpGrpcSpanExporter.builder()); + } + + @Override + protected TelemetryExporterBuilder toBuilder(TelemetryExporter exporter) { + return TelemetryExporterBuilder.wrap(((OtlpGrpcSpanExporter) exporter.unwrap()).toBuilder()); + } + + @Override + protected SpanData generateFakeTelemetry() { + return FakeTelemetryUtil.generateFakeSpanData(); + } + + @Override + protected Marshaler[] toMarshalers(List telemetry) { + return ResourceSpansMarshaler.create(telemetry); + } +} diff --git a/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/OkHttp5VersionTest.java b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/OkHttp5VersionTest.java new file mode 100644 index 00000000000..27049efe1d1 --- /dev/null +++ b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/OkHttp5VersionTest.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp; + +import static org.assertj.core.api.Assertions.assertThat; + +import okhttp3.OkHttp; +import org.junit.jupiter.api.Test; + +class OkHttp5VersionTest { + @Test + void expectedOkHttpVersion() { + assertThat(OkHttp.VERSION).startsWith("5"); + } +} diff --git a/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterOkHttpSenderTest.java b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterOkHttpSenderTest.java new file mode 100644 index 00000000000..1804543a02d --- /dev/null +++ b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterOkHttpSenderTest.java @@ -0,0 +1,47 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.http.logs; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler; +import io.opentelemetry.exporter.otlp.testing.internal.AbstractHttpTelemetryExporterTest; +import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; +import io.opentelemetry.exporter.otlp.testing.internal.HttpLogRecordExporterBuilderWrapper; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.proto.logs.v1.ResourceLogs; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.util.List; + +class OtlpHttpLogRecordExporterOkHttpSenderTest + extends AbstractHttpTelemetryExporterTest { + + protected OtlpHttpLogRecordExporterOkHttpSenderTest() { + super("log", "/v1/logs", ResourceLogs.getDefaultInstance()); + } + + @Override + protected TelemetryExporterBuilder exporterBuilder() { + return new HttpLogRecordExporterBuilderWrapper(OtlpHttpLogRecordExporter.builder()); + } + + @Override + protected TelemetryExporterBuilder toBuilder( + TelemetryExporter exporter) { + return new HttpLogRecordExporterBuilderWrapper( + ((OtlpHttpLogRecordExporter) exporter.unwrap()).toBuilder()); + } + + @Override + protected LogRecordData generateFakeTelemetry() { + return FakeTelemetryUtil.generateFakeLogRecordData(); + } + + @Override + protected Marshaler[] toMarshalers(List telemetry) { + return ResourceLogsMarshaler.create(telemetry); + } +} diff --git a/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterOkHttpSenderTest.java b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterOkHttpSenderTest.java new file mode 100644 index 00000000000..a2b18d2bed0 --- /dev/null +++ b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterOkHttpSenderTest.java @@ -0,0 +1,131 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.http.metrics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler; +import io.opentelemetry.exporter.otlp.testing.internal.AbstractHttpTelemetryExporterTest; +import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; +import io.opentelemetry.exporter.otlp.testing.internal.HttpMetricExporterBuilderWrapper; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; +import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +class OtlpHttpMetricExporterOkHttpSenderTest + extends AbstractHttpTelemetryExporterTest { + + protected OtlpHttpMetricExporterOkHttpSenderTest() { + super("metric", "/v1/metrics", ResourceMetrics.getDefaultInstance()); + } + + /** Test configuration specific to metric exporter. */ + @Test + void validMetricConfig() { + assertThatCode( + () -> + OtlpHttpMetricExporter.builder() + .setAggregationTemporalitySelector( + AggregationTemporalitySelector.deltaPreferred())) + .doesNotThrowAnyException(); + assertThat( + OtlpHttpMetricExporter.builder() + .setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred()) + .build() + .getAggregationTemporality(InstrumentType.COUNTER)) + .isEqualTo(AggregationTemporality.DELTA); + assertThat( + OtlpHttpMetricExporter.builder() + .build() + .getAggregationTemporality(InstrumentType.COUNTER)) + .isEqualTo(AggregationTemporality.CUMULATIVE); + + assertThat( + OtlpHttpMetricExporter.builder() + .setDefaultAggregationSelector( + DefaultAggregationSelector.getDefault() + .with(InstrumentType.HISTOGRAM, Aggregation.drop())) + .build() + .getDefaultAggregation(InstrumentType.HISTOGRAM)) + .isEqualTo(Aggregation.drop()); + } + + /** Test configuration specific to metric exporter. */ + @Test + void invalidMetricConfig() { + assertThatThrownBy( + () -> OtlpHttpMetricExporter.builder().setAggregationTemporalitySelector(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("aggregationTemporalitySelector"); + + assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setDefaultAggregationSelector(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("defaultAggregationSelector"); + } + + /** Test configuration specific to metric exporter. */ + @Test + void stringRepresentation() { + try (MetricExporter metricExporter = OtlpHttpMetricExporter.builder().build()) { + assertThat(metricExporter.toString()) + .matches( + "OtlpHttpMetricExporter\\{" + + "endpoint=http://localhost:4318/v1/metrics, " + + "timeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "proxyOptions=null, " + + "compressorEncoding=null, " + + "connectTimeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "exportAsJson=false, " + + "headers=Headers\\{User-Agent=OBFUSCATED\\}, " + + "retryPolicy=RetryPolicy\\{.*\\}, " + + "componentLoader=.*, " + + "exporterType=OTLP_HTTP_METRIC_EXPORTER, " + + "internalTelemetrySchemaVersion=LEGACY, " + + "aggregationTemporalitySelector=AggregationTemporalitySelector\\{.*\\}, " + + "defaultAggregationSelector=DefaultAggregationSelector\\{.*\\}, " + + "memoryMode=REUSABLE_DATA" + + "\\}"); + } + } + + @Override + protected TelemetryExporterBuilder exporterBuilder() { + return new HttpMetricExporterBuilderWrapper(OtlpHttpMetricExporter.builder()); + } + + @Override + protected TelemetryExporterBuilder toBuilder(TelemetryExporter exporter) { + return new HttpMetricExporterBuilderWrapper( + ((OtlpHttpMetricExporter) exporter.unwrap()).toBuilder()); + } + + @Override + protected MetricData generateFakeTelemetry() { + return FakeTelemetryUtil.generateFakeMetricData(); + } + + @Override + protected Marshaler[] toMarshalers(List telemetry) { + return ResourceMetricsMarshaler.create(telemetry); + } +} diff --git a/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java new file mode 100644 index 00000000000..a68c2e5fd19 --- /dev/null +++ b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.http.trace; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler; +import io.opentelemetry.exporter.otlp.testing.internal.AbstractHttpTelemetryExporterTest; +import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; +import io.opentelemetry.exporter.otlp.testing.internal.HttpSpanExporterBuilderWrapper; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; +import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +class OtlpHttpSpanExporterOkHttpSenderTest + extends AbstractHttpTelemetryExporterTest { + + protected OtlpHttpSpanExporterOkHttpSenderTest() { + super("span", "/v1/traces", ResourceSpans.getDefaultInstance()); + } + + /** Test configuration specific to metric exporter. */ + @Test + void stringRepresentation() { + try (SpanExporter spanExporter = OtlpHttpSpanExporter.builder().build()) { + assertThat(spanExporter.toString()) + .matches( + "OtlpHttpSpanExporter\\{" + + "endpoint=http://localhost:4318/v1/traces, " + + "timeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "proxyOptions=null, " + + "compressorEncoding=null, " + + "connectTimeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "exportAsJson=false, " + + "headers=Headers\\{User-Agent=OBFUSCATED\\}, " + + "retryPolicy=RetryPolicy\\{.*\\}, " + + "componentLoader=.*, " + + "exporterType=OTLP_HTTP_SPAN_EXPORTER, " + + "internalTelemetrySchemaVersion=LEGACY, " + + "memoryMode=REUSABLE_DATA" + + "\\}"); + } + } + + @Override + protected TelemetryExporterBuilder exporterBuilder() { + return new HttpSpanExporterBuilderWrapper(OtlpHttpSpanExporter.builder()); + } + + @Override + protected TelemetryExporterBuilder toBuilder(TelemetryExporter exporter) { + return new HttpSpanExporterBuilderWrapper( + ((OtlpHttpSpanExporter) exporter.unwrap()).toBuilder()); + } + + @Override + protected SpanData generateFakeTelemetry() { + return FakeTelemetryUtil.generateFakeSpanData(); + } + + @Override + protected Marshaler[] toMarshalers(List telemetry) { + return ResourceSpansMarshaler.create(telemetry); + } +} diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java similarity index 100% rename from exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java rename to exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java similarity index 100% rename from exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java rename to exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java b/exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java similarity index 100% rename from exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java rename to exporters/otlp/all/src/testOkhttp5/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java diff --git a/exporters/otlp/testing-internal/build.gradle.kts b/exporters/otlp/testing-internal/build.gradle.kts index 1d6bff53854..61ff4b17327 100644 --- a/exporters/otlp/testing-internal/build.gradle.kts +++ b/exporters/otlp/testing-internal/build.gradle.kts @@ -27,7 +27,7 @@ dependencies { api("io.opentelemetry.proto:opentelemetry-proto") api("org.junit.jupiter:junit-jupiter-api") - implementation("com.squareup.okhttp3:okhttp") + implementation("com.squareup.okhttp3:okhttp:5.3.2") implementation("org.junit.jupiter:junit-jupiter-params") implementation("com.linecorp.armeria:armeria-grpc") diff --git a/exporters/sender/okhttp/build.gradle.kts b/exporters/sender/okhttp/build.gradle.kts index 107270e9ada..89d56712275 100644 --- a/exporters/sender/okhttp/build.gradle.kts +++ b/exporters/sender/okhttp/build.gradle.kts @@ -5,14 +5,14 @@ plugins { id("otel.animalsniffer-conventions") } -description = "OpenTelemetry OkHttp Senders" +description = "OpenTelemetry OkHttp5 Senders" otelJava.moduleName.set("io.opentelemetry.exporter.sender.okhttp.internal") dependencies { implementation(project(":exporters:common")) implementation(project(":sdk:common")) - implementation("com.squareup.okhttp3:okhttp") + implementation("com.squareup.okhttp3:okhttp:5.3.2") compileOnly("io.grpc:grpc-stub") compileOnly("com.fasterxml.jackson.core:jackson-core") diff --git a/exporters/sender/okhttp4/build.gradle.kts b/exporters/sender/okhttp4/build.gradle.kts new file mode 100644 index 00000000000..47f068a28d0 --- /dev/null +++ b/exporters/sender/okhttp4/build.gradle.kts @@ -0,0 +1,22 @@ +plugins { + id("otel.java-conventions") + // TODO: enable to publish + // id("otel.publish-conventions") + + id("otel.animalsniffer-conventions") +} + +description = "OpenTelemetry OkHttp4 Senders" +otelJava.moduleName.set("io.opentelemetry.exporter.sender.okhttp.internal") + +dependencies { + implementation(project(":exporters:common")) + implementation(project(":sdk:common")) + + implementation("com.squareup.okhttp3:okhttp:4.12.0") + + compileOnly("io.grpc:grpc-stub") + compileOnly("com.fasterxml.jackson.core:jackson-core") + + testImplementation("com.linecorp.armeria:armeria-junit5") +} diff --git a/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/GrpcRequestBody.java b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/GrpcRequestBody.java new file mode 100644 index 00000000000..402ab226aa0 --- /dev/null +++ b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/GrpcRequestBody.java @@ -0,0 +1,83 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.export.Compressor; +import io.opentelemetry.sdk.common.export.MessageWriter; +import java.io.IOException; +import javax.annotation.Nullable; +import okhttp3.MediaType; +import okhttp3.RequestBody; +import okio.Buffer; +import okio.BufferedSink; +import okio.Okio; + +/** + * A {@link RequestBody} for reading from a {@link Marshaler} and writing in gRPC wire format. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class GrpcRequestBody extends RequestBody { + + private static final int HEADER_LENGTH = 5; + + private static final byte UNCOMPRESSED_FLAG = 0; + private static final byte COMPRESSED_FLAG = 1; + + private static final MediaType GRPC_MEDIA_TYPE = MediaType.parse("application/grpc"); + + private final MessageWriter messageWriter; + private final int messageSize; + private final int contentLength; + @Nullable private final Compressor compressor; + + /** Creates a new {@link GrpcRequestBody}. */ + public GrpcRequestBody(MessageWriter messageWriter, @Nullable Compressor compressor) { + this.messageWriter = messageWriter; + this.compressor = compressor; + + messageSize = messageWriter.getContentLength(); + if (compressor != null) { + // Content length not known since we want to compress on the I/O thread. + contentLength = -1; + } else { + contentLength = HEADER_LENGTH + messageSize; + } + } + + @Nullable + @Override + public MediaType contentType() { + return GRPC_MEDIA_TYPE; + } + + @Override + public long contentLength() { + return contentLength; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + if (compressor == null) { + sink.writeByte(UNCOMPRESSED_FLAG); + sink.writeInt(messageSize); + messageWriter.writeMessage(sink.outputStream()); + } else { + try (Buffer compressedBody = new Buffer()) { + try (BufferedSink compressedSink = + Okio.buffer(Okio.sink(compressor.compress(compressedBody.outputStream())))) { + messageWriter.writeMessage(compressedSink.outputStream()); + } + sink.writeByte(COMPRESSED_FLAG); + int compressedBytes = (int) compressedBody.size(); + sink.writeInt(compressedBytes); + sink.write(compressedBody, compressedBytes); + } + } + } +} diff --git a/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSender.java new file mode 100644 index 00000000000..43cc14d0e3b --- /dev/null +++ b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSender.java @@ -0,0 +1,315 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: + +/* + * Copyright 2014 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import io.opentelemetry.api.internal.InstrumentationUtil; +import io.opentelemetry.exporter.internal.RetryUtil; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.Compressor; +import io.opentelemetry.sdk.common.export.GrpcResponse; +import io.opentelemetry.sdk.common.export.GrpcSender; +import io.opentelemetry.sdk.common.export.GrpcStatusCode; +import io.opentelemetry.sdk.common.export.MessageWriter; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.ConnectionSpec; +import okhttp3.Dispatcher; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; + +/** + * A {@link GrpcSender} which uses OkHttp instead of grpc-java. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OkHttpGrpcSender implements GrpcSender { + + private static final Logger logger = Logger.getLogger(OkHttpGrpcSender.class.getName()); + + private static final String GRPC_STATUS = "grpc-status"; + private static final String GRPC_MESSAGE = "grpc-message"; + + private final boolean managedExecutor; + private final OkHttpClient client; + private final HttpUrl url; + @Nullable private final Compressor compressor; + private final Supplier>> headersSupplier; + + /** Creates a new {@link OkHttpGrpcSender}. */ + @SuppressWarnings("TooManyParameters") + public OkHttpGrpcSender( + String endpoint, + @Nullable Compressor compressor, + Duration timeout, + Duration connectTimeout, + Supplier>> headersSupplier, + @Nullable RetryPolicy retryPolicy, + @Nullable SSLContext sslContext, + @Nullable X509TrustManager trustManager, + @Nullable ExecutorService executorService) { + int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE); + int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE); + + Dispatcher dispatcher; + if (executorService == null) { + dispatcher = OkHttpUtil.newDispatcher(); + this.managedExecutor = true; + } else { + dispatcher = new Dispatcher(executorService); + this.managedExecutor = false; + } + + OkHttpClient.Builder clientBuilder = + new OkHttpClient.Builder() + .dispatcher(dispatcher) + .callTimeout(Duration.ofMillis(callTimeoutMillis)) + .connectTimeout(Duration.ofMillis(connectTimeoutMillis)); + if (retryPolicy != null) { + clientBuilder.addInterceptor( + new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable)); + } + + boolean isPlainHttp = endpoint.startsWith("http://"); + if (isPlainHttp) { + clientBuilder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT)); + clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE)); + } else { + clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)); + if (sslContext != null && trustManager != null) { + clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), trustManager); + } + } + + this.client = clientBuilder.build(); + this.compressor = compressor; + this.headersSupplier = headersSupplier; + this.url = HttpUrl.get(endpoint); + } + + @Override + public void send( + MessageWriter messageWriter, Consumer onResponse, Consumer onError) { + Request.Builder requestBuilder = new Request.Builder().url(url); + + Map> headers = headersSupplier.get(); + if (headers != null) { + headers.forEach( + (key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value))); + } + requestBuilder.addHeader("te", "trailers"); + if (compressor != null) { + requestBuilder.addHeader("grpc-encoding", compressor.getEncoding()); + } + RequestBody requestBody = new GrpcRequestBody(messageWriter, compressor); + requestBuilder.post(requestBody); + + InstrumentationUtil.suppressInstrumentation( + () -> + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + onError.accept(e); + } + + @Override + public void onResponse(Call call, Response response) { + try (ResponseBody body = response.body()) { + // Must consume body before accessing trailers + byte[] bodyBytes = null; + try { + bodyBytes = body.bytes(); + } catch (IOException e) { + bodyBytes = new byte[0]; + logger.log(Level.WARNING, "Failed to read response body", e); + } + byte[] resolvedBodyBytes = bodyBytes; + GrpcStatusCode status = grpcStatus(response); + String description = grpcMessage(response); + onResponse.accept( + new GrpcResponse() { + @Override + public GrpcStatusCode getStatusCode() { + return status; + } + + @Override + public String getStatusDescription() { + return description; + } + + @Override + public byte[] getResponseMessage() { + return resolvedBodyBytes; + } + }); + } + } + })); + } + + private static GrpcStatusCode grpcStatus(Response response) { + // Status can either be in the headers or trailers depending on error + String grpcStatus = response.header(GRPC_STATUS); + if (grpcStatus == null) { + try { + grpcStatus = response.trailers().get(GRPC_STATUS); + } catch (IOException e) { + // Could not read a status, this generally means the HTTP status is the error. + return GrpcStatusCode.UNKNOWN; + } + } + if (grpcStatus == null) { + return GrpcStatusCode.UNKNOWN; + } + try { + return GrpcStatusCode.fromValue(Integer.parseInt(grpcStatus)); + } catch (NumberFormatException ex) { + return GrpcStatusCode.UNKNOWN; + } + } + + private static String grpcMessage(Response response) { + String message = response.header(GRPC_MESSAGE); + if (message == null) { + try { + message = response.trailers().get(GRPC_MESSAGE); + } catch (IOException e) { + // Fall through + } + } + if (message != null) { + return unescape(message); + } + // Couldn't get message for some reason, use the HTTP status. + return response.message(); + } + + @Override + public CompletableResultCode shutdown() { + client.dispatcher().cancelAll(); + client.connectionPool().evictAll(); + + if (managedExecutor) { + ExecutorService executorService = client.dispatcher().executorService(); + // Use shutdownNow() to interrupt idle threads immediately since we've cancelled all work + executorService.shutdownNow(); + + // Wait for threads to terminate in a background thread + CompletableResultCode result = new CompletableResultCode(); + Thread terminationThread = + new Thread( + () -> { + try { + // Wait up to 5 seconds for threads to terminate + // Even if timeout occurs, we succeed since these are daemon threads + boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS); + if (!terminated) { + logger.log( + Level.WARNING, + "Executor did not terminate within 5 seconds, proceeding with shutdown since threads are daemon threads."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + result.succeed(); + } + }, + "okhttp-shutdown"); + terminationThread.setDaemon(true); + terminationThread.start(); + return result; + } + + return CompletableResultCode.ofSuccess(); + } + + /** Whether response is retriable or not. */ + public static boolean isRetryable(Response response) { + // We don't check trailers for retry since retryable error codes always come with response + // headers, not trailers, in practice. + String grpcStatus = response.header(GRPC_STATUS); + if (grpcStatus == null) { + return false; + } + return RetryUtil.retryableGrpcStatusCodes().contains(grpcStatus); + } + + // From grpc-java + + /** Unescape the provided ascii to a unicode {@link String}. */ + private static String unescape(String value) { + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + if (c < ' ' || c >= '~' || (c == '%' && i + 2 < value.length())) { + return doUnescape(value.getBytes(StandardCharsets.US_ASCII)); + } + } + return value; + } + + private static String doUnescape(byte[] value) { + ByteBuffer buf = ByteBuffer.allocate(value.length); + for (int i = 0; i < value.length; ) { + if (value[i] == '%' && i + 2 < value.length) { + try { + buf.put((byte) Integer.parseInt(new String(value, i + 1, 2, StandardCharsets.UTF_8), 16)); + i += 3; + continue; + } catch (NumberFormatException e) { + // ignore, fall through, just push the bytes. + } + } + buf.put(value[i]); + i += 1; + } + return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8); + } +} diff --git a/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSenderProvider.java b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSenderProvider.java new file mode 100644 index 00000000000..40def6d4964 --- /dev/null +++ b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSenderProvider.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import io.opentelemetry.sdk.common.export.GrpcSender; +import io.opentelemetry.sdk.common.export.GrpcSenderConfig; +import io.opentelemetry.sdk.common.export.GrpcSenderProvider; + +/** + * {@link GrpcSender} SPI implementation for {@link OkHttpGrpcSender}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class OkHttpGrpcSenderProvider implements GrpcSenderProvider { + + @Override + public GrpcSender createSender(GrpcSenderConfig grpcSenderConfig) { + return new OkHttpGrpcSender( + grpcSenderConfig + .getEndpoint() + .resolve("/" + grpcSenderConfig.getFullMethodName()) + .toString(), + grpcSenderConfig.getCompressor(), + grpcSenderConfig.getTimeout(), + grpcSenderConfig.getConnectTimeout(), + grpcSenderConfig.getHeadersSupplier(), + grpcSenderConfig.getRetryPolicy(), + grpcSenderConfig.getSslContext(), + grpcSenderConfig.getTrustManager(), + grpcSenderConfig.getExecutorService()); + } +} diff --git a/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSender.java b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSender.java new file mode 100644 index 00000000000..726f84922ea --- /dev/null +++ b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSender.java @@ -0,0 +1,226 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import io.opentelemetry.api.internal.InstrumentationUtil; +import io.opentelemetry.exporter.internal.RetryUtil; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.Compressor; +import io.opentelemetry.sdk.common.export.HttpResponse; +import io.opentelemetry.sdk.common.export.HttpSender; +import io.opentelemetry.sdk.common.export.MessageWriter; +import io.opentelemetry.sdk.common.export.ProxyOptions; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.ConnectionSpec; +import okhttp3.Dispatcher; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; +import okio.BufferedSink; +import okio.Okio; + +/** + * {@link HttpSender} which is backed by OkHttp. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OkHttpHttpSender implements HttpSender { + + private static final Logger logger = Logger.getLogger(OkHttpHttpSender.class.getName()); + + private final boolean managedExecutor; + private final OkHttpClient client; + private final HttpUrl url; + private final Supplier>> headerSupplier; + private final MediaType mediaType; + @Nullable private final Compressor compressor; + + /** Create a sender. */ + @SuppressWarnings("TooManyParameters") + public OkHttpHttpSender( + URI endpoint, + String contentType, + @Nullable Compressor compressor, + Duration timeout, + Duration connectTimeout, + Supplier>> headerSupplier, + @Nullable ProxyOptions proxyOptions, + @Nullable RetryPolicy retryPolicy, + @Nullable SSLContext sslContext, + @Nullable X509TrustManager trustManager, + @Nullable ExecutorService executorService) { + int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE); + int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE); + + Dispatcher dispatcher; + if (executorService == null) { + dispatcher = OkHttpUtil.newDispatcher(); + this.managedExecutor = true; + } else { + dispatcher = new Dispatcher(executorService); + this.managedExecutor = false; + } + + OkHttpClient.Builder builder = + new OkHttpClient.Builder() + .dispatcher(dispatcher) + .connectTimeout(Duration.ofMillis(connectTimeoutMillis)) + .callTimeout(Duration.ofMillis(callTimeoutMillis)); + + if (proxyOptions != null) { + builder.proxySelector(proxyOptions.getProxySelector()); + } + + if (retryPolicy != null) { + builder.addInterceptor(new RetryInterceptor(retryPolicy, OkHttpHttpSender::isRetryable)); + } + + boolean isPlainHttp = endpoint.getScheme().equals("http"); + if (isPlainHttp) { + builder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT)); + } else if (sslContext != null && trustManager != null) { + builder.sslSocketFactory(sslContext.getSocketFactory(), trustManager); + } + + this.client = builder.build(); + this.url = HttpUrl.get(endpoint); + this.mediaType = MediaType.parse(contentType); + this.compressor = compressor; + this.headerSupplier = headerSupplier; + } + + @Override + public void send( + MessageWriter messageWriter, Consumer onResponse, Consumer onError) { + Request.Builder requestBuilder = new Request.Builder().url(url); + + Map> headers = headerSupplier.get(); + if (headers != null) { + headers.forEach( + (key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value))); + } + if (compressor != null) { + requestBuilder.addHeader("Content-Encoding", compressor.getEncoding()); + } + requestBuilder.post(new RequestBodyImpl(messageWriter, compressor, mediaType)); + + InstrumentationUtil.suppressInstrumentation( + () -> + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + onError.accept(e); + } + + @Override + public void onResponse(Call call, Response response) { + try (ResponseBody body = response.body()) { + onResponse.accept( + new HttpResponse() { + @Nullable private byte[] bodyBytes; + + @Override + public int getStatusCode() { + return response.code(); + } + + @Override + public String getStatusMessage() { + return response.message(); + } + + @Override + public byte[] getResponseBody() { + if (bodyBytes == null) { + try { + bodyBytes = body.bytes(); + } catch (IOException e) { + bodyBytes = new byte[0]; + logger.log(Level.WARNING, "Failed to read response body", e); + } + } + return bodyBytes; + } + }); + } + } + })); + } + + @Override + public CompletableResultCode shutdown() { + client.dispatcher().cancelAll(); + if (managedExecutor) { + client.dispatcher().executorService().shutdownNow(); + } + client.connectionPool().evictAll(); + return CompletableResultCode.ofSuccess(); + } + + static boolean isRetryable(Response response) { + return RetryUtil.retryableHttpResponseCodes().contains(response.code()); + } + + private static class RequestBodyImpl extends RequestBody { + + private final MessageWriter requestBodyWriter; + @Nullable private final Compressor compressor; + private final MediaType mediaType; + + private RequestBodyImpl( + MessageWriter requestBodyWriter, @Nullable Compressor compressor, MediaType mediaType) { + this.requestBodyWriter = requestBodyWriter; + this.compressor = compressor; + this.mediaType = mediaType; + } + + @Override + public long contentLength() { + return compressor == null ? requestBodyWriter.getContentLength() : -1; + } + + @Override + public MediaType contentType() { + return mediaType; + } + + @Override + public void writeTo(BufferedSink bufferedSink) throws IOException { + if (compressor != null) { + BufferedSink compressedSink = + Okio.buffer(Okio.sink(compressor.compress(bufferedSink.outputStream()))); + requestBodyWriter.writeMessage(compressedSink.outputStream()); + compressedSink.close(); + } else { + requestBodyWriter.writeMessage(bufferedSink.outputStream()); + } + } + } +} diff --git a/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSenderProvider.java b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSenderProvider.java new file mode 100644 index 00000000000..0625a1b77bf --- /dev/null +++ b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSenderProvider.java @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import io.opentelemetry.sdk.common.export.HttpSender; +import io.opentelemetry.sdk.common.export.HttpSenderConfig; +import io.opentelemetry.sdk.common.export.HttpSenderProvider; + +/** + * {@link HttpSender} SPI implementation for {@link OkHttpHttpSender}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OkHttpHttpSenderProvider implements HttpSenderProvider { + + @Override + public HttpSender createSender(HttpSenderConfig httpSenderConfig) { + return new OkHttpHttpSender( + httpSenderConfig.getEndpoint(), + httpSenderConfig.getContentType(), + httpSenderConfig.getCompressor(), + httpSenderConfig.getTimeout(), + httpSenderConfig.getConnectTimeout(), + httpSenderConfig.getHeadersSupplier(), + httpSenderConfig.getProxyOptions(), + httpSenderConfig.getRetryPolicy(), + httpSenderConfig.getSslContext(), + httpSenderConfig.getTrustManager(), + httpSenderConfig.getExecutorService()); + } +} diff --git a/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpUtil.java b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpUtil.java new file mode 100644 index 00000000000..299d0800517 --- /dev/null +++ b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpUtil.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.internal.DaemonThreadFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import okhttp3.Dispatcher; + +/** + * Utilities for OkHttp. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OkHttpUtil { + @SuppressWarnings("NonFinalStaticField") + private static boolean propagateContextForTestingInDispatcher = false; + + public static void setPropagateContextForTestingInDispatcher( + boolean propagateContextForTestingInDispatcher) { + OkHttpUtil.propagateContextForTestingInDispatcher = propagateContextForTestingInDispatcher; + } + + /** Returns a {@link Dispatcher} using daemon threads, otherwise matching the OkHttp default. */ + public static Dispatcher newDispatcher() { + return new Dispatcher( + new ThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 60, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + createThreadFactory("okhttp-dispatch"))); + } + + private static DaemonThreadFactory createThreadFactory(String namePrefix) { + if (propagateContextForTestingInDispatcher) { + return new DaemonThreadFactory( + namePrefix, r -> Executors.defaultThreadFactory().newThread(Context.current().wrap(r))); + } + return new DaemonThreadFactory(namePrefix); + } + + private OkHttpUtil() {} +} diff --git a/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/RetryInterceptor.java b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/RetryInterceptor.java new file mode 100644 index 00000000000..74e6a5d1344 --- /dev/null +++ b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/RetryInterceptor.java @@ -0,0 +1,180 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import static java.util.stream.Collectors.joining; + +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.util.StringJoiner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import okhttp3.Interceptor; +import okhttp3.Response; + +/** + * Retrier of OkHttp requests. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class RetryInterceptor implements Interceptor { + + private static final Logger logger = Logger.getLogger(RetryInterceptor.class.getName()); + + private final RetryPolicy retryPolicy; + private final Function isRetryable; + private final Predicate retryExceptionPredicate; + private final Sleeper sleeper; + private final Supplier randomJitter; + + /** Constructs a new retrier. */ + public RetryInterceptor(RetryPolicy retryPolicy, Function isRetryable) { + this( + retryPolicy, + isRetryable, + retryPolicy.getRetryExceptionPredicate() == null + ? RetryInterceptor::isRetryableException + : retryPolicy.getRetryExceptionPredicate(), + TimeUnit.NANOSECONDS::sleep, + () -> ThreadLocalRandom.current().nextDouble(0.8d, 1.2d)); + } + + // Visible for testing + RetryInterceptor( + RetryPolicy retryPolicy, + Function isRetryable, + Predicate retryExceptionPredicate, + Sleeper sleeper, + Supplier randomJitter) { + this.retryPolicy = retryPolicy; + this.isRetryable = isRetryable; + this.retryExceptionPredicate = retryExceptionPredicate; + this.sleeper = sleeper; + this.randomJitter = randomJitter; + } + + @Override + public Response intercept(Chain chain) throws IOException { + Response response = null; + IOException exception = null; + int attempt = 0; + long nextBackoffNanos = retryPolicy.getInitialBackoff().toNanos(); + do { + if (attempt > 0) { + // Compute and sleep for backoff + // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#exponential-backoff + long currentBackoffNanos = + Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos()); + long backoffNanos = (long) (randomJitter.get() * currentBackoffNanos); + nextBackoffNanos = (long) (currentBackoffNanos * retryPolicy.getBackoffMultiplier()); + try { + sleeper.sleep(backoffNanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; // Break out and return response or throw + } + // Close response from previous attempt + if (response != null) { + response.close(); + } + exception = null; + } + try { + response = chain.proceed(chain.request()); + if (response != null) { + boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response)); + if (logger.isLoggable(Level.FINER)) { + logger.log( + Level.FINER, + "Attempt " + + attempt + + " returned " + + (retryable ? "retryable" : "non-retryable") + + " response: " + + responseStringRepresentation(response)); + } + if (!retryable) { + return response; + } + } else { + throw new NullPointerException("response cannot be null."); + } + } catch (IOException e) { + exception = e; + response = null; + boolean retryable = retryExceptionPredicate.test(exception); + if (logger.isLoggable(Level.FINER)) { + logger.log( + Level.FINER, + "Attempt " + + attempt + + " failed with " + + (retryable ? "retryable" : "non-retryable") + + " exception", + exception); + } + if (!retryable) { + throw exception; + } + } + } while (++attempt < retryPolicy.getMaxAttempts()); + + if (response != null) { + return response; + } + throw exception; + } + + private static String responseStringRepresentation(Response response) { + StringJoiner joiner = new StringJoiner(",", "Response{", "}"); + joiner.add("code=" + response.code()); + joiner.add( + "headers=" + + response.headers().toMultimap().entrySet().stream() + .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue())) + .collect(joining(",", "[", "]"))); + return joiner.toString(); + } + + // Visible for testing + boolean shouldRetryOnException(IOException e) { + return retryExceptionPredicate.test(e); + } + + // Visible for testing + static boolean isRetryableException(IOException e) { + // Known retryable SocketTimeoutException messages: null, "connect timed out", "timeout" + // Known retryable ConnectTimeout messages: "Failed to connect to + // localhost/[0:0:0:0:0:0:0:1]:62611" + // Known retryable UnknownHostException messages: "xxxxxx.com" + // Known retryable SocketException: Socket closed + if (e instanceof SocketTimeoutException) { + return true; + } else if (e instanceof ConnectException) { + return true; + } else if (e instanceof UnknownHostException) { + return true; + } else if (e instanceof SocketException) { + return true; + } + return false; + } + + // Visible for testing + interface Sleeper { + void sleep(long delayNanos) throws InterruptedException; + } +} diff --git a/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/package-info.java b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/package-info.java new file mode 100644 index 00000000000..1d374c2eeff --- /dev/null +++ b/exporters/sender/okhttp4/src/main/java/io/opentelemetry/exporter/sender/okhttp4/internal/package-info.java @@ -0,0 +1,9 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +@ParametersAreNonnullByDefault +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import javax.annotation.ParametersAreNonnullByDefault; diff --git a/exporters/sender/okhttp4/src/main/resources/META-INF/services/io.opentelemetry.sdk.common.export.GrpcSenderProvider b/exporters/sender/okhttp4/src/main/resources/META-INF/services/io.opentelemetry.sdk.common.export.GrpcSenderProvider new file mode 100644 index 00000000000..af754caf032 --- /dev/null +++ b/exporters/sender/okhttp4/src/main/resources/META-INF/services/io.opentelemetry.sdk.common.export.GrpcSenderProvider @@ -0,0 +1 @@ +io.opentelemetry.exporter.sender.okhttp4.internal.OkHttpGrpcSenderProvider diff --git a/exporters/sender/okhttp4/src/main/resources/META-INF/services/io.opentelemetry.sdk.common.export.HttpSenderProvider b/exporters/sender/okhttp4/src/main/resources/META-INF/services/io.opentelemetry.sdk.common.export.HttpSenderProvider new file mode 100644 index 00000000000..134a3b59f9a --- /dev/null +++ b/exporters/sender/okhttp4/src/main/resources/META-INF/services/io.opentelemetry.sdk.common.export.HttpSenderProvider @@ -0,0 +1 @@ +io.opentelemetry.exporter.sender.okhttp4.internal.OkHttpHttpSenderProvider diff --git a/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/AbstractOkHttpSuppressionTest.java b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/AbstractOkHttpSuppressionTest.java new file mode 100644 index 00000000000..7850cdca4a4 --- /dev/null +++ b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/AbstractOkHttpSuppressionTest.java @@ -0,0 +1,58 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.internal.InstrumentationUtil; +import io.opentelemetry.context.Context; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +abstract class AbstractOkHttpSuppressionTest { + + @BeforeEach + void setUp() { + OkHttpUtil.setPropagateContextForTestingInDispatcher(true); + } + + @AfterEach + void tearDown() { + OkHttpUtil.setPropagateContextForTestingInDispatcher(false); + } + + @Test + void testSuppressInstrumentation() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean suppressInstrumentation = new AtomicBoolean(false); + + Runnable onSuccess = Assertions::fail; + Runnable onFailure = + () -> { + suppressInstrumentation.set( + InstrumentationUtil.shouldSuppressInstrumentation(Context.current())); + latch.countDown(); + }; + + send(getSender(), onSuccess, onFailure); + + latch.await(); + + assertTrue(suppressInstrumentation.get()); + } + + abstract void send(T sender, Runnable onSuccess, Runnable onFailure); + + private T getSender() { + return createSender("https://none"); + } + + abstract T createSender(String endpoint); +} diff --git a/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSenderTest.java b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSenderTest.java new file mode 100644 index 00000000000..03585229cf3 --- /dev/null +++ b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSenderTest.java @@ -0,0 +1,278 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.exporter.internal.RetryUtil; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.GrpcStatusCode; +import io.opentelemetry.sdk.common.export.MessageWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.time.Duration; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import okhttp3.MediaType; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +class OkHttpGrpcSenderTest { + + private static final String GRPC_STATUS = "grpc-status"; + private static final MediaType TEXT_PLAIN = MediaType.get("text/plain"); + + static Set provideRetryableGrpcStatusCodes() { + return RetryUtil.retryableGrpcStatusCodes(); + } + + @ParameterizedTest(name = "isRetryable should return true for GRPC status code: {0}") + @MethodSource("provideRetryableGrpcStatusCodes") + void isRetryable_RetryableGrpcStatus(String retryableGrpcStatus) { + Response response = createResponse(503, retryableGrpcStatus, "Retryable"); + boolean isRetryable = OkHttpGrpcSender.isRetryable(response); + assertTrue(isRetryable); + } + + @Test + void isRetryable_NonRetryableGrpcStatus() { + String nonRetryableGrpcStatus = + Integer.valueOf(GrpcStatusCode.UNKNOWN.getValue()).toString(); // INVALID_ARGUMENT + Response response = createResponse(503, nonRetryableGrpcStatus, "Non-retryable"); + boolean isRetryable = OkHttpGrpcSender.isRetryable(response); + assertFalse(isRetryable); + } + + private static Response createResponse(int httpCode, String grpcStatus, String message) { + return new Response.Builder() + .request(new Request.Builder().url("http://localhost/").build()) + .protocol(Protocol.HTTP_2) + .code(httpCode) + .body(ResponseBody.create("body", TEXT_PLAIN)) + .message(message) + .header(GRPC_STATUS, grpcStatus) + .build(); + } + + @Test + void shutdown_CompletableResultCodeShouldWaitForThreads() throws Exception { + // This test verifies that shutdown() returns a CompletableResultCode that only + // completes AFTER threads terminate, not immediately. + + // Allocate an ephemeral port and immediately close it to get a port with nothing listening + int port; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } + + OkHttpGrpcSender sender = + new OkHttpGrpcSender( + "http://localhost:" + port, // Non-existent endpoint to trigger thread creation + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + null); + + CompletableResultCode sendResult = new CompletableResultCode(); + sender.send( + new TestMessageWriter(), response -> sendResult.succeed(), error -> sendResult.fail()); + + // Give threads time to start + Thread.sleep(500); + + CompletableResultCode shutdownResult = sender.shutdown(); + + // The key test: the CompletableResultCode should NOT be done() immediately + // because we need to wait for threads to terminate. + // Before #7840, this would fail. + assertFalse( + shutdownResult.isDone(), + "CompletableResultCode should not be done immediately - it should wait for thread termination"); + + // Now wait for it to complete + shutdownResult.join(10, TimeUnit.SECONDS); + assertTrue(shutdownResult.isDone(), "CompletableResultCode should be done after waiting"); + assertTrue(shutdownResult.isSuccess(), "Shutdown should complete successfully"); + } + + @Test + void shutdown_NonManagedExecutor_ReturnsImmediately() { + // This test verifies that when using a non-managed executor (custom ExecutorService), + // shutdown() returns an already-completed CompletableResultCode immediately. + + // Create a custom ExecutorService - this makes the executor non-managed + ExecutorService customExecutor = Executors.newSingleThreadExecutor(); + + try { + OkHttpGrpcSender sender = + new OkHttpGrpcSender( + "http://localhost:8080", + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + customExecutor); // Pass custom executor -> managedExecutor = false + + CompletableResultCode shutdownResult = sender.shutdown(); + + // Should complete immediately since executor is not managed + assertTrue( + shutdownResult.isDone(), + "CompletableResultCode should be done immediately for non-managed executor"); + assertTrue(shutdownResult.isSuccess(), "Shutdown should complete successfully"); + } finally { + // Clean up the custom executor + customExecutor.shutdownNow(); + } + } + + @Test + void shutdown_ExecutorDoesNotTerminateInTime_LogsWarningButSucceeds() throws Exception { + // This test verifies that when threads don't terminate within 5 seconds, + // a warning is logged but shutdown still succeeds. + + // Allocate an ephemeral port + int port; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } + + // Create sender with managed executor (default) + OkHttpGrpcSender sender = + new OkHttpGrpcSender( + "http://localhost:" + port, + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + null); // null executor = managed + + // Start multiple requests to ensure threads are busy + CountDownLatch blockCallbacks = new CountDownLatch(1); + for (int i = 0; i < 3; i++) { + sender.send( + new TestMessageWriter(), + response -> { + try { + // Block in callback for longer than the 5-second timeout + blockCallbacks.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, + error -> { + try { + // Block in callback for longer than the 5-second timeout + blockCallbacks.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + // Give threads time to start (same pattern as existing test) + Thread.sleep(500); + + // Shutdown will now try to terminate threads that are blocked + CompletableResultCode shutdownResult = sender.shutdown(); + + // The shutdown should eventually complete successfully + // even though threads didn't terminate in 5 seconds + assertTrue( + shutdownResult.join(10, TimeUnit.SECONDS).isSuccess(), + "Shutdown should succeed even when threads don't terminate quickly"); + + // Release the blocking callbacks + blockCallbacks.countDown(); + } + + @Test + void shutdown_InterruptedWhileWaiting_StillSucceeds() throws Exception { + // This test verifies that if the shutdown thread is interrupted while waiting + // for termination, it still marks the shutdown as successful. + + // Allocate an ephemeral port + int port; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } + + OkHttpGrpcSender sender = + new OkHttpGrpcSender( + "http://localhost:" + port, + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + null); + + // Trigger some activity + sender.send(new TestMessageWriter(), response -> {}, error -> {}); + + // Give threads time to start (same pattern as existing test) + Thread.sleep(500); + + // Start shutdown + CompletableResultCode shutdownResult = sender.shutdown(); + + // Give the shutdown thread a moment to start + Thread.sleep(100); + + // Find and interrupt the okhttp-shutdown thread to trigger the InterruptedException path + Thread[] threads = new Thread[Thread.activeCount() + 10]; + int count = Thread.enumerate(threads); + for (int i = 0; i < count; i++) { + Thread thread = threads[i]; + if (thread != null && thread.getName().equals("okhttp-shutdown")) { + // Interrupt the shutdown thread to test the InterruptedException handling + thread.interrupt(); + break; + } + } + + // Even with interruption, shutdown should still succeed + assertTrue( + shutdownResult.join(10, TimeUnit.SECONDS).isSuccess(), + "Shutdown should succeed even when interrupted"); + } + + /** Simple test marshaler for testing purposes. */ + private static class TestMessageWriter implements MessageWriter { + @Override + public void writeMessage(OutputStream output) throws IOException { + // Empty writer + } + + @Override + public int getContentLength() { + return 0; + } + } +} diff --git a/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSuppressionTest.java b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSuppressionTest.java new file mode 100644 index 00000000000..db1e1df6ecb --- /dev/null +++ b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpGrpcSuppressionTest.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import io.opentelemetry.sdk.common.export.MessageWriter; +import java.io.OutputStream; +import java.time.Duration; +import java.util.Collections; + +class OkHttpGrpcSuppressionTest extends AbstractOkHttpSuppressionTest { + + @Override + void send(OkHttpGrpcSender sender, Runnable onSuccess, Runnable onFailure) { + sender.send( + new MessageWriter() { + @Override + public void writeMessage(OutputStream output) {} + + @Override + public int getContentLength() { + return 0; + } + }, + grpcResponse -> {}, + throwable -> onFailure.run()); + } + + @Override + OkHttpGrpcSender createSender(String endpoint) { + return new OkHttpGrpcSender( + "https://localhost", + null, + Duration.ofNanos(10), + Duration.ofNanos(10L), + Collections::emptyMap, + null, + null, + null, + null); + } +} diff --git a/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSuppressionTest.java b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSuppressionTest.java new file mode 100644 index 00000000000..0fde42868e8 --- /dev/null +++ b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/OkHttpHttpSuppressionTest.java @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import io.opentelemetry.sdk.common.export.MessageWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; + +class OkHttpHttpSuppressionTest extends AbstractOkHttpSuppressionTest { + + @Override + void send(OkHttpHttpSender sender, Runnable onSuccess, Runnable onFailure) { + byte[] content = "A".getBytes(StandardCharsets.UTF_8); + MessageWriter requestBodyWriter = + new MessageWriter() { + @Override + public void writeMessage(OutputStream output) throws IOException { + output.write(content); + } + + @Override + public int getContentLength() { + return content.length; + } + }; + sender.send(requestBodyWriter, (response) -> onSuccess.run(), (error) -> onFailure.run()); + } + + @Override + OkHttpHttpSender createSender(String endpoint) { + return new OkHttpHttpSender( + URI.create(endpoint), + "text/plain", + null, + Duration.ofNanos(10), + Duration.ofNanos(10), + Collections::emptyMap, + null, + null, + null, + null, + null); + } +} diff --git a/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/RetryInterceptorTest.java b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/RetryInterceptorTest.java new file mode 100644 index 00000000000..5e2eadfd9d4 --- /dev/null +++ b/exporters/sender/okhttp4/src/test/java/io/opentelemetry/exporter/sender/okhttp4/internal/RetryInterceptorTest.java @@ -0,0 +1,316 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp4.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.io.IOException; +import java.net.ConnectException; +import java.net.HttpRetryException; +import java.net.ServerSocket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Stream; +import okhttp3.Interceptor; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; + +@ExtendWith(MockitoExtension.class) +class RetryInterceptorTest { + + @RegisterExtension static final MockWebServerExtension server = new MockWebServerExtension(); + + @Mock private RetryInterceptor.Sleeper sleeper; + @Mock private Supplier random; + private Predicate retryExceptionPredicate; + + private RetryInterceptor retrier; + private OkHttpClient client; + + @BeforeEach + void setUp() { + Logger logger = Logger.getLogger(RetryInterceptor.class.getName()); + logger.setLevel(Level.FINER); + retryExceptionPredicate = + spy( + new Predicate() { + @Override + public boolean test(IOException e) { + return RetryInterceptor.isRetryableException(e) + || (e instanceof HttpRetryException + && e.getMessage().contains("timeout retry")); + } + }); + + RetryPolicy retryPolicy = + RetryPolicy.builder() + .setBackoffMultiplier(1.6) + .setInitialBackoff(Duration.ofSeconds(1)) + .setMaxBackoff(Duration.ofSeconds(2)) + .setMaxAttempts(5) + .setRetryExceptionPredicate(retryExceptionPredicate) + .build(); + + retrier = + new RetryInterceptor( + retryPolicy, r -> !r.isSuccessful(), retryExceptionPredicate, sleeper, random); + client = new OkHttpClient.Builder().addInterceptor(retrier).build(); + } + + @Test + void noRetryOnNullResponse() throws IOException { + Interceptor.Chain chain = mock(Interceptor.Chain.class); + when(chain.proceed(any())).thenReturn(null); + when(chain.request()) + .thenReturn(new Request.Builder().url(server.httpUri().toString()).build()); + assertThatThrownBy( + () -> { + retrier.intercept(chain); + }) + .isInstanceOf(NullPointerException.class) + .hasMessage("response cannot be null."); + + verifyNoInteractions(retryExceptionPredicate); + verifyNoInteractions(random); + verifyNoInteractions(sleeper); + } + + @Test + void noRetry() throws Exception { + server.enqueue(HttpResponse.of(HttpStatus.OK)); + + try (Response response = sendRequest()) { + assertThat(response.isSuccessful()).isTrue(); + } + + verifyNoInteractions(random); + verifyNoInteractions(sleeper); + } + + @ParameterizedTest + // Test is mostly same for 5 or more attempts since it's the max. We check the backoff timings and + // handling of max attempts by checking both. + @ValueSource(ints = {5, 6}) + void backsOff(int attempts) throws Exception { + succeedOnAttempt(attempts); + when(random.get()).thenReturn(1.0d); + doNothing().when(sleeper).sleep(anyLong()); + + try (Response response = sendRequest()) { + if (attempts <= 5) { + assertThat(response.isSuccessful()).isTrue(); + } else { + assertThat(response.isSuccessful()).isFalse(); + } + } + + for (int i = 0; i < 5; i++) { + server.takeRequest(0, TimeUnit.NANOSECONDS); + } + } + + @Test + void interrupted() throws Exception { + succeedOnAttempt(5); + + // Backs off twice, second is interrupted + when(random.get()).thenReturn(1.0d).thenReturn(1.0d); + doAnswer( + new Answer() { + int counter = 0; + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + if (counter++ == 1) { + throw new InterruptedException(); + } + return null; + } + }) + .when(sleeper) + .sleep(anyLong()); + + try (Response response = sendRequest()) { + assertThat(response.isSuccessful()).isFalse(); + } + verify(sleeper, times(2)).sleep(anyLong()); + for (int i = 0; i < 2; i++) { + server.takeRequest(0, TimeUnit.NANOSECONDS); + } + } + + @Test + void connectTimeout() throws Exception { + client = connectTimeoutClient(); + when(random.get()).thenReturn(1.0d); + doNothing().when(sleeper).sleep(anyLong()); + + // Connecting to a non-routable IP address to trigger connection error + assertThatThrownBy( + () -> + client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute()) + .isInstanceOfAny(SocketTimeoutException.class, SocketException.class); + + verify(retryExceptionPredicate, times(5)).test(any()); + // Should retry maxAttempts, and sleep maxAttempts - 1 times + verify(sleeper, times(4)).sleep(anyLong()); + } + + @Test + void connectException() throws Exception { + client = connectTimeoutClient(); + when(random.get()).thenReturn(1.0d); + doNothing().when(sleeper).sleep(anyLong()); + + // Connecting to localhost on an unused port address to trigger java.net.ConnectException + int openPort = freePort(); + assertThatThrownBy( + () -> + client + .newCall(new Request.Builder().url("http://localhost:" + openPort).build()) + .execute()) + .isInstanceOfAny(ConnectException.class, SocketTimeoutException.class); + + verify(retryExceptionPredicate, times(5)).test(any()); + // Should retry maxAttempts, and sleep maxAttempts - 1 times + verify(sleeper, times(4)).sleep(anyLong()); + } + + private static int freePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + void nonRetryableException() throws InterruptedException { + client = connectTimeoutClient(); + // Override retryPredicate so that no exception is retryable + when(retryExceptionPredicate.test(any())).thenReturn(false); + + // Connecting to a non-routable IP address to trigger connection timeout + assertThatThrownBy( + () -> + client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute()) + .isInstanceOfAny(SocketTimeoutException.class, SocketException.class); + + verify(retryExceptionPredicate, times(1)).test(any()); + verify(sleeper, never()).sleep(anyLong()); + } + + private OkHttpClient connectTimeoutClient() { + return new OkHttpClient.Builder() + .connectTimeout(Duration.ofMillis(10)) + .addInterceptor(retrier) + .build(); + } + + @ParameterizedTest + @MethodSource("isRetryableExceptionArgs") + void isRetryableException(IOException exception, boolean expectedRetryResult) { + assertThat(retrier.shouldRetryOnException(exception)).isEqualTo(expectedRetryResult); + } + + private static Stream isRetryableExceptionArgs() { + return Stream.of( + // Should retry on SocketTimeoutExceptions + Arguments.of(new SocketTimeoutException("Connect timed out"), true), + Arguments.of(new SocketTimeoutException("connect timed out"), true), + Arguments.of(new SocketTimeoutException("timeout"), true), + Arguments.of(new SocketTimeoutException("Read timed out"), true), + Arguments.of(new SocketTimeoutException(), true), + // Should retry on UnknownHostExceptions + Arguments.of(new UnknownHostException("host"), true), + // Should retry on SocketException + Arguments.of(new SocketException("closed"), true), + // Should retry on ConnectException + Arguments.of( + new ConnectException("Failed to connect to localhost/[0:0:0:0:0:0:0:1]:62611"), true), + // Shouldn't retry other IOException + Arguments.of(new IOException("error"), false), + // Testing configured predicate + Arguments.of(new HttpRetryException("error", 400), false), + Arguments.of(new HttpRetryException("timeout retry", 400), true)); + } + + @Test + void isRetryableExceptionDefaultBehaviour() { + RetryInterceptor retryInterceptor = + new RetryInterceptor(RetryPolicy.getDefault(), OkHttpHttpSender::isRetryable); + assertThat( + retryInterceptor.shouldRetryOnException( + new SocketTimeoutException("Connect timed out"))) + .isTrue(); + assertThat(retryInterceptor.shouldRetryOnException(new IOException("Connect timed out"))) + .isFalse(); + } + + @Test + void isRetryableExceptionCustomRetryPredicate() { + RetryInterceptor retryInterceptor = + new RetryInterceptor( + RetryPolicy.builder() + .setRetryExceptionPredicate((IOException e) -> e.getMessage().equals("retry")) + .build(), + OkHttpHttpSender::isRetryable); + + assertThat(retryInterceptor.shouldRetryOnException(new IOException("some message"))).isFalse(); + assertThat(retryInterceptor.shouldRetryOnException(new IOException("retry"))).isTrue(); + assertThat( + retryInterceptor.shouldRetryOnException( + new SocketTimeoutException("Connect timed out"))) + .isFalse(); + } + + private Response sendRequest() throws IOException { + return client.newCall(new Request.Builder().url(server.httpUri().toString()).build()).execute(); + } + + private static void succeedOnAttempt(int attempt) { + for (int i = 1; i < attempt; i++) { + server.enqueue(HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR)); + } + server.enqueue(HttpResponse.of(HttpStatus.OK)); + } +} diff --git a/sdk-extensions/jaeger-remote-sampler/build.gradle.kts b/sdk-extensions/jaeger-remote-sampler/build.gradle.kts index 871fd46b065..a58553b2933 100644 --- a/sdk-extensions/jaeger-remote-sampler/build.gradle.kts +++ b/sdk-extensions/jaeger-remote-sampler/build.gradle.kts @@ -20,7 +20,7 @@ dependencies { implementation(project(":exporters:common")) implementation(project(":exporters:sender:okhttp")) - implementation("com.squareup.okhttp3:okhttp") + implementation("com.squareup.okhttp3:okhttp:5.3.2") compileOnly("io.grpc:grpc-api") compileOnly("io.grpc:grpc-protobuf") diff --git a/settings.gradle.kts b/settings.gradle.kts index e6fc58fb3d1..08d59b9469b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -41,6 +41,7 @@ include(":exporters:common:compile-stub") include(":exporters:sender:grpc-managed-channel") include(":exporters:sender:jdk") include(":exporters:sender:okhttp") +include(":exporters:sender:okhttp4") include(":exporters:logging") include(":exporters:logging-otlp") include(":exporters:otlp:all")