
[fix][broker] Remove lock contention in delayed delivery stats read paths#25990

Open
nodece wants to merge 1 commit into apache:master from nodece:fix/broker-delayed-stats-remove-synchronization

nodece commented Jun 10, 2026

Member

Motivation

Under a large delayed-delivery workload (~500M delayed messages), jstack analysis
shows the dispatcher lock is held while sealing delayed-delivery buckets.

A broker worker thread spends significant CPU time in:

TripleLongPriorityQueue.siftDown()
  -> pop()
  -> createImmutableBucketAndAsyncPersistent()

while holding both:

PersistentDispatcherMultipleConsumers
BucketDelayedDeliveryTracker

As a result, stats collection threads (Prometheus, admin APIs, getStatsAsync)
are blocked waiting for the dispatcher monitor.

The same pattern affects InMemoryDelayedDeliveryTracker, where
getBufferMemoryUsage() iterates the entire TreeMap<Long, TreeMap<Long, Roaring64Bitmap>>
while holding the dispatcher lock.

Modifications

1. BucketDelayedDeliveryTracker — lock-free bucket stats via AtomicLong counters

Introduce two AtomicLong counters (bucketsCount and totalSnapshotLengthBytes)
maintained alongside the existing TreeRangeMap<Long, ImmutableBucket>:

  • Add private helper methods putBucket(), removeBucket(), and updateBucketSnapshotLength()
    that wrap TreeRangeMap operations and update counters atomically
  • putBucket() calculates removed bucket lengths before insertion (since TreeRangeMap.put()
    silently removes/splits overlapping entries) and updates counters by delta
  • removeBucket() decrements counters only when removal succeeds
  • updateBucketSnapshotLength() updates counters when snapshot length changes asynchronously
  • In recoverBucketSnapshot(), recalculate counters after all snapshots are loaded
    to ensure accuracy (since buckets are created with snapshotLength=0 initially)
  • genTopicMetricMap() reads counters without holding any lock
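The counter maintenance described above can be sketched roughly as follows. This is a simplified illustration, not the PR's code: the class name CountedBucketIndex is hypothetical, and a java.util.TreeMap keyed by a bucket start position stands in for TreeRangeMap<Long, ImmutableBucket>, so the overlap splitting that TreeRangeMap.put() performs is reduced here to replacing an existing key.

```java
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch: a bucket index that keeps cached counters next to the map.
 * Assumption: a TreeMap (start position -> snapshot length) stands in for TreeRangeMap.
 */
class CountedBucketIndex {
    // Not thread-safe on its own, so every mutation happens under the index monitor.
    private final TreeMap<Long, Long> snapshotLengthByStart = new TreeMap<>();
    // Read by stats threads without taking any lock.
    private final AtomicLong bucketsCount = new AtomicLong();
    private final AtomicLong totalSnapshotLengthBytes = new AtomicLong();

    synchronized void putBucket(long start, long snapshotLength) {
        // Account for whatever the insertion displaced, then apply the delta.
        Long replaced = snapshotLengthByStart.put(start, snapshotLength);
        if (replaced == null) {
            bucketsCount.incrementAndGet();
            totalSnapshotLengthBytes.addAndGet(snapshotLength);
        } else {
            totalSnapshotLengthBytes.addAndGet(snapshotLength - replaced);
        }
    }

    synchronized boolean removeBucket(long start) {
        Long removed = snapshotLengthByStart.remove(start);
        if (removed == null) {
            return false; // nothing removed, counters stay untouched
        }
        bucketsCount.decrementAndGet();
        totalSnapshotLengthBytes.addAndGet(-removed);
        return true;
    }

    synchronized void updateBucketSnapshotLength(long start, long newLength) {
        Long old = snapshotLengthByStart.replace(start, newLength);
        if (old != null) {
            totalSnapshotLengthBytes.addAndGet(newLength - old);
        }
    }

    // Lock-free read paths for metrics.
    long bucketsCount() {
        return bucketsCount.get();
    }

    long totalSnapshotLengthBytes() {
        return totalSnapshotLengthBytes.get();
    }
}
```

Each counter is individually atomic, but a lock-free reader can still observe the two values from different moments, so the pair is only approximately consistent under concurrent mutation.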

2. InMemoryDelayedDeliveryTracker — lock-free memory usage via delta tracking

Add AtomicLong memoryUsage that is updated by delta at each mutation point
(addMessage, getScheduledMessages, clear). getBufferMemoryUsage() returns
the cached value directly instead of iterating the nested TreeMap.
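A minimal sketch of the delta-tracking idea, under stated assumptions: the class MemoryTrackedDelayIndex, the method pollDue(), and the fixed BYTES_PER_ENTRY cost are all hypothetical, and a TreeSet<Long> replaces the Roaring64Bitmap whose real size estimate the tracker would use.

```java
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: memory usage cached in an AtomicLong and updated by delta. */
class MemoryTrackedDelayIndex {
    // Assumed flat per-entry cost, used only to keep the example small.
    static final long BYTES_PER_ENTRY = 16;

    // Delivery timestamp -> entry IDs; mutated only under the index monitor.
    private final TreeMap<Long, TreeSet<Long>> entriesByTimestamp = new TreeMap<>();
    private final AtomicLong memoryUsage = new AtomicLong();

    synchronized boolean addMessage(long deliverAt, long entryId) {
        boolean added = entriesByTimestamp
                .computeIfAbsent(deliverAt, k -> new TreeSet<>())
                .add(entryId);
        if (added) {
            memoryUsage.addAndGet(BYTES_PER_ENTRY); // grow by the delta only
        }
        return added;
    }

    /** Removes every entry due at or before {@code now} and returns how many were released. */
    synchronized int pollDue(long now) {
        int released = 0;
        while (!entriesByTimestamp.isEmpty() && entriesByTimestamp.firstKey() <= now) {
            released += entriesByTimestamp.pollFirstEntry().getValue().size();
        }
        memoryUsage.addAndGet(-released * BYTES_PER_ENTRY); // shrink by the delta only
        return released;
    }

    synchronized void clear() {
        entriesByTimestamp.clear();
        memoryUsage.set(0);
    }

    /** O(1) and lock-free: no iteration over the nested map. */
    long getBufferMemoryUsage() {
        return memoryUsage.get();
    }
}
```

The trade-off is that every mutation point must apply its delta; a mutation path that forgets to do so leaves the cached value permanently off.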

3. Dispatcher classes — remove synchronized from stats read paths

In both PersistentDispatcherMultipleConsumers and PersistentDispatcherMultipleConsumersClassic:

  • getNumberOfDelayedMessages(), getDelayedTrackerMemoryUsage(),
    getBucketDelayedIndexStats(), and shouldPauseDeliveryForDelayTracker()
    are no longer synchronized; they now read the volatile field through Optional.map()
  • The delayedDeliveryTracker field is now volatile for safe publication
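The read pattern described in this item might look like the sketch below. The class DispatcherStatsSketch, the nested Tracker interface, and initTracker() are hypothetical stand-ins; only the volatile Optional field and the Optional.map() read come from the description.

```java
import java.util.Optional;

/** Hypothetical sketch of a stats getter that no longer takes the dispatcher monitor. */
class DispatcherStatsSketch {
    /** Stand-in for the delayed delivery tracker's stats surface. */
    interface Tracker {
        long getNumberOfDelayedMessages();
    }

    // volatile: a stats thread sees either the empty Optional or a fully constructed tracker.
    private volatile Optional<Tracker> delayedDeliveryTracker = Optional.empty();

    // Writers still serialize on the dispatcher monitor.
    synchronized void initTracker(Tracker tracker) {
        delayedDeliveryTracker = Optional.of(tracker);
    }

    // No synchronized keyword: a single volatile read, then a call on the tracker.
    long getNumberOfDelayedMessages() {
        return delayedDeliveryTracker
                .map(Tracker::getNumberOfDelayedMessages)
                .orElse(0L);
    }
}
```

The volatile field only guarantees safe publication of the reference. The method invoked on the tracker must itself be safe to call without the dispatcher lock, which is why the tracker-side counters matter.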

4. BucketDelayedMessageIndexStats — synchronized metric map generation

Add synchronized to genTopicMetricMap() to ensure atomic reads of multiple fields
when generating metrics.

5. ImmutableBucket — fix async chain in recover

Fix asyncRecoverBucketSnapshotEntry() to properly chain asyncUpdateSnapshotLength()
using thenCompose() instead of thenApply(), ensuring the snapshot length is updated
before recovery completes.
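The difference between the two combinators can be illustrated with plain CompletableFuture calls. The class RecoverChainSketch and its members are hypothetical; pendingLength stands in for whatever asynchronous lookup produces the snapshot length.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch contrasting thenApply() and thenCompose() when chaining an async step. */
class RecoverChainSketch {
    final AtomicLong snapshotLength = new AtomicLong();
    // Stand-in for an async lookup; it completes only when the caller completes it.
    final CompletableFuture<Long> pendingLength = new CompletableFuture<>();

    CompletableFuture<Void> asyncUpdateSnapshotLength() {
        return pendingLength.thenAccept(snapshotLength::set);
    }

    // thenApply: the inner future is started but dropped, so the returned future
    // can complete before the snapshot length has been updated.
    CompletableFuture<String> recoverWithThenApply(CompletableFuture<String> entries) {
        return entries.thenApply(e -> {
            asyncUpdateSnapshotLength();
            return e;
        });
    }

    // thenCompose: the returned future completes only after the inner future does.
    CompletableFuture<String> recoverWithThenCompose(CompletableFuture<String> entries) {
        return entries.thenCompose(e -> asyncUpdateSnapshotLength().thenApply(ignored -> e));
    }
}
```

With already-completed entries, the thenApply variant is done immediately while the thenCompose variant stays pending until pendingLength is completed.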

Verifying this change

  • Existing unit tests in BucketDelayedDeliveryTrackerTest pass
  • Manual verification: bucket count and snapshot length stats remain accurate
    across create, merge, trim, and recover operations

@nodece nodece requested review from dao-jun and lhotari June 10, 2026 13:10

lhotari left a comment

Member


Synchronization was explicitly added in #25681 to fix race conditions. Did you take that into account? I think that synchronization cannot be removed from shouldPauseAllDeliveries. For other methods it's most likely fine.


lhotari commented Jun 10, 2026

Member

> Under a large delayed-delivery workload (~500M delayed messages)

Is this in a single topic and single subscription?

FYI, there's a limit of tracking up to 30M backlogs (with default BK nettyMaxFrameSizeBytes) when using managedLedgerPersistIndividualAckAsLongArray=true. More can be stored to metadata when using LZ4 compression, but there will be failures each time cursor state is attempted to be saved. Issue is #25985.

@nodece nodece requested a review from lhotari June 11, 2026 11:24

nodece commented Jun 11, 2026

Member Author

> Is this in a single topic and single subscription?

Yes.

> FYI, there's a limit of tracking up to 30M backlogs (with default BK nettyMaxFrameSizeBytes) when using managedLedgerPersistIndividualAckAsLongArray=true. More can be stored to metadata when using LZ4 compression, but there will be failures each time cursor state is attempted to be saved. Issue is #25985.

I will check this later.

@nodece nodece force-pushed the fix/broker-delayed-stats-remove-synchronization branch from 7795eec to c8f77d9 Compare June 11, 2026 13:12
@nodece nodece marked this pull request as draft June 17, 2026 10:36
@nodece nodece force-pushed the fix/broker-delayed-stats-remove-synchronization branch from a86ce4a to bdfc915 Compare June 22, 2026 10:04
@nodece nodece marked this pull request as ready for review June 22, 2026 10:04
@nodece nodece requested a review from void-ptr974 June 22, 2026 13:10
  immutableBucket.setSnapshotSegments(null);
- immutableBucket.asyncUpdateSnapshotLength();
+ immutableBucket.asyncUpdateSnapshotLength()
+         .thenRun(() -> immutableBuckets.recomputeCounters());

Contributor


This calls immutableBuckets.recomputeCounters() from an async callback without the tracker lock, while other paths mutate the same TreeRangeMap under that lock. TreeRangeMap is not thread-safe, so this can race with put/remove/clear and produce incorrect counters or fail during concurrent modification.

Contributor


This removes entries through the mutable asMapOfRanges() view, bypassing ImmutableBucketIndex.remove(). The cached count and totalSnapshotLength can stay stale after clearDelayedMessages(), so later metrics and decisions based on immutableBuckets.count() can observe an inconsistent index state.

@void-ptr974

Contributor

At a higher level, moving stats reads away from the dispatcher lock looks like the right direction for this PR. The remaining concern is making sure the newly unsynchronized stats paths do not read or mutate tracker internals without a clear concurrency boundary.

One possible short-term approach is to use the tracker lock for the small fixed-size stats sections and unsafe TreeRangeMap access, while keeping the dispatcher lock out of these stats paths. Longer term, it may be worth moving more mutable TreeRangeMap access and cached-counter maintenance behind ImmutableBucketIndex APIs, so callers do not need to touch live map views directly and stats paths can read cached/atomic values.
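One way to read the short-term suggestion is sketched below, purely as an illustration and not as the reviewer's or the author's design. The class LockedBucketIndexSketch, its BucketStats holder, and the TreeMap stand-in for TreeRangeMap are assumptions. All map access, including clear(), goes through the index, and stats readers hold a dedicated tracker lock only long enough to copy two fields.

```java
import java.util.TreeMap;

/** Hypothetical sketch: a dedicated tracker lock guards the map and its cached total. */
class LockedBucketIndexSketch {
    /** Immutable copy handed to stats threads. */
    static final class BucketStats {
        final long bucketsCount;
        final long totalSnapshotLengthBytes;

        BucketStats(long bucketsCount, long totalSnapshotLengthBytes) {
            this.bucketsCount = bucketsCount;
            this.totalSnapshotLengthBytes = totalSnapshotLengthBytes;
        }
    }

    // Separate from the dispatcher monitor, so stats reads never wait on dispatching.
    private final Object trackerLock = new Object();
    // Assumption: TreeMap (start position -> snapshot length) stands in for TreeRangeMap.
    private final TreeMap<Long, Long> snapshotLengthByStart = new TreeMap<>();
    private long totalSnapshotLengthBytes;

    void put(long start, long snapshotLength) {
        synchronized (trackerLock) {
            Long replaced = snapshotLengthByStart.put(start, snapshotLength);
            totalSnapshotLengthBytes += snapshotLength - (replaced == null ? 0L : replaced);
        }
    }

    // Clearing goes through the index, so the cached total cannot go stale.
    void clear() {
        synchronized (trackerLock) {
            snapshotLengthByStart.clear();
            totalSnapshotLengthBytes = 0;
        }
    }

    // Short, fixed-size critical section: two reads and one allocation.
    BucketStats stats() {
        synchronized (trackerLock) {
            return new BucketStats(snapshotLengthByStart.size(), totalSnapshotLengthBytes);
        }
    }
}
```

Compared with independent atomic counters, this returns a count and a total taken at the same instant, at the cost of a brief lock acquisition per stats read.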

@nodece nodece marked this pull request as draft June 23, 2026 10:33
@nodece nodece force-pushed the fix/broker-delayed-stats-remove-synchronization branch from bdfc915 to 30b06a0 Compare June 24, 2026 09:43
@nodece nodece changed the title [fix][broker] remove lock contention in delayed delivery stats read paths [fix][broker] Remove lock contention in delayed delivery stats read paths Jun 24, 2026
@nodece nodece marked this pull request as ready for review June 24, 2026 09:48
3 participants