@sushantmane
Contributor

Switch back to PubSubPosition based reads with offset as fallback

Code changes

  • Added new code behind a config. If so list the config names and their default values in the PR description.
  • Introduced new log lines.
    • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

Copilot AI review requested due to automatic review settings October 28, 2025 20:08

Copilot AI left a comment

Pull Request Overview

This PR removes the getPubSubPositionString method and its associated test, replacing its usage with direct calls to pubSubPositionDeserializer.toPosition(). The key changes enable better position comparison by using actual PubSubPosition objects rather than numeric offsets, and utilize the deserializePositionWithOffsetFallback method for safer deserialization with fallback logic.

  • Removes PubSubUtil.getPubSubPositionString utility method and its test
  • Replaces numeric offset comparisons with proper position comparisons using diffPosition and object equality
  • Updates deserializePositionWithOffsetFallback to remove the offset > 0 guard condition
  • Integrates proper position deserialization with fallback in extractUpstreamPosition

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.

Show a summary per file
| File | Description |
| --- | --- |
| PubSubUtil.java | Removes getPubSubPositionString utility method |
| PubSubUtilTest.java | Removes test coverage for getPubSubPositionString |
| OffsetRecord.java | Removes unused import and updates deserializePositionWithOffsetFallback condition |
| KafkaTopicDumper.java | Replaces getPubSubPositionString with direct toPosition calls |
| StoreIngestionTask.java | Updates position deserialization condition and integrates fallback in extractUpstreamPosition |
| StoreIngestionTaskTest.java | Changes test assertions to use full position equality instead of numeric offset comparison |
| PartitionTracker.java | Replaces getPubSubPositionString with direct toPosition call |
| LeaderFollowerStoreIngestionTask.java | Replaces numeric offset comparisons with proper position comparisons using diffPosition and symbolic position checks |

@github-actions

Hi there. This pull request has been inactive for 30 days. To keep our review queue healthy, we plan to close it in 7 days unless there is new activity. If you are still working on this, please push a commit, leave a comment, or convert it to draft to signal intent. Thank you for your time and contributions.

@github-actions github-actions bot added the stale label Nov 28, 2025
@sushantmane sushantmane force-pushed the sumane/switch-back-reads-to-PubSubPosition branch from 6fa7342 to e86b854 Compare December 3, 2025 19:38
Copilot AI review requested due to automatic review settings December 3, 2025 22:47
@sushantmane sushantmane force-pushed the sumane/switch-back-reads-to-PubSubPosition branch from e86b854 to cd66555 Compare December 3, 2025 22:47

Copilot AI left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.


@github-actions github-actions bot removed the stale label Dec 4, 2025
sushantmane and others added 4 commits December 9, 2025 14:23
…bolic positions

The deserializePositionWithOffsetFallback method was incorrectly treating
symbolic positions (EARLIEST, LATEST) as regular positions and comparing
their numeric offsets against the provided minimum offset. This caused
EARLIEST (numeric offset -1) to be replaced with an offset-based position
when the minimum offset was >= 0.

Added a guard to detect and preserve symbolic positions before performing
numeric offset comparison, ensuring they are returned as-is regardless of
the minimum offset parameter.

This fix resolves the failing testDeserializePositionWithOffsetFallback
test in LeaderFollowerStoreIngestionTaskTest.

Test: ./gradlew :clients:da-vinci-client:test --tests "com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTaskTest"
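
The guard described in this commit message can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Venice code: `Position`, `OffsetPosition`, `isSymbolic()`, and `toyDeserialize` are hypothetical stand-ins for `PubSubPosition`, the offset-based position, the symbolic-position check, and `PubSubPositionDeserializer`. The only value taken from the commit message is that EARLIEST has numeric offset -1.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

final class SymbolicGuardSketch {
  /** Hypothetical stand-in for PubSubPosition; not the real Venice interface. */
  interface Position {
    long getNumericOffset();

    boolean isSymbolic(); // hypothetical helper used only in this sketch
  }

  /** Hypothetical offset-backed position, playing the role of fromKafkaOffset(offset). */
  static final class OffsetPosition implements Position {
    private final long offset;

    OffsetPosition(long offset) {
      this.offset = offset;
    }

    @Override
    public long getNumericOffset() {
      return offset;
    }

    @Override
    public boolean isSymbolic() {
      return false;
    }
  }

  /** Symbolic EARLIEST; the numeric offset -1 is the value stated in the commit message. */
  static final Position EARLIEST = new Position() {
    @Override
    public long getNumericOffset() {
      return -1L;
    }

    @Override
    public boolean isSymbolic() {
      return true;
    }
  };

  /** Toy wire format (assumption): one 8-byte long, where -1 encodes EARLIEST. */
  static Position toyDeserialize(ByteBuffer bytes) {
    // Absolute read; throws IndexOutOfBoundsException when fewer than 8 bytes remain.
    long raw = bytes.getLong(bytes.position());
    return raw == -1L ? EARLIEST : new OffsetPosition(raw);
  }

  static Position deserializeWithOffsetFallback(
      ByteBuffer bytes,
      long offset,
      Function<ByteBuffer, Position> deserializer) {
    if (bytes != null && bytes.hasRemaining()) {
      try {
        Position position = deserializer.apply(bytes);
        // Symbolic positions are returned as-is, before any numeric comparison.
        if (position.isSymbolic() || position.getNumericOffset() >= offset) {
          return position;
        }
      } catch (RuntimeException e) {
        // Undecodable bytes: fall through to the offset-based position.
      }
    }
    return new OffsetPosition(offset);
  }
}
```

Without the `isSymbolic()` check, EARLIEST (-1) would compare as behind any minimum offset >= 0 and be replaced by an offset-based position, which is the behavior the commit message describes as the bug.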
@sushantmane sushantmane force-pushed the sumane/switch-back-reads-to-PubSubPosition branch from cd66555 to a5271ff Compare December 9, 2025 22:26
@sushantmane sushantmane force-pushed the sumane/switch-back-reads-to-PubSubPosition branch from a5271ff to a80f0a5 Compare December 9, 2025 22:58
Copilot AI review requested due to automatic review settings December 9, 2025 22:58

Copilot AI left a comment

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.


}

@Test
public void testGetPubSubPositionString() {
Contributor

I think PubSubUtil.getPubSubPositionString() should also be removed at this point?

* causes issues in production.
*/
public static final String SERVER_USE_UPSTREAM_PUBSUB_POSITION_WITH_FALLBACK =
"server.use.upstream.pubsub.position.with.fallback";
Contributor

Suggested change
"server.use.upstream.pubsub.position.with.fallback";
"server.use.upstream.pubsub.positions";

I think it's okay to leave out the fallback part in the config and rely on it being mentioned in the docstring. How about a shorter name like this? If not, I feel like it should be possible to arrive at something shorter than what's in the PR.

Same with checkpointed.

Comment on lines +374 to +407
public static PubSubPosition deserializePositionWithOffsetFallback(
    ByteBuffer wireFormatBytes,
    long offset,
    PubSubPositionDeserializer pubSubPositionDeserializer) {
  // Fast path: nothing to deserialize
  if (wireFormatBytes == null || !wireFormatBytes.hasRemaining()) {
    return fromKafkaOffset(offset);
  }

  try {
    final PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes);

    // Guard against regressions: honor the caller-provided minimum offset.
    // This applies to both symbolic and concrete positions.
    if (position.getNumericOffset() < offset) {
      LOGGER.info(
          "Deserialized position: {} is behind the provided offset: {}. Using offset-based position.",
          position.getNumericOffset(),
          offset);
      return fromKafkaOffset(offset);
    }

    // If position is ahead of or equal to offset, return it as-is (including symbolic positions like LATEST)
    return position;
  } catch (RuntimeException e) {
    LOGGER.warn(
        "Failed to deserialize PubSubPosition. Using offset-based position (offset={}, bufferRem={}, bufferCap={}).",
        offset,
        wireFormatBytes.remaining(),
        wireFormatBytes.capacity(),
        e);
    return fromKafkaOffset(offset);
  }
}
Contributor

Suggested change
public static PubSubPosition deserializePositionWithOffsetFallback(
    ByteBuffer wireFormatBytes,
    long offset,
    PubSubPositionDeserializer pubSubPositionDeserializer) {
  if (wireFormatBytes != null && wireFormatBytes.hasRemaining()) {
    try {
      final PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes);
      if (position.getNumericOffset() >= offset) {
        return position; // Valid position: ahead of or equal to offset
      }
      LOGGER.info(
          "Deserialized position: {} is behind the provided offset: {}. Using offset-based position.",
          position.getNumericOffset(),
          offset);
    } catch (RuntimeException e) {
      LOGGER.warn(
          "Failed to deserialize PubSubPosition. Using offset-based position (offset={}, bufferRem={}, bufferCap={}).",
          offset,
          wireFormatBytes.remaining(),
          wireFormatBytes.capacity(),
          e);
    }
  }
  // Offset fallback for all invalid or missing cases
  return fromKafkaOffset(offset);
}

I see every branch having the same offset fallback return value. How about this layout, instead?

It's a bit less explicit about the individual fallback cases, but I find it clearer to have a single offset fallback at the end.

return pubSubContext;
}

PubSubPosition deserializePositionWithOffsetFallback(
Contributor

Isn't this just basically the same as the PubSubUtil version? Can it be unified?

Comment on lines +1124 to +1125
if ((PubSubSymbolicPosition.EARLIEST.equals(latestConsumedRtPosition)
&& !PubSubSymbolicPosition.EARLIEST.equals(upstreamStartPosition))
Contributor

I don't quite understand what the first condition means

.stream()
.map(bb -> pubSubPositionDeserializer.toPosition(bb).getNumericOffset())
.collect(Collectors.toList());
List<Long> highWatermarkOffsets;
Contributor

Suggested change
List<Long> highWatermarkOffsets;
List<ByteBuffer> positions = versionSwap.getLocalHighWatermarkPubSubPositions();
List<Long> offsets = versionSwap.getLocalHighWatermarks();
List<Long> highWatermarkOffsets = new ArrayList<>();
if (positions != null && !positions.isEmpty()) {
  for (int i = 0; i < positions.size(); i++) {
    long fallbackOffset = (offsets != null && i < offsets.size()) ? offsets.get(i) : -1L;
    PubSubPosition position = PubSubUtil
        .deserializePositionWithOffsetFallback(positions.get(i), fallbackOffset, pubSubPositionDeserializer);
    highWatermarkOffsets.add(position.getNumericOffset());
  }
}

nit: how about this?
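
For readers who want to try the index-aligned fallback in isolation, here is a rough standalone sketch. It is not the Venice implementation: `decodeOrFallback` is a hypothetical toy decoder that treats a position as a single 8-byte long, and it only illustrates how each position is paired with the offset at the same index, with -1 used when no offset is available. It does not model the "behind the provided offset" check.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class HighWatermarkFallbackSketch {
  /** Toy decoder (assumption): a position is one 8-byte long; anything else is undecodable. */
  static long decodeOrFallback(ByteBuffer bytes, long fallbackOffset) {
    if (bytes == null || bytes.remaining() < Long.BYTES) {
      return fallbackOffset;
    }
    // Absolute read so the caller's buffer position is left untouched.
    return bytes.getLong(bytes.position());
  }

  /** Pairs positions.get(i) with offsets.get(i); uses -1 when the offset list is missing or shorter. */
  static List<Long> resolve(List<ByteBuffer> positions, List<Long> offsets) {
    List<Long> resolved = new ArrayList<>();
    if (positions == null) {
      return resolved;
    }
    for (int i = 0; i < positions.size(); i++) {
      long fallback = (offsets != null && i < offsets.size()) ? offsets.get(i) : -1L;
      resolved.add(decodeOrFallback(positions.get(i), fallback));
    }
    return resolved;
  }
}
```

As in the suggestion above, an empty or missing position list yields an empty result here; whether that case should instead fall back to the plain offset list is a decision for the PR itself.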
