Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
import org.apache.ratis.util.ReflectionUtils;

/**
* A factory to create {@link ContainerChoosingPolicy} instances for the DiskBalancer.
Expand All @@ -49,7 +48,14 @@ public static ContainerChoosingPolicy getDiskBalancerPolicy(ConfigurationSource
Class<? extends ContainerChoosingPolicy> policyClass = conf.getClass(
HDDS_DATANODE_DISKBALANCER_CONTAINER_CHOOSING_POLICY,
DEFAULT_CONTAINER_CHOOSING_POLICY, ContainerChoosingPolicy.class);
return ReflectionUtils.newInstance(policyClass, new Class<?>[]{ReentrantLock.class},
VolumeChoosingPolicyFactory.getVolumeSpaceReservationLock());
ReentrantLock lock = VolumeChoosingPolicyFactory.getVolumeSpaceReservationLock();
try {
return policyClass.getConstructor(ReentrantLock.class, ConfigurationSource.class)
.newInstance(lock, conf);
} catch (ReflectiveOperationException e) {
throw new IllegalStateException(
"Disk balancer container choosing policy must implement "
+ "(ReentrantLock, ConfigurationSource): " + policyClass.getName(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ public final class DiskBalancerConfiguration {
"Unit could be defined with postfix (ns,ms,s,m,h,d).")
private long replicaDeletionDelay = Duration.ofMinutes(5).toMillis();

/**
 * Whether QUASI_CLOSED containers, in addition to CLOSED ones, are eligible
 * to be moved by the disk balancer. Defaults to {@code false}, i.e. only
 * CLOSED containers are moved.
 */
@Config(key = "hdds.datanode.disk.balancer.include.non.standard.containers",
defaultValue = "false",
type = ConfigType.BOOLEAN,
tags = { DATANODE, ConfigTag.DISKBALANCER },
description = "If true, balancer include non-standard states, i.e, QUASI_CLOSED." +
" So both CLOSED and QUASI_CLOSED state containers are eligible for move. " +
"If false (default), balancer only moves CLOSED containers.")
private boolean includeNonStandardContainers = false;

