From 5fa336c4c2ce1a25a480f955e898f3352c2aa339 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Sun, 28 Jun 2026 14:25:26 -0700 Subject: [PATCH] fix(metadata): reduce MDT compaction heap pressure in BitCaskDiskMap and ExternalSpillableMap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes apache/hudi#19084. ## Problem MDT `record_index` compaction tasks were OOMing on executors with 9 GB heap, four concurrent tasks, and `spark.memory.fraction=0.3`. Heap analysis identified three structural consumers that aren't bounded by the configured compaction memory cap: B ~3.0–3.2 GB BitCaskDiskMap.valueMetadataMap — 18.3M ValueMetadata entries with Integer/Long boxing overhead dominating (57.2M java.lang.Long instances = ~1.28 GB pure boxing) F ~513 MB LazyFileIterator sorted ArrayList (4.58M entries × 4 tasks) materialised at iterator-open time because ConcurrentHashMap doesn't preserve insertion order E ~256 MB peak ExternalSpillableMap.keySet() HashSet spike (6.29M keys copied to seed a PriorityQueue) Total ~1.6 GB per task across 4 concurrent tasks, on top of the bounded in-memory record map. Lowering hoodie.memory.compaction.fraction makes B larger and the OOM worse — fix has to be code, not config. ## Changes 1. Unbox ValueMetadata primitives (BitCaskDiskMap.java) Integer/Long → int/long for sizeOfValue, offsetOfValue, timestamp. Saves ~880 MB across 4 tasks. 2. LinkedHashMap + ReentrantReadWriteLock for valueMetadataMap (BitCaskDiskMap.java, LazyFileIterable.java) - filePosition is strictly monotonic under the write lock (append-only file + monotonic SizeAwareDataOutputStream.getSize()), so insertion order = disk offset order. LazyFileIterator no longer sorts; it consumes a pre-built snapshot list taken under the read lock. - put() does remove-before-put under the write lock so re-inserted keys land at the LinkedHashMap tail (preserves offset-order). 
- All read paths (get, containsKey, size, iterator, valueStream, entrySet, keySet, iterator(Predicate)) take the read lock and snapshot before releasing. - close() and clear() take the write lock so a concurrent reader under SpillableMapBasedFileSystemView's shared readLock can't observe a half-cleared map. Saves ~513 MB across 4 tasks. 3. ExternalSpillableMap.keyStream() + HoodieSortedMergeHandle uses it (ExternalSpillableMap.java, HoodieSortedMergeHandle.java) keyStream() returns Stream.concat(inMemoryKeys, diskKeys) without allocating the combined HashSet copy that keySet() builds (note: BitCaskDiskMap.keySet() still snapshots the spilled keys into a HashSet under the read lock). HoodieSortedMergeHandle now collects the stream into an ArrayList and passes it to new PriorityQueue<>(list) to get the O(N) heapify (vs. O(N log N) total for per-element add()). Saves ~256 MB peak spike per task. ## Total expected savings ~1.65 GB in total across the three fixes (~880 MB + ~513 MB + ~256 MB). 4-task demand drops from ~6.25 GB to ~4.57 GB on a 9 GB executor heap with 4 concurrent tasks. ## Tests - testReInsertionPreservesOffsetOrder (parameterized × compression on/off): puts a key twice, asserts re-inserted key lands at iteration tail. Locks down the LinkedHashMap-insertion-order == disk-offset-order invariant. - testConcurrentReadersAndWriter: writer + reader threads; asserts no CME and that the RW-lock contract holds. - Full TestBitCaskDiskMap + TestExternalSpillableMap suite: 45/0/1 on Java 11 with -Dscala-2.12 -Dspark3.5. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/io/HoodieSortedMergeHandle.java | 32 +++- .../util/collection/BitCaskDiskMap.java | 180 +++++++++++++++--- .../util/collection/ExternalSpillableMap.java | 16 ++ .../util/collection/LazyFileIterable.java | 61 ++++-- .../util/collection/TestBitCaskDiskMap.java | 120 ++++++++++++ 5 files changed, 365 insertions(+), 44 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 9456d5ce586bb..81f7dc99e542d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.keygen.BaseKeyGenerator; @@ -32,11 +33,13 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; +import java.util.stream.Collectors; /** * Hoodie merge handle which writes records (new inserts or updates) sorted by their key. 
@@ -47,13 +50,13 @@ @NotThreadSafe public class HoodieSortedMergeHandle extends HoodieWriteMergeHandle { - private final Queue newRecordKeysSorted = new PriorityQueue<>(); + private final Queue newRecordKeysSorted; public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt); - newRecordKeysSorted.addAll(keyToNewRecords.keySet()); + this.newRecordKeysSorted = buildSortedKeys(keyToNewRecords); } /** @@ -63,8 +66,31 @@ public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, Hoo Map> keyToNewRecordsOrig, String partitionPath, String fileId, HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { super(config, instantTime, hoodieTable, keyToNewRecordsOrig, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); + this.newRecordKeysSorted = buildSortedKeys(keyToNewRecords); + } - newRecordKeysSorted.addAll(keyToNewRecords.keySet()); + /** + * Builds the sorted-keys {@link PriorityQueue} from the given map. + * + *

Two memory/CPU considerations: + *

    + *
  • When the map is a spilled {@link ExternalSpillableMap}, {@link java.util.Map#keySet()} + * allocates a full {@link java.util.HashSet} copy of all keys (in-memory + disk) before + * returning, creating a large transient heap spike (ENG-43078). + * {@link ExternalSpillableMap#keyStream()} concatenates the in-memory and disk key streams instead of building that combined set (the disk map's own {@code keySet()} may still snapshot the spilled keys).
  • + *
  • The {@link PriorityQueue#PriorityQueue(java.util.Collection) Collection constructor} + * runs a single O(N) heapify, whereas individual {@code add()} calls are O(log N) each + * (total O(N log N)). We collect to an {@link ArrayList} first to get the heapify path.
  • + *
+ */ + private Queue buildSortedKeys(Map> map) { + List keys; + if (map instanceof ExternalSpillableMap) { + keys = ((ExternalSpillableMap>) map).keyStream().collect(Collectors.toList()); + } else { + keys = new ArrayList<>(map.keySet()); + } + return new PriorityQueue<>(keys); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index e7b298dc2b8d4..3c2c708ca59e4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -45,14 +45,15 @@ import java.util.Collection; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -77,8 +78,35 @@ public final class BitCaskDiskMap extends DiskMap DISK_COMPRESSION_REF = ThreadLocal.withInitial(CompressionHandler::new); - // Stores the key and corresponding value's latest metadata spilled to disk + // Stores the key and corresponding value's latest metadata spilled to disk. + // LinkedHashMap preserves insertion order, which equals disk offset order because the file is + // append-only and all writes hold mapWriteLock. LazyFileIterator relies on this to iterate in + // offset order without an extra sort or ArrayList copy (see ENG-43078). 
+ // + // LinkedHashMap invariants that must be maintained internally to this class + // (callers of put/get/remove see only the Map contract; these are NOT caller-facing rules): + // (a) accessOrder is false (default constructor) — get() must NOT move entries; only put() + // appends to the tail. Never use LinkedHashMap(capacity, loadFactor, accessOrder=true). + // (b) Re-insertion: LinkedHashMap.put(k, v) does NOT move an existing key to the tail. + // BitCaskDiskMap.put() therefore calls valueMetadataMap.remove(key) before the put() + // (under mapWriteLock) so the new, higher disk offset lands at the tail and the + // insertion-order == disk-offset-order invariant is preserved. Internal modifications + // to valueMetadataMap must do the same — never use compute(), merge(), replace(), or + // putIfAbsent() directly on valueMetadataMap; they share this re-insertion pitfall. + // + // Thread-safety model: a ReentrantReadWriteLock guards valueMetadataMap. + // mapWriteLock (exclusive): held by put(), remove(), clear(), close() for the entire + // operation (map update + disk write must be atomic together). + // mapReadLock (shared): held by get(), containsKey(), size(), isEmpty(), and snapshot + // operations (iterator, valueStream, keySet, entrySet). Multiple concurrent readers are + // allowed; disk I/O in get() is performed after releasing mapReadLock to avoid blocking + // other readers while waiting on a seek/read. + // This restores the concurrent-read behaviour that ConcurrentHashMap provided while keeping + // LinkedHashMap's insertion-order guarantee for lock-free iteration (ENG-43078). 
private final Map valueMetadataMap; + private final ReentrantReadWriteLock mapLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock mapReadLock = mapLock.readLock(); + private final ReentrantReadWriteLock.WriteLock mapWriteLock = mapLock.writeLock(); // Enables compression for all values stored in the disk map private final boolean isCompressionEnabled; // Write only file @@ -101,7 +129,7 @@ public final class BitCaskDiskMap extends DiskMap valueSerializer, boolean isCompressionEnabled) throws IOException { super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name()); - this.valueMetadataMap = new ConcurrentHashMap<>(); + this.valueMetadataMap = new LinkedHashMap<>(); this.isCompressionEnabled = isCompressionEnabled; this.writeOnlyFile = new File(diskMapPath, UUID.randomUUID().toString()); this.filePath = writeOnlyFile.getPath(); @@ -159,7 +187,16 @@ private void flushToDisk() { */ @Override public Iterator iterator() { - ClosableIterator iterator = new LazyFileIterable(filePath, valueMetadataMap, valueSerializer, isCompressionEnabled).iterator(); + // Snapshot under mapReadLock so that concurrent puts cannot cause + // ConcurrentModificationException when the iterator is consumed. 
+ final List> snapshot; + mapReadLock.lock(); + try { + snapshot = new ArrayList<>(valueMetadataMap.entrySet()); + } finally { + mapReadLock.unlock(); + } + ClosableIterator iterator = new LazyFileIterable<>(filePath, snapshot, valueSerializer, isCompressionEnabled).iterator(); this.iterators.add(iterator); return iterator; } @@ -169,8 +206,16 @@ public Iterator iterator() { */ @Override public Iterator iterator(Predicate filter) { - Map filteredValueMetadata = valueMetadataMap.entrySet().stream().filter(e -> filter.test(e.getKey())).collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - ClosableIterator iterator = new LazyFileIterable(filePath, filteredValueMetadata, valueSerializer, isCompressionEnabled).iterator(); + final List> snapshot; + mapReadLock.lock(); + try { + snapshot = valueMetadataMap.entrySet().stream() + .filter(e -> filter.test(e.getKey())) + .collect(Collectors.toList()); + } finally { + mapReadLock.unlock(); + } + ClosableIterator iterator = new LazyFileIterable<>(filePath, snapshot, valueSerializer, isCompressionEnabled).iterator(); this.iterators.add(iterator); return iterator; } @@ -185,17 +230,32 @@ public long sizeOfFileOnDiskInBytes() { @Override public int size() { - return valueMetadataMap.size(); + mapReadLock.lock(); + try { + return valueMetadataMap.size(); + } finally { + mapReadLock.unlock(); + } } @Override public boolean isEmpty() { - return valueMetadataMap.isEmpty(); + mapReadLock.lock(); + try { + return valueMetadataMap.isEmpty(); + } finally { + mapReadLock.unlock(); + } } @Override public boolean containsKey(Object key) { - return valueMetadataMap.containsKey(key); + mapReadLock.lock(); + try { + return valueMetadataMap.containsKey(key); + } finally { + mapReadLock.unlock(); + } } @Override @@ -205,7 +265,15 @@ public boolean containsValue(Object value) { @Override public R get(Object key) { - ValueMetadata entry = valueMetadataMap.get(key); + // Hold mapReadLock only for the metadata lookup; release before disk I/O 
so that + // concurrent get() calls on different keys are not serialized by the I/O wait. + ValueMetadata entry; + mapReadLock.lock(); + try { + entry = valueMetadataMap.get(key); + } finally { + mapReadLock.unlock(); + } if (entry == null) { return null; } @@ -228,18 +296,31 @@ public static V get(ValueMetadata entry, RandomAccessFile file, CustomSerial } } - private synchronized R put(T key, R value, boolean flush) { + private R put(T key, R value, boolean flush) { try { byte[] val = isCompressionEnabled ? DISK_COMPRESSION_REF.get().compressBytes(valueSerializer.serialize(value)) : valueSerializer.serialize(value); int valueSize = val.length; long timestamp = System.currentTimeMillis(); - this.valueMetadataMap.put(key, - new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp)); byte[] serializedKey = SerializationUtils.serialize(key); - filePosition - .set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(generateChecksum(val), - serializedKey.length, valueSize, serializedKey, val, timestamp))); + // Hold mapWriteLock for the entire map-update + disk-write to keep them atomic: + // a concurrent get() must not see a metadata entry whose bytes haven't been flushed yet. + mapWriteLock.lock(); + try { + // Remove before re-inserting so the key always lands at the LinkedHashMap tail. + // LinkedHashMap does not move an existing key on put() — without this remove, a + // re-inserted key would stay at its original position while its new offset is higher + // than all entries that follow it, breaking the insertion-order == offset-order invariant. + // The old bytes remain in the disk file as dead bytes (existing behaviour for remove()). 
+ valueMetadataMap.remove(key); + this.valueMetadataMap.put(key, + new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp)); + filePosition + .set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(generateChecksum(val), + serializedKey.length, valueSize, serializedKey, val, timestamp))); + } finally { + mapWriteLock.unlock(); + } if (flush) { flushToDisk(); } @@ -256,9 +337,17 @@ public R put(T key, R value) { @Override public R remove(Object key) { - R value = get(key); - valueMetadataMap.remove(key); - return value; + ValueMetadata entry; + mapWriteLock.lock(); + try { + entry = valueMetadataMap.remove(key); + } finally { + mapWriteLock.unlock(); + } + if (entry == null) { + return null; + } + return get(entry); } @Override @@ -271,14 +360,27 @@ public void putAll(Map m) { @Override public void clear() { - valueMetadataMap.clear(); + // Acquire mapWriteLock so a concurrent reader under mapReadLock (e.g. via + // SpillableMapBasedFileSystemView's shared readLock) can't observe a half-cleared map. + mapWriteLock.lock(); + try { + valueMetadataMap.clear(); + } finally { + mapWriteLock.unlock(); + } // Do not delete file-handles & file as there is no way to do it without synchronizing get/put(and // reducing concurrency). Instead, just clear the pointer map. The file will be removed on exit. } @Override public void close() { - valueMetadataMap.clear(); + // See clear() for why this needs mapWriteLock. 
+ mapWriteLock.lock(); + try { + valueMetadataMap.clear(); + } finally { + mapWriteLock.unlock(); + } try { if (writeOnlyFileHandle != null) { writeOnlyFileHandle.flush(); @@ -309,7 +411,12 @@ public void close() { @Override public Set keySet() { - return valueMetadataMap.keySet(); + mapReadLock.lock(); + try { + return new HashSet<>(valueMetadataMap.keySet()); + } finally { + mapReadLock.unlock(); + } } @Override @@ -320,13 +427,30 @@ public Collection values() { @Override public Stream valueStream() { final BufferedRandomAccessFile file = getRandomAccessFile(); - return valueMetadataMap.values().stream().sorted().map(valueMetaData -> get(valueMetaData, file, valueSerializer, isCompressionEnabled)); + // Snapshot under mapReadLock. LinkedHashMap insertion order equals disk offset order + // (see put()), so no sort is needed — values are already in forward-read order. + final List snapshot; + mapReadLock.lock(); + try { + snapshot = new ArrayList<>(valueMetadataMap.values()); + } finally { + mapReadLock.unlock(); + } + return snapshot.stream().sequential().map(valueMetaData -> (R) get(valueMetaData, file, valueSerializer, isCompressionEnabled)); } @Override public Set> entrySet() { + // Snapshot keys under mapReadLock; disk I/O per-key happens outside the lock. 
+ final Set keySnapshot; + mapReadLock.lock(); + try { + keySnapshot = new HashSet<>(valueMetadataMap.keySet()); + } finally { + mapReadLock.unlock(); + } Set> entrySet = new HashSet<>(); - for (T key : valueMetadataMap.keySet()) { + for (T key : keySnapshot) { entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key))); } return entrySet; @@ -388,11 +512,13 @@ public static final class ValueMetadata implements Comparable { // FilePath to store the spilled data private final String filePath; // Size (numberOfBytes) of the value written to disk - private final Integer sizeOfValue; + // ENG-43078: primitive fields avoid ~64 B/entry of boxing overhead (Integer + 2 Long wrappers) + // across the millions of spilled entries held in valueMetadataMap. + private final int sizeOfValue; // FilePosition of the value written to disk - private final Long offsetOfValue; + private final long offsetOfValue; // Current timestamp when the value was written to disk - private final Long timestamp; + private final long timestamp; @Override public int compareTo(ValueMetadata o) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index ce49df4ef602f..ca974aaf34cfc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -315,6 +315,22 @@ public Stream valueStream() { return Stream.concat(inMemoryMap.values().stream(), diskBasedMap.valueStream()); } + /** + * Returns a lazy stream over all keys in this map (in-memory + spilled). + * + * Unlike {@link #keySet()}, this method does NOT allocate a new {@link java.util.HashSet} copying + * all keys. 
When the map has spilled ({@code diskBasedMap != null}), {@link #keySet()} must + * materialise a full copy to combine both key sets into a single {@link java.util.Set}. Callers + * that only need to iterate keys once (e.g. to seed a {@link java.util.PriorityQueue}) should + * prefer this method to avoid that combined copy (ENG-43078); note that the disk map's own {@code keySet()} may still snapshot the spilled keys. + */ + public Stream keyStream() { + if (diskBasedMap == null) { + return inMemoryMap.keySet().stream(); + } + return Stream.concat(inMemoryMap.keySet().stream(), diskBasedMap.keySet().stream()); + } + @Override public Set> entrySet() { if (diskBasedMap == null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java index 493593dd1fe90..4cc5ad53d44c5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java @@ -25,10 +25,11 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.stream.Collectors; /** * Iterable to lazily fetch values spilled to disk. This class uses BufferedRandomAccessFile to randomly access the position of @@ -39,25 +40,54 @@ public class LazyFileIterable implements Iterable { // Used to access the value written at a specific position in the file private final String filePath; - // Stores the key and corresponding value's latest metadata spilled to disk - private final Map inMemoryMetadataOfSpilledData; + // Pre-built snapshot of key→metadata entries in disk-offset order. Taking the snapshot at + // construction time (under the BitCaskDiskMap lock) ensures thread safety when + // SpillableMapBasedFileSystemView has concurrent puts and reads under its readLock. 
+ private final List> snapshot; // Was compressions enabled for the values when inserted into the file/ map private final boolean isCompressionEnabled; private final CustomSerializer serializer; private transient Thread shutdownThread = null; - public LazyFileIterable(String filePath, Map map, CustomSerializer serializer, boolean isCompressionEnabled) { + /** + * Constructs a LazyFileIterable with a pre-built snapshot of metadata entries. + * Callers (e.g. {@link BitCaskDiskMap#iterator()}) must take the snapshot under the + * BitCaskDiskMap lock before invoking this constructor. + */ + LazyFileIterable(String filePath, List> snapshot, + CustomSerializer serializer, boolean isCompressionEnabled) { this.filePath = filePath; - this.inMemoryMetadataOfSpilledData = map; + this.snapshot = snapshot; + this.serializer = serializer; this.isCompressionEnabled = isCompressionEnabled; + } + + /** + * Constructs a LazyFileIterable from a live metadata map. The snapshot is taken immediately + * at construction time, but WITHOUT any lock — concurrent writers can cause + * {@link java.util.ConcurrentModificationException}. Prefer the list-based constructor when + * the caller already holds a lock that must cover the snapshot (e.g. in + * {@link BitCaskDiskMap#iterator()}). + * + * @deprecated The unsynchronized snapshot is unsafe under the new {@code BitCaskDiskMap} + * locking model (ENG-43078). Callers should take a snapshot under the appropriate lock + * and use the {@link #LazyFileIterable(String, List, CustomSerializer, boolean) list-based + * constructor}. Retained only for binary compatibility with any external callers. 
+ */ + @Deprecated + public LazyFileIterable(String filePath, Map map, + CustomSerializer serializer, boolean isCompressionEnabled) { + this.filePath = filePath; + this.snapshot = new ArrayList<>(map.entrySet()); this.serializer = serializer; + this.isCompressionEnabled = isCompressionEnabled; } @Override public ClosableIterator iterator() { try { - return new LazyFileIterator<>(filePath, inMemoryMetadataOfSpilledData, serializer); + return new LazyFileIterator<>(filePath, snapshot, serializer); } catch (IOException io) { throw new HoodieException("Unable to initialize iterator for file on disk", io); } @@ -66,24 +96,27 @@ public ClosableIterator iterator() { /** * Iterator implementation for the iterable defined above. */ - public class LazyFileIterator implements ClosableIterator { + public class LazyFileIterator implements ClosableIterator { private final String filePath; private BufferedRandomAccessFile readOnlyFileHandle; private final Iterator> metadataIterator; private final CustomSerializer serializer; - public LazyFileIterator(String filePath, Map map, CustomSerializer serializer) throws IOException { + /** + * Constructs the iterator from a pre-built snapshot list. + * The snapshot must have been taken by the caller (e.g. under the BitCaskDiskMap lock) + * to ensure no entries are missed and no ConcurrentModificationException occurs. + * ENG-43078: BitCaskDiskMap.valueMetadataMap is a LinkedHashMap whose insertion order + * equals disk offset order, so no sort is needed. 
+ */ + public LazyFileIterator(String filePath, List> snapshot, + CustomSerializer serializer) throws IOException { this.filePath = filePath; this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", BitCaskDiskMap.BUFFER_SIZE); this.serializer = serializer; readOnlyFileHandle.seek(0); - - // sort the map in increasing order of offset of value so disk seek is only in one(forward) direction - this.metadataIterator = map.entrySet().stream() - .sorted((Map.Entry o1, Map.Entry o2) -> o1 - .getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue())) - .collect(Collectors.toList()).iterator(); + this.metadataIterator = snapshot.iterator(); this.addShutdownHook(); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java index 98174665b8358..034f0b9784a70 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java @@ -61,6 +61,8 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -275,4 +277,122 @@ private void verifyCleanup(BitCaskDiskMap records) { records.close(); assertEquals(Objects.requireNonNull(basePathDir.list()).length, 0); } + + /** + * Re-insertion order: when a key is put twice, iteration order must place it last (i.e. the + * second put's higher disk offset is at the LinkedHashMap tail), and reads must return the + * latest value. 
Locks down the invariant that {@code BitCaskDiskMap.put} removes-before-puts + * to keep {@code LinkedHashMap} insertion-order == disk-offset-order (ENG-43078). + */ + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testReInsertionPreservesOffsetOrder(boolean isCompressionEnabled) throws IOException, URISyntaxException { + try (BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, new DefaultSerializer<>(), isCompressionEnabled)) { + SchemaTestUtil testUtil = new SchemaTestUtil(); + List iRecords = testUtil.generateHoodieTestRecords(0, 5); + List orderedKeys = new ArrayList<>(); + for (IndexedRecord r : iRecords) { + String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + HoodieRecord value = new HoodieAvroRecord<>(new HoodieKey(key, partitionPath), new HoodieAvroPayload(Option.of((GenericRecord) r))); + records.put(key, value); + orderedKeys.add(key); + } + // Re-insert the first key; it must move to the iteration tail. + String firstKey = orderedKeys.get(0); + List updatedRec = testUtil.generateHoodieTestRecords(100, 1); + GenericRecord updatedGeneric = (GenericRecord) updatedRec.get(0); + HoodieRecord updatedValue = new HoodieAvroRecord<>( + new HoodieKey(firstKey, updatedGeneric.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()), + new HoodieAvroPayload(Option.of(updatedGeneric))); + records.put(firstKey, updatedValue); + + // size must remain 5 (re-insertion replaces the existing entry). + List iteratedFromEntrySet = new ArrayList<>(); + Iterator iter = records.iterator(); + while (iter.hasNext()) { + iteratedFromEntrySet.add(iter.next()); + } + assertEquals(5, iteratedFromEntrySet.size()); + // Use the stable HoodieRecord.getRecordKey() accessor rather than dereffing payload avro + // (whose schema may not be metadata-augmented in this test setup). 
+ assertEquals(firstKey, + iteratedFromEntrySet.get(4).getRecordKey(), + "Re-inserted key must appear last in iteration order"); + + // Read-back of the re-inserted key must return the latest value. + HoodieRecord readBack = records.get(firstKey); + assertNotNull(readBack); + } + } + + /** + * Concurrent reader + writer: while a writer thread does put/remove under {@code mapWriteLock}, + * a reader thread doing {@code get}/{@code containsKey} under {@code mapReadLock} must not throw + * {@link java.util.ConcurrentModificationException} and must observe values eventually. + * Locks down the RW-lock semantics introduced in ENG-43078. + */ + @Test + public void testConcurrentReadersAndWriter() throws Exception { + try (BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, new DefaultSerializer<>(), false)) { + SchemaTestUtil testUtil = new SchemaTestUtil(); + List iRecords = testUtil.generateHoodieTestRecords(0, 200); + List allKeys = new ArrayList<>(); + List allValues = new ArrayList<>(); + for (IndexedRecord r : iRecords) { + String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + HoodieRecord value = new HoodieAvroRecord<>(new HoodieKey(key, partitionPath), new HoodieAvroPayload(Option.of((GenericRecord) r))); + allKeys.add(key); + allValues.add(value); + } + + java.util.concurrent.atomic.AtomicBoolean writerDone = new java.util.concurrent.atomic.AtomicBoolean(false); + java.util.concurrent.atomic.AtomicReference failure = new java.util.concurrent.atomic.AtomicReference<>(); + + Thread writer = new Thread(() -> { + try { + for (int i = 0; i < allKeys.size(); i++) { + records.put(allKeys.get(i), allValues.get(i)); + } + } catch (Throwable t) { + failure.compareAndSet(null, t); + } finally { + writerDone.set(true); + } + }, "bitcask-writer"); + + Thread reader = new Thread(() -> { + try { + while (!writerDone.get()) { + for 
(String key : allKeys) { + records.containsKey(key); // must not throw + HoodieRecord r = records.get(key); + if (r != null) { + assertNotNull(r.getData()); + } + } + records.iterator(); + records.size(); + } + } catch (Throwable t) { + failure.compareAndSet(null, t); + } + }, "bitcask-reader"); + + writer.start(); + reader.start(); + writer.join(30_000); + reader.join(30_000); + assertFalse(writer.isAlive(), "writer should have finished"); + assertFalse(reader.isAlive(), "reader should have finished"); + if (failure.get() != null) { + throw new AssertionError("concurrent access threw", failure.get()); + } + assertEquals(allKeys.size(), records.size()); + for (String key : allKeys) { + assertNotNull(records.get(key)); + } + } + } }