
[FLINK-39519][checkpoint] Allocate pre-filter source buffers from a reusable heap segment #28001

Open

1996fanrui wants to merge 3 commits into apache:master from 1996fanrui:39519/Allocate-pre-filter-source-buffer

Conversation

@1996fanrui
Member

What is the purpose of the change

Under FLINK-39519, the channel-state recovery loop can deadlock when checkpointing-during-recovery filters inflight buffers: the single-threaded recovery thread first requests buffers to read pre-filter state, then requests more buffers to write post-filter output. If pre-filter buffers exhaust the Network Buffer Pool, post-filter allocation blocks and stalls the thread, so pre-filter buffers can never be consumed and released.

This PR isolates the pre-filter buffer from the Network Buffer Pool by allocating it from a dedicated heap segment.

Brief change log

  • InputChannelRecoveredStateHandler lazily allocates one heap MemorySegment per task (sized to memorySegmentSize), reuses it across every pre-filter getBuffer() call, and frees it in close(). Non-filtering mode is unchanged.
  • A runtime check (!preFilterBufferInUse) enforces the one-buffer-at-a-time invariant. The custom BufferRecycler flips the flag back on recycle without freeing the segment. Any future regression of the invariant fails loudly with IllegalStateException instead of silently corrupting memory.
  • Wiring:
    • RecordFilterContext gains a required memorySegmentSize parameter (with checkArgument > 0) and a getMemorySegmentSize() accessor. disabled() factory uses MemoryManager.DEFAULT_PAGE_SIZE.
    • StreamTask.createRecordFilterContext() passes ConfigurationParserUtils.getPageSize(jobConfiguration).
    • SequentialChannelStateReaderImpl passes filterContext.getMemorySegmentSize() to the handler constructor.
  • Added CommonTestUtils.waitForNCheckpointsWithInflightBuffers to wait for N completed checkpoints that carry inflight channel state.
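The mechanism in the first two bullets can be sketched in plain Java. This is an illustrative model only, not Flink's actual InputChannelRecoveredStateHandler; the class and field names here are invented, and a ByteBuffer stands in for a heap MemorySegment:

```java
import java.nio.ByteBuffer;

/**
 * Minimal sketch of the reusable-heap-segment pattern described above:
 * one lazily allocated heap buffer, reused across getBuffer() calls,
 * guarded by a runtime flag that enforces the one-buffer-at-a-time
 * invariant, and freed on close(). Not Flink code.
 */
class ReusableHeapBufferSketch {
    private final int segmentSize;
    private ByteBuffer segment;  // lazily allocated, reused, freed in close()
    private boolean inUse;       // models the preFilterBufferInUse flag
    private boolean closed;

    ReusableHeapBufferSketch(int segmentSize) {
        if (segmentSize <= 0) {
            throw new IllegalArgumentException("segmentSize must be > 0");
        }
        this.segmentSize = segmentSize;
    }

    ByteBuffer getBuffer() {
        if (closed) {
            throw new IllegalStateException("handler already closed");
        }
        if (inUse) {
            // Any regression of the one-at-a-time invariant fails loudly here.
            throw new IllegalStateException("previous pre-filter buffer not recycled");
        }
        if (segment == null) {
            segment = ByteBuffer.allocate(segmentSize); // lazy, heap-backed
        }
        inUse = true;
        segment.clear();
        return segment;
    }

    void recycle() {
        inUse = false; // flip the flag back; the segment itself is kept for reuse
    }

    void close() {
        segment = null; // drop the segment so it can be garbage-collected
        closed = true;
    }
}
```

The point of the sketch is the lifecycle: allocation happens at most once per handler, every getBuffer()/recycle() cycle hands out the same segment, and a second getBuffer() before recycle() throws instead of silently aliasing memory.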

Why a single reusable segment is enough

At most one pre-filter source buffer is in flight per task at any moment, guaranteed by Flink's existing recovery design:

  1. ChannelStateChunkReader.readChunk() is a single-threaded while-loop (getBuffer → fill → recover, with recycle in a finally block). The next getBuffer() can only start after recover() returns, so the structural concurrency is 1.
  2. SpillingAdaptiveSpanningRecordDeserializer.getNextRecord() recycles the current buffer the instant isBufferConsumed=true, which happens on both PARTIAL_RECORD (record spans buffers) and LAST_RECORD_FROM_BUFFER (buffer exhausted). filterAndRewrite() breaks its loop on the same condition.
  3. Cross-buffer bytes are always copied out before recycle: SpanningWrapper.transferFrom / addNextChunkFromMemorySegment copy partial bytes into an internal byte[] (or into SpanningWrapper's own spill file for records ≥ 5 MB). The only retained segment pointer (leftOverData / NonSpanningWrapper.segment) is read via segment.get(...) — copies into the target — and only while isBufferConsumed=false.
  4. Exception paths converge through ChannelStateFilteringHandler.close() → VirtualChannel.clear() → deserializer.clear(), which recycles any buffer the deserializer still holds.
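The serial shape argued in point 1 can be demonstrated with a self-contained model. The names below are hypothetical stand-ins for ChannelStateChunkReader's loop, not Flink code; the counter simply records how many buffers are ever in flight at once:

```java
/**
 * Illustrative model of the serial recovery loop in point 1: getBuffer,
 * recover, and recycle run strictly in sequence on one thread, so the
 * peak number of pre-filter buffers in flight is 1. Hypothetical names,
 * not Flink code.
 */
class SerialRecoveryLoopSketch {
    private int inFlight = 0;
    private int maxInFlight = 0;

    private byte[] getBuffer() {
        inFlight++;
        maxInFlight = Math.max(maxInFlight, inFlight);
        return new byte[16];
    }

    private void recover(byte[] buffer) {
        // deserialization, filtering, and rewriting would happen here
    }

    private void recycle() {
        inFlight--;
    }

    /** Runs a few chunk iterations and reports the peak buffers in flight. */
    int runRecoveryLoop(int chunks) {
        for (int i = 0; i < chunks; i++) {
            byte[] buffer = getBuffer(); // request the pre-filter source buffer
            try {
                recover(buffer);         // consumed within the same iteration
            } finally {
                recycle();               // always released before the next getBuffer()
            }
        }
        return maxInFlight;
    }

    public static void main(String[] args) {
        System.out.println("max in flight = "
                + new SerialRecoveryLoopSketch().runRecoveryLoop(5));
        // prints "max in flight = 1"
    }
}
```

Because the finally block guarantees recycle() before the loop's next iteration, this is why a single reusable segment suffices and no semaphore or per-gate counter is needed.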

Verifying this change

Unit tests (InputChannelRecoveredStateHandlerTest)

  • testPreFilterBufferIsolationFromNetworkBufferPool: filtering-mode getBuffer() returns a heap-backed buffer; the Network Buffer Pool is untouched.
  • testNonFilteringModeUsesNetworkBufferPool: non-filtering path preserved.
  • testPreFilterSegmentReusedAcrossCalls: successive getBuffer()/recycle cycles return the same MemorySegment instance.
  • testGetBufferThrowsWhenPriorBufferNotRecycled: runtime invariant check throws IllegalStateException.
  • testPreFilterSegmentFreedOnClose: segment freed on handler close.
  • testMemorySegmentSizeExposedAndValidated: context validation and getter.

Integration test (RecoveredStateFilteringLargeRecordITCase)

Pins the network memory segment to Flink's minimum page size and generates records 8× the segment size so each record spans many consecutive source buffers. A sleeping downstream map induces back-pressure on the keyBy shuffle so inflight buffers actually accumulate. The test runs three phases with random parallelism per phase:

  1. Phase 1: take an unaligned checkpoint with inflight buffers.
  2. Phase 2: restore from phase 1 and wait for one post-recovery checkpoint with inflight buffers. That checkpoint — the one taken during recovery itself — becomes phase 3's restore target, so phase 3 exercises the "recovery-of-a-recovered-checkpoint" path.
  3. Phase 3: restore from phase 2 and wait for five post-recovery checkpoints to shake out stability issues beyond the first post-recovery checkpoint.

Both filtering flags (UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM and CHECKPOINTING_DURING_RECOVERY_ENABLED) are pinned on so every run deterministically exercises the filtering handler.
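As a configuration fragment, pinning the two flags might look roughly like this. The flag names come from this PR's description, but which ConfigOption class holds them is an assumption here, so this is a sketch rather than copy-paste setup:

```java
// Hedged sketch: both filtering flags pinned on in the test configuration.
// The owning ConfigOption classes are assumed, not confirmed by this PR text.
Configuration config = new Configuration();
config.set(UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM, true); // gates the filtering feature
config.set(CHECKPOINTING_DURING_RECOVERY_ENABLED, true);  // turns the feature on
```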

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (only the recovery path)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes — channel state recovery with filtering
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no (bug fix / robustness improvement)
  • If yes, how is the feature documented? not applicable

…eusable heap segment

In filtering-mode channel state recovery, InputChannelRecoveredStateHandler
now allocates the pre-filter source buffer from heap instead of the Network
Buffer Pool, eliminating the deadlock where pre-filter and post-filter
buffers compete for the same pool in the single-threaded
channel-state-unspilling recovery loop.

Memory bound:
- One MemorySegment per task, sized to memorySegmentSize, lazily allocated
  on the first getBuffer() call, reused across every subsequent call, and
  freed in close(). Worst-case footprint per task is therefore one
  memorySegmentSize (default 32 KB).
- The one-buffer-at-a-time invariant is guaranteed structurally by Flink's
  serial recovery loop and the deserializer's isBufferConsumed contract,
  so no semaphore or per-gate counter is needed.
- A runtime check asserts !preFilterBufferInUse before each allocation.
  The custom BufferRecycler flips the flag back on recycle without freeing
  the segment. Any future regression of the invariant fails loudly with
  IllegalStateException instead of silently corrupting memory.

Non-filtering mode is unchanged: getBuffer() still delegates to the
channel's Network Buffer Pool via requestBufferBlocking().

Wiring:
- RecordFilterContext gains a required memorySegmentSize parameter (with
  checkArgument > 0) and a getMemorySegmentSize() accessor. disabled()
  factory uses MemoryManager.DEFAULT_PAGE_SIZE.
- StreamTask.createRecordFilterContext() passes
  ConfigurationParserUtils.getPageSize(jobConfiguration).
- SequentialChannelStateReaderImpl passes
  filterContext.getMemorySegmentSize() to the handler constructor.

Tests:
- testPreFilterBufferIsolationFromNetworkBufferPool: filtering-mode
  getBuffer() returns a heap-backed buffer; the Network Buffer Pool is
  untouched.
- testNonFilteringModeUsesNetworkBufferPool: non-filtering path preserved.
- testPreFilterSegmentReusedAcrossCalls: successive getBuffer()/recycle
  cycles return the same MemorySegment instance.
- testGetBufferThrowsWhenPriorBufferNotRecycled: runtime invariant check.
- testPreFilterSegmentFreedOnClose: segment freed on handler close.
- testMemorySegmentSizeExposedAndValidated: context validation and getter.
… filtering recovery

Adds RecoveredStateFilteringLargeRecordITCase, an end-to-end verification
of the pre-filter source buffer one-at-a-time invariant under realistic
recovery conditions: the network memory segment is pinned to Flink's
minimum page size and records are generated at 8x the segment size, so
every record spans many consecutive source buffers. A sleeping downstream
map induces back-pressure on the keyBy shuffle so inflight buffers
actually accumulate and are captured by the unaligned checkpoint;
otherwise the recovery path would have nothing to filter.

The test runs three phases with random parallelism per phase:
1. Phase 1: run at parallelism N1 and take an unaligned checkpoint with
   inflight buffers.
2. Phase 2: restore from phase 1 at parallelism N2 and wait for one
   post-recovery checkpoint with inflight buffers. That single checkpoint
   is the artifact produced during recovery itself, which phase 3
   consumes to exercise the "recovery-of-a-recovered-checkpoint" path.
3. Phase 3: restore from phase 2 at parallelism N3 and wait for five
   post-recovery checkpoints with inflight buffers so the restored job
   runs long enough to shake out stability issues beyond the first
   post-recovery checkpoint.

Filtering during recovery requires both feature flags to be enabled
(UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM gates the feature,
CHECKPOINTING_DURING_RECOVERY_ENABLED turns it on). Both are pinned on
explicitly so every run deterministically exercises the filtering handler.

If the one-at-a-time invariant were violated during recovery, the runtime
check in InputChannelRecoveredStateHandler.getPreFilterBuffer would throw
IllegalStateException and the job would fail. Reaching a completed
post-recovery checkpoint therefore proves the invariant held across all
buffer cycles.

Adds CommonTestUtils.waitForNCheckpointsWithInflightBuffers to wait for
at least N completed checkpoints that carry inflight channel state,
used by phase 3 above.
@flinkbot
Collaborator

flinkbot commented Apr 22, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

…esITCase segment size to MIN_PAGE_SIZE

The test set TaskManagerOptions.MEMORY_SEGMENT_SIZE to 1 KB at the job
level to shrink per-buffer record count and speed up recovery/aligned
drain. However, StreamTask#createRecordFilterContext reads this option
from the job configuration via ConfigurationParserUtils#getPageSize,
which enforces MemoryManager.MIN_PAGE_SIZE (4 KB).

When TestStreamEnvironment.randomizeConfiguration happens to set both
UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM=true and
CHECKPOINTING_DURING_RECOVERY_ENABLED=true for a given parameterized
invocation (deterministically driven by the test-name seed), the page
size check throws IllegalConfigurationException during channel-state
recovery and fails the sub-test. Today this reliably hits sub-test [3];
other indices pass only because their seed draws at least one of the
flags as false.

Use new MemorySize(MemoryManager.MIN_PAGE_SIZE) so the configured value
stays at the enforced floor, preserving the original intent of keeping
buffers small while staying above the validated minimum.