[fix][client] Serialize chunked-message bookkeeping to fix use-after-…#26084
Open
SongOf wants to merge 1 commit into
Open
[fix][client] Serialize chunked-message bookkeeping to fix use-after-…#26084SongOf wants to merge 1 commit into
SongOf wants to merge 1 commit into
Conversation
lhotari
reviewed
Jun 24, 2026
| * that writes into the same {@code chunkedMsgBuffer} — exactly when the expiry task may release/recycle that ctx. | ||
| * Without serialization this races (use-after-free / double-recycle). With the fix, both paths take | ||
| * chunkedMessageLock, so it passes and pendingChunkedMessageCount stays equal to chunkedMessagesMap.size(). | ||
| * See .claude/pulsar-client-review-findings.md (#1). |
lhotari
reviewed
Jun 24, 2026
| } | ||
|
|
||
| /** | ||
| * Deterministic reproducer for finding #2: a duplicated first chunk (chunkId == 0, redelivered) for a uuid that |
Member
There was a problem hiding this comment.
referring to findings doesn't make sense in code comments
Contributor
Author
There was a problem hiding this comment.
removed, thanks for your comments.
8b391e4 to
69fb6f6
Compare
…free and count/queue drift
69fb6f6 to
910db42
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
ConsumerImpl's chunked-message reassembly state — the per-uuidChunkedMessageCtx,its
chunkedMsgBuffer,pendingChunkedMessageCountandpendingChunkedMessageUuidQueue— is mutated from two different threads with no synchronization:
processMessageChunk, and the last-chunk finalize inmessageReceived) runs on the Netty IO event-loop thread(
ClientCnx.handleMessagecallsconsumer.messageReceived(...)directly);removeExpireIncompleteChunkedMessages) runs onthe client's
internalPinnedExecutor— a separate single-thread pool(
client.getInternalExecutorService()), not theeventLoopGroup.When a late chunk for a uuid arrives while the expiry task is removing that same ctx,
the expiry thread can
release()/recycle()aChunkedMessageCtxand its buffer whilethe receive thread is still writing into it. This races into:
chunkedMsgBuffer(IllegalReferenceCountException,or worse — writing into memory the allocator already handed to someone else);
ChunkedMessageCtx.recycle(), which corrupts the NettyRecyclerpool and canhand the same instance to two different chunked messages;
pendingChunkedMessageCountdrift (non-atomicintmutated from both threads).Incomplete-chunk expiry is enabled by default (
expireTimeOfIncompleteChunkedMessageMillis= 1 minute), so any consumer of chunked messages is exposed.
Separately, a redelivered first chunk (
chunkId == 0for a uuid that already has anin-progress ctx) replaced the old ctx without decrementing
pendingChunkedMessageCountand re-enqueued the uuid, so the counter over-counted and
pendingChunkedMessageUuidQueueended up with duplicate / mis-ordered entries (the queue is meant to be ordered oldest-first
by
receivedTime, with one entry per in-progress uuid, sinceremoveExpire/removeOldestclean from the head).
Modifications
chunkedMessageLock.processMessageChunk,removeOldestPendingChunkedMessageandremoveExpireIncompleteChunkedMessagesnow acquireit (thin wrapper over an extracted body), and
messageReceivedholds it across thelast-chunk assembly + finalize so the assemble→finalize window is closed. All chunk
bookkeeping is serialized, and
pendingChunkedMessageCountis mutated only under the lock(so it needs no
volatile/Atomic). Heavy per-message work (newMessage, decryption ofnon-chunk payloads,
executeNotifyCallback) stays outside the lock, and non-chunkedmessages never touch it.
(
ConcurrentHashMapops,doAcknowledgewhich is async/non-blocking) never acquire itin reverse, so there is no new lock-ordering / deadlock and no blocking call held across
the lock.
pendingChunkedMessageCountfor thereplaced ctx,
remove(uuid)its stale entry frompendingChunkedMessageUuidQueueandre-
add(uuid)at the tail — keeping the queue ordered oldest-first with exactly one entryper in-progress uuid (net 0 count change on replace, +1 on a genuinely new uuid).
This is an internal client-side locking change only; no public API, wire protocol, schema,
config defaults or metrics are changed.
Verifying this change
This change added tests and can be verified as follows:
ConsumerImplTest.testChunkedMessageCountRaceBetweenReceiveAndExpiry— drivesprocessMessageChunkon a "receiver" thread concurrently with the realremoveExpireIncompleteChunkedMessageson an "expirer" thread, for the "a late chunkarrives for a uuid that is concurrently being expired" scenario. Verified to fail before
the fix (
IllegalReferenceCountException/ corruptedpendingChunkedMessageCount) andpass after.
ConsumerImplTest.testDuplicateFirstChunkOvercountsPendingChunkedMessageCount—deterministic; delivers a redelivered first chunk and asserts
pendingChunkedMessageCount == chunkedMessagesMap.size()and that the uuid appears exactlyonce in
pendingChunkedMessageUuidQueue. Verified to fail before and pass after.Full
pulsar-clientunit suite (712 tests) passes with no regressions.Does this pull request potentially affect one of the following parts:
Documentation
doc-requireddoc-not-needed(internal bug fix; no user-facing behavior or config change)
docdoc-complete