From ed282ed3e181519f34259aaf6f6d3082ca9879c6 Mon Sep 17 00:00:00 2001 From: harshal-16 Date: Tue, 24 Feb 2026 14:20:51 +0530 Subject: [PATCH] HIVE-29459: [DR][HiveACIDReplication] Add clearDanglingTxnTask at the end Details: * Currently, clearDanglingTxnTask is added at the end of replLoadTask. That works in the normal scenario ```java if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) { ClearDanglingTxnWork clearDanglingTxnWork = new ClearDanglingTxnWork(work.getDumpDirectory(), targetDb.getName()); Task clearDanglingTxnTask = TaskFactory.get(clearDanglingTxnWork, conf); if (childTasks.isEmpty()) { childTasks.add(clearDanglingTxnTask); } else { DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(Collections.singletonList(clearDanglingTxnTask))); } } return 0; ``` * But if the number of events for an incremental load is > hive.repl.approx.max.load.tasks, then the load operation can break the tasks down into batches of approximately hive.repl.approx.max.load.tasks (not a hard limit). * In this case, it can lead to premature cleaning of repl_txn_map and aborting of transactions in the middle of replication, because clearDanglingTxnTask gets called between the batches rather than being called only once at the end of each load cycle. 
Fix: * Add clearDanglingTxnTask only if it doesn't have more work Testing: * Tested on live cluster * Added test-case --- .../TestReplicationScenariosAcidTables.java | 42 +++++++++++++++++++ .../hive/ql/exec/repl/ReplLoadTask.java | 9 +++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index d59f00d4dc14..c81a0d11899b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -436,6 +436,48 @@ public void testRemoveDanglingTxnWithOpenTxnOnSourceAndDanglingTxnOnDR() throws } } + @Test + public void testClearDanglingTxnRunsOnlyAfterFinalIncrementalRound() throws Throwable { + List withClauseList = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET + "'='true'", + "'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'"); + String insertStmt = "insert into sales_transactional partition(country) values " + + "(102, 'Phone', 800.00, '2026-02-11 11:30:00', 'Canada')," + + "(103, 'Tablet', 450.00, '2026-02-11 12:15:00', 'USA')," + + "(104, 'Monitor', 300.00, '2026-02-11 14:00:00', 'UK')"; + + primary.run("use " + primaryDbName) + .run("create table sales_transactional (sale_id int, product_name string, amount decimal(10,2), " + + "sale_date timestamp) partitioned by (country string) stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run(insertStmt) + .run(insertStmt); + + primary.dump(primaryDbName, withClauseList); + replica.load(replicatedDbName, primaryDbName, withClauseList) + .run("use " + replicatedDbName) + .run("select count(*) from sales_transactional") + .verifyResult("6"); + + primary.run("use " + primaryDbName); + 
for (int i = 0; i < 12; i++) { + primary.run(insertStmt); + } + primary.run("truncate table sales_transactional"); + for (int i = 0; i < 5; i++) { + primary.run(insertStmt); + } + + WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, withClauseList); + + replica.load(replicatedDbName, primaryDbName, withClauseList) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(incrementalDump.lastReplicationId) + .run("select count(*) from sales_transactional") + .verifyResult("15"); + } + private List getOpenTxnCountFromDump(FileSystem fs, Path openTxnDumpPath) throws IOException { List openTxnIds = new ArrayList<>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 1e8a82b3a605..9287fd75e766 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -963,8 +963,11 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception { ((IncrementalLoadLogger)work.incrementalLoadTasksBuilder().getReplLogger()).initiateEventTimestamp(currentTimestamp); LOG.info("REPL_INCREMENTAL_LOAD stage duration : {} ms", currentTimestamp - loadStartTime); - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) { - + // Clear dangling transactions only once all incremental work for this dump is exhausted. + // Running this in intermediate rounds can remove source->target txn mappings that later + // rounds still depend on for write-id replay. 
+ boolean hasPendingIncrementalWork = builder.hasMoreWork() || work.hasBootstrapLoadTasks(); + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET) && !hasPendingIncrementalWork) { ClearDanglingTxnWork clearDanglingTxnWork = new ClearDanglingTxnWork(work.getDumpDirectory(), targetDb.getName()); Task clearDanglingTxnTaskTask = TaskFactory.get(clearDanglingTxnWork, conf); if (childTasks.isEmpty()) { @@ -972,6 +975,8 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception { } else { DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(Collections.singletonList(clearDanglingTxnTaskTask))); } + } else if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) { + LOG.info("Skipping dangling transaction cleanup in this iteration as incremental load has pending work."); } return 0;