diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCcsIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCcsIT.java index e437c91c8e50e..846501b0a62f6 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCcsIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCcsIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.license.LicenseSettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.search.SearchService; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.xpack.core.XPackSettings; @@ -45,7 +46,9 @@ import org.elasticsearch.xpack.wildcard.Wildcard; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -133,42 +136,26 @@ public void testDatafeedWithCcsRemoteHealthy() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/84268") public void testDatafeedWithCcsRemoteUnavailable() throws Exception { setSkipUnavailable(randomBoolean()); String jobId = "ccs-unavailable-job"; String datafeedId = jobId; long numDocs = randomIntBetween(32, 2048); indexRemoteDocs(numDocs); + + ContextBaseline baseline = captureContextBaseline(); setupJobAndDatafeed(jobId, datafeedId, null); try { - NetworkDisruption networkDisruption = new NetworkDisruption( - new NetworkDisruption.IsolateAllNodes(Set.of(cluster(REMOTE_CLUSTER).getNodeNames())), - NetworkDisruption.DISCONNECT - ); - cluster(REMOTE_CLUSTER).setDisruptionScheme(networkDisruption); - networkDisruption.startDisrupting(); - // Wait until the datafeed suffers from the disruption OR processes all the documents. - // (Sometimes this test won't actually test the desired functionality, as it's possible - // that the datafeed processes all data before the disruption starts.) - assertBusy(() -> { - if (doesLocalAuditMessageExist("Datafeed is encountering errors extracting data") == false) { - JobStats jobStats = getJobStats(jobId); - assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs)); - } - }); - networkDisruption.removeAndEnsureHealthy(cluster(REMOTE_CLUSTER)); - // Datafeed should eventually read all the docs. - // Use a 3 minute timeout because multiple suites run in parallel in CI which slows things down a lot. - // (Usually the test completes within 1 minute and much faster than that if run locally with nothing major running in parallel.) 
-            assertBusy(() -> {
-                JobStats jobStats = getJobStats(jobId);
-                assertThat(jobStats.getState(), is(JobState.OPENED));
-                assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
-            }, 3, TimeUnit.MINUTES);
+            disruptNetworkAndWaitForRecovery(jobId, numDocs);
         } finally {
-            client(LOCAL_CLUSTER).execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).actionGet();
-            client(LOCAL_CLUSTER).execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId)).actionGet();
+            stopDatafeedAndJob(datafeedId, jobId);
+            try {
+                waitForContextsToReturnToBaseline(baseline);
+            } catch (Exception e) {
+                // If waiting for contexts to return to baseline fails, we still need to clean up
+                // the skip_unavailable setting to avoid affecting subsequent tests
+                logger.warn(() -> "Failed to wait for contexts to return to baseline", e);
+            }
             clearSkipUnavailable();
         }
     }
@@ -256,4 +243,112 @@ private void clearSkipUnavailable() {
             .setPersistentSettings(Settings.builder().putNull("cluster.remote." + REMOTE_CLUSTER + ".skip_unavailable").build())
             .get();
     }
+
+    /**
+     * Captures baseline context counts before starting the datafeed to avoid false positives
+     * from contexts created by other operations or previous tests.
+     */
+    private ContextBaseline captureContextBaseline() {
+        Map<String, Integer> scrollContexts = new HashMap<>();
+        Map<String, Integer> activeContexts = new HashMap<>();
+        for (String clusterAlias : List.of(LOCAL_CLUSTER, REMOTE_CLUSTER)) {
+            ContextCounts counts = getContextCounts(clusterAlias);
+            scrollContexts.put(clusterAlias, counts.scroll);
+            activeContexts.put(clusterAlias, counts.active);
+        }
+        return new ContextBaseline(scrollContexts, activeContexts);
+    }
+
+    /**
+     * Waits for scroll contexts to return to baseline after stopping the datafeed.
+     * This is especially important after network disruption when scroll contexts on the remote
+     * cluster may have been created but couldn't be cleared until connectivity was restored.
+     * The datafeed's cleanup mechanism (via ScrollDataExtractor.destroy()) should handle this
+     * once connectivity is restored, but it may take time for the clear scroll requests to
+     * complete, especially after network recovery.
+     */
+    private void waitForContextsToReturnToBaseline(ContextBaseline baseline) throws Exception {
+        assertBusy(() -> {
+            for (String clusterAlias : List.of(LOCAL_CLUSTER, REMOTE_CLUSTER)) {
+                ContextCounts current = getContextCounts(clusterAlias);
+                int expectedScroll = baseline.scrollContexts.get(clusterAlias);
+                int expectedActive = baseline.activeContexts.get(clusterAlias);
+
+                assertThat(
+                    "Scroll contexts not released on " + clusterAlias + ". Expected " + expectedScroll + " but got " + current.scroll,
+                    current.scroll,
+                    is(expectedScroll)
+                );
+                assertThat(
+                    "Active contexts not released on " + clusterAlias + ". Expected " + expectedActive + " but got " + current.active,
+                    current.active,
+                    is(expectedActive)
+                );
+            }
+        }, 60, TimeUnit.SECONDS);
+    }
+
+    private ContextCounts getContextCounts(String clusterAlias) {
+        int scroll = 0;
+        int active = 0;
+        for (SearchService searchService : cluster(clusterAlias).getInstances(SearchService.class)) {
+            scroll += searchService.getOpenScrollContexts();
+            active += searchService.getActiveContexts();
+        }
+        return new ContextCounts(scroll, active);
+    }
+
+    private record ContextCounts(int scroll, int active) {}
+
+    private void disruptNetworkAndWaitForRecovery(String jobId, long numDocs) throws Exception {
+        NetworkDisruption networkDisruption = new NetworkDisruption(
+            new NetworkDisruption.IsolateAllNodes(Set.of(cluster(REMOTE_CLUSTER).getNodeNames())),
+            NetworkDisruption.DISCONNECT
+        );
+        cluster(REMOTE_CLUSTER).setDisruptionScheme(networkDisruption);
+        networkDisruption.startDisrupting();
+
+        // Wait until the datafeed suffers from the disruption OR processes all the documents.
+        // (Sometimes this test won't actually test the desired functionality, as it's possible
+        // that the datafeed processes all data before the disruption starts.)
+        assertBusy(() -> {
+            boolean hasError = doesLocalAuditMessageExist("Datafeed is encountering errors extracting data");
+            if (hasError) {
+                // Success: datafeed encountered disruption
+                return;
+            }
+            // No error yet - check if all documents are processed
+            JobStats jobStats = getJobStats(jobId);
+            assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
+        }, 2, TimeUnit.MINUTES);
+
+        networkDisruption.removeAndEnsureHealthy(cluster(REMOTE_CLUSTER));
+
+        // Datafeed should eventually read all the docs.
+        // Use a 3 minute timeout because multiple suites run in parallel in CI which slows things down a lot.
+        // (Usually the test completes within 1 minute and much faster than that if run locally with nothing major running in parallel.)
+        assertBusy(() -> {
+            JobStats jobStats = getJobStats(jobId);
+            assertThat(jobStats.getState(), is(JobState.OPENED));
+            assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
+        }, 3, TimeUnit.MINUTES);
+    }
+
+    private void stopDatafeedAndJob(String datafeedId, String jobId) {
+        client(LOCAL_CLUSTER).execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).actionGet();
+        client(LOCAL_CLUSTER).execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId)).actionGet();
+    }
+
+    /**
+     * Holds baseline context counts per cluster to compare against after test cleanup.
+     */
+    private static class ContextBaseline {
+        final Map<String, Integer> scrollContexts;
+        final Map<String, Integer> activeContexts;
+
+        ContextBaseline(Map<String, Integer> scrollContexts, Map<String, Integer> activeContexts) {
+            this.scrollContexts = scrollContexts;
+            this.activeContexts = activeContexts;
+        }
+    }
 }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
index 4582d1a49392e..6fcd7235eb199 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
@@ -363,112 +363,117 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) {
         long recordCount = 0;
         DataExtractor dataExtractor = dataExtractorFactory.newExtractor(start, end);
-        while (dataExtractor.hasNext()) {
-            if ((isIsolated || isRunning() == false) && dataExtractor.isCancelled() == false) {
-                dataExtractor.cancel();
-            }
-            if (isIsolated) {
-                return;
-            }
-
-            Optional<InputStream> extractedData;
-            try {
-                DataExtractor.Result result = dataExtractor.next();
-                extractedData = result.data();
-                searchInterval = result.searchInterval();
-            } catch (Exception e) {
-                LOGGER.warn(() -> "[" + jobId + "] error while extracting data", e);
-                // When extraction problems are encountered, we do not want to advance time.
-                // Instead, it is preferable to retry the given interval next time an extraction
-                // is triggered.
-
-                // For aggregated datafeeds it is possible for our users to use fields without doc values.
-                // In that case, it is really useful to display an error message explaining exactly that.
-                // Unfortunately, there are no great ways to identify the issue but search for 'doc values'
-                // deep in the exception.
-                if (e.toString().contains("doc values")) {
-                    throw new ExtractionProblemException(
-                        nextRealtimeTimestamp(),
-                        new IllegalArgumentException(
-                            "One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds"
-                                + " using aggregations"
-                        )
-                    );
+        try {
+            while (dataExtractor.hasNext()) {
+                if ((isIsolated || isRunning() == false) && dataExtractor.isCancelled() == false) {
+                    dataExtractor.cancel();
                 }
-                throw new ExtractionProblemException(nextRealtimeTimestamp(), e);
-            }
-            if (isIsolated) {
-                return;
-            }
-            if (extractedData.isPresent()) {
-                DataCounts counts;
-                try (InputStream in = extractedData.get()) {
-                    counts = postData(in, XContentType.JSON);
-                    LOGGER.trace(
-                        () -> format(
-                            "[%s] Processed another %s records with latest timestamp [%s]",
-                            jobId,
-                            counts.getProcessedRecordCount(),
-                            counts.getLatestRecordTimeStamp()
-                        )
-                    );
-                    timingStatsReporter.reportDataCounts(counts);
+                if (isIsolated) {
+                    return;
+                }
+
+                Optional<InputStream> extractedData;
+                try {
+                    DataExtractor.Result result = dataExtractor.next();
+                    extractedData = result.data();
+                    searchInterval = result.searchInterval();
                 } catch (Exception e) {
-                    if (e instanceof InterruptedException) {
-                        Thread.currentThread().interrupt();
+                    LOGGER.warn(() -> "[" + jobId + "] error while extracting data", e);
+                    // When extraction problems are encountered, we do not want to advance time.
+                    // Instead, it is preferable to retry the given interval next time an extraction
+                    // is triggered.
+
+                    // For aggregated datafeeds it is possible for our users to use fields without doc values.
+                    // In that case, it is really useful to display an error message explaining exactly that.
+ // Unfortunately, there are no great ways to identify the issue but search for 'doc values' + // deep in the exception. + if (e.toString().contains("doc values")) { + throw new ExtractionProblemException( + nextRealtimeTimestamp(), + new IllegalArgumentException( + "One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds" + + " using aggregations" + ) + ); } - if (isIsolated) { - return; - } - LOGGER.error(() -> "[" + jobId + "] error while posting data", e); - - // a conflict exception means the job state is not open any more. - // we should therefore stop the datafeed. - boolean shouldStop = isConflictException(e); - - // When an analysis problem occurs, it means something catastrophic has - // happened to the c++ process. We sent a batch of data to the c++ process - // yet we do not know how many of those were processed. It is better to - // advance time in order to avoid importing duplicate data. - error = new AnalysisProblemException(nextRealtimeTimestamp(), shouldStop, e); - break; + throw new ExtractionProblemException(nextRealtimeTimestamp(), e); } - recordCount += counts.getProcessedRecordCount(); - haveEverSeenData |= (recordCount > 0); - if (counts.getLatestRecordTimeStamp() != null) { - lastEndTimeMs = counts.getLatestRecordTimeStamp().getTime(); + if (isIsolated) { + return; + } + if (extractedData.isPresent()) { + DataCounts counts; + try (InputStream in = extractedData.get()) { + counts = postData(in, XContentType.JSON); + LOGGER.trace( + () -> format( + "[%s] Processed another %s records with latest timestamp [%s]", + jobId, + counts.getProcessedRecordCount(), + counts.getLatestRecordTimeStamp() + ) + ); + timingStatsReporter.reportDataCounts(counts); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + if (isIsolated) { + return; + } + LOGGER.error(() -> "[" + jobId + "] error while posting data", e); + + // a conflict exception means the job state is not open any more. + // we should therefore stop the datafeed. + boolean shouldStop = isConflictException(e); + + // When an analysis problem occurs, it means something catastrophic has + // happened to the c++ process. We sent a batch of data to the c++ process + // yet we do not know how many of those were processed. It is better to + // advance time in order to avoid importing duplicate data. + error = new AnalysisProblemException(nextRealtimeTimestamp(), shouldStop, e); + break; + } + recordCount += counts.getProcessedRecordCount(); + haveEverSeenData |= (recordCount > 0); + if (counts.getLatestRecordTimeStamp() != null) { + lastEndTimeMs = counts.getLatestRecordTimeStamp().getTime(); + } } } - } - lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, dataExtractor.getEndTime() - 1); - LOGGER.debug( - "[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", - jobId, - error, - recordCount, - lastEndTimeMs, - isRunning(), - dataExtractor.isCancelled() - ); - - // We can now throw any stored error as we have updated time. - if (error != null) { - throw error; - } + lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, dataExtractor.getEndTime() - 1); + LOGGER.debug( + "[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", + jobId, + error, + recordCount, + lastEndTimeMs, + isRunning(), + dataExtractor.isCancelled() + ); + + // We can now throw any stored error as we have updated time. 
+ if (error != null) { + throw error; + } - // If the datafeed was stopped, then it is possible that by the time - // we call flush the job is closed. Thus, we don't flush unless the - // datafeed is still running. - if (isRunning() && isIsolated == false) { - Instant lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd(); - if (lastFinalizedBucketEnd != null) { - this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.toEpochMilli(); + // If the datafeed was stopped, then it is possible that by the time + // we call flush the job is closed. Thus, we don't flush unless the + // datafeed is still running. + if (isRunning() && isIsolated == false) { + Instant lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd(); + if (lastFinalizedBucketEnd != null) { + this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.toEpochMilli(); + } } - } - if (recordCount == 0) { - throw new EmptyDataCountException(nextRealtimeTimestamp(), haveEverSeenData); + if (recordCount == 0) { + throw new EmptyDataCountException(nextRealtimeTimestamp(), haveEverSeenData); + } + } finally { + // Ensure the extractor is always destroyed to clean up scroll contexts + dataExtractor.destroy(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index 43ab86cfcacd3..e85b56f639646 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -162,6 +162,10 @@ private Result getNextStream() throws IOException { } private void advanceTime() { + // Destroy the previous extractor to clean up any scroll contexts before creating a new one + if (currentExtractor != null) { + currentExtractor.destroy(); + } currentStart = currentEnd; currentEnd = Math.min(currentStart + chunkSpan, context.end()); currentExtractor = dataExtractorFactory.newExtractor(currentStart, currentEnd); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index e043fb60e92f7..974e5e0c2b035 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -269,7 +269,7 @@ protected SearchResponse executeSearchScrollRequest(String scrollId) { } private void clearScroll() { - innerClearScroll(scrollId); + clearScrollLoggingExceptions(scrollId); scrollId = null; }
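
// A minimal, self-contained sketch of the cleanup pattern the DatafeedJob and ChunkedDataExtractor
// changes above rely on: wrap the extraction loop in try/finally and make destroy() idempotent, so
// server-side scroll state is released whether the loop completes, throws, or returns early.
// The names below (ExtractorCleanupSketch, ScrollingExtractor, runOnce) are hypothetical stand-ins,
// not the Elasticsearch classes.

import java.util.Iterator;
import java.util.List;

public class ExtractorCleanupSketch {

    // Stand-in for a scroll-based data extractor that holds server-side state.
    static final class ScrollingExtractor {
        private final Iterator<String> batches;
        private boolean destroyed;

        ScrollingExtractor(List<String> batches) {
            this.batches = batches.iterator();
        }

        boolean hasNext() {
            return destroyed == false && batches.hasNext();
        }

        String next() {
            return batches.next();
        }

        // Releases the (simulated) scroll context; safe to call more than once.
        void destroy() {
            if (destroyed == false) {
                destroyed = true;
                System.out.println("scroll context cleared");
            }
        }
    }

    // Mirrors the shape of the patched loop: cleanup runs on success, failure and early return.
    static void runOnce(ScrollingExtractor extractor) {
        try {
            while (extractor.hasNext()) {
                System.out.println("posting batch: " + extractor.next());
            }
        } finally {
            extractor.destroy();
        }
    }

    public static void main(String[] args) {
        runOnce(new ScrollingExtractor(List.of("batch-1", "batch-2", "batch-3")));
    }
}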