Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public synchronized void processAll() {
processedCount++;

if (processedCount % logEvery == 0 || processedCount == containers.size()) {
LOG.info("Processed {}/{} containers", processedCount, containers.size());
LOG.debug("Processed {}/{} containers", processedCount, containers.size());
}
} catch (ContainerNotFoundException e) {
LOG.error("Container {} not found", container.getContainerID(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package org.apache.hadoop.ozone.recon.scm;

import static java.util.Comparator.comparingLong;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FINALIZE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
Expand Down Expand Up @@ -190,16 +194,16 @@ public void checkAndAddNewContainerBatch(
* targeted sync path use this method to avoid divergence in the count exposed
* to the Recon Node API.
*
* <p>If the container was recorded without a pipeline (null pipeline at
* {@code addNewContainer} time) the count decrement is safely skipped.
* <p>If the container was recorded without a pipeline, the count decrement is
* safely skipped.
*
* @param containerID container to advance from OPEN to CLOSING
* @param containerInfo already-fetched {@code ContainerInfo} for the container
* (avoids a redundant lookup inside this method)
* @param containerInfo already-fetched ContainerInfo for the container
* @throws IOException if the state update fails
* @throws InvalidStateTransitionException if the container is not in OPEN state
* @throws InvalidStateTransitionException if the state transition is invalid
*/
void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInfo)
void transitionOpenToClosing(ContainerID containerID,
ContainerInfo containerInfo)
throws IOException, InvalidStateTransitionException {
PipelineID pipelineID = containerInfo.getPipelineID();
if (pipelineID != null) {
Expand All @@ -210,28 +214,102 @@ void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInf
pipelineToOpenContainer.put(pipelineID, curCnt - 1);
}
}
updateContainerState(containerID, FINALIZE); // OPEN → CLOSING
updateContainerState(containerID, FINALIZE);
}

/**
* Check if an OPEN container should move to CLOSING based on a healthy
* non-OPEN DN replica report.
* Check if Recon's container lifecycle state can be corrected based on a DN
* replica report and SCM's authoritative state.
*
* <p>OPEN uses the DN replica state as the signal: only a healthy non-OPEN
* replica can move Recon to CLOSING. For CLOSING, the DN report is only a
* wake-up signal and Recon queries SCM before advancing. For DELETED, Recon
* recovers only on a live terminal replica report and only when SCM still
* reports the container as live.
*
* @param containerID containerID to check
* @param state replica state reported by a DataNode
*/
private void checkContainerStateAndUpdate(ContainerID containerID,
ContainerReplicaProto.State replicaState)
ContainerReplicaProto.State state)
throws IOException, InvalidStateTransitionException {
ContainerInfo containerInfo = getContainer(containerID);
HddsProtos.LifeCycleState reconState = containerInfo.getState();

if (reconState != HddsProtos.LifeCycleState.OPEN
|| replicaState == ContainerReplicaProto.State.OPEN
|| !isHealthy(replicaState)) {
if (reconState == HddsProtos.LifeCycleState.OPEN) {
if (state.equals(ContainerReplicaProto.State.OPEN) || !isHealthy(state)) {
return;
}
LOG.info("Container {} has state OPEN, but given state is {}.",
containerID, state);
transitionOpenToClosing(containerID, containerInfo);
return;
}

if (reconState == HddsProtos.LifeCycleState.CLOSING) {
reconcileClosingContainerFromScm(containerID);
return;
}

if (reconState == HddsProtos.LifeCycleState.DELETED) {
recoverDeletedContainerFromScm(containerID, state);
}
}

private void reconcileClosingContainerFromScm(ContainerID containerID)
throws IOException, InvalidStateTransitionException {
HddsProtos.LifeCycleState scmState = getScmContainerState(containerID);

if (scmState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
updateContainerState(containerID, QUASI_CLOSE);
LOG.info("Container {} advanced to QUASI_CLOSED in Recon "
+ "based on SCM state.", containerID);
} else if (scmState == HddsProtos.LifeCycleState.CLOSED) {
updateContainerState(containerID, CLOSE);
LOG.info("Container {} advanced to CLOSED in Recon based on SCM state.",
containerID);
} else if (scmState == HddsProtos.LifeCycleState.DELETING
|| scmState == HddsProtos.LifeCycleState.DELETED) {
updateContainerState(containerID, CLOSE);
updateContainerState(containerID, DELETE);
if (scmState == HddsProtos.LifeCycleState.DELETED) {
updateContainerState(containerID, CLEANUP);
}
LOG.info("Container {} advanced to {} in Recon based on SCM state.",
containerID, scmState);
}
}

private void recoverDeletedContainerFromScm(ContainerID containerID,
ContainerReplicaProto.State replicaState) throws IOException {
if (replicaState != ContainerReplicaProto.State.CLOSED
&& replicaState != ContainerReplicaProto.State.QUASI_CLOSED) {
return;
}

ContainerWithPipeline scmContainer =
scmClient.getContainerWithPipeline(containerID.getId());
HddsProtos.LifeCycleState scmState =
scmContainer.getContainerInfo().getState();
if (scmState != HddsProtos.LifeCycleState.CLOSED
&& scmState != HddsProtos.LifeCycleState.QUASI_CLOSED) {
LOG.info("Container {} is DELETED in Recon and DN reported {}, "
+ "but SCM state is {}. Skipping recovery.",
containerID, replicaState, scmState);
return;
}

LOG.info("Container {} is OPEN in Recon but DN reports replica state {}. "
+ "Moving to CLOSING.", containerID, replicaState);
transitionOpenToClosing(containerID, containerInfo);
deleteContainer(containerID);
addNewContainer(scmContainer);
LOG.info("Recovered container {} from DELETED in Recon to {} based on "
+ "DN report {} and SCM state {}.",
containerID, scmState, replicaState, scmState);
}

private HddsProtos.LifeCycleState getScmContainerState(ContainerID containerID)
throws IOException {
return scmClient.getContainerWithPipeline(containerID.getId())
.getContainerInfo().getState();
}

private boolean isHealthy(ContainerReplicaProto.State replicaState) {
Expand All @@ -249,7 +327,8 @@ private boolean isHealthy(ContainerReplicaProto.State replicaState) {
* the container is still recorded in the state manager without pipeline
* tracking so that it is not permanently absent from Recon.
*
* @param containerWithPipeline containerInfo with pipeline info (pipeline may be null)
* @param containerWithPipeline containerInfo with pipeline info
* (pipeline may be null)
* @throws IOException on Error.
*/
public void addNewContainer(ContainerWithPipeline containerWithPipeline)
Expand All @@ -263,21 +342,23 @@ public void addNewContainer(ContainerWithPipeline containerWithPipeline)
PipelineID pipelineID = pipeline.getId();
// Check if the pipeline is present in Recon; add it if not.
if (reconPipelineManager.addPipeline(pipeline)) {
LOG.info("Added new pipeline {} to Recon pipeline metadata from SCM.", pipelineID);
LOG.info("Added new pipeline {} to Recon pipeline metadata from "
+ "SCM.", pipelineID);
}
getContainerStateManager().addContainer(containerInfo.getProtobuf());
pipelineManager.addContainerToPipeline(pipelineID, containerInfo.containerID());
pipelineManager.addContainerToPipeline(pipelineID,
containerInfo.containerID());
// Update open container count on all datanodes on this pipeline.
pipelineToOpenContainer.put(pipelineID,
pipelineToOpenContainer.getOrDefault(pipelineID, 0) + 1);
LOG.info("Successfully added OPEN container {} with pipeline {} to Recon.",
containerInfo.containerID(), pipelineID);
LOG.info("Successfully added OPEN container {} with pipeline {} to "
+ "Recon.", containerInfo.containerID(), pipelineID);
} else {
// Pipeline not available (cleaned up in SCM). Record the container
// without pipeline tracking so it is not permanently absent from Recon.
getContainerStateManager().addContainer(containerInfo.getProtobuf());
LOG.warn("Added OPEN container {} to Recon without pipeline "
+ "(pipeline was null likely cleaned up on SCM side). "
+ "(pipeline was null - likely cleaned up on SCM side). "
+ "Pipeline tracking unavailable for this container.",
containerInfo.containerID());
}
Expand All @@ -287,7 +368,8 @@ public void addNewContainer(ContainerWithPipeline containerWithPipeline)
containerInfo.containerID(), containerInfo.getState());
}
} catch (IOException ex) {
LOG.info("Exception while adding container {}.", containerInfo.containerID(), ex);
LOG.info("Exception while adding container {}.",
containerInfo.containerID(), ex);
PipelineID pipelineID = containerInfo.getPipelineID();
if (pipelineID != null) {
pipelineManager.removeContainerFromPipeline(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,19 +896,6 @@ public boolean triggerTargetedSCMContainerSync() {
}
}

public boolean syncWithSCMContainerInfo() {
if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
try {
return runTargetedSyncWithMetrics();
} finally {
isSyncDataFromSCMRunning.compareAndSet(true, false);
}
} else {
LOG.debug("SCM DB sync is already running.");
return false;
}
}

private boolean runTargetedSyncWithMetrics() {
long startTime = Time.monotonicNow();
containerSyncMetrics.setTargetedSyncStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class ReconStorageContainerSyncHelper {
* (DELETED ID list).
*/
private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
private static final long DELETED_SYNC_TRANSITION_LOG_SAMPLE_INTERVAL = 1000L;

/**
* Conservative wire-size upper bound for one {@code ContainerWithPipeline}
Expand Down Expand Up @@ -365,6 +366,7 @@ private boolean syncDeletedContainers() {
OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT);
int batchSize = (int) getContainerCountPerCall(configuredBatch);
int retiredCount = 0;
long processedCount = 0;

// Existing Recon containers need only the ID to retire to DELETED. Fetch
// full ContainerInfo only for IDs absent from Recon, where we must add a
Expand All @@ -380,7 +382,8 @@ private boolean syncDeletedContainers() {
if (page == null || page.isEmpty()) {
break;
}
retiredCount += processDeletedPage(page);
retiredCount += processDeletedPage(page, processedCount);
processedCount += page.size();
start = ContainerID.valueOf(
page.get(page.size() - 1).getId() + 1);
}
Expand All @@ -403,9 +406,12 @@ private boolean syncDeletedContainers() {
* <li>If already DELETED in Recon: no-op.</li>
* </ul>
*/
private int processDeletedPage(List<ContainerID> page) {
private int processDeletedPage(List<ContainerID> page,
long processedCountBeforePage) {
int retiredCount = 0;
long processedCount = processedCountBeforePage;
for (ContainerID containerID : page) {
processedCount++;
if (!containerManager.containerExist(containerID)) {
if (addContainerInfoFallback(containerID,
HddsProtos.LifeCycleState.DELETED, "DELETED sync")) {
Expand All @@ -417,7 +423,7 @@ private int processDeletedPage(List<ContainerID> page) {
ContainerInfo reconInfo = containerManager.getContainer(containerID);
if (reconInfo.getState() != HddsProtos.LifeCycleState.DELETED) {
retireContainerToDeleted(containerID, reconInfo,
HddsProtos.LifeCycleState.DELETED);
HddsProtos.LifeCycleState.DELETED, processedCount);
retiredCount++;
}
// reconState == DELETED: already terminal, nothing to do.
Expand Down Expand Up @@ -464,10 +470,13 @@ private int processDeletedPage(List<ContainerID> page) {
* @param reconInfo current Recon snapshot of the container (used for
* OPEN→CLOSING transition and log messages)
* @param scmState always DELETED (passed through to log messages)
* @param processedCount current number of SCM DELETED IDs scanned in this
* sync cycle
*/
private void retireContainerToDeleted(ContainerID containerID,
ContainerInfo reconInfo,
HddsProtos.LifeCycleState scmState) {
HddsProtos.LifeCycleState scmState,
long processedCount) {
try {
HddsProtos.LifeCycleState reconState = reconInfo.getState();

Expand All @@ -486,9 +495,11 @@ private void retireContainerToDeleted(ContainerID containerID,
// DELETING → DELETED.
containerManager.updateContainerState(containerID, CLEANUP);

LOG.info("DELETED sync: container {} transitioned "
+ "{} → DELETED in Recon (SCM state: {}).",
containerID, reconInfo.getState(), scmState);
if (processedCount % DELETED_SYNC_TRANSITION_LOG_SAMPLE_INTERVAL == 0) {
LOG.debug("DELETED sync: container {} transitioned "
+ "{} → DELETED in Recon (SCM state: {}).",
containerID, reconInfo.getState(), scmState);
}
} catch (InvalidStateTransitionException | IOException e) {
LOG.warn("DELETED sync: failed to retire container {} "
+ "from {} toward DELETED.", containerID, reconInfo.getState(), e);
Expand Down
Loading