diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index f264128696e..79611ff47f7 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -10,7 +10,6 @@ import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InternalTelemetryVersion; import io.opentelemetry.sdk.common.internal.ComponentId; -import io.opentelemetry.sdk.common.internal.DaemonThreadFactory; import io.opentelemetry.sdk.common.internal.ThrowableUtil; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; @@ -21,12 +20,11 @@ import java.util.Collections; import java.util.List; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -85,8 +83,8 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { exporterTimeoutNanos, JcTools.newFixedSizeQueue(maxQueueSize), maxQueueSize); - Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); - workerThread.start(); + + worker.start(); } @Override @@ -159,7 +157,7 @@ public String toString() { // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export // the data. - private static final class Worker implements Runnable { + private static final class Worker extends Thread { private final SpanProcessorInstrumentation spanProcessorInstrumentation; @@ -178,8 +176,7 @@ private static final class Worker implements Runnable { // Integer.MAX_VALUE is used to imply that exporter thread is not expecting any signal. Since // exporter thread doesn't expect any signal initially, this value is initialized to // Integer.MAX_VALUE. - private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE); - private final BlockingQueue signal; + private volatile int spansNeeded = Integer.MAX_VALUE; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; private final ArrayList batch; @@ -195,12 +192,14 @@ private Worker( long exporterTimeoutNanos, Queue queue, long maxQueueSize) { + super(WORKER_THREAD_NAME); + super.setDaemon(true); + this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; - this.signal = new ArrayBlockingQueue<>(1); spanProcessorInstrumentation = SpanProcessorInstrumentation.get(telemetryVersion, COMPONENT_ID, meterProvider); @@ -215,8 +214,8 @@ private void addSpan(ReadableSpan span) { spanProcessorInstrumentation.dropSpans(1); droppedSpanCount.incrementAndGet(); } else { - if (queueSize.incrementAndGet() >= spansNeeded.get()) { - signal.offer(true); + if (queueSize.incrementAndGet() >= spansNeeded) { + LockSupport.unpark(this); } } } @@ -236,16 +235,11 @@ public void run() { updateNextExportTime(); } if (queue.isEmpty()) { - try { - long pollWaitTime = nextExportTime - System.nanoTime(); - if (pollWaitTime > 0) { - spansNeeded.set(maxExportBatchSize - batch.size()); - signal.poll(pollWaitTime, TimeUnit.NANOSECONDS); - spansNeeded.set(Integer.MAX_VALUE); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + spansNeeded = maxExportBatchSize - batch.size(); + LockSupport.parkNanos(pollWaitTime); + spansNeeded = Integer.MAX_VALUE; } } } @@ -303,7 +297,7 @@ private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); // we set the atomic here to trigger the worker loop to do a flush of the entire queue. if (flushRequested.compareAndSet(null, flushResult)) { - signal.offer(true); + LockSupport.unpark(this); } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we