Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package datadog.trace.instrumentation.java.lang.jdk21;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope;
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.environment.JavaVirtualMachine;
Expand All @@ -16,7 +14,8 @@
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
Expand All @@ -25,13 +24,13 @@

/**
* Instruments {@code VirtualThread} to capture active state at creation, activate it on
* continuation mount, and close the scope from activation on continuation unmount.
* continuation mount, close the scope on continuation unmount, and release the continuation when
* the virtual thread terminates.
*
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link State} to restore
* later. It additionally stores the {@link AgentScope} to be able to close it later as activation /
* close is not done around the same method (so passing the scope from {@link OnMethodEnter} /
* {@link OnMethodExit} using advice return value is not possible).
* VirtualThread} inherits from {@link Runnable}) stores a held {@link ConcurrentState} so the
* parent context can be re-activated on each mount. It additionally stores the {@link AgentScope}
* to be able to close it later as activation / close is not done around the same method.
*
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
Expand Down Expand Up @@ -62,7 +61,7 @@ public boolean isEnabled() {
@Override
public Map<String, String> contextStore() {
Map<String, String> contextStore = new HashMap<>();
contextStore.put(Runnable.class.getName(), State.class.getName());
contextStore.put(Runnable.class.getName(), ConcurrentState.class.getName());
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
return contextStore;
}
Expand All @@ -72,36 +71,66 @@ public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate");
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close");
transformer.applyAdvice(
isMethod()
.and(
// this one for jdk 21
named("afterTerminate")
.and(takesArguments(2))
// this one for jdk 25+
.or(named("afterDone").and(takesArguments(1)))),
getClass().getName() + "$Terminate");
}

public static final class Construct {
@OnMethodExit(suppress = Throwable.class)
public static void captureScope(@Advice.This Object virtualThread) {
capture(InstrumentationContext.get(Runnable.class, State.class), (Runnable) virtualThread);
ContextStore<Runnable, ConcurrentState> stateStore =
InstrumentationContext.get(Runnable.class, ConcurrentState.class);
ConcurrentState.captureContinuation(
stateStore, (Runnable) virtualThread, AgentTracer.activeSpan());
}
}

public static final class Activate {
@OnMethodExit(suppress = Throwable.class)
public static void activate(@Advice.This Object virtualThread) {
ContextStore<Runnable, State> stateStore =
InstrumentationContext.get(Runnable.class, State.class);
ContextStore<Runnable, ConcurrentState> stateStore =
InstrumentationContext.get(Runnable.class, ConcurrentState.class);
ContextStore<Object, Object> scopeStore =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
AgentScope agentScope = startTaskScope(stateStore, (Runnable) virtualThread);
scopeStore.put(virtualThread, agentScope);
AgentScope agentScope =
ConcurrentState.activateAndContinueContinuation(stateStore, (Runnable) virtualThread);
if (agentScope != null) {
scopeStore.put(virtualThread, agentScope);
}
}
}

public static final class Close {
@OnMethodEnter(suppress = Throwable.class)
public static void close(@Advice.This Object virtualThread) {
ContextStore<Runnable, ConcurrentState> stateStore =
InstrumentationContext.get(Runnable.class, ConcurrentState.class);
ContextStore<Object, Object> scopeStore =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
Object agentScope = scopeStore.get(virtualThread);
Object agentScope = scopeStore.remove(virtualThread);
if (agentScope instanceof AgentScope) {
endTaskScope((AgentScope) agentScope);
ConcurrentState.closeScope(
stateStore, (Runnable) virtualThread, (AgentScope) agentScope, null);
}
}
}

public static final class Terminate {
@OnMethodExit(suppress = Throwable.class)
public static void cleanup(@Advice.This Object virtualThread) {
ContextStore<Runnable, ConcurrentState> stateStore =
InstrumentationContext.get(Runnable.class, ConcurrentState.class);
ConcurrentState.cancelAndClearContinuation(stateStore, (Runnable) virtualThread);
ContextStore<Object, Object> scopeStore =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
scopeStore.remove(virtualThread);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import static datadog.trace.agent.test.assertions.SpanMatcher.span;
import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME;
import static datadog.trace.agent.test.assertions.TraceMatcher.trace;
import static org.junit.jupiter.api.Assertions.assertEquals;

import datadog.trace.agent.test.AbstractInstrumentationTest;
import datadog.trace.api.CorrelationIdentifier;
import datadog.trace.api.Trace;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -137,6 +140,68 @@ public void run() {
span().childOfPrevious().operationName("great-great-child")));
}

@DisplayName("test CorrelationIdentifier across virtual thread remount")
@Test
void testCorrelationIdentifierAcrossVirtualThreadRemount() throws InterruptedException {
AtomicReference<String> parentTraceId = new AtomicReference<>();
AtomicReference<String> parentSpanId = new AtomicReference<>();
AtomicReference<String> traceIdBeforeRemount = new AtomicReference<>();
AtomicReference<String> spanIdBeforeRemount = new AtomicReference<>();
AtomicReference<String> traceIdAfterRemount = new AtomicReference<>();
AtomicReference<String> spanIdAfterRemount = new AtomicReference<>();

new Runnable() {
@Override
@Trace(operationName = "parent")
public void run() {
parentTraceId.set(CorrelationIdentifier.getTraceId());
parentSpanId.set(CorrelationIdentifier.getSpanId());

Thread thread =
Thread.startVirtualThread(
() -> {
traceIdBeforeRemount.set(CorrelationIdentifier.getTraceId());
spanIdBeforeRemount.set(CorrelationIdentifier.getSpanId());

try {
// Sleeping should park and later remount the virtual thread.
Thread.sleep(10);
Comment on lines +167 to +168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔨 issue: This is not deterministic. You can't use Thread.sleep() and expect the VM to unmount the thread. This will be a source of flakiness.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, not really of flakiness, because if it's not unmounted then the trace and span IDs will stay the same, but "reverse flakiness" in the sense that if it's broken, we may not see it

Copy link
Contributor

@dougqh dougqh Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, depending on what is exposed, we may not have a way to make this fully reliable. For similar threading / scheduling issues, I took the approach of just repeating the test many times across multiple core / threads.

Alternatively, you could also add in a signal that the relevant code was actually triggered, so we can verify that this is working.

} catch (InterruptedException e) {
throw new RuntimeException(e);
}

traceIdAfterRemount.set(CorrelationIdentifier.getTraceId());
spanIdAfterRemount.set(CorrelationIdentifier.getSpanId());
});

try {
thread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}.run();

assertEquals(
parentTraceId.get(),
traceIdBeforeRemount.get(),
"trace id should be visible before the virtual thread remounts");
assertEquals(
parentSpanId.get(),
spanIdBeforeRemount.get(),
"span id should be visible before the virtual thread remounts");
assertEquals(
parentTraceId.get(),
traceIdAfterRemount.get(),
"trace id should survive a virtual thread remount");
assertEquals(
parentSpanId.get(),
spanIdAfterRemount.get(),
"span id should survive a virtual thread remount");

assertTraces(trace(span().root().operationName("parent")));
}

/** Verifies the parent / child span relation. */
void assertConnectedTrace() {
assertTraces(
Expand Down
Loading