Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Processes log records, grouping them by instrumentation scope. */
public final class OtelLogRecordProcessor {
private static final int MAX_QUEUE_SIZE = Config.get().getLogsOtelQueueSize();
private static final int MAX_BATCH_SIZE = Config.get().getLogsOtelBatchSize();

private static final Comparator<OtlpLogRecord> BY_SCOPE =
Comparator.comparing(o -> o.instrumentationScope);
Expand All @@ -29,18 +29,66 @@ public final class OtelLogRecordProcessor {

public static final OtelLogRecordProcessor INSTANCE = new OtelLogRecordProcessor();

private final Queue<OtlpLogRecord> queue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
private final int maxQueueSize = Config.get().getLogsOtelQueueSize();
private final int maxBatchSize = Config.get().getLogsOtelBatchSize();

private final Queue<OtlpLogRecord> queue = new ArrayBlockingQueue<>(maxQueueSize);

private final BlockingQueue<Boolean> logsReady = new ArrayBlockingQueue<>(1);
private volatile int logsNeeded = Integer.MAX_VALUE;

public void addLog(OtlpLogRecord logRecord) {
queue.offer(logRecord);
if (queue.offer(logRecord)) {
// report when we have enough logs for the collector's needs
if (queue.size() >= logsNeeded) {
logsReady.offer(true);
}
}
}

public void collectLogs(OtlpLogsVisitor visitor) {
public void waitForLogs(OtlpLogsVisitor visitor, int intervalMillis) {
long nextExportNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
List<OtlpLogRecord> batch = new ArrayList<>(maxBatchSize);
int batchSize = 0;

while (true) {

// attempt to collect enough logs to complete the batch
OtlpLogRecord record;
while (batchSize < maxBatchSize && (record = queue.poll()) != null) {
batch.add(record);
batchSize++;
}

// bail out if we have enough logs, or the interval has expired
long waitNanos;
if (batchSize >= maxBatchSize || (waitNanos = nextExportNanos - System.nanoTime()) <= 0) {
break;
}

logsNeeded = maxBatchSize - batchSize; // declare what we need and wait
try {
if (queue.isEmpty()) {
logsReady.poll(waitNanos, TimeUnit.NANOSECONDS);
}
} catch (InterruptedException ignore) {
break;
} finally {
logsNeeded = Integer.MAX_VALUE;
}
}

visitBatch(visitor, batch); // send what we have for this interval
}

private static void visitBatch(OtlpLogsVisitor visitor, List<OtlpLogRecord> batch) {
batch.sort(BY_SCOPE);

OtlpScopedLogsVisitor scopedVisitor = null;
OtelInstrumentationScope currentScope = null;
BiConsumer<Map<?, ?>, OtlpAttributeVisitor> attributesReader = null;
ClassLoader attributesClassLoader = null;
for (OtlpLogRecord logRecord : batchByScope()) {
for (OtlpLogRecord logRecord : batch) {
if (logRecord.instrumentationScope != currentScope) {
currentScope = logRecord.instrumentationScope;
scopedVisitor = visitor.visitScopedLogs(currentScope);
Expand Down Expand Up @@ -70,20 +118,4 @@ public static void registerAttributeReader(
ClassLoader cl, BiConsumer<Map<?, ?>, OtlpAttributeVisitor> reader) {
ATTRIBUTE_READERS.put(cl, reader);
}

private List<OtlpLogRecord> batchByScope() {
// capture expected batch size; records emitted after here go into next batch
int batchSize = Math.min(queue.size(), MAX_BATCH_SIZE);
List<OtlpLogRecord> batch = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
OtlpLogRecord logRecord = queue.poll();
if (logRecord != null) {
batch.add(logRecord);
} else {
break; // should not happen unless another thread is also batching records
}
}
batch.sort(BY_SCOPE);
return batch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class OpenTelemetryLogsTest extends AbstractInstrumentationTest {
@BeforeEach
void drainQueue() {
// drain any stale log records from the shared processor queue before each test
OtelLogRecordProcessor.INSTANCE.collectLogs(LogsDrainer.INSTANCE);
OtelLogRecordProcessor.INSTANCE.waitForLogs(LogsDrainer.INSTANCE, 0);
}

@ParameterizedTest
Expand All @@ -48,7 +48,7 @@ void testSeverity(Severity severity) {

logger.logRecordBuilder().setBody("test message").setSeverity(severity).emit();

OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader);
OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0);

assertEquals(1, logsReader.logs.size());
CapturedLog log = logsReader.logs.get(0);
Expand All @@ -68,7 +68,7 @@ void testSeverityText() {
.setSeverityText("custom-level")
.emit();

OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader);
OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0);

assertEquals(1, logsReader.logs.size());
CapturedLog log = logsReader.logs.get(0);
Expand All @@ -89,7 +89,7 @@ void testAttributes() {
.setAttribute(doubleKey("double.key"), 1.5)
.emit();

OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader);
OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0);

assertEquals(1, logsReader.logs.size());
CapturedLog log = logsReader.logs.get(0);
Expand All @@ -110,7 +110,7 @@ void testMultipleScopes() {
loggerB.logRecordBuilder().setBody("b-1").setSeverity(Severity.WARN).emit();
loggerA.logRecordBuilder().setBody("a-2").setSeverity(Severity.DEBUG).emit();

OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader);
OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0);

// logs are sorted by scope name, so all scope-a logs come before scope-b logs
assertEquals(3, logsReader.logs.size());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@

/** Collects logs ready for export. */
public abstract class OtlpLogsCollector {
public abstract OtlpPayload collectLogs();

/** Waits for logs to be batched within the given interval. */
public abstract OtlpPayload waitForLogs(int intervalMillis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ObjIntConsumer;

/**
* Collects OpenTelemetry logs and marshals them into a chunked 'logs.proto' payload.
Expand Down Expand Up @@ -61,14 +61,14 @@ private OtlpLogsProtoCollector() {}
* <p>This payload is only valid for the calling thread until the next collection.
*/
@Override
public OtlpPayload collectLogs() {
return collectLogs(OtelLogRecordProcessor.INSTANCE::collectLogs);
public OtlpPayload waitForLogs(int intervalMillis) {
return collectLogs(OtelLogRecordProcessor.INSTANCE::waitForLogs, intervalMillis);
}

OtlpPayload collectLogs(Consumer<OtlpLogsVisitor> processor) {
OtlpPayload collectLogs(ObjIntConsumer<OtlpLogsVisitor> processor, int intervalMillis) {
start();
try {
processor.accept(this);
processor.accept(this, intervalMillis);
return completePayload();
} finally {
stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package datadog.trace.core.otlp.logs;

import static datadog.trace.util.AgentThreadFactory.AgentThread.OTLP_LOGS_EXPORTER;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;

import datadog.trace.api.Config;
import datadog.trace.core.otlp.common.OtlpGrpcSender;
import datadog.trace.core.otlp.common.OtlpHttpSender;
import datadog.trace.core.otlp.common.OtlpPayload;
import datadog.trace.core.otlp.common.OtlpSender;
import datadog.trace.util.AgentTaskScheduler;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,17 +17,14 @@ public final class OtlpLogsService {

public static final OtlpLogsService INSTANCE = new OtlpLogsService(Config.get());

private final AgentTaskScheduler scheduler;
private final int intervalMillis;
private final OtlpLogsCollector collector;
private final OtlpSender sender;

private final int intervalMillis;

private AgentTaskScheduler.Scheduled<?> scheduledTask = null;
private volatile Thread exporterThread;

private OtlpLogsService(Config config) {
this.scheduler = new AgentTaskScheduler(OTLP_LOGS_EXPORTER);

intervalMillis = config.getLogsOtelInterval();
switch (config.getOtlpLogsProtocol()) {
case GRPC:
this.collector = OtlpLogsProtoCollector.INSTANCE;
Expand All @@ -53,51 +48,52 @@ private OtlpLogsService(Config config) {
break;
default:
LOGGER.debug("Unsupported OTLP logs protocol: {}", config.getOtlpLogsProtocol());
this.collector = NoopOtlpLogsCollector.INSTANCE;
this.collector = null;
this.sender = null;
}

this.intervalMillis = config.getLogsOtelInterval();
}

public void start() {
if (sender == null) {
return;
}

// add random jitter of up to 5 seconds to initial delay; avoids a fleet
// of apps starting at the same time from exporting OTLP logs in sync
long initialMillis =
intervalMillis
+ Math.min(
(long)
(500d
* Math.log(ThreadLocalRandom.current().nextDouble())
/ Math.log(1 - 0.25)),
5_000);

scheduledTask =
scheduler.scheduleAtFixedRate(
this::export, initialMillis, intervalMillis, TimeUnit.MILLISECONDS);
exporterThread = newAgentThread(OTLP_LOGS_EXPORTER, this::export);
exporterThread.start();
}

public void flush() {
scheduler.execute(this::export);
Thread thread = exporterThread;
if (thread != null) {
thread.interrupt();
}
}

public void shutdown() {
if (scheduledTask != null) {
scheduledTask.cancel();
Thread thread = exporterThread;
if (thread != null) {
exporterThread = null;
thread.interrupt();
try {
thread.join(1_000);
} catch (InterruptedException ignore) {
}
}
if (sender != null) {
sender.shutdown();
}
}

private void export() {
OtlpPayload payload = collector.collectLogs();
if (payload != OtlpPayload.EMPTY) {
sender.send(payload);
while (Thread.currentThread() == exporterThread) {
try {
OtlpPayload payload = collector.waitForLogs(intervalMillis);
if (payload != OtlpPayload.EMPTY) {
sender.send(payload);
}
} catch (RuntimeException e) {
LOGGER.debug("Uncaught exception exporting logs", e);
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@

/** Collects metrics ready for export. */
public abstract class OtlpMetricsCollector {

/** Collects all metrics recorded since the last collection. */
public abstract OtlpPayload collectMetrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private OtlpMetricsService(Config config) {
break;
default:
LOGGER.debug("Unsupported OTLP metrics protocol: {}", config.getOtlpMetricsProtocol());
this.collector = NoopOtlpMetricsCollector.INSTANCE;
this.collector = null;
this.sender = null;
}

Expand Down Expand Up @@ -82,7 +82,9 @@ public void start() {
}

public void flush() {
scheduler.execute(this::export);
if (sender != null) {
scheduler.execute(this::export);
}
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
/** Collects traces ready for export. */
public abstract class OtlpTraceCollector {

/** Adds spans from the given trace to the collector. */
public abstract void addTrace(List<? extends CoreSpan<?>> spans);

/** Collects all spans added since the last collection. */
public abstract OtlpPayload collectTraces();
}
Loading
Loading