From 59d284e4943a5d3d86ecd318885c3213b857c0db Mon Sep 17 00:00:00 2001
From: hemanthsavasere
Date: Sat, 14 Mar 2026 15:47:36 +0000
Subject: [PATCH] Fix testLogTableCompaction flakiness by adding compaction
 timeout and retry logic

---
 .../iceberg/tiering/IcebergLakeWriter.java    | 17 +++++++++-
 .../maintenance/IcebergRewriteITCase.java     | 31 ++++++++++++++++---
 .../FlinkIcebergTieringTestBase.java          | 27 ++++++++++++++++
 3 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index 227112eae8..6391f0ee81 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -48,6 +48,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
 
@@ -60,6 +61,10 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
 
     private final Table icebergTable;
     private final RecordWriter recordWriter;
 
+    // Timeout for waiting on the async compaction future during complete().
+    // Prevents complete() from blocking indefinitely when compaction is slow.
+    private static final long COMPACTION_TIMEOUT_SECONDS = 300L;
+
     @Nullable private final ExecutorService compactionExecutor;
     @Nullable private CompletableFuture<RewriteDataFileResult> compactionFuture;
 
@@ -114,7 +119,17 @@ public IcebergWriteResult complete() throws IOException {
 
             RewriteDataFileResult rewriteDataFileResult = null;
             if (compactionFuture != null) {
-                rewriteDataFileResult = compactionFuture.get();
+                try {
+                    rewriteDataFileResult =
+                            compactionFuture.get(COMPACTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                } catch (TimeoutException e) {
+                    LOG.warn(
+                            "Compaction timed out after {} seconds for table {}. "
+                                    + "Skipping compaction result for this write.",
+                            COMPACTION_TIMEOUT_SECONDS,
+                            icebergTable.name());
+                    compactionFuture.cancel(true);
+                }
             }
             return new IcebergWriteResult(writeResult, rewriteDataFileResult);
         } catch (Exception e) {
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
index 4b6d9de71a..7cb9cdef7d 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
@@ -31,6 +31,7 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -187,13 +188,20 @@ void testLogTableCompaction() throws Exception {
         checkFileStatusInIcebergTable(t1, 3, false);
 
         // Write should trigger compaction now since the current data file count is greater or
-        // equal MIN_FILES_TO_COMPACT
+        // equal to MIN_FILES_TO_COMPACT. Use a longer timeout because complete() blocks until
+        // compaction finishes.
         flussRows.addAll(
                 writeIcebergTableRecords(
-                        t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
-        // Should only have two files now, one file it for newly written, one file is for target
-        // compacted file
-        checkFileStatusInIcebergTable(t1, 2, false);
+                        t1,
+                        t1Bucket,
+                        ++i,
+                        true,
+                        Collections.singletonList(row(1, "v1")),
+                        Duration.ofMinutes(2)));
+        // Should only have two files now: one for the newly written data, one for the
+        // compacted target file. Use retry since the compaction commit is part of the same
+        // pipeline round and may not be immediately visible.
+        waitForFileStatusInIcebergTable(t1, 2, false);
 
         // check data in iceberg to make sure compaction won't lose data or duplicate data
         checkRecords(getIcebergRecords(t1), flussRows);
@@ -213,4 +221,17 @@ private List<InternalRow> writeIcebergTableRecords(
         assertReplicaStatus(tableBucket, expectedLogEndOffset);
         return rows;
     }
+
+    private List<InternalRow> writeIcebergTableRecords(
+            TablePath tablePath,
+            TableBucket tableBucket,
+            long expectedLogEndOffset,
+            boolean append,
+            List<InternalRow> rows,
+            Duration assertTimeout)
+            throws Exception {
+        writeRows(tablePath, rows, append);
+        assertReplicaStatus(tableBucket, expectedLogEndOffset, assertTimeout);
+        return rows;
+    }
 }
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 5bd021b9ad..6149dbca4c 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -257,6 +257,33 @@ protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) {
                 });
     }
 
+    protected void assertReplicaStatus(
+            TableBucket tb, long expectedLogEndOffset, Duration timeout) {
+        retry(
+                timeout,
+                () -> {
+                    Replica replica = getLeaderReplica(tb);
+                    // datalake snapshot id should be updated
+                    assertThat(replica.getLogTablet().getLakeTableSnapshotId())
+                            .isGreaterThanOrEqualTo(0);
+                    assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset);
+                });
+    }
+
+    protected void waitForFileStatusInIcebergTable(
+            TablePath tablePath, int expectedFileCount, boolean shouldDeleteFileExist) {
+        retry(
+                Duration.ofMinutes(2),
+                () -> {
+                    try {
+                        checkFileStatusInIcebergTable(
+                                tablePath, expectedFileCount, shouldDeleteFileExist);
+                    } catch (IOException e) {
+                        throw new AssertionError("Failed to check file status in Iceberg table", e);
+                    }
+                });
+    }
+
     /**
      * Wait until the default number of partitions is created. Return the map from partition id to
      * partition name.
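
Note on the writer-side change: the fix is a bounded wait on the async
compaction future. complete() now waits at most COMPACTION_TIMEOUT_SECONDS
and, on timeout, logs a warning, cancels the future, and returns without a
compaction result instead of blocking indefinitely. For reference, a minimal
standalone sketch of that pattern using only java.util.concurrent; the class
and names below are illustrative, not code from this patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedWaitSketch {
        // Illustrative timeout; the patch uses 300 seconds.
        private static final long TIMEOUT_SECONDS = 2L;

        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            // Stand-in for the async compaction: a task that can outlive the timeout.
            Future<String> future =
                    executor.submit(
                            () -> {
                                TimeUnit.SECONDS.sleep(10);
                                return "compacted";
                            });
            String result = null;
            try {
                result = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                // Skip this round's result; interrupt the task so the worker frees up.
                future.cancel(true);
            }
            System.out.println("result = " + result); // null when the wait timed out
            executor.shutdownNow();
        }
    }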
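
Note on the test-side change: the assertions move from one-shot checks to
retry-until-pass with an explicit timeout, since the compaction commit may
become visible only after a delay. The patch relies on the existing retry(...)
test utility; the stand-in below sketches the poll-until-pass behavior the
tests depend on and is illustrative only, not the Fluss implementation:

    import java.time.Duration;

    public class RetrySketch {
        // Poll the assertion until it passes or the timeout elapses, then rethrow.
        static void retry(Duration timeout, Runnable assertion) throws InterruptedException {
            long deadline = System.nanoTime() + timeout.toNanos();
            while (true) {
                try {
                    assertion.run();
                    return; // condition holds
                } catch (AssertionError e) {
                    if (System.nanoTime() >= deadline) {
                        throw e; // give up and surface the last failure
                    }
                    Thread.sleep(100); // brief back-off before re-checking
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            long start = System.currentTimeMillis();
            // Fails for ~300 ms and is retried until the condition becomes true.
            retry(
                    Duration.ofSeconds(2),
                    () -> {
                        if (System.currentTimeMillis() - start < 300) {
                            throw new AssertionError("not yet visible");
                        }
                    });
            System.out.println("condition became true within the timeout");
        }
    }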