public DiskBalancerConfiguration(Double threshold,
Long bandwidthInMB,
Integer parallelThread,
Expand Down Expand Up @@ -262,6 +271,14 @@ public void setParallelThread(int parallelThread) {
this.parallelThread = parallelThread;
}

/**
 * Returns the current value of the include-non-standard-containers setting.
 *
 * @return {@code true} if the setting is enabled, {@code false} otherwise
 */
public boolean getIncludeNonStandardContainers() {
  return this.includeNonStandardContainers;
}

/**
 * Sets the include-non-standard-containers setting on this configuration object.
 *
 * @param includeNonStandardContainers the new value of the setting
 */
public void setIncludeNonStandardContainers(boolean includeNonStandardContainers) {
this.includeNonStandardContainers = includeNonStandardContainers;
}

@Override
public String toString() {
return String.format("Disk Balancer Configuration values:%n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerCandidate;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy.MovableContainerStates;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
Expand Down Expand Up @@ -130,6 +130,8 @@ public class DiskBalancerService extends BackgroundService {

private DiskBalancerServiceMetrics metrics;

private final MovableContainerStates movableContainerStates;

public DiskBalancerService(OzoneContainer ozoneContainer,
long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
int workerSize, ConfigurationSource conf) throws IOException {
Expand All @@ -153,8 +155,9 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
throw new IOException(e);
}

replicaDeletionDelay = conf.getObject(DiskBalancerConfiguration.class)
.getReplicaDeletionDelay();
DiskBalancerConfiguration diskBalancerConfiguration = conf.getObject(DiskBalancerConfiguration.class);
replicaDeletionDelay = diskBalancerConfiguration.getReplicaDeletionDelay();
movableContainerStates = MovableContainerStates.fromConfig(conf);
metrics = DiskBalancerServiceMetrics.create();

loadDiskBalancerInfo();
Expand Down Expand Up @@ -485,14 +488,10 @@ public BackgroundTaskResult call() {
}

// Double check container state before acquiring lock to start move process.
// Container state may have changed after selection. Only CLOSED containers can be moved.
// QUASI_CLOSED is allowed when test mode is enabled, this is done to test in production
// these containers are rejected.
// Container state may have changed after selection.
State containerState = container.getContainerData().getState();
boolean isTestMode = DefaultContainerChoosingPolicy.isTest();
if (containerState != State.CLOSED && !(isTestMode && containerState == State.QUASI_CLOSED)) {
LOG.warn("Container {} is in {} state, skipping move process. Only CLOSED containers can be moved.",
containerId, containerState);
if (!movableContainerStates.allows(containerState)) {
LOG.warn("Container {} is in {} state, skipping move process.", containerId, containerState);
postCall(false, startTime);
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage;
import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.newVolumeFixedUsage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Comparator;
Expand All @@ -33,13 +32,16 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerConfiguration;
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.slf4j.Logger;
Expand All @@ -50,6 +52,8 @@
* then chooses a container from the source volume that can be moved to the destination without
* exceeding the upper threshold. Space is reserved on the destination only when a container is
* chosen, using the actual container size.
*
* Which replica states may move is defined by {@link MovableContainerStates}.
*/
public class DefaultContainerChoosingPolicy implements ContainerChoosingPolicy {
public static final Logger LOG = LoggerFactory.getLogger(
Expand All @@ -59,13 +63,12 @@ public class DefaultContainerChoosingPolicy implements ContainerChoosingPolicy {
ThreadLocal.withInitial(
() -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1, HOURS).build());

// for test
private static boolean test = false;

private final ReentrantLock lock;
private final MovableContainerStates movableContainerStates;

public DefaultContainerChoosingPolicy(ReentrantLock globalLock) {
public DefaultContainerChoosingPolicy(ReentrantLock globalLock, ConfigurationSource conf) {
this.lock = globalLock;
this.movableContainerStates = MovableContainerStates.fromConfig(conf);
}

@Override
Expand Down Expand Up @@ -128,15 +131,15 @@ public ContainerCandidate chooseVolumesAndContainer(OzoneContainer ozoneContaine
dst.incCommittedBytes(containerSize);
LOG.debug("Chosen volume pair for disk balancing: source={} (utilization={}), "
+ "destination={} (utilization={})",
src.getStorageID(), srcUsage.getUtilization(),
dst.getStorageID(), dstUsage.getUtilization());
src.getStorageDir().getPath(), srcUsage.getUtilization(),
dst.getStorageDir().getPath(), dstUsage.getUtilization());
return new ContainerCandidate(containerData, src, dst);
}
LOG.debug("No suitable container found for destination {}, trying next volume.",
dst.getStorageID());
LOG.debug("No container to move for destination {}, trying next volume.",
dst.getStorageDir().getPath());
} else {
LOG.debug("Destination volume {} does not have enough space, trying next volume.",
dst.getStorageID());
dst.getStorageDir().getPath());
}
}
LOG.debug("Failed to find appropriate destination volume and container.");
Expand All @@ -147,7 +150,7 @@ public ContainerCandidate chooseVolumesAndContainer(OzoneContainer ozoneContaine
}

/**
* Choose a container from source volume that can be moved to destination volume.
* Finds a container on {@code src} that can move to {@code dst}.
*/
private ContainerData chooseContainer(OzoneContainer ozoneContainer,
HddsVolume src, HddsVolume dst, VolumeFixedUsage dstUsage,
Expand All @@ -166,23 +169,63 @@ private ContainerData chooseContainer(OzoneContainer ozoneContainer,

while (itr.hasNext()) {
ContainerData containerData = itr.next().getContainerData();
long containerId = containerData.getContainerID();

// Skip containers removed from containerSet after iterator was cached
if (ozoneContainer.getContainerSet().getContainer(containerData.getContainerID()) == null) {
if (ozoneContainer.getContainerSet().getContainer(containerId) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping container {} from volume {}: not in container set "
+ "(removed after iterator was cached)",
containerId, src.getStorageDir().getPath());
}
continue;
}

if (inProgressContainerIDs.contains(ContainerID.valueOf(containerId))) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping container {} from volume {}: disk balancer move already in progress",
containerId, src.getStorageDir().getPath());
}
continue;
}

if (containerData.getBytesUsed() > 0 &&
!inProgressContainerIDs.contains(ContainerID.valueOf(containerData.getContainerID())) &&
(containerData.isClosed() || (test && containerData.isQuasiClosed()))) {
long containerSize = containerData.getBytesUsed();
if (containerSize <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping container {} from volume {}: bytes used is {}",
containerId, src.getStorageDir().getPath(), containerData.getBytesUsed());
}
continue;
}

long containerSize = containerData.getBytesUsed();
// Check if dst has enough space and can accept the container without exceeding threshold
if (containerSize < usableSpace &&
computeUtilization(dstSpaceUsage, dstCommittedBytes, containerSize) < upperThreshold) {
return containerData;
if (!movableContainerStates.allows(containerData.getState())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping container {} from volume {}: state is {}. Policy: {}",
containerId, src.getStorageDir().getPath(), containerData.getState(), movableContainerStates);
}
continue;
}

if (containerSize >= usableSpace) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping container {} ({}B) from volume {}: exceeds destination {} "
+ "usable space {}B", containerId, containerSize, src.getStorageDir().getPath(),
dst.getStorageDir().getPath(), usableSpace);
}
continue;
}

