diff --git a/changelog/unreleased/SOLR-18244-parallel-http-shard-handler-lost-wakeup-fix.yml b/changelog/unreleased/SOLR-18244-parallel-http-shard-handler-lost-wakeup-fix.yml new file mode 100644 index 000000000000..96706027bc12 --- /dev/null +++ b/changelog/unreleased/SOLR-18244-parallel-http-shard-handler-lost-wakeup-fix.yml @@ -0,0 +1,10 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: Fix several concurrency bugs in HttpShardHandler / ParallelHttpShardHandler that + could cause search threads to hang in take() or return HTTP 500 instead of honoring + shards.tolerant under thread-pool saturation +type: fixed +authors: + - name: Mark Miller +links: + - name: SOLR-18244 + url: https://issues.apache.org/jira/browse/SOLR-18244 diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java index 73b960db91ea..57b1184f2bc9 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java @@ -96,6 +96,18 @@ public class HttpShardHandler extends ShardHandler { protected final BlockingQueue responses; private final AtomicBoolean canceled = new AtomicBoolean(false); + // Returns the monitor object that guards all cancellation-related state transitions + // (the canceled flag, the responseFutureMap, the responses queue's CANCELLATION_NOTIFICATION). + // Subclasses that track additional cancellable state must synchronize on this monitor when + // reading or transitioning that state so the whole cancellation invariant stays atomic. + protected final Object cancellationLock() { + return canceled; + } + + protected final boolean isCanceled() { + return canceled.get(); + } + private final Map> shardToURLs; protected LBHttp2SolrClient lbClient; @@ -278,27 +290,47 @@ protected void makeShardRequest( // on the map already having the future. 
future.whenComplete( (LBSolrClient.Rsp rsp, Throwable throwable) -> { - if (rsp != null) { - ssr.nl = rsp.getResponse(); - srsp.setShardAddress(rsp.getServer()); - } else if (throwable != null) { - srsp.setException(throwable); - if (throwable instanceof SolrException) { - srsp.setResponseCode(((SolrException) throwable).code()); + try { + if (rsp != null) { + ssr.nl = rsp.getResponse(); + srsp.setShardAddress(rsp.getServer()); + } else if (throwable != null) { + srsp.setException(throwable); + if (throwable instanceof SolrException) { + srsp.setResponseCode(((SolrException) throwable).code()); + } } - } - ssr.elapsedTime = - TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); - // Synchronize on cancelled so this code and cancelAll() cannot happen at the same time - synchronized (canceled) { - // We don't want to add responses after the requests have been canceled - if (responseFutureMap.containsKey(srsp)) { - responses.add(HttpShardHandler.this.transformResponse(sreq, srsp, shard)); + ssr.elapsedTime = + TimeUnit.MILLISECONDS.convert( + System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); + enqueueIfTracked(srsp, HttpShardHandler.this.transformResponse(sreq, srsp, shard)); + } catch (Throwable t) { + // If anything above throws (subclass transformResponse, malformed Rsp, OOM in the + // lambda) the response would never be enqueued — but responseFutureMap still tracks + // srsp, so take() would park forever. Record the failure on srsp and enqueue it raw + // (bypassing transformResponse, which may be the thrower). + srsp.setException(t); + if (t instanceof SolrException) { + srsp.setResponseCode(((SolrException) t).code()); } + enqueueIfTracked(srsp, srsp); } }); } + /** + * Enqueue {@code value} into the {@link #responses} queue iff {@code key} is still tracked in + * {@link #responseFutureMap}, holding the cancellation monitor so this stays atomic with {@link + * #cancelAll()}'s clear. 
+ */ + private void enqueueIfTracked(ShardResponse key, ShardResponse value) { + synchronized (canceled) { + if (responseFutureMap.containsKey(key)) { + responses.add(value); + } + } + } + /** Subclasses could modify the request based on the shard */ @SuppressWarnings("unused") protected QueryRequest createQueryRequest( @@ -328,7 +360,12 @@ private ShardResponse take(boolean bailOnError) { ShardResponse previousResponse = null; try { while (responsesPending()) { - ShardResponse rsp = responses.take(); + ShardResponse rsp = awaitNextResponse(); + if (rsp == null) { + // awaitNextResponse() returned without a response — only happens for subclasses that + // override with a timed poll. Re-evaluate responsesPending() and either re-wait or exit. + continue; + } if (rsp == CANCELLATION_NOTIFICATION) { // This is only queued in cancelAll(), so all outstanding futures have already been // canceled. @@ -376,6 +413,23 @@ protected boolean responsesPending() { return !responseFutureMap.isEmpty() || !responses.isEmpty(); } + /** + * Wait for the next response from the {@link #responses} queue. Defaults to a blocking {@link + * BlockingQueue#take()}. + * + *
<p>
Subclasses that gate {@link #responsesPending()} on an async tracker outside the {@link + * #responses} queue's lifecycle (e.g. {@link ParallelHttpShardHandler#submitFutures}) MUST + * override this with a timed poll. The cancellation lock can serialize {@link + * #responsesPending()} reads with state mutations, but it cannot signal the queue's internal + * {@code Condition}: if the tracker drains without anything being enqueued to {@link #responses}, + * a thread parked in {@link BlockingQueue#take()} would never wake up. Returning {@code null} + * from this method instructs {@link #take(boolean)} to re-check {@link #responsesPending()} and + * either re-wait or exit cleanly. + */ + protected ShardResponse awaitNextResponse() throws InterruptedException { + return responses.take(); + } + @Override public void cancelAll() { // Canceled must be set to true before calling the cancellation code, to ensure that new tasks diff --git a/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java index a3e964cf95f9..68557b02164f 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java @@ -16,19 +16,20 @@ */ package org.apache.solr.handler.component; -import java.lang.invoke.MethodHandles; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import net.jcip.annotations.NotThreadSafe; import 
org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A version of {@link HttpShardHandler} optimized for massively-sharded collections. @@ -43,19 +44,20 @@ @NotThreadSafe public class ParallelHttpShardHandler extends HttpShardHandler { - @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ExecutorService commExecutor; /* - * Unlike the basic HttpShardHandler, this class allows us to exit submit before - * the responseFutureMap is updated. If the runnables that - * do that are slow to execute the calling code could attempt to takeCompleted(), - * while pending is still zero. In this condition, the code would assume that all - * requests are processed (despite the runnables created by this class still - * waiting). Thus, we need to track that there are attempts still in flight. + * Track in-flight async submits so responsesPending() doesn't return false + * while a runnable is still queued or executing. + * + * inFlightSubmits is the authoritative loop-guard counter. AtomicInteger.get() + * is exact; ConcurrentHashMap.size() is documented as an estimate — its + * sumCount() can settle at a non-zero value while the table is empty under + * concurrent put/remove — so we don't use submitFutures.isEmpty() as the guard. + * + * submitFutures is retained only as the iteration target for cancelAll. 
*/ + private final AtomicInteger inFlightSubmits = new AtomicInteger(); private final ConcurrentMap> submitFutures; public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandlerFactory) { @@ -66,9 +68,29 @@ public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandler @Override protected boolean responsesPending() { - // ensure we can't exit while loop in HttpShardHandler.take(boolean) until we've completed - // submitting all of the shard requests - return super.responsesPending() || !submitFutures.isEmpty(); + // Take the lock so the read of inFlightSubmits is serialized with super.responsesPending()'s + // reads of responseFutureMap and responses. Without it, take()'s loop can transiently observe + // both responseFutureMap empty (super.makeShardRequest hasn't put yet) and inFlightSubmits == 0 + // (outer.whenComplete already decremented for a different submit) even though those events are + // causally ordered, and the in-flight inner whenComplete's responses.add would be silently + // lost. + synchronized (cancellationLock()) { + return super.responsesPending() || inFlightSubmits.get() > 0; + } + } + + /** + * Override the base class's blocking wait with a timed poll. inFlightSubmits is an async tracker + * outside the {@link #responses} queue's lifecycle: when an outer {@code whenComplete} decrements + * the last in-flight submit but the inner {@code whenComplete} hasn't fired yet to enqueue the + * response, a thread parked in {@code responses.take()} would never be woken — the cancellation + * lock cannot signal the queue's internal {@code Condition}. Polling lets the outer {@link + * #take(boolean)} loop re-evaluate {@link #responsesPending()} until the inner callback enqueues + * the real response (or the trackers drain to empty). 
+ */ + @Override + protected ShardResponse awaitNextResponse() throws InterruptedException { + return responses.poll(50, TimeUnit.MILLISECONDS); } @Override @@ -80,22 +102,73 @@ protected void makeShardRequest( SimpleSolrResponse ssr, ShardResponse srsp, long startTimeNS) { - CompletableFuture completableFuture = - CompletableFuture.runAsync( - () -> super.makeShardRequest(sreq, shard, params, lbReq, ssr, srsp, startTimeNS), - commExecutor); - submitFutures.put(srsp, completableFuture); + // Holder so the lambda can read its own outer future. We can't capture the variable directly + // (it would have to be effectively final, but we assign it from runAsync). AtomicReference + // gives the lambda volatile-style visibility on the assignment that happens after runAsync + // returns. + AtomicReference> selfRef = new AtomicReference<>(); + CompletableFuture completableFuture; + // Increment BEFORE runAsync so responsesPending() never observes inFlightSubmits == 0 while + // there is a submit in flight. The matching decrement is in the unconditional finally of + // whenComplete below, or in the catch block if runAsync itself rejects. + inFlightSubmits.incrementAndGet(); + try { + completableFuture = + CompletableFuture.runAsync( + () -> { + // Skip the work if THIS specific outer future was cancelled (e.g. cancelAll + // cancelled + // it before this runnable got CPU time). Avoids a wasted lbClient.requestAsync that + // super.makeShardRequest would just immediately cancel anyway. selfRef may briefly + // be null if the runnable runs before the assignment below — in that case we fall + // through to super, which has its own canceled-check guard. 
+ CompletableFuture self = selfRef.get(); + if (self != null && self.isCancelled()) { + return; + } + super.makeShardRequest(sreq, shard, params, lbReq, ssr, srsp, startTimeNS); + }, + commExecutor); + } catch (RejectedExecutionException ree) { + // Saturation or shutdown of commExecutor would otherwise propagate synchronously, + // crash SearchHandler's distributed loop before cancelAll() runs, abandon any + // already-submitted shard requests, and return HTTP 500 even when shards.tolerant=true. + // Treat it as a shard failure so the responses queue stays consistent and shards.tolerant + // semantics are honored. SERVICE_UNAVAILABLE (503) marks it as transient. + // No future was produced, so whenComplete will never fire — undo the increment here. + inFlightSubmits.decrementAndGet(); + recordShardSubmitError( + srsp, + new SolrException( + SolrException.ErrorCode.SERVICE_UNAVAILABLE, + "Comm executor thread pool is full, unable to send request to shard: " + shard, + ree)); + return; + } + // Publish the self-reference BEFORE the cancellation check so that if the cancellation block + // below cancels this future, the runnable (whenever it runs) will see the cancellation via + // selfRef.get().isCancelled(). AtomicReference provides happens-before across threads. + selfRef.set(completableFuture); + + // Synchronize registering submitFutures with the same monitor super uses for responseFutureMap. + // If cancelAll has already set canceled=true, don't track this request — cancel the outer + // future and return. The runnable, when it runs, will see self.isCancelled() and short-circuit. + // Mirrors super.makeShardRequest's check-and-put-or-early-return pattern on responseFutureMap. + synchronized (cancellationLock()) { + if (isCanceled()) { + completableFuture.cancel(true); + // whenComplete is never registered on this early-return path, so undo the increment. 
+ inFlightSubmits.decrementAndGet(); + return; + } + submitFutures.put(srsp, completableFuture); + } completableFuture.whenComplete( (r, t) -> { try { if (t != null) { - Throwable failure = t; - if (failure instanceof CompletionException) { - CompletionException completionException = (CompletionException) failure; - if (completionException.getCause() != null) { - failure = completionException.getCause(); - } - } + Throwable failure = + (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t; if (!(failure instanceof CancellationException)) { recordShardSubmitError( srsp, @@ -106,23 +179,35 @@ protected void makeShardRequest( } } } finally { - // Remove so that we keep track of in-flight submits only - submitFutures.remove(srsp); + // Order matters: remove from submitFutures (under the cross-tracker lock so cancelAll + // sees a consistent set), then decrement the loop-guard counter. Decrementing last + // means responsesPending() — which reads under the same lock — never observes + // inFlightSubmits == 0 while submitFutures still contains this entry. + synchronized (cancellationLock()) { + submitFutures.remove(srsp); + } + inFlightSubmits.decrementAndGet(); } }); } @Override public void cancelAll() { - super.cancelAll(); - submitFutures - .values() - .forEach( - future -> { - if (!future.isDone()) { - future.cancel(true); - } - }); - submitFutures.clear(); + // Synchronize the whole cancellation — super.cancelAll plus our submitFutures ops — on the + // same monitor so the invariant matches HttpShardHandler.cancelAll. Without this, a runnable + // entering super.makeShardRequest's synchronized block could observe canceled=true while + // submitFutures is still being walked, leaving the maps mutually inconsistent. 
+ synchronized (cancellationLock()) { + super.cancelAll(); + submitFutures + .values() + .forEach( + future -> { + if (!future.isDone()) { + future.cancel(true); + } + }); + submitFutures.clear(); + } } } diff --git a/solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java index f98aac8e3c38..3e9a1a78eb17 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java @@ -19,16 +19,35 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.impl.LBHttp2SolrClient; import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; public class ParallelHttpShardHandlerTest extends SolrTestCaseJ4 { + @BeforeClass + public static void ensureWorkingMockito() { + assumeWorkingMockito(); + } + private static class DirectExecutorService extends AbstractExecutorService { private volatile boolean shutdown; @@ -73,9 
+92,7 @@ public void testSubmitFailureIsRecordedWhenSuperThrows() throws Exception { // Force super.makeShardRequest to throw before it enqueues the response future. handler.lbClient = null; - ShardRequest shardRequest = new ShardRequest(); - shardRequest.params = new ModifiableSolrParams(); - shardRequest.actualShards = new String[] {"shardA"}; + ShardRequest shardRequest = buildShardRequest("shardA"); ShardResponse shardResponse = new ShardResponse(); shardResponse.setShardRequest(shardRequest); @@ -107,4 +124,286 @@ public void testSubmitFailureIsRecordedWhenSuperThrows() throws Exception { recorded.getException()); assertTrue(recorded.getException() instanceof SolrException); } + + /** + * Verifies the contract that when the commExecutor rejects the runnable, the failure is recorded + * via recordShardSubmitError (i.e., shows up in the responses queue) rather than being propagated + * synchronously to the caller. + * + *
<p>
This exercises issue #1 from the ParallelHttpShardHandler review: with a single-thread + * ThreadPoolExecutor backed by a SynchronousQueue, once the worker is busy, the next + * CompletableFuture.runAsync(...) call throws RejectedExecutionException synchronously out of + * makeShardRequest. The expected (post-fix) behavior is that the error is routed through + * recordShardSubmitError instead. + */ + @Test + public void testRejectedExecutorRecordsErrorInsteadOfThrowing() throws Exception { + CountDownLatch holdWorker = new CountDownLatch(1); + CountDownLatch workerStarted = new CountDownLatch(1); + ThreadPoolExecutor busyExecutor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + 1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); // default AbortPolicy + try { + // Occupy the single worker thread so the next submission has nowhere to go. + busyExecutor.execute( + () -> { + workerStarted.countDown(); + try { + holdWorker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertTrue("worker did not start within timeout", workerStarted.await(5, TimeUnit.SECONDS)); + + ParallelHttpShardHandlerFactory factory = new ParallelHttpShardHandlerFactory(); + factory.commExecutor = busyExecutor; + ParallelHttpShardHandler handler = new ParallelHttpShardHandler(factory); + + ShardRequest shardRequest = buildShardRequest("shardA"); + + ShardResponse shardResponse = new ShardResponse(); + shardResponse.setShardRequest(shardRequest); + shardResponse.setShard("shardA"); + + HttpShardHandler.SimpleSolrResponse simpleResponse = + new HttpShardHandler.SimpleSolrResponse(); + shardResponse.setSolrResponse(simpleResponse); + + ModifiableSolrParams params = new ModifiableSolrParams(); + QueryRequest queryRequest = new QueryRequest(params); + LBSolrClient.Endpoint endpoint = new LBSolrClient.Endpoint("http://ignored:8983/solr"); + LBSolrClient.Req lbReq = new LBSolrClient.Req(queryRequest, List.of(endpoint)); + + // The desired 
contract: rejection is captured and surfaced through the responses queue + // (i.e., this call should not throw RejectedExecutionException). + try { + handler.makeShardRequest( + shardRequest, + "shardA", + params, + lbReq, + simpleResponse, + shardResponse, + System.nanoTime()); + } catch (RejectedExecutionException ree) { + fail( + "makeShardRequest should not propagate RejectedExecutionException; the failure " + + "should be recorded via recordShardSubmitError. Got: " + + ree); + } + + ShardResponse recorded = handler.responses.poll(2, TimeUnit.SECONDS); + assertNotNull( + "Expected the executor rejection to be recorded as a shard failure in the responses" + + " queue, but no response arrived", + recorded); + assertSame( + "The recorded shard response should be the same instance passed in", + shardResponse, + recorded); + assertNotNull( + "Expected an exception to be attached to the recorded shard response", + recorded.getException()); + } finally { + holdWorker.countDown(); + busyExecutor.shutdownNow(); + busyExecutor.awaitTermination(5, TimeUnit.SECONDS); + } + } + + private ShardRequest buildShardRequest(String shard) { + ShardRequest sreq = new ShardRequest(); + sreq.params = new ModifiableSolrParams(); + sreq.actualShards = new String[] {shard}; + return sreq; + } + + /** + * Runs handler.takeCompletedIncludingErrors() on a worker thread with a timeout. If take() does + * not return within timeoutMs, fails the test with a clear message naming the iteration and phase + * — this is the signal for the lost-wakeup bug. 
+ */ + private ShardResponse runTakeWithTimeout( + ParallelHttpShardHandler handler, + ExecutorService takeExecutor, + int iteration, + String phaseLabel, + long timeoutMs) + throws Exception { + Future future = takeExecutor.submit(handler::takeCompletedIncludingErrors); + try { + return future.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + future.cancel(true); + fail( + "take() hung in iteration " + + iteration + + " " + + phaseLabel + + ": did not return within " + + timeoutMs + + "ms. The worker thread is parked in LinkedBlockingQueue.take() waiting for" + + " an element that will never arrive because the handler's state transitioned" + + " to empty without anything being enqueued on the responses queue."); + throw new AssertionError("unreachable"); + } catch (ExecutionException ee) { + throw new AssertionError( + "take() threw unexpectedly in iteration " + iteration + " " + phaseLabel, ee.getCause()); + } + } + + /** + * More aggressive variant of the lost-wakeup stress test that uses asynchronous inner-future + * completion on a dedicated scheduler. In production the inner future (from {@code + * lbClient.requestAsync}) completes on a Jetty IO thread, not synchronously at the registration + * site. That timing gap between {@code super.makeShardRequest} returning (and the outer {@code + * whenComplete} firing to remove {@code submitFutures}) and the inner {@code whenComplete} firing + * (to add to {@code responses}) is exactly where the observed 930-handler hang lives. This test + * matches that timing. 
+ */ + @Test + public void testTakeDoesNotHangUnderAsyncInnerFutureCompletion() throws Exception { + final int iterations = 1000; + final long perIterationTimeoutMs = 3_000; + + ExecutorService commExecutor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 5L, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + new SolrNamedThreadFactory("testCommExecutor")); + + // Simulates Jetty IO threads: a small pool that completes the inner future asynchronously + // some tiny amount of time after requestAsync() returns, exposing the race window. + ExecutorService mockIoThreads = + ExecutorUtil.newMDCAwareFixedThreadPool(2, new SolrNamedThreadFactory("testMockIo")); + + ExecutorService takeExecutor = + ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("testTakeRunner")); + + try { + for (int i = 0; i < iterations; i++) { + runAsyncRaceCycle(commExecutor, mockIoThreads, takeExecutor, i, perIterationTimeoutMs); + } + } finally { + takeExecutor.shutdownNow(); + takeExecutor.awaitTermination(5, TimeUnit.SECONDS); + mockIoThreads.shutdownNow(); + mockIoThreads.awaitTermination(5, TimeUnit.SECONDS); + commExecutor.shutdown(); + if (!commExecutor.awaitTermination(15, TimeUnit.SECONDS)) { + commExecutor.shutdownNow(); + commExecutor.awaitTermination(5, TimeUnit.SECONDS); + } + } + } + + private void runAsyncRaceCycle( + ExecutorService commExecutor, + ExecutorService mockIoThreads, + ExecutorService takeExecutor, + int iteration, + long timeoutMs) + throws Exception { + + ParallelHttpShardHandlerFactory factory = new ParallelHttpShardHandlerFactory(); + factory.commExecutor = commExecutor; + ParallelHttpShardHandler handler = new ParallelHttpShardHandler(factory); + + // LB client that returns a future which completes asynchronously on a separate thread — + // mimicking the Jetty IO thread model. 
This creates a real race between: + // (a) the outer runAsync future completing + its whenComplete removing submitFutures, + // (b) the inner future completing + its whenComplete adding to responses. + LBHttp2SolrClient mockLb = Mockito.mock(LBHttp2SolrClient.class); + Mockito.when(mockLb.requestAsync(Mockito.any(LBSolrClient.Req.class))) + .thenAnswer( + inv -> { + CompletableFuture f = new CompletableFuture<>(); + mockIoThreads.execute(() -> f.complete(new LBSolrClient.Rsp())); + return f; + }); + handler.lbClient = mockLb; + + // Single-shard submit → take. This is the simplest real workload. Under async inner-future + // completion, the outer whenComplete (removing submitFutures) and inner whenComplete + // (adding to responses) race. If there's a window where responsesPending() transitions to + // false without the responses queue getting an entry, take() parks forever. + ShardRequest sreq = buildShardRequest("shard-" + iteration); + handler.submit(sreq, "shard-" + iteration, sreq.params); + + ShardResponse rsp = + runTakeWithTimeout(handler, takeExecutor, iteration, "async-race", timeoutMs); + + assertNotNull( + "async-race iteration " + iteration + " take() returned null — response was never enqueued", + rsp); + } + + /** + * Invariant test for the cancellation synchronization contract in {@link + * ParallelHttpShardHandler}: when {@code makeShardRequest} is invoked while {@code canceled} is + * already {@code true}, the outer future must be cancelled and NOT tracked in {@code + * submitFutures}. This keeps {@code submitFutures} consistent with the cancellation state — + * mirroring {@link HttpShardHandler#makeShardRequest}'s check-and-put pattern on {@code + * responseFutureMap}. + * + *
<p>
Without this invariant, a runnable could observe {@code canceled=true} (and early-return in + * super) while {@code submitFutures} still tracks its outer future, leaving the outer + * whenComplete's bookkeeping racing against {@code cancelAll}'s own submitFutures sweep. + */ + @Test + public void testCanceledMakeShardRequestDoesNotTrackSubmitFutures() throws Exception { + ExecutorService commExecutor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 5L, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + new SolrNamedThreadFactory("invariantTestComm")); + + try { + ParallelHttpShardHandlerFactory factory = new ParallelHttpShardHandlerFactory(); + factory.commExecutor = commExecutor; + ParallelHttpShardHandler handler = new ParallelHttpShardHandler(factory); + + LBHttp2SolrClient mockLb = Mockito.mock(LBHttp2SolrClient.class); + Mockito.when(mockLb.requestAsync(Mockito.any(LBSolrClient.Req.class))) + .thenAnswer(inv -> new CompletableFuture()); + handler.lbClient = mockLb; + + // Force canceled=true and drain the CANCELLATION_NOTIFICATION so we can observe the + // post-cancel state cleanly. + handler.cancelAll(); + assertNotNull( + "CANCELLATION_NOTIFICATION should be queued by cancelAll", + handler.responses.poll(2, TimeUnit.SECONDS)); + + ShardRequest sreq = buildShardRequest("shardA"); + ShardResponse srsp = new ShardResponse(); + srsp.setShardRequest(sreq); + srsp.setShard("shardA"); + HttpShardHandler.SimpleSolrResponse ssr = new HttpShardHandler.SimpleSolrResponse(); + srsp.setSolrResponse(ssr); + + ModifiableSolrParams params = new ModifiableSolrParams(); + QueryRequest queryRequest = new QueryRequest(params); + LBSolrClient.Endpoint endpoint = new LBSolrClient.Endpoint("http://ignored:8983/solr"); + LBSolrClient.Req lbReq = new LBSolrClient.Req(queryRequest, List.of(endpoint)); + + // Invoke makeShardRequest while canceled=true. Expected: outer is cancelled, nothing is + // tracked in submitFutures, responsesPending() stays false. 
+ handler.makeShardRequest(sreq, "shardA", params, lbReq, ssr, srsp, System.nanoTime()); + + assertFalse( + "submitFutures must not track requests submitted while canceled=true", + handler.responsesPending()); + } finally { + commExecutor.shutdownNow(); + commExecutor.awaitTermination(5, TimeUnit.SECONDS); + } + } }