diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java index d1c7e3972d03..ccb8260fa6fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.txn.compactor.CompactorFactory; import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline; import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable; import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.Ref; @@ -211,32 +212,27 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { // Don't start compaction or cleaning if not necessary if (isDynPartAbort(table, ci)) { - msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); - compactionTxn.wasSuccessful(); + compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); return false; } dir = getAcidStateForWorker(ci, sd, tblValidWriteIds); if (!isEnoughToCompact(ci, dir, sd)) { if (needsCleaning(dir, sd)) { - msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); + compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); } else { // do nothing ci.errorMessage = "None of the compaction thresholds met, compaction request is refused!"; LOG.debug(ci.errorMessage + " Compaction info: {}", ci); - msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)); + compactionTxn.markForCommit(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci))); + } - compactionTxn.wasSuccessful(); return false; } if (!ci.isMajorCompaction() && !CompactorUtil.isMinorCompactionSupported(conf, table.getParameters(), dir)) { ci.errorMessage = "Query based Minor compaction is not possible for full acid tables having raw format " + "(non-acid) data in them."; LOG.error(ci.errorMessage + " Compaction info: {}", ci); - try { - msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)); - } catch (Throwable tr) { - LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, tr); - } + compactionTxn.markForAbort(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci))); return false; } CompactorUtil.checkInterrupt(CLASS_NAME); @@ -261,8 +257,7 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { LOG.info("Completed " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + compactionTxn + ", marking as compacted."); - msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); - compactionTxn.wasSuccessful(); + compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci))); AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName, ci.partName, ci.type, dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(), conf, msc); @@ -346,7 +341,11 @@ class CompactionTxn implements AutoCloseable { private long lockId = 0; private TxnStatus status = TxnStatus.UNKNOWN; - private boolean successfulCompaction = false; + + private ThrowingRunnable onCommitSuccess; + private ThrowingRunnable onAbortSuccess; + + private boolean rollbackOnly = true; /** * Try to open a new txn. @@ -377,11 +376,14 @@ private LockRequest createLockRequest(CompactionInfo ci) { return CompactorUtil.createLockRequest(conf, ci, txnId, lockAndOpType.getKey(), lockAndOpType.getValue()); } - /** - * Mark compaction as successful. This means the txn will be committed; otherwise it will be aborted. - */ - void wasSuccessful() { - this.successfulCompaction = true; + void markForCommit(ThrowingRunnable action) { + this.rollbackOnly = false; + this.onCommitSuccess = action; + } + + void markForAbort(ThrowingRunnable action) { + this.rollbackOnly = true; + this.onAbortSuccess = action; } /** @@ -396,10 +398,16 @@ public void close() throws Exception { //the transaction is about to close, we can stop heartbeating regardless of it's state CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId); } finally { - if (successfulCompaction) { - commit(); - } else { + if (rollbackOnly) { abort(); + if (onAbortSuccess != null) { + onAbortSuccess.run(); + } + return; + } + commit(); + if (onCommitSuccess != null) { + onCommitSuccess.run(); } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index bf01034711e3..7ed0688e0fa8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest; @@ -41,7 +42,10 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.utils.StringableMap; +import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -70,8 +74,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.common.AcidConstants.VISIBILITY_PATTERN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -100,9 +109,9 @@ public void stringableMap() throws Exception { // Empty map case StringableMap m = new StringableMap(new HashMap()); String s = m.toString(); - Assert.assertEquals("0:", s); + assertEquals("0:", s); m = new StringableMap(s); - Assert.assertEquals(0, m.size()); + assertEquals(0, m.size()); Map base = new HashMap(); base.put("mary", "poppins"); @@ -111,22 +120,22 @@ public void stringableMap() throws Exception { m = new StringableMap(base); s = m.toString(); m = new StringableMap(s); - Assert.assertEquals(3, m.size()); + assertEquals(3, m.size()); Map saw = new HashMap(3); saw.put("mary", false); saw.put("bert", false); saw.put(null, false); for (Map.Entry e : m.entrySet()) { saw.put(e.getKey(), true); - if ("mary".equals(e.getKey())) Assert.assertEquals("poppins", e.getValue()); + if ("mary".equals(e.getKey())) assertEquals("poppins", e.getValue()); else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue()); - else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue()); + else if (null == e.getKey()) assertEquals("banks", e.getValue()); else Assert.fail("Unexpected value " + e.getKey()); } - Assert.assertEquals(3, saw.size()); - Assert.assertTrue(saw.get("mary")); - Assert.assertTrue(saw.get("bert")); - Assert.assertTrue(saw.get(null)); + assertEquals(3, saw.size()); + assertTrue(saw.get("mary")); + assertTrue(saw.get("bert")); + assertTrue(saw.get(null)); } @Test @@ -134,26 +143,26 @@ public void stringableList() throws Exception { // Empty list case MRCompactor.StringableList ls = new MRCompactor.StringableList(); String s = ls.toString(); - Assert.assertEquals("0:", s); + assertEquals("0:", s); ls = new MRCompactor.StringableList(s); - Assert.assertEquals(0, ls.size()); + assertEquals(0, ls.size()); ls = new MRCompactor.StringableList(); ls.add(new Path("/tmp")); ls.add(new Path("/usr")); s = ls.toString(); - Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s, + assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s, "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s)); ls = new MRCompactor.StringableList(s); - Assert.assertEquals(2, ls.size()); + assertEquals(2, ls.size()); boolean sawTmp = false, sawUsr = false; for (Path p : ls) { if ("/tmp".equals(p.toString())) sawTmp = true; else if ("/usr".equals(p.toString())) sawUsr = true; else Assert.fail("Unexpected path " + p.toString()); } - Assert.assertTrue(sawTmp); - Assert.assertTrue(sawUsr); + assertTrue(sawTmp); + assertTrue(sawUsr); } @Test @@ -181,10 +190,10 @@ public void inputSplit() throws Exception { MRCompactor.CompactorInputSplit split = new MRCompactor.CompactorInputSplit(conf, 3, files, new Path(basename), deltas, new HashMap()); - Assert.assertEquals(520L, split.getLength()); + assertEquals(520L, split.getLength()); String[] locations = split.getLocations(); - Assert.assertEquals(1, locations.length); - Assert.assertEquals("localhost", locations[0]); + assertEquals(1, locations.length); + assertEquals("localhost", locations[0]); ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutput out = new DataOutputStream(buf); @@ -194,12 +203,12 @@ public void inputSplit() throws Exception { DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); split.readFields(in); - Assert.assertEquals(3, split.getBucket()); - Assert.assertEquals(basename, split.getBaseDir().toString()); + assertEquals(3, split.getBucket()); + assertEquals(basename, split.getBaseDir().toString()); deltas = split.getDeltaDirs(); - Assert.assertEquals(2, deltas.length); - Assert.assertEquals(delta1, deltas[0].toString()); - Assert.assertEquals(delta2, deltas[1].toString()); + assertEquals(2, deltas.length); + assertEquals(delta1, deltas[0].toString()); + assertEquals(delta2, deltas[1].toString()); } @Test @@ -234,12 +243,12 @@ public void inputSplitNullBase() throws Exception { DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); split.readFields(in); - Assert.assertEquals(3, split.getBucket()); + assertEquals(3, split.getBucket()); Assert.assertNull(split.getBaseDir()); deltas = split.getDeltaDirs(); - Assert.assertEquals(2, deltas.length); - Assert.assertEquals(delta1, deltas[0].toString()); - Assert.assertEquals(delta2, deltas[1].toString()); + assertEquals(2, deltas.length); + assertEquals(delta1, deltas[0].toString()); + assertEquals(delta2, deltas[1].toString()); } @Test @@ -264,7 +273,7 @@ public void sortedTable() throws Exception { // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); } @Test @@ -291,7 +300,7 @@ public void sortedPartition() throws Exception { // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); } @Test @@ -312,13 +321,13 @@ public void minorTableWithBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; @@ -326,26 +335,26 @@ public void minorTableWithBase() throws Exception { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewDelta); + assertTrue(toString(stat), sawNewDelta); } /** @@ -372,20 +381,20 @@ public void minorWithOpenInMiddle() throws Exception { // since compaction was not run, state should not be "ready for cleaning" but "refused" ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(0).getState()); // There should still be 4 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(toString(stat), 4, stat.length); + assertEquals(toString(stat), 4, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_20", stat[0].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName()); + assertEquals("base_20", stat[0].getPath().getName()); + assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName()); + assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName()); + assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName()); } @Test @@ -407,22 +416,22 @@ public void minorWithAborted() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 6 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(6, stat.length); + assertEquals(6, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_20", stat[0].getPath().getName()); - Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028", stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028", stat[3].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName()); + assertEquals("base_20", stat[0].getPath().getName()); + assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028", stat[1].getPath().getName()); + assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); + assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028", stat[3].getPath().getName()); + assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName()); + assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName()); } @Test @@ -444,13 +453,13 @@ public void minorPartitionWithBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; @@ -458,25 +467,25 @@ public void minorPartitionWithBase() throws Exception { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewDelta); + assertTrue(toString(stat), sawNewDelta); } @Test @@ -496,13 +505,13 @@ public void minorTableNoBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; @@ -510,25 +519,25 @@ public void minorTableNoBase() throws Exception { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4) + "_v0000006")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4) + "_v0000006")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewDelta); + assertTrue(toString(stat), sawNewDelta); } @Test @@ -549,13 +558,13 @@ public void majorTableWithBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; @@ -563,16 +572,16 @@ public void majorTableWithBase() throws Exception { if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(624L, buckets[0].getLen()); - Assert.assertEquals(624L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(624L, buckets[0].getLen()); + assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } @Test @@ -625,14 +634,14 @@ private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); /* delete_delta_21_23 and delete_delta_25_33 which are created as a result of compacting*/ int numFilesExpected = 11 + (type == CompactionType.MINOR ? 1 : 0); - Assert.assertEquals(numFilesExpected, stat.length); + assertEquals(numFilesExpected, stat.length); // Find the new delta file and make sure it has the right contents List matchesNotFound = new ArrayList<>(numFilesExpected); @@ -665,7 +674,7 @@ private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception { if(matchesNotFound.size() == 0) { return; } - Assert.assertTrue("Files remaining: " + matchesNotFound + "; " + toString(stat), false); + assertTrue("Files remaining: " + matchesNotFound + "; " + toString(stat), false); } @Test public void majorPartitionWithBase() throws Exception { @@ -687,13 +696,13 @@ public void majorPartitionWithBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; @@ -701,16 +710,16 @@ public void majorPartitionWithBase() throws Exception { if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(624L, buckets[0].getLen()); - Assert.assertEquals(624L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(624L, buckets[0].getLen()); + assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } @Test @@ -730,13 +739,13 @@ public void majorTableNoBase() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should now be 3 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(3, stat.length); + assertEquals(3, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; @@ -744,16 +753,16 @@ public void majorTableNoBase() throws Exception { if (stat[i].getPath().getName().equals("base_0000004_v0000005")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(104L, buckets[0].getLen()); - Assert.assertEquals(104L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(104L, buckets[0].getLen()); + assertEquals(104L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } private static String toString(FileStatus[] stat) { @@ -785,8 +794,8 @@ public void majorTableLegacy() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); @@ -799,16 +808,16 @@ public void majorTableLegacy() throws Exception { if (stat[i].getPath().getName().equals("base_0000024_v0000026")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(624L, buckets[0].getLen()); - Assert.assertEquals(624L, buckets[1].getLen()); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(624L, buckets[0].getLen()); + assertEquals(624L, buckets[1].getLen()); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } @Test @@ -829,8 +838,8 @@ public void minorTableLegacy() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); @@ -842,14 +851,14 @@ public void minorTableLegacy() throws Exception { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); } else { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewDelta); + assertTrue(toString(stat), sawNewDelta); } @Test @@ -873,13 +882,13 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewBase = false; @@ -887,11 +896,11 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { if (stat[i].getPath().getName().equals("base_0000026_v0000028")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(2, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + assertEquals(2, buckets.length); + assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); // Bucket 0 should be small and bucket 1 should be large, make sure that's the case - Assert.assertTrue( + assertTrue( ("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == buckets[0].getLen() && "bucket_00001".equals(buckets[1].getPath().getName()) && 676L == buckets[1] .getLen()) @@ -904,7 +913,7 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); } } - Assert.assertTrue(toString(stat), sawNewBase); + assertTrue(toString(stat), sawNewBase); } @Test @@ -926,21 +935,21 @@ public void majorWithOpenInMiddle() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_0000022_v0000028", stat[0].getPath().getName()); - Assert.assertEquals("base_20", stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); + assertEquals("base_0000022_v0000028", stat[0].getPath().getName()); + assertEquals("base_20", stat[1].getPath().getName()); + assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); + assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); + assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); } @Test @@ -962,21 +971,21 @@ public void majorWithAborted() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); - Assert.assertEquals("base_0000027_v0000028", stat[0].getPath().getName()); - Assert.assertEquals("base_20", stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); + assertEquals("base_0000027_v0000028", stat[0].getPath().getName()); + assertEquals("base_20", stat[1].getPath().getName()); + assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); + assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); + assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); } @Override boolean useHive130DeltaDirName() { @@ -1008,10 +1017,10 @@ public void testWorkerAndInitiatorVersion() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - Assert.assertEquals(initiatorVersion, compacts.get(0).getInitiatorVersion()); - Assert.assertEquals(workerVersion, compacts.get(0).getWorkerVersion()); + assertEquals(1, compacts.size()); + assertEquals("ready for cleaning", compacts.get(0).getState()); + assertEquals(initiatorVersion, compacts.get(0).getInitiatorVersion()); + assertEquals(workerVersion, compacts.get(0).getWorkerVersion()); } @@ -1072,7 +1081,7 @@ public void droppedTable() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(0, compacts.size()); + assertEquals(0, compacts.size()); } @Test @@ -1097,7 +1106,7 @@ public void droppedPartition() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(0, compacts.size()); + assertEquals(0, compacts.size()); } @Test @@ -1148,8 +1157,8 @@ public void insertOnlyDisabled() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("failed", compacts.get(0).getState()); + assertEquals(1, compacts.size()); + assertEquals("failed", compacts.get(0).getState()); } @@ -1162,21 +1171,21 @@ private void verifyTxn1IsAborted(int compactionNum, Table t, CompactionType type // Compaction should not have run on a single delta file FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(1, stat.length); - Assert.assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName()); + assertEquals(1, stat.length); + assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName()); // State should not be "ready for cleaning" because we skip cleaning List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals(compactionNum + 1, compacts.size()); - Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(compactionNum).getState()); + assertEquals(compactionNum + 1, compacts.size()); + assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(compactionNum).getState()); // assert transaction with txnId=1 is still aborted after cleaner is run startCleaner(); List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); - Assert.assertEquals(1, openTxns.get(0).getId()); - Assert.assertEquals(TxnState.ABORTED, openTxns.get(0).getState()); + assertEquals(1, openTxns.get(0).getId()); + assertEquals(TxnState.ABORTED, openTxns.get(0).getState()); } // With high timeout, but fast run we should finish without a problem @@ -1197,6 +1206,178 @@ public void testTimeoutWithoutInterrupt() throws Exception { runTimeoutTest(1, true, true); } + @Test + public void testExceptionWhenTxnCommitAndMarkFailed() throws Exception { + prepareTableAndCompaction("default", "tableForCommitAndMarkFailedError"); + runWorkerWithException(MethodToFail.COMMIT_TXN, MethodToFail.MARK_FAILED); + + List compacts = + txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + assertEquals(TxnStore.WORKING_RESPONSE, compacts.get(0).getState()); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(1, openTxns.size()); + TxnInfo txn = openTxns.get(0); + assertEquals(compacts.get(0).getTxnId(), txn.getId()); + assertEquals(TxnState.OPEN, txn.getState()); + txnHandler.abortTxn(new AbortTxnRequest(txn.getId())); + } + + @Test + public void testExceptionWhenTxnCommit() throws Exception { + prepareTableAndCompaction("default", "tableForCommitError"); + runWorkerWithException(MethodToFail.COMMIT_TXN); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState()); + assertEquals("Simulated failure in commitTxn", compaction.getErrorMessage()); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(1, openTxns.size()); + TxnInfo txn = openTxns.get(0); + assertEquals(compaction.getTxnId(), txn.getId()); + assertEquals(TxnState.OPEN, txn.getState()); + txnHandler.abortTxn(new AbortTxnRequest(txn.getId())); + } + + @Test + public void testExceptionWhenMarkCompacted() throws Exception { + prepareTableAndCompaction("default", "tableForMarkCompactedError"); + runWorkerWithException(MethodToFail.MARK_COMPACTED); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState()); + assertEquals("Simulated failure in markCompacted", compaction.getErrorMessage()); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(0, openTxns.size()); + } + + @Test + public void testExceptionDuringCompact() throws Exception { + prepareTableAndCompaction("default", "tableForCompactError"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST, true); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEST_MODE_FAIL_COMPACTION, true); + startWorker(); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState()); + assertEquals("HIVE_TEST_MODE_FAIL_COMPACTION=true", compaction.getErrorMessage()); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(1, openTxns.size()); + TxnInfo txn = openTxns.get(0); + assertEquals(compaction.getTxnId(), txn.getId()); + assertEquals(TxnState.ABORTED, txn.getState()); + } + + @Test + public void testWorkerIfIsDynPartAbort() throws Exception { + String dbName = "default"; + String tableName = "tableWithPartition"; + Table t = newTable(dbName, tableName, true); + addBaseFile(t, null, 1L, 3, 1); + addDeltaFile(t, null, 2L, 2L, 1); + addDeltaFile(t, null, 3L, 3L, 1); + addDeltaFile(t, null, 4L, 4L, 1); + burnThroughTransactions(dbName, tableName, 4, null, null); + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + rqst.setPartitionname(null); + txnHandler.compact(rqst); + startWorker(); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState()); + assertTrue(compaction.getNextTxnId() > 0L); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(0, openTxns.size()); + } + + @Test + public void testWorkerNotEnoughToCompact() throws Exception { + String dbName = "default"; + String tableName = "tableWithNoDelta"; + Table t = newTable(dbName, tableName, false); + addBaseFile(t, null, 1L, 3, 1); + burnThroughTransactions(dbName, tableName, 1, null, null); + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + txnHandler.compact(rqst); + startWorker(); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.REFUSED_RESPONSE, compaction.getState()); + assertTrue(compaction.getErrorMessage().contains("None of the compaction thresholds met, compaction request is refused!")); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(0, openTxns.size()); + } + + @Test + public void testWorkerNotEnoughToCompactNeedsCleaning() throws Exception { + String dbName = "default"; + String tableName = "tableNeedsCleaning"; + Table t = newTable(dbName, tableName, false); + addDeltaFile(t, null, 1L, 1L, 1); + addDeltaFile(t, null, 2L, 2L, 1); + addBaseFile(t, null, 4L, 3, 6); + burnThroughTransactions(dbName, tableName, 6, null, new HashSet(Arrays.asList(1L, 2L))); + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + txnHandler.compact(rqst); + startWorker(); + + List compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + ShowCompactResponseElement compaction = compacts.get(0); + assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState()); + assertTrue(compaction.getNextTxnId() > 0L); + List openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns(); + assertEquals(2, openTxns.size()); + assertEquals(1L, openTxns.get(0).getId()); + assertEquals(2L, openTxns.get(1).getId()); + } + + private void runWorkerWithException(MethodToFail... methodToFail) throws Exception { + IMetaStoreClient spyMsc = Mockito.spy(ms); + for (MethodToFail method: methodToFail) { + switch (method) { + case MARK_FAILED -> doThrow(new TTransportException("Simulated failure in markFailed")).when(spyMsc).markFailed(any()); + case COMMIT_TXN -> doThrow(new TException("Simulated failure in commitTxn")).when(spyMsc).commitTxn(anyLong()); + case MARK_COMPACTED -> doThrow(new TTransportException("Simulated failure in markCompacted")).when(spyMsc).markCompacted(any()); + } + } + + TestTxnDbUtil.setConfValues(conf); + Worker worker = Mockito.spy(new Worker()); + worker.setConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + worker.init(stop); + worker.msc = spyMsc; + worker.setName("testworker"); + CompactorThread ct = worker; + ct.run(); + } + + private void prepareTableAndCompaction(String dbName, String tableName) throws Exception { + Table t = newTable(dbName, tableName, false); + addBaseFile(t, null, 1L, 3, 1); + addDeltaFile(t, null, 2L, 2L, 1); + addDeltaFile(t, null, 3L, 3L, 1); + addDeltaFile(t, null, 4L, 4L, 1); + burnThroughTransactions(dbName, tableName, 4, null, null); + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + txnHandler.compact(rqst); + } + + enum MethodToFail { + MARK_COMPACTED, + MARK_FAILED, + COMMIT_TXN; + } + private void runTimeoutTest(long timeout, boolean runForever, boolean swallowInterrupt) throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); HiveConf timeoutConf = new HiveConf(conf);