diff --git a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java index 8e8b31d69d2..e42ac3844a3 100644 --- a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java +++ b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java @@ -38,9 +38,12 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -70,6 +73,7 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.ObjectReleaseTracker; import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.Utils; import org.apache.solr.crossdc.common.KafkaCrossDcConf; import org.apache.solr.crossdc.common.MirroredSolrRequest; @@ -79,6 +83,7 @@ import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer; import org.apache.solr.crossdc.manager.consumer.PartitionManager; import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter; +import org.apache.solr.util.TimeOut; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -160,7 +165,7 @@ public String toString() { protected volatile Consumer consumer; - private List consumerBatches; + private BlockingQueue consumerBatches; private static final String TOPIC = "topic1"; @@ -180,7 +185,7 @@ public void beforeSolrAndKafkaIntegrationTest() throws Exception { (t, e) -> log.error("Uncaught exception in thread {}", t, e)); System.setProperty("otel.metrics.exporter", "prometheus"); System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true"); - consumerBatches = new ArrayList<>(); + consumerBatches = new LinkedBlockingQueue<>(); consumer = new Consumer() { @Override @@ -195,7 +200,7 @@ public void sendBatch( final MirroredSolrRequest.Type type, final ConsumerRecord> lastRecord, final PartitionManager.WorkUnit workUnit) { - consumerBatches.add(new ConsumerBatch(type, solrReqBatch)); + consumerBatches.offer(new ConsumerBatch(type, solrReqBatch)); super.sendBatch(solrReqBatch, type, lastRecord, workUnit); } }; @@ -357,10 +362,27 @@ public void testPartitioning() throws Exception { } client.commit(COLLECTION); client.commit(ALT_COLLECTION); + + assertCluster2EventuallyHasDocs(COLLECTION, "*:*", NUM_DOCS); + assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", NUM_DOCS); + // check that updates to different collections were always sent to the same partition Map partitionsPerCol = new HashMap<>(); Map> docsPerCol = new HashMap<>(); - for (ConsumerBatch batch : consumerBatches) { + int batchCount = 0; + TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.CURRENT_TIME); + while (!timeOut.hasTimedOut()) { + final ConsumerBatch batch = consumerBatches.poll(2, TimeUnit.SECONDS); + if (batch == null) { + int totalDocsSeen = docsPerCol.values().stream().mapToInt(Set::size).sum(); + if (totalDocsSeen == NUM_DOCS * 2) { + // we've collected all expected docs + break; + } else { + continue; // keep waiting, it was just a longer pause + } + } + batchCount++; String collection = partitionsPerCol.computeIfAbsent(batch.partitionId, k -> batch.collection); docsPerCol.computeIfAbsent(collection, col -> new HashSet<>()).addAll(batch.addIds); @@ -376,8 +398,19 @@ public void testPartitioning() throws Exception { collection, batch.collection); } - docsPerCol.forEach( - (col, ids) -> assertEquals("incorrect count in collection " + col, NUM_DOCS, ids.size())); + if (timeOut.hasTimedOut()) { + fail("timed out waiting for batches"); + } + assertTrue("No batches were received from consumer", batchCount > 0); + assertEquals("Should have processed both collections", 2, docsPerCol.size()); + assertTrue("COLLECTION not found in results", docsPerCol.containsKey(COLLECTION)); + assertTrue("ALT_COLLECTION not found in results", docsPerCol.containsKey(ALT_COLLECTION)); + assertEquals( + "incorrect count in collection " + COLLECTION, NUM_DOCS, docsPerCol.get(COLLECTION).size()); + assertEquals( + "incorrect count in collection " + ALT_COLLECTION, + NUM_DOCS, + docsPerCol.get(ALT_COLLECTION).size()); } @Test