Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.license.LicenseSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.xpack.core.XPackSettings;
Expand All @@ -45,7 +46,9 @@
import org.elasticsearch.xpack.wildcard.Wildcard;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -133,42 +136,26 @@ public void testDatafeedWithCcsRemoteHealthy() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/84268")
public void testDatafeedWithCcsRemoteUnavailable() throws Exception {
setSkipUnavailable(randomBoolean());
String jobId = "ccs-unavailable-job";
String datafeedId = jobId;
long numDocs = randomIntBetween(32, 2048);
indexRemoteDocs(numDocs);

ContextBaseline baseline = captureContextBaseline();
setupJobAndDatafeed(jobId, datafeedId, null);
try {
NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.IsolateAllNodes(Set.of(cluster(REMOTE_CLUSTER).getNodeNames())),
NetworkDisruption.DISCONNECT
);
cluster(REMOTE_CLUSTER).setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();
// Wait until the datafeed suffers from the disruption OR processes all the documents.
// (Sometimes this test won't actually test the desired functionality, as it's possible
// that the datafeed processes all data before the disruption starts.)
assertBusy(() -> {
if (doesLocalAuditMessageExist("Datafeed is encountering errors extracting data") == false) {
JobStats jobStats = getJobStats(jobId);
assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
}
});
networkDisruption.removeAndEnsureHealthy(cluster(REMOTE_CLUSTER));
// Datafeed should eventually read all the docs.
// Use a 3 minute timeout because multiple suites run in parallel in CI which slows things down a lot.
// (Usually the test completes within 1 minute and much faster than that if run locally with nothing major running in parallel.)
assertBusy(() -> {
JobStats jobStats = getJobStats(jobId);
assertThat(jobStats.getState(), is(JobState.OPENED));
assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
}, 3, TimeUnit.MINUTES);
disruptNetworkAndWaitForRecovery(jobId, numDocs);
} finally {
client(LOCAL_CLUSTER).execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).actionGet();
client(LOCAL_CLUSTER).execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId)).actionGet();
stopDatafeedAndJob(datafeedId, jobId);
try {
waitForContextsToReturnToBaseline(baseline);
} catch (Exception e) {
// If waiting for contexts to return to baseline fails, we still need to clean up
// the skip_unavailable setting to avoid affecting subsequent tests
logger.warn(() -> "Failed to wait for contexts to return to baseline", e);
}
clearSkipUnavailable();
}
}
Expand Down Expand Up @@ -256,4 +243,112 @@ private void clearSkipUnavailable() {
.setPersistentSettings(Settings.builder().putNull("cluster.remote." + REMOTE_CLUSTER + ".skip_unavailable").build())
.get();
}

/**
* Captures baseline context counts before starting the datafeed to avoid false positives
* from contexts created by other operations or previous tests.
*/
private ContextBaseline captureContextBaseline() {
Map<String, Integer> scrollContexts = new HashMap<>();
Map<String, Integer> activeContexts = new HashMap<>();
for (String clusterAlias : List.of(LOCAL_CLUSTER, REMOTE_CLUSTER)) {
ContextCounts counts = getContextCounts(clusterAlias);
scrollContexts.put(clusterAlias, counts.scroll);
activeContexts.put(clusterAlias, counts.active);
}
return new ContextBaseline(scrollContexts, activeContexts);
}

/**
* Waits for scroll contexts to return to baseline after stopping the datafeed.
* This is especially important after network disruption when scroll contexts on the remote
* cluster may have been created but couldn't be cleared until connectivity was restored.
* The datafeed's cleanup mechanism (via ScrollDataExtractor.destroy()) should handle this
* once connectivity is restored, but it may take time for the clear scroll requests to
* complete, especially after network recovery.
*/
private void waitForContextsToReturnToBaseline(ContextBaseline baseline) throws Exception {
assertBusy(() -> {
for (String clusterAlias : List.of(LOCAL_CLUSTER, REMOTE_CLUSTER)) {
ContextCounts current = getContextCounts(clusterAlias);
int expectedScroll = baseline.scrollContexts.get(clusterAlias);
int expectedActive = baseline.activeContexts.get(clusterAlias);

assertThat(
"Scroll contexts not released on " + clusterAlias + ". Expected " + expectedScroll + " but got " + current.scroll,
current.scroll,
is(expectedScroll)
);
assertThat(
"Active contexts not released on " + clusterAlias + ". Expected " + expectedActive + " but got " + current.active,
current.active,
is(expectedActive)
);
}
}, 60, TimeUnit.SECONDS);
}

