Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -70,6 +73,7 @@
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
Expand All @@ -79,6 +83,7 @@
import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
import org.apache.solr.crossdc.manager.consumer.PartitionManager;
import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -160,7 +165,7 @@ public String toString() {

protected volatile Consumer consumer;

private List<ConsumerBatch> consumerBatches;
private BlockingQueue<ConsumerBatch> consumerBatches;

private static final String TOPIC = "topic1";

Expand All @@ -180,7 +185,7 @@ public void beforeSolrAndKafkaIntegrationTest() throws Exception {
(t, e) -> log.error("Uncaught exception in thread {}", t, e));
System.setProperty("otel.metrics.exporter", "prometheus");
System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
consumerBatches = new ArrayList<>();
consumerBatches = new LinkedBlockingQueue<>();
consumer =
new Consumer() {
@Override
Expand All @@ -195,7 +200,7 @@ public void sendBatch(
final MirroredSolrRequest.Type type,
final ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord,
final PartitionManager.WorkUnit workUnit) {
consumerBatches.add(new ConsumerBatch(type, solrReqBatch));
consumerBatches.offer(new ConsumerBatch(type, solrReqBatch));
super.sendBatch(solrReqBatch, type, lastRecord, workUnit);
}
};
Expand Down Expand Up @@ -357,10 +362,27 @@ public void testPartitioning() throws Exception {
}
client.commit(COLLECTION);
client.commit(ALT_COLLECTION);

assertCluster2EventuallyHasDocs(COLLECTION, "*:*", NUM_DOCS);
assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", NUM_DOCS);

// check that updates to different collections were always sent to the same partition
Map<Integer, String> partitionsPerCol = new HashMap<>();
Map<String, Set<String>> docsPerCol = new HashMap<>();
for (ConsumerBatch batch : consumerBatches) {
int batchCount = 0;
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
while (!timeOut.hasTimedOut()) {
final ConsumerBatch batch = consumerBatches.poll(2, TimeUnit.SECONDS);
if (batch == null) {
int totalDocsSeen = docsPerCol.values().stream().mapToInt(Set::size).sum();
if (totalDocsSeen == NUM_DOCS * 2) {
// we've collected all expected docs
break;
} else {
continue; // keep waiting, it was just a longer pause
}
}
batchCount++;
String collection =
partitionsPerCol.computeIfAbsent(batch.partitionId, k -> batch.collection);
docsPerCol.computeIfAbsent(collection, col -> new HashSet<>()).addAll(batch.addIds);
Expand All @@ -376,8 +398,19 @@ public void testPartitioning() throws Exception {
collection,
batch.collection);
}
docsPerCol.forEach(
(col, ids) -> assertEquals("incorrect count in collection " + col, NUM_DOCS, ids.size()));
if (timeOut.hasTimedOut()) {
fail("timed out waiting for batches");
}
assertTrue("No batches were received from consumer", batchCount > 0);
assertEquals("Should have processed both collections", 2, docsPerCol.size());
assertTrue("COLLECTION not found in results", docsPerCol.containsKey(COLLECTION));
assertTrue("ALT_COLLECTION not found in results", docsPerCol.containsKey(ALT_COLLECTION));
assertEquals(
"incorrect count in collection " + COLLECTION, NUM_DOCS, docsPerCol.get(COLLECTION).size());
assertEquals(
"incorrect count in collection " + ALT_COLLECTION,
NUM_DOCS,
docsPerCol.get(ALT_COLLECTION).size());
}

@Test
Expand Down
Loading