diff --git a/changelog/unreleased/SOLR-18174-prevent-double-registration.yml b/changelog/unreleased/SOLR-18174-prevent-double-registration.yml new file mode 100644 index 000000000000..ce816f4fece7 --- /dev/null +++ b/changelog/unreleased/SOLR-18174-prevent-double-registration.yml @@ -0,0 +1,8 @@ +title: Fix semaphore permit leaks in HttpJettySolrClient's AsyncTracker. Avoid IO-thread deadlock on connection failure retries. Add a new metric gauge solr.client.request.async_permits +type: fixed +authors: + - name: Jan Høydahl + url: https://home.apache.org/phonebook.html?uid=janhoy +links: + - name: SOLR-18174 + url: https://issues.apache.org/jira/browse/SOLR-18174 diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index 6dc30f47b9bb..fa2392c5a1de 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -19,6 +19,7 @@ import static org.apache.solr.util.stats.InstrumentedHttpListenerFactory.KNOWN_METRIC_NAME_STRATEGIES; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableLongGauge; import java.lang.invoke.MethodHandles; import java.util.Iterator; import java.util.List; @@ -85,6 +86,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory protected volatile HttpJettySolrClient defaultClient; protected InstrumentedHttpListenerFactory httpListenerFactory; protected LBAsyncSolrClient loadbalancer; + private ObservableLongGauge asyncRequestsGauge; int corePoolSize = 0; int maximumPoolSize = Integer.MAX_VALUE; @@ -352,6 +354,7 @@ public void close() { ExecutorUtil.shutdownAndAwaitTermination(commExecutor); } } + IOUtils.closeQuietly(asyncRequestsGauge); try { SolrMetricProducer.super.close(); } catch (Exception e) { @@ -440,5 +443,20 @@ public void 
initializeMetrics(SolrMetricsContext parentContext, Attributes attri commExecutor = solrMetricsContext.instrumentedExecutorService( commExecutor, "solr.core.executor", "httpShardExecutor", SolrInfoBean.Category.QUERY); + if (defaultClient != null) { + asyncRequestsGauge = + solrMetricsContext.observableLongGauge( + "solr.client.request.async_permits", + "Outstanding async HTTP request permits in the Jetty SolrJ client" + + " (state=max: configured ceiling; state=available: currently unused permits).", + measurement -> { + measurement.record( + defaultClient.asyncTrackerMaxPermits(), Attributes.of(STATE_KEY_ATTR, "max")); + measurement.record( + defaultClient.asyncTrackerAvailablePermits(), + Attributes.of(STATE_KEY_ATTR, "available")); + }, + null); + } } } diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java new file mode 100644 index 000000000000..1ff3ec542ed5 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.handler.component; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.lucene.util.SuppressForbidden; +import org.apache.solr.client.solrj.impl.LBSolrClient; +import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; +import org.apache.solr.client.solrj.jetty.LBJettySolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.params.SolrParams; +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.client.Response; +import org.eclipse.jetty.client.Result; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for two semaphore-permit leak bugs in {@link HttpJettySolrClient}'s {@code AsyncTracker} + * that cause distributed queries to hang permanently. + * + *

Pattern A – HTTP/2 GOAWAY double-queue leak

+ * + *

Jetty HTTP/2 can re-queue the same exchange after a GOAWAY/connection race, firing {@code + * onRequestQueued} twice for one logical request. Because {@code onComplete} fires only once, one + * permit is permanently consumed per occurrence, gradually draining the semaphore over hours or + * days until Pattern B triggers. + * + *

Pattern B – IO-thread deadlock on LB retry when permits depleted

+ * + *