private ContextCounts getContextCounts(String clusterAlias) {
int scroll = 0;
int active = 0;
for (SearchService searchService : cluster(clusterAlias).getInstances(SearchService.class)) {
scroll += searchService.getOpenScrollContexts();
active += searchService.getActiveContexts();
}
return new ContextCounts(scroll, active);
}

private record ContextCounts(int scroll, int active) {}

private void disruptNetworkAndWaitForRecovery(String jobId, long numDocs) throws Exception {
NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.IsolateAllNodes(Set.of(cluster(REMOTE_CLUSTER).getNodeNames())),
NetworkDisruption.DISCONNECT
);
cluster(REMOTE_CLUSTER).setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();

// Wait until the datafeed suffers from the disruption OR processes all the documents.
// (Sometimes this test won't actually test the desired functionality, as it's possible
// that the datafeed processes all data before the disruption starts.)
assertBusy(() -> {
boolean hasError = doesLocalAuditMessageExist("Datafeed is encountering errors extracting data");
if (hasError) {
// Success: datafeed encountered disruption
return;
}
// No error yet - check if all documents are processed
JobStats jobStats = getJobStats(jobId);
assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
}, 2, TimeUnit.MINUTES);

networkDisruption.removeAndEnsureHealthy(cluster(REMOTE_CLUSTER));

// Datafeed should eventually read all the docs.
// Use a 3 minute timeout because multiple suites run in parallel in CI which slows things down a lot.
// (Usually the test completes within 1 minute and much faster than that if run locally with nothing major running in parallel.)
assertBusy(() -> {
JobStats jobStats = getJobStats(jobId);
assertThat(jobStats.getState(), is(JobState.OPENED));
assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
}, 3, TimeUnit.MINUTES);
}

private void stopDatafeedAndJob(String datafeedId, String jobId) {
client(LOCAL_CLUSTER).execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).actionGet();
client(LOCAL_CLUSTER).execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId)).actionGet();
}

