From c0a2665a60ab53feac6a1e47214da910b13f9701 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 11:27:29 +0530 Subject: [PATCH 1/7] added metadat and data path in case of dynamic routing --- committer/build.gradle | 36 ++ .../java/org/apache/iceberg/Coordinator.java | 315 ++++++++++++++++++ .../main/java/org/apache/iceberg/Main.java | 7 + .../iceberg/connect/IcebergSinkConfig.java | 34 +- .../iceberg/connect/TableSinkConfig.java | 13 +- .../connect/data/IcebergWriterFactory.java | 11 +- 6 files changed, 413 insertions(+), 3 deletions(-) create mode 100644 committer/build.gradle create mode 100644 committer/src/main/java/org/apache/iceberg/Coordinator.java create mode 100644 committer/src/main/java/org/apache/iceberg/Main.java diff --git a/committer/build.gradle b/committer/build.gradle new file mode 100644 index 000000000000..0de61912d850 --- /dev/null +++ b/committer/build.gradle @@ -0,0 +1,36 @@ +plugins { + id 'java-library' +} + +group = 'org.apache.iceberg' +version = '1.7.0-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + api project(':iceberg-api') + implementation project(':iceberg-kafka-connect') + implementation project(':iceberg-core') + implementation project(':iceberg-common') + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation project(':iceberg-data') + implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events') + implementation platform(libs.jackson.bom) + implementation libs.jackson.core + implementation libs.jackson.databind + implementation libs.avro.avro + + compileOnly libs.kafka.clients + compileOnly libs.kafka.connect.api + compileOnly libs.kafka.connect.json + + testImplementation libs.hadoop3.client + testRuntimeOnly project(':iceberg-parquet') + testRuntimeOnly project(':iceberg-orc') +} + +test { + useJUnitPlatform() +} diff --git a/committer/src/main/java/org/apache/iceberg/Coordinator.java b/committer/src/main/java/org/apache/iceberg/Coordinator.java new file mode 100644 index 000000000000..6a260d91bb80 --- /dev/null +++ b/committer/src/main/java/org/apache/iceberg/Coordinator.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.channels.Channel; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.channel.CommitState; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class Coordinator { + + private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; + private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; + private static final Duration POLL_DURATION = Duration.ofSeconds(1); + + private final Catalog catalog; + private final IcebergSinkConfig config; + private final int totalPartitionCount; + private final String snapshotOffsetsProp; + private final ExecutorService exec; + private final CommitState commitState; + private volatile boolean terminated; + + Coordinator( + Catalog catalog, + IcebergSinkConfig config, + Collection members, + KafkaClientFactory clientFactory, + SinkTaskContext context) { + // pass consumer group ID to which we commit low watermark offsets + super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); + + this.catalog = catalog; + this.config = config; + this.totalPartitionCount = + members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); + this.snapshotOffsetsProp = + String.format( + "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); + this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", config.commitThreads()); + this.commitState = new CommitState(config); + } + + void process() { + if (commitState.isCommitIntervalReached()) { + // send out begin commit + commitState.startNewCommit(); + Event event = + new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); + send(event); + LOG.info("Commit {} initiated", commitState.currentCommitId()); + } + + consumeAvailable(POLL_DURATION); + + if (commitState.isCommitTimedOut()) { + commit(true); + } + } + + @Override + protected boolean receive(Envelope envelope) { + switch (envelope.event().payload().type()) { + case DATA_WRITTEN: + commitState.addResponse(envelope); + return true; + case DATA_COMPLETE: + commitState.addReady(envelope); + if (commitState.isCommitReady(totalPartitionCount)) { + commit(false); + } + return true; + } + return false; + } + + private void commit(boolean partialCommit) { + try { + doCommit(partialCommit); + } catch (Exception e) { + LOG.warn("Commit failed, will try again next cycle", e); + } finally { + commitState.endCurrentCommit(); + } + } + + private void doCommit(boolean partialCommit) { + Map> commitMap = commitState.tableCommitMap(); + + String offsetsJson = offsetsJson(); + OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit); + + Tasks.foreach(commitMap.entrySet()) + .executeWith(exec) + .stopOnFailure() + .run( + entry -> { + commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs); + }); + + // we should only get here if all tables committed successfully... + commitConsumerOffsets(); + commitState.clearResponses(); + + Event event = + new Event( + config.connectGroupId(), + new CommitComplete(commitState.currentCommitId(), validThroughTs)); + send(event); + + LOG.info( + "Commit {} complete, committed to {} table(s), valid-through {}", + commitState.currentCommitId(), + commitMap.size(), + validThroughTs); + } + + private String offsetsJson() { + try { + return MAPPER.writeValueAsString(controlTopicOffsets()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void commitToTable( + TableReference tableReference, + List envelopeList, + String offsetsJson, + OffsetDateTime validThroughTs) { + TableIdentifier tableIdentifier = tableReference.identifier(); + Table table; + try { + table = catalog.loadTable(tableIdentifier); + } catch (NoSuchTableException e) { + LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); + return; + } + + String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); + + Map committedOffsets = lastCommittedOffsetsForTable(table, branch); + + List payloads = + envelopeList.stream() + .filter( + envelope -> { + Long minOffset = committedOffsets.get(envelope.partition()); + return minOffset == null || envelope.offset() >= minOffset; + }) + .map(envelope -> (DataWritten) envelope.event().payload()) + .collect(Collectors.toList()); + + List dataFiles = + payloads.stream() + .filter(payload -> payload.dataFiles() != null) + .flatMap(payload -> payload.dataFiles().stream()) + .filter(dataFile -> dataFile.recordCount() > 0) + .filter(distinctByKey(ContentFile::location)) + .collect(Collectors.toList()); + + List deleteFiles = + payloads.stream() + .filter(payload -> payload.deleteFiles() != null) + .flatMap(payload -> payload.deleteFiles().stream()) + .filter(deleteFile -> deleteFile.recordCount() > 0) + .filter(distinctByKey(ContentFile::location)) + .collect(Collectors.toList()); + + if (terminated) { + throw new ConnectException("Coordinator is terminated, commit aborted"); + } + + if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { + LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); + } else { + if (deleteFiles.isEmpty()) { + AppendFiles appendOp = table.newAppend(); + if (branch != null) { + appendOp.toBranch(branch); + } + appendOp.set(snapshotOffsetsProp, offsetsJson); + appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (validThroughTs != null) { + appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); + } + dataFiles.forEach(appendOp::appendFile); + appendOp.commit(); + } else { + RowDelta deltaOp = table.newRowDelta(); + if (branch != null) { + deltaOp.toBranch(branch); + } + deltaOp.set(snapshotOffsetsProp, offsetsJson); + deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (validThroughTs != null) { + deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); + } + dataFiles.forEach(deltaOp::addRows); + deleteFiles.forEach(deltaOp::addDeletes); + deltaOp.commit(); + } + + Long snapshotId = latestSnapshot(table, branch).snapshotId(); + Event event = + new Event( + config.connectGroupId(), + new CommitToTable( + commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); + send(event); + + LOG.info( + "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", + tableIdentifier, + snapshotId, + commitState.currentCommitId(), + validThroughTs); + } + } + + private Predicate distinctByKey(Function keyExtractor) { + Map seen = Maps.newConcurrentMap(); + return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; + } + + private Snapshot latestSnapshot(Table table, String branch) { + if (branch == null) { + return table.currentSnapshot(); + } + return table.snapshot(branch); + } + + private Map lastCommittedOffsetsForTable(Table table, String branch) { + Snapshot snapshot = latestSnapshot(table, branch); + while (snapshot != null) { + Map summary = snapshot.summary(); + String value = summary.get(snapshotOffsetsProp); + if (value != null) { + TypeReference> typeRef = new TypeReference>() {}; + try { + return MAPPER.readValue(value, typeRef); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + Long parentSnapshotId = snapshot.parentId(); + snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; + } + return ImmutableMap.of(); + } + + void terminate() { + this.terminated = true; + + exec.shutdownNow(); + + // wait for coordinator termination, else cause the sink task to fail + try { + if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { + throw new ConnectException("Timed out waiting for coordinator shutdown"); + } + } catch (InterruptedException e) { + throw new ConnectException("Interrupted while waiting for coordinator shutdown", e); + } + } +} diff --git a/committer/src/main/java/org/apache/iceberg/Main.java b/committer/src/main/java/org/apache/iceberg/Main.java new file mode 100644 index 000000000000..3a6b1ae495ae --- /dev/null +++ b/committer/src/main/java/org/apache/iceberg/Main.java @@ -0,0 +1,7 @@ +package org.apache.iceberg; + +public class Main { + public static void main(String[] args) { + System.out.println("Hello world!"); + } +} \ No newline at end of file diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..f6dd7cf3c0a6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -56,6 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -235,6 +237,13 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); + configDef.define( + DYNAMIC_ROUTE_DATA_METADATA_PREFIX, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "prefix for creation of metadata path and data path in case of dynamic routing" + ); return configDef; } @@ -375,7 +384,20 @@ public TableSinkConfig tableConfig(String tableName) { String commitBranch = tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); + String metadataPath = "", dataPath = ""; + + if (dynamicTablesEnabled()) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + if (originalProps.containsKey("iceberg.catalog.warehouse")) { + metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + } else { + metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); + } + } + + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -388,6 +410,16 @@ static List stringToList(String value, String regex) { return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } + private String defaultDataPath(String tableName) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + } + + private String defaultMetadataPath(String tableName) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + } + public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 0ecde1f7dd0b..28fdd4f36d15 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -27,13 +27,24 @@ public class TableSinkConfig { private final List idColumns; private final List partitionBy; private final String commitBranch; + private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; this.commitBranch = commitBranch; + this.dataPath = dataPath; + this.metadataPath = metadataPath; + } + + public String getDataPath() { + return dataPath; + } + + public String getMetadataPath() { + return metadataPath; } public Pattern routeRegex() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 92f5af2d7a87..5de1039e77b0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -89,6 +90,14 @@ Table autoCreateTable(String tableName, SinkRecord sample) { createNamespaceIfNotExist(catalog, identifier.namespace()); List partitionBy = config.tableConfig(tableName).partitionBy(); + + Map tableAutoCreateProps = config.autoCreateProps(); + + if (config.dynamicTablesEnabled()) { + tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); + } + PartitionSpec spec; try { spec = SchemaUtils.createPartitionSpec(schema, partitionBy); @@ -110,7 +119,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { try { result.set( catalog.createTable( - identifier, schema, partitionSpec, config.autoCreateProps())); + identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 67619ec140a65c7768af074178c95408d76e605d Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 11:28:07 +0530 Subject: [PATCH 2/7] spotless --- .../iceberg/connect/IcebergSinkConfig.java | 44 +++++++++++++++---- .../iceberg/connect/TableSinkConfig.java | 7 ++- .../connect/data/IcebergWriterFactory.java | 6 +-- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index f6dd7cf3c0a6..8339cf7f89ac 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -57,7 +57,8 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = + "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -242,8 +243,7 @@ private static ConfigDef newConfigDef() { ConfigDef.Type.STRING, "", Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing" - ); + "prefix for creation of metadata path and data path in case of dynamic routing"); return configDef; } @@ -389,15 +389,27 @@ public TableSinkConfig tableConfig(String tableName) { if (dynamicTablesEnabled()) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + metadataPath = + originalProps.get("iceberg.catalog.warehouse") + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/metadata"; + dataPath = + originalProps.get("iceberg.catalog.warehouse") + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/data"; } else { - metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + metadataPath = + tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); } } - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig( + routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -412,12 +424,26 @@ static List stringToList(String value, String regex) { private String defaultDataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + + "/" + + connectorName() + + "/" + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/data"; } private String defaultMetadataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + + "/" + + connectorName() + + "/" + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/metadata"; } public String controlTopic() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 28fdd4f36d15..36879bf6929a 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -30,7 +30,12 @@ public class TableSinkConfig { private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { + Pattern routeRegex, + List idColumns, + List partitionBy, + String commitBranch, + String dataPath, + String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 5de1039e77b0..df87e8d2a958 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -94,7 +94,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) { Map tableAutoCreateProps = config.autoCreateProps(); if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put( + "write.metadata.path", config.tableConfig(tableName).getMetadataPath()); tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); } @@ -118,8 +119,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { notUsed -> { try { result.set( - catalog.createTable( - identifier, schema, partitionSpec, tableAutoCreateProps)); + catalog.createTable(identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 6b15ae402deded39a34843f85f5342dbb9ae28a0 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 17:54:24 +0530 Subject: [PATCH 3/7] Revert "spotless" This reverts commit 67619ec140a65c7768af074178c95408d76e605d. --- .../iceberg/connect/IcebergSinkConfig.java | 44 ++++--------------- .../iceberg/connect/TableSinkConfig.java | 7 +-- .../connect/data/IcebergWriterFactory.java | 6 +-- 3 files changed, 13 insertions(+), 44 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 8339cf7f89ac..f6dd7cf3c0a6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -57,8 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = - "iceberg.dynamic-route-data-metadata-prefix"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -243,7 +242,8 @@ private static ConfigDef newConfigDef() { ConfigDef.Type.STRING, "", Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing"); + "prefix for creation of metadata path and data path in case of dynamic routing" + ); return configDef; } @@ -389,27 +389,15 @@ public TableSinkConfig tableConfig(String tableName) { if (dynamicTablesEnabled()) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = - originalProps.get("iceberg.catalog.warehouse") - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/metadata"; - dataPath = - originalProps.get("iceberg.catalog.warehouse") - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/data"; + metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; } else { - metadataPath = - tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); } } - return new TableSinkConfig( - routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -424,26 +412,12 @@ static List stringToList(String value, String regex) { private String defaultDataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) - + "/" - + connectorName() - + "/" - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/data"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; } private String defaultMetadataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) - + "/" - + connectorName() - + "/" - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/metadata"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; } public String controlTopic() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 36879bf6929a..28fdd4f36d15 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -30,12 +30,7 @@ public class TableSinkConfig { private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, - List idColumns, - List partitionBy, - String commitBranch, - String dataPath, - String metadataPath) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index df87e8d2a958..5de1039e77b0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -94,8 +94,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { Map tableAutoCreateProps = config.autoCreateProps(); if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put( - "write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); } @@ -119,7 +118,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) { notUsed -> { try { result.set( - catalog.createTable(identifier, schema, partitionSpec, tableAutoCreateProps)); + catalog.createTable( + identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 8398e4c2675f5e319850c6fe1555007c153ec143 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 17:54:36 +0530 Subject: [PATCH 4/7] Revert "added metadat and data path in case of dynamic routing" This reverts commit c0a2665a60ab53feac6a1e47214da910b13f9701. --- committer/build.gradle | 36 -- .../java/org/apache/iceberg/Coordinator.java | 315 ------------------ .../main/java/org/apache/iceberg/Main.java | 7 - .../iceberg/connect/IcebergSinkConfig.java | 34 +- .../iceberg/connect/TableSinkConfig.java | 13 +- .../connect/data/IcebergWriterFactory.java | 11 +- 6 files changed, 3 insertions(+), 413 deletions(-) delete mode 100644 committer/build.gradle delete mode 100644 committer/src/main/java/org/apache/iceberg/Coordinator.java delete mode 100644 committer/src/main/java/org/apache/iceberg/Main.java diff --git a/committer/build.gradle b/committer/build.gradle deleted file mode 100644 index 0de61912d850..000000000000 --- a/committer/build.gradle +++ /dev/null @@ -1,36 +0,0 @@ -plugins { - id 'java-library' -} - -group = 'org.apache.iceberg' -version = '1.7.0-SNAPSHOT' - -repositories { - mavenCentral() -} - -dependencies { - api project(':iceberg-api') - implementation project(':iceberg-kafka-connect') - implementation project(':iceberg-core') - implementation project(':iceberg-common') - implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') - implementation project(':iceberg-data') - implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events') - implementation platform(libs.jackson.bom) - implementation libs.jackson.core - implementation libs.jackson.databind - implementation libs.avro.avro - - compileOnly libs.kafka.clients - compileOnly libs.kafka.connect.api - compileOnly libs.kafka.connect.json - - testImplementation libs.hadoop3.client - testRuntimeOnly project(':iceberg-parquet') - testRuntimeOnly project(':iceberg-orc') -} - -test { - useJUnitPlatform() -} diff --git a/committer/src/main/java/org/apache/iceberg/Coordinator.java b/committer/src/main/java/org/apache/iceberg/Coordinator.java deleted file mode 100644 index 6a260d91bb80..000000000000 --- a/committer/src/main/java/org/apache/iceberg/Coordinator.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.channels.Channel; -import java.time.Duration; -import java.time.OffsetDateTime; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.connect.IcebergSinkConfig; -import org.apache.iceberg.connect.channel.CommitState; -import org.apache.iceberg.connect.events.CommitComplete; -import org.apache.iceberg.connect.events.CommitToTable; -import org.apache.iceberg.connect.events.DataWritten; -import org.apache.iceberg.connect.events.Event; -import org.apache.iceberg.connect.events.StartCommit; -import org.apache.iceberg.connect.events.TableReference; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; -import org.apache.kafka.clients.admin.MemberDescription; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkTaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class Coordinator { - - private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; - private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; - private static final Duration POLL_DURATION = Duration.ofSeconds(1); - - private final Catalog catalog; - private final IcebergSinkConfig config; - private final int totalPartitionCount; - private final String snapshotOffsetsProp; - private final ExecutorService exec; - private final CommitState commitState; - private volatile boolean terminated; - - Coordinator( - Catalog catalog, - IcebergSinkConfig config, - Collection members, - KafkaClientFactory clientFactory, - SinkTaskContext context) { - // pass consumer group ID to which we commit low watermark offsets - super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); - - this.catalog = catalog; - this.config = config; - this.totalPartitionCount = - members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); - this.snapshotOffsetsProp = - String.format( - "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); - this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", config.commitThreads()); - this.commitState = new CommitState(config); - } - - void process() { - if (commitState.isCommitIntervalReached()) { - // send out begin commit - commitState.startNewCommit(); - Event event = - new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); - send(event); - LOG.info("Commit {} initiated", commitState.currentCommitId()); - } - - consumeAvailable(POLL_DURATION); - - if (commitState.isCommitTimedOut()) { - commit(true); - } - } - - @Override - protected boolean receive(Envelope envelope) { - switch (envelope.event().payload().type()) { - case DATA_WRITTEN: - commitState.addResponse(envelope); - return true; - case DATA_COMPLETE: - commitState.addReady(envelope); - if (commitState.isCommitReady(totalPartitionCount)) { - commit(false); - } - return true; - } - return false; - } - - private void commit(boolean partialCommit) { - try { - doCommit(partialCommit); - } catch (Exception e) { - LOG.warn("Commit failed, will try again next cycle", e); - } finally { - commitState.endCurrentCommit(); - } - } - - private void doCommit(boolean partialCommit) { - Map> commitMap = commitState.tableCommitMap(); - - String offsetsJson = offsetsJson(); - OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit); - - Tasks.foreach(commitMap.entrySet()) - .executeWith(exec) - .stopOnFailure() - .run( - entry -> { - commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs); - }); - - // we should only get here if all tables committed successfully... - commitConsumerOffsets(); - commitState.clearResponses(); - - Event event = - new Event( - config.connectGroupId(), - new CommitComplete(commitState.currentCommitId(), validThroughTs)); - send(event); - - LOG.info( - "Commit {} complete, committed to {} table(s), valid-through {}", - commitState.currentCommitId(), - commitMap.size(), - validThroughTs); - } - - private String offsetsJson() { - try { - return MAPPER.writeValueAsString(controlTopicOffsets()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private void commitToTable( - TableReference tableReference, - List envelopeList, - String offsetsJson, - OffsetDateTime validThroughTs) { - TableIdentifier tableIdentifier = tableReference.identifier(); - Table table; - try { - table = catalog.loadTable(tableIdentifier); - } catch (NoSuchTableException e) { - LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); - return; - } - - String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); - - Map committedOffsets = lastCommittedOffsetsForTable(table, branch); - - List payloads = - envelopeList.stream() - .filter( - envelope -> { - Long minOffset = committedOffsets.get(envelope.partition()); - return minOffset == null || envelope.offset() >= minOffset; - }) - .map(envelope -> (DataWritten) envelope.event().payload()) - .collect(Collectors.toList()); - - List dataFiles = - payloads.stream() - .filter(payload -> payload.dataFiles() != null) - .flatMap(payload -> payload.dataFiles().stream()) - .filter(dataFile -> dataFile.recordCount() > 0) - .filter(distinctByKey(ContentFile::location)) - .collect(Collectors.toList()); - - List deleteFiles = - payloads.stream() - .filter(payload -> payload.deleteFiles() != null) - .flatMap(payload -> payload.deleteFiles().stream()) - .filter(deleteFile -> deleteFile.recordCount() > 0) - .filter(distinctByKey(ContentFile::location)) - .collect(Collectors.toList()); - - if (terminated) { - throw new ConnectException("Coordinator is terminated, commit aborted"); - } - - if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { - LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); - } else { - if (deleteFiles.isEmpty()) { - AppendFiles appendOp = table.newAppend(); - if (branch != null) { - appendOp.toBranch(branch); - } - appendOp.set(snapshotOffsetsProp, offsetsJson); - appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - if (validThroughTs != null) { - appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); - } - dataFiles.forEach(appendOp::appendFile); - appendOp.commit(); - } else { - RowDelta deltaOp = table.newRowDelta(); - if (branch != null) { - deltaOp.toBranch(branch); - } - deltaOp.set(snapshotOffsetsProp, offsetsJson); - deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - if (validThroughTs != null) { - deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); - } - dataFiles.forEach(deltaOp::addRows); - deleteFiles.forEach(deltaOp::addDeletes); - deltaOp.commit(); - } - - Long snapshotId = latestSnapshot(table, branch).snapshotId(); - Event event = - new Event( - config.connectGroupId(), - new CommitToTable( - commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); - send(event); - - LOG.info( - "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", - tableIdentifier, - snapshotId, - commitState.currentCommitId(), - validThroughTs); - } - } - - private Predicate distinctByKey(Function keyExtractor) { - Map seen = Maps.newConcurrentMap(); - return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; - } - - private Snapshot latestSnapshot(Table table, String branch) { - if (branch == null) { - return table.currentSnapshot(); - } - return table.snapshot(branch); - } - - private Map lastCommittedOffsetsForTable(Table table, String branch) { - Snapshot snapshot = latestSnapshot(table, branch); - while (snapshot != null) { - Map summary = snapshot.summary(); - String value = summary.get(snapshotOffsetsProp); - if (value != null) { - TypeReference> typeRef = new TypeReference>() {}; - try { - return MAPPER.readValue(value, typeRef); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - Long parentSnapshotId = snapshot.parentId(); - snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; - } - return ImmutableMap.of(); - } - - void terminate() { - this.terminated = true; - - exec.shutdownNow(); - - // wait for coordinator termination, else cause the sink task to fail - try { - if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { - throw new ConnectException("Timed out waiting for coordinator shutdown"); - } - } catch (InterruptedException e) { - throw new ConnectException("Interrupted while waiting for coordinator shutdown", e); - } - } -} diff --git a/committer/src/main/java/org/apache/iceberg/Main.java b/committer/src/main/java/org/apache/iceberg/Main.java deleted file mode 100644 index 3a6b1ae495ae..000000000000 --- a/committer/src/main/java/org/apache/iceberg/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.iceberg; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index f6dd7cf3c0a6..9650ce16270c 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,7 +28,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -57,7 +56,6 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -237,13 +235,6 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); - configDef.define( - DYNAMIC_ROUTE_DATA_METADATA_PREFIX, - ConfigDef.Type.STRING, - "", - Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing" - ); return configDef; } @@ -384,20 +375,7 @@ public TableSinkConfig tableConfig(String tableName) { String commitBranch = tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); - String metadataPath = "", dataPath = ""; - - if (dynamicTablesEnabled()) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; - } else { - metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); - dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); - } - } - - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); }); } @@ -410,16 +388,6 @@ static List stringToList(String value, String regex) { return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } - private String defaultDataPath(String tableName) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; - } - - private String defaultMetadataPath(String tableName) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - } - public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 28fdd4f36d15..0ecde1f7dd0b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -27,24 +27,13 @@ public class TableSinkConfig { private final List idColumns; private final List partitionBy; private final String commitBranch; - private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; this.commitBranch = commitBranch; - this.dataPath = dataPath; - this.metadataPath = metadataPath; - } - - public String getDataPath() { - return dataPath; - } - - public String getMetadataPath() { - return metadataPath; } public Pattern routeRegex() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 5de1039e77b0..92f5af2d7a87 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,7 +20,6 @@ import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -90,14 +89,6 @@ Table autoCreateTable(String tableName, SinkRecord sample) { createNamespaceIfNotExist(catalog, identifier.namespace()); List partitionBy = config.tableConfig(tableName).partitionBy(); - - Map tableAutoCreateProps = config.autoCreateProps(); - - if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); - tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); - } - PartitionSpec spec; try { spec = SchemaUtils.createPartitionSpec(schema, partitionBy); @@ -119,7 +110,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { try { result.set( catalog.createTable( - identifier, schema, partitionSpec, tableAutoCreateProps)); + identifier, schema, partitionSpec, config.autoCreateProps())); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 292909ccb5d215ee36bdbe9ca8db0b8769d11ac8 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 16 May 2026 19:59:33 +0530 Subject: [PATCH 5/7] added provision to tolearte transient commit failed exception during rebalance which requires manual intervention to restart the failed tasks --- .../iceberg/connect/channel/Channel.java | 63 +++- .../connect/channel/CommitterImpl.java | 18 +- .../iceberg/connect/data/SinkWriter.java | 2 + .../connect/channel/TestChannelRetry.java | 341 ++++++++++++++++++ 4 files changed, 414 insertions(+), 10 deletions(-) create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java index 01cf165de66b..73d5773e5bb6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.connect.data.Offset; import org.apache.iceberg.connect.events.AvroUtil; @@ -31,12 +30,16 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,30 +93,74 @@ protected void send(List events, Map sourceOffset // key by producer ID to keep event order return new ProducerRecord<>(controlTopic, producerId, data); }) - .collect(Collectors.toList()); + .toList(); synchronized (producer) { - producer.beginTransaction(); + boolean transactionStarted = false; try { + producer.beginTransaction(); + transactionStarted = true; // NOTE: we shouldn't call get() on the future in a transactional context, // see docs for org.apache.kafka.clients.producer.KafkaProducer recordList.forEach(producer::send); - if (!sourceOffsets.isEmpty()) { + if (!offsetsToCommit.isEmpty()) { producer.sendOffsetsToTransaction( offsetsToCommit, KafkaUtils.consumerGroupMetadata(context)); } producer.commitTransaction(); } catch (Exception e) { - try { - producer.abortTransaction(); - } catch (Exception ex) { - LOG.warn("Error aborting producer transaction", ex); + safeAbortTransaction(transactionStarted); + if (isRecoverableRebalanceError(e)) { + // A consumer-group re-balance happened between this transaction's preparation and the + // broker-side offset commit (CommitFailedException), or the producer epoch was bumped + // (InvalidProducerEpochException). The transaction was aborted, so source offsets did + // not advance — when Connect re-delivers the same batch (after the re-balance settles + // and the affected partitions are reassigned) processing resumes from the last + // committed offsets with no data loss. + LOG.warn( + "Transactional offset commit failed due to consumer group re-balance; " + + "aborted transaction and signalling Connect to retry", + e); + throw new RetriableException( + "Transactional offset commit failed due to consumer group re-balance", e); } throw e; } } } + private void safeAbortTransaction(boolean transactionStarted) { + if (!transactionStarted) { + return; + } + try { + producer.abortTransaction(); + } catch (Exception ex) { + LOG.warn("Error aborting producer transaction", ex); + } + } + + /** + * Returns true when the throwable (or any cause in its chain) indicates a transient + * consumer-group re-balance or producer-epoch bump that can be safely retried after aborting the + * transaction. {@link ProducerFencedException} is explicitly excluded — it requires producer + * recreation and must not be retried. + */ + private static boolean isRecoverableRebalanceError(Throwable throwable) { + Throwable cause = throwable; + while (cause != null) { + if (cause instanceof ProducerFencedException) { + return false; + } + if (cause instanceof CommitFailedException + || cause instanceof InvalidProducerEpochException) { + return true; + } + cause = cause.getCause(); + } + return false; + } + protected abstract boolean receive(Envelope envelope); protected void consumeAvailable(Duration pollDuration) { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 7b2d4a25363d..4d574cb81fe1 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; @@ -181,7 +182,11 @@ public void close(Collection closedPartitions) { stopCoordinator(); } - // Reset offsets to last committed to avoid data loss. + // Reset offsets to last committed to avoid data loss. This is required for incremental + // cooperative rebalance (ICR), where Connect can invoke close() on a partition without a + // following open() — meaning rewind() in onPartitionsAssigned never runs for it. Seeking the + // main consumer here guarantees that any records we read past the broker-committed offset + // (and never managed to commit transactionally) are re-fetched on the next poll. LOG.info("Seeking to last committed offsets for worker {}.", taskId); KafkaUtils.seekToLastCommittedOffsets(context); } @@ -201,7 +206,16 @@ private void processControlEvents() { String.format("Coordinator unexpectedly terminated on committer %s", taskId)); } if (worker != null) { - worker.process(); + try { + worker.process(); + } catch (RetriableException retriableException) { + LOG.info( + "Committer {} got retriable exception while processing control events. This can happen during re-balance.", + taskId, + retriableException.getCause()); + stopWorker(); + throw retriableException; + } } } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index 48a01881935b..a6c9782a4241 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -49,6 +49,8 @@ public SinkWriter(Catalog catalog, IcebergSinkConfig config) { public void close() { writers.values().forEach(RecordWriter::close); + writers.clear(); + sourceOffsets.clear(); } public SinkWriterResult completeWrite() { diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java new file mode 100644 index 000000000000..f13ae6823853 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.data.Offset; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +/** + * Unit tests for the re-balance-aware error translation in {@link Channel#send(java.util.List, + * java.util.Map)}. These tests use mocked {@link Producer} instances to exercise re-balance-induced + * commit failures that {@link org.apache.kafka.clients.producer.MockProducer} cannot easily + * reproduce. + * + *

Behavior contract: when {@code commitTransaction} or {@code sendOffsetsToTransaction} fails + * because the consumer group re-balanced ({@link CommitFailedException}) or the producer epoch was + * bumped ({@link InvalidProducerEpochException}), {@code send()} aborts the transaction and throws + * {@link RetriableException} so the Connect framework pauses the consumer and re-delivers the same + * batch — after which the re-balance settles, partitions are reassigned, and processing resumes + * from the last committed offsets. {@link ProducerFencedException} and other failures remain fatal. + */ +public class TestChannelRetry { + + private static final String CONTROL_TOPIC = "ctl-topic"; + private static final String CONNECT_GROUP = "cg-connect"; + private static final String CONSUMER_GROUP = "worker-cg"; + + private Producer producer; + private Consumer consumer; + private Admin admin; + private KafkaClientFactory clientFactory; + private IcebergSinkConfig config; + private SinkTaskContext context; + private MockedStatic mockedKafkaUtils; + + @BeforeEach + @SuppressWarnings("unchecked") + void before() { + producer = mock(Producer.class); + consumer = mock(Consumer.class); + admin = mock(Admin.class); + + clientFactory = mock(KafkaClientFactory.class); + when(clientFactory.createProducer(any())).thenReturn(producer); + when(clientFactory.createConsumer(any())).thenReturn(consumer); + when(clientFactory.createAdmin()).thenReturn(admin); + + config = mock(IcebergSinkConfig.class); + when(config.controlTopic()).thenReturn(CONTROL_TOPIC); + when(config.connectGroupId()).thenReturn(CONNECT_GROUP); + when(config.transactionalPrefix()).thenReturn(""); + when(config.transactionalSuffix()).thenReturn(""); + + context = mock(SinkTaskContext.class); + + ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class); + mockedKafkaUtils = mockStatic(KafkaUtils.class); + mockedKafkaUtils + .when(() -> KafkaUtils.consumerGroupMetadata(any(SinkTaskContext.class))) + .thenReturn(groupMetadata); + } + + @AfterEach + void after() { + if (mockedKafkaUtils != null) { + mockedKafkaUtils.close(); + } + } + + // ------------------------------------------------------------ + // Happy path + // ------------------------------------------------------------ + + @Test + public void sendSucceedsOnFirstAttempt() { + StubChannel channel = newChannel(); + + channel.sendForTest(startCommitEvent(), offsetsForSrc(42L)); + + verify(producer, times(1)).beginTransaction(); + verify(producer, times(1)).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class)); + verify(producer, times(1)).commitTransaction(); + verify(producer, never()).abortTransaction(); + } + + @Test + public void sendWithEmptySourceOffsetsSkipsSendOffsetsToTransaction() { + StubChannel channel = newChannel(); + + channel.sendForTest(startCommitEvent(), ImmutableMap.of()); + + verify(producer, times(1)).beginTransaction(); + verify(producer, never()).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class)); + verify(producer, times(1)).commitTransaction(); + } + + // ------------------------------------------------------------ + // Recoverable re-balance failures — translated to RetriableException + // ------------------------------------------------------------ + + @Test + public void commitFailedExceptionIsTranslatedToRetriableException() { + CommitFailedException rebalance = new CommitFailedException("generation id mismatch"); + doThrow(rebalance).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(7L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(rebalance); + + verify(producer, times(1)).beginTransaction(); + verify(producer, times(1)).commitTransaction(); + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void invalidProducerEpochExceptionIsTranslatedToRetriableException() { + InvalidProducerEpochException epochBump = new InvalidProducerEpochException("stale epoch"); + doThrow(epochBump).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(11L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(epochBump); + + verify(producer, times(1)).beginTransaction(); + verify(producer, times(1)).commitTransaction(); + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void sendOffsetsToTransactionFailureIsAlsoTranslated() { + CommitFailedException rebalance = new CommitFailedException("metadata mismatch"); + doThrow(rebalance) + .when(producer) + .sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class)); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(5L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(rebalance); + + verify(producer, times(1)).beginTransaction(); + verify(producer, times(1)).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class)); + verify(producer, never()).commitTransaction(); + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void wrappedCommitFailedExceptionIsAlsoTranslated() { + CommitFailedException inner = new CommitFailedException("rebalanced"); + KafkaException wrapper = new KafkaException("wrapped", inner); + doThrow(wrapper).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(3L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(wrapper); + + verify(producer, times(1)).abortTransaction(); + } + + // ------------------------------------------------------------ + // Fatal exceptions — surfaced as-is, NOT retriable + // ------------------------------------------------------------ + + @Test + public void producerFencedExceptionIsFatalAndNotRetriable() { + ProducerFencedException fenced = new ProducerFencedException("fenced by newer producer"); + doThrow(fenced).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(fenced) + .hasMessageContaining("fenced"); + + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void producerFencedWrappingARebalanceCauseIsStillFatal() { + ProducerFencedException fenced = new ProducerFencedException("fenced"); + fenced.initCause(new CommitFailedException("rebalanced")); + doThrow(fenced).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(fenced) + .hasMessageContaining("fenced"); + } + + @Test + public void nonRebalanceKafkaExceptionIsRethrownAsIs() { + KafkaException broken = new KafkaException("broker connection broken"); + doThrow(broken).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(broken) + .hasMessageContaining("broker connection broken"); + + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void beginTransactionFailureIsRethrownAndDoesNotAbort() { + IllegalStateException beginFailure = new IllegalStateException("producer not initialized"); + doThrow(beginFailure).when(producer).beginTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(beginFailure) + .hasMessageContaining("not initialized"); + + verify(producer, times(1)).beginTransaction(); + verify(producer, never()).commitTransaction(); + verify(producer, never()).abortTransaction(); + } + + // ------------------------------------------------------------ + // Abort robustness + // ------------------------------------------------------------ + + @Test + public void abortFailureIsSwallowedAndDoesNotMaskRetriableException() { + doThrow(new CommitFailedException("rebalanced")).when(producer).commitTransaction(); + doThrow(new IllegalStateException("abort failed")).when(producer).abortTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCauseInstanceOf(CommitFailedException.class); + + verify(producer, times(1)).abortTransaction(); + } + + // ------------------------------------------------------------ + // Helpers + // ------------------------------------------------------------ + + private StubChannel newChannel() { + return new StubChannel("worker", CONSUMER_GROUP, config, clientFactory, context); + } + + private Event startCommitEvent() { + return new Event(CONNECT_GROUP, new StartCommit(java.util.UUID.randomUUID())); + } + + private Map offsetsForSrc(long offset) { + return ImmutableMap.of(new TopicPartition("src-topic", 0), new Offset(offset, null)); + } + + /** Concrete subclass exposing the protected {@code send()} method for tests. */ + private static class StubChannel extends Channel { + + StubChannel( + String name, + String consumerGroupId, + IcebergSinkConfig config, + KafkaClientFactory clientFactory, + SinkTaskContext context) { + super(name, consumerGroupId, config, clientFactory, context); + } + + @Override + protected boolean receive(Envelope envelope) { + return false; + } + + void sendForTest(Event event, Map sourceOffsets) { + send(ImmutableList.of(event), sourceOffsets); + } + } + + @Test + public void startCommitPayloadTypeIsStable() { + assertThat(startCommitEvent().payload().type()).isEqualTo(PayloadType.START_COMMIT); + } +} From fec8aaa096fc0b56d35c08ce8a95690698d470bc Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 16 May 2026 20:18:50 +0530 Subject: [PATCH 6/7] added list of exception on which to retry configurable --- .../iceberg/connect/IcebergSinkConfig.java | 83 +++++++++++++++++++ .../iceberg/connect/channel/Channel.java | 46 +++++----- .../connect/channel/TestChannelRetry.java | 75 ++++++++++++++--- 3 files changed, 170 insertions(+), 34 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..2de4b0b95499 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -105,6 +106,22 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String COORDINATOR_EXECUTOR_KEEP_ALIVE_TIMEOUT_MS = "iceberg.coordinator-executor-keep-alive-timeout-ms"; + private static final String TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP = + "iceberg.kafka.transactional-commit-retriable-exceptions"; + + /** + * Default set of producer-side exceptions that are translated to {@link + * org.apache.kafka.connect.errors.RetriableException} when raised from {@code Channel.send()}'s + * transactional commit. {@code ProducerFencedException} is included because the connector closes + * and re-creates the worker (and its producer) on a retriable commit failure, which re-runs + * {@code initTransactions()} and obtains a fresh producer epoch. + */ + private static final List DEFAULT_TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS = + ImmutableList.of( + "org.apache.kafka.clients.consumer.CommitFailedException", + "org.apache.kafka.common.errors.InvalidProducerEpochException", + "org.apache.kafka.common.errors.ProducerFencedException"); + @VisibleForTesting static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))"; public static final ConfigDef CONFIG_DEF = newConfigDef(); @@ -235,6 +252,18 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); + configDef.define( + TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP, + ConfigDef.Type.LIST, + DEFAULT_TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS, + Importance.LOW, + "Comma-separated list of fully qualified Throwable class names raised by the producer's " + + "transactional commit (commitTransaction / sendOffsetsToTransaction) that should be " + + "translated to RetriableException so Connect re-delivers the batch after the " + + "consumer-group re-balance settles. Defaults cover CommitFailedException, " + + "InvalidProducerEpochException, and ProducerFencedException — the latter is " + + "recoverable here because the worker is closed and re-created on retry, " + + "re-running initTransactions() to obtain a fresh producer epoch."); return configDef; } @@ -246,6 +275,7 @@ private static ConfigDef newConfigDef() { private final Map writeProps; private final Map tableConfigMap = Maps.newHashMap(); private final JsonConverter jsonConverter; + private volatile List> transactionalCommitRetriableExceptions; public IcebergSinkConfig(Map originalProps) { super(CONFIG_DEF, originalProps); @@ -428,6 +458,59 @@ public String transactionalPrefix() { return ""; } + /** + * Returns the resolved {@link Throwable} classes whose occurrence (anywhere in the cause chain) + * during {@code Channel.send()}'s transactional commit should be translated to {@link + * org.apache.kafka.connect.errors.RetriableException}. Names that fail to resolve to a {@code + * Throwable} subclass are logged at WARN and skipped — never thrown — so a misconfiguration + * cannot stop the connector from starting. + */ + public List> transactionalCommitRetriableExceptionClasses() { + List> resolved = transactionalCommitRetriableExceptions; + if (resolved != null) { + return resolved; + } + synchronized (this) { + if (transactionalCommitRetriableExceptions == null) { + transactionalCommitRetriableExceptions = + resolveExceptionClasses(getList(TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP)); + } + return transactionalCommitRetriableExceptions; + } + } + + private static List> resolveExceptionClasses(List names) { + if (names == null || names.isEmpty()) { + return ImmutableList.of(); + } + ImmutableList.Builder> builder = ImmutableList.builder(); + for (String name : names) { + String trimmed = name == null ? "" : name.trim(); + if (trimmed.isEmpty()) { + continue; + } + Class klass = DynClasses.builder().impl(trimmed).orNull().build(); + if (klass == null) { + LOG.warn( + "Ignoring entry '{}' in {}: class not found on classpath", + trimmed, + TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP); + continue; + } + if (Throwable.class.isAssignableFrom(klass)) { + @SuppressWarnings("unchecked") + Class throwableClass = (Class) klass; + builder.add(throwableClass); + } else { + LOG.warn( + "Ignoring entry '{}' in {}: class is not a Throwable subtype", + trimmed, + TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP); + } + } + return builder.build(); + } + public String hadoopConfDir() { return getString(HADOOP_CONF_DIR_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java index 73d5773e5bb6..89567a011d80 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java @@ -30,15 +30,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.InvalidProducerEpochException; -import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; @@ -56,6 +53,7 @@ abstract class Channel { private final Admin admin; private final Map controlTopicOffsets = Maps.newHashMap(); private final String producerId; + private final List> retriableCommitExceptions; Channel( String name, @@ -73,6 +71,7 @@ abstract class Channel { this.admin = clientFactory.createAdmin(); this.producerId = UUID.randomUUID().toString(); + this.retriableCommitExceptions = config.transactionalCommitRetriableExceptionClasses(); } protected void send(Event event) { @@ -110,15 +109,16 @@ protected void send(List events, Map sourceOffset producer.commitTransaction(); } catch (Exception e) { safeAbortTransaction(transactionStarted); - if (isRecoverableRebalanceError(e)) { - // A consumer-group re-balance happened between this transaction's preparation and the - // broker-side offset commit (CommitFailedException), or the producer epoch was bumped - // (InvalidProducerEpochException). The transaction was aborted, so source offsets did - // not advance — when Connect re-delivers the same batch (after the re-balance settles - // and the affected partitions are reassigned) processing resumes from the last - // committed offsets with no data loss. + if (isRecoverableCommitError(e)) { + // The transactional commit hit a configured-recoverable failure (default: a + // consumer-group re-balance — CommitFailedException — or a producer-epoch bump — + // InvalidProducerEpochException / ProducerFencedException). The transaction was + // aborted, so source offsets did not advance — when Connect re-delivers the same + // batch (after the worker is closed and re-created with a fresh producer epoch via + // initTransactions(), and the affected partitions are reassigned) processing resumes + // from the last committed offsets with no data loss. LOG.warn( - "Transactional offset commit failed due to consumer group re-balance; " + "Transactional offset commit failed with recoverable exception; " + "aborted transaction and signalling Connect to retry", e); throw new RetriableException( @@ -141,20 +141,22 @@ private void safeAbortTransaction(boolean transactionStarted) { } /** - * Returns true when the throwable (or any cause in its chain) indicates a transient - * consumer-group re-balance or producer-epoch bump that can be safely retried after aborting the - * transaction. {@link ProducerFencedException} is explicitly excluded — it requires producer - * recreation and must not be retried. + * Returns true when the throwable (or any cause in its chain) matches one of the configured + * {@code iceberg.kafka.transactional-commit-retriable-exceptions} classes — i.e. a transient + * commit-time failure that should be translated to {@link RetriableException} so Connect pauses + * the consumer, lets the worker close + recreate (which obtains a fresh producer epoch via {@code + * initTransactions()}), and re-delivers the same batch. */ - private static boolean isRecoverableRebalanceError(Throwable throwable) { + private boolean isRecoverableCommitError(Throwable throwable) { + if (retriableCommitExceptions.isEmpty()) { + return false; + } Throwable cause = throwable; while (cause != null) { - if (cause instanceof ProducerFencedException) { - return false; - } - if (cause instanceof CommitFailedException - || cause instanceof InvalidProducerEpochException) { - return true; + for (Class klass : retriableCommitExceptions) { + if (klass.isInstance(cause)) { + return true; + } } cause = cause.getCause(); } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java index f13ae6823853..3450b63fb65c 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; import java.util.Map; import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.connect.data.Offset; @@ -61,11 +62,12 @@ * reproduce. * *

Behavior contract: when {@code commitTransaction} or {@code sendOffsetsToTransaction} fails - * because the consumer group re-balanced ({@link CommitFailedException}) or the producer epoch was - * bumped ({@link InvalidProducerEpochException}), {@code send()} aborts the transaction and throws - * {@link RetriableException} so the Connect framework pauses the consumer and re-delivers the same - * batch — after which the re-balance settles, partitions are reassigned, and processing resumes - * from the last committed offsets. {@link ProducerFencedException} and other failures remain fatal. + * with one of the configured retriable exception classes (default: {@link CommitFailedException}, + * {@link InvalidProducerEpochException}, {@link ProducerFencedException}), {@code send()} aborts + * the transaction and throws {@link RetriableException} so the Connect framework pauses the + * consumer, lets the worker close + recreate (re-running {@code initTransactions()} for a fresh + * producer epoch), and re-delivers the same batch. Exceptions outside the configured list remain + * fatal. */ public class TestChannelRetry { @@ -73,6 +75,12 @@ public class TestChannelRetry { private static final String CONNECT_GROUP = "cg-connect"; private static final String CONSUMER_GROUP = "worker-cg"; + private static final List> DEFAULT_RETRIABLE_EXCEPTIONS = + ImmutableList.of( + CommitFailedException.class, + InvalidProducerEpochException.class, + ProducerFencedException.class); + private Producer producer; private Consumer consumer; private Admin admin; @@ -98,6 +106,8 @@ void before() { when(config.connectGroupId()).thenReturn(CONNECT_GROUP); when(config.transactionalPrefix()).thenReturn(""); when(config.transactionalSuffix()).thenReturn(""); + when(config.transactionalCommitRetriableExceptionClasses()) + .thenReturn(DEFAULT_RETRIABLE_EXCEPTIONS); context = mock(SinkTaskContext.class); @@ -217,27 +227,34 @@ public void wrappedCommitFailedExceptionIsAlsoTranslated() { } // ------------------------------------------------------------ - // Fatal exceptions — surfaced as-is, NOT retriable + // ProducerFencedException is recoverable by default // ------------------------------------------------------------ @Test - public void producerFencedExceptionIsFatalAndNotRetriable() { + public void producerFencedExceptionIsRetriableByDefault() { + // Default config includes ProducerFencedException among retriable exceptions: when the + // worker is closed and re-created on retry, initTransactions() obtains a fresh epoch. ProducerFencedException fenced = new ProducerFencedException("fenced by newer producer"); doThrow(fenced).when(producer).commitTransaction(); StubChannel channel = newChannel(); assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) - .isSameAs(fenced) - .hasMessageContaining("fenced"); + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(fenced); verify(producer, times(1)).abortTransaction(); } @Test - public void producerFencedWrappingARebalanceCauseIsStillFatal() { - ProducerFencedException fenced = new ProducerFencedException("fenced"); - fenced.initCause(new CommitFailedException("rebalanced")); + public void producerFencedIsFatalWhenExcludedFromConfiguredList() { + // User-supplied list omits ProducerFencedException → exception surfaces as-is. + when(config.transactionalCommitRetriableExceptionClasses()) + .thenReturn( + ImmutableList.of(CommitFailedException.class, InvalidProducerEpochException.class)); + + ProducerFencedException fenced = new ProducerFencedException("fenced by newer producer"); doThrow(fenced).when(producer).commitTransaction(); StubChannel channel = newChannel(); @@ -245,6 +262,40 @@ public void producerFencedWrappingARebalanceCauseIsStillFatal() { assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) .isSameAs(fenced) .hasMessageContaining("fenced"); + + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void emptyConfiguredListMakesEverythingFatal() { + when(config.transactionalCommitRetriableExceptionClasses()).thenReturn(ImmutableList.of()); + + CommitFailedException rebalance = new CommitFailedException("rebalanced"); + doThrow(rebalance).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(rebalance) + .hasMessageContaining("rebalanced"); + } + + @Test + public void customSubclassListMatchesViaIsInstance() { + // Configuring KafkaException as retriable should match all of its subclasses + // (e.g. CommitFailedException) via Class.isInstance. + when(config.transactionalCommitRetriableExceptionClasses()) + .thenReturn(ImmutableList.of(KafkaException.class)); + + CommitFailedException rebalance = new CommitFailedException("rebalanced"); + doThrow(rebalance).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(rebalance); } @Test From f7e62c8e87d26f236c46df1eaac89f5bade8e3be Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 16 May 2026 20:22:44 +0530 Subject: [PATCH 7/7] added list of exception on which to retry configurable --- .../main/java/org/apache/iceberg/connect/channel/Channel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java index 89567a011d80..2d87a281d1c8 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java @@ -102,7 +102,7 @@ protected void send(List events, Map sourceOffset // NOTE: we shouldn't call get() on the future in a transactional context, // see docs for org.apache.kafka.clients.producer.KafkaProducer recordList.forEach(producer::send); - if (!offsetsToCommit.isEmpty()) { + if (!sourceOffsets.isEmpty()) { producer.sendOffsetsToTransaction( offsetsToCommit, KafkaUtils.consumerGroupMetadata(context)); }