Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.util.GridCommandHandlerIndexingTest;
import org.apache.ignite.util.GridCommandHandlerIndexingWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerLegacyClientTest;
import org.apache.ignite.util.GridCommandHandlerManagementPoolTest;
import org.apache.ignite.util.GridCommandHandlerMetadataTest;
import org.apache.ignite.util.GridCommandHandlerSslTest;
import org.apache.ignite.util.GridCommandHandlerTest;
Expand Down Expand Up @@ -80,7 +81,8 @@
BaselineEventsRemoteTest.class,

GridCommandHandlerWalTest.class,
GridCommandHandlerCheckpointTest.class
GridCommandHandlerCheckpointTest.class,
GridCommandHandlerManagementPoolTest.class,
})
public class IgniteControlUtilityTestSuite {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.util;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;

import static org.apache.ignite.Ignition.startClient;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;

/**
* Tests management pool usage for management tasks.
*/
public class GridCommandHandlerManagementPoolTest extends GridCommandHandlerClusterPerMethodAbstractTest {
/** */
private static final long TIMEOUT = 10_000L;

/** */
@Test
public void testManagementTasksWorksWhenClientPoolBlocked() throws Exception {
Ignite ignite = startGrid(0);

assertEquals(EXIT_CODE_OK, execute("--set-state", ClusterState.ACTIVE.name()));

ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setSqlSchema(QueryUtils.DFLT_SCHEMA)
.setSqlFunctionClasses(TestSqlFunctions.class)
.setIndexedTypes(Integer.class, String.class)
);

TestSqlFunctions.latch = new CountDownLatch(1);

try (IgniteClient client = startClient(new ClientConfiguration().setAddresses("127.0.0.1:10800"))) {
// Block client pool by SQL queries.
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
() -> client.query(new SqlFieldsQuery("SELECT wait_latch()")).getAll(),
ClientConnectorConfiguration.DFLT_THREAD_POOL_SIZE, "client-thread");

// Check that management tasks still can be processed.
assertEquals(EXIT_CODE_OK, execute("--state")); // Native command.
assertEquals(EXIT_CODE_OK, execute("--checkpoint")); // Multi-node task command.

TestSqlFunctions.latch.countDown();

fut.get(TIMEOUT, TimeUnit.MILLISECONDS);
}
}

/** */
public static class TestSqlFunctions {
/** */
private static CountDownLatch latch;

/** */
@QuerySqlFunction
public static boolean wait_latch() {
try {
latch.await(TIMEOUT, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ignored) {
return false;
}

return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.MANAGEMENT_CLIENT_ATTR;
import static org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT;

/**
Expand All @@ -56,6 +57,9 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL
/** User attributes. */
protected Map<String, String> userAttrs;

/** If client is management. */
private volatile Boolean managementClient;

/**
* Describes the client connection:
* - thin cli: "cli:host:port@user_name"
Expand Down Expand Up @@ -196,4 +200,12 @@ public int nextTxId() {
@Override public Map<String, String> attributes() {
return F.isEmpty(userAttrs) ? Collections.emptyMap() : Collections.unmodifiableMap(userAttrs);
}

/** {@inheritDoc} */
@Override public boolean managementClient() {
if (managementClient == null)
managementClient = Boolean.parseBoolean(attributes().get(MANAGEMENT_CLIENT_ATTR));

return managementClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.MANAGEMENT_CLIENT_ATTR;

/**
* SQL listener connection context.
*/
Expand Down Expand Up @@ -94,7 +92,5 @@ void initializeFromHandshake(GridNioSession ses, ClientListenerProtocolVersion v
/**
* @return {@code True} if client is management.
*/
default boolean managementClient() {
return Boolean.parseBoolean(attributes().get(MANAGEMENT_CLIENT_ATTR));
}
boolean managementClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerPool;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
Expand Down Expand Up @@ -122,6 +124,9 @@ public class ClientListenerProcessor extends GridProcessorAdapter {
/** Executor service. */
private ExecutorService execSvc;

/** Management pool. */
private GridWorkerPool mgmtPool;

/** Thin client distributed configuration. */
private DistributedThinClientConfiguration distrThinCfg;

Expand Down Expand Up @@ -161,6 +166,7 @@ public ClientListenerProcessor(GridKernalContext ctx) {
}

execSvc = ctx.pools().getThinClientExecutorService();
mgmtPool = new GridWorkerPool(ctx.pools().getManagementExecutorService(), log);

Exception lastErr = null;

Expand Down Expand Up @@ -435,10 +441,43 @@ private void unregisterMBean() {
else {
connCtx.handler().registerRequest(reqId, cmdType);

super.onMessageReceived(ses, msg);
onMessageReceived(ses, connCtx, msg);
}
}
else
onMessageReceived(ses, connCtx, msg);
}

/** */
private void onMessageReceived(
GridNioSession ses,
@Nullable ClientListenerConnectionContext connCtx,
Object msg
) throws IgniteCheckedException {
if (connCtx == null) {
// Process handshake in NIO thread.
try {
proceedMessageReceived(ses, msg);
}
catch (IgniteCheckedException e) {
handleException(ses, e);
}
}
else if (connCtx.managementClient()) {
// Process management messages in management pool.
mgmtPool.execute(
new GridWorker(ctx.igniteInstanceName(), "management-message-received-notify", log) {
@Override protected void body() {
try {
proceedMessageReceived(ses, msg);
}
catch (IgniteCheckedException e) {
handleException(ses, e);
}
}
});
}
else // Process regular messages in client-listener pool.
super.onMessageReceived(ses, msg);
}
};
Expand Down Expand Up @@ -488,6 +527,10 @@ private void unregisterMBean() {

execSvc = null;

mgmtPool.join(cancel);

mgmtPool = null;

if (!U.IGNITE_MBEANS_DISABLED)
unregisterMBean();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public GridNioAsyncNotifyFilter(String igniteInstanceName, Executor exec, Ignite
* @param ses Session.
* @param ex Exception.
*/
private void handleException(GridNioSession ses, IgniteCheckedException ex) {
protected void handleException(GridNioSession ses, IgniteCheckedException ex) {
try {
proceedExceptionCaught(ses, ex);
}
Expand Down
Loading