Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,6 @@ tests:
- class: org.elasticsearch.entitlement.runtime.policy.FileAccessTreeTests
method: testWindowsAbsolutPathAccess
issue: https://github.com/elastic/elasticsearch/issues/129168
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
method: testWithDatastreams
issue: https://github.com/elastic/elasticsearch/issues/129457
- class: org.elasticsearch.xpack.profiling.action.GetStatusActionIT
method: testWaitsUntilResourcesAreCreated
issue: https://github.com/elastic/elasticsearch/issues/129486
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -56,29 +59,29 @@
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -359,23 +362,44 @@ protected TrainedModelDefinition getModelDefinition(String modelId) throws IOExc
/**
* Asserts whether the audit messages fetched from index match provided prefixes.
* More specifically, in order to pass:
* 1. the number of fetched messages must equal the number of provided prefixes
* 1. ALL expected message prefixes must be found in the fetched messages
* AND
* 2. each fetched message must start with the corresponding prefix
* 2. each fetched message that matches must start with the corresponding prefix
*/
protected static void assertThatAuditMessagesMatch(String configId, String... expectedAuditMessagePrefixes) throws Exception {
// Make sure we wrote to the audit
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
// finished the job (as this is a very short analytics job), all without the audit being fully written.
awaitIndexExists(NotificationsIndex.NOTIFICATIONS_INDEX);

@SuppressWarnings("unchecked")
Matcher<String>[] itemMatchers = Arrays.stream(expectedAuditMessagePrefixes).map(Matchers::startsWith).toArray(Matcher[]::new);
Set<String> expectedPrefixes = Set.of(expectedAuditMessagePrefixes);
assertBusy(() -> {
// Refresh the notifications index to ensure latest writes are visible
RefreshRequest refreshRequest = new RefreshRequest(NotificationsIndex.NOTIFICATIONS_INDEX);
BroadcastResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet();
assertThat(refreshResponse.getStatus().getStatus(), anyOf(equalTo(200), equalTo(201)));

List<String> allAuditMessages = fetchAllAuditMessages(configId);
assertThat(allAuditMessages, hasItems(itemMatchers));
// TODO: Consider restoring this assertion when we are sure all the audit messages are available at this point.
// assertThat("Messages: " + allAuditMessages, allAuditMessages, hasSize(expectedAuditMessagePrefixes.length));

// Find which expected prefixes match any of the audit messages
Set<String> foundPrefixes = expectedPrefixes.stream()
.filter(prefix -> allAuditMessages.stream().anyMatch(msg -> msg.startsWith(prefix)))
.collect(Collectors.toSet());

// Only calculate missing prefixes if not all were found
if (foundPrefixes.size() != expectedPrefixes.size()) {
Set<String> missingPrefixes = new HashSet<>(expectedPrefixes);
missingPrefixes.removeAll(foundPrefixes);
fail(
String.format(
Locale.ROOT,
"Expected audit messages not found for config [%s]. Missing prefixes: %s. Found messages: %s",
configId,
missingPrefixes,
allAuditMessages
)
);
}
});
}

Expand Down