Skip to content

Commit 6b65726

Browse files
EQL: enable CPS (#137833)
Event queries only for now, since PIT is not supported yet
1 parent de424e6 commit 6b65726

File tree

13 files changed

+171
-17
lines changed

13 files changed

+171
-17
lines changed

docs/changelog/137833.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 137833
2+
summary: Enable CPS
3+
area: EQL
4+
type: enhancement
5+
issues: []

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.ActionRequestValidationException;
1111
import org.elasticsearch.action.IndicesRequest;
1212
import org.elasticsearch.action.LegacyActionRequest;
13+
import org.elasticsearch.action.ResolvedIndexExpressions;
1314
import org.elasticsearch.action.support.IndicesOptions;
1415
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.common.io.stream.StreamInput;
@@ -65,6 +66,8 @@ public class EqlSearchRequest extends LegacyActionRequest implements IndicesRequ
6566
private int maxSamplesPerKey = RequestDefaults.MAX_SAMPLES_PER_KEY;
6667
private Boolean allowPartialSearchResults;
6768
private Boolean allowPartialSequenceResults;
69+
private String projectRouting;
70+
private ResolvedIndexExpressions resolvedIndexExpressions;
6871

6972
// Async settings
7073
private TimeValue waitForCompletionTimeout = null;
@@ -142,6 +145,16 @@ public EqlSearchRequest(StreamInput in) throws IOException {
142145
}
143146
}
144147

148+
@Override
149+
public String getProjectRouting() {
150+
return projectRouting;
151+
}
152+
153+
public EqlSearchRequest projectRouting(String projectRouting) {
154+
this.projectRouting = projectRouting;
155+
return this;
156+
}
157+
145158
@Override
146159
public ActionRequestValidationException validate() {
147160
ActionRequestValidationException validationException = null;
@@ -297,6 +310,21 @@ public EqlSearchRequest indices(String... indices) {
297310
return this;
298311
}
299312

313+
@Override
314+
public boolean allowsCrossProject() {
315+
return true;
316+
}
317+
318+
@Override
319+
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) {
320+
this.resolvedIndexExpressions = expressions;
321+
}
322+
323+
@Override
324+
public ResolvedIndexExpressions getResolvedIndexExpressions() {
325+
return resolvedIndexExpressions;
326+
}
327+
300328
public QueryBuilder filter() {
301329
return this.filter;
302330
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.index.query.IdsQueryBuilder;
1919
import org.elasticsearch.search.SearchHit;
2020
import org.elasticsearch.search.builder.SearchSourceBuilder;
21+
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
2122
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
2223
import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
2324
import org.elasticsearch.tasks.TaskCancelledException;
@@ -61,7 +62,6 @@ public void query(QueryRequest request, ActionListener<SearchResponse> listener)
6162
SearchSourceBuilder searchSource = request.searchSource();
6263
// set query timeout
6364
searchSource.timeout(cfg.requestTimeout());
64-
6565
SearchRequest search = prepareRequest(searchSource, false, allowPartialSearchResults, indices);
6666
search(search, allowPartialSearchResults, searchLogListener(listener, log, allowPartialSearchResults));
6767
}
@@ -76,6 +76,9 @@ protected void search(SearchRequest search, boolean allowPartialSearchResults, A
7676
log.trace("About to execute query {} on {}", StringUtils.toString(search.source()), indices);
7777
}
7878

79+
if (cfg.crossProjectEnabled()) {
80+
search.indicesOptions(CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout(search.indicesOptions()));
81+
}
7982
client.search(search, listener);
8083
}
8184

@@ -92,7 +95,9 @@ protected void search(MultiSearchRequest search, boolean allowPartialSearchResul
9295
}
9396
log.trace("About to execute multi-queries {} on {}", sj, indices);
9497
}
95-
98+
if (cfg.crossProjectEnabled()) {
99+
search.indicesOptions(CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout(search.indicesOptions()));
100+
}
96101
client.multiSearch(search, multiSearchLogListener(listener, allowPartialSearchResults, log));
97102
}
98103

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.search.SearchResponse;
1818
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
1919
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
20+
import org.elasticsearch.action.support.IndicesOptions;
2021
import org.elasticsearch.common.Strings;
2122
import org.elasticsearch.common.bytes.BytesReference;
2223
import org.elasticsearch.common.util.CollectionUtils;
@@ -25,6 +26,7 @@
2526
import org.elasticsearch.index.query.QueryBuilder;
2627
import org.elasticsearch.search.builder.PointInTimeBuilder;
2728
import org.elasticsearch.search.builder.SearchSourceBuilder;
29+
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
2830
import org.elasticsearch.xpack.eql.session.EqlSession;
2931
import org.elasticsearch.xpack.ql.index.IndexResolver;
3032

