NIFI-15644 Added ConsumeKafka consumer lag gauge record#10934
Open
dariuszseweryn wants to merge 1 commit intoapache:mainfrom
Open
NIFI-15644 Added ConsumeKafka consumer lag gauge record#10934dariuszseweryn wants to merge 1 commit intoapache:mainfrom
dariuszseweryn wants to merge 1 commit intoapache:mainfrom
Conversation
exceptionfactory
requested changes
Feb 25, 2026
Contributor
There was a problem hiding this comment.
@dariuszseweryn This pull request appears to overlap with the current pull request at add consumer lag tracking in #10880.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
NIFI-15644
As of NIFI-15545, the KafkaConsumerService exposes a currentLag(TopicPartitionSummary) method that returns the current consumer lag (in records) for a given topic-partition. However, this information is not currently utilized by the ConsumeKafka processor.
This ticket adds gauge metric recording to ConsumeKafka so that per-partition consumer lag is reported through the NiFi metrics framework via ProcessSession.recordGauge(). This enables operators to monitor consumer lag through any configured ComponentMetricReporter (e.g., OpenTelemetry).
Changes:
Gauge naming format:
consumer.lag[topic="",partition=""]
For example: consumer.lag[topic="orders",partition="3"]
Behavior:
One gauge is recorded per topic-partition that was consumed during the onTrigger invocation.
Gauges are only emitted when currentLag returns a value (it may be unavailable in some Kafka client states).
Gauges use SESSION_COMMITTED timing, meaning they are only reported when the session commits successfully.
No gauges are recorded when no records are consumed (the processor returns early before reaching the gauge recording path).
Testing:
Unit tests added for OffsetTracker.getTrackedPartitions() covering empty, single, multi-partition, and deduplication scenarios.
Integration-style tests added to ConsumeKafkaTest using mocked KafkaConsumerService verifying gauge recording for single partition, multiple partitions, unavailable lag, and no-records-consumed cases.
Additional fix:
MockProcessSession did not clear gauges recorded with SESSION_COMMITTED on Session commit.
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation