From 7d0eefd3f7a6b81569884a2192ad71e208fffb71 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 28 May 2026 19:01:35 +0300 Subject: [PATCH 1/3] IGNITE-28734 Process control.sh messages in management pool --- .../IgniteControlUtilityTestSuite.java | 4 +- .../GridCommandHandlerManagementPoolTest.java | 95 +++++++++++++++++++ ...ientListenerAbstractConnectionContext.java | 12 +++ .../odbc/ClientListenerConnectionContext.java | 6 +- .../odbc/ClientListenerProcessor.java | 45 ++++++++- .../util/nio/GridNioAsyncNotifyFilter.java | 2 +- 6 files changed, 156 insertions(+), 8 deletions(-) create mode 100644 modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java index 0ed39ff19be2a..1c5701b6adaf9 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java @@ -34,6 +34,7 @@ import org.apache.ignite.util.GridCommandHandlerIndexingTest; import org.apache.ignite.util.GridCommandHandlerIndexingWithSSLTest; import org.apache.ignite.util.GridCommandHandlerLegacyClientTest; +import org.apache.ignite.util.GridCommandHandlerManagementPoolTest; import org.apache.ignite.util.GridCommandHandlerMetadataTest; import org.apache.ignite.util.GridCommandHandlerSslTest; import org.apache.ignite.util.GridCommandHandlerTest; @@ -80,7 +81,8 @@ BaselineEventsRemoteTest.class, GridCommandHandlerWalTest.class, - GridCommandHandlerCheckpointTest.class + GridCommandHandlerCheckpointTest.class, + GridCommandHandlerManagementPoolTest.class, }) public class IgniteControlUtilityTestSuite { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java new file mode 100644 index 0000000000000..8d61c79b8bcf5 --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.Ignition.startClient; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; + +/** + * Tests management pool usage for management tasks. + */ +public class GridCommandHandlerManagementPoolTest extends GridCommandHandlerClusterPerMethodAbstractTest { + /** */ + private static final long TIMEOUT = 10_000L; + + /** */ + @Test + public void testManagementTasksWorksWhenClientPoolBlocked() throws Exception { + Ignite ignite = startGrid(0); + + assertEquals(EXIT_CODE_OK, execute("--set-state", ClusterState.ACTIVE.name())); + + ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setSqlSchema(QueryUtils.DFLT_SCHEMA) + .setSqlFunctionClasses(TestSqlFunctions.class) + .setIndexedTypes(Integer.class, String.class) + ); + + TestSqlFunctions.latch = new CountDownLatch(1); + + try (IgniteClient client = startClient(new ClientConfiguration().setAddresses("127.0.0.1:10800"))) { + // Block client pool by SQL queries. + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync( + () -> client.query(new SqlFieldsQuery("SELECT wait_latch()")).getAll(), + ClientConnectorConfiguration.DFLT_THREAD_POOL_SIZE, "client-thread"); + + // Check that management tasks still can be processed. + assertEquals(EXIT_CODE_OK, execute("--state")); // Native command. + assertEquals(EXIT_CODE_OK, execute("--checkpoint")); // Multi-node task command. + + + TestSqlFunctions.latch.countDown(); + + fut.get(TIMEOUT, TimeUnit.MILLISECONDS); + } + } + + /** */ + public static class TestSqlFunctions { + /** */ + private static CountDownLatch latch; + + /** */ + @QuerySqlFunction + public static boolean wait_latch() { + try { + latch.await(TIMEOUT, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ignored) { + return false; + } + + return true; + } + } +} \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java index 3cdbf56b82eaf..fa376668ab1e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java @@ -32,6 +32,7 @@ import org.apache.ignite.plugin.security.SecurityCredentials; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.MANAGEMENT_CLIENT_ATTR; import static org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT; /** @@ -56,6 +57,9 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL /** User attributes. */ protected Map userAttrs; + /** If client is management. */ + private volatile Boolean managementClient; + /** * Describes the client connection: * - thin cli: "cli:host:port@user_name" @@ -196,4 +200,12 @@ public int nextTxId() { @Override public Map attributes() { return F.isEmpty(userAttrs) ? Collections.emptyMap() : Collections.unmodifiableMap(userAttrs); } + + /** {@inheritDoc} */ + @Override public boolean managementClient() { + if (managementClient == null) + managementClient = Boolean.parseBoolean(attributes().get(MANAGEMENT_CLIENT_ATTR)); + + return managementClient; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java index 0848f458b75ed..88ce50de86647 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java @@ -24,8 +24,6 @@ import org.apache.ignite.internal.util.nio.GridNioSession; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.MANAGEMENT_CLIENT_ATTR; - /** * SQL listener connection context. */ @@ -94,7 +92,5 @@ void initializeFromHandshake(GridNioSession ses, ClientListenerProtocolVersion v /** * @return {@code True} if client is management. */ - default boolean managementClient() { - return Boolean.parseBoolean(attributes().get(MANAGEMENT_CLIENT_ATTR)); - } + boolean managementClient(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java index d092d37a4c1e2..68af9dfe14aeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java @@ -59,6 +59,8 @@ import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.GridWorkerPool; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.mxbean.ClientProcessorMXBean; @@ -122,6 +124,9 @@ public class ClientListenerProcessor extends GridProcessorAdapter { /** Executor service. */ private ExecutorService execSvc; + /** Management pool. */ + private GridWorkerPool mgmtPool; + /** Thin client distributed configuration. */ private DistributedThinClientConfiguration distrThinCfg; @@ -161,6 +166,7 @@ public ClientListenerProcessor(GridKernalContext ctx) { } execSvc = ctx.pools().getThinClientExecutorService(); + mgmtPool = new GridWorkerPool(ctx.pools().getManagementExecutorService(), log); Exception lastErr = null; @@ -435,10 +441,43 @@ private void unregisterMBean() { else { connCtx.handler().registerRequest(reqId, cmdType); - super.onMessageReceived(ses, msg); + onMessageReceived(ses, connCtx, msg); } } else + onMessageReceived(ses, connCtx, msg); + } + + /** */ + private void onMessageReceived( + GridNioSession ses, + @Nullable ClientListenerConnectionContext connCtx, + Object msg + ) throws IgniteCheckedException { + if (connCtx == null) { + // Process handshake in NIO thread. + try { + proceedMessageReceived(ses, msg); + } + catch (IgniteCheckedException e) { + handleException(ses, e); + } + } + else if (connCtx.managementClient()) { + // Process management messages in management pool. + mgmtPool.execute( + new GridWorker(ctx.igniteInstanceName(), "management-message-received-notify", log) { + @Override protected void body() { + try { + proceedMessageReceived(ses, msg); + } + catch (IgniteCheckedException e) { + handleException(ses, e); + } + } + }); + } + else // Process regular messages in client-listener pool. super.onMessageReceived(ses, msg); } }; @@ -488,6 +527,10 @@ private void unregisterMBean() { execSvc = null; + mgmtPool.join(cancel); + + mgmtPool = null; + if (!U.IGNITE_MBEANS_DISABLED) unregisterMBean(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java index ccb7f38b9121f..3fb1ec5dd4a8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java @@ -139,7 +139,7 @@ public GridNioAsyncNotifyFilter(String igniteInstanceName, Executor exec, Ignite * @param ses Session. * @param ex Exception. */ - private void handleException(GridNioSession ses, IgniteCheckedException ex) { + protected void handleException(GridNioSession ses, IgniteCheckedException ex) { try { proceedExceptionCaught(ses, ex); } From acdaad2673c7e64916b8c0949f75842e9e9c1b56 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 28 May 2026 19:15:00 +0300 Subject: [PATCH 2/3] IGNITE-28734 Process control.sh messages in management pool --- .../ignite/util/GridCommandHandlerManagementPoolTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java index 8d61c79b8bcf5..e970c57d20288 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java @@ -92,4 +92,4 @@ public static boolean wait_latch() { return true; } } -} \ No newline at end of file +} From 19fbee31193d0ba8e523dffa4f70861280dcf0e5 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 28 May 2026 19:15:28 +0300 Subject: [PATCH 3/3] IGNITE-28734 Process control.sh messages in management pool --- .../apache/ignite/util/GridCommandHandlerManagementPoolTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java index e970c57d20288..cd364a74e8a88 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java @@ -67,7 +67,6 @@ public void testManagementTasksWorksWhenClientPoolBlocked() throws Exception { assertEquals(EXIT_CODE_OK, execute("--state")); // Native command. assertEquals(EXIT_CODE_OK, execute("--checkpoint")); // Multi-node task command. - TestSqlFunctions.latch.countDown(); fut.get(TIMEOUT, TimeUnit.MILLISECONDS);