Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm;

import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

/**
* Parsed snapshot of {@code hdds.scm.pipeline.exclude.datanodes}: UUIDs, hostnames,
* and IP tokens to exclude from pipeline creation and selection.
*/
/**
 * Parsed, immutable snapshot of {@code hdds.scm.pipeline.exclude.datanodes}:
 * Datanode UUIDs plus hostname/IP tokens to exclude from pipeline creation
 * and selection.
 *
 * <p>Instances are created only via {@link #parse(String)}. Address tokens
 * (hostnames and IPs) are matched case-insensitively by lower-casing with
 * {@link Locale#ROOT} on both sides.
 */
public final class PipelineExcludedNodes {

  /** Shared instance meaning "no nodes excluded". */
  public static final PipelineExcludedNodes EMPTY = new PipelineExcludedNodes(
      Collections.emptySet(), Collections.emptySet());

  private final Set<DatanodeID> excludedDatanodeIds;
  private final Set<String> excludedAddressTokens;

  private PipelineExcludedNodes(Set<DatanodeID> excludedDatanodeIds, Set<String> excludedAddressTokens) {
    // Defensive immutable copies: callers of the getters see frozen views.
    this.excludedDatanodeIds = ImmutableSet.copyOf(excludedDatanodeIds);
    this.excludedAddressTokens = ImmutableSet.copyOf(excludedAddressTokens);
  }

  /**
   * Parses a comma-separated exclusion list. Each trimmed, non-empty token is
   * first tried as a Datanode UUID; tokens that fail UUID parsing are kept as
   * normalized (lower-cased) hostname/IP address tokens.
   *
   * @param rawValue raw configuration value; may be null or blank
   * @return parsed exclusion set; {@link #EMPTY} for null/blank input
   */
  public static PipelineExcludedNodes parse(String rawValue) {
    // StringUtils.isBlank is null-safe, so no separate null check is needed.
    if (StringUtils.isBlank(rawValue)) {
      return EMPTY;
    }

    Set<DatanodeID> datanodeIDs = new HashSet<>();
    Set<String> addressTokens = new HashSet<>();

    Arrays.stream(rawValue.split(","))
        .map(String::trim)
        .filter(token -> !token.isEmpty())
        .forEach(token -> {
          try {
            datanodeIDs.add(DatanodeID.fromUuidString(token));
          } catch (IllegalArgumentException ignored) {
            // Not a UUID: treat the token as a hostname or IP address.
            addressTokens.add(normalizeAddress(token));
          }
        });

    if (datanodeIDs.isEmpty() && addressTokens.isEmpty()) {
      return EMPTY;
    }
    return new PipelineExcludedNodes(datanodeIDs, addressTokens);
  }

  /** @return true if no UUIDs and no address tokens are configured. */
  public boolean isEmpty() {
    return excludedDatanodeIds.isEmpty() && excludedAddressTokens.isEmpty();
  }

  /** @return immutable set of excluded Datanode IDs (tokens that parsed as UUIDs). */
  public Set<DatanodeID> getExcludedDatanodeIds() {
    return excludedDatanodeIds;
  }

  /** @return immutable set of normalized (lower-cased) hostname/IP tokens. */
  public Set<String> getExcludedAddressTokens() {
    return excludedAddressTokens;
  }

  /**
   * Checks a single datanode against the exclusion set, matching by ID first,
   * then hostname, then IP address.
   *
   * @param datanodeDetails datanode to check; null is never excluded
   * @return true if the datanode matches any configured UUID, hostname, or IP
   */
  public boolean isExcluded(DatanodeDetails datanodeDetails) {
    if (datanodeDetails == null) {
      return false;
    }
    if (excludedDatanodeIds.contains(datanodeDetails.getID())) {
      return true;
    }

    final String hostName = datanodeDetails.getHostName();
    if (hostName != null && excludedAddressTokens.contains(normalizeAddress(hostName))) {
      return true;
    }

    final String ipAddress = datanodeDetails.getIpAddress();
    return ipAddress != null && excludedAddressTokens.contains(normalizeAddress(ipAddress));
  }

  /**
   * @param pipeline pipeline to check; null is never excluded
   * @return true if any member datanode of the pipeline is excluded
   */
  public boolean isExcluded(Pipeline pipeline) {
    // Null guard keeps this consistent with isExcluded(DatanodeDetails).
    if (pipeline == null) {
      return false;
    }
    for (DatanodeDetails dn : pipeline.getNodes()) {
      if (isExcluded(dn)) {
        return true;
      }
    }
    return false;
  }

