diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..2de4b0b95499 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -105,6 +106,22 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String COORDINATOR_EXECUTOR_KEEP_ALIVE_TIMEOUT_MS = "iceberg.coordinator-executor-keep-alive-timeout-ms"; + private static final String TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP = + "iceberg.kafka.transactional-commit-retriable-exceptions"; + + /** + * Default set of producer-side exceptions that are translated to {@link + * org.apache.kafka.connect.errors.RetriableException} when raised from {@code Channel.send()}'s + * transactional commit. {@code ProducerFencedException} is included because the connector closes + * and re-creates the worker (and its producer) on a retriable commit failure, which re-runs + * {@code initTransactions()} and obtains a fresh producer epoch. + */ + private static final List DEFAULT_TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS = + ImmutableList.of( + "org.apache.kafka.clients.consumer.CommitFailedException", + "org.apache.kafka.common.errors.InvalidProducerEpochException", + "org.apache.kafka.common.errors.ProducerFencedException"); + @VisibleForTesting static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))"; public static final ConfigDef CONFIG_DEF = newConfigDef(); @@ -235,6 +252,18 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); + configDef.define( + TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP, + ConfigDef.Type.LIST, + DEFAULT_TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS, + Importance.LOW, + "Comma-separated list of fully qualified Throwable class names raised by the producer's " + + "transactional commit (commitTransaction / sendOffsetsToTransaction) that should be " + + "translated to RetriableException so Connect re-delivers the batch after the " + + "consumer-group re-balance settles. Defaults cover CommitFailedException, " + + "InvalidProducerEpochException, and ProducerFencedException — the latter is " + + "recoverable here because the worker is closed and re-created on retry, " + + "re-running initTransactions() to obtain a fresh producer epoch."); return configDef; } @@ -246,6 +275,7 @@ private static ConfigDef newConfigDef() { private final Map writeProps; private final Map tableConfigMap = Maps.newHashMap(); private final JsonConverter jsonConverter; + private volatile List> transactionalCommitRetriableExceptions; public IcebergSinkConfig(Map originalProps) { super(CONFIG_DEF, originalProps); @@ -428,6 +458,59 @@ public String transactionalPrefix() { return ""; } + /** + * Returns the resolved {@link Throwable} classes whose occurrence (anywhere in the cause chain) + * during {@code Channel.send()}'s transactional commit should be translated to {@link + * org.apache.kafka.connect.errors.RetriableException}. Names that fail to resolve to a {@code + * Throwable} subclass are logged at WARN and skipped — never thrown — so a misconfiguration + * cannot stop the connector from starting. + */ + public List> transactionalCommitRetriableExceptionClasses() { + List> resolved = transactionalCommitRetriableExceptions; + if (resolved != null) { + return resolved; + } + synchronized (this) { + if (transactionalCommitRetriableExceptions == null) { + transactionalCommitRetriableExceptions = + resolveExceptionClasses(getList(TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP)); + } + return transactionalCommitRetriableExceptions; + } + } + + private static List> resolveExceptionClasses(List names) { + if (names == null || names.isEmpty()) { + return ImmutableList.of(); + } + ImmutableList.Builder> builder = ImmutableList.builder(); + for (String name : names) { + String trimmed = name == null ? "" : name.trim(); + if (trimmed.isEmpty()) { + continue; + } + Class klass = DynClasses.builder().impl(trimmed).orNull().build(); + if (klass == null) { + LOG.warn( + "Ignoring entry '{}' in {}: class not found on classpath", + trimmed, + TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP); + continue; + } + if (Throwable.class.isAssignableFrom(klass)) { + @SuppressWarnings("unchecked") + Class throwableClass = (Class) klass; + builder.add(throwableClass); + } else { + LOG.warn( + "Ignoring entry '{}' in {}: class is not a Throwable subtype", + trimmed, + TRANSACTIONAL_COMMIT_RETRIABLE_EXCEPTIONS_PROP); + } + } + return builder.build(); + } + public String hadoopConfDir() { return getString(HADOOP_CONF_DIR_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java index 01cf165de66b..2d87a281d1c8 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.connect.data.Offset; import org.apache.iceberg.connect.events.AvroUtil; @@ -37,6 +36,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +53,7 @@ abstract class Channel { private final Admin admin; private final Map controlTopicOffsets = Maps.newHashMap(); private final String producerId; + private final List> retriableCommitExceptions; Channel( String name, @@ -70,6 +71,7 @@ abstract class Channel { this.admin = clientFactory.createAdmin(); this.producerId = UUID.randomUUID().toString(); + this.retriableCommitExceptions = config.transactionalCommitRetriableExceptionClasses(); } protected void send(Event event) { @@ -90,11 +92,13 @@ protected void send(List events, Map sourceOffset // key by producer ID to keep event order return new ProducerRecord<>(controlTopic, producerId, data); }) - .collect(Collectors.toList()); + .toList(); synchronized (producer) { - producer.beginTransaction(); + boolean transactionStarted = false; try { + producer.beginTransaction(); + transactionStarted = true; // NOTE: we shouldn't call get() on the future in a transactional context, // see docs for org.apache.kafka.clients.producer.KafkaProducer recordList.forEach(producer::send); @@ -104,16 +108,61 @@ protected void send(List events, Map sourceOffset } producer.commitTransaction(); } catch (Exception e) { - try { - producer.abortTransaction(); - } catch (Exception ex) { - LOG.warn("Error aborting producer transaction", ex); + safeAbortTransaction(transactionStarted); + if (isRecoverableCommitError(e)) { + // The transactional commit hit a configured-recoverable failure (default: a + // consumer-group re-balance — CommitFailedException — or a producer-epoch bump — + // InvalidProducerEpochException / ProducerFencedException). The transaction was + // aborted, so source offsets did not advance — when Connect re-delivers the same + // batch (after the worker is closed and re-created with a fresh producer epoch via + // initTransactions(), and the affected partitions are reassigned) processing resumes + // from the last committed offsets with no data loss. + LOG.warn( + "Transactional offset commit failed with recoverable exception; " + + "aborted transaction and signalling Connect to retry", + e); + throw new RetriableException( + "Transactional offset commit failed due to consumer group re-balance", e); } throw e; } } } + private void safeAbortTransaction(boolean transactionStarted) { + if (!transactionStarted) { + return; + } + try { + producer.abortTransaction(); + } catch (Exception ex) { + LOG.warn("Error aborting producer transaction", ex); + } + } + + /** + * Returns true when the throwable (or any cause in its chain) matches one of the configured + * {@code iceberg.kafka.transactional-commit-retriable-exceptions} classes — i.e. a transient + * commit-time failure that should be translated to {@link RetriableException} so Connect pauses + * the consumer, lets the worker close + recreate (which obtains a fresh producer epoch via {@code + * initTransactions()}), and re-delivers the same batch. + */ + private boolean isRecoverableCommitError(Throwable throwable) { + if (retriableCommitExceptions.isEmpty()) { + return false; + } + Throwable cause = throwable; + while (cause != null) { + for (Class klass : retriableCommitExceptions) { + if (klass.isInstance(cause)) { + return true; + } + } + cause = cause.getCause(); + } + return false; + } + protected abstract boolean receive(Envelope envelope); protected void consumeAvailable(Duration pollDuration) { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 7b2d4a25363d..4d574cb81fe1 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; @@ -181,7 +182,11 @@ public void close(Collection closedPartitions) { stopCoordinator(); } - // Reset offsets to last committed to avoid data loss. + // Reset offsets to last committed to avoid data loss. This is required for incremental + // cooperative rebalance (ICR), where Connect can invoke close() on a partition without a + // following open() — meaning rewind() in onPartitionsAssigned never runs for it. Seeking the + // main consumer here guarantees that any records we read past the broker-committed offset + // (and never managed to commit transactionally) are re-fetched on the next poll. LOG.info("Seeking to last committed offsets for worker {}.", taskId); KafkaUtils.seekToLastCommittedOffsets(context); } @@ -201,7 +206,16 @@ private void processControlEvents() { String.format("Coordinator unexpectedly terminated on committer %s", taskId)); } if (worker != null) { - worker.process(); + try { + worker.process(); + } catch (RetriableException retriableException) { + LOG.info( + "Committer {} got retriable exception while processing control events. This can happen during re-balance.", + taskId, + retriableException.getCause()); + stopWorker(); + throw retriableException; + } } } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index 48a01881935b..a6c9782a4241 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -49,6 +49,8 @@ public SinkWriter(Catalog catalog, IcebergSinkConfig config) { public void close() { writers.values().forEach(RecordWriter::close); + writers.clear(); + sourceOffsets.clear(); } public SinkWriterResult completeWrite() { diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java new file mode 100644 index 000000000000..3450b63fb65c --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestChannelRetry.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.data.Offset; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +/** + * Unit tests for the re-balance-aware error translation in {@link Channel#send(java.util.List, + * java.util.Map)}. These tests use mocked {@link Producer} instances to exercise re-balance-induced + * commit failures that {@link org.apache.kafka.clients.producer.MockProducer} cannot easily + * reproduce. + * + *

Behavior contract: when {@code commitTransaction} or {@code sendOffsetsToTransaction} fails + * with one of the configured retriable exception classes (default: {@link CommitFailedException}, + * {@link InvalidProducerEpochException}, {@link ProducerFencedException}), {@code send()} aborts + * the transaction and throws {@link RetriableException} so the Connect framework pauses the + * consumer, lets the worker close + recreate (re-running {@code initTransactions()} for a fresh + * producer epoch), and re-delivers the same batch. Exceptions outside the configured list remain + * fatal. + */ +public class TestChannelRetry { + + private static final String CONTROL_TOPIC = "ctl-topic"; + private static final String CONNECT_GROUP = "cg-connect"; + private static final String CONSUMER_GROUP = "worker-cg"; + + private static final List> DEFAULT_RETRIABLE_EXCEPTIONS = + ImmutableList.of( + CommitFailedException.class, + InvalidProducerEpochException.class, + ProducerFencedException.class); + + private Producer producer; + private Consumer consumer; + private Admin admin; + private KafkaClientFactory clientFactory; + private IcebergSinkConfig config; + private SinkTaskContext context; + private MockedStatic mockedKafkaUtils; + + @BeforeEach + @SuppressWarnings("unchecked") + void before() { + producer = mock(Producer.class); + consumer = mock(Consumer.class); + admin = mock(Admin.class); + + clientFactory = mock(KafkaClientFactory.class); + when(clientFactory.createProducer(any())).thenReturn(producer); + when(clientFactory.createConsumer(any())).thenReturn(consumer); + when(clientFactory.createAdmin()).thenReturn(admin); + + config = mock(IcebergSinkConfig.class); + when(config.controlTopic()).thenReturn(CONTROL_TOPIC); + when(config.connectGroupId()).thenReturn(CONNECT_GROUP); + when(config.transactionalPrefix()).thenReturn(""); + when(config.transactionalSuffix()).thenReturn(""); + when(config.transactionalCommitRetriableExceptionClasses()) + .thenReturn(DEFAULT_RETRIABLE_EXCEPTIONS); + + context = mock(SinkTaskContext.class); + + ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class); + mockedKafkaUtils = mockStatic(KafkaUtils.class); + mockedKafkaUtils + .when(() -> KafkaUtils.consumerGroupMetadata(any(SinkTaskContext.class))) + .thenReturn(groupMetadata); + } + + @AfterEach + void after() { + if (mockedKafkaUtils != null) { + mockedKafkaUtils.close(); + } + } + + // ------------------------------------------------------------ + // Happy path + // ------------------------------------------------------------ + + @Test + public void sendSucceedsOnFirstAttempt() { + StubChannel channel = newChannel(); + + channel.sendForTest(startCommitEvent(), offsetsForSrc(42L)); + + verify(producer, times(1)).beginTransaction(); + verify(producer, times(1)).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class)); + verify(producer, times(1)).commitTransaction(); + verify(producer, never()).abortTransaction(); + } + + @Test + public void sendWithEmptySourceOffsetsSkipsSendOffsetsToTransaction() { + StubChannel channel = newChannel(); + + channel.sendForTest(startCommitEvent(), ImmutableMap.of()); + + verify(producer, times(1)).beginTransaction(); + verify(producer, never()).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class)); + verify(producer, times(1)).commitTransaction(); + } + + // ------------------------------------------------------------ + // Recoverable re-balance failures — translated to RetriableException + // ------------------------------------------------------------ + + @Test + public void commitFailedExceptionIsTranslatedToRetriableException() { + CommitFailedException rebalance = new CommitFailedException("generation id mismatch"); + doThrow(rebalance).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(7L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(rebalance); + + verify(producer, times(1)).beginTransaction(); + verify(producer, times(1)).commitTransaction(); + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void invalidProducerEpochExceptionIsTranslatedToRetriableException() { + InvalidProducerEpochException epochBump = new InvalidProducerEpochException("stale epoch"); + doThrow(epochBump).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(11L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(epochBump); + + verify(producer, times(1)).beginTransaction(); + verify(producer, times(1)).commitTransaction(); + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void sendOffsetsToTransactionFailureIsAlsoTranslated() { + CommitFailedException rebalance = new CommitFailedException("metadata mismatch"); + doThrow(rebalance) + .when(producer) + .sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class)); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(5L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(rebalance); + + verify(producer, times(1)).beginTransaction(); + verify(producer, times(1)).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class)); + verify(producer, never()).commitTransaction(); + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void wrappedCommitFailedExceptionIsAlsoTranslated() { + CommitFailedException inner = new CommitFailedException("rebalanced"); + KafkaException wrapper = new KafkaException("wrapped", inner); + doThrow(wrapper).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(3L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(wrapper); + + verify(producer, times(1)).abortTransaction(); + } + + // ------------------------------------------------------------ + // ProducerFencedException is recoverable by default + // ------------------------------------------------------------ + + @Test + public void producerFencedExceptionIsRetriableByDefault() { + // Default config includes ProducerFencedException among retriable exceptions: when the + // worker is closed and re-created on retry, initTransactions() obtains a fresh epoch. + ProducerFencedException fenced = new ProducerFencedException("fenced by newer producer"); + doThrow(fenced).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(fenced); + + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void producerFencedIsFatalWhenExcludedFromConfiguredList() { + // User-supplied list omits ProducerFencedException → exception surfaces as-is. + when(config.transactionalCommitRetriableExceptionClasses()) + .thenReturn( + ImmutableList.of(CommitFailedException.class, InvalidProducerEpochException.class)); + + ProducerFencedException fenced = new ProducerFencedException("fenced by newer producer"); + doThrow(fenced).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(fenced) + .hasMessageContaining("fenced"); + + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void emptyConfiguredListMakesEverythingFatal() { + when(config.transactionalCommitRetriableExceptionClasses()).thenReturn(ImmutableList.of()); + + CommitFailedException rebalance = new CommitFailedException("rebalanced"); + doThrow(rebalance).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(rebalance) + .hasMessageContaining("rebalanced"); + } + + @Test + public void customSubclassListMatchesViaIsInstance() { + // Configuring KafkaException as retriable should match all of its subclasses + // (e.g. CommitFailedException) via Class.isInstance. + when(config.transactionalCommitRetriableExceptionClasses()) + .thenReturn(ImmutableList.of(KafkaException.class)); + + CommitFailedException rebalance = new CommitFailedException("rebalanced"); + doThrow(rebalance).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCause(rebalance); + } + + @Test + public void nonRebalanceKafkaExceptionIsRethrownAsIs() { + KafkaException broken = new KafkaException("broker connection broken"); + doThrow(broken).when(producer).commitTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(broken) + .hasMessageContaining("broker connection broken"); + + verify(producer, times(1)).abortTransaction(); + } + + @Test + public void beginTransactionFailureIsRethrownAndDoesNotAbort() { + IllegalStateException beginFailure = new IllegalStateException("producer not initialized"); + doThrow(beginFailure).when(producer).beginTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isSameAs(beginFailure) + .hasMessageContaining("not initialized"); + + verify(producer, times(1)).beginTransaction(); + verify(producer, never()).commitTransaction(); + verify(producer, never()).abortTransaction(); + } + + // ------------------------------------------------------------ + // Abort robustness + // ------------------------------------------------------------ + + @Test + public void abortFailureIsSwallowedAndDoesNotMaskRetriableException() { + doThrow(new CommitFailedException("rebalanced")).when(producer).commitTransaction(); + doThrow(new IllegalStateException("abort failed")).when(producer).abortTransaction(); + + StubChannel channel = newChannel(); + + assertThatThrownBy(() -> channel.sendForTest(startCommitEvent(), offsetsForSrc(1L))) + .isInstanceOf(RetriableException.class) + .hasMessageContaining("consumer group re-balance") + .hasCauseInstanceOf(CommitFailedException.class); + + verify(producer, times(1)).abortTransaction(); + } + + // ------------------------------------------------------------ + // Helpers + // ------------------------------------------------------------ + + private StubChannel newChannel() { + return new StubChannel("worker", CONSUMER_GROUP, config, clientFactory, context); + } + + private Event startCommitEvent() { + return new Event(CONNECT_GROUP, new StartCommit(java.util.UUID.randomUUID())); + } + + private Map offsetsForSrc(long offset) { + return ImmutableMap.of(new TopicPartition("src-topic", 0), new Offset(offset, null)); + } + + /** Concrete subclass exposing the protected {@code send()} method for tests. */ + private static class StubChannel extends Channel { + + StubChannel( + String name, + String consumerGroupId, + IcebergSinkConfig config, + KafkaClientFactory clientFactory, + SinkTaskContext context) { + super(name, consumerGroupId, config, clientFactory, context); + } + + @Override + protected boolean receive(Envelope envelope) { + return false; + } + + void sendForTest(Event event, Map sourceOffsets) { + send(ImmutableList.of(event), sourceOffsets); + } + } + + @Test + public void startCommitPayloadTypeIsStable() { + assertThat(startCommitEvent().payload().type()).isEqualTo(PayloadType.START_COMMIT); + } +}