From 0744b88fc25807d8f74c5206885470b527223dc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=B0=B8=E7=BF=94?= Date: Thu, 29 Jan 2026 12:20:32 +0000 Subject: [PATCH 1/2] [#6843581556] Support purge valueStats when read manifest * fix manifest read oom1 * fix manifest read oom * fix manifest read oom See merge request: !635 --- .../org/apache/paimon/manifest/FileEntry.java | 7 +- .../paimon/manifest/ManifestEntryCache.java | 6 +- .../operation/AbstractFileStoreScan.java | 40 ++++++----- .../org/apache/paimon/utils/ObjectsCache.java | 47 +++++++++++-- .../org/apache/paimon/utils/ObjectsFile.java | 67 +++++++++++++++---- .../paimon/utils/SimpleObjectsCache.java | 7 +- .../apache/paimon/utils/ObjectsCacheTest.java | 62 +++++++++++++++-- 7 files changed, 190 insertions(+), 46 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index 3a8008825581..3daf42352994 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -240,7 +240,12 @@ static Set readDeletedEntries( return readDeletedEntries( m -> manifestFile.read( - m.fileName(), m.fileSize(), deletedFilter(), Filter.alwaysTrue()), + m.fileName(), + m.fileSize(), + Filter.alwaysTrue(), + deletedFilter(), + Filter.alwaysTrue(), + SimpleFileEntry::from), manifestFiles, manifestReadParallelism); } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java index 59b2a34650a4..530b58d04f54 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java @@ -75,7 +75,8 @@ public ManifestEntryCache( } @Override - protected ManifestEntrySegments createSegments(Path path, @Nullable Long fileSize) { + protected ManifestEntrySegments createSegments( + Path path, @Nullable Long fileSize, Filter loadFilter) { Map, DataPagedOutputSerializer> segments = new HashMap<>(); Function partitionGetter = partitionGetter(); @@ -88,6 +89,9 @@ protected ManifestEntrySegments createSegments(Path path, @Nullable Long fileSiz try (CloseableIterator iterator = reader.apply(path, fileSize)) { while (iterator.hasNext()) { InternalRow row = iterator.next(); + if (!loadFilter.test(row)) { + continue; + } BinaryRow partition = partitionGetter.apply(row); int bucket = bucketGetter.apply(row); int totalBucket = totalBucketGetter.apply(row); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 68ebacaa805f..49267aea46bd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -406,11 +406,16 @@ public Iterator readManifestEntries( private Iterator readAndMergeFileEntries( List manifests, - Function, List> converter, + Function converter, boolean useSequential) { Set deletedEntries = FileEntry.readDeletedEntries( - manifest -> readManifest(manifest, FileEntry.deletedFilter(), null), + manifest -> + readManifest( + manifest, + SimpleFileEntry::from, + FileEntry.deletedFilter(), + null), manifests, parallelism); @@ -421,11 +426,11 @@ private Iterator readAndMergeFileEntries( Function> processor = manifest -> - converter.apply( - readManifest( - manifest, - FileEntry.addFilter(), - entry -> !deletedEntries.contains(entry.identifier()))); + readManifest( + manifest, + converter, + FileEntry.addFilter(), + entry -> !deletedEntries.contains(entry.identifier())); if (useSequential) { return sequentialBatchedExecute(processor, manifests, parallelism).iterator(); } else { @@ -475,14 +480,18 @@ protected List postFilterManifestEntries(List entr /** Note: Keep this thread-safe. */ @Override public List readManifest(ManifestFileMeta manifest) { - return readManifest(manifest, null, null); + return readManifest(manifest, Function.identity(), null, null); } - private List readManifest( + private List readManifest( ManifestFileMeta manifest, + Function converter, @Nullable Filter additionalFilter, @Nullable Filter additionalTFilter) { - List entries = + Function finalConverter = + dropStats ? e -> converter.apply(dropStats(e)) : converter; + + List entries = manifestFileFactory .create() .withCacheMetrics( @@ -497,14 +506,9 @@ private List readManifest( (additionalTFilter == null || additionalTFilter.test(entry)) && (manifestEntryFilter == null || manifestEntryFilter.test(entry)) - && filterByStats(entry)); - if (dropStats) { - List copied = new ArrayList<>(entries.size()); - for (ManifestEntry entry : entries) { - copied.add(dropStats(entry)); - } - entries = copied; - } + && filterByStats(entry), + finalConverter); + LOG.info("Read {} manifest entries from {}", entries.size(), manifest.fileName()); return entries; } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index 21b17549b3bf..20a120772ab0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -28,7 +28,9 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.function.Function; import static org.apache.paimon.utils.ObjectsFile.readFromIterator; @@ -63,13 +65,30 @@ public void withCacheMetrics(@Nullable CacheMetrics cacheMetrics) { } public List read(K key, @Nullable Long fileSize, Filters filters) throws IOException { + return read( + key, + fileSize, + Filter.alwaysTrue(), + filters.readFilter(), + filters.readVFilter(), + Function.identity()); + } + + public List read( + K key, + @Nullable Long fileSize, + Filter loadFilter, + Filter readFilter, + Filter readVFilter, + Function convertor) + throws IOException { @SuppressWarnings("unchecked") S segments = (S) cache.getIfPresents(key); if (segments != null) { if (cacheMetrics != null) { cacheMetrics.increaseHitObject(); } - return readFromSegments(segments, filters); + return convert(readFromSegments(segments, new Filters<>(readFilter, readVFilter)), convertor); } else { if (cacheMetrics != null) { cacheMetrics.increaseMissedObject(); @@ -78,22 +97,38 @@ public List read(K key, @Nullable Long fileSize, Filters filters) throws I fileSize = fileSizeFunction.apply(key); } if (fileSize <= cache.maxElementSize()) { - segments = createSegments(key, fileSize); + segments = createSegments(key, fileSize, loadFilter); cache.put(key, segments); - return readFromSegments(segments, filters); + return convert( + readFromSegments(segments, new Filters<>(readFilter, readVFilter)), + convertor); } else { return readFromIterator( reader.apply(key, fileSize), projectedSerializer, - filters.readFilter(), - filters.readVFilter()); + readFilter, + readVFilter, + convertor); } } } + private List convert(List values, Function convertor) { + List result = new ArrayList<>(values.size()); + for (V v : values) { + result.add(convertor.apply(v)); + } + return result; + } + protected abstract List readFromSegments(S segments, Filters filters) throws IOException; - protected abstract S createSegments(K k, @Nullable Long fileSize); + protected abstract S createSegments( + K k, @Nullable Long fileSize, Filter loadFilter); + + protected S createSegments(K k, @Nullable Long fileSize) { + return createSegments(k, fileSize, Filter.alwaysTrue()); + } /** Filter context for reading. */ public static class Filters { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java index 2293a59fffa7..72e8aceb2a11 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java @@ -37,6 +37,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.function.Function; import static org.apache.paimon.utils.FileUtils.checkExists; @@ -100,16 +101,27 @@ public List read(String fileName) { } public List read(String fileName, @Nullable Long fileSize) { - return read(fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue()); + return read( + fileName, + fileSize, + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Function.identity()); } public List readWithIOException(String fileName) throws IOException { return readWithIOException(fileName, null); } - public List readWithIOException(String fileName, @Nullable Long fileSize) - throws IOException { - return readWithIOException(fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue()); + public List readWithIOException(String fileName, @Nullable Long fileSize) throws IOException { + return readWithIOException( + fileName, + fileSize, + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Function.identity()); } public boolean exists(String fileName) { @@ -120,31 +132,50 @@ public boolean exists(String fileName) { } } - public List read( + public List read( String fileName, @Nullable Long fileSize, + Filter loadFilter, Filter readFilter, - Filter readTFilter) { + Filter readTFilter, + Function convertor) { try { - return readWithIOException(fileName, fileSize, readFilter, readTFilter); + return readWithIOException( + fileName, fileSize, loadFilter, readFilter, readTFilter, convertor); } catch (IOException e) { throw new RuntimeException("Failed to read " + fileName, e); } } - private List readWithIOException( + public List read( + String fileName, + @Nullable Long fileSize, + Filter readFilter, + Filter readTFilter) { + return read( + fileName, + fileSize, + Filter.alwaysTrue(), + readFilter, + readTFilter, + Function.identity()); + } + + private List readWithIOException( String fileName, @Nullable Long fileSize, + Filter loadFilter, Filter readFilter, - Filter readTFilter) + Filter readTFilter, + Function convertor) throws IOException { Path path = pathFactory.toPath(fileName); if (cache != null) { - return cache.read(path, fileSize, new ObjectsCache.Filters<>(readFilter, readTFilter)); + return cache.read(path, fileSize, loadFilter, readFilter, readTFilter, convertor); } return readFromIterator( - createIterator(path, fileSize), serializer, readFilter, readTFilter); + createIterator(path, fileSize), serializer, readFilter, readTFilter, convertor); } public String writeWithoutRolling(Collection records) { @@ -208,14 +239,24 @@ public static List readFromIterator( ObjectSerializer serializer, Filter readFilter, Filter readVFilter) { + return readFromIterator( + inputIterator, serializer, readFilter, readVFilter, Function.identity()); + } + + public static List readFromIterator( + CloseableIterator inputIterator, + ObjectSerializer serializer, + Filter readFilter, + Filter readVFilter, + Function convertor) { try (CloseableIterator iterator = inputIterator) { - List result = new ArrayList<>(); + List result = new ArrayList<>(); while (iterator.hasNext()) { InternalRow row = iterator.next(); if (readFilter.test(row)) { V v = serializer.fromRow(row); if (readVFilter.test(v)) { - result.add(v); + result.add(convertor.apply(v)); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SimpleObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/SimpleObjectsCache.java index b3ba2db167fd..cc7ec0c6c8e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SimpleObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SimpleObjectsCache.java @@ -63,7 +63,8 @@ protected List readFromSegments(Segments segments, Filters filters) throws } @Override - protected Segments createSegments(K key, @Nullable Long fileSize) { + protected Segments createSegments( + K key, @Nullable Long fileSize, Filter loadFilter) { InternalRowSerializer formatSerializer = this.formatSerializer.get(); try (CloseableIterator iterator = reader.apply(key, fileSize)) { ArrayList segments = new ArrayList<>(); @@ -73,7 +74,9 @@ protected Segments createSegments(K key, @Nullable Long fileSize) { new SimpleCollectingOutputView(segments, segmentSource, cache.pageSize()); while (iterator.hasNext()) { InternalRow row = iterator.next(); - formatSerializer.serializeToPages(row, output); + if (loadFilter.test(row)) { + formatSerializer.serializeToPages(row, output); + } } return Segments.create(segments, output.getCurrentPositionInSegment()); } catch (Exception e) { diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java index 4061044fb7f6..17d32da13464 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; @@ -62,19 +63,40 @@ public void testObjectsCacheAndMetrics() throws IOException { cache.withCacheMetrics(scanMetrics.getCacheMetrics()); // test empty map.put("k1", Collections.emptyList()); - List values = cache.read("k1", null, Filter.alwaysTrue(), Filter.alwaysTrue()); + List values = + cache.read( + "k1", + null, + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Function.identity()); assertThat(values).isEmpty(); assertThat(scanMetrics.getCacheMetrics().getMissedObject()).hasValue(1); // test values List expect = Arrays.asList("v1", "v2", "v3"); map.put("k2", expect); - values = cache.read("k2", null, Filter.alwaysTrue(), Filter.alwaysTrue()); + values = + cache.read( + "k2", + null, + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Function.identity()); assertThat(values).containsExactlyElementsOf(expect); assertThat(scanMetrics.getCacheMetrics().getMissedObject()).hasValue(2); // test cache - values = cache.read("k2", null, Filter.alwaysTrue(), Filter.alwaysTrue()); + values = + cache.read( + "k2", + null, + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Function.identity()); assertThat(values).containsExactlyElementsOf(expect); assertThat(scanMetrics.getCacheMetrics().getHitObject()).hasValue(1); @@ -83,10 +105,38 @@ public void testObjectsCacheAndMetrics() throws IOException { cache.read( "k2", null, + Filter.alwaysTrue(), r -> r.getString(0).toString().endsWith("2"), - Filter.alwaysTrue()); + Filter.alwaysTrue(), + Function.identity()); assertThat(values).containsExactly("v2"); + // test load filter + expect = Arrays.asList("v1", "v2", "v3"); + map.put("k3", expect); + values = + cache.read( + "k3", + null, + r -> r.getString(0).toString().endsWith("2"), + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Function.identity()); + assertThat(values).containsExactly("v2"); + + // test load filter empty + expect = Arrays.asList("v1", "v2", "v3"); + map.put("k4", expect); + values = + cache.read( + "k4", + null, + r -> r.getString(0).toString().endsWith("5"), + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Function.identity()); + assertThat(values).isEmpty(); + // test read concurrently map.clear(); for (int i = 0; i < 10; i++) { @@ -102,7 +152,9 @@ public void testObjectsCacheAndMetrics() throws IOException { k, null, Filter.alwaysTrue(), - Filter.alwaysTrue())) + Filter.alwaysTrue(), + Filter.alwaysTrue(), + Function.identity())) .containsExactly(k); } catch (IOException e) { throw new RuntimeException(e); From c97bc70b0aeb3a33abc1d8e5ad88137837ce6426 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Tue, 10 Mar 2026 17:42:32 +0800 Subject: [PATCH 2/2] fix: adapt valueStats purge cherry-pick MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 解决将主分支 commit 9cc7469e4f93fd1dcefe4c2079ad20e5d546b60a cherry-pick 到旧分支时的兼容性问题: - `ManifestEntryCache` 增加 `Filter` import,补齐 `createSegments` 新签名依赖 - `AbstractFileStoreScan` 改用新的 `ObjectsFile.read(loadFilter, readFilter, readTFilter, converter)` 调用方式,并复用 `entryRowFilter` - `ObjectsCache` / `ObjectsFile` 按 spotless 规则调整格式 Co-Authored-By: Aime Change-Id: I156c879130457301c7fab7ec90ae1bf5536d7ac6 --- .../org/apache/paimon/manifest/ManifestEntryCache.java | 1 + .../org/apache/paimon/operation/AbstractFileStoreScan.java | 7 ++++--- .../main/java/org/apache/paimon/utils/ObjectsCache.java | 3 ++- .../src/main/java/org/apache/paimon/utils/ObjectsFile.java | 3 ++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java index 530b58d04f54..790ba9c25987 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java @@ -31,6 +31,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BiFunctionWithIOE; import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.FunctionWithIOException; import org.apache.paimon.utils.ObjectSerializer; import org.apache.paimon.utils.ObjectsCache; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 49267aea46bd..ba7372679c48 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -491,6 +491,8 @@ private List readManifest( Function finalConverter = dropStats ? e -> converter.apply(dropStats(e)) : converter; + Filter entryRowFilter = createEntryRowFilter(); + List entries = manifestFileFactory .create() @@ -499,9 +501,8 @@ private List readManifest( .read( manifest.fileName(), manifest.fileSize(), - manifestsReader.partitionFilter(), - createBucketFilter(), - createEntryRowFilter().and(additionalFilter), + entryRowFilter, + entryRowFilter.and(additionalFilter), entry -> (additionalTFilter == null || additionalTFilter.test(entry)) && (manifestEntryFilter == null diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index 20a120772ab0..0df92d49ed2f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -88,7 +88,8 @@ public List read( if (cacheMetrics != null) { cacheMetrics.increaseHitObject(); } - return convert(readFromSegments(segments, new Filters<>(readFilter, readVFilter)), convertor); + return convert( + readFromSegments(segments, new Filters<>(readFilter, readVFilter)), convertor); } else { if (cacheMetrics != null) { cacheMetrics.increaseMissedObject(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java index 72e8aceb2a11..fce9cca5560c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java @@ -114,7 +114,8 @@ public List readWithIOException(String fileName) throws IOException { return readWithIOException(fileName, null); } - public List readWithIOException(String fileName, @Nullable Long fileSize) throws IOException { + public List readWithIOException(String fileName, @Nullable Long fileSize) + throws IOException { return readWithIOException( fileName, fileSize,