From 5e9fc263485dcd01975b5cfe956b234755b3738d Mon Sep 17 00:00:00 2001 From: Giorgos Makris Date: Wed, 1 Apr 2026 21:40:00 +0200 Subject: [PATCH 1/3] Replace ArrayBlockingQueue with park/unpark Bypassing all the complexity that comes with ABQ should come with improved performance --- .../sdk/trace/export/BatchSpanProcessor.java | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index f264128696e..1a668824c3a 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -17,16 +17,16 @@ import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.internal.JcTools; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -86,6 +86,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { JcTools.newFixedSizeQueue(maxQueueSize), maxQueueSize); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); + this.worker.setWorkerThread(workerThread); workerThread.start(); } @@ -169,6 +170,7 @@ private static final class Worker implements Runnable { private final long exporterTimeoutNanos; private long nextExportTime; + @Nullable private Thread workerThread; private final Queue queue; private final AtomicInteger queueSize = new AtomicInteger(); @@ -178,8 +180,7 @@ private static final class Worker implements Runnable { // Integer.MAX_VALUE is used to imply that exporter thread is not expecting any signal. Since // exporter thread doesn't expect any signal initially, this value is initialized to // Integer.MAX_VALUE. - private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE); - private final BlockingQueue signal; + private volatile int spansNeeded = Integer.MAX_VALUE; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; private final ArrayList batch; @@ -200,7 +201,6 @@ private Worker( this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; - this.signal = new ArrayBlockingQueue<>(1); spanProcessorInstrumentation = SpanProcessorInstrumentation.get(telemetryVersion, COMPONENT_ID, meterProvider); @@ -209,14 +209,18 @@ private Worker( this.batch = new ArrayList<>(this.maxExportBatchSize); } + private void setWorkerThread(Thread workerThread) { + this.workerThread = workerThread; + } + private void addSpan(ReadableSpan span) { spanProcessorInstrumentation.buildQueueMetricsOnce(maxQueueSize, queue::size); if (!queue.offer(span)) { spanProcessorInstrumentation.dropSpans(1); droppedSpanCount.incrementAndGet(); } else { - if (queueSize.incrementAndGet() >= spansNeeded.get()) { - signal.offer(true); + if (workerThread != null && queueSize.incrementAndGet() >= spansNeeded) { + LockSupport.unpark(workerThread); } } } @@ -236,16 +240,11 @@ public void run() { updateNextExportTime(); } if (queue.isEmpty()) { - try { - long pollWaitTime = nextExportTime - System.nanoTime(); - if (pollWaitTime > 0) { - spansNeeded.set(maxExportBatchSize - batch.size()); - signal.poll(pollWaitTime, TimeUnit.NANOSECONDS); - spansNeeded.set(Integer.MAX_VALUE); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + spansNeeded = maxExportBatchSize - batch.size(); + LockSupport.parkNanos(pollWaitTime); + spansNeeded = Integer.MAX_VALUE; } } } @@ -302,8 +301,8 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); // we set the atomic here to trigger the worker loop to do a flush of the entire queue. - if (flushRequested.compareAndSet(null, flushResult)) { - signal.offer(true); + if (workerThread != null && flushRequested.compareAndSet(null, flushResult)) { + LockSupport.unpark(workerThread); } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we From 481bf35e9824ad0c354d0e19a84788114380c5af Mon Sep 17 00:00:00 2001 From: Giorgos Makris Date: Wed, 1 Apr 2026 21:55:08 +0200 Subject: [PATCH 2/3] Reorder imports --- .../io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 1a668824c3a..f426b219f0f 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -17,7 +17,6 @@ import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.internal.JcTools; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -30,6 +29,7 @@ import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes From 8970e4d428e059ef49260ea7b038d49a043704f7 Mon Sep 17 00:00:00 2001 From: Giorgos Makris Date: Wed, 1 Apr 2026 23:53:34 +0200 Subject: [PATCH 3/3] Ensure constant reference to thread Making the worker extend thread was the only sensible way of having it keep a constant reference to its thread. Everything else I tried was quite hacky --- .../sdk/trace/export/BatchSpanProcessor.java | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index f426b219f0f..79611ff47f7 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -10,7 +10,6 @@ import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InternalTelemetryVersion; import io.opentelemetry.sdk.common.internal.ComponentId; -import io.opentelemetry.sdk.common.internal.DaemonThreadFactory; import io.opentelemetry.sdk.common.internal.ThrowableUtil; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; @@ -29,7 +28,6 @@ import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes @@ -85,9 +83,8 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { exporterTimeoutNanos, JcTools.newFixedSizeQueue(maxQueueSize), maxQueueSize); - Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); - this.worker.setWorkerThread(workerThread); - workerThread.start(); + + worker.start(); } @Override @@ -160,7 +157,7 @@ public String toString() { // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export // the data. - private static final class Worker implements Runnable { + private static final class Worker extends Thread { private final SpanProcessorInstrumentation spanProcessorInstrumentation; @@ -170,7 +167,6 @@ private static final class Worker implements Runnable { private final long exporterTimeoutNanos; private long nextExportTime; - @Nullable private Thread workerThread; private final Queue queue; private final AtomicInteger queueSize = new AtomicInteger(); @@ -196,6 +192,9 @@ private Worker( long exporterTimeoutNanos, Queue queue, long maxQueueSize) { + super(WORKER_THREAD_NAME); + super.setDaemon(true); + this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; @@ -209,18 +208,14 @@ private Worker( this.batch = new ArrayList<>(this.maxExportBatchSize); } - private void setWorkerThread(Thread workerThread) { - this.workerThread = workerThread; - } - private void addSpan(ReadableSpan span) { spanProcessorInstrumentation.buildQueueMetricsOnce(maxQueueSize, queue::size); if (!queue.offer(span)) { spanProcessorInstrumentation.dropSpans(1); droppedSpanCount.incrementAndGet(); } else { - if (workerThread != null && queueSize.incrementAndGet() >= spansNeeded) { - LockSupport.unpark(workerThread); + if (queueSize.incrementAndGet() >= spansNeeded) { + LockSupport.unpark(this); } } } @@ -301,8 +296,8 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); // we set the atomic here to trigger the worker loop to do a flush of the entire queue. - if (workerThread != null && flushRequested.compareAndSet(null, flushResult)) { - LockSupport.unpark(workerThread); + if (flushRequested.compareAndSet(null, flushResult)) { + LockSupport.unpark(this); } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we