/**
* Holds baseline context counts per cluster to compare against after test cleanup.
*/
private static class ContextBaseline {
final Map<String, Integer> scrollContexts;
final Map<String, Integer> activeContexts;

ContextBaseline(Map<String, Integer> scrollContexts, Map<String, Integer> activeContexts) {
this.scrollContexts = scrollContexts;
this.activeContexts = activeContexts;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,112 +363,117 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) {

long recordCount = 0;
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(start, end);
while (dataExtractor.hasNext()) {
if ((isIsolated || isRunning() == false) && dataExtractor.isCancelled() == false) {
dataExtractor.cancel();
}
if (isIsolated) {
return;
}

Optional<InputStream> extractedData;
try {
DataExtractor.Result result = dataExtractor.next();
extractedData = result.data();
searchInterval = result.searchInterval();
} catch (Exception e) {
LOGGER.warn(() -> "[" + jobId + "] error while extracting data", e);
// When extraction problems are encountered, we do not want to advance time.
// Instead, it is preferable to retry the given interval next time an extraction
// is triggered.

// For aggregated datafeeds it is possible for our users to use fields without doc values.
// In that case, it is really useful to display an error message explaining exactly that.
// Unfortunately, there are no great ways to identify the issue but search for 'doc values'
// deep in the exception.
if (e.toString().contains("doc values")) {
throw new ExtractionProblemException(
nextRealtimeTimestamp(),
new IllegalArgumentException(
"One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds"
+ " using aggregations"
)
);
try {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapped DataExtractor usage in a try-finally block so destroy() is always called, including on early returns due to isolation.

while (dataExtractor.hasNext()) {
if ((isIsolated || isRunning() == false) && dataExtractor.isCancelled() == false) {
dataExtractor.cancel();
}
throw new ExtractionProblemException(nextRealtimeTimestamp(), e);
}
if (isIsolated) {
return;
}
if (extractedData.isPresent()) {
DataCounts counts;
try (InputStream in = extractedData.get()) {
counts = postData(in, XContentType.JSON);
LOGGER.trace(
() -> format(
"[%s] Processed another %s records with latest timestamp [%s]",
jobId,
counts.getProcessedRecordCount(),
counts.getLatestRecordTimeStamp()
)
);
timingStatsReporter.reportDataCounts(counts);
if (isIsolated) {
return;
}

Optional<InputStream> extractedData;
try {
DataExtractor.Result result = dataExtractor.next();
extractedData = result.data();
searchInterval = result.searchInterval();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
LOGGER.warn(() -> "[" + jobId + "] error while extracting data", e);
// When extraction problems are encountered, we do not want to advance time.
// Instead, it is preferable to retry the given interval next time an extraction
// is triggered.

// For aggregated datafeeds it is possible for our users to use fields without doc values.
// In that case, it is really useful to display an error message explaining exactly that.
// Unfortunately, there are no great ways to identify the issue but search for 'doc values'
// deep in the exception.
if (e.toString().contains("doc values")) {
throw new ExtractionProblemException(
nextRealtimeTimestamp(),
new IllegalArgumentException(
"One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds"
+ " using aggregations"
)
);
}
if (isIsolated) {
return;
}
LOGGER.error(() -> "[" + jobId + "] error while posting data", e);

// a conflict exception means the job state is not open any more.
// we should therefore stop the datafeed.
boolean shouldStop = isConflictException(e);

// When an analysis problem occurs, it means something catastrophic has
// happened to the c++ process. We sent a batch of data to the c++ process
// yet we do not know how many of those were processed. It is better to
// advance time in order to avoid importing duplicate data.
error = new AnalysisProblemException(nextRealtimeTimestamp(), shouldStop, e);
break;
throw new ExtractionProblemException(nextRealtimeTimestamp(), e);
}
recordCount += counts.getProcessedRecordCount();
haveEverSeenData |= (recordCount > 0);
if (counts.getLatestRecordTimeStamp() != null) {
lastEndTimeMs = counts.getLatestRecordTimeStamp().getTime();
if (isIsolated) {
return;
}
if (extractedData.isPresent()) {
DataCounts counts;
try (InputStream in = extractedData.get()) {
counts = postData(in, XContentType.JSON);
LOGGER.trace(
() -> format(
"[%s] Processed another %s records with latest timestamp [%s]",
jobId,
counts.getProcessedRecordCount(),
counts.getLatestRecordTimeStamp()
)
);
timingStatsReporter.reportDataCounts(counts);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (isIsolated) {
return;
}
LOGGER.error(() -> "[" + jobId + "] error while posting data", e);

// a conflict exception means the job state is not open any more.
// we should therefore stop the datafeed.
boolean shouldStop = isConflictException(e);

// When an analysis problem occurs, it means something catastrophic has
// happened to the c++ process. We sent a batch of data to the c++ process
// yet we do not know how many of those were processed. It is better to
// advance time in order to avoid importing duplicate data.
error = new AnalysisProblemException(nextRealtimeTimestamp(), shouldStop, e);
break;
}
recordCount += counts.getProcessedRecordCount();
haveEverSeenData |= (recordCount > 0);
if (counts.getLatestRecordTimeStamp() != null) {
lastEndTimeMs = counts.getLatestRecordTimeStamp().getTime();
}
}
}
}

lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, dataExtractor.getEndTime() - 1);
LOGGER.debug(
"[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]",
jobId,
error,
recordCount,
lastEndTimeMs,
isRunning(),
dataExtractor.isCancelled()
);

// We can now throw any stored error as we have updated time.
if (error != null) {
throw error;
}
lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, dataExtractor.getEndTime() - 1);
LOGGER.debug(
"[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]",
jobId,
error,
recordCount,
lastEndTimeMs,
isRunning(),
dataExtractor.isCancelled()
);

// We can now throw any stored error as we have updated time.
if (error != null) {
throw error;
}

// If the datafeed was stopped, then it is possible that by the time
// we call flush the job is closed. Thus, we don't flush unless the
// datafeed is still running.
if (isRunning() && isIsolated == false) {
Instant lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd();
if (lastFinalizedBucketEnd != null) {
this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.toEpochMilli();
// If the datafeed was stopped, then it is possible that by the time
// we call flush the job is closed. Thus, we don't flush unless the
// datafeed is still running.
if (isRunning() && isIsolated == false) {
Instant lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd();
if (lastFinalizedBucketEnd != null) {
this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.toEpochMilli();
}
}
}

if (recordCount == 0) {
throw new EmptyDataCountException(nextRealtimeTimestamp(), haveEverSeenData);
if (recordCount == 0) {
throw new EmptyDataCountException(nextRealtimeTimestamp(), haveEverSeenData);
}
} finally {
// Ensure the extractor is always destroyed to clean up scroll contexts
dataExtractor.destroy();
Comment on lines +474 to +476
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the block I actually added

}
}

Expand Down
Loading