[fix][broker] Release entry on GetLastMessageId when parseMessageMetadata throws #26089

Open
SongOf wants to merge 1 commit into apache:master from SongOf:fix-broker-getlastmsgid-entry-leak

@SongOf SongOf commented Jun 25, 2026

Contributor

Motivation

When a consumer/reader issues GetLastMessageId, the broker handles it in
ServerCnx#getLargestBatchIndexWhenPossible: it reads the entry at the last
position and parses its message metadata to compute the largest batch index.

The entry was released only after the metadata was parsed:

CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
    MessageMetadata metadata = entry.getMessageMetadata();
    if (metadata == null) {
        metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
    }
    int batchSize = metadata.getNumMessagesInBatch();
    entry.release();                       // skipped if parse throws
    return metadata.hasNumMessagesInBatch() ? batchSize : -1;
});

If Commands.parseMessageMetadata() (or getNumMessagesInBatch()) throws on a
corrupt/truncated entry, entry.release() is skipped, leaking the Entry and
its backing direct ByteBuf on every such GetLastMessageId request. The
exception is then reported to the client as a generic MetadataError, which
masks the leak.

Modifications

  • ServerCnx#getLargestBatchIndexWhenPossible: moved entry.release() into a
    finally block so the entry is released on every path (success, metadata
    parse failure, or any other exception), without changing the error reported
    to the client.
  • Added GetLastMessageIdEntryLeakTest: it spies the topic's ManagedLedgerImpl
    so asyncReadEntry returns an entry wrapping a deliberately corrupt 2-byte
    buffer (which makes parseMessageMetadata throw at readUnsignedInt), drives
    the request via consumer.getLastMessageId(), and asserts the entry's
    ByteBuf refCnt drops to 0. The test fails on the unpatched code
    (refCnt == 1, i.e. leaked) and passes after the fix.
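
The release-in-finally pattern described above can be sketched in isolation as
follows. This is a minimal, self-contained illustration and not the actual
ServerCnx code: FakeEntry, parseBatchSize, and batchSizeOf are hypothetical
stand-ins introduced here for the broker's Entry, Commands.parseMessageMetadata,
and the thenApply stage, and a plain counter stands in for the ByteBuf refCnt.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseInFinallySketch {

    /** Hypothetical stand-in for a ref-counted entry; not the Pulsar Entry API. */
    static final class FakeEntry {
        private final AtomicInteger refCnt = new AtomicInteger(1);
        private final ByteBuffer data;

        FakeEntry(byte[] bytes) {
            this.data = ByteBuffer.wrap(bytes);
        }

        ByteBuffer data() {
            return data.duplicate();
        }

        void release() {
            refCnt.decrementAndGet();
        }

        int refCnt() {
            return refCnt.get();
        }
    }

    /** Hypothetical parser: reads a 4-byte value and throws on a truncated buffer. */
    static int parseBatchSize(ByteBuffer buf) {
        return buf.getInt(); // BufferUnderflowException if fewer than 4 bytes remain
    }

    /** Same shape as the fix: release in finally so every path drops the reference. */
    static CompletableFuture<Integer> batchSizeOf(CompletableFuture<FakeEntry> entryFuture) {
        return entryFuture.thenApply(entry -> {
            try {
                return parseBatchSize(entry.data());
            } finally {
                entry.release(); // runs on success and when parsing throws
            }
        });
    }

    public static void main(String[] args) {
        FakeEntry good = new FakeEntry(new byte[] {0, 0, 0, 7});
        FakeEntry corrupt = new FakeEntry(new byte[] {0, 1}); // deliberately truncated to 2 bytes

        CompletableFuture<Integer> ok = batchSizeOf(CompletableFuture.completedFuture(good));
        CompletableFuture<Integer> failed = batchSizeOf(CompletableFuture.completedFuture(corrupt));

        System.out.println(ok.join() + " " + good.refCnt());                            // prints "7 0"
        System.out.println(failed.isCompletedExceptionally() + " " + corrupt.refCnt()); // prints "true 0"
    }
}
```

The failed future still completes exceptionally, so in this sketch the caller
sees the same error as before; only the reference count differs.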

Verifying this change

This change added tests and can be verified as follows:

  • GetLastMessageIdEntryLeakTest#testEntryReleasedWhenParseMetadataThrows —
    reproduces the leak by injecting a corrupt entry so parseMessageMetadata
    throws, and asserts the entry's ByteBuf is released (refCnt == 0). On the
    pre-fix code it fails because the buffer is still retained (refCnt == 1);
    it passes after the fix.

Does this pull request potentially affect one of the following parts:

This is an internal behavior change and does not change any public API, schema,
configuration, wire protocol, REST endpoint, CLI option, or metric.

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@SongOf SongOf marked this pull request as draft June 25, 2026 13:59
@SongOf SongOf marked this pull request as ready for review June 25, 2026 14:00

@lhotari lhotari left a comment

Member

LGTM. @SongOf Just curious, did you actually run into a memory leak issue, or is this based on AI analysis of the code base?
For the Maven build in branch-4.2 there's a setup for detecting Netty buffer leaks in test runs, with reporting that extracts the leaks into a separate report. This currently reports a lot of false positives since the test code itself contains buffer leaks.
It would be useful to fix all leaks and then make the build fail if leaks are detected. The solution hasn't yet been ported from the Maven build to the Gradle build.

SongOf commented Jun 25, 2026

Contributor Author

> LGTM. @SongOf Just curious, did you actually run into a memory leak issue, or is this based on AI analysis of the code base? For the Maven build in branch-4.2 there's a setup for detecting Netty buffer leaks in test runs, with reporting that extracts the leaks into a separate report. This currently reports a lot of false positives since the test code itself contains buffer leaks. It would be useful to fix all leaks and then make the build fail if leaks are detected. The solution hasn't yet been ported from the Maven build to the Gradle build.

From the AI assistant