When a connection-level failure causes {@link + * org.apache.solr.client.solrj.jetty.LBJettySolrClient} to retry synchronously inside a {@code + * whenComplete} callback on the Jetty IO selector thread, the retry calls {@code acquire()} on that + * same IO thread before the original request's {@code onComplete} can call {@code release()}. No + * permits are permanently lost — the deadlock simply requires two permits to be available + * simultaneously — but if the semaphore is at zero, {@code acquire()} blocks the IO thread + * permanently and distributed queries hang forever. + */ +public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String COLLECTION = "semaphore_leak_test"; + + /** Reduced semaphore size so we can observe the drain without needing thousands of requests. */ + private static final int MAX_PERMITS = 40; + + /** + * Number of concurrent requests. Set equal to MAX_PERMITS so that all permits are exhausted + * before any retry can acquire, triggering the IO-thread deadlock. + */ + private static final int NUM_RETRY_REQUESTS = MAX_PERMITS; + + @BeforeClass + public static void setupCluster() throws Exception { + // Reduce the semaphore size so we can observe drain with few requests. + // This property is read when HttpJettySolrClient is constructed, so it must + // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up. 
+ System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, String.valueOf(MAX_PERMITS)); + + configureCluster(1).addConfig("conf", configset("cloud-dynamic")).configure(); + + CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1) + .process(cluster.getSolrClient()); + + waitForState( + "Expected 1 active shard with 1 replica", + COLLECTION, + (n, c) -> SolrCloudTestCase.replicasForCollectionAreFullyActive(n, c, 2, 1)); + } + + @AfterClass + public static void cleanup() { + System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); + } + + /** + * Demonstrates the permanent IO-thread deadlock (Pattern B) caused by {@link + * org.apache.solr.client.solrj.jetty.LBJettySolrClient} retrying a request synchronously inside a + * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO selector thread. + * + *

This test passes with the {@code failureDispatchExecutor} fix in this branch. Without + * the fix, the IO thread would block forever in {@code semaphore.acquire()} and this test would + * time out. + */ + @Test + public void testSemaphoreLeakOnLBRetry() throws Exception { + // Dedicated client so that permanently deadlocked IO threads don't affect the cluster's client. + HttpJettySolrClient testClient = + new HttpJettySolrClient.Builder() + .withConnectionTimeout(5, TimeUnit.SECONDS) + .withIdleTimeout(30, TimeUnit.SECONDS) + .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP connection + .build(); + + String realBaseUrl = + cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION; + + List> futures = new ArrayList<>(); + + try (FakeTcpServer fakeServer = new FakeTcpServer(NUM_RETRY_REQUESTS); + LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { + + assertEquals( + "All permits should be available before the test (verifies sysprop was applied)", + MAX_PERMITS, + testClient.asyncTrackerAvailablePermits()); + + // Submit NUM_RETRY_REQUESTS async requests. + // Each request has two endpoints: fakeBaseUrl (first) and realBaseUrl (second/retry). + // Each requestAsync() call acquires a semaphore permit synchronously during send(). + // After NUM_RETRY_REQUESTS calls, the semaphore is at 0. + for (int i = 0; i < NUM_RETRY_REQUESTS; i++) { + QueryRequest qr = new QueryRequest(SolrParams.of("q", "*:*")); + LBSolrClient.Req req = + new LBSolrClient.Req( + qr, + List.of( + new LBSolrClient.Endpoint(fakeServer.baseUrl()), + new LBSolrClient.Endpoint(realBaseUrl))); + futures.add(lbClient.requestAsync(req)); + } + + log.info( + "Queued {} requests (semaphore now at 0). Waiting for all TCP connections...", + NUM_RETRY_REQUESTS); + + // Wait until the fake server has accepted all NUM_RETRY_REQUESTS connections. + // At this point all semaphore permits are consumed and no onComplete has fired yet. 
+ assertTrue( + "All " + + NUM_RETRY_REQUESTS + + " connections should be established within 15 s, but only " + + fakeServer.connectionCount() + + " were.", + fakeServer.awaitAllConnected(15, TimeUnit.SECONDS)); + + assertEquals( + "Semaphore should be fully consumed after queuing all requests", + 0, + testClient.asyncTrackerAvailablePermits()); + + // Close all fake connections simultaneously with TCP RST. + // onFailure fires on the IO thread → LBJettySolrClient retry → acquire() blocks + // (semaphore=0). + int connCount = fakeServer.connectionCount(); + log.info("Closing {} fake connections via RST...", connCount); + fakeServer.rstAll(); + + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .get(30, TimeUnit.SECONDS); + } catch (ExecutionException e) { + // Individual request failure is fine; permits are released by onComplete regardless. + log.warn("Some requests failed during retry", e); + } catch (TimeoutException e) { + // Force-stop the HttpClient to unblock any threads stuck in semaphore.acquire() + // before asserting failure, so the finally block can close the client without hanging. + try { + testClient.getHttpClient().stop(); + } catch (Exception ignored) { + log.debug("Failed to stop HttpClient"); + } + fail( + "BUG (LBJettySolrClient retry deadlock): futures did not complete within 30s." 
+ + " IO threads are permanently blocked in semaphore.acquire() because the retry" + + " fires synchronously on the IO thread before onComplete can release()."); + } + + int permitsAfterFailures = testClient.asyncTrackerAvailablePermits(); + log.info("Permits after retries: {}/{}", permitsAfterFailures, MAX_PERMITS); + assertEquals( + "All permits should be restored after retries complete", + MAX_PERMITS, + permitsAfterFailures); + } finally { + try { + testClient.close(); + } catch (Exception ignored) { + log.debug("Failed to close LBJettySolrClient"); + } + for (CompletableFuture f : futures) { + f.cancel(true); + } + } + } + + /** + * Verifies that the {@code PERMIT_ACQUIRED_ATTR} idempotency guard prevents the Pattern A permit + * leak where Jetty HTTP/2 re-queues the same exchange after a GOAWAY/connection race, firing + * {@code onRequestQueued} twice for one logical request while {@code onComplete} fires only once. + * + *

Rather than setting up a real HTTP/2 server, this test uses reflection to invoke {@code + * AsyncTracker.queuedListener} twice and {@code AsyncTracker.completeListener} once for the same + * {@code Request} object. Without the guard the semaphore count drops by one; with the guard the + * second queued call is a no-op and the count is unchanged. + */ + @Test + @SuppressForbidden( + reason = + "Reflection needed to access AsyncTracker's private fields for white-box testing without exposing them in the production API") + public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { + assumeWorkingMockito(); + + HttpJettySolrClient testClient = + new HttpJettySolrClient.Builder() + .withConnectionTimeout(5, TimeUnit.SECONDS) + .withIdleTimeout(30, TimeUnit.SECONDS) + // HTTP/2 is the default transport where this GOAWAY race occurs. + .build(); + + // Capture asyncTracker and its class for reflection-based listener access and cleanup. + Field asyncTrackerField = HttpJettySolrClient.class.getDeclaredField("asyncTracker"); + asyncTrackerField.setAccessible(true); + Object asyncTracker = asyncTrackerField.get(testClient); + Class asyncTrackerClass = asyncTracker.getClass(); + + try { + int maxPermits = testClient.asyncTrackerMaxPermits(); + assertEquals( + "All permits available before test", + maxPermits, + testClient.asyncTrackerAvailablePermits()); + + // Access the raw listeners via reflection to simulate Jetty's internal double-fire. 
+ Field queuedListenerField = asyncTrackerClass.getDeclaredField("queuedListener"); + queuedListenerField.setAccessible(true); + Request.QueuedListener queuedListener = + (Request.QueuedListener) queuedListenerField.get(asyncTracker); + + Field completeListenerField = asyncTrackerClass.getDeclaredField("completeListener"); + completeListenerField.setAccessible(true); + Response.CompleteListener completeListener = + (Response.CompleteListener) completeListenerField.get(asyncTracker); + + // Fake Request that supports the attribute get/set used by the idempotency guard. + Map reqAttributes = new HashMap<>(); + Request fakeRequest = Mockito.mock(Request.class); + Mockito.when(fakeRequest.getAttributes()).thenReturn(reqAttributes); + Mockito.when(fakeRequest.attribute(ArgumentMatchers.anyString(), ArgumentMatchers.any())) + .thenAnswer( + inv -> { + reqAttributes.put(inv.getArgument(0), inv.getArgument(1)); + return fakeRequest; + }); + + // Simulate the GOAWAY double-fire: 1st call acquires a permit; 2nd is the bug trigger. + queuedListener.onQueued(fakeRequest); + queuedListener.onQueued(fakeRequest); + + Result fakeResult = Mockito.mock(Result.class); + Mockito.when(fakeResult.getRequest()).thenReturn(fakeRequest); + // Only one onComplete fires for the logical request (regardless of internal retries). + completeListener.onComplete(fakeResult); + + int permitsAfter = testClient.asyncTrackerAvailablePermits(); + log.info("Permits after double-queued + single complete: {}/{}", permitsAfter, maxPermits); + + assertEquals( + "BUG (Jetty HTTP/2 GOAWAY retry permit leak): onRequestQueued fired twice for the" + + " same Request object but onComplete fired only once. The second acquire()" + + " was not matched by a release(), permanently leaking one permit per" + + " occurrence. 
In production this causes gradual semaphore depletion over" + + " hours/days until Pattern B IO-thread deadlock triggers.", + maxPermits, + permitsAfter); + + } finally { + // Force-terminate the Phaser as a safety net; without the fix the phaser would be unbalanced. + try { + Field phaserField = asyncTrackerClass.getDeclaredField("phaser"); + phaserField.setAccessible(true); + Phaser phaser = (Phaser) phaserField.get(asyncTracker); + phaser.forceTermination(); + } catch (Exception ignored) { + log.debug("Failed to force-terminate Phaser"); + } + + try { + testClient.close(); + } catch (Exception ignored) { + log.debug("Failed to close HttpJettySolrClient"); + } + } + } + + /** + * A minimal fake TCP server that accepts a fixed number of connections and holds them open, + * allowing tests to simulate connection-level failures by RST-ing all sockets at once. + * + *

Implements {@link AutoCloseable} so that the server socket and any open connections are + * always cleaned up when used in a try-with-resources block, even if the test fails or throws. + */ + private static class FakeTcpServer implements AutoCloseable { + private final ServerSocket serverSocket; + private final List connections = Collections.synchronizedList(new ArrayList<>()); + private final CountDownLatch allConnected; + private final AtomicBoolean closed = new AtomicBoolean(false); + + FakeTcpServer(int expectedConnections) throws IOException { + this.serverSocket = new ServerSocket(0); + this.allConnected = new CountDownLatch(expectedConnections); + Thread acceptThread = + new Thread( + () -> { + try { + while (connections.size() < expectedConnections && !serverSocket.isClosed()) { + Socket s = serverSocket.accept(); + connections.add(s); + allConnected.countDown(); + } + } catch (IOException ioe) { + log.warn("Failed to accept connection", ioe); + } + }, + "fake-tcp-server"); + acceptThread.setDaemon(true); + acceptThread.start(); + } + + /** Returns the base URL clients should connect to, e.g. {@code http://127.0.0.1:PORT/solr}. */ + String baseUrl() { + return "http://127.0.0.1:" + serverSocket.getLocalPort() + "/solr"; + } + + /** Waits until all expected connections have been accepted. */ + boolean awaitAllConnected(long timeout, TimeUnit unit) throws InterruptedException { + return allConnected.await(timeout, unit); + } + + /** Returns the number of connections accepted so far. */ + int connectionCount() { + return connections.size(); + } + + /** + * Closes all accepted connections with TCP RST, triggering onFailure on the Jetty IO thread. 
+ */ + void rstAll() { + for (Socket s : connections) { + try { + s.setSoLinger(true, 0); // send RST instead of FIN + s.close(); + } catch (IOException ignored) { + log.debug("Failed to close connection"); + } + } + } + + /** + * RSTs any remaining open connections and closes the server socket, stopping the accept thread. + * Safe to call multiple times. + */ + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + rstAll(); + try { + serverSocket.close(); + } catch (IOException ignored) { + log.debug("Failed to close server socket"); + } + } + } + } +} diff --git a/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc b/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc index 837c7282b5ed..b9ecaddb1002 100644 --- a/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc +++ b/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc @@ -106,6 +106,8 @@ NOTE: Properties marked with "!" indicate inverted meaning between pre Solr 10 a |solr.solrj.http.cookies.enabled|!solr.http.disableCookies| false |If Http2SolrClient should support cookies. +|solr.solrj.http.jetty.async_requests.max||1000|Maximum number of outstanding async HTTP requests allowed concurrently in the Jetty-based SolrJ HTTP client. Increase if you observe semaphore exhaustion under heavy distributed query load; decrease to limit resource usage. Related metric: `solr.client.request.async_permits`. + |solr.solrj.http.jetty.customizer|solr.httpclient.builder.factory||A class loaded to customize HttpJettySolrClient upon creation. |solr.streamingexpressions.facet.tiered.enabled|solr.facet.stream.tiered|true|Controls whether tiered faceting is enabled for streaming expressions. 
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc index ea074b2ace22..15a2d592f87d 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc @@ -124,6 +124,17 @@ The `Overseer Registry` is initialized when running in SolrCloud mode and includ * Size of the Overseer queues (collection work queue and cluster state update queue) +=== HTTP Client Registry + +Solr exposes metrics for the internal Jetty-based HTTP client used for distributed (shard) requests: + +[cols="2,1,3",options="header"] +|=== +| Prometheus Metric Name | Type | Description +| `solr_client_request_async_permits{state="max"}` | gauge | Configured maximum number of outstanding concurrent async HTTP requests (controlled by `solr.solrj.http.jetty.async_requests.max`, default 1000). +| `solr_client_request_async_permits{state="available"}` | gauge | Number of async request permits currently available (i.e., not in use). When this approaches zero, new distributed requests will block waiting for a permit. +|=== + == Core Level Metrics === Index Merge Metrics diff --git a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc index 3a250b69060d..4954860be117 100644 --- a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc +++ b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc @@ -335,3 +335,11 @@ If you require the ability to roll back, back up your indexes before upgrading. === Docker The `gosu` binary is no longer installed in the Solr Docker image. See https://github.com/tianon/gosu[gosu github page] for alternatives, such as `runuser`, `setpriv` or `chroot`. 
+ +=== Max distributed requests now configurable + +The internal HTTP client used for distributed shard sub-requests previously had a hard-coded limit of 1000 concurrent async requests per node. +In large clusters, a single query can fan out to hundreds of sub-requests, quickly exhausting this limit and causing requests to queue, potentially leading to stalls or timeouts. +This limit is now configurable via the system property `solr.solrj.http.jetty.async_requests.max`. + +Current permit utilization can be monitored via the `solr_client_request_async_permits` metric (see xref:deployment-guide:metrics-reporting.adoc#http-client-registry[HTTP Client Registry]). diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index cac90ba46705..05491bb8e698 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Phaser; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -110,6 +111,10 @@ public class HttpJettySolrClient extends HttpSolrClientBase { */ public static final String CLIENT_CUSTOMIZER_SYSPROP = "solr.solrj.http.jetty.customizer"; + /** System property to cap the maximum number of outstanding async HTTP requests. Default 1000. 
*/ + public static final String ASYNC_REQUESTS_MAX_SYSPROP = + "solr.solrj.http.jetty.async_requests.max"; + public static final String REQ_PRINCIPAL_KEY = "solr-req-principal"; private static final String USER_AGENT = "Solr[" + MethodHandles.lookup().lookupClass().getName() + "] " + SolrVersion.LATEST_STRING; @@ -439,7 +444,17 @@ public void onHeaders(Response response) { @Override public void onFailure(Response response, Throwable failure) { super.onFailure(response, failure); - future.completeExceptionally(new SolrServerException(failure.getMessage(), failure)); + // Dispatch off the IO thread to avoid blocking semaphore.acquire() on retry. + // Fall back to IO thread if executor rejects (shutdown/overloaded). + SolrServerException ex = new SolrServerException(failure.getMessage(), failure); + try { + executor.execute(() -> future.completeExceptionally(ex)); + } catch (RejectedExecutionException ree) { + log.warn( + "Failed to complete future exceptionally due to executor rejection, completing on IO thread.", + ree); + future.completeExceptionally(ex); + } } }); @@ -834,7 +849,21 @@ public void close() { } private static class AsyncTracker { - private static final int MAX_OUTSTANDING_REQUESTS = 1000; + /** + * Read per-instance so that tests can set the sysprop before constructing a client and have it + * take effect without relying on class-load ordering across test suites in the same JVM. + */ + private final int maxOutstandingRequests; + + /** + * Request attribute key used to guard idempotency across both listeners. Set immediately after + * {@code phaser.register()} — before {@code available.acquire()} — so that {@code onComplete} + * can never fire between registration and attribute-set and leave a phaser party stranded. + * Jetty can re-fire {@code onRequestQueued} for the same exchange (e.g. after a GOAWAY retry); + * the attribute makes the second call a no-op. 
{@code onComplete} always fires exactly once and + * uses the attribute to call {@code arriveAndDeregister()} + {@code release()} exactly once. + */ + private static final String PERMIT_ACQUIRED_ATTR = "solr.async_tracker.permit_acquired"; // wait for async requests private final Phaser phaser; @@ -845,35 +874,73 @@ private static class AsyncTracker { AsyncTracker() { // TODO: what about shared instances? + maxOutstandingRequests = EnvUtils.getPropertyAsInteger(ASYNC_REQUESTS_MAX_SYSPROP, 1000); phaser = new Phaser(1); - available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false); + available = new Semaphore(maxOutstandingRequests, false); queuedListener = request -> { + if (request.getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) { + return; + } phaser.register(); + // Set the attribute before acquire() so onComplete can never race between + // phaser.register() and attribute-set, which would strand a phaser party forever. + request.attribute(PERMIT_ACQUIRED_ATTR, Boolean.TRUE); try { available.acquire(); - } catch (InterruptedException ignored) { - + } catch (InterruptedException e) { + // completeListener will call arriveAndDeregister() when onComplete fires. + Thread.currentThread().interrupt(); } }; completeListener = result -> { - phaser.arriveAndDeregister(); - available.release(); + if (result != null + && result.getRequest().getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) { + phaser.arriveAndDeregister(); + available.release(); + } }; } int getMaxRequestsQueuedPerDestination() { // comfortably above max outstanding requests - return MAX_OUTSTANDING_REQUESTS * 3; + return maxOutstandingRequests * 3; + } + + int maxPermits() { + return maxOutstandingRequests; + } + + int availablePermits() { + return available.availablePermits(); } public void waitForComplete() { - phaser.arriveAndAwaitAdvance(); + // Use awaitAdvanceInterruptibly() instead of arriveAndAwaitAdvance() so that + // ExecutorUtil.shutdownNow() can unblock this during container shutdown. 
+ int phase = phaser.arrive(); + try { + phaser.awaitAdvanceInterruptibly(phase); + } catch (InterruptedException e) { + // Terminate phaser on interrupt so in-flight onComplete callbacks don't stall. + phaser.forceTermination(); + Thread.currentThread().interrupt(); + } phaser.arriveAndDeregister(); } } + /** Returns the configured maximum number of outstanding async requests. */ + public int asyncTrackerMaxPermits() { + return asyncTracker.maxPermits(); + } + + /** Returns the number of currently available async-request permits. */ + public int asyncTrackerAvailablePermits() { + return asyncTracker.availablePermits(); + } + public static class Builder extends HttpSolrClientBuilderBase {