Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void checkAndRollbackCTAS(QueryLifeTimeHookContext ctx) {
if (table != null) {
LOG.info("Performing cleanup as part of rollback: {}", table.getFullTableName().toString());
try {
CompactionRequest request = new CompactionRequest(table.getDbName(), table.getTableName(), CompactionType.MAJOR);
CompactionRequest request = new CompactionRequest(table.getDbName(), table.getTableName(), CompactionType.DEFERRED_CLEANUP);
request.setRunas(TxnUtils.findUserToRunAs(tblPath.toString(), table.getTTable(), conf));
request.putToProperties(META_TABLE_LOCATION, tblPath.toString());
request.putToProperties(IF_PURGE, Boolean.toString(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
Expand All @@ -40,6 +41,7 @@
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest.CleanupRequestBuilder;
Expand Down Expand Up @@ -99,6 +101,18 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
LOG.info("Starting cleaning for {}, based on min open {}", ci,
(ci.minOpenWriteId > 0) ? "writeId: " + ci.minOpenWriteId : "txnId: " + minOpenTxn);

if (ci.nextTxnId == 0 && ci.txnId > 0 &&
(ci.type == CompactionType.MAJOR || ci.type == CompactionType.MINOR || ci.type == CompactionType.REBALANCE)) {
TxnStatus status = txnHandler.getTransactionStatus(ci.txnId);
Copy link
Copy Markdown
Member

@deniskuzZ deniskuzZ May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think that is an optimal solution.
if i get it right, it requires checking every cleanup request for an open txn and creates additional stress on backend DB

Copy link
Copy Markdown
Member

@deniskuzZ deniskuzZ May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can happen that a compaction is marked as finished and get into "ready for cleaning" state, but the compaction txn stays open. And when the timeout reached, the txn gets aborted.

could we handle this in abortTxn? if txnType=3 (compaction) cleanup the COMPACTION_QUEUE as well?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With min.history.level, a compaction like this can block the cleaning for all consecutive compaction, even after the txn is aborted.

if i get it right, it's not the case with min.history.writeid, however, the check is generic

Copy link
Copy Markdown
Contributor Author

@kuczoram kuczoram May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It checks for the transaction state only for compactions which don't have nextTxnId. If I understand it correctly if a compaction's txn is committed successfully, the nextTxnId is filled. So it will mean just one additional HMS call for compactions without successfully committed txn. For compaction's with comitted txn, it will do nothing.

I tried an other approach as well, when I made the compactions failed in the AbortTxnsFunction, but I didn't really like that either. I didn't find a way to call the markFailed from there, so I either had to extend the SQLs there or fetch the compactioninfo for all txn ids and then check if the condition's are matched. Like it has to be a compaction, not a soft-delete and it has to be in ready-for-cleaning state, because if it is in working state, we cannot fail it, as it could be revoked. At the end I felt that going with this approach could have more side effects. So I went with this one, but I can go back to making the failure in AbortTxnsFunction if you'd like that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With min.history.level, a compaction like this can block the cleaning for all consecutive compaction, even after the txn is aborted.

if i get it right, it's not the case with min.history.writeid, however, the check is generic

No, with min.history.writeid it is not blocking the following cleaners. Because in that case what happens is that the cleaner's highwatermark will be 0, so it won't delete anything, but there is no remainder checking for min.history.writeid, so it will consider the cleaning as successful. Marks the compaction as successful and just goes on. With min.history.level, it will clean nothing, but finds deltas which should be deleted, so it will stay in the queue with "ready for cleaning" state.

Copy link
Copy Markdown
Member

@deniskuzZ deniskuzZ May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand it correctly if a compaction's txn is committed successfully, the nextTxnId is filled.

why uncommited txn would be even eligible in first place? is it a race between mark ready-for-cleaning and commit/abort? what if we mark after commit/abort or make commit atomic?

if (TxnStatus.ABORTED == status) {
LOG.warn("The compaction {} is in invalid state. The compaction is marked as 'ready for cleaning', " +
"but its txn is in aborted state. Marking this compaction as failed.");
ci.errorMessage = "Invalid state: the compaction txn (" + ci.txnId + ") is already aborted.";
txnHandler.markFailed(ci);
return;
}
}

PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
(!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
Expand All @@ -36,6 +38,7 @@
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
Expand Down Expand Up @@ -63,6 +66,10 @@
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.CLEANING_RESPONSE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.INITIATED_STATE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_RESPONSE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_STATE;
import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -1347,7 +1354,86 @@ public void testCompactionHwmIsHonoredWithMinOpenWriteIdSetAndAbortedIOW() throw
}
}

private String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int numberOfDeltas) throws Exception {
@Test
public void testCleanerRunsWithOpenCompactionTxn() throws Exception {
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 0L);

String dbName = "default";
String tblName = "campcnb";
Table t = newTable(dbName, tblName, false);
addDeltaFile(t, null, 1L, 1L, 1);
addDeltaFile(t, null, 2L, 2L, 1);
addDeltaFile(t, null, 3L, 3L, 1);
addDeltaFile(t, null, 4L, 4L, 1);
burnThroughTransactions(dbName, tblName, 4, null, null);

// trigger compaction
CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
long txnId = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
addBaseFile(t, null, 4L, 6, txnId);
String deltaName1 = "base_4_v0000005";

// should not clean anything since the compaction txn is still open
startCleaner();

ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(1, rsp.getCompactsSize());
assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());

String deltaName2 = createDeltasAndRunMajorCompaction(t, 5, 2);

// should not clean anything since the compaction txn is still open
startCleaner();

rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(2, rsp.getCompactsSize());
assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(1).getState());

String deltaName3 = createDeltasAndRunMajorCompaction(t, 7, 2);

//Abort the compaction txn
txnHandler.abortTxns(new AbortTxnsRequest(Collections.singletonList(txnId)));
Thread.sleep(10000L);

Set<String> expectedDirs = new HashSet<>();
expectedDirs.add(deltaName1);
expectedDirs.add(deltaName2);
expectedDirs.add(deltaName3);
for (int i = 1; i < 9; i++) {
expectedDirs.add(makeDeltaDirName(i, i));
}
verifyDirectories(t, expectedDirs);
// Should mark the compaction 1 failed as its txn is aborted
startCleaner();
verifyDirectories(t, expectedDirs);

// Should find the second compaction
startCleaner();
expectedDirs.remove(deltaName1);
for (int i = 1; i < 7; i++) {
expectedDirs.remove(makeDeltaDirName(i, i));
}
verifyDirectories(t, expectedDirs);
// Should find the third compaction and deletes the directories accordingly
startCleaner();
expectedDirs.remove(deltaName2);
for (int i = 7; i < 9; i++) {
expectedDirs.remove(makeDeltaDirName(i, i));
}
verifyDirectories(t, expectedDirs);

rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(3, rsp.getCompactsSize());
assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
assertEquals(TxnStore.FAILED_RESPONSE, rsp.getCompacts().get(2).getState());
assertEquals("Invalid state: the compaction txn (" + txnId + ") is already aborted.",
rsp.getCompacts().get(2).getErrorMessage());
}

private String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int numberOfDeltas)
throws Exception {
String dbName = table.getDbName();
String tableName = table.getTableName();
for (int i = 0; i < numberOfDeltas; i++) {
Expand All @@ -1367,4 +1453,4 @@ private void verifyDirectories(Table table, Set<String> expectedDirs) throws Exc
Assert.assertEquals(expectedDirs, actualDirs);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1100,8 +1100,13 @@ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throw

private static void shouldNeverHappen(long txnid) {
throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid));
}

}

