
[fix][ml] Track all pending read callbacks for timeouts#26081

Open
Technoboy- wants to merge 2 commits into apache:master from Technoboy-:codex/fix-managed-ledger-read-timeout-tracking


@Technoboy-

Contributor

Motivation

When the managed ledger read-entry timeout is enabled, ManagedLedgerImpl keeps only the most recent ReadEntryCallbackWrapper in lastReadCallback. If multiple reads are pending and an older read hangs, a newer read overwrites that callback, so checkReadTimeout() no longer checks the older operation.

That can leave a cursor read pending indefinitely and block follow-up cursor operations such as reset/mark-delete progress.
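The overwrite described above can be reproduced with a small model. This is a minimal sketch, not the ManagedLedgerImpl code: PendingRead and SingleSlotTracker are hypothetical names, and time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pending read; not the real ReadEntryCallbackWrapper.
final class PendingRead {
    final long readOpId;
    final long createdAtMillis;

    PendingRead(long readOpId, long createdAtMillis) {
        this.readOpId = readOpId;
        this.createdAtMillis = createdAtMillis;
    }
}

// Simplified model of single-slot tracking: only the latest read is remembered.
final class SingleSlotTracker {
    private volatile PendingRead lastRead;

    // Each new read overwrites the previous one, even if that one is still pending.
    void onReadIssued(PendingRead read) {
        lastRead = read;
    }

    // Returns the ids of reads considered timed out at nowMillis.
    List<Long> checkReadTimeout(long nowMillis, long timeoutMillis) {
        List<Long> timedOut = new ArrayList<>();
        PendingRead read = lastRead;
        if (read != null && nowMillis - read.createdAtMillis >= timeoutMillis) {
            timedOut.add(read.readOpId);
        }
        return timedOut;
    }
}
```

If read 1 is issued at t=0 and hangs, and read 2 is issued at t=9000, a check at t=10000 with a 5000 ms timeout looks only at read 2, which is still within its deadline. Read 1 is never reported.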

Modifications

  • Replace the single lastReadCallback with a pending-read callback map keyed by read operation id.
  • Remove callbacks from the pending map when they are recycled by success, failure, or timeout paths.
  • Iterate all pending callbacks during read-timeout checks so each timed-out read can fail independently.
  • Add a regression test covering concurrent read-entry timeouts.
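The modifications listed above might look roughly like the following. This is a sketch under stated assumptions rather than the code in this PR: PendingReadTracker and its methods are hypothetical, a plain ConcurrentHashMap stands in for whatever map type the change uses, and the map stores only the creation time instead of a callback wrapper.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker: every pending read is kept until it finishes or times out.
final class PendingReadTracker {
    // readOpId -> time the read was issued (stand-in for the callback wrapper)
    private final Map<Long, Long> pendingReads = new ConcurrentHashMap<>();

    void onReadIssued(long readOpId, long nowMillis) {
        pendingReads.put(readOpId, nowMillis);
    }

    // Called from the success and failure paths; returns false if already removed.
    boolean onReadFinished(long readOpId) {
        return pendingReads.remove(readOpId) != null;
    }

    // Checks every pending read, so each timed-out read is reported independently.
    List<Long> checkReadTimeout(long nowMillis, long timeoutMillis) {
        List<Long> timedOut = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : pendingReads.entrySet()) {
            boolean expired = nowMillis - entry.getValue() >= timeoutMillis;
            // remove(key, value) succeeds only once, so a read that completes
            // concurrently is not also reported as timed out.
            if (expired && pendingReads.remove(entry.getKey(), entry.getValue())) {
                timedOut.add(entry.getKey());
            }
        }
        Collections.sort(timedOut);
        return timedOut;
    }

    int pendingCount() {
        return pendingReads.size();
    }
}
```

With reads 1, 2 and 3 issued at t=0, t=1000 and t=9000, and read 2 completed normally, a check at t=10000 with a 5000 ms timeout reports read 1 and leaves read 3 pending.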

Verifying this change

  • ./gradlew :managed-ledger:test --tests org.apache.bookkeeper.mledger.impl.ManagedLedgerTest.testManagedLedgerWithReadEntryTimeOut --tests org.apache.bookkeeper.mledger.impl.ManagedLedgerTest.testManagedLedgerWithConcurrentReadEntryTimeOut

Technoboy- marked this pull request as ready for review June 24, 2026 10:02
Technoboy- self-assigned this Jun 24, 2026
@void-ptr974

Contributor

I think the current priority-queue approach can still retain too much state after reads complete.

The wrapper is added before entryCache.asyncReadEntry(...), so cache hits are also inserted into the timeout queue. On normal completion, the callback reference is cleared, but the queue node remains until its timeout deadline is polled. With read-entry timeout enabled, the queue size can become proportional to read rate * timeout seconds, rather than the number of reads that are actually still pending.
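The retention concern can be illustrated with a toy model of a timeout queue that uses lazy deletion. LazyTimeoutQueue and Node are hypothetical names and the class is not thread-safe; it only models the behaviour described in this comment and is not the code under review.

```java
import java.util.PriorityQueue;

// Toy model: completed reads are flagged, but their nodes stay queued until
// their deadline is polled.
final class LazyTimeoutQueue {
    static final class Node {
        final long deadlineMillis;
        volatile boolean completed;

        Node(long deadlineMillis) {
            this.deadlineMillis = deadlineMillis;
        }
    }

    // Ordered by deadline, earliest first.
    private final PriorityQueue<Node> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.deadlineMillis, b.deadlineMillis));

    Node add(long nowMillis, long timeoutMillis) {
        Node node = new Node(nowMillis + timeoutMillis);
        queue.add(node);
        return node;
    }

    // Normal completion only flags the node; it is not removed from the queue.
    void complete(Node node) {
        node.completed = true;
    }

    // Polls every node whose deadline has passed; returns how many actually timed out.
    int expire(long nowMillis) {
        int timedOut = 0;
        Node head;
        while ((head = queue.peek()) != null && head.deadlineMillis <= nowMillis) {
            queue.poll();
            if (!head.completed) {
                timedOut++;
            }
        }
        return timedOut;
    }

    int size() {
        return queue.size();
    }
}
```

Taking hypothetical figures of 1,000 reads per second and a 30-second timeout, about 1,000 × 30 = 30,000 nodes stay queued in steady state even if every read completes immediately.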

A bucketed timeout structure may be a better fit here: group reads by timeout bucket, and keep an inner map from readOpCount to callback. The wrapper can keep a direct reference to its bucket, so normal completion removes itself in average O(1), while timeout checks only process expired buckets.

// bucketId -> (readOpCount -> callback wrapper)
private final ConcurrentLongHashMap<ConcurrentLongHashMap<ReadEntryCallbackWrapper>> readTimeoutBuckets =
        ConcurrentLongHashMap.<ConcurrentLongHashMap<ReadEntryCallbackWrapper>>newBuilder().build();

static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback {
    volatile ConcurrentLongHashMap<ReadEntryCallbackWrapper> timeoutBucket;
}
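To make the bucketed proposal concrete, here is a self-contained sketch. It assumes plain java.util.concurrent.ConcurrentHashMap in place of ConcurrentLongHashMap, and BucketedReadTimeouts, TrackedRead and bucketWidthMillis are hypothetical names. A read whose deadline falls in the current bucket waits up to one bucket width before being reported, and the race between add and expire on the same bucket is not handled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

final class BucketedReadTimeouts {
    static final class TrackedRead {
        final long readOpId;
        // Direct reference to the owning bucket, so completion needs no bucket lookup.
        volatile ConcurrentHashMap<Long, TrackedRead> bucket;

        TrackedRead(long readOpId) {
            this.readOpId = readOpId;
        }
    }

    private final long bucketWidthMillis;
    // bucketId -> (readOpId -> tracked read)
    private final ConcurrentHashMap<Long, ConcurrentHashMap<Long, TrackedRead>> buckets =
            new ConcurrentHashMap<>();

    BucketedReadTimeouts(long bucketWidthMillis) {
        this.bucketWidthMillis = bucketWidthMillis;
    }

    TrackedRead add(long readOpId, long deadlineMillis) {
        TrackedRead read = new TrackedRead(readOpId);
        long bucketId = deadlineMillis / bucketWidthMillis;
        ConcurrentHashMap<Long, TrackedRead> bucket =
                buckets.computeIfAbsent(bucketId, id -> new ConcurrentHashMap<>());
        read.bucket = bucket;
        bucket.put(readOpId, read);
        return read;
    }

    // Normal completion: average O(1) removal; false if the read already timed out.
    boolean complete(TrackedRead read) {
        ConcurrentHashMap<Long, TrackedRead> bucket = read.bucket;
        return bucket != null && bucket.remove(read.readOpId, read);
    }

    // Only buckets that lie entirely in the past are processed.
    List<Long> expire(long nowMillis) {
        long currentBucket = nowMillis / bucketWidthMillis;
        List<Long> timedOut = new ArrayList<>();
        for (Long bucketId : buckets.keySet()) {
            if (bucketId >= currentBucket) {
                continue;
            }
            ConcurrentHashMap<Long, TrackedRead> bucket = buckets.remove(bucketId);
            if (bucket == null) {
                continue;
            }
            for (TrackedRead read : bucket.values()) {
                if (bucket.remove(read.readOpId, read)) {
                    timedOut.add(read.readOpId);
                }
            }
        }
        Collections.sort(timedOut);
        return timedOut;
    }

    int trackedCount() {
        int count = 0;
        for (ConcurrentHashMap<Long, TrackedRead> bucket : buckets.values()) {
            count += bucket.size();
        }
        return count;
    }
}
```

In this model the tracked count follows the number of reads still pending, because completed reads remove themselves immediately instead of waiting for their deadline.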

4 participants