Skip to content

NIFI-15614 - ConsumeKafka - Duplicate messages during consumer group rebalance#10908

Open
pvillard31 wants to merge 3 commits intoapache:mainfrom
pvillard31:NIFI-15614
Open

NIFI-15614 - ConsumeKafka - Duplicate messages during consumer group rebalance#10908
pvillard31 wants to merge 3 commits intoapache:mainfrom
pvillard31:NIFI-15614

Conversation

@pvillard31
Copy link
Contributor

Summary

NIFI-15614 - ConsumeKafka - Duplicate messages during consumer group rebalance

When using ConsumeKafka with Kafka3ConnectionService, duplicate messages may be processed when a consumer group rebalance occurs.

NIFI-15464 addressed a related issue by deferring offset commits during rebalance. The fix stored revoked partitions in onPartitionsRevoked() and had the processor call commitOffsetsForRevokedPartitions() after its session commit.

The deferred commit approach is not enough because by the time poll() returns and the processor attempts to commit, the consumer is no longer part of an active group. Kafka rejects the commit with RebalanceInProgressException, offsets are rolled back, and messages are re-consumed as duplicates.

The Kafka consumer is only in a valid state to commit offsets during the onPartitionsRevoked() callback. Once this callback returns, the consumer's group membership is revoked and any commit attempt will fail.

We need to implement synchronous offset commit inside onPartitionsRevoked() callback, similar to how NiFi 1.x handled rebalances in ConsumerLease. This requires introducing a callback mechanism to ensure the NiFi session is committed before Kafka offsets are committed, preventing both data loss and duplicates.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@pvillard31 pvillard31 added the bug label Feb 17, 2026
@pvillard31 pvillard31 marked this pull request as draft February 18, 2026 15:32
@pvillard31 pvillard31 marked this pull request as draft February 18, 2026 15:32
…rebalance

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
@pvillard31 pvillard31 marked this pull request as ready for review February 19, 2026 09:48
Comment on lines 441 to 445
System.out.println("Consumer 1 polled: " + consumer1Count.get() + " records");
System.out.println("Consumer 2 polled: " + consumer2Count.get() + " records");
System.out.println("Total unique messages: " + allConsumedMessages.size());
System.out.println("Duplicate count: " + duplicateCount.get());
System.out.println("Rebalance count: " + rebalanceCount.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be using loggers here, not System.out.println

revokedPartitions.size());

try {
session.commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid session.commit and instead ensure that we're using session.commitAsync with callbacks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I agree with that comment. The synchronous commit is intentional and necessary. The whole point of this fix is that:

  • The Kafka consumer is only in a valid state to commit offsets during onPartitionsRevoked()
  • Once this callback returns, the consumer's group membership is revoked
  • Any commit attempt after that fails with RebalanceInProgressException

Using commitAsync() would defeat the purpose because the async callback would execute after onPartitionsRevoked() returns, no? At which point the Kafka consumer is no longer valid and we'd get the same duplicate message issue. IMO, the sequence must be:

  1. onPartitionsRevoked() is called by Kafka
  2. NiFi session commit (must complete before step 3)
  3. Kafka offset commit (must happen while still in valid callback)
  4. onPartitionsRevoked() returns

Or did I miss something with your comment?

Comment on lines 352 to 353
private final ThreadLocal<ProcessSession> currentSession = new ThreadLocal<>();
private final ThreadLocal<OffsetTracker> currentOffsetTracker = new ThreadLocal<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally want to avoid ThreadLocal in processors because of the shared thread pool.

}

private RebalanceCallback createRebalanceCallback() {
return revokedPartitions -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid any lambdas over 3-5 lines long in favor of anonymous inner classes.

Comment on lines 621 to 622
final ProcessSession session = currentSession.get();
final OffsetTracker offsetTracker = currentOffsetTracker.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that I understand the intention behind the ThreadLocal here. This should just be passed into the createRebalanceCallback method (piped through the getConsumerService method)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can just pass the session directly because the consumer service is created once and pooled, and the session is different for each onTrigger() call. I'll change for a holder approach that goes with your previous comment for ThreadLocal.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants