Skip to content

Commit 5d0e4ce

Browse files
Initial commit
1 parent 9b91826 commit 5d0e4ce

File tree

63 files changed

+7995
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+7995
-1
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import datadog.trace.api.config.CrashTrackingConfig;
3232
import datadog.trace.api.config.CwsConfig;
3333
import datadog.trace.api.config.DebuggerConfig;
34+
import datadog.trace.api.config.FeatureFlagConfig;
3435
import datadog.trace.api.config.GeneralConfig;
3536
import datadog.trace.api.config.IastConfig;
3637
import datadog.trace.api.config.JmxFetchConfig;
@@ -124,7 +125,8 @@ private enum AgentFeature {
124125
DATA_JOBS(GeneralConfig.DATA_JOBS_ENABLED, false),
125126
AGENTLESS_LOG_SUBMISSION(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED, false),
126127
LLMOBS(LlmObsConfig.LLMOBS_ENABLED, false),
127-
LLMOBS_AGENTLESS(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED, false);
128+
LLMOBS_AGENTLESS(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED, false),
129+
FEATURE_FLAG(FeatureFlagConfig.FLAGGING_PROVIDER_ENABLED, false);
128130

129131
private final String configKey;
130132
private final String systemProp;
@@ -183,6 +185,7 @@ public boolean isEnabledByDefault() {
183185
private static boolean codeOriginEnabled = false;
184186
private static boolean distributedDebuggerEnabled = false;
185187
private static boolean agentlessLogSubmissionEnabled = false;
188+
private static boolean featureFlagEnabled = false;
186189

187190
private static void safelySetContextClassLoader(ClassLoader classLoader) {
188191
try {
@@ -258,6 +261,7 @@ public static void start(
258261
codeOriginEnabled = isFeatureEnabled(AgentFeature.CODE_ORIGIN);
259262
agentlessLogSubmissionEnabled = isFeatureEnabled(AgentFeature.AGENTLESS_LOG_SUBMISSION);
260263
llmObsEnabled = isFeatureEnabled(AgentFeature.LLMOBS);
264+
featureFlagEnabled = isFeatureEnabled(AgentFeature.FEATURE_FLAG);
261265

262266
// setup writers when llmobs is enabled to accomodate apm and llmobs
263267
if (llmObsEnabled) {
@@ -652,6 +656,7 @@ public void execute() {
652656
maybeStartDebugger(instrumentation, scoClass, sco);
653657
maybeStartRemoteConfig(scoClass, sco);
654658
maybeStartAiGuard();
659+
maybeStartFeatureFlag(instrumentation, scoClass, sco);
655660

656661
if (telemetryEnabled) {
657662
startTelemetry(instrumentation, scoClass, sco);
@@ -1073,6 +1078,24 @@ private static void maybeStartLLMObs(Instrumentation inst, Class<?> scoClass, Ob
10731078
}
10741079
}
10751080

1081+
private static void maybeStartFeatureFlag(
1082+
final Instrumentation inst, final Class<?> scoClass, final Object sco) {
1083+
if (featureFlagEnabled) {
1084+
StaticEventLogger.begin("Feature Flag");
1085+
1086+
try {
1087+
final Class<?> ffSysClass =
1088+
AGENT_CLASSLOADER.loadClass("com.datadog.featureflag.FeatureFlagSystem");
1089+
final Method ffSysMethod = ffSysClass.getMethod("start", Instrumentation.class, scoClass);
1090+
ffSysMethod.invoke(null, inst, sco);
1091+
} catch (final Throwable e) {
1092+
log.warn("Not starting Feature Flag subsystem", e);
1093+
}
1094+
1095+
StaticEventLogger.end("Feature Flag");
1096+
}
1097+
}
1098+
10761099
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
10771100
if (agentlessLogSubmissionEnabled) {
10781101
StaticEventLogger.begin("Logs Intake");
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
2+
3+
plugins {
4+
id 'com.gradleup.shadow'
5+
}
6+
7+
apply from: "$rootDir/gradle/java.gradle"
8+
apply from: "$rootDir/gradle/version.gradle"
9+
10+
ext {
11+
minJavaVersionForTests = JavaVersion.VERSION_11
12+
}
13+
14+
java {
15+
sourceCompatibility = JavaVersion.VERSION_1_8
16+
targetCompatibility = JavaVersion.VERSION_1_8
17+
}
18+
19+
dependencies {
20+
api libs.slf4j
21+
implementation libs.moshi
22+
implementation libs.jctools
23+
24+
api project(':dd-trace-api')
25+
implementation project(':internal-api')
26+
implementation project(':communication')
27+
28+
testImplementation project(':utils:test-utils')
29+
testImplementation project(':dd-trace-api:openfeature')
30+
testImplementation(libs.openfeature.sdk)
31+
}
32+
33+
tasks.named("shadowJar", ShadowJar) {
34+
dependencies deps.excludeShared
35+
}
36+
37+
tasks.named("jar", Jar) {
38+
archiveClassifier = 'unbundled'
39+
}
40+
41+
[GroovyCompile].each {
42+
tasks.withType(it).configureEach {
43+
configureCompiler(it, 11, JavaVersion.VERSION_11)
44+
}
45+
}
46+
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.datadog.featureflag;
2+
3+
import datadog.trace.api.featureflag.FeatureFlagEvaluator;
4+
5+
public class ExposureEvaluatorAdapter implements FeatureFlagEvaluator {
6+
7+
private final ExposureWriter writer;
8+
private final FeatureFlagEvaluator delegate;
9+
10+
public ExposureEvaluatorAdapter(
11+
final ExposureWriter writer, final FeatureFlagEvaluator delegate) {
12+
this.writer = writer;
13+
this.delegate = delegate;
14+
}
15+
16+
@Override
17+
public void addListener(final Listener listener) {
18+
delegate.addListener(listener);
19+
}
20+
21+
@Override
22+
public Resolution<Boolean> evaluate(
23+
final String key, final Boolean defaultValue, final Context context) {
24+
return handleResolution(key, context, delegate.evaluate(key, defaultValue, context));
25+
}
26+
27+
@Override
28+
public Resolution<Integer> evaluate(
29+
final String key, final Integer defaultValue, final Context context) {
30+
return handleResolution(key, context, delegate.evaluate(key, defaultValue, context));
31+
}
32+
33+
@Override
34+
public Resolution<Double> evaluate(
35+
final String key, final Double defaultValue, final Context context) {
36+
return handleResolution(key, context, delegate.evaluate(key, defaultValue, context));
37+
}
38+
39+
@Override
40+
public Resolution<String> evaluate(
41+
final String key, final String defaultValue, final Context context) {
42+
return handleResolution(key, context, delegate.evaluate(key, defaultValue, context));
43+
}
44+
45+
@Override
46+
public Resolution<Object> evaluate(
47+
final String key, final Object defaultValue, final Context context) {
48+
return handleResolution(key, context, delegate.evaluate(key, defaultValue, context));
49+
}
50+
51+
private <E> Resolution<E> handleResolution(
52+
final String key, final Context context, final Resolution<E> resolution) {
53+
writer.write(key, context, resolution);
54+
return resolution;
55+
}
56+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.datadog.featureflag;
2+
3+
import datadog.trace.api.featureflag.FeatureFlagEvaluator.Context;
4+
import datadog.trace.api.featureflag.FeatureFlagEvaluator.Resolution;
5+
6+
public interface ExposureWriter extends AutoCloseable {
7+
8+
void init();
9+
10+
void close();
11+
12+
void write(String flag, Context context, Resolution<?> resolution);
13+
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package com.datadog.featureflag;
2+
3+
import static datadog.trace.util.AgentThreadFactory.AgentThread.LLMOBS_EVALS_PROCESSOR;
4+
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
5+
6+
import com.datadog.featureflag.exposure.ExposureEvent;
7+
import com.datadog.featureflag.exposure.ExposuresRequest;
8+
import com.squareup.moshi.JsonAdapter;
9+
import com.squareup.moshi.Moshi;
10+
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
11+
import datadog.communication.ddagent.SharedCommunicationObjects;
12+
import datadog.communication.http.HttpRetryPolicy;
13+
import datadog.communication.http.OkHttpUtils;
14+
import datadog.trace.api.Config;
15+
import datadog.trace.api.featureflag.FeatureFlagEvaluator.Context;
16+
import datadog.trace.api.featureflag.FeatureFlagEvaluator.Resolution;
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.concurrent.TimeUnit;
22+
import okhttp3.Headers;
23+
import okhttp3.HttpUrl;
24+
import okhttp3.OkHttpClient;
25+
import okhttp3.Request;
26+
import okhttp3.RequestBody;
27+
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
public class ExposureWriterImpl implements ExposureWriter {
32+
33+
private static final String EXPOSURES_API_PATH = "/api/v2/exposures";
34+
private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain";
35+
private static final String EVP_SUBDOMAIN_HEADER_VALUE = "event-platform-intake";
36+
37+
private static final Logger log = LoggerFactory.getLogger(ExposureWriterImpl.class);
38+
39+
private final MpscBlockingConsumerArrayQueue<ExposureEvent> queue;
40+
private final Thread serializerThread;
41+
42+
public ExposureWriterImpl(
43+
final int capacity,
44+
final long flushInterval,
45+
final TimeUnit timeUnit,
46+
final SharedCommunicationObjects sco,
47+
Config config) {
48+
this.queue = new MpscBlockingConsumerArrayQueue<>(capacity);
49+
final Headers headers = Headers.of(EVP_SUBDOMAIN_HEADER_NAME, EVP_SUBDOMAIN_HEADER_VALUE);
50+
final HttpUrl url =
51+
HttpUrl.get(
52+
sco.agentUrl.toString()
53+
+ DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT
54+
+ EXPOSURES_API_PATH);
55+
final Map<String, String> context = new HashMap<>();
56+
context.put("service", config.getServiceName() == null ? "unknown" : config.getServiceName());
57+
if (config.getEnv() != null) {
58+
context.put("env", config.getEnv());
59+
}
60+
if (config.getVersion() != null) {
61+
context.put("version", config.getVersion());
62+
}
63+
final ExposureSerializingHandler serializer =
64+
new ExposureSerializingHandler(queue, flushInterval, timeUnit, url, headers, context);
65+
this.serializerThread = newAgentThread(LLMOBS_EVALS_PROCESSOR, serializer);
66+
}
67+
68+
@Override
69+
public void init() {
70+
this.serializerThread.start();
71+
}
72+
73+
@Override
74+
public void close() {}
75+
76+
@Override
77+
public void write(final String flag, final Context context, final Resolution<?> resolution) {
78+
final long timestamp = System.currentTimeMillis();
79+
}
80+
81+
private static class ExposureSerializingHandler implements Runnable {
82+
private static final int FLUSH_THRESHOLD = 50;
83+
84+
private final MpscBlockingConsumerArrayQueue<ExposureEvent> queue;
85+
private final long ticksRequiredToFlush;
86+
private long lastTicks;
87+
88+
private final JsonAdapter<ExposuresRequest> jsonAdapter;
89+
private final OkHttpClient httpClient;
90+
private final HttpUrl submissionUrl;
91+
private final Headers headers;
92+
93+
private final Map<String, String> context;
94+
95+
private final List<ExposureEvent> buffer = new ArrayList<>();
96+
97+
public ExposureSerializingHandler(
98+
final MpscBlockingConsumerArrayQueue<ExposureEvent> queue,
99+
final long flushInterval,
100+
final TimeUnit timeUnit,
101+
final HttpUrl submissionUrl,
102+
final Headers headers,
103+
final Map<String, String> context) {
104+
this.queue = queue;
105+
this.jsonAdapter = new Moshi.Builder().build().adapter(ExposuresRequest.class);
106+
this.httpClient = new OkHttpClient();
107+
this.submissionUrl = submissionUrl;
108+
this.headers = headers;
109+
this.context = context;
110+
111+
this.lastTicks = System.nanoTime();
112+
this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);
113+
114+
log.debug("starting exposure serializer, url={}", submissionUrl);
115+
}
116+
117+
@Override
118+
public void run() {
119+
try {
120+
runDutyCycle();
121+
} catch (InterruptedException e) {
122+
Thread.currentThread().interrupt();
123+
}
124+
log.debug(
125+
"exposure processor worker exited. submitting explosures stopped. unsubmitted evals left: "
126+
+ !queuesAreEmpty());
127+
}
128+
129+
private void runDutyCycle() throws InterruptedException {
130+
final Thread thread = Thread.currentThread();
131+
while (!thread.isInterrupted()) {
132+
final ExposureEvent event = queue.poll(100, TimeUnit.MILLISECONDS);
133+
if (event != null) {
134+
buffer.add(event);
135+
consumeBatch();
136+
}
137+
flushIfNecessary();
138+
}
139+
}
140+
141+
private void consumeBatch() {
142+
queue.drain(buffer::add, queue.size());
143+
}
144+
145+
protected void flushIfNecessary() {
146+
if (buffer.isEmpty()) {
147+
return;
148+
}
149+
if (shouldFlush()) {
150+
final ExposuresRequest exposures = new ExposuresRequest(this.context, this.buffer);
151+
final HttpRetryPolicy.Factory retryPolicyFactory =
152+
new HttpRetryPolicy.Factory(5, 100, 2.0, true);
153+
final String reqBod = jsonAdapter.toJson(exposures);
154+
final RequestBody requestBody =
155+
RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod);
156+
final Request request =
157+
new Request.Builder().headers(headers).url(submissionUrl).post(requestBody).build();
158+
try (okhttp3.Response response =
159+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
160+
161+
if (response.isSuccessful()) {
162+
log.debug("successfully flushed exposures request with {} evals", this.buffer.size());
163+
this.buffer.clear();
164+
} else {
165+
log.error(
166+
"Could not submit exposures (HTTP code {}) {}",
167+
response.code(),
168+
response.body() != null ? response.body().string() : "");
169+
}
170+
} catch (Exception e) {
171+
log.error("Could not submit exposures", e);
172+
}
173+
}
174+
}
175+
176+
private boolean shouldFlush() {
177+
long nanoTime = System.nanoTime();
178+
long ticks = nanoTime - lastTicks;
179+
if (ticks > ticksRequiredToFlush || queue.size() >= FLUSH_THRESHOLD) {
180+
lastTicks = nanoTime;
181+
return true;
182+
}
183+
return false;
184+
}
185+
186+
protected boolean queuesAreEmpty() {
187+
return queue.isEmpty();
188+
}
189+
}
190+
}

0 commit comments

Comments
 (0)