Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c0a2665
added metadat and data path in case of dynamic routing
Aug 9, 2025
67619ec
spotless
Aug 9, 2025
6b15ae4
Revert "spotless"
Aug 9, 2025
8398e4c
Revert "added metadat and data path in case of dynamic routing"
Aug 9, 2025
fbf52a9
Merge branch 'apache:main' into main
kumarpritam863 Sep 12, 2025
c92ec66
Merge branch 'apache:main' into main
kumarpritam863 Sep 16, 2025
9392a6d
Merge branch 'apache:main' into main
kumarpritam863 Oct 29, 2025
ecd8b55
Merge branch 'apache:main' into main
kumarpritam863 Nov 12, 2025
5e76e04
Merge branch 'apache:main' into main
kumarpritam863 Nov 20, 2025
a1ec7e6
Merge branch 'apache:main' into main
kumarpritam863 Nov 24, 2025
4eaf70b
Merge branch 'apache:main' into main
kumarpritam863 Nov 27, 2025
1508513
Merge branch 'apache:main' into main
kumarpritam863 Dec 9, 2025
e5908c8
Merge branch 'apache:main' into main
kumarpritam863 Jan 12, 2026
cbefe9a
Merge branch 'apache:main' into main
kumarpritam863 Feb 1, 2026
57d4667
Merge branch 'apache:main' into main
kumarpritam863 Feb 5, 2026
ee658ea
Merge branch 'apache:main' into main
kumarpritam863 Feb 18, 2026
7c5976d
Merge branch 'apache:main' into main
kumarpritam863 Feb 21, 2026
8a9654f
Merge branch 'apache:main' into main
kumarpritam863 Mar 14, 2026
888e659
Merge branch 'apache:main' into main
kumarpritam863 Apr 3, 2026
1daa4dd
Merge branch 'apache:main' into main
kumarpritam863 Apr 12, 2026
8b7ec63
Merge branch 'apache:main' into main
kumarpritam863 Apr 13, 2026
92c7e89
Merge branch 'apache:main' into main
kumarpritam863 Apr 14, 2026
fe9e7f0
Merge branch 'apache:main' into main
kumarpritam863 Apr 18, 2026
aaa8f89
Merge branch 'apache:main' into main
kumarpritam863 Apr 29, 2026
9d64ec9
Merge branch 'apache:main' into main
kumarpritam863 May 16, 2026
292909c
added provision to tolearte transient commit failed exception during …
May 16, 2026
fec8aaa
added list of exception on which to retry configurable
May 16, 2026
f7e62c8
added list of exception on which to retry configurable
May 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
Expand Down Expand Up @@ -105,6 +106,22 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String COORDINATOR_EXECUTOR_KEEP_ALIVE_TIMEOUT_MS =
"iceberg.coordinator-executor-keep-alive-timeout-ms";

private static final String TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP =
"iceberg.kafka.transactional-commit-retriable-exceptions";

/**
* Default set of producer-side exceptions that are translated to {@link
* org.apache.kafka.connect.errors.RetriableException} when raised from {@code Channel.send()}'s
* transactional commit. {@code ProducerFencedException} is included because the connector closes
* and re-creates the worker (and its producer) on a retriable commit failure, which re-runs
* {@code initTransactions()} and obtains a fresh producer epoch.
*/
private static final List<String> DEFAULT_TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS =
ImmutableList.of(
"org.apache.kafka.clients.consumer.CommitFailedException",
"org.apache.kafka.common.errors.InvalidProducerEpochException",
"org.apache.kafka.common.errors.ProducerFencedException");

@VisibleForTesting static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";

