Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
public class BalancedShardsAllocator implements ShardsAllocator {

private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class);
private static final Logger notPreferredLogger = LogManager.getLogger(BalancedShardsAllocator.class.getName() + ".not-preferred");

public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.shard",
Expand Down Expand Up @@ -866,6 +867,12 @@ public boolean moveShards() {
// can use the cached decision.
final var moveDecision = shardMoved ? decideMove(index, shardRouting) : storedShardMovement.moveDecision();
if (moveDecision.isDecisionTaken() && moveDecision.cannotRemainAndCanMove()) {
notPreferredLogger.debug(
"Moving shard [{}] to [{}] from a NOT_PREFERRED allocation, explanation is [{}]",
shardRouting,
moveDecision.getTargetNode().getName(),
moveDecision.getAllocationDecision().getExplanation()
);
executeMove(shardRouting, index, moveDecision, "move-non-preferred");
// Return after a single move so that the change can be simulated before further moves are made.
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.common.FrequencyCappedAction;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -154,14 +155,16 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
var nodeWriteThreadPoolQueueLatencyThreshold = writeLoadConstraintSettings.getQueueLatencyThreshold();
if (nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= nodeWriteThreadPoolQueueLatencyThreshold.millis()) {
if (logger.isDebugEnabled() || allocation.debugDecision()) {
final Double shardWriteLoad = getShardWriteLoad(allocation, shardRouting);
final String explain = Strings.format(
"""
Node [%s] has a queue latency of [%d] millis that exceeds the queue latency threshold of [%s]. This node is \
hot-spotting. Current thread pool utilization [%f]. Moving shard(s) away.""",
hot-spotting. Current thread pool utilization [%f]. Shard write load [%s]. Moving shard(s) away.""",
node.nodeId(),
nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis(),
nodeWriteThreadPoolQueueLatencyThreshold.toHumanReadableString(2),
nodeWriteThreadPoolStats.averageThreadPoolUtilization()
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
shardWriteLoad
);
if (logger.isDebugEnabled()) {
logCanRemainMessage.maybeExecute(() -> logger.debug(explain));
Expand All @@ -182,6 +185,11 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
);
}

/**
 * Looks up the write load recorded for the given shard in the cluster info attached to {@code allocation}.
 *
 * @param allocation   the allocation whose {@code clusterInfo()} supplies the shard write loads
 * @param shardRouting the shard whose {@code shardId()} is used as the lookup key
 * @return the value stored for the shard's id; may be {@code null} (presumably when no write load has been reported
 *         for that shard - confirm against the ClusterInfo contract)
 */
@Nullable
private Double getShardWriteLoad(RoutingAllocation allocation, ShardRouting shardRouting) {
    final var shardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
    final var shardId = shardRouting.shardId();
    return shardWriteLoads.get(shardId);
}

/**
* Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node.
* Returns the percent thread pool utilization change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterInfo;
Expand Down Expand Up @@ -55,7 +56,9 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.hamcrest.Matchers;

import java.util.ArrayList;
Expand Down Expand Up @@ -1004,6 +1007,43 @@ public void testReturnEarlyOnShardAssignmentChanges() {
applyStartedShardsUntilNoChange(clusterState, allocationService);
}

/**
 * Runs the balanced allocator with a decider whose {@code canRemain} always answers NOT_PREFERRED and asserts that the
 * dedicated ".not-preferred" logger emits the "Moving shard ..." message at DEBUG level.
 */
@TestLogging(
    reason = "debug logging for test",
    value = "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.not-preferred:DEBUG"
)
public void testNotPreferredMovementIsLoggedAtDebugLevel() {
    final var clusterState = ClusterStateCreationUtils.state(randomIdentifier(), 3, 3);
    final var allocator = new BalancedShardsAllocator(
        BalancerSettings.DEFAULT,
        TEST_WRITE_LOAD_FORECASTER,
        new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT)
    );

    // The only decider in play: every canRemain query is answered with NOT_PREFERRED and a fixed explanation,
    // which is the text the log expectation below matches on.
    final AllocationDecider alwaysNotPreferred = new AllocationDecider() {
        @Override
        public Decision canRemain(
            IndexMetadata indexMetadata,
            ShardRouting shardRouting,
            RoutingNode node,
            RoutingAllocation allocation
        ) {
            return new Decision.Single(Decision.Type.NOT_PREFERRED, "test_decider", "Always NOT_PREFERRED");
        }
    };
    final var deciders = new AllocationDeciders(List.<AllocationDecider>of(alwaysNotPreferred));
    final var allocation = new RoutingAllocation(
        deciders,
        clusterState.getRoutingNodes(),
        clusterState,
        ClusterInfo.EMPTY,
        SnapshotShardSizeInfo.EMPTY,
        0L
    );

    // Same name the allocator uses for its dedicated logger: the class name plus a ".not-preferred" suffix.
    final var loggerName = BalancedShardsAllocator.class.getName() + ".not-preferred";
    final var debugMessageSeen = new MockLog.SeenEventExpectation(
        "moved a NOT_PREFERRED allocation",
        loggerName,
        Level.DEBUG,
        "Moving shard [*] to [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]"
    );
    MockLog.assertThatLogger(() -> allocator.allocate(allocation), loggerName, debugMessageSeen);
}

/**
* Test for {@link PrioritiseByShardWriteLoadComparator}. See Comparator Javadoc for expected
* ordering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,13 @@ private static void addToMockLogs(MockLog mockLog, List<String> loggers) {
* Executes an action and verifies expectations against the provided logger
*/
public static void assertThatLogger(Runnable action, Class<?> loggerOwner, MockLog.LoggingExpectation... expectations) {
    // Class#getCanonicalName() returns null for anonymous, local and hidden classes. Fall back to the binary name so
    // the String overload is never handed a null logger name; classes with a canonical name behave as before.
    final String canonicalName = loggerOwner.getCanonicalName();
    final String loggerName = canonicalName != null ? canonicalName : loggerOwner.getName();
    assertThatLogger(action, loggerName, expectations);
}

/**
* Executes an action and verifies expectations against the provided logger
*/
public static void assertThatLogger(Runnable action, String loggerOwner, MockLog.LoggingExpectation... expectations) {
try (var mockLog = MockLog.capture(loggerOwner)) {
for (var expectation : expectations) {
mockLog.addExpectation(expectation);
Expand Down