From 29e522e4e8229ae7944f379633becf4629ed2da6 Mon Sep 17 00:00:00 2001 From: Siddhant Sangwan Date: Fri, 13 Mar 2026 13:20:34 +0530 Subject: [PATCH 1/6] HDDS-14822. Add a configuration for excluding Datanodes from pipelines --- .../org/apache/hadoop/hdds/scm/ScmConfig.java | 114 +++++++++++++ .../apache/hadoop/hdds/scm/TestScmConfig.java | 81 ++++++++++ .../scm/container/ContainerManagerImpl.java | 8 + .../scm/pipeline/PipelineManagerImpl.java | 60 ++++++- .../pipeline/WritableECContainerProvider.java | 16 ++ .../WritableRatisContainerProvider.java | 18 +++ .../container/TestContainerManagerImpl.java | 41 +++++ .../scm/pipeline/TestPipelineManagerImpl.java | 153 ++++++++++++++++++ .../TestWritableECContainerProvider.java | 92 +++++++++++ .../TestWritableRatisContainerProvider.java | 84 +++++++++- 10 files changed, 660 insertions(+), 7 deletions(-) create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 0876bd22ea4f..299ccdbad0c2 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -17,12 +17,23 @@ package org.apache.hadoop.hdds.scm; +import com.google.common.collect.ImmutableSet; import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Locale; +import java.util.Objects; +import java.util.Set; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.ConfigType; +import org.apache.hadoop.hdds.conf.PostConstruct; import org.apache.hadoop.hdds.conf.ReconfigurableConfig; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; /** * The configuration class for the SCM service. @@ -102,6 +113,19 @@ public class ScmConfig extends ReconfigurableConfig { ) private String ecPipelineChoosePolicyName; + @Config(key = "hdds.scm.pipeline.exclude.datanodes", + type = ConfigType.STRING, + defaultValue = "", + tags = { ConfigTag.SCM, ConfigTag.PIPELINE }, + description = + "Comma-separated list of Datanodes to exclude from SCM pipeline creation and pipeline selection. " + + "Each entry may be a Datanode UUID, hostname, or IP address. " + + "Example: \",dn-1.example.com,10.0.0.12\"." + ) + private String pipelineExcludeDatanodes = ""; + + private volatile PipelineExcludedNodes pipelineExcludedNodes = PipelineExcludedNodes.EMPTY; + @Config(key = "hdds.scm.block.deletion.per-interval.max", type = ConfigType.INT, defaultValue = "500000", @@ -138,6 +162,11 @@ public class ScmConfig extends ReconfigurableConfig { ) private int transactionToDNsCommitMapLimit = 5000000; + @PostConstruct + public void parsePipelineExcludedNodes() { + pipelineExcludedNodes = PipelineExcludedNodes.parse(pipelineExcludeDatanodes); + } + public int getTransactionToDNsCommitMapLimit() { return transactionToDNsCommitMapLimit; } @@ -194,6 +223,10 @@ public String getECPipelineChoosePolicyName() { return ecPipelineChoosePolicyName; } + public PipelineExcludedNodes getPipelineExcludedNodes() { + return pipelineExcludedNodes; + } + public int getBlockDeletionLimit() { return blockDeletionLimit; } @@ -209,4 +242,85 @@ public static class ConfigStrings { public static final String HDDS_SCM_KERBEROS_PRINCIPAL_KEY = "hdds.scm.kerberos.principal"; public static final String HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY = "hdds.scm.kerberos.keytab.file"; } + + /** + * Parsed and normalized snapshot of {@code hdds.scm.pipeline.exclude.datanodes}. + */ + public static final class PipelineExcludedNodes { + public static final PipelineExcludedNodes EMPTY = new PipelineExcludedNodes( + Collections.emptySet(), Collections.emptySet()); + + private final Set excludedDatanodeIds; + private final Set excludedAddressTokens; + + private PipelineExcludedNodes(Set excludedDatanodeIds, Set excludedAddressTokens) { + this.excludedDatanodeIds = ImmutableSet.copyOf(excludedDatanodeIds); + this.excludedAddressTokens = ImmutableSet.copyOf(excludedAddressTokens); + } + + public static PipelineExcludedNodes parse(String rawValue) { + if (rawValue == null || rawValue.trim().isEmpty()) { + return EMPTY; + } + + Set datanodeIDs = new HashSet<>(); + Set addressTokens = new HashSet<>(); + + Arrays.stream(rawValue.split(",")) + .map(String::trim) + .filter(token -> !token.isEmpty()) + .forEach(token -> { + try { + datanodeIDs.add(DatanodeID.fromUuidString(token)); + } catch (IllegalArgumentException ignored) { + addressTokens.add(normalizeAddress(token)); + } + }); + + if (datanodeIDs.isEmpty() && addressTokens.isEmpty()) { + return EMPTY; + } + return new PipelineExcludedNodes(datanodeIDs, addressTokens); + } + + public boolean isEmpty() { + return excludedDatanodeIds.isEmpty() && excludedAddressTokens.isEmpty(); + } + + public Set getExcludedDatanodeIds() { + return excludedDatanodeIds; + } + + public Set getExcludedAddressTokens() { + return excludedAddressTokens; + } + + public boolean isExcluded(DatanodeDetails datanodeDetails) { + Objects.requireNonNull(datanodeDetails, "datanodeDetails cannot be null"); + if (excludedDatanodeIds.contains(datanodeDetails.getID())) { + return true; + } + + final String hostName = datanodeDetails.getHostName(); + if (hostName != null && excludedAddressTokens.contains(normalizeAddress(hostName))) { + return true; + } + + final String ipAddress = datanodeDetails.getIpAddress(); + return ipAddress != null && excludedAddressTokens.contains(normalizeAddress(ipAddress)); + } + + public boolean isExcluded(Pipeline pipeline) { + for (DatanodeDetails dn : pipeline.getNodes()) { + if (isExcluded(dn)) { + return true; + } + } + return false; + } + + private static String normalizeAddress(String value) { + return value.toLowerCase(Locale.ROOT); + } + } } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java new file mode 100644 index 000000000000..f1ce3ed7588f --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.UUID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link ScmConfig}. + */ +class TestScmConfig { + + @Test + void testPipelineExcludedNodesDefaultsToEmpty() { + OzoneConfiguration conf = new OzoneConfiguration(); + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + + assertTrue(scmConfig.getPipelineExcludedNodes().isEmpty()); + } + + @Test + void testPipelineExcludedNodesParsesUuidHostnameAndIp() { + String uuid = UUID.randomUUID().toString(); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("hdds.scm.pipeline.exclude.datanodes", + uuid + ", DN-1.EXAMPLE.COM, 10.0.0.12, dn-1.example.com"); + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + + ScmConfig.PipelineExcludedNodes first = scmConfig.getPipelineExcludedNodes(); + ScmConfig.PipelineExcludedNodes second = scmConfig.getPipelineExcludedNodes(); + + assertFalse(first.isEmpty()); + assertSame(first, second, "Snapshot should be parsed once and reused"); + assertEquals(1, first.getExcludedDatanodeIds().size()); + assertTrue(first.getExcludedDatanodeIds().contains(DatanodeID.fromUuidString(uuid))); + assertTrue(first.getExcludedAddressTokens().contains("dn-1.example.com")); + assertTrue(first.getExcludedAddressTokens().contains("10.0.0.12")); + assertEquals(2, first.getExcludedAddressTokens().size()); + } + + @Test + void testPipelineExcludedNodesMatchesDatanodeByIdAndAddress() { + DatanodeDetails datanode = MockDatanodeDetails.randomDatanodeDetails(); + datanode.setHostName("dn-2.example.com"); + datanode.setIpAddress("10.10.10.10"); + + ScmConfig.PipelineExcludedNodes byUUID = ScmConfig.PipelineExcludedNodes.parse(datanode.getUuidString()); + assertTrue(byUUID.isExcluded(datanode)); + + ScmConfig.PipelineExcludedNodes byHost = ScmConfig.PipelineExcludedNodes.parse("DN-2.EXAMPLE.COM"); + assertTrue(byHost.isExcluded(datanode)); + + ScmConfig.PipelineExcludedNodes byIp = ScmConfig.PipelineExcludedNodes.parse("10.10.10.10"); + assertTrue(byIp.isExcluded(datanode)); + } +} + diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index dc701a0be661..b5f5d11ed383 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -34,11 +34,13 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; @@ -80,6 +82,7 @@ public class ContainerManagerImpl implements ContainerManager { private final Random random = new Random(); private final long maxContainerSize; + private final ScmConfig.PipelineExcludedNodes pipelineExcludedNodes; /** * @@ -108,6 +111,9 @@ public ContainerManagerImpl( maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + pipelineExcludedNodes = + new OzoneConfiguration(conf).getObject(ScmConfig.class) + .getPipelineExcludedNodes(); this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); } @@ -182,6 +188,7 @@ public ContainerInfo allocateContainer( try { pipelines = pipelineManager .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN); + pipelines.removeIf(pipelineExcludedNodes::isExcluded); if (!pipelines.isEmpty()) { pipeline = pipelines.get(random.nextInt(pipelines.size())); containerInfo = createContainer(pipeline, owner); @@ -209,6 +216,7 @@ public ContainerInfo allocateContainer( try { pipelines = pipelineManager .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN); + pipelines.removeIf(pipelineExcludedNodes::isExcluded); if (!pipelines.isEmpty()) { pipeline = pipelines.get(random.nextInt(pipelines.size())); containerInfo = createContainer(pipeline, owner); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 9c529e22e7e1..94a35c2a4efb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -18,14 +18,17 @@ package org.apache.hadoop.hdds.scm.pipeline; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import java.io.IOException; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -41,10 +44,12 @@ import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -97,6 +102,8 @@ public class PipelineManagerImpl implements PipelineManager { // SCM is already out of SafeMode. private AtomicBoolean freezePipelineCreation; private final Clock clock; + private final ScmConfig.PipelineExcludedNodes pipelineExcludedNodes; + private final Set configuredExcludedDatanodeDetails; @SuppressWarnings("checkstyle:parameterNumber") protected PipelineManagerImpl(ConfigurationSource conf, @@ -124,6 +131,8 @@ protected PipelineManagerImpl(ConfigurationSource conf, HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); this.freezePipelineCreation = new AtomicBoolean(); + this.pipelineExcludedNodes = conf.getObject(ScmConfig.class).getPipelineExcludedNodes(); + this.configuredExcludedDatanodeDetails = resolvePipelineExcludedNodesToDatanodeDetails(); } @SuppressWarnings("checkstyle:parameterNumber") @@ -216,8 +225,8 @@ public Pipeline buildECPipeline(ReplicationConfig replicationConfig, throw new IllegalArgumentException("Replication type must be EC"); } checkIfPipelineCreationIsAllowed(replicationConfig); - return pipelineFactory.create(replicationConfig, excludedNodes, - favoredNodes); + List allExcludedNodes = mergeConfiguredExcludedNodes(excludedNodes); + return pipelineFactory.create(replicationConfig, allExcludedNodes, favoredNodes); } /** @@ -250,13 +259,13 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig, List excludedNodes, List favoredNodes) throws IOException { checkIfPipelineCreationIsAllowed(replicationConfig); + List allExcludedNodes = mergeConfiguredExcludedNodes(excludedNodes); acquireWriteLock(); final Pipeline pipeline; try { try { - pipeline = pipelineFactory.create(replicationConfig, - excludedNodes, favoredNodes); + pipeline = pipelineFactory.create(replicationConfig, allExcludedNodes, favoredNodes); } catch (IOException e) { metrics.incNumPipelineCreationFailed(); throw e; @@ -302,6 +311,45 @@ private void addPipelineToManager(Pipeline pipeline) recordMetricsForPipeline(pipeline); } + private List mergeConfiguredExcludedNodes(List excludedNodes) { + if ((excludedNodes == null || excludedNodes.isEmpty()) && configuredExcludedDatanodeDetails.isEmpty()) { + return Collections.emptyList(); + } + if (excludedNodes == null || excludedNodes.isEmpty()) { + return new ArrayList<>(configuredExcludedDatanodeDetails); + } + if (configuredExcludedDatanodeDetails.isEmpty()) { + return excludedNodes; + } + + Set mergedExcludedNodes = + new HashSet<>(configuredExcludedDatanodeDetails); + mergedExcludedNodes.addAll(excludedNodes); + return new ArrayList<>(mergedExcludedNodes); + } + + private Set resolvePipelineExcludedNodesToDatanodeDetails() { + if (pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = nodeManager.getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = nodeManager.getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + + return ImmutableSet.copyOf(resolved); + } + private boolean factorOne(ReplicationConfig replicationConfig) { if (replicationConfig.getReplicationType() == ReplicationType.RATIS) { return ((RatisReplicationConfig) replicationConfig).getReplicationFactor() @@ -860,6 +908,10 @@ public PipelineStateManager getStateManager() { return stateManager; } + ScmConfig.PipelineExcludedNodes getPipelineExcludedNodesConfig() { + return pipelineExcludedNodes; + } + @VisibleForTesting public SCMHAManager getScmhaManager() { return scmhaManager; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java index d7c4b7705f46..c86d9014ed13 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; import org.apache.hadoop.hdds.scm.PipelineRequestInformation; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -60,6 +61,7 @@ public class WritableECContainerProvider private final ContainerManager containerManager; private final long containerSize; private final WritableECContainerProviderConfig providerConfig; + private final ScmConfig.PipelineExcludedNodes pipelineExcludedNodes; public WritableECContainerProvider(WritableECContainerProviderConfig config, long containerSize, @@ -73,6 +75,19 @@ public WritableECContainerProvider(WritableECContainerProviderConfig config, this.containerManager = containerManager; this.pipelineChoosePolicy = pipelineChoosePolicy; this.containerSize = containerSize; + this.pipelineExcludedNodes = configuredPipelineExcludedNodes(pipelineManager); + } + + private static ScmConfig.PipelineExcludedNodes configuredPipelineExcludedNodes( + PipelineManager pipelineManager) { + if (pipelineManager instanceof PipelineManagerImpl) { + ScmConfig.PipelineExcludedNodes excludedNodes = + ((PipelineManagerImpl) pipelineManager).getPipelineExcludedNodesConfig(); + if (excludedNodes != null) { + return excludedNodes; + } + } + return ScmConfig.PipelineExcludedNodes.EMPTY; } /** @@ -115,6 +130,7 @@ public ContainerInfo getContainer(final long size, } List existingPipelines = pipelineManager.getPipelines( repConfig, Pipeline.PipelineState.OPEN); + existingPipelines.removeIf(pipelineExcludedNodes::isExcluded); final int pipelineCount = existingPipelines.size(); LOG.debug("Checking existing pipelines: {}", existingPipelines); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java index a61b32892352..401036a61e28 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; import org.apache.hadoop.hdds.scm.PipelineRequestInformation; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -43,6 +44,7 @@ public class WritableRatisContainerProvider private final PipelineManager pipelineManager; private final PipelineChoosePolicy pipelineChoosePolicy; private final ContainerManager containerManager; + private final ScmConfig.PipelineExcludedNodes pipelineExcludedNodes; public WritableRatisContainerProvider( PipelineManager pipelineManager, @@ -51,6 +53,19 @@ public WritableRatisContainerProvider( this.pipelineManager = pipelineManager; this.containerManager = containerManager; this.pipelineChoosePolicy = pipelineChoosePolicy; + this.pipelineExcludedNodes = configuredPipelineExcludedNodes(pipelineManager); + } + + private static ScmConfig.PipelineExcludedNodes configuredPipelineExcludedNodes( + PipelineManager pipelineManager) { + if (pipelineManager instanceof PipelineManagerImpl) { + ScmConfig.PipelineExcludedNodes excludedNodes = + ((PipelineManagerImpl) pipelineManager).getPipelineExcludedNodesConfig(); + if (excludedNodes != null) { + return excludedNodes; + } + } + return ScmConfig.PipelineExcludedNodes.EMPTY; } @Override @@ -164,10 +179,13 @@ private List findPipelinesByState( List pipelines = pipelineManager.getPipelines(repConfig, pipelineState, excludeList.getDatanodes(), excludeList.getPipelineIds()); + pipelines.removeIf(pipelineExcludedNodes::isExcluded); if (pipelines.isEmpty() && !excludeList.isEmpty()) { // if no pipelines can be found, try finding pipeline without // exclusion pipelines = pipelineManager.getPipelines(repConfig, pipelineState); + // but still, configured excluded nodes must be excluded + pipelines.removeIf(pipelineExcludedNodes::isExcluded); } return pipelines; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index daf0b4b4c6a8..9b794fe973d1 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -27,8 +27,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -347,6 +351,43 @@ void testAllocateContainersWithECReplicationConfig() throws Exception { containerManager.getContainer(admin.containerID())); } + /** + * First there's only one pipeline, and a DN in that pipeline is excluded, so container creation fails. Then + * another allowed is made availabel and container creation passes. + */ + @Test + void testAllocateContainerFiltersConfiguredExcludedPipelines() + throws Exception { + OzoneConfiguration conf = SCMTestUtils.getConf(new File(testDir, "exclude")); + RatisReplicationConfig replicationConfig = + RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + List pipelines = pipelineManager.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN); + assertEquals(1, pipelines.size()); + Pipeline excludedPipeline = pipelines.get(0); + String excludedUuid = excludedPipeline.getNodes().get(0).getUuidString(); + conf.set("hdds.scm.pipeline.exclude.datanodes", excludedUuid); + + ContainerManager manager = new ContainerManagerImpl(conf, + scmhaManager, sequenceIdGen, pipelineManager, + SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock); + clearInvocations(pipelineManager); + + // No non-excluded pipeline is available, and pipeline creation is blocked. + doThrow(new IOException("No pipeline available")) + .when(pipelineManager).createPipeline(replicationConfig); + assertThrows(IOException.class, () -> manager.allocateContainer(replicationConfig, "admin")); + verify(pipelineManager, times(1)).createPipeline(replicationConfig); + + // Make a new pipeline available and ensure allocation succeeds. + doCallRealMethod().when(pipelineManager).createPipeline(replicationConfig); + Pipeline allowedPipeline = pipelineManager.createPipeline(replicationConfig); + clearInvocations(pipelineManager); + + ContainerInfo container = manager.allocateContainer(replicationConfig, "admin"); + assertEquals(allowedPipeline.getId(), container.getPipelineID()); + verify(pipelineManager, never()).createPipeline(replicationConfig); + } + @Test void testUpdateContainerReplicaInvokesPendingOp() throws IOException, TimeoutException { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index e7fc6f14f9b6..4ccc4724f45e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -999,6 +999,159 @@ public void testHasEnoughSpace() throws IOException { assertFalse(pipelineManager.hasEnoughSpace(pipeline, containerSize)); } + /** + * {@link PipelineManagerImpl#createPipeline(ReplicationConfig, List, List)} calls + * {@link PipelineFactory#create(ReplicationConfig, List, List)} to create a pipeline. This test asserts that + * {@link PipelineManagerImpl#createPipeline(ReplicationConfig, List, List)} creates a merged list of both configured + * excluded nodes (hdds.scm.pipeline.exclude.datanodes) and client provided excluded nodes to pass as argument to the + * PipelineFactory create method. + */ + @Test + public void testConfiguredExcludedNodesMergedForCreatePipeline() throws Exception { + DatanodeDetails excludedByUuid = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails excludedByAddress = MockDatanodeDetails.randomDatanodeDetails(); + excludedByAddress.setIpAddress("10.0.0.25"); + DatanodeDetails excludedByClient = MockDatanodeDetails.randomDatanodeDetails(); + + conf.set("hdds.scm.pipeline.exclude.datanodes", + excludedByUuid.getUuidString() + "," + excludedByAddress.getIpAddress()); + + SCMContext localContext = SCMContext.emptyContext(); + localContext.updateLeaderAndTerm(true, 1); + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); + when(localNodeManager.getNodesByAddress(excludedByAddress.getIpAddress())) + .thenReturn(Collections.singletonList(excludedByAddress)); + PipelineStateManager localStateManager = mock(PipelineStateManager.class); + PipelineFactory localFactory = mock(PipelineFactory.class); + + try (PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf, + SCMHAManagerStub.getInstance(true), localNodeManager, localStateManager, + localFactory, new EventQueue(), localContext, testClock)) { + RatisReplicationConfig replicationConfig = RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + Pipeline createdPipeline = Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.ALLOCATED) + .setReplicationConfig(replicationConfig) + .setNodes(ImmutableList.of( + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())) + .build(); + + when(localFactory.create(eq(replicationConfig), any(), eq(Collections.emptyList()))).thenReturn(createdPipeline); + + pipelineManager.createPipeline(replicationConfig, + Collections.singletonList(excludedByClient), Collections.emptyList()); + + ArgumentCaptor> excludedCaptor = ArgumentCaptor.forClass(List.class); + verify(localFactory).create(eq(replicationConfig), excludedCaptor.capture(), eq(Collections.emptyList())); + List allExcluded = excludedCaptor.getValue(); + assertThat(allExcluded).contains(excludedByClient, excludedByUuid, excludedByAddress); + } + } + + @Test + public void testConfiguredExcludedNodesMergedForBuildECPipeline() throws Exception { + DatanodeDetails excludedByUuid = MockDatanodeDetails.randomDatanodeDetails(); + conf.set("hdds.scm.pipeline.exclude.datanodes", excludedByUuid.getUuidString()); + + SCMContext localContext = SCMContext.emptyContext(); + localContext.updateLeaderAndTerm(true, 1); + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); + PipelineStateManager localStateManager = mock(PipelineStateManager.class); + PipelineFactory localFactory = mock(PipelineFactory.class); + + try (PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf, + SCMHAManagerStub.getInstance(true), localNodeManager, localStateManager, + localFactory, new EventQueue(), localContext, testClock)) { + ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2); + Pipeline builtPipeline = MockPipeline.createEcPipeline(ecConfig); + when(localFactory.create(eq(ecConfig), any(), eq(Collections.emptyList()))).thenReturn(builtPipeline); + + pipelineManager.buildECPipeline(ecConfig, new ArrayList<>(), Collections.emptyList()); + + ArgumentCaptor> excludedCaptor = ArgumentCaptor.forClass(List.class); + verify(localFactory).create(eq(ecConfig), excludedCaptor.capture(), eq(Collections.emptyList())); + assertThat(excludedCaptor.getValue()).contains(excludedByUuid); + } + } + + @Test + public void testConfiguredUnknownAddressTokenIgnoredInCreatePipeline() throws Exception { + DatanodeDetails callerExcluded = MockDatanodeDetails.randomDatanodeDetails(); + conf.set("hdds.scm.pipeline.exclude.datanodes", "unknown-dn-host"); + + SCMContext localContext = SCMContext.emptyContext(); + localContext.updateLeaderAndTerm(true, 1); + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNodesByAddress("unknown-dn-host")).thenReturn(Collections.emptyList()); + PipelineStateManager localStateManager = mock(PipelineStateManager.class); + PipelineFactory localFactory = mock(PipelineFactory.class); + + try (PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf, + SCMHAManagerStub.getInstance(true), localNodeManager, localStateManager, + localFactory, new EventQueue(), localContext, testClock)) { + RatisReplicationConfig replicationConfig = RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + Pipeline createdPipeline = Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.ALLOCATED) + .setReplicationConfig(replicationConfig) + .setNodes(ImmutableList.of( + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())) + .build(); + + when(localFactory.create(eq(replicationConfig), any(), eq(Collections.emptyList()))).thenReturn(createdPipeline); + + pipelineManager.createPipeline(replicationConfig, + Collections.singletonList(callerExcluded), Collections.emptyList()); + + ArgumentCaptor> excludedCaptor = ArgumentCaptor.forClass(List.class); + verify(localFactory).create(eq(replicationConfig), excludedCaptor.capture(), eq(Collections.emptyList())); + assertThat(excludedCaptor.getValue()).containsExactly(callerExcluded); + } + } + + @Test + public void testConfiguredAndCallerExcludedNodesAreDeduplicated() throws Exception { + DatanodeDetails excludedByUuid = MockDatanodeDetails.randomDatanodeDetails(); + conf.set("hdds.scm.pipeline.exclude.datanodes", excludedByUuid.getUuidString()); + + SCMContext localContext = SCMContext.emptyContext(); + localContext.updateLeaderAndTerm(true, 1); + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); + PipelineStateManager localStateManager = mock(PipelineStateManager.class); + PipelineFactory localFactory = mock(PipelineFactory.class); + + try (PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf, + SCMHAManagerStub.getInstance(true), localNodeManager, localStateManager, + localFactory, new EventQueue(), localContext, testClock)) { + RatisReplicationConfig replicationConfig = RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + Pipeline createdPipeline = Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.ALLOCATED) + .setReplicationConfig(replicationConfig) + .setNodes(ImmutableList.of( + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())) + .build(); + + when(localFactory.create(eq(replicationConfig), any(), eq(Collections.emptyList()))).thenReturn(createdPipeline); + + pipelineManager.createPipeline(replicationConfig, + Collections.singletonList(excludedByUuid), Collections.emptyList()); + + ArgumentCaptor> excludedCaptor = ArgumentCaptor.forClass(List.class); + verify(localFactory).create(eq(replicationConfig), excludedCaptor.capture(), eq(Collections.emptyList())); + assertThat(excludedCaptor.getValue()).containsExactly(excludedByUuid); + } + } + private Set createContainerReplicasList( List dns) { Set replicas = new HashSet<>(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java index b3c34b44e4c2..c9b8af8532aa 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java @@ -33,8 +33,10 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; @@ -47,6 +49,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Set; +import java.util.TreeSet; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -57,6 +60,7 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -70,6 +74,8 @@ import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.net.NodeSchema; import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.CapacityPipelineChoosePolicy; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy; @@ -555,6 +561,92 @@ public void testExcludedNodesPassedToCreatePipelineIfProvided( Collections.emptyList()); } + @ParameterizedTest + @MethodSource("policies") + public void testConfiguredExcludedPipelineFilteredBeforeSelection( + PipelineChoosePolicy policy) throws IOException { + /* + This test first has only one pipeline, and a DN in that pipeline is excluded by configuration. Since there's only + one pipeline and that's excluded, no container can be selected from that. Also since there are only 3 (mocked) + Datanodes and one out of them is excluded, no new pipelines can be created either. So + WritableECContainerProvider#getContainer should throw an IOException. WRT code coverage, this tests the first if + branch `if (openPipelineCount < maximumPipelines)` in getContainer(), when that throws, the while loop is not + entered because all pipelines are excluded. Then it also hits the `maximumPipelines = nodeCount;` path where + maximum pipelines limit is increased. That path also throws since no new pipelines can be created. + */ + + // pipelinePerVolumeFactor set to 0 and minimumPipelines set to 2 so that getMaximumPipelines() returns 2 + providerConf.setPipelinePerVolumeFactor(0); + providerConf.setMinimumPipelines(2); + + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNodeCount(NodeStatus.inServiceHealthy())).thenReturn(3); + PipelineManagerImpl localPipelineManager = mock(PipelineManagerImpl.class); + ContainerManager localContainerManager = mock(ContainerManager.class); + + DatanodeDetails excludedDn = MockDatanodeDetails.randomDatanodeDetails(); + Pipeline excludedPipeline = createEcPipelineWithFirstNode(excludedDn); + + // there's only one pipeline (excludedPipeline) + when(localPipelineManager.getPipelineCount(repConfig, Pipeline.PipelineState.OPEN)).thenReturn(1); + List pipelines = new ArrayList<>(); + pipelines.add(excludedPipeline); + when(localPipelineManager.getPipelines(repConfig, Pipeline.PipelineState.OPEN)).thenReturn(pipelines); + when(localPipelineManager.getPipelineExcludedNodesConfig()).thenReturn(ScmConfig.PipelineExcludedNodes.parse( + excludedDn.getUuidString())); + + WritableContainerProvider localProvider = new WritableECContainerProvider( + providerConf, getMaxContainerSize(), localNodeManager, localPipelineManager, localContainerManager, policy); + assertThrows(IOException.class, () -> localProvider.getContainer(1, repConfig, OWNER, new ExcludeList())); + + /* + Now, we introduce another pipeline to the cluster which is not excluded. So now a container from this allowed + pipeline can be selected. WRT code coverage, this additionally tests the while loop branch + `while (!existingPipelines.isEmpty())`. + */ + Pipeline allowedPipeline = createEcPipelineWithFirstNode(MockDatanodeDetails.randomDatanodeDetails()); + pipelines.add(allowedPipeline); + ContainerID allowedContainerId = ContainerID.valueOf(100L); + NavigableSet allowedContainerSet = new TreeSet<>(); + allowedContainerSet.add(allowedContainerId); + ContainerInfo allowedContainer = new ContainerInfo.Builder() + .setContainerID(allowedContainerId.getId()) + .setPipelineID(allowedPipeline.getId()) + .build(); + // there are two pipelines in the cluster now + when(localPipelineManager.getPipelineCount(repConfig, Pipeline.PipelineState.OPEN)).thenReturn(2); + when(localNodeManager.getNodeCount(NodeStatus.inServiceHealthy())).thenReturn(6); + when(localPipelineManager.getContainersInPipeline(allowedPipeline.getId())).thenReturn(allowedContainerSet); + when(localContainerManager.getContainer(allowedContainerId)).thenReturn(allowedContainer); + + ContainerInfo selected = localProvider.getContainer(1, repConfig, OWNER, new ExcludeList()); + + assertEquals(allowedContainer, selected); + verify(localPipelineManager, never()).getContainersInPipeline(excludedPipeline.getId()); + } + + private Pipeline createEcPipelineWithFirstNode(DatanodeDetails firstNode) { + List nodes = new ArrayList<>(); + nodes.add(firstNode); + while (nodes.size() < repConfig.getRequiredNodes()) { + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + } + + Map nodeIndexes = new HashMap<>(); + int ecIndex = 1; + for (DatanodeDetails dn : nodes) { + nodeIndexes.put(dn, ecIndex++); + } + + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.OPEN) + .setReplicationConfig(repConfig) + .setNodes(nodes) + .setReplicaIndexes(nodeIndexes) + .build(); + } + private ContainerInfo createContainer(Pipeline pipeline, ReplicationConfig repConf, long containerID) { return new ContainerInfo.Builder() diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java index a1ba81d0a70a..b060cc19ecad 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -35,9 +37,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -61,12 +65,11 @@ class TestWritableRatisContainerProvider { private static final int CONTAINER_SIZE = 1234; private static final ExcludeList NO_EXCLUSION = new ExcludeList(); - private final OzoneConfiguration conf = new OzoneConfiguration(); private final PipelineChoosePolicy policy = new RandomPipelineChoosePolicy(); private final AtomicLong containerID = new AtomicLong(1); @Mock - private PipelineManager pipelineManager; + private PipelineManagerImpl pipelineManager; @Mock private ContainerManager containerManager; @@ -118,6 +121,73 @@ void failsIfContainerCannotBeCreated() throws Exception { verifyPipelineCreated(); } + @Test + void configuredExcludedPipelineReturnsNoContainerUntilGoodPipelineAppears() + throws Exception { + DatanodeDetails excludedDn = MockDatanodeDetails.randomDatanodeDetails(); + Pipeline excludedPipeline = MockPipeline.createPipeline(asList( + excludedDn, + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())); + Pipeline allowedPipeline = MockPipeline.createPipeline(3); + ContainerInfo containerFromAllowedPipeline = + pipelineHasContainer(allowedPipeline); + List listWithAllowedPipeline = new ArrayList<>(); + listWithAllowedPipeline.add(allowedPipeline); + listWithAllowedPipeline.add(excludedPipeline); + when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet())) + .thenReturn(new ArrayList<>(singletonList(excludedPipeline))) + .thenReturn(new ArrayList<>(singletonList(excludedPipeline))) + .thenReturn(listWithAllowedPipeline); + when(pipelineManager.createPipeline(REPLICATION_CONFIG)) + .thenThrow(new SCMException(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE)); + + ScmConfig.PipelineExcludedNodes pipelineExcludedNodes = + ScmConfig.PipelineExcludedNodes.parse(excludedDn.getUuidString()); + WritableRatisContainerProvider subject = createSubject(pipelineExcludedNodes); + + assertThrows(IOException.class, () -> subject.getContainer( + CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION)); + + ContainerInfo container = subject.getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION); + + assertSame(containerFromAllowedPipeline, container); + verify(pipelineManager, times(1)).createPipeline(REPLICATION_CONFIG); + } + + @Test + void usesExcludeListFallbackButStillFiltersConfiguredExcludedPipelines() + throws Exception { + DatanodeDetails excludedDn = MockDatanodeDetails.randomDatanodeDetails(); + Pipeline excludedPipeline = MockPipeline.createPipeline(asList( + excludedDn, + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())); + Pipeline allowedPipeline = MockPipeline.createPipeline(3); + ContainerInfo containerFromAllowedPipeline = + pipelineHasContainer(allowedPipeline); + + ExcludeList excludeList = new ExcludeList(); + excludeList.addDatanode(MockDatanodeDetails.randomDatanodeDetails()); + when(pipelineManager.getPipelines(eq(REPLICATION_CONFIG), eq(OPEN), + anySet(), anySet())) + .thenReturn(emptyList()); + when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN)) + .thenReturn(new ArrayList<>(asList(excludedPipeline, allowedPipeline))); + + ScmConfig.PipelineExcludedNodes pipelineExcludedNodes = + ScmConfig.PipelineExcludedNodes.parse(excludedDn.getUuidString()); + + ContainerInfo container = createSubject(pipelineExcludedNodes) + .getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, excludeList); + + assertSame(containerFromAllowedPipeline, container); + verify(pipelineManager).getPipelines(eq(REPLICATION_CONFIG), eq(OPEN), + eq(excludeList.getDatanodes()), eq(excludeList.getPipelineIds())); + verify(pipelineManager).getPipelines(REPLICATION_CONFIG, OPEN); + verify(pipelineManager, never()).createPipeline(REPLICATION_CONFIG); + } + private void existingPipelines(Pipeline... pipelines) { existingPipelines(new ArrayList<>(asList(pipelines))); } @@ -161,6 +231,14 @@ private WritableRatisContainerProvider createSubject() { pipelineManager, containerManager, policy); } + private WritableRatisContainerProvider createSubject( + ScmConfig.PipelineExcludedNodes pipelineExcludedNodes) { + when(pipelineManager.getPipelineExcludedNodesConfig()) + .thenReturn(pipelineExcludedNodes); + return new WritableRatisContainerProvider( + pipelineManager, containerManager, policy); + } + private void verifyPipelineCreated() throws IOException { verify(pipelineManager, times(2)) .getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet()); From 41ba516afeb85dc5c7590b3be7e4c604118f1194 Mon Sep 17 00:00:00 2001 From: Siddhant Sangwan Date: Fri, 13 Mar 2026 13:36:36 +0530 Subject: [PATCH 2/6] improve null handling --- .../src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 299ccdbad0c2..5fde0d8f7539 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -296,7 +296,9 @@ public Set getExcludedAddressTokens() { } public boolean isExcluded(DatanodeDetails datanodeDetails) { - Objects.requireNonNull(datanodeDetails, "datanodeDetails cannot be null"); + if (datanodeDetails == null) { + return false; + } if (excludedDatanodeIds.contains(datanodeDetails.getID())) { return true; } From 5218b5ad3ccc60632f37cd0a5800c63343c9801e Mon Sep 17 00:00:00 2001 From: Siddhant Sangwan Date: Fri, 13 Mar 2026 14:04:15 +0530 Subject: [PATCH 3/6] fix checkstyle --- .../src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 5fde0d8f7539..de3c77b354af 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.Locale; -import java.util.Objects; import java.util.Set; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; From e7f22686bc85b16d5aee13c63cee26a8eac5154f Mon Sep 17 00:00:00 2001 From: Siddhant Sangwan Date: Fri, 13 Mar 2026 14:11:55 +0530 Subject: [PATCH 4/6] replace String.trim().isEmpty() with StringUtils.isBlank() to fix PMD Failure --- .../src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index de3c77b354af..23e58783be90 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Locale; import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; @@ -258,7 +259,7 @@ private PipelineExcludedNodes(Set excludedDatanodeIds, Set e } public static PipelineExcludedNodes parse(String rawValue) { - if (rawValue == null || rawValue.trim().isEmpty()) { + if (rawValue == null || StringUtils.isBlank(rawValue)) { return EMPTY; } From a3b1df77f4dd9dfbd02cc47f46049c90823dc07a Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 1 Apr 2026 11:08:00 +0530 Subject: [PATCH 5/6] Add resolvePipelineExcludedNodes in NodeManager and moved out PipelineExcludedNodes --- .../hdds/scm/PipelineExcludedNodes.java | 113 ++++++++++++++++++ .../org/apache/hadoop/hdds/scm/ScmConfig.java | 107 +---------------- .../apache/hadoop/hdds/scm/TestScmConfig.java | 14 +-- .../scm/container/ContainerManagerImpl.java | 3 +- .../hadoop/hdds/scm/node/NodeManager.java | 13 ++ .../hadoop/hdds/scm/node/SCMNodeManager.java | 23 ++++ .../scm/pipeline/PipelineManagerImpl.java | 33 +---- .../pipeline/WritableECContainerProvider.java | 10 +- .../WritableRatisContainerProvider.java | 10 +- .../hdds/scm/container/MockNodeManager.java | 24 ++++ .../scm/container/SimpleMockNodeManager.java | 23 ++++ .../scm/pipeline/TestPipelineManagerImpl.java | 36 ++++++ .../TestWritableECContainerProvider.java | 4 +- .../TestWritableRatisContainerProvider.java | 12 +- 14 files changed, 269 insertions(+), 156 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineExcludedNodes.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineExcludedNodes.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineExcludedNodes.java new file mode 100644 index 000000000000..8f38bfd904a8 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineExcludedNodes.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; + +/** + * Parsed snapshot of {@code hdds.scm.pipeline.exclude.datanodes}: UUIDs, hostnames, + * and IP tokens to exclude from pipeline creation and selection. + */ +public final class PipelineExcludedNodes { + public static final PipelineExcludedNodes EMPTY = new PipelineExcludedNodes( + Collections.emptySet(), Collections.emptySet()); + + private final Set excludedDatanodeIds; + private final Set excludedAddressTokens; + + private PipelineExcludedNodes(Set excludedDatanodeIds, Set excludedAddressTokens) { + this.excludedDatanodeIds = ImmutableSet.copyOf(excludedDatanodeIds); + this.excludedAddressTokens = ImmutableSet.copyOf(excludedAddressTokens); + } + + public static PipelineExcludedNodes parse(String rawValue) { + if (rawValue == null || StringUtils.isBlank(rawValue)) { + return EMPTY; + } + + Set datanodeIDs = new HashSet<>(); + Set addressTokens = new HashSet<>(); + + Arrays.stream(rawValue.split(",")) + .map(String::trim) + .filter(token -> !token.isEmpty()) + .forEach(token -> { + try { + datanodeIDs.add(DatanodeID.fromUuidString(token)); + } catch (IllegalArgumentException ignored) { + addressTokens.add(normalizeAddress(token)); + } + }); + + if (datanodeIDs.isEmpty() && addressTokens.isEmpty()) { + return EMPTY; + } + return new PipelineExcludedNodes(datanodeIDs, addressTokens); + } + + public boolean isEmpty() { + return excludedDatanodeIds.isEmpty() && excludedAddressTokens.isEmpty(); + } + + public Set getExcludedDatanodeIds() { + return excludedDatanodeIds; + } + + public Set getExcludedAddressTokens() { + return excludedAddressTokens; + } + + public boolean isExcluded(DatanodeDetails datanodeDetails) { + if (datanodeDetails == null) { + return false; + } + if (excludedDatanodeIds.contains(datanodeDetails.getID())) { + return true; + } + + final String hostName = datanodeDetails.getHostName(); + if (hostName != null && excludedAddressTokens.contains(normalizeAddress(hostName))) { + return true; + } + + final String ipAddress = datanodeDetails.getIpAddress(); + return ipAddress != null && excludedAddressTokens.contains(normalizeAddress(ipAddress)); + } + + public boolean isExcluded(Pipeline pipeline) { + for (DatanodeDetails dn : pipeline.getNodes()) { + if (isExcluded(dn)) { + return true; + } + } + return false; + } + + private static String normalizeAddress(String value) { + return value.toLowerCase(Locale.ROOT); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 23e58783be90..93e23ff92ee5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -17,23 +17,12 @@ package org.apache.hadoop.hdds.scm; -import com.google.common.collect.ImmutableSet; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Locale; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.ConfigType; -import org.apache.hadoop.hdds.conf.PostConstruct; import org.apache.hadoop.hdds.conf.ReconfigurableConfig; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.DatanodeID; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; /** * The configuration class for the SCM service. @@ -124,8 +113,6 @@ public class ScmConfig extends ReconfigurableConfig { ) private String pipelineExcludeDatanodes = ""; - private volatile PipelineExcludedNodes pipelineExcludedNodes = PipelineExcludedNodes.EMPTY; - @Config(key = "hdds.scm.block.deletion.per-interval.max", type = ConfigType.INT, defaultValue = "500000", @@ -162,11 +149,6 @@ public class ScmConfig extends ReconfigurableConfig { ) private int transactionToDNsCommitMapLimit = 5000000; - @PostConstruct - public void parsePipelineExcludedNodes() { - pipelineExcludedNodes = PipelineExcludedNodes.parse(pipelineExcludeDatanodes); - } - public int getTransactionToDNsCommitMapLimit() { return transactionToDNsCommitMapLimit; } @@ -223,8 +205,12 @@ public String getECPipelineChoosePolicyName() { return ecPipelineChoosePolicyName; } + /** + * Parsed view of {@link #pipelineExcludeDatanodes}. Computed on each call so it stays aligned + * with the current config string (including dynamic reconfiguration). + */ public PipelineExcludedNodes getPipelineExcludedNodes() { - return pipelineExcludedNodes; + return PipelineExcludedNodes.parse(pipelineExcludeDatanodes); } public int getBlockDeletionLimit() { @@ -242,87 +228,4 @@ public static class ConfigStrings { public static final String HDDS_SCM_KERBEROS_PRINCIPAL_KEY = "hdds.scm.kerberos.principal"; public static final String HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY = "hdds.scm.kerberos.keytab.file"; } - - /** - * Parsed and normalized snapshot of {@code hdds.scm.pipeline.exclude.datanodes}. - */ - public static final class PipelineExcludedNodes { - public static final PipelineExcludedNodes EMPTY = new PipelineExcludedNodes( - Collections.emptySet(), Collections.emptySet()); - - private final Set excludedDatanodeIds; - private final Set excludedAddressTokens; - - private PipelineExcludedNodes(Set excludedDatanodeIds, Set excludedAddressTokens) { - this.excludedDatanodeIds = ImmutableSet.copyOf(excludedDatanodeIds); - this.excludedAddressTokens = ImmutableSet.copyOf(excludedAddressTokens); - } - - public static PipelineExcludedNodes parse(String rawValue) { - if (rawValue == null || StringUtils.isBlank(rawValue)) { - return EMPTY; - } - - Set datanodeIDs = new HashSet<>(); - Set addressTokens = new HashSet<>(); - - Arrays.stream(rawValue.split(",")) - .map(String::trim) - .filter(token -> !token.isEmpty()) - .forEach(token -> { - try { - datanodeIDs.add(DatanodeID.fromUuidString(token)); - } catch (IllegalArgumentException ignored) { - addressTokens.add(normalizeAddress(token)); - } - }); - - if (datanodeIDs.isEmpty() && addressTokens.isEmpty()) { - return EMPTY; - } - return new PipelineExcludedNodes(datanodeIDs, addressTokens); - } - - public boolean isEmpty() { - return excludedDatanodeIds.isEmpty() && excludedAddressTokens.isEmpty(); - } - - public Set getExcludedDatanodeIds() { - return excludedDatanodeIds; - } - - public Set getExcludedAddressTokens() { - return excludedAddressTokens; - } - - public boolean isExcluded(DatanodeDetails datanodeDetails) { - if (datanodeDetails == null) { - return false; - } - if (excludedDatanodeIds.contains(datanodeDetails.getID())) { - return true; - } - - final String hostName = datanodeDetails.getHostName(); - if (hostName != null && excludedAddressTokens.contains(normalizeAddress(hostName))) { - return true; - } - - final String ipAddress = datanodeDetails.getIpAddress(); - return ipAddress != null && excludedAddressTokens.contains(normalizeAddress(ipAddress)); - } - - public boolean isExcluded(Pipeline pipeline) { - for (DatanodeDetails dn : pipeline.getNodes()) { - if (isExcluded(dn)) { - return true; - } - } - return false; - } - - private static String normalizeAddress(String value) { - return value.toLowerCase(Locale.ROOT); - } - } } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java index f1ce3ed7588f..4c40035c61b3 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.UUID; @@ -50,11 +49,12 @@ void testPipelineExcludedNodesParsesUuidHostnameAndIp() { uuid + ", DN-1.EXAMPLE.COM, 10.0.0.12, dn-1.example.com"); ScmConfig scmConfig = conf.getObject(ScmConfig.class); - ScmConfig.PipelineExcludedNodes first = scmConfig.getPipelineExcludedNodes(); - ScmConfig.PipelineExcludedNodes second = scmConfig.getPipelineExcludedNodes(); + PipelineExcludedNodes first = scmConfig.getPipelineExcludedNodes(); + PipelineExcludedNodes second = scmConfig.getPipelineExcludedNodes(); assertFalse(first.isEmpty()); - assertSame(first, second, "Snapshot should be parsed once and reused"); + assertEquals(first.getExcludedDatanodeIds(), second.getExcludedDatanodeIds()); + assertEquals(first.getExcludedAddressTokens(), second.getExcludedAddressTokens()); assertEquals(1, first.getExcludedDatanodeIds().size()); assertTrue(first.getExcludedDatanodeIds().contains(DatanodeID.fromUuidString(uuid))); assertTrue(first.getExcludedAddressTokens().contains("dn-1.example.com")); @@ -68,13 +68,13 @@ void testPipelineExcludedNodesMatchesDatanodeByIdAndAddress() { datanode.setHostName("dn-2.example.com"); datanode.setIpAddress("10.10.10.10"); - ScmConfig.PipelineExcludedNodes byUUID = ScmConfig.PipelineExcludedNodes.parse(datanode.getUuidString()); + PipelineExcludedNodes byUUID = PipelineExcludedNodes.parse(datanode.getUuidString()); assertTrue(byUUID.isExcluded(datanode)); - ScmConfig.PipelineExcludedNodes byHost = ScmConfig.PipelineExcludedNodes.parse("DN-2.EXAMPLE.COM"); + PipelineExcludedNodes byHost = PipelineExcludedNodes.parse("DN-2.EXAMPLE.COM"); assertTrue(byHost.isExcluded(datanode)); - ScmConfig.PipelineExcludedNodes byIp = ScmConfig.PipelineExcludedNodes.parse("10.10.10.10"); + PipelineExcludedNodes byIp = PipelineExcludedNodes.parse("10.10.10.10"); assertTrue(byIp.isExcluded(datanode)); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index b5f5d11ed383..8639ba003c29 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; @@ -82,7 +83,7 @@ public class ContainerManagerImpl implements ContainerManager { private final Random random = new Random(); private final long maxContainerSize; - private final ScmConfig.PipelineExcludedNodes pipelineExcludedNodes; + private final PipelineExcludedNodes pipelineExcludedNodes; /** * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e9a019945c1f..7ea621879bf5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; @@ -370,6 +371,18 @@ Map getTotalDatanodeCommandCounts( /** @return the datanode of the given id if it exists; otherwise, return null. */ @Nullable DatanodeDetails getNode(@Nullable DatanodeID id); + /** + * Resolves pipeline exclusion configuration against datanodes currently known to this manager. + * Call each time an exclusion set is needed (for example when creating a pipeline); + * do not cache the result across calls, so DNs that register after SCM startup are included when + * they match a configured hostname or IP, and UUIDs resolve as soon as the node exists. + * Removed DNs simply stop appearing in the result on the next call. + * + * @param pipelineExcludedNodes parsed {@code hdds.scm.pipeline.exclude.datanodes}; null or empty yields an empty set + * @return immutable copy of matching registered datanodes + */ + Set resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes); + /** * Given datanode address(Ipaddress or hostname), returns a list of * DatanodeDetails for the datanodes running at that address. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 3289e7b312a8..057ae4adac60 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; import jakarta.annotation.Nullable; import java.io.IOException; import java.math.RoundingMode; @@ -69,6 +70,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -1894,6 +1896,27 @@ public List getNodesByAddress(String address) { .collect(Collectors.toList()); } + @Override + public Set resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes) { + if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + } + /** * Get cluster map as in network topology for this node manager. * @return cluster map diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 94a35c2a4efb..bfb66bc45466 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.pipeline; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import java.io.IOException; import java.time.Clock; @@ -44,10 +43,10 @@ import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; @@ -102,8 +101,7 @@ public class PipelineManagerImpl implements PipelineManager { // SCM is already out of SafeMode. private AtomicBoolean freezePipelineCreation; private final Clock clock; - private final ScmConfig.PipelineExcludedNodes pipelineExcludedNodes; - private final Set configuredExcludedDatanodeDetails; + private final PipelineExcludedNodes pipelineExcludedNodes; @SuppressWarnings("checkstyle:parameterNumber") protected PipelineManagerImpl(ConfigurationSource conf, @@ -132,7 +130,6 @@ protected PipelineManagerImpl(ConfigurationSource conf, TimeUnit.MILLISECONDS); this.freezePipelineCreation = new AtomicBoolean(); this.pipelineExcludedNodes = conf.getObject(ScmConfig.class).getPipelineExcludedNodes(); - this.configuredExcludedDatanodeDetails = resolvePipelineExcludedNodesToDatanodeDetails(); } @SuppressWarnings("checkstyle:parameterNumber") @@ -312,6 +309,8 @@ private void addPipelineToManager(Pipeline pipeline) } private List mergeConfiguredExcludedNodes(List excludedNodes) { + Set configuredExcludedDatanodeDetails = + nodeManager.resolvePipelineExcludedDatanodes(pipelineExcludedNodes); if ((excludedNodes == null || excludedNodes.isEmpty()) && configuredExcludedDatanodeDetails.isEmpty()) { return Collections.emptyList(); } @@ -328,28 +327,6 @@ private List mergeConfiguredExcludedNodes(List return new ArrayList<>(mergedExcludedNodes); } - private Set resolvePipelineExcludedNodesToDatanodeDetails() { - if (pipelineExcludedNodes.isEmpty()) { - return Collections.emptySet(); - } - - Set resolved = new HashSet<>(); - for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { - DatanodeDetails datanodeDetails = nodeManager.getNode(datanodeID); - if (datanodeDetails != null) { - resolved.add(datanodeDetails); - } - } - for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { - List datanodes = nodeManager.getNodesByAddress(address); - if (datanodes != null) { - resolved.addAll(datanodes); - } - } - - return ImmutableSet.copyOf(resolved); - } - private boolean factorOne(ReplicationConfig replicationConfig) { if (replicationConfig.getReplicationType() == ReplicationType.RATIS) { return ((RatisReplicationConfig) replicationConfig).getReplicationFactor() @@ -908,7 +885,7 @@ public PipelineStateManager getStateManager() { return stateManager; } - ScmConfig.PipelineExcludedNodes getPipelineExcludedNodesConfig() { + PipelineExcludedNodes getPipelineExcludedNodesConfig() { return pipelineExcludedNodes; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java index c86d9014ed13..4d7da667e23b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java @@ -35,8 +35,8 @@ import org.apache.hadoop.hdds.conf.ReconfigurableConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.PipelineRequestInformation; -import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -61,7 +61,7 @@ public class WritableECContainerProvider private final ContainerManager containerManager; private final long containerSize; private final WritableECContainerProviderConfig providerConfig; - private final ScmConfig.PipelineExcludedNodes pipelineExcludedNodes; + private final PipelineExcludedNodes pipelineExcludedNodes; public WritableECContainerProvider(WritableECContainerProviderConfig config, long containerSize, @@ -78,16 +78,16 @@ public WritableECContainerProvider(WritableECContainerProviderConfig config, this.pipelineExcludedNodes = configuredPipelineExcludedNodes(pipelineManager); } - private static ScmConfig.PipelineExcludedNodes configuredPipelineExcludedNodes( + private static PipelineExcludedNodes configuredPipelineExcludedNodes( PipelineManager pipelineManager) { if (pipelineManager instanceof PipelineManagerImpl) { - ScmConfig.PipelineExcludedNodes excludedNodes = + PipelineExcludedNodes excludedNodes = ((PipelineManagerImpl) pipelineManager).getPipelineExcludedNodesConfig(); if (excludedNodes != null) { return excludedNodes; } } - return ScmConfig.PipelineExcludedNodes.EMPTY; + return PipelineExcludedNodes.EMPTY; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java index 401036a61e28..5050f4455f71 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java @@ -23,8 +23,8 @@ import java.util.stream.Collectors; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.PipelineRequestInformation; -import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -44,7 +44,7 @@ public class WritableRatisContainerProvider private final PipelineManager pipelineManager; private final PipelineChoosePolicy pipelineChoosePolicy; private final ContainerManager containerManager; - private final ScmConfig.PipelineExcludedNodes pipelineExcludedNodes; + private final PipelineExcludedNodes pipelineExcludedNodes; public WritableRatisContainerProvider( PipelineManager pipelineManager, @@ -56,16 +56,16 @@ public WritableRatisContainerProvider( this.pipelineExcludedNodes = configuredPipelineExcludedNodes(pipelineManager); } - private static ScmConfig.PipelineExcludedNodes configuredPipelineExcludedNodes( + private static PipelineExcludedNodes configuredPipelineExcludedNodes( PipelineManager pipelineManager) { if (pipelineManager instanceof PipelineManagerImpl) { - ScmConfig.PipelineExcludedNodes excludedNodes = + PipelineExcludedNodes excludedNodes = ((PipelineManagerImpl) pipelineManager).getPipelineExcludedNodesConfig(); if (excludedNodes != null) { return excludedNodes; } } - return ScmConfig.PipelineExcludedNodes.EMPTY; + return PipelineExcludedNodes.EMPTY; } @Override diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 013f14b16504..9bf492609936 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; +import com.google.common.collect.ImmutableSet; import jakarta.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; @@ -28,6 +29,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -51,6 +53,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.net.NetConstants; @@ -903,6 +906,27 @@ public List getNodesByAddress(String address) { return results; } + @Override + public Set resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes) { + if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + } + @Override public NetworkTopology getClusterNetworkTopologyMap() { return clusterMap; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index 4f0679470eab..fda3d9037dbf 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.container; +import com.google.common.collect.ImmutableSet; import jakarta.annotation.Nullable; import java.io.IOException; import java.util.Collections; @@ -35,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -359,6 +361,27 @@ public List getNodesByAddress(String address) { return null; } + @Override + public Set resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes) { + if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + } + @Override public NetworkTopology getClusterNetworkTopologyMap() { return null; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index 4ccc4724f45e..84a449db3e96 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -44,6 +44,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -51,6 +52,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import java.io.File; import java.io.IOException; import java.time.Instant; @@ -76,6 +78,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -123,6 +126,35 @@ * Tests for PipelineManagerImpl. */ public class TestPipelineManagerImpl { + + /** + * Mockito mocks do not execute real interface methods; replay resolution using the mock's + * stubbed {@link NodeManager#getNode} / {@link NodeManager#getNodesByAddress}. + */ + private static void stubResolvePipelineExcludedDatanodes(NodeManager nodeManager) { + lenient().when(nodeManager.resolvePipelineExcludedDatanodes(any())) + .thenAnswer(invocation -> { + PipelineExcludedNodes spec = invocation.getArgument(0); + if (spec == null || spec.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : spec.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = nodeManager.getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : spec.getExcludedAddressTokens()) { + List datanodes = nodeManager.getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + }); + } + private OzoneConfiguration conf; private DBStore dbStore; private MockNodeManager nodeManager; @@ -1022,6 +1054,7 @@ public void testConfiguredExcludedNodesMergedForCreatePipeline() throws Exceptio when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); when(localNodeManager.getNodesByAddress(excludedByAddress.getIpAddress())) .thenReturn(Collections.singletonList(excludedByAddress)); + stubResolvePipelineExcludedDatanodes(localNodeManager); PipelineStateManager localStateManager = mock(PipelineStateManager.class); PipelineFactory localFactory = mock(PipelineFactory.class); @@ -1060,6 +1093,7 @@ public void testConfiguredExcludedNodesMergedForBuildECPipeline() throws Excepti localContext.updateLeaderAndTerm(true, 1); NodeManager localNodeManager = mock(NodeManager.class); when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); + stubResolvePipelineExcludedDatanodes(localNodeManager); PipelineStateManager localStateManager = mock(PipelineStateManager.class); PipelineFactory localFactory = mock(PipelineFactory.class); @@ -1087,6 +1121,7 @@ public void testConfiguredUnknownAddressTokenIgnoredInCreatePipeline() throws Ex localContext.updateLeaderAndTerm(true, 1); NodeManager localNodeManager = mock(NodeManager.class); when(localNodeManager.getNodesByAddress("unknown-dn-host")).thenReturn(Collections.emptyList()); + stubResolvePipelineExcludedDatanodes(localNodeManager); PipelineStateManager localStateManager = mock(PipelineStateManager.class); PipelineFactory localFactory = mock(PipelineFactory.class); @@ -1124,6 +1159,7 @@ public void testConfiguredAndCallerExcludedNodesAreDeduplicated() throws Excepti localContext.updateLeaderAndTerm(true, 1); NodeManager localNodeManager = mock(NodeManager.class); when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); + stubResolvePipelineExcludedDatanodes(localNodeManager); PipelineStateManager localStateManager = mock(PipelineStateManager.class); PipelineFactory localFactory = mock(PipelineFactory.class); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java index c9b8af8532aa..d4e50676588c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; -import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -592,7 +592,7 @@ public void testConfiguredExcludedPipelineFilteredBeforeSelection( List pipelines = new ArrayList<>(); pipelines.add(excludedPipeline); when(localPipelineManager.getPipelines(repConfig, Pipeline.PipelineState.OPEN)).thenReturn(pipelines); - when(localPipelineManager.getPipelineExcludedNodesConfig()).thenReturn(ScmConfig.PipelineExcludedNodes.parse( + when(localPipelineManager.getPipelineExcludedNodesConfig()).thenReturn(PipelineExcludedNodes.parse( excludedDn.getUuidString())); WritableContainerProvider localProvider = new WritableECContainerProvider( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java index b060cc19ecad..803605696c6c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; -import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -142,8 +142,8 @@ void configuredExcludedPipelineReturnsNoContainerUntilGoodPipelineAppears() when(pipelineManager.createPipeline(REPLICATION_CONFIG)) .thenThrow(new SCMException(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE)); - ScmConfig.PipelineExcludedNodes pipelineExcludedNodes = - ScmConfig.PipelineExcludedNodes.parse(excludedDn.getUuidString()); + PipelineExcludedNodes pipelineExcludedNodes = + PipelineExcludedNodes.parse(excludedDn.getUuidString()); WritableRatisContainerProvider subject = createSubject(pipelineExcludedNodes); assertThrows(IOException.class, () -> subject.getContainer( @@ -175,8 +175,8 @@ void usesExcludeListFallbackButStillFiltersConfiguredExcludedPipelines() when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN)) .thenReturn(new ArrayList<>(asList(excludedPipeline, allowedPipeline))); - ScmConfig.PipelineExcludedNodes pipelineExcludedNodes = - ScmConfig.PipelineExcludedNodes.parse(excludedDn.getUuidString()); + PipelineExcludedNodes pipelineExcludedNodes = + PipelineExcludedNodes.parse(excludedDn.getUuidString()); ContainerInfo container = createSubject(pipelineExcludedNodes) .getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, excludeList); @@ -232,7 +232,7 @@ private WritableRatisContainerProvider createSubject() { } private WritableRatisContainerProvider createSubject( - ScmConfig.PipelineExcludedNodes pipelineExcludedNodes) { + PipelineExcludedNodes pipelineExcludedNodes) { when(pipelineManager.getPipelineExcludedNodesConfig()) .thenReturn(pipelineExcludedNodes); return new WritableRatisContainerProvider( From 6bb6a1750d1ae6dc1ef81c95e6aac74ddd72602d Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 1 Apr 2026 11:28:08 +0530 Subject: [PATCH 6/6] Fixed pmd failure --- .../scm/pipeline/TestPipelineManagerImpl.java | 52 +++++++++---------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index 84a449db3e96..67138d5bf804 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -127,34 +127,6 @@ */ public class TestPipelineManagerImpl { - /** - * Mockito mocks do not execute real interface methods; replay resolution using the mock's - * stubbed {@link NodeManager#getNode} / {@link NodeManager#getNodesByAddress}. - */ - private static void stubResolvePipelineExcludedDatanodes(NodeManager nodeManager) { - lenient().when(nodeManager.resolvePipelineExcludedDatanodes(any())) - .thenAnswer(invocation -> { - PipelineExcludedNodes spec = invocation.getArgument(0); - if (spec == null || spec.isEmpty()) { - return Collections.emptySet(); - } - Set resolved = new HashSet<>(); - for (DatanodeID datanodeID : spec.getExcludedDatanodeIds()) { - DatanodeDetails datanodeDetails = nodeManager.getNode(datanodeID); - if (datanodeDetails != null) { - resolved.add(datanodeDetails); - } - } - for (String address : spec.getExcludedAddressTokens()) { - List datanodes = nodeManager.getNodesByAddress(address); - if (datanodes != null) { - resolved.addAll(datanodes); - } - } - return ImmutableSet.copyOf(resolved); - }); - } - private OzoneConfiguration conf; private DBStore dbStore; private MockNodeManager nodeManager; @@ -1238,4 +1210,28 @@ private static void assertFailsNotLeader(CheckedRunnable block) { assertEquals(ResultCodes.SCM_NOT_LEADER, e.getResult()); assertInstanceOf(NotLeaderException.class, e.getCause()); } + + private static void stubResolvePipelineExcludedDatanodes(NodeManager nodeManager) { + lenient().when(nodeManager.resolvePipelineExcludedDatanodes(any())) + .thenAnswer(invocation -> { + PipelineExcludedNodes pipelineExcludedNodes = invocation.getArgument(0); + if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = nodeManager.getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = nodeManager.getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + }); + } }