double newUtilization = computeUtilization(dstSpaceUsage, dstCommittedBytes, containerSize);
if (newUtilization >= upperThreshold) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping container {} ({}B) from volume {}: moving to {} would "
+ "result in utilization {} exceeding upper threshold {}",
containerId, containerSize, src.getStorageDir().getPath(), dst.getStorageDir().getPath(),
newUtilization, upperThreshold);
}
continue;
}

return containerData;
}

CACHE.get().invalidate(src);
Expand All @@ -206,19 +249,37 @@ private void logVolumeBalancingState(List<VolumeFixedUsage> volumeUsages,
long usableSpace = vfu.computeUsableSpace();
LOG.debug("Volume[{}] - disk={}, utilization={}, capacity={}, "
+ "effectiveUsed={}, available={}, usableSpace={}, committedBytes={}, delta={}",
i, vol.getStorageID(), String.format("%.10f", vfu.getUtilization()),
i, vol.getStorageDir().getPath(), String.format("%.10f", vfu.getUtilization()),
usage.getCapacity(), vfu.getEffectiveUsed(), usage.getAvailable(),
usableSpace, vol.getCommittedBytes(), deltaMap.getOrDefault(vol, 0L));
}
}

/**
 * Sets the static {@code test} flag of this policy.
 *
 * @param isTest the new value of the flag
 */
@VisibleForTesting
public static void setTest(boolean isTest) {
test = isTest;
}
/**
* Defines which container {@link State}s are eligible to be moved.
* {@link #CLOSED_ONLY}: only CLOSED containers.
* {@link #NON_STANDARD_INCLUDED}: CLOSED as well as QUASI_CLOSED containers (add more
* container states in {@link #allows} when needed).
*/
public enum MovableContainerStates {
CLOSED_ONLY,
NON_STANDARD_INCLUDED;

/**
 * Derives the movable-state policy from the disk balancer configuration.
 *
 * @param conf configuration source from which the
 *     {@link DiskBalancerConfiguration} object is read
 * @return {@link #NON_STANDARD_INCLUDED} when the configuration reports that
 *     non-standard containers are included, otherwise {@link #CLOSED_ONLY}
 */
public static MovableContainerStates fromConfig(ConfigurationSource conf) {
  DiskBalancerConfiguration balancerConf = conf.getObject(DiskBalancerConfiguration.class);
  if (balancerConf.getIncludeNonStandardContainers()) {
    return NON_STANDARD_INCLUDED;
  }
  return CLOSED_ONLY;
}

@VisibleForTesting
public static boolean isTest() {
return test;
/**
 * Tells whether a container in the given state may be moved under this policy.
 *
 * @param state the container state to check
 * @return {@code true} for CLOSED; {@code true} for QUASI_CLOSED only when this
 *     policy is {@link #NON_STANDARD_INCLUDED}; {@code false} for every other state
 */
public boolean allows(State state) {
  final boolean movable;
  switch (state) {
  case CLOSED:
    // CLOSED containers are movable under every policy.
    movable = true;
    break;
  case QUASI_CLOSED:
    // QUASI_CLOSED is only movable when non-standard states are included.
    movable = (this == NON_STANDARD_INCLUDED);
    break;
  default:
    movable = false;
    break;
  }
  return movable;
}
}
}
Loading