From 3c835a7250795b9dbc9b578d7c5d9164d33d6887 Mon Sep 17 00:00:00 2001 From: Stuart McCulloch Date: Thu, 7 May 2026 21:16:44 +0100 Subject: [PATCH] Fix OtelLogRecordProcessor schedule: * if we have enough logs for a batch, keep sending batched logs * otherwise wait for the interval to elapse, then send what we have --- .../logs/data/OtelLogRecordProcessor.java | 76 +++++++++++++------ .../logs/OpenTelemetryLogsTest.java | 10 +-- .../core/otlp/logs/NoopOtlpLogsCollector.java | 11 --- .../core/otlp/logs/OtlpLogsCollector.java | 4 +- .../otlp/logs/OtlpLogsProtoCollector.java | 10 +-- .../trace/core/otlp/logs/OtlpLogsService.java | 60 +++++++-------- .../metrics/NoopOtlpMetricsCollector.java | 11 --- .../otlp/metrics/OtlpMetricsCollector.java | 2 + .../core/otlp/metrics/OtlpMetricsService.java | 6 +- .../core/otlp/trace/OtlpTraceCollector.java | 2 + .../core/otlp/logs/OtlpLogsProtoTest.java | 8 +- 11 files changed, 108 insertions(+), 92 deletions(-) delete mode 100644 dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/NoopOtlpLogsCollector.java delete mode 100644 dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/NoopOtlpMetricsCollector.java diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/logs/data/OtelLogRecordProcessor.java b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/logs/data/OtelLogRecordProcessor.java index 905cdce10a8..ef32b6abbac 100644 --- a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/logs/data/OtelLogRecordProcessor.java +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/logs/data/OtelLogRecordProcessor.java @@ -14,12 +14,12 @@ import java.util.Queue; import java.util.WeakHashMap; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; /** Processes log records, grouping them by instrumentation scope. */ public final class OtelLogRecordProcessor { - private static final int MAX_QUEUE_SIZE = Config.get().getLogsOtelQueueSize(); - private static final int MAX_BATCH_SIZE = Config.get().getLogsOtelBatchSize(); private static final Comparator BY_SCOPE = Comparator.comparing(o -> o.instrumentationScope); @@ -29,18 +29,66 @@ public final class OtelLogRecordProcessor { public static final OtelLogRecordProcessor INSTANCE = new OtelLogRecordProcessor(); - private final Queue queue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); + private final int maxQueueSize = Config.get().getLogsOtelQueueSize(); + private final int maxBatchSize = Config.get().getLogsOtelBatchSize(); + + private final Queue queue = new ArrayBlockingQueue<>(maxQueueSize); + + private final BlockingQueue logsReady = new ArrayBlockingQueue<>(1); + private volatile int logsNeeded = Integer.MAX_VALUE; public void addLog(OtlpLogRecord logRecord) { - queue.offer(logRecord); + if (queue.offer(logRecord)) { + // report when we have enough logs for the collector's needs + if (queue.size() >= logsNeeded) { + logsReady.offer(true); + } + } } - public void collectLogs(OtlpLogsVisitor visitor) { + public void waitForLogs(OtlpLogsVisitor visitor, int intervalMillis) { + long nextExportNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis); + List batch = new ArrayList<>(maxBatchSize); + int batchSize = 0; + + while (true) { + + // attempt to collect enough logs to complete the batch + OtlpLogRecord record; + while (batchSize < maxBatchSize && (record = queue.poll()) != null) { + batch.add(record); + batchSize++; + } + + // bail out if we have enough logs, or the interval has expired + long waitNanos; + if (batchSize >= maxBatchSize || (waitNanos = nextExportNanos - System.nanoTime()) <= 0) { + break; + } + + logsNeeded = maxBatchSize - batchSize; // declare what we need and wait + try { + if (queue.isEmpty()) { + logsReady.poll(waitNanos, TimeUnit.NANOSECONDS); + } + } catch (InterruptedException ignore) { + break; + } finally { + logsNeeded = Integer.MAX_VALUE; + } + } + + visitBatch(visitor, batch); // send what we have for this interval + } + + private static void visitBatch(OtlpLogsVisitor visitor, List batch) { + batch.sort(BY_SCOPE); + OtlpScopedLogsVisitor scopedVisitor = null; OtelInstrumentationScope currentScope = null; BiConsumer, OtlpAttributeVisitor> attributesReader = null; ClassLoader attributesClassLoader = null; - for (OtlpLogRecord logRecord : batchByScope()) { + for (OtlpLogRecord logRecord : batch) { if (logRecord.instrumentationScope != currentScope) { currentScope = logRecord.instrumentationScope; scopedVisitor = visitor.visitScopedLogs(currentScope); @@ -70,20 +118,4 @@ public static void registerAttributeReader( ClassLoader cl, BiConsumer, OtlpAttributeVisitor> reader) { ATTRIBUTE_READERS.put(cl, reader); } - - private List batchByScope() { - // capture expected batch size; records emitted after here go into next batch - int batchSize = Math.min(queue.size(), MAX_BATCH_SIZE); - List batch = new ArrayList<>(batchSize); - for (int i = 0; i < batchSize; i++) { - OtlpLogRecord logRecord = queue.poll(); - if (logRecord != null) { - batch.add(logRecord); - } else { - break; // should not happen unless another thread is also batching records - } - } - batch.sort(BY_SCOPE); - return batch; - } } diff --git a/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.27/src/test/java/opentelemetry127/logs/OpenTelemetryLogsTest.java b/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.27/src/test/java/opentelemetry127/logs/OpenTelemetryLogsTest.java index 40cd9b318fa..b5c387d0a38 100644 --- a/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.27/src/test/java/opentelemetry127/logs/OpenTelemetryLogsTest.java +++ b/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.27/src/test/java/opentelemetry127/logs/OpenTelemetryLogsTest.java @@ -35,7 +35,7 @@ class OpenTelemetryLogsTest extends AbstractInstrumentationTest { @BeforeEach void drainQueue() { // drain any stale log records from the shared processor queue before each test - OtelLogRecordProcessor.INSTANCE.collectLogs(LogsDrainer.INSTANCE); + OtelLogRecordProcessor.INSTANCE.waitForLogs(LogsDrainer.INSTANCE, 0); } @ParameterizedTest @@ -48,7 +48,7 @@ void testSeverity(Severity severity) { logger.logRecordBuilder().setBody("test message").setSeverity(severity).emit(); - OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader); + OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0); assertEquals(1, logsReader.logs.size()); CapturedLog log = logsReader.logs.get(0); @@ -68,7 +68,7 @@ void testSeverityText() { .setSeverityText("custom-level") .emit(); - OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader); + OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0); assertEquals(1, logsReader.logs.size()); CapturedLog log = logsReader.logs.get(0); @@ -89,7 +89,7 @@ void testAttributes() { .setAttribute(doubleKey("double.key"), 1.5) .emit(); - OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader); + OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0); assertEquals(1, logsReader.logs.size()); CapturedLog log = logsReader.logs.get(0); @@ -110,7 +110,7 @@ void testMultipleScopes() { loggerB.logRecordBuilder().setBody("b-1").setSeverity(Severity.WARN).emit(); loggerA.logRecordBuilder().setBody("a-2").setSeverity(Severity.DEBUG).emit(); - OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader); + OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0); // logs are sorted by scope name, so all scope-a logs come before scope-b logs assertEquals(3, logsReader.logs.size()); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/NoopOtlpLogsCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/NoopOtlpLogsCollector.java deleted file mode 100644 index ac237442e8c..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/NoopOtlpLogsCollector.java +++ /dev/null @@ -1,11 +0,0 @@ -package datadog.trace.core.otlp.logs; - -import datadog.trace.core.otlp.common.OtlpPayload; - -final class NoopOtlpLogsCollector extends OtlpLogsCollector { - static final NoopOtlpLogsCollector INSTANCE = new NoopOtlpLogsCollector(); - - public OtlpPayload collectLogs() { - return OtlpPayload.EMPTY; - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsCollector.java index 510054107f3..c263efab665 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsCollector.java @@ -4,5 +4,7 @@ /** Collects logs ready for export. */ public abstract class OtlpLogsCollector { - public abstract OtlpPayload collectLogs(); + + /** Waits for logs to be batched within the given interval. */ + public abstract OtlpPayload waitForLogs(int intervalMillis); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java index 176354465a8..448aad5a5ad 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.Deque; import java.util.List; -import java.util.function.Consumer; +import java.util.function.ObjIntConsumer; /** * Collects OpenTelemetry logs and marshals them into a chunked 'logs.proto' payload. @@ -61,14 +61,14 @@ private OtlpLogsProtoCollector() {} *

This payload is only valid for the calling thread until the next collection. */ @Override - public OtlpPayload collectLogs() { - return collectLogs(OtelLogRecordProcessor.INSTANCE::collectLogs); + public OtlpPayload waitForLogs(int intervalMillis) { + return collectLogs(OtelLogRecordProcessor.INSTANCE::waitForLogs, intervalMillis); } - OtlpPayload collectLogs(Consumer processor) { + OtlpPayload collectLogs(ObjIntConsumer processor, int intervalMillis) { start(); try { - processor.accept(this); + processor.accept(this, intervalMillis); return completePayload(); } finally { stop(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsService.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsService.java index e8e31817f96..fe6cc7084b0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsService.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsService.java @@ -1,15 +1,13 @@ package datadog.trace.core.otlp.logs; import static datadog.trace.util.AgentThreadFactory.AgentThread.OTLP_LOGS_EXPORTER; +import static datadog.trace.util.AgentThreadFactory.newAgentThread; import datadog.trace.api.Config; import datadog.trace.core.otlp.common.OtlpGrpcSender; import datadog.trace.core.otlp.common.OtlpHttpSender; import datadog.trace.core.otlp.common.OtlpPayload; import datadog.trace.core.otlp.common.OtlpSender; -import datadog.trace.util.AgentTaskScheduler; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,17 +17,14 @@ public final class OtlpLogsService { public static final OtlpLogsService INSTANCE = new OtlpLogsService(Config.get()); - private final AgentTaskScheduler scheduler; + private final int intervalMillis; private final OtlpLogsCollector collector; private final OtlpSender sender; - private final int intervalMillis; - - private AgentTaskScheduler.Scheduled scheduledTask = null; + private volatile Thread exporterThread; private OtlpLogsService(Config config) { - this.scheduler = new AgentTaskScheduler(OTLP_LOGS_EXPORTER); - + intervalMillis = config.getLogsOtelInterval(); switch (config.getOtlpLogsProtocol()) { case GRPC: this.collector = OtlpLogsProtoCollector.INSTANCE; @@ -53,11 +48,9 @@ private OtlpLogsService(Config config) { break; default: LOGGER.debug("Unsupported OTLP logs protocol: {}", config.getOtlpLogsProtocol()); - this.collector = NoopOtlpLogsCollector.INSTANCE; + this.collector = null; this.sender = null; } - - this.intervalMillis = config.getLogsOtelInterval(); } public void start() { @@ -65,29 +58,26 @@ public void start() { return; } - // add random jitter of up to 5 seconds to initial delay; avoids a fleet - // of apps starting at the same time from exporting OTLP logs in sync - long initialMillis = - intervalMillis - + Math.min( - (long) - (500d - * Math.log(ThreadLocalRandom.current().nextDouble()) - / Math.log(1 - 0.25)), - 5_000); - - scheduledTask = - scheduler.scheduleAtFixedRate( - this::export, initialMillis, intervalMillis, TimeUnit.MILLISECONDS); + exporterThread = newAgentThread(OTLP_LOGS_EXPORTER, this::export); + exporterThread.start(); } public void flush() { - scheduler.execute(this::export); + Thread thread = exporterThread; + if (thread != null) { + thread.interrupt(); + } } public void shutdown() { - if (scheduledTask != null) { - scheduledTask.cancel(); + Thread thread = exporterThread; + if (thread != null) { + exporterThread = null; + thread.interrupt(); + try { + thread.join(1_000); + } catch (InterruptedException ignore) { + } } if (sender != null) { sender.shutdown(); @@ -95,9 +85,15 @@ public void shutdown() { } private void export() { - OtlpPayload payload = collector.collectLogs(); - if (payload != OtlpPayload.EMPTY) { - sender.send(payload); + while (Thread.currentThread() == exporterThread) { + try { + OtlpPayload payload = collector.waitForLogs(intervalMillis); + if (payload != OtlpPayload.EMPTY) { + sender.send(payload); + } + } catch (RuntimeException e) { + LOGGER.debug("Uncaught exception exporting logs", e); + } } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/NoopOtlpMetricsCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/NoopOtlpMetricsCollector.java deleted file mode 100644 index 43d97a094e2..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/NoopOtlpMetricsCollector.java +++ /dev/null @@ -1,11 +0,0 @@ -package datadog.trace.core.otlp.metrics; - -import datadog.trace.core.otlp.common.OtlpPayload; - -final class NoopOtlpMetricsCollector extends OtlpMetricsCollector { - static final NoopOtlpMetricsCollector INSTANCE = new NoopOtlpMetricsCollector(); - - public OtlpPayload collectMetrics() { - return OtlpPayload.EMPTY; - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsCollector.java index bc26e5b6eaa..5e528d7e9de 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsCollector.java @@ -4,5 +4,7 @@ /** Collects metrics ready for export. */ public abstract class OtlpMetricsCollector { + + /** Collects all metrics recorded since the last collection. */ public abstract OtlpPayload collectMetrics(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsService.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsService.java index bed847ff073..ea6e28f47f9 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsService.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsService.java @@ -53,7 +53,7 @@ private OtlpMetricsService(Config config) { break; default: LOGGER.debug("Unsupported OTLP metrics protocol: {}", config.getOtlpMetricsProtocol()); - this.collector = NoopOtlpMetricsCollector.INSTANCE; + this.collector = null; this.sender = null; } @@ -82,7 +82,9 @@ public void start() { } public void flush() { - scheduler.execute(this::export); + if (sender != null) { + scheduler.execute(this::export); + } } public void shutdown() { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java index e7d53f3d009..6990f2d3fa7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java @@ -7,7 +7,9 @@ /** Collects traces ready for export. */ public abstract class OtlpTraceCollector { + /** Adds spans from the given trace to the collector. */ public abstract void addTrace(List> spans); + /** Collects all spans added since the last collection. */ public abstract OtlpPayload collectTraces(); } diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/logs/OtlpLogsProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/logs/OtlpLogsProtoTest.java index 178f069aa3e..80078635f47 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/logs/OtlpLogsProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/logs/OtlpLogsProtoTest.java @@ -24,6 +24,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor; import datadog.trace.bootstrap.otlp.logs.OtlpLogRecord; import datadog.trace.bootstrap.otlp.logs.OtlpScopedLogsVisitor; import datadog.trace.common.writer.LoggingWriter; @@ -47,7 +48,7 @@ import org.junit.jupiter.params.provider.MethodSource; /** - * Tests for {@link OtlpLogsProto} via {@link OtlpLogsProtoCollector#collectLogs}. + * Tests for {@link OtlpLogsProto} via {@link OtlpLogsProtoCollector#waitForLogs}. * *

Each test case constructs {@link OtlpLogRecord} instances (using real {@link DDSpan} contexts * where needed), collects them via {@link OtlpLogsProtoCollector}, drains the resulting chunked @@ -366,7 +367,7 @@ void testCollectLogs(String caseName, List specs) throws IOException { OtlpPayload payload = OtlpLogsProtoCollector.INSTANCE.collectLogs( - visitor -> { + (visitor, interval) -> { OtelInstrumentationScope lastScope = null; OtlpScopedLogsVisitor scoped = null; for (LogSpec spec : specs) { @@ -390,7 +391,8 @@ void testCollectLogs(String caseName, List specs) throws IOException { ctx, spec.eventName)); } - }); + }, + 0); if (specs.isEmpty()) { assertEquals(0, payload.getContentLength(), "empty specs must produce empty payload");