diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineExcludedNodes.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineExcludedNodes.java new file mode 100644 index 000000000000..8f38bfd904a8 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineExcludedNodes.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; + +/** + * Parsed snapshot of {@code hdds.scm.pipeline.exclude.datanodes}: UUIDs, hostnames, + * and IP tokens to exclude from pipeline creation and selection. + */ +public final class PipelineExcludedNodes { + public static final PipelineExcludedNodes EMPTY = new PipelineExcludedNodes( + Collections.emptySet(), Collections.emptySet()); + + private final Set excludedDatanodeIds; + private final Set excludedAddressTokens; + + private PipelineExcludedNodes(Set excludedDatanodeIds, Set excludedAddressTokens) { + this.excludedDatanodeIds = ImmutableSet.copyOf(excludedDatanodeIds); + this.excludedAddressTokens = ImmutableSet.copyOf(excludedAddressTokens); + } + + public static PipelineExcludedNodes parse(String rawValue) { + if (rawValue == null || StringUtils.isBlank(rawValue)) { + return EMPTY; + } + + Set datanodeIDs = new HashSet<>(); + Set addressTokens = new HashSet<>(); + + Arrays.stream(rawValue.split(",")) + .map(String::trim) + .filter(token -> !token.isEmpty()) + .forEach(token -> { + try { + datanodeIDs.add(DatanodeID.fromUuidString(token)); + } catch (IllegalArgumentException ignored) { + addressTokens.add(normalizeAddress(token)); + } + }); + + if (datanodeIDs.isEmpty() && addressTokens.isEmpty()) { + return EMPTY; + } + return new PipelineExcludedNodes(datanodeIDs, addressTokens); + } + + public boolean isEmpty() { + return excludedDatanodeIds.isEmpty() && excludedAddressTokens.isEmpty(); + } + + public Set getExcludedDatanodeIds() { + return excludedDatanodeIds; + } + + public Set getExcludedAddressTokens() { + return excludedAddressTokens; + } + + public boolean isExcluded(DatanodeDetails datanodeDetails) { + if (datanodeDetails == null) { + return false; + } + if (excludedDatanodeIds.contains(datanodeDetails.getID())) { + return true; + } + + final String hostName = datanodeDetails.getHostName(); + if (hostName != null && excludedAddressTokens.contains(normalizeAddress(hostName))) { + return true; + } + + final String ipAddress = datanodeDetails.getIpAddress(); + return ipAddress != null && excludedAddressTokens.contains(normalizeAddress(ipAddress)); + } + + public boolean isExcluded(Pipeline pipeline) { + for (DatanodeDetails dn : pipeline.getNodes()) { + if (isExcluded(dn)) { + return true; + } + } + return false; + } + + private static String normalizeAddress(String value) { + return value.toLowerCase(Locale.ROOT); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 0876bd22ea4f..93e23ff92ee5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -102,6 +102,17 @@ public class ScmConfig extends ReconfigurableConfig { ) private String ecPipelineChoosePolicyName; + @Config(key = "hdds.scm.pipeline.exclude.datanodes", + type = ConfigType.STRING, + defaultValue = "", + tags = { ConfigTag.SCM, ConfigTag.PIPELINE }, + description = + "Comma-separated list of Datanodes to exclude from SCM pipeline creation and pipeline selection. " + + "Each entry may be a Datanode UUID, hostname, or IP address. " + + "Example: \",dn-1.example.com,10.0.0.12\"." + ) + private String pipelineExcludeDatanodes = ""; + @Config(key = "hdds.scm.block.deletion.per-interval.max", type = ConfigType.INT, defaultValue = "500000", @@ -194,6 +205,14 @@ public String getECPipelineChoosePolicyName() { return ecPipelineChoosePolicyName; } + /** + * Parsed view of {@link #pipelineExcludeDatanodes}. Computed on each call so it stays aligned + * with the current config string (including dynamic reconfiguration). + */ + public PipelineExcludedNodes getPipelineExcludedNodes() { + return PipelineExcludedNodes.parse(pipelineExcludeDatanodes); + } + public int getBlockDeletionLimit() { return blockDeletionLimit; } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java new file mode 100644 index 000000000000..4c40035c61b3 --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestScmConfig.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.UUID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link ScmConfig}. + */ +class TestScmConfig { + + @Test + void testPipelineExcludedNodesDefaultsToEmpty() { + OzoneConfiguration conf = new OzoneConfiguration(); + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + + assertTrue(scmConfig.getPipelineExcludedNodes().isEmpty()); + } + + @Test + void testPipelineExcludedNodesParsesUuidHostnameAndIp() { + String uuid = UUID.randomUUID().toString(); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("hdds.scm.pipeline.exclude.datanodes", + uuid + ", DN-1.EXAMPLE.COM, 10.0.0.12, dn-1.example.com"); + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + + PipelineExcludedNodes first = scmConfig.getPipelineExcludedNodes(); + PipelineExcludedNodes second = scmConfig.getPipelineExcludedNodes(); + + assertFalse(first.isEmpty()); + assertEquals(first.getExcludedDatanodeIds(), second.getExcludedDatanodeIds()); + assertEquals(first.getExcludedAddressTokens(), second.getExcludedAddressTokens()); + assertEquals(1, first.getExcludedDatanodeIds().size()); + assertTrue(first.getExcludedDatanodeIds().contains(DatanodeID.fromUuidString(uuid))); + assertTrue(first.getExcludedAddressTokens().contains("dn-1.example.com")); + assertTrue(first.getExcludedAddressTokens().contains("10.0.0.12")); + assertEquals(2, first.getExcludedAddressTokens().size()); + } + + @Test + void testPipelineExcludedNodesMatchesDatanodeByIdAndAddress() { + DatanodeDetails datanode = MockDatanodeDetails.randomDatanodeDetails(); + datanode.setHostName("dn-2.example.com"); + datanode.setIpAddress("10.10.10.10"); + + PipelineExcludedNodes byUUID = PipelineExcludedNodes.parse(datanode.getUuidString()); + assertTrue(byUUID.isExcluded(datanode)); + + PipelineExcludedNodes byHost = PipelineExcludedNodes.parse("DN-2.EXAMPLE.COM"); + assertTrue(byHost.isExcluded(datanode)); + + PipelineExcludedNodes byIp = PipelineExcludedNodes.parse("10.10.10.10"); + assertTrue(byIp.isExcluded(datanode)); + } +} + diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index dc701a0be661..8639ba003c29 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -34,11 +34,14 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; @@ -80,6 +83,7 @@ public class ContainerManagerImpl implements ContainerManager { private final Random random = new Random(); private final long maxContainerSize; + private final PipelineExcludedNodes pipelineExcludedNodes; /** * @@ -108,6 +112,9 @@ public ContainerManagerImpl( maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + pipelineExcludedNodes = + new OzoneConfiguration(conf).getObject(ScmConfig.class) + .getPipelineExcludedNodes(); this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); } @@ -182,6 +189,7 @@ public ContainerInfo allocateContainer( try { pipelines = pipelineManager .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN); + pipelines.removeIf(pipelineExcludedNodes::isExcluded); if (!pipelines.isEmpty()) { pipeline = pipelines.get(random.nextInt(pipelines.size())); containerInfo = createContainer(pipeline, owner); @@ -209,6 +217,7 @@ public ContainerInfo allocateContainer( try { pipelines = pipelineManager .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN); + pipelines.removeIf(pipelineExcludedNodes::isExcluded); if (!pipelines.isEmpty()) { pipeline = pipelines.get(random.nextInt(pipelines.size())); containerInfo = createContainer(pipeline, owner); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e9a019945c1f..7ea621879bf5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; @@ -370,6 +371,18 @@ Map getTotalDatanodeCommandCounts( /** @return the datanode of the given id if it exists; otherwise, return null. */ @Nullable DatanodeDetails getNode(@Nullable DatanodeID id); + /** + * Resolves pipeline exclusion configuration against datanodes currently known to this manager. + * Call each time an exclusion set is needed (for example when creating a pipeline); + * do not cache the result across calls, so DNs that register after SCM startup are included when + * they match a configured hostname or IP, and UUIDs resolve as soon as the node exists. + * Removed DNs simply stop appearing in the result on the next call. + * + * @param pipelineExcludedNodes parsed {@code hdds.scm.pipeline.exclude.datanodes}; null or empty yields an empty set + * @return immutable copy of matching registered datanodes + */ + Set resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes); + /** * Given datanode address(Ipaddress or hostname), returns a list of * DatanodeDetails for the datanodes running at that address. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 3289e7b312a8..057ae4adac60 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; import jakarta.annotation.Nullable; import java.io.IOException; import java.math.RoundingMode; @@ -69,6 +70,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -1894,6 +1896,27 @@ public List getNodesByAddress(String address) { .collect(Collectors.toList()); } + @Override + public Set resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes) { + if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + } + /** * Get cluster map as in network topology for this node manager. * @return cluster map diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 9c529e22e7e1..bfb66bc45466 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -23,9 +23,11 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -44,7 +46,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -97,6 +101,7 @@ public class PipelineManagerImpl implements PipelineManager { // SCM is already out of SafeMode. private AtomicBoolean freezePipelineCreation; private final Clock clock; + private final PipelineExcludedNodes pipelineExcludedNodes; @SuppressWarnings("checkstyle:parameterNumber") protected PipelineManagerImpl(ConfigurationSource conf, @@ -124,6 +129,7 @@ protected PipelineManagerImpl(ConfigurationSource conf, HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); this.freezePipelineCreation = new AtomicBoolean(); + this.pipelineExcludedNodes = conf.getObject(ScmConfig.class).getPipelineExcludedNodes(); } @SuppressWarnings("checkstyle:parameterNumber") @@ -216,8 +222,8 @@ public Pipeline buildECPipeline(ReplicationConfig replicationConfig, throw new IllegalArgumentException("Replication type must be EC"); } checkIfPipelineCreationIsAllowed(replicationConfig); - return pipelineFactory.create(replicationConfig, excludedNodes, - favoredNodes); + List allExcludedNodes = mergeConfiguredExcludedNodes(excludedNodes); + return pipelineFactory.create(replicationConfig, allExcludedNodes, favoredNodes); } /** @@ -250,13 +256,13 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig, List excludedNodes, List favoredNodes) throws IOException { checkIfPipelineCreationIsAllowed(replicationConfig); + List allExcludedNodes = mergeConfiguredExcludedNodes(excludedNodes); acquireWriteLock(); final Pipeline pipeline; try { try { - pipeline = pipelineFactory.create(replicationConfig, - excludedNodes, favoredNodes); + pipeline = pipelineFactory.create(replicationConfig, allExcludedNodes, favoredNodes); } catch (IOException e) { metrics.incNumPipelineCreationFailed(); throw e; @@ -302,6 +308,25 @@ private void addPipelineToManager(Pipeline pipeline) recordMetricsForPipeline(pipeline); } + private List mergeConfiguredExcludedNodes(List excludedNodes) { + Set configuredExcludedDatanodeDetails = + nodeManager.resolvePipelineExcludedDatanodes(pipelineExcludedNodes); + if ((excludedNodes == null || excludedNodes.isEmpty()) && configuredExcludedDatanodeDetails.isEmpty()) { + return Collections.emptyList(); + } + if (excludedNodes == null || excludedNodes.isEmpty()) { + return new ArrayList<>(configuredExcludedDatanodeDetails); + } + if (configuredExcludedDatanodeDetails.isEmpty()) { + return excludedNodes; + } + + Set mergedExcludedNodes = + new HashSet<>(configuredExcludedDatanodeDetails); + mergedExcludedNodes.addAll(excludedNodes); + return new ArrayList<>(mergedExcludedNodes); + } + private boolean factorOne(ReplicationConfig replicationConfig) { if (replicationConfig.getReplicationType() == ReplicationType.RATIS) { return ((RatisReplicationConfig) replicationConfig).getReplicationFactor() @@ -860,6 +885,10 @@ public PipelineStateManager getStateManager() { return stateManager; } + PipelineExcludedNodes getPipelineExcludedNodesConfig() { + return pipelineExcludedNodes; + } + @VisibleForTesting public SCMHAManager getScmhaManager() { return scmhaManager; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java index d7c4b7705f46..4d7da667e23b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.conf.ReconfigurableConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.PipelineRequestInformation; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -60,6 +61,7 @@ public class WritableECContainerProvider private final ContainerManager containerManager; private final long containerSize; private final WritableECContainerProviderConfig providerConfig; + private final PipelineExcludedNodes pipelineExcludedNodes; public WritableECContainerProvider(WritableECContainerProviderConfig config, long containerSize, @@ -73,6 +75,19 @@ public WritableECContainerProvider(WritableECContainerProviderConfig config, this.containerManager = containerManager; this.pipelineChoosePolicy = pipelineChoosePolicy; this.containerSize = containerSize; + this.pipelineExcludedNodes = configuredPipelineExcludedNodes(pipelineManager); + } + + private static PipelineExcludedNodes configuredPipelineExcludedNodes( + PipelineManager pipelineManager) { + if (pipelineManager instanceof PipelineManagerImpl) { + PipelineExcludedNodes excludedNodes = + ((PipelineManagerImpl) pipelineManager).getPipelineExcludedNodesConfig(); + if (excludedNodes != null) { + return excludedNodes; + } + } + return PipelineExcludedNodes.EMPTY; } /** @@ -115,6 +130,7 @@ public ContainerInfo getContainer(final long size, } List existingPipelines = pipelineManager.getPipelines( repConfig, Pipeline.PipelineState.OPEN); + existingPipelines.removeIf(pipelineExcludedNodes::isExcluded); final int pipelineCount = existingPipelines.size(); LOG.debug("Checking existing pipelines: {}", existingPipelines); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java index a61b32892352..5050f4455f71 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.PipelineRequestInformation; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -43,6 +44,7 @@ public class WritableRatisContainerProvider private final PipelineManager pipelineManager; private final PipelineChoosePolicy pipelineChoosePolicy; private final ContainerManager containerManager; + private final PipelineExcludedNodes pipelineExcludedNodes; public WritableRatisContainerProvider( PipelineManager pipelineManager, @@ -51,6 +53,19 @@ public WritableRatisContainerProvider( this.pipelineManager = pipelineManager; this.containerManager = containerManager; this.pipelineChoosePolicy = pipelineChoosePolicy; + this.pipelineExcludedNodes = configuredPipelineExcludedNodes(pipelineManager); + } + + private static PipelineExcludedNodes configuredPipelineExcludedNodes( + PipelineManager pipelineManager) { + if (pipelineManager instanceof PipelineManagerImpl) { + PipelineExcludedNodes excludedNodes = + ((PipelineManagerImpl) pipelineManager).getPipelineExcludedNodesConfig(); + if (excludedNodes != null) { + return excludedNodes; + } + } + return PipelineExcludedNodes.EMPTY; } @Override @@ -164,10 +179,13 @@ private List findPipelinesByState( List pipelines = pipelineManager.getPipelines(repConfig, pipelineState, excludeList.getDatanodes(), excludeList.getPipelineIds()); + pipelines.removeIf(pipelineExcludedNodes::isExcluded); if (pipelines.isEmpty() && !excludeList.isEmpty()) { // if no pipelines can be found, try finding pipeline without // exclusion pipelines = pipelineManager.getPipelines(repConfig, pipelineState); + // but still, configured excluded nodes must be excluded + pipelines.removeIf(pipelineExcludedNodes::isExcluded); } return pipelines; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 013f14b16504..9bf492609936 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; +import com.google.common.collect.ImmutableSet; import jakarta.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; @@ -28,6 +29,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -51,6 +53,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.net.NetConstants; @@ -903,6 +906,27 @@ public List getNodesByAddress(String address) { return results; } + @Override + public Set resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes) { + if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + } + @Override public NetworkTopology getClusterNetworkTopologyMap() { return clusterMap; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index 4f0679470eab..fda3d9037dbf 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.container; +import com.google.common.collect.ImmutableSet; import jakarta.annotation.Nullable; import java.io.IOException; import java.util.Collections; @@ -35,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -359,6 +361,27 @@ public List getNodesByAddress(String address) { return null; } + @Override + public Set resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes) { + if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + } + @Override public NetworkTopology getClusterNetworkTopologyMap() { return null; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index daf0b4b4c6a8..9b794fe973d1 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -27,8 +27,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -347,6 +351,43 @@ void testAllocateContainersWithECReplicationConfig() throws Exception { containerManager.getContainer(admin.containerID())); } + /** + * First there's only one pipeline, and a DN in that pipeline is excluded, so container creation fails. Then + * another allowed is made availabel and container creation passes. + */ + @Test + void testAllocateContainerFiltersConfiguredExcludedPipelines() + throws Exception { + OzoneConfiguration conf = SCMTestUtils.getConf(new File(testDir, "exclude")); + RatisReplicationConfig replicationConfig = + RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + List pipelines = pipelineManager.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN); + assertEquals(1, pipelines.size()); + Pipeline excludedPipeline = pipelines.get(0); + String excludedUuid = excludedPipeline.getNodes().get(0).getUuidString(); + conf.set("hdds.scm.pipeline.exclude.datanodes", excludedUuid); + + ContainerManager manager = new ContainerManagerImpl(conf, + scmhaManager, sequenceIdGen, pipelineManager, + SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock); + clearInvocations(pipelineManager); + + // No non-excluded pipeline is available, and pipeline creation is blocked. + doThrow(new IOException("No pipeline available")) + .when(pipelineManager).createPipeline(replicationConfig); + assertThrows(IOException.class, () -> manager.allocateContainer(replicationConfig, "admin")); + verify(pipelineManager, times(1)).createPipeline(replicationConfig); + + // Make a new pipeline available and ensure allocation succeeds. + doCallRealMethod().when(pipelineManager).createPipeline(replicationConfig); + Pipeline allowedPipeline = pipelineManager.createPipeline(replicationConfig); + clearInvocations(pipelineManager); + + ContainerInfo container = manager.allocateContainer(replicationConfig, "admin"); + assertEquals(allowedPipeline.getId(), container.getPipelineID()); + verify(pipelineManager, never()).createPipeline(replicationConfig); + } + @Test void testUpdateContainerReplicaInvokesPendingOp() throws IOException, TimeoutException { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index e7fc6f14f9b6..67138d5bf804 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -44,6 +44,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -51,6 +52,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import java.io.File; import java.io.IOException; import java.time.Instant; @@ -76,6 +78,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -123,6 +126,7 @@ * Tests for PipelineManagerImpl. */ public class TestPipelineManagerImpl { + private OzoneConfiguration conf; private DBStore dbStore; private MockNodeManager nodeManager; @@ -999,6 +1003,163 @@ public void testHasEnoughSpace() throws IOException { assertFalse(pipelineManager.hasEnoughSpace(pipeline, containerSize)); } + /** + * {@link PipelineManagerImpl#createPipeline(ReplicationConfig, List, List)} calls + * {@link PipelineFactory#create(ReplicationConfig, List, List)} to create a pipeline. This test asserts that + * {@link PipelineManagerImpl#createPipeline(ReplicationConfig, List, List)} creates a merged list of both configured + * excluded nodes (hdds.scm.pipeline.exclude.datanodes) and client provided excluded nodes to pass as argument to the + * PipelineFactory create method. + */ + @Test + public void testConfiguredExcludedNodesMergedForCreatePipeline() throws Exception { + DatanodeDetails excludedByUuid = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails excludedByAddress = MockDatanodeDetails.randomDatanodeDetails(); + excludedByAddress.setIpAddress("10.0.0.25"); + DatanodeDetails excludedByClient = MockDatanodeDetails.randomDatanodeDetails(); + + conf.set("hdds.scm.pipeline.exclude.datanodes", + excludedByUuid.getUuidString() + "," + excludedByAddress.getIpAddress()); + + SCMContext localContext = SCMContext.emptyContext(); + localContext.updateLeaderAndTerm(true, 1); + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); + when(localNodeManager.getNodesByAddress(excludedByAddress.getIpAddress())) + .thenReturn(Collections.singletonList(excludedByAddress)); + stubResolvePipelineExcludedDatanodes(localNodeManager); + PipelineStateManager localStateManager = mock(PipelineStateManager.class); + PipelineFactory localFactory = mock(PipelineFactory.class); + + try (PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf, + SCMHAManagerStub.getInstance(true), localNodeManager, localStateManager, + localFactory, new EventQueue(), localContext, testClock)) { + RatisReplicationConfig replicationConfig = RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + Pipeline createdPipeline = Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.ALLOCATED) + .setReplicationConfig(replicationConfig) + .setNodes(ImmutableList.of( + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())) + .build(); + + when(localFactory.create(eq(replicationConfig), any(), eq(Collections.emptyList()))).thenReturn(createdPipeline); + + pipelineManager.createPipeline(replicationConfig, + Collections.singletonList(excludedByClient), Collections.emptyList()); + + ArgumentCaptor> excludedCaptor = ArgumentCaptor.forClass(List.class); + verify(localFactory).create(eq(replicationConfig), excludedCaptor.capture(), eq(Collections.emptyList())); + List allExcluded = excludedCaptor.getValue(); + assertThat(allExcluded).contains(excludedByClient, excludedByUuid, excludedByAddress); + } + } + + @Test + public void testConfiguredExcludedNodesMergedForBuildECPipeline() throws Exception { + DatanodeDetails excludedByUuid = MockDatanodeDetails.randomDatanodeDetails(); + conf.set("hdds.scm.pipeline.exclude.datanodes", excludedByUuid.getUuidString()); + + SCMContext localContext = SCMContext.emptyContext(); + localContext.updateLeaderAndTerm(true, 1); + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); + stubResolvePipelineExcludedDatanodes(localNodeManager); + PipelineStateManager localStateManager = mock(PipelineStateManager.class); + PipelineFactory localFactory = mock(PipelineFactory.class); + + try (PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf, + SCMHAManagerStub.getInstance(true), localNodeManager, localStateManager, + localFactory, new EventQueue(), localContext, testClock)) { + ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2); + Pipeline builtPipeline = MockPipeline.createEcPipeline(ecConfig); + when(localFactory.create(eq(ecConfig), any(), eq(Collections.emptyList()))).thenReturn(builtPipeline); + + pipelineManager.buildECPipeline(ecConfig, new ArrayList<>(), Collections.emptyList()); + + ArgumentCaptor> excludedCaptor = ArgumentCaptor.forClass(List.class); + verify(localFactory).create(eq(ecConfig), excludedCaptor.capture(), eq(Collections.emptyList())); + assertThat(excludedCaptor.getValue()).contains(excludedByUuid); + } + } + + @Test + public void testConfiguredUnknownAddressTokenIgnoredInCreatePipeline() throws Exception { + DatanodeDetails callerExcluded = MockDatanodeDetails.randomDatanodeDetails(); + conf.set("hdds.scm.pipeline.exclude.datanodes", "unknown-dn-host"); + + SCMContext localContext = SCMContext.emptyContext(); + localContext.updateLeaderAndTerm(true, 1); + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNodesByAddress("unknown-dn-host")).thenReturn(Collections.emptyList()); + stubResolvePipelineExcludedDatanodes(localNodeManager); + PipelineStateManager localStateManager = mock(PipelineStateManager.class); + PipelineFactory localFactory = mock(PipelineFactory.class); + + try (PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf, + SCMHAManagerStub.getInstance(true), localNodeManager, localStateManager, + localFactory, new EventQueue(), localContext, testClock)) { + RatisReplicationConfig replicationConfig = RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + Pipeline createdPipeline = Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.ALLOCATED) + .setReplicationConfig(replicationConfig) + .setNodes(ImmutableList.of( + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())) + .build(); + + when(localFactory.create(eq(replicationConfig), any(), eq(Collections.emptyList()))).thenReturn(createdPipeline); + + pipelineManager.createPipeline(replicationConfig, + Collections.singletonList(callerExcluded), Collections.emptyList()); + + ArgumentCaptor> excludedCaptor = ArgumentCaptor.forClass(List.class); + verify(localFactory).create(eq(replicationConfig), excludedCaptor.capture(), eq(Collections.emptyList())); + assertThat(excludedCaptor.getValue()).containsExactly(callerExcluded); + } + } + + @Test + public void testConfiguredAndCallerExcludedNodesAreDeduplicated() throws Exception { + DatanodeDetails excludedByUuid = MockDatanodeDetails.randomDatanodeDetails(); + conf.set("hdds.scm.pipeline.exclude.datanodes", excludedByUuid.getUuidString()); + + SCMContext localContext = SCMContext.emptyContext(); + localContext.updateLeaderAndTerm(true, 1); + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNode(excludedByUuid.getID())).thenReturn(excludedByUuid); + stubResolvePipelineExcludedDatanodes(localNodeManager); + PipelineStateManager localStateManager = mock(PipelineStateManager.class); + PipelineFactory localFactory = mock(PipelineFactory.class); + + try (PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf, + SCMHAManagerStub.getInstance(true), localNodeManager, localStateManager, + localFactory, new EventQueue(), localContext, testClock)) { + RatisReplicationConfig replicationConfig = RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + Pipeline createdPipeline = Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.ALLOCATED) + .setReplicationConfig(replicationConfig) + .setNodes(ImmutableList.of( + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())) + .build(); + + when(localFactory.create(eq(replicationConfig), any(), eq(Collections.emptyList()))).thenReturn(createdPipeline); + + pipelineManager.createPipeline(replicationConfig, + Collections.singletonList(excludedByUuid), Collections.emptyList()); + + ArgumentCaptor> excludedCaptor = ArgumentCaptor.forClass(List.class); + verify(localFactory).create(eq(replicationConfig), excludedCaptor.capture(), eq(Collections.emptyList())); + assertThat(excludedCaptor.getValue()).containsExactly(excludedByUuid); + } + } + private Set createContainerReplicasList( List dns) { Set replicas = new HashSet<>(); @@ -1049,4 +1210,28 @@ private static void assertFailsNotLeader(CheckedRunnable block) { assertEquals(ResultCodes.SCM_NOT_LEADER, e.getResult()); assertInstanceOf(NotLeaderException.class, e.getCause()); } + + private static void stubResolvePipelineExcludedDatanodes(NodeManager nodeManager) { + lenient().when(nodeManager.resolvePipelineExcludedDatanodes(any())) + .thenAnswer(invocation -> { + PipelineExcludedNodes pipelineExcludedNodes = invocation.getArgument(0); + if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) { + return Collections.emptySet(); + } + Set resolved = new HashSet<>(); + for (DatanodeID datanodeID : pipelineExcludedNodes.getExcludedDatanodeIds()) { + DatanodeDetails datanodeDetails = nodeManager.getNode(datanodeID); + if (datanodeDetails != null) { + resolved.add(datanodeDetails); + } + } + for (String address : pipelineExcludedNodes.getExcludedAddressTokens()) { + List datanodes = nodeManager.getNodesByAddress(address); + if (datanodes != null) { + resolved.addAll(datanodes); + } + } + return ImmutableSet.copyOf(resolved); + }); + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java index b3c34b44e4c2..d4e50676588c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java @@ -33,8 +33,10 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; @@ -47,6 +49,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Set; +import java.util.TreeSet; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -57,6 +60,7 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -70,6 +74,8 @@ import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.net.NodeSchema; import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.CapacityPipelineChoosePolicy; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy; @@ -555,6 +561,92 @@ public void testExcludedNodesPassedToCreatePipelineIfProvided( Collections.emptyList()); } + @ParameterizedTest + @MethodSource("policies") + public void testConfiguredExcludedPipelineFilteredBeforeSelection( + PipelineChoosePolicy policy) throws IOException { + /* + This test first has only one pipeline, and a DN in that pipeline is excluded by configuration. Since there's only + one pipeline and that's excluded, no container can be selected from that. Also since there are only 3 (mocked) + Datanodes and one out of them is excluded, no new pipelines can be created either. So + WritableECContainerProvider#getContainer should throw an IOException. WRT code coverage, this tests the first if + branch `if (openPipelineCount < maximumPipelines)` in getContainer(), when that throws, the while loop is not + entered because all pipelines are excluded. Then it also hits the `maximumPipelines = nodeCount;` path where + maximum pipelines limit is increased. That path also throws since no new pipelines can be created. + */ + + // pipelinePerVolumeFactor set to 0 and minimumPipelines set to 2 so that getMaximumPipelines() returns 2 + providerConf.setPipelinePerVolumeFactor(0); + providerConf.setMinimumPipelines(2); + + NodeManager localNodeManager = mock(NodeManager.class); + when(localNodeManager.getNodeCount(NodeStatus.inServiceHealthy())).thenReturn(3); + PipelineManagerImpl localPipelineManager = mock(PipelineManagerImpl.class); + ContainerManager localContainerManager = mock(ContainerManager.class); + + DatanodeDetails excludedDn = MockDatanodeDetails.randomDatanodeDetails(); + Pipeline excludedPipeline = createEcPipelineWithFirstNode(excludedDn); + + // there's only one pipeline (excludedPipeline) + when(localPipelineManager.getPipelineCount(repConfig, Pipeline.PipelineState.OPEN)).thenReturn(1); + List pipelines = new ArrayList<>(); + pipelines.add(excludedPipeline); + when(localPipelineManager.getPipelines(repConfig, Pipeline.PipelineState.OPEN)).thenReturn(pipelines); + when(localPipelineManager.getPipelineExcludedNodesConfig()).thenReturn(PipelineExcludedNodes.parse( + excludedDn.getUuidString())); + + WritableContainerProvider localProvider = new WritableECContainerProvider( + providerConf, getMaxContainerSize(), localNodeManager, localPipelineManager, localContainerManager, policy); + assertThrows(IOException.class, () -> localProvider.getContainer(1, repConfig, OWNER, new ExcludeList())); + + /* + Now, we introduce another pipeline to the cluster which is not excluded. So now a container from this allowed + pipeline can be selected. WRT code coverage, this additionally tests the while loop branch + `while (!existingPipelines.isEmpty())`. + */ + Pipeline allowedPipeline = createEcPipelineWithFirstNode(MockDatanodeDetails.randomDatanodeDetails()); + pipelines.add(allowedPipeline); + ContainerID allowedContainerId = ContainerID.valueOf(100L); + NavigableSet allowedContainerSet = new TreeSet<>(); + allowedContainerSet.add(allowedContainerId); + ContainerInfo allowedContainer = new ContainerInfo.Builder() + .setContainerID(allowedContainerId.getId()) + .setPipelineID(allowedPipeline.getId()) + .build(); + // there are two pipelines in the cluster now + when(localPipelineManager.getPipelineCount(repConfig, Pipeline.PipelineState.OPEN)).thenReturn(2); + when(localNodeManager.getNodeCount(NodeStatus.inServiceHealthy())).thenReturn(6); + when(localPipelineManager.getContainersInPipeline(allowedPipeline.getId())).thenReturn(allowedContainerSet); + when(localContainerManager.getContainer(allowedContainerId)).thenReturn(allowedContainer); + + ContainerInfo selected = localProvider.getContainer(1, repConfig, OWNER, new ExcludeList()); + + assertEquals(allowedContainer, selected); + verify(localPipelineManager, never()).getContainersInPipeline(excludedPipeline.getId()); + } + + private Pipeline createEcPipelineWithFirstNode(DatanodeDetails firstNode) { + List nodes = new ArrayList<>(); + nodes.add(firstNode); + while (nodes.size() < repConfig.getRequiredNodes()) { + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + } + + Map nodeIndexes = new HashMap<>(); + int ecIndex = 1; + for (DatanodeDetails dn : nodes) { + nodeIndexes.put(dn, ecIndex++); + } + + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.OPEN) + .setReplicationConfig(repConfig) + .setNodes(nodes) + .setReplicaIndexes(nodeIndexes) + .build(); + } + private ContainerInfo createContainer(Pipeline pipeline, ReplicationConfig repConf, long containerID) { return new ContainerInfo.Builder() diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java index a1ba81d0a70a..803605696c6c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableRatisContainerProvider.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -35,9 +37,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineExcludedNodes; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -61,12 +65,11 @@ class TestWritableRatisContainerProvider { private static final int CONTAINER_SIZE = 1234; private static final ExcludeList NO_EXCLUSION = new ExcludeList(); - private final OzoneConfiguration conf = new OzoneConfiguration(); private final PipelineChoosePolicy policy = new RandomPipelineChoosePolicy(); private final AtomicLong containerID = new AtomicLong(1); @Mock - private PipelineManager pipelineManager; + private PipelineManagerImpl pipelineManager; @Mock private ContainerManager containerManager; @@ -118,6 +121,73 @@ void failsIfContainerCannotBeCreated() throws Exception { verifyPipelineCreated(); } + @Test + void configuredExcludedPipelineReturnsNoContainerUntilGoodPipelineAppears() + throws Exception { + DatanodeDetails excludedDn = MockDatanodeDetails.randomDatanodeDetails(); + Pipeline excludedPipeline = MockPipeline.createPipeline(asList( + excludedDn, + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())); + Pipeline allowedPipeline = MockPipeline.createPipeline(3); + ContainerInfo containerFromAllowedPipeline = + pipelineHasContainer(allowedPipeline); + List listWithAllowedPipeline = new ArrayList<>(); + listWithAllowedPipeline.add(allowedPipeline); + listWithAllowedPipeline.add(excludedPipeline); + when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet())) + .thenReturn(new ArrayList<>(singletonList(excludedPipeline))) + .thenReturn(new ArrayList<>(singletonList(excludedPipeline))) + .thenReturn(listWithAllowedPipeline); + when(pipelineManager.createPipeline(REPLICATION_CONFIG)) + .thenThrow(new SCMException(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE)); + + PipelineExcludedNodes pipelineExcludedNodes = + PipelineExcludedNodes.parse(excludedDn.getUuidString()); + WritableRatisContainerProvider subject = createSubject(pipelineExcludedNodes); + + assertThrows(IOException.class, () -> subject.getContainer( + CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION)); + + ContainerInfo container = subject.getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION); + + assertSame(containerFromAllowedPipeline, container); + verify(pipelineManager, times(1)).createPipeline(REPLICATION_CONFIG); + } + + @Test + void usesExcludeListFallbackButStillFiltersConfiguredExcludedPipelines() + throws Exception { + DatanodeDetails excludedDn = MockDatanodeDetails.randomDatanodeDetails(); + Pipeline excludedPipeline = MockPipeline.createPipeline(asList( + excludedDn, + MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails())); + Pipeline allowedPipeline = MockPipeline.createPipeline(3); + ContainerInfo containerFromAllowedPipeline = + pipelineHasContainer(allowedPipeline); + + ExcludeList excludeList = new ExcludeList(); + excludeList.addDatanode(MockDatanodeDetails.randomDatanodeDetails()); + when(pipelineManager.getPipelines(eq(REPLICATION_CONFIG), eq(OPEN), + anySet(), anySet())) + .thenReturn(emptyList()); + when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN)) + .thenReturn(new ArrayList<>(asList(excludedPipeline, allowedPipeline))); + + PipelineExcludedNodes pipelineExcludedNodes = + PipelineExcludedNodes.parse(excludedDn.getUuidString()); + + ContainerInfo container = createSubject(pipelineExcludedNodes) + .getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, excludeList); + + assertSame(containerFromAllowedPipeline, container); + verify(pipelineManager).getPipelines(eq(REPLICATION_CONFIG), eq(OPEN), + eq(excludeList.getDatanodes()), eq(excludeList.getPipelineIds())); + verify(pipelineManager).getPipelines(REPLICATION_CONFIG, OPEN); + verify(pipelineManager, never()).createPipeline(REPLICATION_CONFIG); + } + private void existingPipelines(Pipeline... pipelines) { existingPipelines(new ArrayList<>(asList(pipelines))); } @@ -161,6 +231,14 @@ private WritableRatisContainerProvider createSubject() { pipelineManager, containerManager, policy); } + private WritableRatisContainerProvider createSubject( + PipelineExcludedNodes pipelineExcludedNodes) { + when(pipelineManager.getPipelineExcludedNodesConfig()) + .thenReturn(pipelineExcludedNodes); + return new WritableRatisContainerProvider( + pipelineManager, containerManager, policy); + } + private void verifyPipelineCreated() throws IOException { verify(pipelineManager, times(2)) .getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet());