diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java index 9b9c4ad41a1f6..b2da179dfb7e9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java @@ -119,6 +119,27 @@ public class IoTDBRegionOperationReliabilityITFramework { LOGGER.info("Cluster has been restarted"); }; + /** + * Gracefully stop (SIGTERM, not a forcible kill) the ConfigNode that hit the kill point, then + * restart it. A graceful stop lets the ConfigNode run its shutdown hooks, which interrupts the + * in-flight region-operation procedure worker. This reproduces a leader switch / graceful + * shutdown during AddRegionPeer: the interrupted {@code waitTaskFinish()} returns PROCESSING + * while the AddRegionPeerTask is still running on the coordinator DataNode. The procedure must + * NOT silently end here, otherwise the parent RegionMigrateProcedure would falsely treat AddPeer + * as complete and remove the source replica before the destination replica is actually Running. + * See AddRegionPeerProcedure#executeFromState DO_ADD_REGION_PEER PROCESSING branch. + */ + public static Consumer actionOfGracefullyRestartConfigNode = + context -> { + Assert.assertTrue(context.getNodeWrapper() instanceof ConfigNodeWrapper); + context.getNodeWrapper().stop(); + LOGGER.info("ConfigNode {} gracefully stopped.", context.getNodeWrapper().getId()); + Assert.assertFalse(context.getNodeWrapper().isAlive()); + context.getNodeWrapper().start(); + LOGGER.info("ConfigNode {} restarted.", context.getNodeWrapper().getId()); + Assert.assertTrue(context.getNodeWrapper().isAlive()); + }; + @Before public void setUp() throws Exception { EnvFactory.getEnv() @@ -155,6 +176,28 @@ public void successTest( killNode); } + public void successTestWithAction( + final int dataReplicateFactor, + final int schemaReplicationFactor, + final int configNodeNum, + final int dataNodeNum, + KeySetView killConfigNodeKeywords, + KeySetView killDataNodeKeywords, + Consumer actionWhenDetectKeyWords, + KillNode killNode) + throws Exception { + generalTestWithAllOptions( + dataReplicateFactor, + schemaReplicationFactor, + configNodeNum, + dataNodeNum, + killConfigNodeKeywords, + killDataNodeKeywords, + actionWhenDetectKeyWords, + true, + killNode); + } + public void failTest( final int dataReplicateFactor, final int schemaReplicationFactor, diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateConfigNodeCrashIoTV1IT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateConfigNodeCrashIoTV1IT.java index ebda0c36ee327..db01ab312e333 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateConfigNodeCrashIoTV1IT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateConfigNodeCrashIoTV1IT.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.utils.KillPoint.KillNode; import org.apache.iotdb.commons.utils.KillPoint.KillPoint; +import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints; import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; import org.apache.iotdb.confignode.procedure.state.RegionTransitionState; @@ -28,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.DailyIT; import org.junit.Before; @@ -92,6 +94,34 @@ public void testCnCrashDuringDoAddPeer() throws Exception { KillNode.CONFIG_NODE); } + /** + * Gracefully restart (not forcibly kill) the ConfigNode leader while AddRegionPeer is blocked in + * waitTaskFinish() polling the coordinator. The kill point fires from inside waitTaskFinish() + * (after the first poll confirms the task is still running), so the graceful shutdown + * deterministically interrupts that wait and waitTaskFinish() returns PROCESSING. The migration + * must still finish correctly after a leader switch: previously the AddRegionPeerProcedure + * silently ended on PROCESSING, letting the parent procedure remove the source replica before the + * destination replica was actually Running. Uses 3 ConfigNodes so a real leader switch happens. + * The framework requires the WAIT_TASK_FINISH_POLLING kill point to fire, which can only happen + * once the worker is blocked inside waitTaskFinish(), so the PROCESSING branch is guaranteed to + * be exercised. + */ + @Test + // Temporarily also categorized as ClusterIT so the per-PR Cluster IT (1C3D) job runs it for + // validation; will be narrowed back to DailyIT-only before merge. + @Category({DailyIT.class, ClusterIT.class}) + public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception { + successTestWithAction( + 1, + 1, + 3, + 2, + buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING), + noKillPoints(), + actionOfGracefullyRestartConfigNode, + KillNode.CONFIG_NODE); + } + @Test public void cnCrashDuringUpdateCacheTest() throws Exception { successTest( diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java index bc4f477b6bd84..adec48f883a34 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java @@ -21,11 +21,13 @@ import org.apache.iotdb.commons.utils.KillPoint.KillNode; import org.apache.iotdb.commons.utils.KillPoint.KillPoint; +import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints; import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; import org.apache.iotdb.confignode.procedure.state.RegionTransitionState; import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState; import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.DailyIT; import org.junit.Ignore; @@ -79,6 +81,34 @@ public void testCnCrashDuringDoAddPeer() throws Exception { KillNode.CONFIG_NODE); } + /** + * Gracefully restart (not forcibly kill) the ConfigNode leader while AddRegionPeer is blocked in + * waitTaskFinish() polling the coordinator. The kill point fires from inside waitTaskFinish() + * (after the first poll confirms the task is still running), so the graceful shutdown + * deterministically interrupts that wait and waitTaskFinish() returns PROCESSING. The migration + * must still finish correctly after a leader switch: previously the AddRegionPeerProcedure + * silently ended on PROCESSING, letting the parent procedure remove the source replica before the + * destination replica was actually Running. Uses 3 ConfigNodes so a real leader switch happens. + * The framework requires the WAIT_TASK_FINISH_POLLING kill point to fire, which can only happen + * once the worker is blocked inside waitTaskFinish(), so the PROCESSING branch is guaranteed to + * be exercised. + */ + @Test + // Temporarily also categorized as ClusterIT so the per-PR Cluster IT (1C3D) job runs it for + // validation; will be narrowed back to DailyIT-only before merge. + @Category({DailyIT.class, ClusterIT.class}) + public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception { + successTestWithAction( + 1, + 1, + 3, + 2, + buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING), + noKillPoints(), + actionOfGracefullyRestartConfigNode, + KillNode.CONFIG_NODE); + } + @Test public void cnCrashDuringUpdateCacheTest() throws Exception { successTest( diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java index 39b5953de4a15..6ee82324bf46d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.utils.KillPoint.KillNode; import org.apache.iotdb.commons.utils.KillPoint.KillPoint; +import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints; import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; import org.apache.iotdb.confignode.procedure.state.RegionTransitionState; @@ -28,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.DailyIT; import org.junit.Before; @@ -93,6 +95,34 @@ public void testCnCrashDuringDoAddPeer() throws Exception { KillNode.CONFIG_NODE); } + /** + * Gracefully restart (not forcibly kill) the ConfigNode leader while AddRegionPeer is blocked in + * waitTaskFinish() polling the coordinator. The kill point fires from inside waitTaskFinish() + * (after the first poll confirms the task is still running), so the graceful shutdown + * deterministically interrupts that wait and waitTaskFinish() returns PROCESSING. The migration + * must still finish correctly after a leader switch: previously the AddRegionPeerProcedure + * silently ended on PROCESSING, letting the parent procedure remove the source replica before the + * destination replica was actually Running. Uses 3 ConfigNodes so a real leader switch happens. + * The framework requires the WAIT_TASK_FINISH_POLLING kill point to fire, which can only happen + * once the worker is blocked inside waitTaskFinish(), so the PROCESSING branch is guaranteed to + * be exercised. + */ + @Test + // Temporarily also categorized as ClusterIT so the per-PR Cluster IT (1C3D) job runs it for + // validation; will be narrowed back to DailyIT-only before merge. + @Category({DailyIT.class, ClusterIT.class}) + public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception { + successTestWithAction( + 1, + 1, + 3, + 2, + buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING), + noKillPoints(), + actionOfGracefullyRestartConfigNode, + KillNode.CONFIG_NODE); + } + @Test public void cnCrashDuringUpdateCacheTest() throws Exception { successTest( diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateConfigNodeCrashForRatisIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateConfigNodeCrashForRatisIT.java index 67b318691a945..e6f9f1f298428 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateConfigNodeCrashForRatisIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateConfigNodeCrashForRatisIT.java @@ -21,10 +21,12 @@ import org.apache.iotdb.commons.utils.KillPoint.KillNode; import org.apache.iotdb.commons.utils.KillPoint.KillPoint; +import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints; import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateITFrameworkForRatis; import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState; import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.DailyIT; import org.junit.Test; @@ -76,6 +78,34 @@ public void cnCrashDuringDoAddPeerTest() throws Exception { KillNode.CONFIG_NODE); } + /** + * Gracefully restart (not forcibly kill) the ConfigNode leader while AddRegionPeer is blocked in + * waitTaskFinish() polling the coordinator. The kill point fires from inside waitTaskFinish() + * (after the first poll confirms the task is still running), so the graceful shutdown + * deterministically interrupts that wait and waitTaskFinish() returns PROCESSING. The migration + * must still finish correctly after a leader switch: previously the AddRegionPeerProcedure + * silently ended on PROCESSING, letting the parent procedure remove the source replica before the + * destination replica was actually Running. Uses 3 ConfigNodes so a real leader switch happens. + * The framework requires the WAIT_TASK_FINISH_POLLING kill point to fire, which can only happen + * once the worker is blocked inside waitTaskFinish(), so the PROCESSING branch is guaranteed to + * be exercised. + */ + @Test + // Temporarily also categorized as ClusterIT so the per-PR Cluster IT (1C3D) job runs it for + // validation; will be narrowed back to DailyIT-only before merge. + @Category({DailyIT.class, ClusterIT.class}) + public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception { + successTestWithAction( + 1, + 1, + 3, + 2, + buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING), + noKillPoints(), + actionOfGracefullyRestartConfigNode, + KillNode.CONFIG_NODE); + } + @Test public void cnCrashDuringUpdateCacheTest() throws Exception { successTest( diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java index a43e5151d4042..f4ae087b159b7 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java @@ -1001,7 +1001,7 @@ public final class ProcedureMessages { public static final String VALIDATE_TABLE_FOR_TABLE_WHEN_SETTING_PROPERTIES = "Validate table for table {}.{} when setting properties"; public static final String WAITTASKFINISH_RETURNS_PROCESSING_WHICH_MEANS_THE_WAITING_HAS_BEEN_INTERRUPTED = - "waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted, this procedure will end without rollback"; + "waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted (ConfigNode shutdown or leader change); the AddRegionPeer task is still running on the coordinator, this procedure will stay at DO_ADD_REGION_PEER and resume polling after recovery"; public static final String FAILED_TO_CREATE_DATABASE_THE_TTL_SHOULD_BE_NON_NEGATIVE = "Failed to create database. The TTL should be non-negative."; public static final String FAILED_TO_CREATE_DATABASE_THE_DATAREGIONGROUPNUM_SHOULD_BE_POSITIVE = "Failed to create database. The dataRegionGroupNum should be positive."; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java index dbad526cce7b8..126055a7c57c7 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java @@ -999,7 +999,7 @@ public final class ProcedureMessages { public static final String VALIDATE_TABLE_FOR_TABLE_WHEN_SETTING_PROPERTIES = "Validate table for table {}.{} when setting properties"; public static final String WAITTASKFINISH_RETURNS_PROCESSING_WHICH_MEANS_THE_WAITING_HAS_BEEN_INTERRUPTED = - "waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted, this procedure will end without rollback"; + "waitTaskFinish() 返回 PROCESSING,表示等待被中断(ConfigNode 关闭或主节点切换);AddRegionPeer 任务仍在协调者上运行,该流程将停留在 DO_ADD_REGION_PEER 状态,恢复后继续轮询"; public static final String FAILED_TO_CREATE_DATABASE_THE_TTL_SHOULD_BE_NON_NEGATIVE = "创建数据库失败。TTL 不能为负数。"; public static final String FAILED_TO_CREATE_DATABASE_THE_DATAREGIONGROUPNUM_SHOULD_BE_POSITIVE = "创建数据库失败。dataRegionGroupNum 应为正数。"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 82afea3859fd4..b0e5d73783571 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -454,7 +454,15 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure p updateStoreOnExecution(rootProcStack, proc, subprocs); - if (!store.isRunning()) { + // Stop the in-place re-execution loop once this executor is shutting down (e.g. ConfigNode + // leader switch / restart). Checking store.isRunning() alone is not enough: stopExecutor() + // calls executor.stop() and executor.join() before store.stop(), so the store is still + // running while join() waits for this very worker to finish. Without also checking the + // executor's own running flag, a procedure that keeps returning HAS_MORE_STATE for the same + // state (e.g. AddRegionPeerProcedure parking at DO_ADD_REGION_PEER after waitTaskFinish() is + // interrupted) would re-execute forever here and join() would hang. The persisted state lets + // the next leader resume from where it stopped. + if (!isRunning() || !store.isRunning()) { return; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java index 8637e6c8fe781..9ce0c9f72a3c8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java @@ -89,6 +89,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY; +import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint; import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2; @@ -360,10 +361,26 @@ public Map resetPeerList( // TODO: will use 'procedure yield' to refactor later public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNodeLocation) { + return waitTaskFinish(taskId, dataNodeLocation, null); + } + + /** + * Poll the coordinator DataNode until the region-maintain task identified by {@code taskId} + * reaches a terminal state. + * + * @param killPoint if non-null, fired once right after the first poll confirms the task is still + * PROCESSING. At that point the worker thread is provably blocked inside this method, so + * tests can use the kill point to deterministically interrupt the wait (e.g. by gracefully + * stopping the ConfigNode leader) and exercise the interrupted-PROCESSING path. It is a no-op + * outside integration tests. + */ + public > TRegionMigrateResult waitTaskFinish( + long taskId, TDataNodeLocation dataNodeLocation, T killPoint) { final long MAX_DISCONNECTION_TOLERATE_MS = 600_000; final long INITIAL_DISCONNECTION_TOLERATE_MS = 60_000; long startTime = System.nanoTime(); long lastReportTime = System.nanoTime(); + boolean killPointTriggered = false; while (true) { try (SyncDataNodeInternalServiceClient dataNodeClient = dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) { @@ -372,6 +389,12 @@ public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNo if (report.getTaskStatus() != TRegionMaintainTaskStatus.PROCESSING) { return report; } + // The task is confirmed still running and this thread is blocked here, so it is now safe to + // fire the kill point that tests use to interrupt waitTaskFinish() deterministically. + if (killPoint != null && !killPointTriggered) { + setKillPoint(killPoint); + killPointTriggered = true; + } } catch (Exception ignore) { } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java index d09647a332f5b..acce4b2e8491e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.i18n.ProcedureMessages; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -104,8 +105,15 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddRegionPeerState s break; case DO_ADD_REGION_PEER: handler.forceUpdateRegionCache(regionId, targetDataNode, RegionStatus.Adding); - // We don't want to re-submit AddRegionPeerTask when leader change or ConfigNode reboot - if (!this.isStateDeserialized()) { + // Only submit the AddRegionPeerTask on the very first entry of this state. We must NOT + // re-submit when: + // - the state was restored from disk after a leader change / ConfigNode reboot + // (isStateDeserialized()), or + // - this state is being re-entered in place because a previous attempt parked here on + // PROCESSING (getCycles() > 0, see the PROCESSING branch below). + // The coordinator DataNode also dedups by taskId, so a duplicate submit would be a no-op, + // but skipping it here avoids the useless RPC and keeps the re-poll cheap. + if (!this.isStateDeserialized() && getCycles() == 0) { TSStatus tsStatus = handler.submitAddRegionPeerTask( this.getProcId(), targetDataNode, regionId, coordinator); @@ -115,7 +123,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddRegionPeerState s env, handler, "submit DO_ADD_REGION_PEER task fail"); } } - TRegionMigrateResult result = handler.waitTaskFinish(this.getProcId(), coordinator); + TRegionMigrateResult result = + handler.waitTaskFinish( + this.getProcId(), coordinator, RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING); switch (result.getTaskStatus()) { case TASK_NOT_EXIST: // coordinator crashed and lost its task table @@ -124,10 +134,22 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddRegionPeerState s return warnAndRollBackAndNoMoreState( env, handler, String.format("%s result is %s", state, result.getTaskStatus())); case PROCESSING: + // waitTaskFinish() only returns PROCESSING when its polling loop was interrupted by + // an InterruptedException, i.e. this ConfigNode is shutting down / losing leadership + // (a user CANCEL or a coordinator disconnection both go through the FAIL branch + // above). The AddRegionPeerTask is still running on the coordinator DataNode, so we + // must NOT silently end here: doing so would let the parent RegionMigrateProcedure + // proceed to CHECK_ADD_REGION_PEER / REMOVE_REGION_PEER and remove the source replica + // before the destination replica has actually finished receiving the snapshot. + // Instead, stay in DO_ADD_REGION_PEER and persist it; after recovery the new leader + // re-enters this state and re-polls the still-running coordinator task (the + // isStateDeserialized() guard above prevents re-submitting the task) until it really + // reaches SUCCESS or FAIL. LOGGER.info( ProcedureMessages .WAITTASKFINISH_RETURNS_PROCESSING_WHICH_MEANS_THE_WAITING_HAS_BEEN_INTERRUPTED); - return Flow.NO_MORE_STATE; + setNextState(AddRegionPeerState.DO_ADD_REGION_PEER); + break outerSwitch; case SUCCESS: setNextState(UPDATE_REGION_LOCATION_CACHE); break outerSwitch; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/RegionMaintainKillPoints.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/RegionMaintainKillPoints.java new file mode 100644 index 0000000000000..da906b845fdaa --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/RegionMaintainKillPoints.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.KillPoint; + +/** Kill points for the ConfigNode-side region-maintain procedures (AddPeer / RemovePeer). */ +public enum RegionMaintainKillPoints { + /** + * Fired from {@code RegionMaintainHandler.waitTaskFinish()} once the coordinator DataNode has + * confirmed the task is still running (the first poll returned PROCESSING). Unlike the + * AddRegionPeerState.DO_ADD_REGION_PEER kill point, which fires right after the task is submitted + * and before waitTaskFinish() starts polling, this kill point guarantees the procedure worker is + * actually blocked inside waitTaskFinish(). Tests use it to deterministically interrupt + * waitTaskFinish() (e.g. by gracefully stopping the ConfigNode leader) so the PROCESSING branch + * is exercised instead of racing the task to completion. + */ + WAIT_TASK_FINISH_POLLING, +}