From 3a7de1794dd7b57cd2a0b234cbc87973e51990ce Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Tue, 14 Apr 2026 15:32:53 +0300 Subject: [PATCH 1/2] create ValidTxnList for Compactor and add compaction transaction in the exceptions --- .../service/AcidCompactionService.java | 11 ++++++---- .../hive/metastore/txn/TxnCommonUtils.java | 18 ++++++++++++--- .../hadoop/hive/metastore/txn/TxnUtils.java | 22 +++++++++++++++---- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java index ab1581c836e5..3538b07c1696 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java @@ -57,7 +57,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -184,10 +184,13 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */ compactionTxn.open(ci); - final ValidTxnList validTxnList = msc.getValidTxns(compactionTxn.getTxnId()); + final ValidTxnList validTxnList = + TxnUtils.createValidTxnListForCompactor(msc.getOpenTxns(), compactionTxn.getTxnId()); //with this ValidWriteIdList is capped at whatever HWM validTxnList has - tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds( - Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0)); + tblValidWriteIds = + TxnUtils.createValidCompactWriteIdList( + msc.getValidWriteIds(List.of(fullTableName), validTxnList.writeToString()).get(0) + ); LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java index 582b41c546a9..8d875e812007 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java @@ -41,6 +41,11 @@ public class TxnCommonUtils { * @return a valid txn list. */ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { + return createValidReadTxnList(txns, currentTxn, true); + } + + static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn, + boolean excludeCurrentTxn) { assert currentTxn <= txns.getTxn_high_water_mark(); /* * The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0 @@ -61,7 +66,13 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long // txn is read-only or aborted by AcidHouseKeeperService and compactor actually cleans up the aborted txns. // So, for such cases, we get negative value for sizeToHwm with found position for currentTxn, and so, // we just negate it to get the size. - int sizeToHwm = (currentTxn > 0) ? Math.abs(Collections.binarySearch(openTxns, currentTxn)) : openTxns.size(); + int sizeToHwm; + if (currentTxn > 0) { + int pos = Collections.binarySearch(openTxns, currentTxn); + sizeToHwm = (pos >= 0 && !excludeCurrentTxn) ? pos + 1 : Math.abs(pos); + } else { + sizeToHwm = openTxns.size(); + } sizeToHwm = Math.min(sizeToHwm, openTxns.size()); long[] exceptions = new long[sizeToHwm]; BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits()); @@ -70,8 +81,9 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long int i = 0; for (long txn : openTxns) { // For snapshot isolation, we don't care about txns greater than current txn and so stop here. - // Also, we need not include current txn to exceptions list. - if ((currentTxn > 0) && (txn >= currentTxn)) { + // When excludeCurrentTxn is true, we also exclude current txn from the exceptions list + // (own-txn exclusion for regular reads). When false, we include it (compaction worker). + if ((currentTxn > 0) && (excludeCurrentTxn ? txn >= currentTxn : txn > currentTxn)) { break; } if (inAbortedBits.get(i)) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index cd56311abc05..525d6654a275 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -67,10 +67,12 @@ public class TxnUtils { private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); /** - * Returns a valid txn list for cleaner. - * @param txns Response containing open txns list. - * @param minOpenTxn Minimum open txn which is min open write txn on the table in the case of abort cleanup. - * @param isAbortCleanup Whether the request is for abort cleanup. + * Returns a valid txn list for the cleaner. The high watermark is set to {@code minOpenTxn - 1} + * so the cleaner only sees transactions that committed before the oldest open write txn on the table. + * + * @param txns response containing the currently open txns list + * @param minOpenTxn minimum open write txn id on the table being cleaned + * @param isAbortCleanup whether the request is for abort cleanup * @return a valid txn list */ public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns, long minOpenTxn, boolean isAbortCleanup) { @@ -106,6 +108,18 @@ public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns } } + /** + * Returns a valid txn list for the compaction worker. The high watermark is capped at {@code compactionTxnId} + * and the compaction transaction itself is kept in the exceptions list as OPEN. + * + * @param txns open txns response from the metastore + * @param compactionTxnId the transaction ID of the compaction + * @return a valid txn list + */ + public static ValidTxnList createValidTxnListForCompactor(GetOpenTxnsResponse txns, long compactionTxnId) { + return TxnCommonUtils.createValidReadTxnList(txns, compactionTxnId, false); + } + /** * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to From 9ebe4a5507a7a1d7a495888deb2b26c36dee5f06 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Wed, 15 Apr 2026 12:13:06 +0300 Subject: [PATCH 2/2] include empty Acid dirs into the DirSnapshot --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index c0e3b25001fc..909b22cf8a47 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1558,6 +1558,10 @@ public static Map getHdfsDirSnapshots(final FileSystem fs FileStatus fStatus = itr.next(); Path fPath = fStatus.getPath(); if (fStatus.isDirectory()) { + if (baseFileFilter.accept(fPath) || deltaFileFilter.accept(fPath) + || deleteEventDeltaDirFilter.accept(fPath)) { + addToSnapshot(dirToSnapshots, fPath); + } stack.push(FileUtils.listStatusIterator(fs, fPath, acidHiddenFileFilter)); } else { Path parentDirPath = fPath.getParent();