Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;

Expand All @@ -60,6 +61,10 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
private final Table icebergTable;
private final RecordWriter recordWriter;

// Timeout for waiting on the async compaction future during complete().
// Prevents complete() from blocking indefinitely when compaction is slow.
private static final long COMPACTION_TIMEOUT_SECONDS = 300L;

@Nullable private final ExecutorService compactionExecutor;
@Nullable private CompletableFuture<RewriteDataFileResult> compactionFuture;

Expand Down Expand Up @@ -114,7 +119,17 @@ public IcebergWriteResult complete() throws IOException {

RewriteDataFileResult rewriteDataFileResult = null;
if (compactionFuture != null) {
rewriteDataFileResult = compactionFuture.get();
try {
rewriteDataFileResult =
compactionFuture.get(COMPACTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
LOG.warn(
"Compaction timed out after {} seconds for table {}. "
+ "Skipping compaction result for this write.",
COMPACTION_TIMEOUT_SECONDS,
icebergTable.name());
compactionFuture.cancel(true);
}
}
return new IcebergWriteResult(writeResult, rewriteDataFileResult);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -187,13 +188,20 @@ void testLogTableCompaction() throws Exception {
checkFileStatusInIcebergTable(t1, 3, false);

// Write should trigger compaction now since the current data file count is greater or
// equal MIN_FILES_TO_COMPACT
// equal MIN_FILES_TO_COMPACT. Use a longer timeout because complete() blocks until
// compaction finishes.
flussRows.addAll(
writeIcebergTableRecords(
t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
// Should only have two files now, one file it for newly written, one file is for target
// compacted file
checkFileStatusInIcebergTable(t1, 2, false);
t1,
t1Bucket,
++i,
true,
Collections.singletonList(row(1, "v1")),
Duration.ofMinutes(2)));
// Should only have two files now, one file for newly written, one file for target
// compacted file. Use retry since compaction commit is part of the same pipeline
// round and may not be immediately visible.
waitForFileStatusInIcebergTable(t1, 2, false);

// check data in iceberg to make sure compaction won't lose data or duplicate data
checkRecords(getIcebergRecords(t1), flussRows);
Expand All @@ -213,4 +221,17 @@ private List<InternalRow> writeIcebergTableRecords(
assertReplicaStatus(tableBucket, expectedLogEndOffset);
return rows;
}

    /**
     * Writes {@code rows} to the given Fluss table and blocks until the replica's lake log end
     * offset reaches {@code expectedLogEndOffset}, retrying for up to {@code assertTimeout}.
     *
     * <p>Overload of {@code writeIcebergTableRecords} that forwards an explicit assertion timeout
     * to {@code assertReplicaStatus}, for writes that trigger slow background work (e.g.
     * compaction) before the lake offset becomes visible.
     *
     * @param tablePath path of the table to write to
     * @param tableBucket bucket whose replica status is asserted
     * @param expectedLogEndOffset lake log end offset the replica is expected to reach
     * @param append append flag passed through to {@code writeRows} — presumably selects
     *     append vs. upsert write mode; confirm against {@code writeRows}
     * @param rows rows to write; returned unchanged so callers can accumulate expected data
     * @param assertTimeout maximum time to wait for the replica-status assertion to hold
     * @return the same {@code rows} list that was written
     * @throws Exception if writing fails or the replica status never matches within the timeout
     */
    private List<InternalRow> writeIcebergTableRecords(
            TablePath tablePath,
            TableBucket tableBucket,
            long expectedLogEndOffset,
            boolean append,
            List<InternalRow> rows,
            Duration assertTimeout)
            throws Exception {
        writeRows(tablePath, rows, append);
        assertReplicaStatus(tableBucket, expectedLogEndOffset, assertTimeout);
        return rows;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,33 @@ protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) {
});
}

protected void assertReplicaStatus(
TableBucket tb, long expectedLogEndOffset, Duration timeout) {
retry(
timeout,
() -> {
Replica replica = getLeaderReplica(tb);
// datalake snapshot id should be updated
assertThat(replica.getLogTablet().getLakeTableSnapshotId())
.isGreaterThanOrEqualTo(0);
assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset);
});
}

protected void waitForFileStatusInIcebergTable(
TablePath tablePath, int expectedFileCount, boolean shouldDeleteFileExist) {
retry(
Duration.ofMinutes(2),
() -> {
try {
checkFileStatusInIcebergTable(
tablePath, expectedFileCount, shouldDeleteFileExist);
} catch (IOException e) {
throw new AssertionError("Failed to check file status in Iceberg table", e);
}
});
}

/**
* Wait until the default number of partitions is created. Return the map from partition id to
* partition name.
Expand Down