diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java index a18cd2d3df5..fd233f41893 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java @@ -10,6 +10,7 @@ import com.datadog.debugger.util.MoshiHelper; import com.squareup.moshi.JsonAdapter; import datadog.trace.api.Config; +import datadog.trace.util.RandomUtils; import datadog.trace.util.TagsHelper; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -19,8 +20,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.zip.GZIPOutputStream; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -36,21 +39,34 @@ public class SymbolSink { public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(10); private static final JsonAdapter SERVICE_VERSION_ADAPTER = MoshiHelper.createMoshiSymbol().adapter(ServiceVersion.class); + // The upload event message JSON. The "final" field is hard-coded to false: + // the Java tracer continuously uploads new code as classes get loaded, so + // there is no defined end-of-upload point. 
private static final String EVENT_FORMAT = "{%n" + "\"ddsource\": \"dd_debugger\",%n" + "\"service\": \"%s\",%n" + + "\"version\": \"%s\",%n" + + "\"language\": \"java\",%n" + "\"runtimeId\": \"%s\",%n" - + "\"type\": \"symdb\"%n" + + "\"type\": \"symdb\",%n" + + "\"uploadId\": \"%s\",%n" + + "\"batchNum\": %d,%n" + + "\"final\": false,%n" + + "\"attachmentSize\": %d%n" + "}"; static final int MAX_SYMDB_UPLOAD_SIZE = 50 * 1024 * 1024; private final String serviceName; private final String env; private final String version; + private final String runtimeId; private final BatchUploader symbolUploader; private final int maxPayloadSize; - private final BatchUploader.MultiPartContent event; + // uploadId is shared by all batches uploaded by this sink. The backend uses + // (runtimeId, uploadId) to group batches belonging to the same logical upload. + private final UUID uploadId = RandomUtils.randomUUID(); + private final AtomicLong batchNum = new AtomicLong(0); private final BlockingQueue scopes = new ArrayBlockingQueue<>(CAPACITY); private final Stats stats = new Stats(); private final boolean isCompressed; @@ -66,15 +82,10 @@ public SymbolSink(Config config) { this.serviceName = TagsHelper.sanitize(config.getServiceName()); this.env = TagsHelper.sanitize(config.getEnv()); this.version = TagsHelper.sanitize(config.getVersion()); + this.runtimeId = config.getRuntimeId(); this.symbolUploader = symbolUploader; this.maxPayloadSize = maxPayloadSize; this.isCompressed = config.isSymbolDatabaseCompressed(); - byte[] eventContent = - String.format( - EVENT_FORMAT, TagsHelper.sanitize(config.getServiceName()), config.getRuntimeId()) - .getBytes(StandardCharsets.UTF_8); - this.event = - new BatchUploader.MultiPartContent(eventContent, "event", "event.json", APPLICATION_JSON); } public void stop() { @@ -111,22 +122,35 @@ public void flush() { } private void serializeAndUpload(List scopesToSerialize) { + // Determine the batch number once so the attachment body and the EvP event + 
// message agree on it. + long currentBatch = batchNum.incrementAndGet(); try { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2 * 1024 * 1024); try (OutputStream outputStream = isCompressed ? new GZIPOutputStream(byteArrayOutputStream) : byteArrayOutputStream) { BufferedSink sink = Okio.buffer(Okio.sink(outputStream)); SERVICE_VERSION_ADAPTER.toJson( - sink, new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize)); + sink, + new ServiceVersion( + serviceName, + env, + version, + "JAVA", + scopesToSerialize, + uploadId.toString(), + currentBatch, + false /* isFinal */)); sink.flush(); } - doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed); + doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed, currentBatch); } catch (IOException e) { LOGGER.debug("Error serializing scopes", e); } } - private void doUpload(List scopesToSerialize, byte[] payload, boolean isCompressed) { + private void doUpload( + List scopesToSerialize, byte[] payload, boolean isCompressed, long currentBatch) { if (payload.length > maxPayloadSize) { LOGGER.warn( "Payload is too big: {}/{} isCompressed={}", @@ -148,10 +172,29 @@ private void doUpload(List scopesToSerialize, byte[] payload, boolean isC fileName = "file.gz"; mediaType = APPLICATION_GZIP; } + BatchUploader.MultiPartContent event = buildEvent(currentBatch, payload.length); symbolUploader.uploadAsMultipart( "", event, new BatchUploader.MultiPartContent(payload, "file", fileName, mediaType)); } + // Builds the "event" multipart part for one batch from EVENT_FORMAT. Locale.ROOT keeps the + // %d fields (batchNum, attachmentSize) as ASCII digits whatever the JVM default locale is. + private BatchUploader.MultiPartContent buildEvent(long currentBatch, int attachmentSize) { + byte[] eventContent = + String.format( + java.util.Locale.ROOT, + EVENT_FORMAT, + serviceName, + version, + runtimeId, + uploadId.toString(), + currentBatch, + attachmentSize) + .getBytes(StandardCharsets.UTF_8); + return new BatchUploader.MultiPartContent( + eventContent, "event", "event.json", APPLICATION_JSON); + } + private static byte[] compressPayload(byte[] jsonBytes) { // usual 
compression factor 40:1 for those json payload, so we are preallocating 1/40 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(jsonBytes.length / 40); diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java index a88fc25da63..fd1efaaac04 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java @@ -1,5 +1,6 @@ package com.datadog.debugger.symbol; +import com.squareup.moshi.Json; import java.util.List; public class ServiceVersion { @@ -10,13 +11,32 @@ public class ServiceVersion { private final String language; private final List scopes; + @Json(name = "upload_id") + private final String uploadId; + + @Json(name = "batch_num") + private final long batchNum; + + @Json(name = "final") + private final boolean isFinal; + public ServiceVersion( - String service, String env, String version, String language, List scopes) { + String service, + String env, + String version, + String language, + List scopes, + String uploadId, + long batchNum, + boolean isFinal) { this.service = service; this.env = env; this.version = version; this.language = language; this.scopes = scopes; + this.uploadId = uploadId; + this.batchNum = batchNum; + this.isFinal = isFinal; } public List getScopes() { diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java index 91a5743d2a0..9a04fbae6ba 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java @@ -24,6 +24,8 @@ public void testSimpleFlush() { SymbolUploaderMock 
symbolUploaderMock = new SymbolUploaderMock(); Config config = mock(Config.class); when(config.getServiceName()).thenReturn("service1"); + when(config.getVersion()).thenReturn("1.0.0"); + when(config.getRuntimeId()).thenReturn("test-runtime"); when(config.isSymbolDatabaseCompressed()).thenReturn(false); SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, MAX_SYMDB_UPLOAD_SIZE); symbolSink.addScope(Scope.builder(ScopeType.JAR, null, 0, 0).build()); @@ -35,13 +37,25 @@ public void testSimpleFlush() { String strEventContent = new String(eventContent.getContent()); assertTrue(strEventContent.contains("\"ddsource\": \"dd_debugger\"")); assertTrue(strEventContent.contains("\"service\": \"service1\"")); + assertTrue(strEventContent.contains("\"version\": \"1.0.0\"")); + assertTrue(strEventContent.contains("\"language\": \"java\"")); + assertTrue(strEventContent.contains("\"runtimeId\": \"test-runtime\"")); assertTrue(strEventContent.contains("\"type\": \"symdb\"")); + assertTrue(strEventContent.contains("\"uploadId\":")); + assertTrue(strEventContent.contains("\"batchNum\": 1")); + assertTrue(strEventContent.contains("\"final\": false")); + assertTrue(strEventContent.contains("\"attachmentSize\":")); BatchUploader.MultiPartContent symbolContent = symbolUploaderMock.multiPartContents.get(1); assertEquals("file", symbolContent.getPartName()); assertEquals("file.json", symbolContent.getFileName()); - assertEquals( - "{\"language\":\"JAVA\",\"scopes\":[{\"end_line\":0,\"has_injectible_lines\":false,\"scope_type\":\"JAR\",\"start_line\":0}],\"service\":\"service1\"}", - new String(symbolContent.getContent())); + String fileContent = new String(symbolContent.getContent()); + assertTrue(fileContent.contains("\"language\":\"JAVA\"")); + assertTrue(fileContent.contains("\"scopes\":[")); + assertTrue(fileContent.contains("\"service\":\"service1\"")); + assertTrue(fileContent.contains("\"version\":\"1.0.0\"")); + assertTrue(fileContent.contains("\"upload_id\":")); + 
assertTrue(fileContent.contains("\"batch_num\":1")); + assertTrue(fileContent.contains("\"final\":false")); } @Test @@ -219,8 +233,10 @@ public void maxCompressedAndSplit() { .build()); } symbolSink.flush(); - assertEquals(4, symbolUploaderMock.multiPartContents.size()); - for (int i = 0; i < 4; i += 2) { + int total = symbolUploaderMock.multiPartContents.size(); + assertTrue(total >= 4, "expected at least 4 multipart entries (2+ event/file pairs), got " + total); + assertTrue(total % 2 == 0, "expected an even number of multipart entries (event/file pairs), got " + total); + for (int i = 0; i < total; i += 2) { BatchUploader.MultiPartContent eventContent = symbolUploaderMock.multiPartContents.get(i); assertEquals("event", eventContent.getPartName()); BatchUploader.MultiPartContent symbolContent =