Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,12 @@ static Set<Identifier> readDeletedEntries(
return readDeletedEntries(
m ->
manifestFile.read(
m.fileName(), m.fileSize(), deletedFilter(), Filter.alwaysTrue()),
m.fileName(),
m.fileSize(),
Filter.alwaysTrue(),
deletedFilter(),
Filter.alwaysTrue(),
SimpleFileEntry::from),
manifestFiles,
manifestReadParallelism);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFunctionWithIOE;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.FunctionWithIOException;
import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.ObjectsCache;
Expand Down Expand Up @@ -75,7 +76,8 @@ public ManifestEntryCache(
}

@Override
protected ManifestEntrySegments createSegments(Path path, @Nullable Long fileSize) {
protected ManifestEntrySegments createSegments(
Path path, @Nullable Long fileSize, Filter<InternalRow> loadFilter) {
Map<Triple<BinaryRow, Integer, Integer>, DataPagedOutputSerializer> segments =
new HashMap<>();
Function<InternalRow, BinaryRow> partitionGetter = partitionGetter();
Expand All @@ -88,6 +90,9 @@ protected ManifestEntrySegments createSegments(Path path, @Nullable Long fileSiz
try (CloseableIterator<InternalRow> iterator = reader.apply(path, fileSize)) {
while (iterator.hasNext()) {
InternalRow row = iterator.next();
if (!loadFilter.test(row)) {
continue;
}
BinaryRow partition = partitionGetter.apply(row);
int bucket = bucketGetter.apply(row);
int totalBucket = totalBucketGetter.apply(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,16 @@ public Iterator<ManifestEntry> readManifestEntries(

private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(
List<ManifestFileMeta> manifests,
Function<List<ManifestEntry>, List<T>> converter,
Function<ManifestEntry, T> converter,
boolean useSequential) {
Set<Identifier> deletedEntries =
FileEntry.readDeletedEntries(
manifest -> readManifest(manifest, FileEntry.deletedFilter(), null),
manifest ->
readManifest(
manifest,
SimpleFileEntry::from,
FileEntry.deletedFilter(),
null),
manifests,
parallelism);

Expand All @@ -421,11 +426,11 @@ private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(

Function<ManifestFileMeta, List<T>> processor =
manifest ->
converter.apply(
readManifest(
manifest,
FileEntry.addFilter(),
entry -> !deletedEntries.contains(entry.identifier())));
readManifest(
manifest,
converter,
FileEntry.addFilter(),
entry -> !deletedEntries.contains(entry.identifier()));
if (useSequential) {
return sequentialBatchedExecute(processor, manifests, parallelism).iterator();
} else {
Expand Down Expand Up @@ -475,36 +480,36 @@ protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entr
/** Note: Keep this thread-safe. */
@Override
public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
return readManifest(manifest, null, null);
return readManifest(manifest, Function.identity(), null, null);
}

private List<ManifestEntry> readManifest(
private <T> List<T> readManifest(
ManifestFileMeta manifest,
Function<ManifestEntry, T> converter,
@Nullable Filter<InternalRow> additionalFilter,
@Nullable Filter<ManifestEntry> additionalTFilter) {
List<ManifestEntry> entries =
Function<ManifestEntry, T> finalConverter =
dropStats ? e -> converter.apply(dropStats(e)) : converter;

Filter<InternalRow> entryRowFilter = createEntryRowFilter();

List<T> entries =
manifestFileFactory
.create()
.withCacheMetrics(
scanMetrics != null ? scanMetrics.getCacheMetrics() : null)
.read(
manifest.fileName(),
manifest.fileSize(),
manifestsReader.partitionFilter(),
createBucketFilter(),
createEntryRowFilter().and(additionalFilter),
entryRowFilter,
entryRowFilter.and(additionalFilter),
entry ->
(additionalTFilter == null || additionalTFilter.test(entry))
&& (manifestEntryFilter == null
|| manifestEntryFilter.test(entry))
&& filterByStats(entry));
if (dropStats) {
List<ManifestEntry> copied = new ArrayList<>(entries.size());
for (ManifestEntry entry : entries) {
copied.add(dropStats(entry));
}
entries = copied;
}
&& filterByStats(entry),
finalConverter);
LOG.info("Read {} manifest entries from {}", entries.size(), manifest.fileName());
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import static org.apache.paimon.utils.ObjectsFile.readFromIterator;

Expand Down Expand Up @@ -63,13 +65,31 @@ public void withCacheMetrics(@Nullable CacheMetrics cacheMetrics) {
}

public List<V> read(K key, @Nullable Long fileSize, Filters<V> filters) throws IOException {
return read(
key,
fileSize,
Filter.alwaysTrue(),
filters.readFilter(),
filters.readVFilter(),
Function.identity());
}

public <R> List<R> read(
K key,
@Nullable Long fileSize,
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter,
Filter<V> readVFilter,
Function<V, R> convertor)
throws IOException {
@SuppressWarnings("unchecked")
S segments = (S) cache.getIfPresents(key);
if (segments != null) {
if (cacheMetrics != null) {
cacheMetrics.increaseHitObject();
}
return readFromSegments(segments, filters);
return convert(
readFromSegments(segments, new Filters<>(readFilter, readVFilter)), convertor);
} else {
if (cacheMetrics != null) {
cacheMetrics.increaseMissedObject();
Expand All @@ -78,22 +98,38 @@ public List<V> read(K key, @Nullable Long fileSize, Filters<V> filters) throws I
fileSize = fileSizeFunction.apply(key);
}
if (fileSize <= cache.maxElementSize()) {
segments = createSegments(key, fileSize);
segments = createSegments(key, fileSize, loadFilter);
cache.put(key, segments);
return readFromSegments(segments, filters);
return convert(
readFromSegments(segments, new Filters<>(readFilter, readVFilter)),
convertor);
} else {
return readFromIterator(
reader.apply(key, fileSize),
projectedSerializer,
filters.readFilter(),
filters.readVFilter());
readFilter,
readVFilter,
convertor);
}
}
}

private <R> List<R> convert(List<V> values, Function<V, R> convertor) {
List<R> result = new ArrayList<>(values.size());
for (V v : values) {
result.add(convertor.apply(v));
}
return result;
}

protected abstract List<V> readFromSegments(S segments, Filters<V> filters) throws IOException;

protected abstract S createSegments(K k, @Nullable Long fileSize);
protected abstract S createSegments(
K k, @Nullable Long fileSize, Filter<InternalRow> loadFilter);

protected S createSegments(K k, @Nullable Long fileSize) {
return createSegments(k, fileSize, Filter.alwaysTrue());
}

/** Filter context for reading. */
public static class Filters<V> {
Expand Down
64 changes: 53 additions & 11 deletions paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

import static org.apache.paimon.utils.FileUtils.checkExists;

Expand Down Expand Up @@ -100,7 +101,13 @@ public List<T> read(String fileName) {
}

public List<T> read(String fileName, @Nullable Long fileSize) {
return read(fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue());
return read(
fileName,
fileSize,
Filter.alwaysTrue(),
Filter.alwaysTrue(),
Filter.alwaysTrue(),
Function.identity());
}

public List<T> readWithIOException(String fileName) throws IOException {
Expand All @@ -109,7 +116,13 @@ public List<T> readWithIOException(String fileName) throws IOException {

public List<T> readWithIOException(String fileName, @Nullable Long fileSize)
throws IOException {
return readWithIOException(fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue());
return readWithIOException(
fileName,
fileSize,
Filter.alwaysTrue(),
Filter.alwaysTrue(),
Filter.alwaysTrue(),
Function.identity());
}

public boolean exists(String fileName) {
Expand All @@ -120,31 +133,50 @@ public boolean exists(String fileName) {
}
}

public List<T> read(
public <R> List<R> read(
String fileName,
@Nullable Long fileSize,
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter,
Filter<T> readTFilter) {
Filter<T> readTFilter,
Function<T, R> convertor) {
try {
return readWithIOException(fileName, fileSize, readFilter, readTFilter);
return readWithIOException(
fileName, fileSize, loadFilter, readFilter, readTFilter, convertor);
} catch (IOException e) {
throw new RuntimeException("Failed to read " + fileName, e);
}
}

private List<T> readWithIOException(
public List<T> read(
String fileName,
@Nullable Long fileSize,
Filter<InternalRow> readFilter,
Filter<T> readTFilter)
Filter<T> readTFilter) {
return read(
fileName,
fileSize,
Filter.alwaysTrue(),
readFilter,
readTFilter,
Function.identity());
}

private <R> List<R> readWithIOException(
String fileName,
@Nullable Long fileSize,
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter,
Filter<T> readTFilter,
Function<T, R> convertor)
throws IOException {
Path path = pathFactory.toPath(fileName);
if (cache != null) {
return cache.read(path, fileSize, new ObjectsCache.Filters<>(readFilter, readTFilter));
return cache.read(path, fileSize, loadFilter, readFilter, readTFilter, convertor);
}

return readFromIterator(
createIterator(path, fileSize), serializer, readFilter, readTFilter);
createIterator(path, fileSize), serializer, readFilter, readTFilter, convertor);
}

public String writeWithoutRolling(Collection<T> records) {
Expand Down Expand Up @@ -208,14 +240,24 @@ public static <V> List<V> readFromIterator(
ObjectSerializer<V> serializer,
Filter<InternalRow> readFilter,
Filter<V> readVFilter) {
return readFromIterator(
inputIterator, serializer, readFilter, readVFilter, Function.identity());
}

public static <V, R> List<R> readFromIterator(
CloseableIterator<InternalRow> inputIterator,
ObjectSerializer<V> serializer,
Filter<InternalRow> readFilter,
Filter<V> readVFilter,
Function<V, R> convertor) {
try (CloseableIterator<InternalRow> iterator = inputIterator) {
List<V> result = new ArrayList<>();
List<R> result = new ArrayList<>();
while (iterator.hasNext()) {
InternalRow row = iterator.next();
if (readFilter.test(row)) {
V v = serializer.fromRow(row);
if (readVFilter.test(v)) {
result.add(v);
result.add(convertor.apply(v));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ protected List<V> readFromSegments(Segments segments, Filters<V> filters) throws
}

@Override
protected Segments createSegments(K key, @Nullable Long fileSize) {
protected Segments createSegments(
K key, @Nullable Long fileSize, Filter<InternalRow> loadFilter) {
InternalRowSerializer formatSerializer = this.formatSerializer.get();
try (CloseableIterator<InternalRow> iterator = reader.apply(key, fileSize)) {
ArrayList<MemorySegment> segments = new ArrayList<>();
Expand All @@ -73,7 +74,9 @@ protected Segments createSegments(K key, @Nullable Long fileSize) {
new SimpleCollectingOutputView(segments, segmentSource, cache.pageSize());
while (iterator.hasNext()) {
InternalRow row = iterator.next();
formatSerializer.serializeToPages(row, output);
if (loadFilter.test(row)) {
formatSerializer.serializeToPages(row, output);
}
}
return Segments.create(segments, output.getCurrentPositionInSegment());
} catch (Exception e) {
Expand Down
Loading
Loading