Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions cmake_modules/arrow.diff
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,193 @@ index 4d3acb491e..3906ff3c59 100644
int64_t pagesize_;
ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;

--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -210,6 +210,17 @@
::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) const;

+ /// Pre-buffer arbitrary byte ranges (e.g., page-level ranges from OffsetIndex).
+ /// Unlike PreBuffer(), this does NOT set the column bitmap, so
+ /// GetColumnPageReader will use CachedInputStream (page-level cache path).
+ void PreBufferRanges(const std::vector<::arrow::io::ReadRange>& ranges,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options);
+
+ /// Wait for arbitrary byte ranges to be pre-buffered.
+ ::arrow::Future<> WhenBufferedRanges(
+ const std::vector<::arrow::io::ReadRange>& ranges) const;
+
private:
// Holds a pointer to an instance of Contents implementation
std::unique_ptr<Contents> contents_;

--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -207,6 +207,100 @@
return {col_start, col_length};
}

+// CachedInputStream: InputStream adapter that reads through ReadRangeCache with
+// zero-cost skip for non-cached pages. Used for page-level caching where only
+// specific pages are pre-buffered.
+//
+// Key behavior:
+// - Read(): On cache hit, returns cached data. On cache miss, returns zero-filled
+// buffer (zero I/O). This makes InputStream::Advance() (which calls Read() and
+// discards) effectively free for skipped pages.
+// - Peek(): Always falls back to source on cache miss, because PageReader uses
+// Peek() to read Thrift page headers (~30 bytes) which must have real data.
+class CachedInputStream : public ::arrow::io::InputStream {
+ public:
+ CachedInputStream(
+ std::shared_ptr<::arrow::io::internal::ReadRangeCache> cache,
+ std::shared_ptr<ArrowInputFile> source,
+ int64_t offset, int64_t length)
+ : cache_(std::move(cache)),
+ source_(std::move(source)),
+ base_offset_(offset),
+ length_(length) {}
+
+ ::arrow::Status Close() override {
+ closed_ = true;
+ return ::arrow::Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ ::arrow::Result<int64_t> Tell() const override { return position_; }
+
+ ::arrow::Result<std::string_view> Peek(int64_t nbytes) override {
+ int64_t to_read = std::min(nbytes, length_ - position_);
+ if (to_read <= 0) {
+ return std::string_view();
+ }
+ ::arrow::io::ReadRange range{base_offset_ + position_, to_read};
+ auto result = cache_->Read(range);
+ if (result.ok()) {
+ peek_buffer_ = *result;
+ } else {
+ // Peek is used for Thrift page headers (~30 bytes) — must read real data
+ ARROW_ASSIGN_OR_RAISE(peek_buffer_,
+ source_->ReadAt(range.offset, range.length));
+ }
+ return std::string_view(
+ reinterpret_cast<const char*>(peek_buffer_->data()),
+ static_cast<size_t>(peek_buffer_->size()));
+ }
+
+ ::arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ int64_t to_read = std::min(nbytes, length_ - position_);
+ if (to_read <= 0) return 0;
+ ::arrow::io::ReadRange range{base_offset_ + position_, to_read};
+ auto result = cache_->Read(range);
+ if (result.ok()) {
+ auto& buf = *result;
+ memcpy(out, buf->data(), static_cast<size_t>(buf->size()));
+ position_ += buf->size();
+ return buf->size();
+ }
+ // Cache miss: fall back to real I/O from source
+ ARROW_ASSIGN_OR_RAISE(auto buf, source_->ReadAt(range.offset, range.length));
+ memcpy(out, buf->data(), static_cast<size_t>(buf->size()));
+ position_ += buf->size();
+ return buf->size();
+ }
+
+ ::arrow::Result<std::shared_ptr<::arrow::Buffer>> Read(int64_t nbytes) override {
+ int64_t to_read = std::min(nbytes, length_ - position_);
+ if (to_read <= 0) {
+ return std::make_shared<::arrow::Buffer>(nullptr, 0);
+ }
+ ::arrow::io::ReadRange range{base_offset_ + position_, to_read};
+ auto result = cache_->Read(range);
+ if (result.ok()) {
+ position_ += (*result)->size();
+ return *result;
+ }
+ // Cache miss: fall back to real I/O from source
+ ARROW_ASSIGN_OR_RAISE(auto buf, source_->ReadAt(range.offset, range.length));
+ position_ += buf->size();
+ return std::shared_ptr<::arrow::Buffer>(std::move(buf));
+ }
+
+ private:
+ std::shared_ptr<::arrow::io::internal::ReadRangeCache> cache_;
+ std::shared_ptr<ArrowInputFile> source_;
+ int64_t base_offset_;
+ int64_t length_;
+ int64_t position_ = 0;
+ bool closed_ = false;
+ std::shared_ptr<::arrow::Buffer> peek_buffer_;
+};
+
// RowGroupReader::Contents implementation for the Parquet file specification
class SerializedRowGroup : public RowGroupReader::Contents {
public:
@@ -242,6 +336,11 @@
// segments.
PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
stream = std::make_shared<::arrow::io::BufferReader>(buffer);
+ } else if (cached_source_) {
+ // Page-level caching: read through cache with fallback to source.
+ // Advance() is zero-cost for skipped pages via data_page_filter.
+ stream = std::make_shared<CachedInputStream>(
+ cached_source_, source_, col_range.offset, col_range.length);
} else {
stream = properties_.GetStream(source_, col_range.offset, col_range.length);
}
@@ -417,6 +516,26 @@
return cached_source_->WaitFor(ranges);
}

