Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.keygen.BaseKeyGenerator;
Expand All @@ -32,11 +33,13 @@
import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.stream.Collectors;

/**
* Hoodie merge handle which writes records (new inserts or updates) sorted by their key.
Expand All @@ -47,13 +50,13 @@
@NotThreadSafe
public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {

private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();
private final Queue<String> newRecordKeysSorted;

public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
newRecordKeysSorted.addAll(keyToNewRecords.keySet());
this.newRecordKeysSorted = buildSortedKeys(keyToNewRecords);
}

/**
Expand All @@ -63,8 +66,31 @@ public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, Hoo
Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, hoodieTable, keyToNewRecordsOrig, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
this.newRecordKeysSorted = buildSortedKeys(keyToNewRecords);
}

newRecordKeysSorted.addAll(keyToNewRecords.keySet());
/**
 * Creates the queue of new-record keys, ordered by the keys' natural (String) ordering.
 *
 * <p>How the keys are gathered:
 * <ul>
 *   <li>For an {@link ExternalSpillableMap}, keys are read through
 *       {@link ExternalSpillableMap#keyStream()} rather than {@link java.util.Map#keySet()}, so the
 *       combined in-memory + on-disk key set that {@code keySet()} builds for a spilled map is not
 *       materialised just to be copied again (ENG-43078). NOTE(review): the disk-backed map may
 *       still snapshot its own keys inside {@code keyStream()} - confirm against the disk map in use.</li>
 *   <li>For any other map, the key set is copied into an {@link ArrayList}.</li>
 * </ul>
 *
 * <p>The collected keys are handed to the
 * {@link PriorityQueue#PriorityQueue(java.util.Collection) Collection constructor} in one go so the
 * heap is built in bulk rather than through N individual {@code add()} calls.
 *
 * @param map the key-to-record map whose keys seed the queue
 * @return a priority queue containing every key of {@code map}
 */
private Queue<String> buildSortedKeys(Map<String, HoodieRecord<T>> map) {
  // Pick the key source up front; both branches yield a plain list the queue can heapify at once.
  final List<String> collectedKeys = map instanceof ExternalSpillableMap
      ? ((ExternalSpillableMap<String, HoodieRecord<T>>) map).keyStream().collect(Collectors.toList())
      : new ArrayList<>(map.keySet());
  return new PriorityQueue<>(collectedKeys);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -77,8 +78,35 @@ public final class BitCaskDiskMap<T extends Serializable, R> extends DiskMap<T,
private static final ThreadLocal<CompressionHandler> DISK_COMPRESSION_REF =
ThreadLocal.withInitial(CompressionHandler::new);

// Stores the key and corresponding value's latest metadata spilled to disk
// Stores the key and corresponding value's latest metadata spilled to disk.
// LinkedHashMap preserves insertion order, which equals disk offset order because the file is
// append-only and all writes hold mapWriteLock. LazyFileIterator relies on this to iterate in
// offset order without an extra sort or ArrayList copy (see ENG-43078).
//
// LinkedHashMap invariants that must be maintained internally to this class
// (callers of put/get/remove see only the Map contract; these are NOT caller-facing rules):
// (a) accessOrder is false (default constructor) — get() must NOT move entries; only put()
// appends to the tail. Never use LinkedHashMap(capacity, loadFactor, accessOrder=true).
// (b) Re-insertion: LinkedHashMap.put(k, v) does NOT move an existing key to the tail.
// BitCaskDiskMap.put() therefore calls valueMetadataMap.remove(key) before the put()
// (under mapWriteLock) so the new, higher disk offset lands at the tail and the
// insertion-order == disk-offset-order invariant is preserved. Internal modifications
// to valueMetadataMap must do the same — never use compute(), merge(), replace(), or
// putIfAbsent() directly on valueMetadataMap; they share this re-insertion pitfall.
//
// Thread-safety model: a ReentrantReadWriteLock guards valueMetadataMap.
// mapWriteLock (exclusive): held by put(), remove(), clear(), close() for the entire
// operation (map update + disk write must be atomic together).
// mapReadLock (shared): held by get(), containsKey(), size(), isEmpty(), and snapshot
// operations (iterator, valueStream, keySet, entrySet). Multiple concurrent readers are
// allowed; disk I/O in get() is performed after releasing mapReadLock to avoid blocking
// other readers while waiting on a seek/read.
// This restores the concurrent-read behaviour that ConcurrentHashMap provided while keeping
// LinkedHashMap's insertion-order guarantee for lock-free iteration (ENG-43078).
private final Map<T, ValueMetadata> valueMetadataMap;
private final ReentrantReadWriteLock mapLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock mapReadLock = mapLock.readLock();
private final ReentrantReadWriteLock.WriteLock mapWriteLock = mapLock.writeLock();
// Enables compression for all values stored in the disk map
private final boolean isCompressionEnabled;
// Write only file
Expand All @@ -101,7 +129,7 @@ public final class BitCaskDiskMap<T extends Serializable, R> extends DiskMap<T,

public BitCaskDiskMap(String baseFilePath, CustomSerializer<R> valueSerializer, boolean isCompressionEnabled) throws IOException {
super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name());
this.valueMetadataMap = new ConcurrentHashMap<>();
this.valueMetadataMap = new LinkedHashMap<>();
this.isCompressionEnabled = isCompressionEnabled;
this.writeOnlyFile = new File(diskMapPath, UUID.randomUUID().toString());
this.filePath = writeOnlyFile.getPath();
Expand Down Expand Up @@ -159,7 +187,16 @@ private void flushToDisk() {
*/
@Override
public Iterator<R> iterator() {
ClosableIterator<R> iterator = new LazyFileIterable(filePath, valueMetadataMap, valueSerializer, isCompressionEnabled).iterator();
// Snapshot under mapReadLock so that concurrent puts cannot cause
// ConcurrentModificationException when the iterator is consumed.
final List<Map.Entry<T, ValueMetadata>> snapshot;
mapReadLock.lock();
try {
snapshot = new ArrayList<>(valueMetadataMap.entrySet());
} finally {
mapReadLock.unlock();
}
ClosableIterator<R> iterator = new LazyFileIterable<>(filePath, snapshot, valueSerializer, isCompressionEnabled).iterator();
this.iterators.add(iterator);
return iterator;
}
Expand All @@ -169,8 +206,16 @@ public Iterator<R> iterator() {
*/
@Override
public Iterator<R> iterator(Predicate<T> filter) {
Map<T, ValueMetadata> filteredValueMetadata = valueMetadataMap.entrySet().stream().filter(e -> filter.test(e.getKey())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
ClosableIterator<R> iterator = new LazyFileIterable(filePath, filteredValueMetadata, valueSerializer, isCompressionEnabled).iterator();
final List<Map.Entry<T, ValueMetadata>> snapshot;
mapReadLock.lock();
try {
snapshot = valueMetadataMap.entrySet().stream()
.filter(e -> filter.test(e.getKey()))
.collect(Collectors.toList());
} finally {
mapReadLock.unlock();
}
ClosableIterator<R> iterator = new LazyFileIterable<>(filePath, snapshot, valueSerializer, isCompressionEnabled).iterator();
this.iterators.add(iterator);
return iterator;
}
Expand All @@ -185,17 +230,32 @@ public long sizeOfFileOnDiskInBytes() {

@Override
public int size() {
return valueMetadataMap.size();
mapReadLock.lock();
try {
return valueMetadataMap.size();
} finally {
mapReadLock.unlock();
}
}

@Override
public boolean isEmpty() {
return valueMetadataMap.isEmpty();
mapReadLock.lock();
try {
return valueMetadataMap.isEmpty();
} finally {
mapReadLock.unlock();
}
}

@Override
public boolean containsKey(Object key) {
return valueMetadataMap.containsKey(key);
mapReadLock.lock();
try {
return valueMetadataMap.containsKey(key);
} finally {
mapReadLock.unlock();
}
}

@Override
Expand All @@ -205,7 +265,15 @@ public boolean containsValue(Object value) {

@Override
public R get(Object key) {
ValueMetadata entry = valueMetadataMap.get(key);
// Hold mapReadLock only for the metadata lookup; release before disk I/O so that
// concurrent get() calls on different keys are not serialized by the I/O wait.
ValueMetadata entry;
mapReadLock.lock();
try {
entry = valueMetadataMap.get(key);
} finally {
mapReadLock.unlock();
}
if (entry == null) {
return null;
}
Expand All @@ -228,18 +296,31 @@ public static <V> V get(ValueMetadata entry, RandomAccessFile file, CustomSerial
}
}

private synchronized R put(T key, R value, boolean flush) {
private R put(T key, R value, boolean flush) {
try {
byte[] val = isCompressionEnabled ? DISK_COMPRESSION_REF.get().compressBytes(valueSerializer.serialize(value)) :
valueSerializer.serialize(value);
int valueSize = val.length;
long timestamp = System.currentTimeMillis();
this.valueMetadataMap.put(key,
new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
byte[] serializedKey = SerializationUtils.serialize(key);
filePosition
.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(generateChecksum(val),
serializedKey.length, valueSize, serializedKey, val, timestamp)));
// Hold mapWriteLock for the entire map-update + disk-write to keep them atomic:
// a concurrent get() must not see a metadata entry whose bytes haven't been flushed yet.
mapWriteLock.lock();
try {
// Remove before re-inserting so the key always lands at the LinkedHashMap tail.
// LinkedHashMap does not move an existing key on put() — without this remove, a
// re-inserted key would stay at its original position while its new offset is higher
// than all entries that follow it, breaking the insertion-order == offset-order invariant.
// The old bytes remain in the disk file as dead bytes (existing behaviour for remove()).
valueMetadataMap.remove(key);
this.valueMetadataMap.put(key,
new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
filePosition
.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(generateChecksum(val),
serializedKey.length, valueSize, serializedKey, val, timestamp)));
} finally {
mapWriteLock.unlock();
}
if (flush) {
flushToDisk();
}
Expand All @@ -256,9 +337,17 @@ public R put(T key, R value) {

@Override
public R remove(Object key) {
R value = get(key);
valueMetadataMap.remove(key);
return value;
ValueMetadata entry;
mapWriteLock.lock();
try {
entry = valueMetadataMap.remove(key);
} finally {
mapWriteLock.unlock();
}
if (entry == null) {
return null;
}
return get(entry);
}

@Override
Expand All @@ -271,14 +360,27 @@ public void putAll(Map<? extends T, ? extends R> m) {

@Override
public void clear() {
valueMetadataMap.clear();
// Acquire mapWriteLock so a concurrent reader under mapReadLock (e.g. via
// SpillableMapBasedFileSystemView's shared readLock) can't observe a half-cleared map.
mapWriteLock.lock();
try {
valueMetadataMap.clear();
} finally {
mapWriteLock.unlock();
}
// Do not delete file-handles & file as there is no way to do it without synchronizing get/put(and
// reducing concurrency). Instead, just clear the pointer map. The file will be removed on exit.
}

@Override
public void close() {
valueMetadataMap.clear();
// See clear() for why this needs mapWriteLock.
mapWriteLock.lock();
try {
valueMetadataMap.clear();
} finally {
mapWriteLock.unlock();
}
try {
if (writeOnlyFileHandle != null) {
writeOnlyFileHandle.flush();
Expand Down Expand Up @@ -309,7 +411,12 @@ public void close() {

@Override
public Set<T> keySet() {
return valueMetadataMap.keySet();
mapReadLock.lock();
try {
return new HashSet<>(valueMetadataMap.keySet());
} finally {
mapReadLock.unlock();
}
}

@Override
Expand All @@ -320,13 +427,30 @@ public Collection<R> values() {
@Override
public Stream<R> valueStream() {
final BufferedRandomAccessFile file = getRandomAccessFile();
return valueMetadataMap.values().stream().sorted().map(valueMetaData -> get(valueMetaData, file, valueSerializer, isCompressionEnabled));
// Snapshot under mapReadLock. LinkedHashMap insertion order equals disk offset order
// (see put()), so no sort is needed — values are already in forward-read order.
final List<ValueMetadata> snapshot;
mapReadLock.lock();
try {
snapshot = new ArrayList<>(valueMetadataMap.values());
} finally {
mapReadLock.unlock();
}
return snapshot.stream().sequential().map(valueMetaData -> (R) get(valueMetaData, file, valueSerializer, isCompressionEnabled));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: .sequential() is a no-op here — ArrayList.stream() already returns a sequential stream. Could you drop it? It may leave a future reader wondering whether parallelism was a concern and whether the stream might become parallel.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

}

@Override
public Set<Entry<T, R>> entrySet() {
// Snapshot keys under mapReadLock; disk I/O per-key happens outside the lock.
final Set<T> keySnapshot;
mapReadLock.lock();
try {
keySnapshot = new HashSet<>(valueMetadataMap.keySet());
} finally {
mapReadLock.unlock();
}
Set<Entry<T, R>> entrySet = new HashSet<>();
for (T key : valueMetadataMap.keySet()) {
for (T key : keySnapshot) {
entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
}
return entrySet;
Expand Down Expand Up @@ -388,11 +512,13 @@ public static final class ValueMetadata implements Comparable<ValueMetadata> {
// FilePath to store the spilled data
private final String filePath;
// Size (numberOfBytes) of the value written to disk
private final Integer sizeOfValue;
// ENG-43078: primitive fields avoid ~64 B/entry of boxing overhead (Integer + 2 Long wrappers)
// across the millions of spilled entries held in valueMetadataMap.
private final int sizeOfValue;
// FilePosition of the value written to disk
private final Long offsetOfValue;
private final long offsetOfValue;
// Current timestamp when the value was written to disk
private final Long timestamp;
private final long timestamp;

@Override
public int compareTo(ValueMetadata o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,22 @@ public Stream<R> valueStream() {
return Stream.concat(inMemoryMap.values().stream(), diskBasedMap.valueStream());
}

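/**
* Returns a stream over all keys in this map (in-memory + spilled).
*
* Unlike {@link #keySet()}, this method does NOT build a combined {@link java.util.HashSet} holding
* every in-memory and spilled key. When the map has spilled ({@code diskBasedMap != null}),
* {@link #keySet()} must materialise a full copy to combine both key sets into a single
* {@link java.util.Set}. Callers that only need to iterate keys once (e.g. to seed a
* {@link java.util.PriorityQueue}) should prefer this method to reduce the transient per-task heap
* spike (ENG-43078).
*
* Note: spilled keys are still obtained via {@code diskBasedMap.keySet()}, so any copy made by the
* disk map implementation itself (e.g. BitCaskDiskMap#keySet() returns a snapshot HashSet) is
* still allocated; only the combined in-memory + disk copy is avoided.
*/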
public Stream<T> keyStream() {
if (diskBasedMap == null) {
return inMemoryMap.keySet().stream();
}
return Stream.concat(inMemoryMap.keySet().stream(), diskBasedMap.keySet().stream());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 When the map has spilled, keyStream() calls diskBasedMap.keySet(), which this PR changes to eagerly return new HashSet<>(valueMetadataMap.keySet()) containing all disk keys. So for a heavily spilled map (the MDT case), the transient HashSet is still materialized here. Does this fully realize the consumer-E savings, or would a truly streaming keySet on BitCaskDiskMap be needed? The javadoc's claim that keyStream() "does NOT allocate a new HashSet copying all keys" is only partially true once disk keys dominate.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

}

@Override
public Set<Entry<T, R>> entrySet() {
if (diskBasedMap == null) {
Expand Down
Loading
Loading