Skip to content

Commit 8241d96

Browse files
authored
Don't return negative offset lags except for -1 when -1001 is the high watermark (#406)
1 parent 3723cd4 commit 8241d96

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ v1.7.0 is a feature release. It is supported for all usage.
1212
client types (#399).
1313
4. Fix for at-least-once guarantee not ensured in case a seek happens on one
1414
partition and there are messages being fetched about other partitions (#393).
15-
15+
5. Avoid returning a negative lag in case there is no cached offset for
16+
the HWM (#406).
1617

1718
# confluent-kafka-javascript 1.6.0
1819

lib/kafkajs/_consumer.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -919,12 +919,18 @@ class Consumer {
919919
this.#logger.warn(`Could not get watermark offsets for batch: ${e}`, this.#createConsumerBindingMessageMetadata());
920920
}
921921

922-
if (Number.isInteger(watermarkOffsets.highOffset)) {
922+
/* Keep default values if it's not a real offset (OFFSET_INVALID: -1001). */
923+
if (Number.isInteger(watermarkOffsets.highOffset) &&
924+
watermarkOffsets.highOffset >= 0) {
923925
highWatermark = watermarkOffsets.highOffset.toString();
924926
/* While calculating lag, we subtract 1 from the high offset
925927
* for compatibility reasons with KafkaJS's API */
926928
offsetLag_ = (watermarkOffsets.highOffset - 1) - messages[messages.length - 1].offset;
927929
offsetLagLow_ = (watermarkOffsets.highOffset - 1) - messages[0].offset;
930+
931+
/* In any case don't return a negative lag but a zero lag instead. */
932+
offsetLag_ = offsetLag_ > 0 ? offsetLag_ : 0;
933+
offsetLagLow_ = offsetLagLow_ > 0 ? offsetLagLow_ : 0;
928934
}
929935

930936
const messagesConverted = [];

0 commit comments

Comments
 (0)