+ void PreBufferRanges(const std::vector<::arrow::io::ReadRange>& ranges,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options) {
+ cached_source_ =
+ std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options);
+ // Do NOT set prebuffered_column_chunks_ bitmap — GetColumnPageReader will
+ // use CachedInputStream path instead of full-chunk BufferReader path.
+ prebuffered_column_chunks_.clear();
+ PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
+ }
+
+ ::arrow::Future<> WhenBufferedRanges(
+ const std::vector<::arrow::io::ReadRange>& ranges) const {
+ if (!cached_source_) {
+ return ::arrow::Status::Invalid(
+ "Must call PreBufferRanges before WhenBufferedRanges");
+ }
+ return cached_source_->WaitFor(ranges);
+ }
+
// Metadata/footer parsing. Divided up to separate sync/async paths, and to use
// exceptions for error handling (with the async path converting to Future/Status).

@@ -911,6 +1030,22 @@
return file->WhenBuffered(row_groups, column_indices);
}

+void ParquetFileReader::PreBufferRanges(
+ const std::vector<::arrow::io::ReadRange>& ranges,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options) {
+ SerializedFile* file =
+ ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
+ file->PreBufferRanges(ranges, ctx, options);
+}
+
+::arrow::Future<> ParquetFileReader::WhenBufferedRanges(
+ const std::vector<::arrow::io::ReadRange>& ranges) const {
+ SerializedFile* file =
+ ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
+ return file->WhenBufferedRanges(ranges);
+}
+
// ----------------------------------------------------------------------
// File metadata helpers

diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
Expand Down
19 changes: 19 additions & 0 deletions src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"

#include <algorithm>
#include <cstdint>
#include <functional>
#include <utility>

#include "arrow/api.h"
Expand Down Expand Up @@ -50,9 +52,18 @@ ArrowInputStreamAdapter::ArrowInputStreamAdapter(
: input_stream_(input_stream), pool_(pool), file_size_(file_size) {}

ArrowInputStreamAdapter::~ArrowInputStreamAdapter() {
WaitForPendingAsyncReads();
[[maybe_unused]] auto status = DoClose();
}

void ArrowInputStreamAdapter::WaitForPendingAsyncReads() {
std::lock_guard<std::mutex> lock(pending_futures_mutex_);
if (!pending_futures_.empty()) {
(void)arrow::All(pending_futures_).result();
pending_futures_.clear();
}
}

arrow::Status ArrowInputStreamAdapter::Seek(int64_t position) {
return ToArrowStatus(input_stream_->Seek(position, SeekOrigin::FS_SEEK_SET));
}
Expand Down Expand Up @@ -130,6 +141,14 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> ArrowInputStreamAdapter::ReadAsync
fut.MarkFinished(ToArrowStatus(callback_status));
}
});
{
std::lock_guard<std::mutex> lock(pending_futures_mutex_);
// Prune completed futures to avoid unbounded growth
pending_futures_.erase(std::remove_if(pending_futures_.begin(), pending_futures_.end(),
[](const auto& f) { return f.is_finished(); }),
pending_futures_.end());
pending_futures_.push_back(fut);
}
return fut;
}

