diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java index 8f56ddc8f2fe..09005cadb18e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java @@ -322,7 +322,7 @@ public synchronized void processAll() { processedCount++; if (processedCount % logEvery == 0 || processedCount == containers.size()) { - LOG.info("Processed {}/{} containers", processedCount, containers.size()); + LOG.debug("Processed {}/{} containers", processedCount, containers.size()); } } catch (ContainerNotFoundException e) { LOG.error("Container {} not found", container.getContainerID(), e); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java index e42e0ccac903..c362f45cbf1e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java @@ -18,7 +18,11 @@ package org.apache.hadoop.ozone.recon.scm; import static java.util.Comparator.comparingLong; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FINALIZE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -190,16 +194,16 @@ public void checkAndAddNewContainerBatch( * targeted sync path use this method to avoid divergence in the count exposed * to the Recon Node API. * - *
If the container was recorded without a pipeline (null pipeline at - * {@code addNewContainer} time) the count decrement is safely skipped. + *
If the container was recorded without a pipeline, the count decrement is + * safely skipped. * * @param containerID container to advance from OPEN to CLOSING - * @param containerInfo already-fetched {@code ContainerInfo} for the container - * (avoids a redundant lookup inside this method) + * @param containerInfo already-fetched ContainerInfo for the container * @throws IOException if the state update fails - * @throws InvalidStateTransitionException if the container is not in OPEN state + * @throws InvalidStateTransitionException if the state transition is invalid */ - void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInfo) + void transitionOpenToClosing(ContainerID containerID, + ContainerInfo containerInfo) throws IOException, InvalidStateTransitionException { PipelineID pipelineID = containerInfo.getPipelineID(); if (pipelineID != null) { @@ -210,28 +214,102 @@ void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInf pipelineToOpenContainer.put(pipelineID, curCnt - 1); } } - updateContainerState(containerID, FINALIZE); // OPEN → CLOSING + updateContainerState(containerID, FINALIZE); } /** - * Check if an OPEN container should move to CLOSING based on a healthy - * non-OPEN DN replica report. + * Check if Recon's container lifecycle state can be corrected based on a DN + * replica report and SCM's authoritative state. + * + *
OPEN uses the DN replica state as the signal: only a healthy non-OPEN + * replica can move Recon to CLOSING. For CLOSING, the DN report is only a + * wake-up signal and Recon queries SCM before advancing. For DELETED, Recon + * recovers only on a live terminal replica report and only when SCM still + * reports the container as live. + * + * @param containerID containerID to check + * @param state replica state reported by a DataNode */ private void checkContainerStateAndUpdate(ContainerID containerID, - ContainerReplicaProto.State replicaState) + ContainerReplicaProto.State state) throws IOException, InvalidStateTransitionException { ContainerInfo containerInfo = getContainer(containerID); HddsProtos.LifeCycleState reconState = containerInfo.getState(); - if (reconState != HddsProtos.LifeCycleState.OPEN - || replicaState == ContainerReplicaProto.State.OPEN - || !isHealthy(replicaState)) { + if (reconState == HddsProtos.LifeCycleState.OPEN) { + if (state.equals(ContainerReplicaProto.State.OPEN) || !isHealthy(state)) { + return; + } + LOG.info("Container {} has state OPEN, but given state is {}.", + containerID, state); + transitionOpenToClosing(containerID, containerInfo); + return; + } + + if (reconState == HddsProtos.LifeCycleState.CLOSING) { + reconcileClosingContainerFromScm(containerID); + return; + } + + if (reconState == HddsProtos.LifeCycleState.DELETED) { + recoverDeletedContainerFromScm(containerID, state); + } + } + + private void reconcileClosingContainerFromScm(ContainerID containerID) + throws IOException, InvalidStateTransitionException { + HddsProtos.LifeCycleState scmState = getScmContainerState(containerID); + + if (scmState == HddsProtos.LifeCycleState.QUASI_CLOSED) { + updateContainerState(containerID, QUASI_CLOSE); + LOG.info("Container {} advanced to QUASI_CLOSED in Recon " + + "based on SCM state.", containerID); + } else if (scmState == HddsProtos.LifeCycleState.CLOSED) { + updateContainerState(containerID, CLOSE); + LOG.info("Container {} advanced to CLOSED in Recon based on SCM state.", + containerID); + } else if (scmState == HddsProtos.LifeCycleState.DELETING + || scmState == HddsProtos.LifeCycleState.DELETED) { + updateContainerState(containerID, CLOSE); + updateContainerState(containerID, DELETE); + if (scmState == HddsProtos.LifeCycleState.DELETED) { + updateContainerState(containerID, CLEANUP); + } + LOG.info("Container {} advanced to {} in Recon based on SCM state.", + containerID, scmState); + } + } + + private void recoverDeletedContainerFromScm(ContainerID containerID, + ContainerReplicaProto.State replicaState) throws IOException { + if (replicaState != ContainerReplicaProto.State.CLOSED + && replicaState != ContainerReplicaProto.State.QUASI_CLOSED) { + return; + } + + ContainerWithPipeline scmContainer = + scmClient.getContainerWithPipeline(containerID.getId()); + HddsProtos.LifeCycleState scmState = + scmContainer.getContainerInfo().getState(); + if (scmState != HddsProtos.LifeCycleState.CLOSED + && scmState != HddsProtos.LifeCycleState.QUASI_CLOSED) { + LOG.info("Container {} is DELETED in Recon and DN reported {}, " + + "but SCM state is {}. Skipping recovery.", + containerID, replicaState, scmState); return; } - LOG.info("Container {} is OPEN in Recon but DN reports replica state {}. " - + "Moving to CLOSING.", containerID, replicaState); - transitionOpenToClosing(containerID, containerInfo); + deleteContainer(containerID); + addNewContainer(scmContainer); + LOG.info("Recovered container {} from DELETED in Recon to {} based on " + + "DN report {} and SCM state {}.", + containerID, scmState, replicaState, scmState); + } + + private HddsProtos.LifeCycleState getScmContainerState(ContainerID containerID) + throws IOException { + return scmClient.getContainerWithPipeline(containerID.getId()) + .getContainerInfo().getState(); } private boolean isHealthy(ContainerReplicaProto.State replicaState) { @@ -249,7 +327,8 @@ private boolean isHealthy(ContainerReplicaProto.State replicaState) { * the container is still recorded in the state manager without pipeline * tracking so that it is not permanently absent from Recon. * - * @param containerWithPipeline containerInfo with pipeline info (pipeline may be null) + * @param containerWithPipeline containerInfo with pipeline info + * (pipeline may be null) * @throws IOException on Error. */ public void addNewContainer(ContainerWithPipeline containerWithPipeline) @@ -263,21 +342,23 @@ public void addNewContainer(ContainerWithPipeline containerWithPipeline) PipelineID pipelineID = pipeline.getId(); // Check if the pipeline is present in Recon; add it if not. if (reconPipelineManager.addPipeline(pipeline)) { - LOG.info("Added new pipeline {} to Recon pipeline metadata from SCM.", pipelineID); + LOG.info("Added new pipeline {} to Recon pipeline metadata from " + + "SCM.", pipelineID); } getContainerStateManager().addContainer(containerInfo.getProtobuf()); - pipelineManager.addContainerToPipeline(pipelineID, containerInfo.containerID()); + pipelineManager.addContainerToPipeline(pipelineID, + containerInfo.containerID()); // Update open container count on all datanodes on this pipeline. pipelineToOpenContainer.put(pipelineID, pipelineToOpenContainer.getOrDefault(pipelineID, 0) + 1); - LOG.info("Successfully added OPEN container {} with pipeline {} to Recon.", - containerInfo.containerID(), pipelineID); + LOG.info("Successfully added OPEN container {} with pipeline {} to " + + "Recon.", containerInfo.containerID(), pipelineID); } else { // Pipeline not available (cleaned up in SCM). Record the container // without pipeline tracking so it is not permanently absent from Recon. getContainerStateManager().addContainer(containerInfo.getProtobuf()); LOG.warn("Added OPEN container {} to Recon without pipeline " - + "(pipeline was null — likely cleaned up on SCM side). " + + "(pipeline was null - likely cleaned up on SCM side). " + "Pipeline tracking unavailable for this container.", containerInfo.containerID()); } @@ -287,7 +368,8 @@ public void addNewContainer(ContainerWithPipeline containerWithPipeline) containerInfo.containerID(), containerInfo.getState()); } } catch (IOException ex) { - LOG.info("Exception while adding container {}.", containerInfo.containerID(), ex); + LOG.info("Exception while adding container {}.", + containerInfo.containerID(), ex); PipelineID pipelineID = containerInfo.getPipelineID(); if (pipelineID != null) { pipelineManager.removeContainerFromPipeline( diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index 1712e711fe02..679e3c3c71ee 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -896,19 +896,6 @@ public boolean triggerTargetedSCMContainerSync() { } } - public boolean syncWithSCMContainerInfo() { - if (isSyncDataFromSCMRunning.compareAndSet(false, true)) { - try { - return runTargetedSyncWithMetrics(); - } finally { - isSyncDataFromSCMRunning.compareAndSet(true, false); - } - } else { - LOG.debug("SCM DB sync is already running."); - return false; - } - } - private boolean runTargetedSyncWithMetrics() { long startTime = Time.monotonicNow(); containerSyncMetrics.setTargetedSyncStatus( diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java index 9a1aa48a1e1a..bbd03aab0d5c 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java @@ -90,6 +90,7 @@ class ReconStorageContainerSyncHelper { * (DELETED ID list). */ private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12; + private static final long DELETED_SYNC_TRANSITION_LOG_SAMPLE_INTERVAL = 1000L; /** * Conservative wire-size upper bound for one {@code ContainerWithPipeline} @@ -365,6 +366,7 @@ private boolean syncDeletedContainers() { OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT); int batchSize = (int) getContainerCountPerCall(configuredBatch); int retiredCount = 0; + long processedCount = 0; // Existing Recon containers need only the ID to retire to DELETED. Fetch // full ContainerInfo only for IDs absent from Recon, where we must add a @@ -380,7 +382,8 @@ private boolean syncDeletedContainers() { if (page == null || page.isEmpty()) { break; } - retiredCount += processDeletedPage(page); + retiredCount += processDeletedPage(page, processedCount); + processedCount += page.size(); start = ContainerID.valueOf( page.get(page.size() - 1).getId() + 1); } @@ -403,9 +406,12 @@ private boolean syncDeletedContainers() { *