public static final ConfigDef CONFIG_DEF = newConfigDef();
Expand Down Expand Up @@ -235,6 +252,18 @@ private static ConfigDef newConfigDef() {
120000L,
Importance.LOW,
"config to control coordinator executor keep alive time");
configDef.define(
TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP,
ConfigDef.Type.LIST,
DEFAULT_TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS,
Importance.LOW,
"Comma-separated list of fully qualified Throwable class names raised by the producer's "
+ "transactional commit (commitTransaction / sendOffsetsToTransaction) that should be "
+ "translated to RetriableException so Connect re-delivers the batch after the "
+ "consumer-group re-balance settles. Defaults cover CommitFailedException, "
+ "InvalidProducerEpochException, and ProducerFencedException — the latter is "
+ "recoverable here because the worker is closed and re-created on retry, "
+ "re-running initTransactions() to obtain a fresh producer epoch.");
return configDef;
}

Expand All @@ -246,6 +275,7 @@ private static ConfigDef newConfigDef() {
private final Map<String, String> writeProps;
private final Map<String, TableSinkConfig> tableConfigMap = Maps.newHashMap();
private final JsonConverter jsonConverter;
private volatile List<Class<? extends Throwable>> transactionalCommitRetriableExceptions;

public IcebergSinkConfig(Map<String, String> originalProps) {
super(CONFIG_DEF, originalProps);
Expand Down Expand Up @@ -428,6 +458,59 @@ public String transactionalPrefix() {
return "";
}

/**
* Returns the resolved {@link Throwable} classes whose occurrence (anywhere in the cause chain)
* during {@code Channel.send()}'s transactional commit should be translated to {@link
* org.apache.kafka.connect.errors.RetriableException}. Names that fail to resolve to a {@code
* Throwable} subclass are logged at WARN and skipped — never thrown — so a misconfiguration
* cannot stop the connector from starting.
*/
public List<Class<? extends Throwable>> transactionalCommitRetriableExceptionClasses() {
List<Class<? extends Throwable>> resolved = transactionalCommitRetriableExceptions;
if (resolved != null) {
return resolved;
}
synchronized (this) {
if (transactionalCommitRetriableExceptions == null) {
transactionalCommitRetriableExceptions =
resolveExceptionClasses(getList(TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP));
}
return transactionalCommitRetriableExceptions;
}
}

private static List<Class<? extends Throwable>> resolveExceptionClasses(List<String> names) {
if (names == null || names.isEmpty()) {
return ImmutableList.of();
}
ImmutableList.Builder<Class<? extends Throwable>> builder = ImmutableList.builder();
for (String name : names) {
String trimmed = name == null ? "" : name.trim();
if (trimmed.isEmpty()) {
continue;
}
Class<?> klass = DynClasses.builder().impl(trimmed).orNull().build();
if (klass == null) {
LOG.warn(
"Ignoring entry '{}' in {}: class not found on classpath",
trimmed,
TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP);
continue;
}
if (Throwable.class.isAssignableFrom(klass)) {
@SuppressWarnings("unchecked")
Class<? extends Throwable> throwableClass = (Class<? extends Throwable>) klass;
builder.add(throwableClass);
} else {
LOG.warn(
"Ignoring entry '{}' in {}: class is not a Throwable subtype",
trimmed,
TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP);
}
}
return builder.build();
}

public String hadoopConfDir() {
return getString(HADOOP_CONF_DIR_PROP);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.data.Offset;
import org.apache.iceberg.connect.events.AvroUtil;
Expand All @@ -37,6 +36,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,6 +53,7 @@ abstract class Channel {
private final Admin admin;
private final Map<Integer, Long> controlTopicOffsets = Maps.newHashMap();
private final String producerId;
private final List<Class<? extends Throwable>> retriableCommitExceptions;

Channel(
String name,
Expand All @@ -70,6 +71,7 @@ abstract class Channel {
this.admin = clientFactory.createAdmin();

this.producerId = UUID.randomUUID().toString();
this.retriableCommitExceptions = config.transactionalCommitRetriableExceptionClasses();
}

protected void send(Event event) {
Expand All @@ -90,11 +92,13 @@ protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffset
// key by producer ID to keep event order
return new ProducerRecord<>(controlTopic, producerId, data);
})
.collect(Collectors.toList());
.toList();

synchronized (producer) {
producer.beginTransaction();
boolean transactionStarted = false;
try {
producer.beginTransaction();
transactionStarted = true;
// NOTE: we shouldn't call get() on the future in a transactional context,
// see docs for org.apache.kafka.clients.producer.KafkaProducer
recordList.forEach(producer::send);
Expand All @@ -104,16 +108,61 @@ protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffset
}
producer.commitTransaction();
} catch (Exception e) {
try {
producer.abortTransaction();
} catch (Exception ex) {
LOG.warn("Error aborting producer transaction", ex);
safeAbortTransaction(transactionStarted);
if (isRecoverableCommitError(e)) {
// The transactional commit hit a configured-recoverable failure (default: a
// consumer-group re-balance — CommitFailedException — or a producer-epoch bump —
// InvalidProducerEpochException / ProducerFencedException). The transaction was
// aborted, so source offsets did not advance — when Connect re-delivers the same
// batch (after the worker is closed and re-created with a fresh producer epoch via
// initTransactions(), and the affected partitions are reassigned) processing resumes
// from the last committed offsets with no data loss.
LOG.warn(
"Transactional offset commit failed with recoverable exception; "
+ "aborted transaction and signalling Connect to retry",
e);
throw new RetriableException(
"Transactional offset commit failed due to consumer group re-balance", e);
}
throw e;
}
}
}

private void safeAbortTransaction(boolean transactionStarted) {
if (!transactionStarted) {
return;
}
try {
producer.abortTransaction();
} catch (Exception ex) {
LOG.warn("Error aborting producer transaction", ex);
}
}

/**
* Returns true when the throwable (or any cause in its chain) matches one of the configured
* {@code iceberg.kafka.transactional-commit-retriable-exceptions} classes — i.e. a transient
* commit-time failure that should be translated to {@link RetriableException} so Connect pauses
* the consumer, lets the worker close + recreate (which obtains a fresh producer epoch via {@code
* initTransactions()}), and re-delivers the same batch.
*/
private boolean isRecoverableCommitError(Throwable throwable) {
if (retriableCommitExceptions.isEmpty()) {
return false;
}
Throwable cause = throwable;
while (cause != null) {
for (Class<? extends Throwable> klass : retriableCommitExceptions) {
if (klass.isInstance(cause)) {
return true;
}
}
cause = cause.getCause();
}
return false;
}

protected abstract boolean receive(Envelope envelope);

protected void consumeAvailable(Duration pollDuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -181,7 +182,11 @@ public void close(Collection<TopicPartition> closedPartitions) {
stopCoordinator();
}

// Reset offsets to last committed to avoid data loss.
// Reset offsets to last committed to avoid data loss. This is required for incremental
// cooperative rebalance (ICR), where Connect can invoke close() on a partition without a
// following open() — meaning rewind() in onPartitionsAssigned never runs for it. Seeking the
// main consumer here guarantees that any records we read past the broker-committed offset
// (and never managed to commit transactionally) are re-fetched on the next poll.
LOG.info("Seeking to last committed offsets for worker {}.", taskId);
KafkaUtils.seekToLastCommittedOffsets(context);
}
Expand All @@ -201,7 +206,16 @@ private void processControlEvents() {
String.format("Coordinator unexpectedly terminated on committer %s", taskId));
}
if (worker != null) {
worker.process();
try {
worker.process();
} catch (RetriableException retriableException) {
LOG.info(
"Committer {} got retriable exception while processing control events. This can happen during re-balance.",
taskId,
retriableException.getCause());
stopWorker();
throw retriableException;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public SinkWriter(Catalog catalog, IcebergSinkConfig config) {

public void close() {
writers.values().forEach(RecordWriter::close);
writers.clear();
sourceOffsets.clear();
}

public SinkWriterResult completeWrite() {
Expand Down
Loading