fix: reduce MDT compaction heap pressure in BitCaskDiskMap and ExternalSpillableMap#19085
fix: reduce MDT compaction heap pressure in BitCaskDiskMap and ExternalSpillableMap#19085nsivabalan wants to merge 1 commit into
Conversation
…and ExternalSpillableMap Fixes apache#19084. ## Problem MDT `record_index` compaction tasks were OOMing on executors with 9 GB heap, four concurrent tasks, and `spark.memory.fraction=0.3`. Heap analysis identified three structural consumers that aren't bounded by the configured compaction memory cap: B ~3.0–3.2 GB BitCaskDiskMap.valueMetadataMap — 18.3M ValueMetadata entries with Integer/Long boxing overhead dominating (57.2M java.lang.Long instances = ~1.28 GB pure boxing) F ~513 MB LazyFileIterator sorted ArrayList (4.58M entries × 4 tasks) materialised at iterator-open time because ConcurrentHashMap doesn't preserve insertion order E ~256 MB peak ExternalSpillableMap.keySet() HashSet spike (6.29M keys copied to seed a PriorityQueue) Total ~1.6 GB per task across 4 concurrent tasks, on top of the bounded in-memory record map. Lowering hoodie.memory.compaction.fraction makes B larger and the OOM worse — fix has to be code, not config. ## Changes 1. Unbox ValueMetadata primitives (BitCaskDiskMap.java) Integer/Long → int/long for sizeOfValue, offsetOfValue, timestamp. Saves ~880 MB across 4 tasks. 2. LinkedHashMap + ReentrantReadWriteLock for valueMetadataMap (BitCaskDiskMap.java, LazyFileIterable.java) - filePosition is strictly monotonic under the write lock (append-only file + monotonic SizeAwareDataOutputStream.getSize()), so insertion order = disk offset order. LazyFileIterator no longer sorts; it consumes a pre-built snapshot list taken under the read lock. - put() does remove-before-put under the write lock so re-inserted keys land at the LinkedHashMap tail (preserves offset-order). - All read paths (get, containsKey, size, iterator, valueStream, entrySet, keySet, iterator(Predicate)) take the read lock and snapshot before releasing. - close() and clear() take the write lock so a concurrent reader under SpillableMapBasedFileSystemView's shared readLock can't observe a half-cleared map. Saves ~513 MB across 4 tasks. 3. ExternalSpillableMap.keyStream() + HoodieSortedMergeHandle uses it (ExternalSpillableMap.java, HoodieSortedMergeHandle.java) keyStream() returns Stream.concat(inMemoryKeys, diskKeys) without allocating a HashSet copy. HoodieSortedMergeHandle now collects the stream into an ArrayList and passes it to new PriorityQueue<>(list) to get the O(N) heapify (vs. O(N log N) per-add). Saves ~256 MB peak spike per task. ## Total expected savings ~1.65 GB per task across the three fixes. 4-task demand drops from ~6.25 GB to ~4.57 GB on a 9 GB executor heap with 4 concurrent tasks. ## Tests - testReInsertionPreservesOffsetOrder (parameterized × compression on/off): puts a key twice, asserts re-inserted key lands at iteration tail. Locks down the LinkedHashMap-insertion-order == disk-offset-order invariant. - testConcurrentReadersAndWriter: writer + reader threads; asserts no CME and that the RW-lock contract holds. - Full TestBitCaskDiskMap + TestExternalSpillableMap suite: 45/0/1 on Java 11 with -Dscala-2.12 -Dspark3.5. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hudi-agent
left a comment
There was a problem hiding this comment.
Thanks for working on this! This PR reduces MDT compaction heap pressure by unboxing ValueMetadata primitives, switching BitCaskDiskMap from ConcurrentHashMap to a LinkedHashMap guarded by a ReentrantReadWriteLock (so iteration relies on insertion-order == disk-offset-order instead of an O(N log N) sort), and adding a lazy keyStream() to avoid a transient key-set copy in HoodieSortedMergeHandle.
I traced the locking model, the insertion-order/offset-order invariant, the serialization/flush moves outside the write lock, the primitive getter change, and the keyStream dedup — these all look carefully handled and correct. One inline question on whether the keyStream() path fully realizes the keySet-copy savings once the map has spilled to BitCaskDiskMap. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A few small readability nits in the new test and one redundant stream call in the production code, but the overall structure and commenting of the changes is solid.
| if (diskBasedMap == null) { | ||
| return inMemoryMap.keySet().stream(); | ||
| } | ||
| return Stream.concat(inMemoryMap.keySet().stream(), diskBasedMap.keySet().stream()); |
There was a problem hiding this comment.
🤖 When the map has spilled, keyStream() calls diskBasedMap.keySet(), which this PR changes to eagerly return new HashSet<>(valueMetadataMap.keySet()) of all disk keys. So for a heavily-spilled map (the MDT case) the transient HashSet is still materialized here — does this fully realize the consumer-E savings, or would a truly streaming keySet on BitCaskDiskMap be needed? The javadoc's claim that keyStream "does NOT allocate a new HashSet copying all keys" seems only partially true once disk keys dominate.
| records.put(firstKey, updatedValue); | ||
|
|
||
| // size must remain 5 (re-insertion replaces the existing entry). | ||
| List<HoodieRecord> iteratedFromEntrySet = new ArrayList<>(); |
There was a problem hiding this comment.
🤖 nit: iteratedFromEntrySet is populated by calling records.iterator(), not records.entrySet() — could you rename it to something like iteratedRecords or iteratedValues so a future reader isn't sent looking for an entrySet() call?
| allValues.add(value); | ||
| } | ||
|
|
||
| java.util.concurrent.atomic.AtomicBoolean writerDone = new java.util.concurrent.atomic.AtomicBoolean(false); |
There was a problem hiding this comment.
🤖 nit: AtomicBoolean and AtomicReference are used with fully-qualified names here — could you add imports at the top of the file instead? Fully-qualified names mid-method add visual noise and make the lines quite hard to scan.
| } finally { | ||
| mapReadLock.unlock(); | ||
| } | ||
| return snapshot.stream().sequential().map(valueMetaData -> (R) get(valueMetaData, file, valueSerializer, isCompressionEnabled)); |
There was a problem hiding this comment.
🤖 nit: .sequential() is a no-op here — ArrayList.stream() already returns a sequential stream. Could you drop it? It may leave a future reader wondering whether parallelism was a concern and whether the stream might become parallel.
Change Logs
Fixes #19084.
MDT
record_indexpartition compaction OOMs executors configured with 9 GB heap, four concurrent tasks, andspark.memory.fraction=0.3. Heap analysis identified three structural consumers that aren't bounded by the configured compaction memory cap (hoodie.memory.compaction.fraction):BitCaskDiskMap.valueMetadataMap— 18.3MValueMetadataentries withInteger/Longboxing overhead dominating (57.2Mjava.lang.Longinstances = ~1.28 GB pure boxing)LazyFileIteratormaterialised a full sortedArrayList(4.58M entries × 4 tasks) at iterator-open time becauseConcurrentHashMapdoesn't preserve insertion orderExternalSpillableMap.keySet()allocated a transientHashSetcopying all 6.29M keys solely to seed thePriorityQueueinHoodieSortedMergeHandle's constructor4-task demand: ~6.25 GB on a 9 GB executor — OOMs under GC pressure. Lowering
hoodie.memory.compaction.fractionmakes B larger and the OOM worse, so the fix must be code, not config.Changes
Unbox
ValueMetadataprimitives (BitCaskDiskMap.java) —Integer/Long→int/longforsizeOfValue,offsetOfValue,timestamp. Saves ~880 MB across 4 tasks. The Lombok@AllArgsConstructor+@Getterannotations are preserved; getter return types switch from boxed to primitive (autoboxing handles any external caller).LinkedHashMap+ReentrantReadWriteLockforvalueMetadataMap(BitCaskDiskMap.java,LazyFileIterable.java) —filePositionis strictly monotonic under the write lock (append-only file + monotonicSizeAwareDataOutputStream.getSize()), so insertion order = disk offset order.LazyFileIteratorno longer sorts; it consumes a pre-built snapshot list taken under the read lock.put()does remove-before-put under the write lock so re-inserted keys land at theLinkedHashMaptail. All read paths (get,containsKey,size,iterator,valueStream,entrySet,keySet,iterator(Predicate)) take the read lock and snapshot before releasing.close()andclear()take the write lock so a concurrent reader underSpillableMapBasedFileSystemView's sharedreadLockcan't observe a half-cleared map. Saves ~513 MB across 4 tasks.ExternalSpillableMap.keyStream()+HoodieSortedMergeHandleuses it (ExternalSpillableMap.java,HoodieSortedMergeHandle.java) —keyStream()returnsStream.concat(inMemoryKeys, diskKeys)without allocating aHashSetcopy.HoodieSortedMergeHandlenow collects the stream into anArrayListand passes it tonew PriorityQueue<>(list)to get the O(N) heapify (vs. O(N log N) per-add). Saves ~256 MB peak spike per task.Total: ~1.65 GB per task. 4-task demand drops from ~6.25 GB to ~4.57 GB.
Impact
Eliminates executor OOM during MDT
record_indexcompaction on commonly-sized executors. No on-disk format change. No public-API surface change (ValueMetadata.getOffsetOfValue()return type changes fromLongtolong, butValueMetadatais an internal class and autoboxing handles any external callers).Risk Level
Low. The locking model is conservative (RW-lock with read-shared concurrency preserved), the insertion-order = offset-order invariant is proved from append-only file + monotonic
filePosition+ write-lock, and the regression test suite locks down both the invariant and the concurrent-read contract.Documentation Update
None required.
Tests
TestBitCaskDiskMap#testReInsertionPreservesOffsetOrder(parameterized × compression on/off): puts a key twice, asserts re-inserted key lands at iteration tail. Locks down theLinkedHashMap-insertion-order == disk-offset-order invariant against future refactors ofput().TestBitCaskDiskMap#testConcurrentReadersAndWriter: writer + reader threads; asserts noConcurrentModificationExceptionand that the RW-lock contract holds.TestBitCaskDiskMap+TestExternalSpillableMapsuite: 45 run / 0 failed / 1 skipped on Java 11 with-Dscala-2.12 -Dspark3.5.Contributor's checklist