Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -184,10 +184,13 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
* multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
compactionTxn.open(ci);

final ValidTxnList validTxnList = msc.getValidTxns(compactionTxn.getTxnId());
final ValidTxnList validTxnList =
TxnUtils.createValidTxnListForCompactor(msc.getOpenTxns(), compactionTxn.getTxnId());
//with this ValidWriteIdList is capped at whatever HWM validTxnList has
tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
tblValidWriteIds =
TxnUtils.createValidCompactWriteIdList(
msc.getValidWriteIds(List.of(fullTableName), validTxnList.writeToString()).get(0)
);
LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public class TxnCommonUtils {
* @return a valid txn list.
*/
public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
return createValidReadTxnList(txns, currentTxn, true);
}

static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn,
boolean excludeCurrentTxn) {
assert currentTxn <= txns.getTxn_high_water_mark();
/*
* The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
Expand All @@ -61,7 +66,13 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long
// txn is read-only or aborted by AcidHouseKeeperService and compactor actually cleans up the aborted txns.
// So, for such cases, we get negative value for sizeToHwm with found position for currentTxn, and so,
// we just negate it to get the size.
int sizeToHwm = (currentTxn > 0) ? Math.abs(Collections.binarySearch(openTxns, currentTxn)) : openTxns.size();
int sizeToHwm;
if (currentTxn > 0) {
int pos = Collections.binarySearch(openTxns, currentTxn);
sizeToHwm = (pos >= 0 && !excludeCurrentTxn) ? pos + 1 : Math.abs(pos);
} else {
sizeToHwm = openTxns.size();
}
sizeToHwm = Math.min(sizeToHwm, openTxns.size());
long[] exceptions = new long[sizeToHwm];
BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits());
Expand All @@ -70,8 +81,9 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long
int i = 0;
for (long txn : openTxns) {
// For snapshot isolation, we don't care about txns greater than current txn and so stop here.
// Also, we need not include current txn to exceptions list.
if ((currentTxn > 0) && (txn >= currentTxn)) {
// When excludeCurrentTxn is true, we also exclude current txn from the exceptions list
// (own-txn exclusion for regular reads). When false, we include it (compaction worker).
if ((currentTxn > 0) && (excludeCurrentTxn ? txn >= currentTxn : txn > currentTxn)) {
break;
}
if (inAbortedBits.get(i)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ public class TxnUtils {
private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);

/**
* Returns a valid txn list for cleaner.
* @param txns Response containing open txns list.
* @param minOpenTxn Minimum open txn which is min open write txn on the table in the case of abort cleanup.
* @param isAbortCleanup Whether the request is for abort cleanup.
* Returns a valid txn list for the cleaner. The high watermark is set to {@code minOpenTxn - 1}
* so the cleaner only sees transactions that committed before the oldest open write txn on the table.
*
* @param txns response containing the currently open txns list
* @param minOpenTxn minimum open write txn id on the table being cleaned
* @param isAbortCleanup whether the request is for abort cleanup
* @return a valid txn list
*/
public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns, long minOpenTxn, boolean isAbortCleanup) {
Expand Down Expand Up @@ -106,6 +108,18 @@ public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns
}
}

/**
* Returns a valid txn list for the compaction worker. The high watermark is capped at {@code compactionTxnId}
* and the compaction transaction itself is kept in the exceptions list as OPEN.
*
* @param txns open txns response from the metastore
* @param compactionTxnId the transaction ID of the compaction
* @return a valid txn list
*/
public static ValidTxnList createValidTxnListForCompactor(GetOpenTxnsResponse txns, long compactionTxnId) {
return TxnCommonUtils.createValidReadTxnList(txns, compactionTxnId, false);
}

/**
* Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
* {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to
Expand Down
Loading