Expand Down
9 changes: 9 additions & 0 deletions src/paimon/common/utils/arrow/arrow_input_stream_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

#include "arrow/api.h"
#include "arrow/io/interfaces.h"
Expand Down Expand Up @@ -51,11 +53,18 @@ class PAIMON_EXPORT ArrowInputStreamAdapter : public arrow::io::RandomAccessFile

private:
arrow::Status DoClose();
void WaitForPendingAsyncReads();

std::shared_ptr<paimon::InputStream> input_stream_;
std::shared_ptr<arrow::MemoryPool> pool_;
uint64_t file_size_;
bool closed_ = false;

// Track outstanding async reads to ensure they complete before destruction.
// Without this, JindoSDK bthread callbacks may fire after the pool is freed,
// causing use-after-free in arrow::PoolBuffer::~PoolBuffer().
std::mutex pending_futures_mutex_;
std::vector<arrow::Future<std::shared_ptr<arrow::Buffer>>> pending_futures_;
};
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious — is this issue introduced by the page-level pushdown feature, or did it exist before?
If it's a pre-existing problem, should we consider submitting a separate fix PR to address it independently?

That way, we can clearly track and verify the bug fix without being coupled to the new feature changes.


} // namespace paimon
1 change: 1 addition & 0 deletions src/paimon/core/operation/key_value_file_store_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ Result<std::unique_ptr<KeyValueFileStoreScan>> KeyValueFileStoreScan::Create(
scan->SplitAndSetFilter(table_schema->PartitionKeys(), arrow_schema, scan_filters));
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> trimmed_pk, table_schema->TrimmedPrimaryKeys());
PAIMON_RETURN_NOT_OK(scan->SplitAndSetKeyValueFilter(trimmed_pk));

return scan;
}

Expand Down
9 changes: 8 additions & 1 deletion src/paimon/format/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@ set(PAIMON_PARQUET_FILE_FORMAT
parquet_field_id_converter.cpp
predicate_converter.cpp
file_reader_wrapper.cpp
page_filtered_row_group_reader.cpp
parquet_timestamp_converter.cpp
parquet_file_batch_reader.cpp
parquet_file_format_factory.cpp
parquet_format_writer.cpp
parquet_schema_util.cpp
parquet_stats_extractor.cpp
parquet_writer_builder.cpp)
parquet_writer_builder.cpp
row_ranges.cpp
column_index_filter.cpp)

add_paimon_lib(paimon_parquet_file_format
SOURCES
${PAIMON_PARQUET_FILE_FORMAT}
DEPENDENCIES
paimon_shared
parquet
PRIVATE_INCLUDES
"${ARROW_SOURCE_DIR}/cpp/src"
STATIC_LINK_LIBS
parquet
arrow
Expand All @@ -46,6 +51,7 @@ if(PAIMON_BUILD_TESTS)
add_paimon_test(parquet_format_test
SOURCES
file_reader_wrapper_test.cpp
page_filtered_row_group_reader_test.cpp
parquet_timestamp_converter_test.cpp
parquet_field_id_converter_test.cpp
parquet_file_batch_reader_test.cpp
Expand All @@ -54,6 +60,7 @@ if(PAIMON_BUILD_TESTS)
parquet_writer_builder_test.cpp
predicate_converter_test.cpp
predicate_pushdown_test.cpp
column_index_filter_test.cpp
STATIC_LINK_LIBS
paimon_shared
test_utils_static
Expand Down
Loading
Loading