Commit 5743cd5

Add SparkPlanInfo constructor compatible with Databricks' Spark fork (#9888)
* Use SparkPlanInfo constructor that is compatible with Databricks' Spark fork
* Centralize logic in a common AbstractSparkPlanUtils class
1 parent 19d774d commit 5743cd5
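
For context on the first bullet: the old advice searched `SparkPlanInfo.class.getConstructors()` for a five-parameter constructor, which matches stock Apache Spark (`nodeName`, `simpleString`, `children`, `metadata`, `metrics`) but presumably comes up empty on Databricks' fork, whose constructor takes three extra trailing arguments (`estRowCount`, `rddScopeId`, `explainId`, per the new helper classes below). A hypothetical probe, not part of this commit, for inspecting which constructor shapes a given Spark distribution ships:

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import org.apache.spark.sql.execution.SparkPlanInfo;

// Hypothetical diagnostic (not part of this commit): print every public
// SparkPlanInfo constructor on the current classpath, so the 5-arg (OSS)
// and extended (Databricks) shapes can be compared directly.
public class SparkPlanInfoCtorProbe {
  public static void main(String[] args) {
    for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
      System.out.println(
          c.getParameterCount() + " params: " + Arrays.toString(c.getParameterTypes()));
    }
  }
}
```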

6 files changed: +174 -58 lines changed


dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 10 additions & 23 deletions
@@ -7,7 +7,6 @@
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
 import de.thetaphi.forbiddenapis.SuppressForbidden;
-import java.lang.reflect.Constructor;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.execution.SparkPlan;
@@ -25,6 +24,7 @@ public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
       packageName + ".AbstractSparkPlanSerializer",
+      packageName + ".AbstractSparkPlanUtils",
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark212Listener",
@@ -35,7 +35,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
-      packageName + ".Spark212PlanSerializer"
+      packageName + ".Spark212PlanSerializer",
+      packageName + ".Spark212PlanUtils"
     };
   }
 
@@ -104,29 +105,15 @@ public static void exit(
     if (planInfo.metadata().size() == 0
         && (Config.get().isDataJobsParseSparkPlanEnabled()
             || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
-      Spark212PlanSerializer planUtils = new Spark212PlanSerializer();
+      Spark212PlanSerializer planSerializer = new Spark212PlanSerializer();
       Map<String, String> meta =
-          JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))
+          JavaConverters.mapAsScalaMap(planSerializer.extractFormattedProduct(plan))
               .toMap(Predef.$conforms());
-      try {
-        Constructor<?> targetCtor = null;
-        for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
-          if (c.getParameterCount() == 5) {
-            targetCtor = c;
-            break;
-          }
-        }
-        if (targetCtor != null) {
-          Object newInst =
-              targetCtor.newInstance(
-                  planInfo.nodeName(),
-                  planInfo.simpleString(),
-                  planInfo.children(),
-                  meta,
-                  planInfo.metrics());
-          planInfo = (SparkPlanInfo) newInst;
-        }
-      } catch (Throwable ignored) {
+
+      SparkPlanInfo newPlanInfo =
+          new Spark212PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta);
+      if (newPlanInfo != null) {
+        planInfo = newPlanInfo;
       }
     }
   }
dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java (new file)

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+package datadog.trace.instrumentation.spark;
+
+import java.lang.invoke.MethodHandle;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import scala.Option;
+import scala.collection.immutable.Map;
+
+public class Spark212PlanUtils extends AbstractSparkPlanUtils {
+  private static final MethodHandle constructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          scala.collection.Seq.class,
+          scala.collection.immutable.Map.class,
+          scala.collection.Seq.class);
+  private static final MethodHandle databricksConstructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          scala.collection.Seq.class,
+          scala.collection.immutable.Map.class,
+          scala.collection.Seq.class,
+          Option.class,
+          String.class,
+          Option.class);
+
+  @Override
+  protected MethodHandle getConstructor() {
+    return constructor;
+  }
+
+  @Override
+  protected MethodHandle getDatabricksConstructor() {
+    return databricksConstructor;
+  }
+
+  @Override
+  protected Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta) {
+    return new Object[] {
+      planInfo.nodeName(), planInfo.simpleString(), planInfo.children(), meta, planInfo.metrics()
+    };
+  }
+}
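
The `methodLoader.constructor(...)` calls resolve each candidate constructor once and cache it in a static field, keeping reflection off the per-event hot path. A minimal sketch of what that lookup involves with plain JDK APIs, assuming (as the call sites suggest) that `datadog.trace.util.MethodHandles` returns null instead of throwing when a signature is absent:

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

// Sketch only: what caching a constructor handle involves with plain JDK APIs.
// datadog.trace.util.MethodHandles presumably wraps a lookup like this and
// returns null, rather than throwing, when the signature is absent.
final class ConstructorLookupSketch {
  static MethodHandle constructorOrNull(Class<?> owner, Class<?>... paramTypes) {
    try {
      return MethodHandles.publicLookup()
          .findConstructor(owner, MethodType.methodType(void.class, paramTypes));
    } catch (ReflectiveOperationException e) {
      return null; // this Spark distribution does not declare that shape
    }
  }
}
```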

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 10 additions & 23 deletions
@@ -7,7 +7,6 @@
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
 import de.thetaphi.forbiddenapis.SuppressForbidden;
-import java.lang.reflect.Constructor;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.execution.SparkPlan;
@@ -25,6 +24,7 @@ public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
       packageName + ".AbstractSparkPlanSerializer",
+      packageName + ".AbstractSparkPlanUtils",
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark213Listener",
@@ -35,7 +35,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
-      packageName + ".Spark213PlanSerializer"
+      packageName + ".Spark213PlanSerializer",
+      packageName + ".Spark213PlanUtils"
     };
   }
 
@@ -105,28 +106,14 @@ public static void exit(
     if (planInfo.metadata().size() == 0
         && (Config.get().isDataJobsParseSparkPlanEnabled()
             || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
-      Spark213PlanSerializer planUtils = new Spark213PlanSerializer();
+      Spark213PlanSerializer planSerializer = new Spark213PlanSerializer();
       Map<String, String> meta =
-          HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan)));
-      try {
-        Constructor<?> targetCtor = null;
-        for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
-          if (c.getParameterCount() == 5) {
-            targetCtor = c;
-            break;
-          }
-        }
-        if (targetCtor != null) {
-          Object newInst =
-              targetCtor.newInstance(
-                  planInfo.nodeName(),
-                  planInfo.simpleString(),
-                  planInfo.children(),
-                  meta,
-                  planInfo.metrics());
-          planInfo = (SparkPlanInfo) newInst;
-        }
-      } catch (Throwable ignored) {
+          HashMap.from(JavaConverters.asScala(planSerializer.extractFormattedProduct(plan)));
+
+      SparkPlanInfo newPlanInfo =
+          new Spark213PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta);
+      if (newPlanInfo != null) {
+        planInfo = newPlanInfo;
       }
     }
   }
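
Aside from the 2.13-specific class names, the substantive difference from the 2.12 advice is the Java-to-Scala map conversion, which changed shape with the Scala 2.13 collections overhaul. A side-by-side illustration (hypothetical holder class; each method compiles only against its own Scala binary, and `javaMeta` stands in for the result of `extractFormattedProduct(plan)`):

```java
import scala.Predef;
import scala.collection.JavaConverters;

// Illustrative only: the two conversion idioms used by the two advice classes.
class MetaConversionExamples {
  // Scala 2.12: wrap the Java map, then materialize an immutable Map; the
  // implicit A <:< (K, V) evidence must be passed explicitly from Java.
  static scala.collection.immutable.Map<String, String> meta212(
      java.util.Map<String, String> javaMeta) {
    return JavaConverters.mapAsScalaMap(javaMeta).toMap(Predef.$conforms());
  }

  // Scala 2.13: build the immutable map directly from the converted entries.
  static scala.collection.immutable.Map<String, String> meta213(
      java.util.Map<String, String> javaMeta) {
    return scala.collection.immutable.HashMap.from(JavaConverters.asScala(javaMeta));
  }
}
```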
dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java (new file)

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+package datadog.trace.instrumentation.spark;
+
+import java.lang.invoke.MethodHandle;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import scala.Option;
+import scala.collection.immutable.Map;
+
+public class Spark213PlanUtils extends AbstractSparkPlanUtils {
+  private static final MethodHandle constructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          scala.collection.immutable.Seq.class,
+          scala.collection.immutable.Map.class,
+          scala.collection.immutable.Seq.class);
+  private static final MethodHandle databricksConstructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          scala.collection.immutable.Seq.class,
+          scala.collection.immutable.Map.class,
+          scala.collection.immutable.Seq.class,
+          Option.class,
+          String.class,
+          Option.class);
+
+  @Override
+  protected MethodHandle getConstructor() {
+    return constructor;
+  }
+
+  @Override
+  protected MethodHandle getDatabricksConstructor() {
+    return databricksConstructor;
+  }
+
+  @Override
+  protected Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta) {
+    return new Object[] {
+      planInfo.nodeName(), planInfo.simpleString(), planInfo.children(), meta, planInfo.metrics()
+    };
+  }
+}
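
This class is otherwise identical to `Spark212PlanUtils`; only the `Seq` types differ, because `SparkPlanInfo`'s `children` and `metrics` erase to `scala.collection.Seq` under Scala 2.12 but `scala.collection.immutable.Seq` under 2.13, and method-handle lookups match the exact erased signature. The two shapes side by side, in a hypothetical holder class for illustration:

```java
import java.lang.invoke.MethodType;

// Hypothetical: the two erased constructor signatures. Probing a build with
// the wrong shape finds nothing, hence the version-specific subclasses.
class SeqShapes {
  static final MethodType SHAPE_212 = MethodType.methodType(
      void.class, String.class, String.class,
      scala.collection.Seq.class,
      scala.collection.immutable.Map.class,
      scala.collection.Seq.class);

  static final MethodType SHAPE_213 = MethodType.methodType(
      void.class, String.class, String.class,
      scala.collection.immutable.Seq.class,
      scala.collection.immutable.Map.class,
      scala.collection.immutable.Seq.class);
}
```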

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java

Lines changed: 7 additions & 12 deletions
@@ -60,7 +60,7 @@ public abstract class AbstractSparkPlanSerializer {
 
   private final MethodHandles methodLoader = new MethodHandles(ClassLoader.getSystemClassLoader());
   private final MethodHandle getSimpleString =
-      methodLoader.method(TreeNode.class, "simpleString", new Class[] {int.class});
+      methodLoader.method(TreeNode.class, "simpleString", int.class);
   private final MethodHandle getSimpleStringLegacy =
       methodLoader.method(TreeNode.class, "simpleString");
 
@@ -156,25 +156,20 @@ protected Object safeParseObjectToJson(Object value, int depth) {
   }
 
   private String getSimpleString(TreeNode value) {
-    Object simpleString = null;
-
     if (getSimpleString != null) {
-      try {
-        simpleString = getSimpleString.invoke(value, MAX_LENGTH);
-      } catch (Throwable e) {
+      String simpleString = methodLoader.invoke(getSimpleString, value, MAX_LENGTH);
+      if (simpleString != null) {
+        return simpleString;
       }
     }
 
     if (getSimpleStringLegacy != null) {
-      try {
-        simpleString = getSimpleStringLegacy.invoke(value);
-      } catch (Throwable e) {
+      String simpleString = methodLoader.invoke(getSimpleStringLegacy, value);
+      if (simpleString != null) {
+        return simpleString;
      }
    }
 
-    if (simpleString != null && simpleString instanceof String) {
-      return (String) simpleString;
-    }
     return null;
   }
 
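This refactor leans on the `invoke` helper of `datadog.trace.util.MethodHandles` to centralize the try/catch that each call site previously carried. The helper's body isn't part of this diff; judging from how it is called, it presumably behaves roughly like the sketch below (an assumption, not the actual implementation):

```java
import java.lang.invoke.MethodHandle;

// Assumed shape of the helper, inferred from its call sites in this diff:
// invoke the handle, swallow any failure, and return null so the caller can
// fall through to the legacy handle (or give up and return null itself).
final class InvokeSketch {
  @SuppressWarnings("unchecked")
  static <T> T invokeOrNull(MethodHandle handle, Object... args) {
    try {
      return (T) handle.invokeWithArguments(args);
    } catch (Throwable ignored) {
      return null;
    }
  }
}
```

With that contract, `getSimpleString` can simply return the first non-null result instead of threading an `Object` through two try/catch blocks.
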
dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java (new file)

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+package datadog.trace.instrumentation.spark;
+
+import datadog.trace.util.MethodHandles;
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.immutable.Map;
+
+abstract class AbstractSparkPlanUtils {
+  private static final Logger log = LoggerFactory.getLogger(AbstractSparkPlanUtils.class);
+
+  protected static final MethodHandles methodLoader =
+      new MethodHandles(ClassLoader.getSystemClassLoader());
+
+  protected abstract MethodHandle getConstructor();
+
+  protected abstract MethodHandle getDatabricksConstructor();
+
+  // Deals with Seq which changed from Scala 2.12 to 2.13, so delegate to version-specific classes
+  protected abstract Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta);
+
+  // Attempt to create a new SparkPlanInfo with additional metadata replaced
+  // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this
+  public SparkPlanInfo upsertSparkPlanInfoMetadata(
+      SparkPlanInfo planInfo, scala.collection.immutable.Map<String, String> meta) {
+    if (getDatabricksConstructor() != null) {
+      List<Object> databricksArgs = new ArrayList<>(Arrays.asList(getStandardArgs(planInfo, meta)));
+      try {
+        databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo));
+        databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo));
+        databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo));
+      } catch (Throwable t) {
+        log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t);
+      }
+
+      SparkPlanInfo newPlan =
+          methodLoader.invoke(getDatabricksConstructor(), databricksArgs.toArray());
+      if (newPlan != null) {
+        return newPlan;
+      }
+    }
+
+    if (getConstructor() != null) {
+      SparkPlanInfo newPlan =
+          methodLoader.invoke(getConstructor(), getStandardArgs(planInfo, meta));
+      if (newPlan != null) {
+        return newPlan;
+      }
+    }
+
+    return null;
+  }
+}
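
Taken together, callers see a single method that tries the Databricks shape first (that handle is only non-null where the fork's extended constructor exists), then the standard five-argument shape, and otherwise signals failure with null. A usage sketch built from the names in this diff, shown for the 2.12 flavor (2.13 is symmetric):

```java
import org.apache.spark.sql.execution.SparkPlanInfo;
import scala.collection.immutable.Map;

// Sketch of the call-site pattern from the advice classes above: rebuild the
// plan info with the parsed metadata when a known constructor matches,
// otherwise keep the original instance unchanged.
final class UpsertExample {
  static SparkPlanInfo withMetadata(SparkPlanInfo planInfo, Map<String, String> meta) {
    SparkPlanInfo rebuilt = new Spark212PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta);
    return rebuilt != null ? rebuilt : planInfo;
  }
}
```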