  /** Lower-cases with a fixed locale so matching is locale-independent. */
  private static String normalizeAddress(String value) {
    return value.toLowerCase(Locale.ROOT);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ public class ScmConfig extends ReconfigurableConfig {
)
private String ecPipelineChoosePolicyName;

@Config(key = "hdds.scm.pipeline.exclude.datanodes",
type = ConfigType.STRING,
defaultValue = "",
tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
description =
"Comma-separated list of Datanodes to exclude from SCM pipeline creation and pipeline selection. "
+ "Each entry may be a Datanode UUID, hostname, or IP address. "
+ "Example: \"<uuid>,dn-1.example.com,10.0.0.12\"."
)
private String pipelineExcludeDatanodes = "";

@Config(key = "hdds.scm.block.deletion.per-interval.max",
type = ConfigType.INT,
defaultValue = "500000",
Expand Down Expand Up @@ -194,6 +205,14 @@ public String getECPipelineChoosePolicyName() {
return ecPipelineChoosePolicyName;
}

/**
 * Returns the exclusion list parsed fresh from the current value of
 * {@code hdds.scm.pipeline.exclude.datanodes}. Re-parsing on every call keeps
 * the result aligned with the live config string, including values applied
 * through dynamic reconfiguration.
 */
public PipelineExcludedNodes getPipelineExcludedNodes() {
  final String configuredValue = pipelineExcludeDatanodes;
  return PipelineExcludedNodes.parse(configuredValue);
}

/**
 * @return maximum number of blocks processed per deletion interval; presumably
 *     backed by {@code hdds.scm.block.deletion.per-interval.max} (default
 *     500000) — confirm against the field's {@code @Config} annotation.
 */
public int getBlockDeletionLimit() {
return blockDeletionLimit;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.junit.jupiter.api.Test;

/**
* Tests for {@link ScmConfig}.
*/
/**
 * Tests for {@link ScmConfig}.
 */
class TestScmConfig {

  @Test
  void testPipelineExcludedNodesDefaultsToEmpty() {
    ScmConfig scmConfig = new OzoneConfiguration().getObject(ScmConfig.class);

    assertTrue(scmConfig.getPipelineExcludedNodes().isEmpty());
  }

  @Test
  void testPipelineExcludedNodesParsesUuidHostnameAndIp() {
    final String uuidToken = UUID.randomUUID().toString();
    OzoneConfiguration ozoneConf = new OzoneConfiguration();
    ozoneConf.set("hdds.scm.pipeline.exclude.datanodes",
        uuidToken + ", DN-1.EXAMPLE.COM, 10.0.0.12, dn-1.example.com");
    ScmConfig scmConfig = ozoneConf.getObject(ScmConfig.class);

    PipelineExcludedNodes parsedOnce = scmConfig.getPipelineExcludedNodes();
    PipelineExcludedNodes parsedAgain = scmConfig.getPipelineExcludedNodes();

    assertFalse(parsedOnce.isEmpty());
    // Re-parsing the same config string must yield equal sets.
    assertEquals(parsedOnce.getExcludedDatanodeIds(), parsedAgain.getExcludedDatanodeIds());
    assertEquals(parsedOnce.getExcludedAddressTokens(), parsedAgain.getExcludedAddressTokens());
    // Exactly one token was a UUID.
    assertEquals(1, parsedOnce.getExcludedDatanodeIds().size());
    assertTrue(parsedOnce.getExcludedDatanodeIds().contains(DatanodeID.fromUuidString(uuidToken)));
    // Hostname is lower-cased, so the upper-case duplicate collapses; IP kept as-is.
    assertTrue(parsedOnce.getExcludedAddressTokens().contains("dn-1.example.com"));
    assertTrue(parsedOnce.getExcludedAddressTokens().contains("10.0.0.12"));
    assertEquals(2, parsedOnce.getExcludedAddressTokens().size());
  }

  @Test
  void testPipelineExcludedNodesMatchesDatanodeByIdAndAddress() {
    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
    dn.setHostName("dn-2.example.com");
    dn.setIpAddress("10.10.10.10");

    // A datanode is excluded whether configured by UUID, hostname, or IP.
    assertTrue(PipelineExcludedNodes.parse(dn.getUuidString()).isExcluded(dn));
    assertTrue(PipelineExcludedNodes.parse("DN-2.EXAMPLE.COM").isExcluded(dn));
    assertTrue(PipelineExcludedNodes.parse("10.10.10.10").isExcluded(dn));
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.PipelineExcludedNodes;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
Expand Down Expand Up @@ -80,6 +83,7 @@ public class ContainerManagerImpl implements ContainerManager {
private final Random random = new Random();

private final long maxContainerSize;
private final PipelineExcludedNodes pipelineExcludedNodes;

/**
*
Expand Down Expand Up @@ -108,6 +112,9 @@ public ContainerManagerImpl(

maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
pipelineExcludedNodes =
new OzoneConfiguration(conf).getObject(ScmConfig.class)
.getPipelineExcludedNodes();

this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
}
Expand Down Expand Up @@ -182,6 +189,7 @@ public ContainerInfo allocateContainer(
try {
pipelines = pipelineManager
.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
pipelines.removeIf(pipelineExcludedNodes::isExcluded);
if (!pipelines.isEmpty()) {
pipeline = pipelines.get(random.nextInt(pipelines.size()));
containerInfo = createContainer(pipeline, owner);
Expand Down Expand Up @@ -209,6 +217,7 @@ public ContainerInfo allocateContainer(
try {
pipelines = pipelineManager
.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
pipelines.removeIf(pipelineExcludedNodes::isExcluded);
if (!pipelines.isEmpty()) {
pipeline = pipelines.get(random.nextInt(pipelines.size()));
containerInfo = createContainer(pipeline, owner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.PipelineExcludedNodes;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
Expand Down Expand Up @@ -370,6 +371,18 @@ Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
/** @return the datanode of the given id if it exists; otherwise, return null. */
@Nullable DatanodeDetails getNode(@Nullable DatanodeID id);

/**
 * Resolves the parsed pipeline-exclusion configuration against datanodes
 * currently known to this node manager.
 *
 * <p>Invoke this each time an exclusion set is needed (for example when
 * creating a pipeline) rather than caching the result across calls: datanodes
 * that register after SCM startup are included once they match a configured
 * hostname or IP, UUID entries resolve as soon as the node exists, and removed
 * datanodes simply stop appearing in the result on the next call.
 *
 * @param pipelineExcludedNodes parsed {@code hdds.scm.pipeline.exclude.datanodes};
 *     null or empty yields an empty set
 * @return immutable copy of matching registered datanodes
 */
Set<DatanodeDetails> resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes);

/**
* Given datanode address(Ipaddress or hostname), returns a list of
* DatanodeDetails for the datanodes running at that address.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.math.RoundingMode;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.PipelineExcludedNodes;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand Down Expand Up @@ -1894,6 +1896,27 @@ public List<DatanodeDetails> getNodesByAddress(String address) {
.collect(Collectors.toList());
}

/**
 * Resolves the configured exclusion tokens to concrete registered datanodes:
 * UUID entries via {@code getNode}, address tokens via {@code getNodesByAddress}.
 * Unresolvable entries (unknown UUIDs, addresses with no registered node) are
 * silently skipped.
 */
@Override
public Set<DatanodeDetails> resolvePipelineExcludedDatanodes(PipelineExcludedNodes pipelineExcludedNodes) {
  if (pipelineExcludedNodes == null || pipelineExcludedNodes.isEmpty()) {
    return Collections.emptySet();
  }
  final Set<DatanodeDetails> matched = new HashSet<>();
  // UUID entries: resolve individually, dropping IDs not currently registered.
  pipelineExcludedNodes.getExcludedDatanodeIds().forEach(id -> {
    final DatanodeDetails node = getNode(id);
    if (node != null) {
      matched.add(node);
    }
  });
  // Hostname/IP entries: an address may map to several datanodes on one host.
  for (String addressToken : pipelineExcludedNodes.getExcludedAddressTokens()) {
    final List<DatanodeDetails> nodesAtAddress = getNodesByAddress(addressToken);
    if (nodesAtAddress != null) {
      matched.addAll(nodesAtAddress);
    }
  }
  return ImmutableSet.copyOf(matched);
}

/**
* Get cluster map as in network topology for this node manager.
* @return cluster map
Expand Down
Loading