public TxnStatus getTransactionStatus(long txnId) throws MetaException {
TxnStatus status = jdbcResource.execute(new FindTxnStateHandler(txnId));
return status;
}

private void deleteInvalidOpenTransactions(List<Long> txnIds) throws MetaException {
try {
sqlRetryHandler.executeWithRetry(new SqlRetryCallProperties().withCallerId("deleteInvalidOpenTransactions"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionMetricsData;
import org.apache.hadoop.hive.metastore.txn.entities.MetricsInfo;
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
import org.apache.hadoop.hive.metastore.txn.retry.SqlRetry;
import org.apache.hadoop.hive.metastore.txn.retry.SqlRetryException;
Expand Down Expand Up @@ -336,6 +337,10 @@ GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
@RetrySemantics.SafeToRetry
void addWriteIdsToMinHistory(long txnId, Map<String, Long> minOpenWriteIds) throws MetaException;

@SqlRetry(lockInternally = true, retryOnDuplicateKey = true)
@Transactional(POOL_TX)
public TxnStatus getTransactionStatus(long txnId) throws MetaException;

/**
* Allocate a write ID for the given table and associate it with a transaction
* @param rqst info on transaction and table to allocate write id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public String getParameterizedQueryString(DatabaseProduct databaseProduct) throw
String queryStr =
" \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", \"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," +
" \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\", \"CQ_RETRY_RETENTION\", " +
" \"CQ_NEXT_TXN_ID\"";
" \"CQ_TXN_ID\", \"CQ_NEXT_TXN_ID\"";
if (TxnHandler.ConfVars.useMinHistoryWriteId()) {
queryStr += ", \"MIN_OPEN_WRITE_ID\"";
}
Expand Down Expand Up @@ -118,9 +118,10 @@ public List<CompactionInfo> extractData(ResultSet rs) throws SQLException, DataA
info.highestWriteId = rs.getLong(7);
info.properties = rs.getString(8);
info.retryRetention = rs.getInt(9);
info.nextTxnId = rs.getLong(10);
info.txnId = rs.getLong(10);
info.nextTxnId = rs.getLong(11);
if (TxnHandler.ConfVars.useMinHistoryWriteId()) {
long value = rs.getLong(11);
long value = rs.getLong(12);
info.minOpenWriteId = !rs.wasNull() ? value : Long.MAX_VALUE;
}
infos.add(info);
Expand Down
Loading