@@ -136,10 +138,15 @@ private <Response> ActionListener<Response> pitListener(
136138
}
137139

138140
private <Response> void openPIT(ActionListener<Response> listener, Runnable runnable, boolean allowPartialSearchResults) {
139-
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).indicesOptions(IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
141+
IndicesOptions indicesOptions = IndexResolver.FIELD_CAPS_INDICES_OPTIONS;
142+
if (cfg.crossProjectEnabled()) {
143+
indicesOptions = CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout(indicesOptions);
144+
}
145+
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).indicesOptions(indicesOptions)
140146
.keepAlive(keepAlive)
141147
.allowPartialSearchResults(allowPartialSearchResults);
142148
request.indexFilter(filter);
149+
143150
client.execute(TransportOpenPointInTimeAction.TYPE, request, listener.delegateFailureAndWrap((l, r) -> {
144151
pitId = r.getPointInTimeId();
145152
runnable.run();

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,21 @@ public List<Route> routes() {
5353

5454
@Override
5555
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
56-
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
57-
// accept but drop project_routing param until fully supported
58-
request.param("project_routing");
59-
}
60-
56+
final boolean crossProjectEnabled = settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false);
6157
EqlSearchRequest eqlRequest;
6258
String indices;
6359
try (XContentParser parser = request.contentOrSourceParamParser()) {
6460
eqlRequest = EqlSearchRequest.fromXContent(parser);
6561
indices = request.param("index");
6662
eqlRequest.indices(Strings.splitStringByCommaToArray(indices));
67-
eqlRequest.indicesOptions(IndicesOptions.fromRequest(request, eqlRequest.indicesOptions()));
63+
IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, eqlRequest.indicesOptions());
64+
if (crossProjectEnabled) {
65+
indicesOptions = IndicesOptions.builder(indicesOptions)
66+
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
67+
.build();
68+
eqlRequest.projectRouting(request.param("project_routing"));
69+
}
70+
eqlRequest.indicesOptions(indicesOptions);
6871
if (request.hasParam("wait_for_completion_timeout")) {
6972
eqlRequest.waitForCompletionTimeout(
7073
request.paramAsTime("wait_for_completion_timeout", eqlRequest.waitForCompletionTimeout())

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public static void operation(
193193
request.indicesOptions()
194194
);
195195
Set<String> clusterAliases = remoteClusterRegistry.clusterAliases(request.indices(), false);
196-
if (canMinimizeRountrips(request, clusterAliases)) {
196+
if (canMinimizeRountrips(request, clusterAliases, transportService.getRemoteClusterService().crossProjectEnabled())) {
197197
String clusterAlias = clusterAliases.iterator().next();
198198
String[] remoteIndices = new String[request.indices().length];
199199
for (int i = 0; i < request.indices().length; i++) {
@@ -239,9 +239,12 @@ public static void operation(
239239
request.allowPartialSequenceResults() == null
240240
? defaultAllowPartialSequenceResults(clusterService)
241241
: request.allowPartialSequenceResults(),
242+
request.getProjectRouting(),
242243
clientId,
243244
new TaskId(nodeId, task.getId()),
244-
task
245+
task,
246+
transportService.getRemoteClusterService().crossProjectEnabled(),
247+
request.getResolvedIndexExpressions()
245248
);
246249
planExecutor.eql(
247250
cfg,
@@ -288,7 +291,10 @@ private static boolean requestIsAsync(EqlSearchRequest request) {
288291
}
289292

290293
// can the request be proxied to the remote cluster?
291-
private static boolean canMinimizeRountrips(EqlSearchRequest request, Set<String> clusterAliases) {
294+
private static boolean canMinimizeRountrips(EqlSearchRequest request, Set<String> clusterAliases, boolean crossProjectEnabled) {
295+
if (crossProjectEnabled) {
296+
return false;
297+
}
292298
// Has minimizing the round trips been (explicitly) disabled?
293299
if (request.ccsMinimizeRoundtrips() == false) {
294300
return false;

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.eql.session;
99

10+
import org.elasticsearch.action.ResolvedIndexExpressions;
1011
import org.elasticsearch.action.support.IndicesOptions;
1112
import org.elasticsearch.common.Strings;
1213
import org.elasticsearch.core.Nullable;
@@ -32,6 +33,9 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu
3233
private final int maxSamplesPerKey;
3334
private final boolean allowPartialSearchResults;
3435
private final boolean allowPartialSequenceResults;
36+
private final String projectRouting;
37+
private final boolean crossProjectEnabled;
38+
private final ResolvedIndexExpressions resolvedIndexExpressions;
3539

3640
@Nullable
3741
private final QueryBuilder filter;
@@ -54,9 +58,54 @@ public EqlConfiguration(
5458
int maxSamplesPerKey,
5559
boolean allowPartialSearchResults,
5660
boolean allowPartialSequenceResults,
61+
String projectRouting,
5762
String clientId,
5863
TaskId taskId,
5964
EqlSearchTask task
65+
) {
66+
this(
67+
indices,
68+
zi,
69+
username,
70+
clusterName,
71+
filter,
72+
runtimeMappings,
73+
fetchFields,
74+
requestTimeout,
75+
indicesOptions,
76+
fetchSize,
77+
maxSamplesPerKey,
78+
allowPartialSearchResults,
79+
allowPartialSequenceResults,
80+
projectRouting,
81+
clientId,
82+
taskId,
83+
task,
84+
false,
85+
null
86+
);
87+
}
88+
89+
public EqlConfiguration(
90+
String[] indices,
91+
ZoneId zi,
92+
String username,
93+
String clusterName,
94+
QueryBuilder filter,
95+
Map<String, Object> runtimeMappings,
96+
List<FieldAndFormat> fetchFields,
97+
TimeValue requestTimeout,
98+
IndicesOptions indicesOptions,
99+
int fetchSize,
100+
int maxSamplesPerKey,
101+
boolean allowPartialSearchResults,
102+
boolean allowPartialSequenceResults,
103+
String projectRouting,
104+
String clientId,
105+
TaskId taskId,
106+
EqlSearchTask task,
107+
boolean crossProjectEnabled,
108+
ResolvedIndexExpressions resolvedIndexExpressions
60109
) {
61110
super(zi, username, clusterName);
62111

@@ -73,6 +122,21 @@ public EqlConfiguration(
73122
this.maxSamplesPerKey = maxSamplesPerKey;
74123
this.allowPartialSearchResults = allowPartialSearchResults;
75124
this.allowPartialSequenceResults = allowPartialSequenceResults;
125+
this.projectRouting = projectRouting;
126+
this.crossProjectEnabled = crossProjectEnabled;
127+
this.resolvedIndexExpressions = resolvedIndexExpressions;
128+
}
129+
130+
public boolean crossProjectEnabled() {
131+
return crossProjectEnabled;
132+
}
133+
134+
public ResolvedIndexExpressions resolvedIndexExpressions() {
135+
return resolvedIndexExpressions;
136+
}
137+
138+
public String projectRouting() {
139+
return projectRouting;
76140
}
77141

78142
public String[] indices() {
@@ -130,4 +194,5 @@ public boolean isCancelled() {
130194
public TaskId getTaskId() {
131195
return taskId;
132196
}
197+
133198
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ private <T> void preAnalyze(LogicalPlan parsed, ActionListener<LogicalPlan> list
127127
fieldNames,
128128
configuration.indicesOptions(),
129129
configuration.runtimeMappings(),
130+
configuration.crossProjectEnabled(),
131+
configuration.projectRouting(),
132+
configuration.resolvedIndexExpressions(),
130133
map(listener, r -> preAnalyzer.preAnalyze(parsed, r))
131134
);
132135
}

x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ private EqlTestUtils() {}
5353
1,
5454
false,
5555
true,
56+
null,
5657
"",
5758
new TaskId("test", 123),
5859
null
@@ -73,6 +74,7 @@ public static EqlConfiguration randomConfiguration() {
7374
randomIntBetween(1, 1000),
7475
randomBoolean(),
7576
randomBoolean(),
77+
null,
7678
randomAlphaOfLength(16),
7779
new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()),
7880
randomTask()

x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClientTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public void testQueryFilterUsedInPitAndSearches() {
104104
1,
105105
randomBoolean(),
106106
randomBoolean(),
107+
null,
107108
"",
108109
new TaskId("test", 123),
109110
new EqlSearchTask(

0 commit comments

Comments
 (0)