From 91364dac9a4e396679c7494661bf7271d51ed515 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Mon, 18 May 2026 13:15:19 +0800 Subject: [PATCH 1/2] feat(data): add MOR file scan task reader Add FileScanTaskReader as the delete-aware Arrow stream entrypoint for file scan tasks, covering both no-delete reader passthrough and merge-on-read filtering with position and equality deletes. Introduce reusable Arrow C Data utilities for stream wrapping and batch projection, including ProjectionContext caching and optional Arrow compute registration via arrow::RegisterAll. Move Arrow IO registration into arrow_register and remove FileScanTask::ToArrow. Add coverage for projection behavior across nanoarrow and Arrow compute paths, plus end-to-end FileScanTaskReader tests for projected reads, position deletes, equality deletes, dropped equality fields, and fully deleted batches. --- src/iceberg/CMakeLists.txt | 5 +- src/iceberg/arrow/arrow_c_data_util.cc | 130 +++++ ...arrow_io_register.cc => arrow_register.cc} | 52 +- .../{arrow_io_register.h => arrow_register.h} | 8 +- src/iceberg/arrow_c_data_util.cc | 395 +++++++++++++ src/iceberg/arrow_c_data_util_internal.h | 239 ++++++++ src/iceberg/data/delete_filter.cc | 6 +- src/iceberg/data/file_scan_task_reader.cc | 233 ++++++++ src/iceberg/data/file_scan_task_reader.h | 81 +++ src/iceberg/data/meson.build | 1 + src/iceberg/file_reader.h | 2 +- src/iceberg/meson.build | 2 + src/iceberg/table_scan.cc | 118 ---- src/iceberg/table_scan.h | 11 - src/iceberg/test/CMakeLists.txt | 4 +- src/iceberg/test/arrow_c_data_util_test.cc | 194 +++++++ .../test/file_scan_task_reader_test.cc | 524 ++++++++++++++++++ src/iceberg/test/file_scan_task_test.cc | 27 +- 18 files changed, 1866 insertions(+), 166 deletions(-) create mode 100644 src/iceberg/arrow/arrow_c_data_util.cc rename src/iceberg/arrow/{arrow_io_register.cc => arrow_register.cc} (50%) rename src/iceberg/arrow/{arrow_io_register.h => arrow_register.h} (79%) create mode 100644 src/iceberg/arrow_c_data_util.cc create mode 100644 src/iceberg/arrow_c_data_util_internal.h create mode 100644 src/iceberg/data/file_scan_task_reader.cc create mode 100644 src/iceberg/data/file_scan_task_reader.h create mode 100644 src/iceberg/test/arrow_c_data_util_test.cc create mode 100644 src/iceberg/test/file_scan_task_reader_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 79298b1a1..ce68ad7b0 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -18,6 +18,7 @@ set(ICEBERG_INCLUDES "$" "$") set(ICEBERG_SOURCES + arrow_c_data_util.cc arrow_c_data_guard_internal.cc catalog/memory/in_memory_catalog.cc delete_file_index.cc @@ -164,6 +165,7 @@ set(ICEBERG_DATA_SOURCES data/delete_filter.cc data/delete_loader.cc data/equality_delete_writer.cc + data/file_scan_task_reader.cc data/position_delete_writer.cc data/writer.cc deletes/position_delete_index.cc @@ -220,9 +222,10 @@ add_subdirectory(util) if(ICEBERG_BUILD_BUNDLE) set(ICEBERG_BUNDLE_SOURCES + arrow/arrow_c_data_util.cc arrow/arrow_io.cc arrow/s3/arrow_s3_file_io.cc - arrow/arrow_io_register.cc + arrow/arrow_register.cc arrow/metadata_column_util.cc avro/avro_data_util.cc avro/avro_direct_decoder.cc diff --git a/src/iceberg/arrow/arrow_c_data_util.cc b/src/iceberg/arrow/arrow_c_data_util.cc new file mode 100644 index 000000000..cb2f066a3 --- /dev/null +++ b/src/iceberg/arrow/arrow_c_data_util.cc @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +struct ArrowProjectBatchState { + std::shared_ptr<::arrow::Schema> input_schema; + std::shared_ptr<::arrow::Schema> output_schema; +}; + +Result> ImportArrowSchema( + const ArrowSchema& arrow_schema) { + ArrowSchema schema_copy; + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowSchemaDeepCopy(&arrow_schema, &schema_copy)); + internal::ArrowSchemaGuard schema_copy_guard(&schema_copy); + + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto schema, ::arrow::ImportSchema(&schema_copy)); + return schema; +} + +Result> GetArrowProjectBatchState( + ProjectionContext& projection) { + auto state = + std::static_pointer_cast(projection.project_batch_state()); + if (state != nullptr) { + return state; + } + + ICEBERG_ASSIGN_OR_RAISE(auto input_schema, + ImportArrowSchema(projection.input_arrow_schema())); + ICEBERG_ASSIGN_OR_RAISE(auto output_schema, + ImportArrowSchema(projection.output_arrow_schema())); + + state = std::make_shared( + ArrowProjectBatchState{.input_schema = std::move(input_schema), + .output_schema = std::move(output_schema)}); + projection.project_batch_state() = state; + return state; +} + +Result ProjectBatchArrowCompute(ArrowArray* input_batch, + std::span row_indices, + ProjectionContext& projection) { + ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null"); + ICEBERG_ASSIGN_OR_RAISE(auto state, GetArrowProjectBatchState(projection)); + + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto input_record_batch, + ::arrow::ImportRecordBatch(input_batch, state->input_schema)); + + const int32_t empty_index = 0; + const int32_t* row_indices_data = + row_indices.empty() ? &empty_index : row_indices.data(); + auto index_array = std::make_shared<::arrow::Int32Array>( + static_cast(row_indices.size()), + ::arrow::Buffer::Wrap(row_indices_data, row_indices.size())); + + std::vector> output_columns; + output_columns.reserve(projection.selected_field_indices().size()); + for (int input_index : projection.selected_field_indices()) { + ICEBERG_PRECHECK(input_index >= 0 && input_index < input_record_batch->num_columns(), + "Input field index {} out of range for batch with {} columns", + input_index, input_record_batch->num_columns()); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto taken_column, + ::arrow::compute::Take(*input_record_batch->column(input_index), *index_array)); + output_columns.push_back(std::move(taken_column)); + } + + auto output_record_batch = ::arrow::RecordBatch::Make( + state->output_schema, static_cast(row_indices.size()), + std::move(output_columns)); + + ArrowArray output_array; + ICEBERG_ARROW_RETURN_NOT_OK( + ::arrow::ExportRecordBatch(*output_record_batch, &output_array)); + internal::ArrowArrayGuard output_array_guard(&output_array); + + return std::exchange(output_array, ArrowArray{}); +} + +} // namespace + +void RegisterArrowProjectBatch() { + static std::once_flag flag; + std::call_once(flag, []() { + ProjectionContext::RegisterProjectBatchFunction(&ProjectBatchArrowCompute); + }); +} + +} // namespace iceberg diff --git a/src/iceberg/arrow/arrow_io_register.cc b/src/iceberg/arrow/arrow_register.cc similarity index 50% rename from src/iceberg/arrow/arrow_io_register.cc rename to src/iceberg/arrow/arrow_register.cc index 43273c0ae..d2983ffae 100644 --- a/src/iceberg/arrow/arrow_io_register.cc +++ b/src/iceberg/arrow/arrow_register.cc @@ -1,28 +1,35 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "iceberg/arrow/arrow_io_register.h" +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/arrow/arrow_register.h" #include #include +#include #include "iceberg/arrow/arrow_io_util.h" #include "iceberg/file_io_registry.h" +namespace iceberg { +void RegisterArrowProjectBatch(); +} + namespace iceberg::arrow { namespace { @@ -43,8 +50,6 @@ void RegisterS3FileIO() { #endif } -} // namespace - void EnsureArrowFileIOsRegistered() { static std::once_flag flag; std::call_once(flag, []() { @@ -58,4 +63,11 @@ void EnsureArrowFileIOsRegistered() { return true; }(); +} // namespace + +void RegisterAll() { + EnsureArrowFileIOsRegistered(); + ::iceberg::RegisterArrowProjectBatch(); +} + } // namespace iceberg::arrow diff --git a/src/iceberg/arrow/arrow_io_register.h b/src/iceberg/arrow/arrow_register.h similarity index 79% rename from src/iceberg/arrow/arrow_io_register.h rename to src/iceberg/arrow/arrow_register.h index f28b7a565..7a67be516 100644 --- a/src/iceberg/arrow/arrow_io_register.h +++ b/src/iceberg/arrow/arrow_register.h @@ -19,16 +19,16 @@ #pragma once -/// \file iceberg/arrow/arrow_io_register.h -/// \brief Provide functions to register Arrow FileIO implementations. +/// \file iceberg/arrow/arrow_register.h +/// \brief Provide functions to register Arrow bundle integrations. #include "iceberg/iceberg_bundle_export.h" namespace iceberg::arrow { -/// \brief Register built-in Arrow FileIO implementations into the FileIORegistry. +/// \brief Register Arrow FileIOs and Arrow-backed C Data utilities. /// /// This operation is idempotent and safe to call multiple times. -ICEBERG_BUNDLE_EXPORT void EnsureArrowFileIOsRegistered(); +ICEBERG_BUNDLE_EXPORT void RegisterAll(); } // namespace iceberg::arrow diff --git a/src/iceberg/arrow_c_data_util.cc b/src/iceberg/arrow_c_data_util.cc new file mode 100644 index 000000000..8f27961c4 --- /dev/null +++ b/src/iceberg/arrow_c_data_util.cc @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/file_reader.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result MakeArrowArrayStream(std::unique_ptr reader) { + return MakeArrowArrayStream(std::move(reader)); +} + +namespace { + +Result FindFieldIndexById(std::span fields, int32_t field_id) { + for (size_t index = 0; index < fields.size(); ++index) { + if (fields[index].field_id() == field_id) { + return index; + } + } + return InvalidArgument("Required schema does not contain projected field id {}", + field_id); +} + +std::mutex g_project_batch_function_mutex; +ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr; + +ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() { + std::lock_guard lock(g_project_batch_function_mutex); + return g_project_batch_function; +} + +Result> BuildSelectedFieldIndices( + std::span input_fields, + std::span output_fields) { + std::vector selected_field_indices; + selected_field_indices.reserve(output_fields.size()); + + for (const auto& output_field : output_fields) { + ICEBERG_ASSIGN_OR_RAISE(auto input_index, + FindFieldIndexById(input_fields, output_field.field_id())); + const auto& input_field = input_fields[input_index]; + if (*input_field.type() != *output_field.type()) { + return InvalidArgument( + "ProjectBatch only supports complete top-level fields, but field id " + "{} changes type from {} to {}", + output_field.field_id(), input_field.type()->ToString(), + output_field.type()->ToString()); + } + ICEBERG_PRECHECK(input_index <= static_cast(std::numeric_limits::max()), + "Input field index {} exceeds int range", input_index); + selected_field_indices.push_back(static_cast(input_index)); + } + + return selected_field_indices; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array); + +Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t element_index = begin; element_index < end; ++element_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], element_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t entry_index = begin; entry_index < end; ++entry_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], entry_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView& input_view, + int64_t row_index, ArrowArray* output_array) { + ArrowError error; + ArrowSchemaView schema_view; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowSchemaViewInit(&schema_view, &input_schema, &error), error); + + ArrowDecimal value; + ArrowDecimalInit(&value, schema_view.decimal_bitwidth, schema_view.decimal_precision, + schema_view.decimal_scale); + ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array, &value)); + return {}; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + if (ArrowArrayViewIsNull(&input_view, row_index)) { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + } + + switch (input_view.storage_type) { + case NANOARROW_TYPE_NA: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + case NANOARROW_TYPE_BOOL: + case NANOARROW_TYPE_INT8: + case NANOARROW_TYPE_INT16: + case NANOARROW_TYPE_INT32: + case NANOARROW_TYPE_INT64: + case NANOARROW_TYPE_DATE32: + case NANOARROW_TYPE_DATE64: + case NANOARROW_TYPE_TIME32: + case NANOARROW_TYPE_TIME64: + case NANOARROW_TYPE_TIMESTAMP: + case NANOARROW_TYPE_DURATION: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt( + output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_UINT64: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt( + output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_HALF_FLOAT: + case NANOARROW_TYPE_FLOAT: + case NANOARROW_TYPE_DOUBLE: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble( + output_array, ArrowArrayViewGetDoubleUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_LARGE_STRING: + case NANOARROW_TYPE_STRING_VIEW: { + auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array, value)); + return {}; + } + case NANOARROW_TYPE_BINARY: + case NANOARROW_TYPE_LARGE_BINARY: + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + case NANOARROW_TYPE_BINARY_VIEW: { + auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array, value)); + return {}; + } + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + return AppendDecimal(input_schema, input_view, row_index, output_array); + case NANOARROW_TYPE_STRUCT: { + for (int64_t child_index = 0; child_index < input_schema.n_children; + ++child_index) { + ICEBERG_RETURN_UNEXPECTED(AppendValue( + *input_schema.children[child_index], *input_array.children[child_index], + *input_view.children[child_index], row_index, + output_array->children[child_index])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; + } + case NANOARROW_TYPE_LIST: + case NANOARROW_TYPE_LARGE_LIST: + case NANOARROW_TYPE_FIXED_SIZE_LIST: + return AppendListValues(input_schema, input_array, input_view, row_index, + output_array); + case NANOARROW_TYPE_MAP: + return AppendMapValues(input_schema, input_array, input_view, row_index, + output_array); + default: + return NotImplemented("Unsupported Arrow type for merge-on-read projection: {}", + static_cast(input_view.storage_type)); + } +} + +} // namespace + +ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept + : input_schema_(std::exchange(other.input_schema_, nullptr)), + output_schema_(std::exchange(other.output_schema_, nullptr)), + selected_field_indices_(std::move(other.selected_field_indices_)), + input_arrow_schema_(other.input_arrow_schema_), + output_arrow_schema_(other.output_arrow_schema_), + project_batch_function_(std::exchange(other.project_batch_function_, nullptr)), + project_batch_state_(std::move(other.project_batch_state_)) { + other.input_arrow_schema_.release = nullptr; + other.output_arrow_schema_.release = nullptr; +} + +ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other) noexcept { + if (this == &other) { + return *this; + } + + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } + + input_schema_ = std::exchange(other.input_schema_, nullptr); + output_schema_ = std::exchange(other.output_schema_, nullptr); + selected_field_indices_ = std::move(other.selected_field_indices_); + input_arrow_schema_ = other.input_arrow_schema_; + other.input_arrow_schema_.release = nullptr; + output_arrow_schema_ = other.output_arrow_schema_; + other.output_arrow_schema_.release = nullptr; + project_batch_function_ = std::exchange(other.project_batch_function_, nullptr); + project_batch_state_ = std::move(other.project_batch_state_); + return *this; +} + +ProjectionContext::~ProjectionContext() { + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } +} + +Result ProjectionContext::Make( + const Schema& input_schema, const Schema& output_schema, + ProjectionContext::ProjectBatchFunction project_batch_function) { + ICEBERG_ASSIGN_OR_RAISE( + auto selected_field_indices, + BuildSelectedFieldIndices(input_schema.fields(), output_schema.fields())); + + ProjectionContext context; + context.input_schema_ = &input_schema; + context.output_schema_ = &output_schema; + context.selected_field_indices_ = std::move(selected_field_indices); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema, &context.input_arrow_schema_)); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema, &context.output_arrow_schema_)); + context.project_batch_function_ = project_batch_function; + + return context; +} + +const Schema& ProjectionContext::input_schema() const { return *input_schema_; } + +const Schema& ProjectionContext::output_schema() const { return *output_schema_; } + +const ArrowSchema& ProjectionContext::input_arrow_schema() const { + return input_arrow_schema_; +} + +const ArrowSchema& ProjectionContext::output_arrow_schema() const { + return output_arrow_schema_; +} + +std::span ProjectionContext::selected_field_indices() const { + return selected_field_indices_; +} + +ProjectionContext::ProjectBatchFunction ProjectionContext::project_batch_function() + const { + return project_batch_function_; +} + +ProjectionContext::ProjectBatchState& ProjectionContext::project_batch_state() { + return project_batch_state_; +} + +void ProjectionContext::RegisterProjectBatchFunction( + ProjectionContext::ProjectBatchFunction project_batch_function) { + ICEBERG_DCHECK(project_batch_function != nullptr, + "ProjectBatch implementation must not be null"); + if (project_batch_function == nullptr) { + return; + } + std::lock_guard lock(g_project_batch_function_mutex); + g_project_batch_function = project_batch_function; +} + +bool ProjectionContext::HasProjectBatchFunction() { + return GetProjectBatchFunction() != nullptr; +} + +auto ProjectionContext::ResolveProjectBatchFunction() + -> ProjectionContext::ProjectBatchFunction { + return GetProjectBatchFunction(); +} + +namespace { + +Result ProjectBatchNanoarrow(const ArrowSchema& input_arrow_schema, + const ArrowArray& input_batch, + std::span row_indices, + const ArrowSchema& output_arrow_schema, + std::span selected_field_indices) { + ArrowArrayView input_view; + ArrowError error; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewInitFromSchema(&input_view, &input_arrow_schema, &error), error); + internal::ArrowArrayViewGuard input_view_guard(&input_view); + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewSetArray(&input_view, &input_batch, &error), error); + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_FULL, &error), + error); + + ArrowArray output_array; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayInitFromSchema(&output_array, &output_arrow_schema, &error), error); + internal::ArrowArrayGuard output_array_guard(&output_array); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&output_array)); + ICEBERG_NANOARROW_RETURN_UNEXPECTED( + ArrowArrayReserve(&output_array, static_cast(row_indices.size()))); + + for (int64_t row_index : row_indices) { + ICEBERG_PRECHECK(row_index >= 0 && row_index < input_batch.length, + "Row index {} out of range for batch length {}", row_index, + input_batch.length); + for (size_t output_index = 0; output_index < selected_field_indices.size(); + ++output_index) { + const int input_index = selected_field_indices[output_index]; + ICEBERG_RETURN_UNEXPECTED(AppendValue(*input_arrow_schema.children[input_index], + *input_batch.children[input_index], + *input_view.children[input_index], row_index, + output_array.children[output_index])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&output_array)); + } + + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayFinishBuildingDefault(&output_array, &error), error); + + return std::exchange(output_array, ArrowArray{}); +} + +} // namespace + +Result ProjectBatch(ArrowArray* input_batch, + std::span row_indices, + ProjectionContext& projection) { + ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null"); + internal::ArrowArrayGuard input_batch_guard(input_batch); + + auto project_batch_function = projection.project_batch_function(); + if (project_batch_function != nullptr) { + return project_batch_function(input_batch, row_indices, projection); + } + + return ProjectBatchNanoarrow(projection.input_arrow_schema(), *input_batch, row_indices, + projection.output_arrow_schema(), + projection.selected_field_indices()); +} + +} // namespace iceberg diff --git a/src/iceberg/arrow_c_data_util_internal.h b/src/iceberg/arrow_c_data_util_internal.h new file mode 100644 index 000000000..435aab155 --- /dev/null +++ b/src/iceberg/arrow_c_data_util_internal.h @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cached state for ProjectBatch over one input/output schema pair. +class ICEBERG_EXPORT ProjectionContext { + public: + using ProjectBatchState = std::shared_ptr; + + using ProjectBatchFunction = auto (*)(ArrowArray* input_batch, + std::span row_indices, + ProjectionContext& projection) + -> Result; + + /// \brief Register a custom implementation for ProjectBatch. + static void RegisterProjectBatchFunction(ProjectBatchFunction project_batch_function); + + /// \brief Returns true when a custom implementation has been registered. + static bool HasProjectBatchFunction(); + + /// \brief Resolve the registered ProjectBatch implementation. + static auto ResolveProjectBatchFunction() -> ProjectBatchFunction; + + /// \brief Build reusable projection state for a validated schema pair. + /// + /// \param input_schema Schema that describes every input batch. + /// \param output_schema Final schema and column order requested by the caller. + /// \param project_batch_function Optional implementation returned by + /// ProjectionContext::ResolveProjectBatchFunction, or nullptr to use the nanoarrow + /// path. + /// \note It validates that output_schema selects or reorders complete top-level fields + /// by field id. Nested pruning and type changes are rejected. The input_schema and + /// output_schema passed to Make must outlive the context. ProjectBatch may lazily + /// initialize backend cache; do not share one context across concurrent calls. + static Result Make(const Schema& input_schema, + const Schema& output_schema, + ProjectBatchFunction project_batch_function); + + ProjectionContext(ProjectionContext&&) noexcept; + ProjectionContext& operator=(ProjectionContext&&) noexcept; + ~ProjectionContext(); + + ProjectionContext(const ProjectionContext&) = delete; + ProjectionContext& operator=(const ProjectionContext&) = delete; + + const Schema& input_schema() const; + + const Schema& output_schema() const; + + const ArrowSchema& input_arrow_schema() const; + + const ArrowSchema& output_arrow_schema() const; + + std::span selected_field_indices() const; + + ProjectBatchFunction project_batch_function() const; + + ProjectBatchState& project_batch_state(); + + private: + ProjectionContext() = default; + + const Schema* input_schema_ = nullptr; + const Schema* output_schema_ = nullptr; + std::vector selected_field_indices_; + ArrowSchema input_arrow_schema_{}; + ArrowSchema output_arrow_schema_{}; + ProjectBatchFunction project_batch_function_ = nullptr; + ProjectBatchState project_batch_state_; +}; + +/// \brief Concept for sources that can be wrapped as ArrowArrayStreams. +template +concept ArrowArrayStreamProvider = requires(Source& source) { + { source.Close() } -> std::same_as; + { source.Next() } -> std::same_as>>; + { source.Schema() } -> std::same_as>; +}; + +namespace detail { + +template +struct ArrowArrayStreamPrivateData { + std::unique_ptr source; + std::string last_error; + + explicit ArrowArrayStreamPrivateData(std::unique_ptr src) + : source(std::move(src)) {} + + ~ArrowArrayStreamPrivateData() { + if (source != nullptr) { + std::ignore = source->Close(); + } + } +}; + +template +int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) { + if (stream == nullptr || stream->private_data == nullptr) { + return EINVAL; + } + + auto* private_data = + static_cast*>(stream->private_data); + auto schema_result = private_data->source->Schema(); + if (!schema_result.has_value()) { + private_data->last_error = schema_result.error().message; + std::memset(out, 0, sizeof(ArrowSchema)); + return EIO; + } + + *out = std::move(schema_result.value()); + return 0; +} + +template +int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) { + if (stream == nullptr || stream->private_data == nullptr) { + return EINVAL; + } + + auto* private_data = + static_cast*>(stream->private_data); + auto next_result = private_data->source->Next(); + if (!next_result.has_value()) { + private_data->last_error = next_result.error().message; + std::memset(out, 0, sizeof(ArrowArray)); + return EIO; + } + + auto& optional_array = next_result.value(); + if (optional_array.has_value()) { + *out = std::move(optional_array.value()); + } else { + std::memset(out, 0, sizeof(ArrowArray)); + } + + return 0; +} + +template +const char* GetLastError(struct ArrowArrayStream* stream) { + if (stream == nullptr || stream->private_data == nullptr) { + return nullptr; + } + + auto* private_data = + static_cast*>(stream->private_data); + return private_data->last_error.empty() ? nullptr : private_data->last_error.c_str(); +} + +template +void Release(struct ArrowArrayStream* stream) { + if (stream == nullptr || stream->private_data == nullptr) { + return; + } + + delete static_cast*>(stream->private_data); + stream->private_data = nullptr; + stream->release = nullptr; +} + +} // namespace detail + +/// \brief Wrap an object with Close, Next, and Schema as an ArrowArrayStream. +template +Result MakeArrowArrayStream(std::unique_ptr source) { + if (source == nullptr) { + return InvalidArgument("Cannot make ArrowArrayStream from null source"); + } + + auto private_data = + std::make_unique>(std::move(source)); + ArrowArrayStream stream{.get_schema = detail::GetSchema, + .get_next = detail::GetNext, + .get_last_error = detail::GetLastError, + .release = detail::Release, + .private_data = private_data.release()}; + return stream; +} + +/// \brief Wrap a Reader as an ArrowArrayStream. +ICEBERG_EXPORT Result MakeArrowArrayStream( + std::unique_ptr reader); + +/// \brief Project selected rows from a batch into complete top-level fields. +/// +/// `input_batch` is consumed by this function. If the projection carries a registered +/// implementation, the call is delegated to it; otherwise the built-in nanoarrow +/// implementation is used. The projection must have been created for the stable schema +/// pair that describes the input batch and requested output. This function does not +/// revalidate schema compatibility on each batch. +/// +/// \param input_batch Owned Arrow C Data batch to project. +/// \param row_indices Zero-based row positions to copy from `input_batch`. +/// \param projection Reusable schema/projection state created by +/// ProjectionContext::Make. +/// \return A newly owned ArrowArray matching `projection.output_schema()`. +ICEBERG_EXPORT Result ProjectBatch(ArrowArray* input_batch, + std::span row_indices, + ProjectionContext& projection); + +} // namespace iceberg diff --git a/src/iceberg/data/delete_filter.cc b/src/iceberg/data/delete_filter.cc index 876d644e5..5f21a32de 100644 --- a/src/iceberg/data/delete_filter.cc +++ b/src/iceberg/data/delete_filter.cc @@ -20,6 +20,7 @@ #include "iceberg/data/delete_filter.h" #include +#include #include #include #include @@ -745,7 +746,10 @@ Result DeleteFilter::ComputeAliveRows(const ArrowSchema& batc return result; } - result.indices.reserve(batch.length); + ICEBERG_PRECHECK( + batch.length <= static_cast(std::numeric_limits::max()), + "Batch length {} exceeds int32_t row index capacity", batch.length); + result.indices.reserve(static_cast(batch.length)); ICEBERG_ASSIGN_OR_RAISE(auto row, ArrowArrayStructLike::Make(batch_schema, batch)); for (int64_t i = 0; i < batch.length; ++i) { diff --git a/src/iceberg/data/file_scan_task_reader.cc b/src/iceberg/data/file_scan_task_reader.cc new file mode 100644 index 000000000..6deeb0723 --- /dev/null +++ b/src/iceberg/data/file_scan_task_reader.cc @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/file_scan_task_reader.h" + +#include +#include +#include +#include +#include + +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/data/delete_filter.h" +#include "iceberg/file_reader.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/table_scan.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +ReaderOptions MakeReaderOptions(const DataFile& data_file, std::shared_ptr io, + std::shared_ptr projection, + std::shared_ptr filter, + std::shared_ptr name_mapping, + ReaderProperties properties) { + return ReaderOptions{ + .path = data_file.file_path, + .length = static_cast(data_file.file_size_in_bytes), + .io = std::move(io), + .projection = std::move(projection), + .filter = std::move(filter), + .name_mapping = std::move(name_mapping), + .properties = std::move(properties), + }; +} + +class MergeOnReadStreamSource { + public: + MergeOnReadStreamSource(std::unique_ptr reader, + std::unique_ptr delete_filter, + std::shared_ptr<::iceberg::Schema> required_schema, + std::shared_ptr<::iceberg::Schema> projected_schema, + ProjectionContext projection_context) + : reader_(std::move(reader)), + delete_filter_(std::move(delete_filter)), + required_schema_(std::move(required_schema)), + projected_schema_(std::move(projected_schema)), + project_all_rows_(required_schema_->SameSchema(*projected_schema_)), + projection_context_(std::move(projection_context)) {} + + ~MergeOnReadStreamSource() { + if (cached_schema_.has_value() && cached_schema_->release != nullptr) { + cached_schema_->release(&cached_schema_.value()); + } + } + + Status Close() { + if (reader_ == nullptr) { + return {}; + } + return reader_->Close(); + } + + Result> Next() { + if (!cached_schema_.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(cached_schema_, reader_->Schema()); + } + ArrowSchema& input_arrow_schema = cached_schema_.value(); + + while (true) { + ICEBERG_ASSIGN_OR_RAISE(auto next_batch, reader_->Next()); + if (!next_batch.has_value()) { + return std::nullopt; + } + + ArrowArray input_batch = std::move(next_batch.value()); + internal::ArrowArrayGuard input_batch_guard(&input_batch); + + ICEBERG_ASSIGN_OR_RAISE( + auto alive, delete_filter_->ComputeAliveRows(input_arrow_schema, input_batch)); + if (alive.empty()) { + continue; + } + + if (alive.alive_count() == input_batch.length && project_all_rows_) { + // Transfer ownership to the stream result; the local guard must not release it. + ArrowArray output_batch = input_batch; + input_batch.release = nullptr; + return output_batch; + } + + return ProjectBatch(&input_batch, alive.indices, projection_context_); + } + } + + Result Schema() { + ArrowSchema schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*projected_schema_, &schema)); + return schema; + } + + private: + std::unique_ptr reader_; + std::unique_ptr delete_filter_; + std::shared_ptr<::iceberg::Schema> required_schema_; + std::shared_ptr<::iceberg::Schema> projected_schema_; + bool project_all_rows_ = false; + ProjectionContext projection_context_; + std::optional cached_schema_; +}; + +} // namespace + +class FileScanTaskReader::Impl { + public: + static Result> Make(Options options) { + ICEBERG_PRECHECK(options.io != nullptr, "FileIO must not be null"); + ICEBERG_PRECHECK(options.table_schema != nullptr, "Table schema must not be null"); + ICEBERG_PRECHECK(options.projected_schema != nullptr, + "Projected schema must not be null"); + for (const auto& schema : options.schemas) { + ICEBERG_PRECHECK(schema != nullptr, "Schema list must not contain null schemas"); + } + + ICEBERG_ASSIGN_OR_RAISE( + auto field_lookup, + DeleteFilter::MakeFieldLookup(options.table_schema, options.schemas)); + auto delete_counter = std::make_shared(); + + return std::unique_ptr( + new Impl(std::move(options), std::move(field_lookup), std::move(delete_counter))); + } + + Result Open(const FileScanTask& task) { + const auto& data_file = task.data_file(); + ICEBERG_PRECHECK(data_file != nullptr, "Data file must not be null"); + ICEBERG_PRECHECK(data_file->file_size_in_bytes >= 0, + "Data file size must not be negative: {}", + data_file->file_size_in_bytes); + + if (task.delete_files().empty()) { + auto options = + MakeReaderOptions(*data_file, io_, projected_schema_, task.residual_filter(), + name_mapping_, properties_); + ICEBERG_ASSIGN_OR_RAISE( + auto reader, ReaderFactoryRegistry::Open(data_file->file_format, options)); + return MakeArrowArrayStream(std::move(reader)); + } + + const bool has_position_deletes = + std::any_of(task.delete_files().begin(), task.delete_files().end(), + [](const std::shared_ptr& f) { + return f->content == DataFile::Content::kPositionDeletes; + }); + + ICEBERG_ASSIGN_OR_RAISE( + auto delete_filter, + DeleteFilter::Make(data_file->file_path, task.delete_files(), projected_schema_, + io_, field_lookup_, has_position_deletes, delete_counter_)); + + auto required_schema = delete_filter->RequiredSchema(); + auto project_batch_function = ProjectionContext::ResolveProjectBatchFunction(); + ICEBERG_ASSIGN_OR_RAISE(auto projection_context, + ProjectionContext::Make(*required_schema, *projected_schema_, + project_batch_function)); + + auto options = MakeReaderOptions(*data_file, io_, required_schema, + task.residual_filter(), name_mapping_, properties_); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ReaderFactoryRegistry::Open(data_file->file_format, options)); + + auto mor_reader = std::make_unique( + std::move(reader), std::move(delete_filter), std::move(required_schema), + projected_schema_, std::move(projection_context)); + return MakeArrowArrayStream(std::move(mor_reader)); + } + + private: + Impl(Options options, DeleteFilter::FieldLookup field_lookup, + std::shared_ptr delete_counter) + : io_(std::move(options.io)), + schemas_(std::move(options.schemas)), + projected_schema_(std::move(options.projected_schema)), + name_mapping_(std::move(options.name_mapping)), + properties_(ReaderProperties::FromMap(options.properties)), + field_lookup_(std::move(field_lookup)), + delete_counter_(std::move(delete_counter)) {} + + std::shared_ptr io_; + std::vector> schemas_; + std::shared_ptr projected_schema_; + std::shared_ptr name_mapping_; + ReaderProperties properties_; + DeleteFilter::FieldLookup field_lookup_; + std::shared_ptr delete_counter_; +}; + +Result> FileScanTaskReader::Make(Options options) { + ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(std::move(options))); + return std::unique_ptr(new FileScanTaskReader(std::move(impl))); +} + +FileScanTaskReader::FileScanTaskReader(std::unique_ptr impl) + : impl_(std::move(impl)) {} + +FileScanTaskReader::~FileScanTaskReader() = default; + +Result FileScanTaskReader::Open(const FileScanTask& task) { + return impl_->Open(task); +} + +} // namespace iceberg diff --git a/src/iceberg/data/file_scan_task_reader.h b/src/iceberg/data/file_scan_task_reader.h new file mode 100644 index 000000000..a71ef5f84 --- /dev/null +++ b/src/iceberg/data/file_scan_task_reader.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/file_scan_task_reader.h +/// Delete-aware FileScanTask reader for copy-on-write and merge-on-read paths. + +#include +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_data_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Opens a FileScanTask as an ArrowArrayStream. +/// +/// FileScanTaskReader chooses the copy-on-write path for tasks without deletes and +/// the merge-on-read path for tasks with v2 position or equality deletes. The returned +/// stream always exposes `projected_schema`. +/// +/// TODO(gangwu): Add a mode that emits a `_deleted` column instead of filtering rows. +/// TODO(gangwu): Use evaluator to apply residual expression filters. +class ICEBERG_DATA_EXPORT FileScanTaskReader { + public: + /// \brief Options shared by all tasks opened by this reader. + struct Options { + /// FileIO instance for reading data and delete files. + std::shared_ptr io; + /// The table schema. Used as the primary field lookup for delete file resolution. + std::shared_ptr table_schema; + /// Optional list of historical table schemas for field lookup. + std::vector> schemas; + /// The output schema for the returned ArrowArrayStream. Must be a + /// projection of table_schema. + std::shared_ptr projected_schema; + /// Optional name mapping for files written without field IDs. + std::shared_ptr name_mapping; + /// Format-specific or implementation-specific options for data readers. + std::unordered_map properties; + }; + + /// \brief Create a reusable task reader from shared read context. + static Result> Make(Options options); + + ~FileScanTaskReader(); + + /// \brief Open a task and return an Arrow C stream for its projected live rows. + Result Open(const FileScanTask& task); + + FileScanTaskReader(const FileScanTaskReader&) = delete; + FileScanTaskReader& operator=(const FileScanTaskReader&) = delete; + + private: + class Impl; + explicit FileScanTaskReader(std::unique_ptr impl); + std::unique_ptr impl_; +}; + +} // namespace iceberg diff --git a/src/iceberg/data/meson.build b/src/iceberg/data/meson.build index f0877ec64..bbb26db27 100644 --- a/src/iceberg/data/meson.build +++ b/src/iceberg/data/meson.build @@ -21,6 +21,7 @@ install_headers( 'delete_filter.h', 'delete_loader.h', 'equality_delete_writer.h', + 'file_scan_task_reader.h', 'position_delete_writer.h', 'writer.h', ], diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index c31d9b292..c76c10093 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -68,7 +68,7 @@ struct ICEBERG_EXPORT Split { size_t length; }; -class ReaderProperties : public ConfigBase { +class ICEBERG_EXPORT ReaderProperties : public ConfigBase { public: template using Entry = const ConfigBase::Entry; diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 0b5f269d5..8d4c8e3e3 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -41,6 +41,7 @@ configure_file( iceberg_include_dir = include_directories('..') iceberg_sources = files( 'arrow_c_data_guard_internal.cc', + 'arrow_c_data_util.cc', 'catalog/memory/in_memory_catalog.cc', 'delete_file_index.cc', 'expression/aggregate.cc', @@ -145,6 +146,7 @@ iceberg_data_sources = files( 'data/delete_filter.cc', 'data/delete_loader.cc', 'data/equality_delete_writer.cc', + 'data/file_scan_task_reader.cc', 'data/position_delete_writer.cc', 'data/writer.cc', 'deletes/position_delete_index.cc', diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index f61bd3a0c..e2b7acb5b 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -20,14 +20,11 @@ #include "iceberg/table_scan.h" #include -#include -#include #include #include "iceberg/expression/binder.h" #include "iceberg/expression/expression.h" #include "iceberg/expression/residual_evaluator.h" -#include "iceberg/file_reader.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_group.h" #include "iceberg/result.h" @@ -61,103 +58,6 @@ const std::vector kScanColumnsWithStats = [] { return cols; }(); -/// \brief Private data structure to hold the Reader and error state -struct ReaderStreamPrivateData { - std::unique_ptr reader; - std::string last_error; - - explicit ReaderStreamPrivateData(std::unique_ptr reader_ptr) - : reader(std::move(reader_ptr)) {} - - ~ReaderStreamPrivateData() { - if (reader) { - std::ignore = reader->Close(); - } - } -}; - -/// \brief Callback to get the stream schema -static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) { - if (!stream || !stream->private_data) { - return EINVAL; - } - auto* private_data = static_cast(stream->private_data); - // Get schema from reader - auto schema_result = private_data->reader->Schema(); - if (!schema_result.has_value()) { - private_data->last_error = schema_result.error().message; - std::memset(out, 0, sizeof(ArrowSchema)); - return EIO; - } - - *out = std::move(schema_result.value()); - return 0; -} - -/// \brief Callback to get the next array from the stream -static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) { - if (!stream || !stream->private_data) { - return EINVAL; - } - - auto* private_data = static_cast(stream->private_data); - - auto next_result = private_data->reader->Next(); - if (!next_result.has_value()) { - private_data->last_error = next_result.error().message; - std::memset(out, 0, sizeof(ArrowArray)); - return EIO; - } - - auto& optional_array = next_result.value(); - if (optional_array.has_value()) { - *out = std::move(optional_array.value()); - } else { - // End of stream - set release to nullptr to signal end - std::memset(out, 0, sizeof(ArrowArray)); - out->release = nullptr; - } - - return 0; -} - -/// \brief Callback to get the last error message -static const char* GetLastError(struct ArrowArrayStream* stream) { - if (!stream || !stream->private_data) { - return nullptr; - } - - auto* private_data = static_cast(stream->private_data); - return private_data->last_error.empty() ? nullptr : private_data->last_error.c_str(); -} - -/// \brief Callback to release the stream resources -static void Release(struct ArrowArrayStream* stream) { - if (!stream || !stream->private_data) { - return; - } - - delete static_cast(stream->private_data); - stream->private_data = nullptr; - stream->release = nullptr; -} - -Result MakeArrowArrayStream(std::unique_ptr reader) { - if (!reader) { - return InvalidArgument("Reader cannot be null"); - } - - auto private_data = std::make_unique(std::move(reader)); - - ArrowArrayStream stream{.get_schema = GetSchema, - .get_next = GetNext, - .get_last_error = GetLastError, - .release = Release, - .private_data = private_data.release()}; - - return stream; -} - } // namespace namespace internal { @@ -280,24 +180,6 @@ int32_t FileScanTask::files_count() const { return 1; } int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } -Result FileScanTask::ToArrow( - const std::shared_ptr& io, std::shared_ptr projected_schema) const { - if (!delete_files_.empty()) { - return NotSupported("Reading data files with delete files is not yet supported."); - } - - const ReaderOptions options{.path = data_file_->file_path, - .length = data_file_->file_size_in_bytes, - .io = io, - .projection = std::move(projected_schema), - .filter = residual_filter_}; - - ICEBERG_ASSIGN_OR_RAISE(auto reader, - ReaderFactoryRegistry::Open(data_file_->file_format, options)); - - return MakeArrowArrayStream(std::move(reader)); -} - // ChangelogScanTask implementation int64_t ChangelogScanTask::size_bytes() const { diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 307946e6c..64fb3ffd1 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -27,7 +27,6 @@ #include #include -#include "iceberg/arrow_c_data.h" #include "iceberg/iceberg_export.h" #include "iceberg/result.h" #include "iceberg/table_metadata.h" @@ -87,16 +86,6 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { int32_t files_count() const override; int64_t estimated_row_count() const override; - /// TODO(gangwu): move it to iceberg/data/task_scanner.h - /// - /// \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task. - /// - /// \param io The FileIO instance for accessing the file data. - /// \param projected_schema The projected schema for reading the data. - /// \return A Result containing an ArrowArrayStream, or an error on failure. - Result ToArrow(const std::shared_ptr& io, - std::shared_ptr projected_schema) const; - private: std::shared_ptr data_file_; std::vector> delete_files_; diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index afafc4c14..c9fc111af 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -219,9 +219,11 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(data_test USE_BUNDLE SOURCES + arrow_c_data_util_test.cc data_writer_test.cc delete_filter_test.cc - delete_loader_test.cc) + delete_loader_test.cc + file_scan_task_reader_test.cc) endif() diff --git a/src/iceberg/test/arrow_c_data_util_test.cc b/src/iceberg/test/arrow_c_data_util_test.cc new file mode 100644 index 000000000..c93a928a3 --- /dev/null +++ b/src/iceberg/test/arrow_c_data_util_test.cc @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/// \file arrow_c_data_util_test.cc +/// Verifies ProjectBatch behavior across registered implementations. + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_register.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/test/matchers.h" +#include "iceberg/type.h" + +namespace iceberg::internal { + +namespace { + +std::shared_ptr<::arrow::RecordBatch> MakeBatch(const Schema& schema, + std::string_view json) { + ArrowSchema c_schema; + EXPECT_THAT(ToArrowSchema(schema, &c_schema), IsOk()); + // ImportSchema takes ownership of c_schema and calls release. + auto arrow_schema = ::arrow::ImportSchema(&c_schema).ValueOrDie(); + auto struct_type = ::arrow::struct_(arrow_schema->fields()); + return ::arrow::RecordBatch::FromStructArray( + ::arrow::json::ArrayFromJSONString(struct_type, std::string(json)) + .ValueOrDie()) + .ValueOrDie(); +} + +ProjectionContext::ProjectBatchFunction ArrowComputeFunction() { + arrow::RegisterAll(); + auto function = ProjectionContext::ResolveProjectBatchFunction(); + EXPECT_NE(function, nullptr); + return function; +} + +std::shared_ptr<::arrow::RecordBatch> RunProjectBatch( + const ::arrow::RecordBatch& batch, const std::vector& alive_indices, + const Schema& required_schema, const Schema& projected_schema, + ProjectionContext::ProjectBatchFunction project_batch_function) { + ArrowSchema c_schema; + ArrowArray c_array; + EXPECT_TRUE(::arrow::ExportRecordBatch(batch, &c_array, &c_schema).ok()); + ArrowSchemaGuard schema_guard(&c_schema); + ArrowArrayGuard array_guard(&c_array); + + auto projection = + ProjectionContext::Make(required_schema, projected_schema, project_batch_function); + EXPECT_THAT(projection, IsOk()); + + auto result = ProjectBatch(&c_array, alive_indices, projection.value()); + EXPECT_THAT(result, IsOk()); + + ArrowSchema out_c_schema; + EXPECT_THAT(ToArrowSchema(projected_schema, &out_c_schema), IsOk()); + auto arrow_out_schema = ::arrow::ImportSchema(&out_c_schema).ValueOrDie(); + + ArrowArray out_array = std::exchange(result.value(), ArrowArray{}); + return ::arrow::ImportRecordBatch(&out_array, arrow_out_schema).ValueOrDie(); +} + +void ExpectProjectBatch(const ::arrow::RecordBatch& batch, + const std::vector& alive_indices, + const Schema& required_schema, const Schema& projected_schema, + std::string_view expected_json) { + auto expected = MakeBatch(projected_schema, expected_json); + auto nanoarrow = + RunProjectBatch(batch, alive_indices, required_schema, projected_schema, nullptr); + auto arrow_compute = RunProjectBatch(batch, alive_indices, required_schema, + projected_schema, ArrowComputeFunction()); + + EXPECT_TRUE(nanoarrow->Equals(*expected)) << "nanoarrow:\n" + << nanoarrow->ToString() << "expected:\n" + << expected->ToString(); + EXPECT_TRUE(arrow_compute->Equals(*expected)) + << "arrow_compute:\n" + << arrow_compute->ToString() << "expected:\n" + << expected->ToString(); + EXPECT_TRUE(nanoarrow->Equals(*arrow_compute)) + << "nanoarrow:\n" + << nanoarrow->ToString() << "arrow_compute:\n" + << arrow_compute->ToString(); +} + +std::shared_ptr MakeFullSchema() { + return std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "score", float64())}); +} + +} // namespace + +TEST(ProjectBatchTest, ProjectSelectedRowsWithoutColumnProjection) { + auto schema = MakeFullSchema(); + auto batch = MakeBatch(*schema, R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0],[4,"d",4.0]])"); + std::vector alive = {0, 2}; + + ExpectProjectBatch(*batch, alive, *schema, *schema, R"([[1,"a",1.0],[3,"c",3.0]])"); +} + +TEST(ProjectBatchTest, ProjectColumnsWithoutRowFiltering) { + auto full_schema = MakeFullSchema(); + auto projected = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + auto batch = MakeBatch(*full_schema, R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0]])"); + std::vector alive = {0, 1, 2}; + + ExpectProjectBatch(*batch, alive, *full_schema, *projected, + R"([[1,"a"],[2,"b"],[3,"c"]])"); +} + +TEST(ProjectBatchTest, ProjectSelectedRowsAndReorderColumns) { + auto full_schema = MakeFullSchema(); + // Reorder: score(3) before name(2), drop id(1). + auto projected = std::make_shared( + std::vector{SchemaField::MakeOptional(3, "score", float64()), + SchemaField::MakeOptional(2, "name", string())}); + auto batch = MakeBatch(*full_schema, R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0]])"); + std::vector alive = {1, 2}; + + ExpectProjectBatch(*batch, alive, *full_schema, *projected, R"([[2.0,"b"],[3.0,"c"]])"); +} + +TEST(ProjectBatchTest, NullValues) { + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + auto batch = MakeBatch(*schema, R"([[1,null],[2,"b"],[3,null]])"); + std::vector alive = {0, 2}; + + ExpectProjectBatch(*batch, alive, *schema, *schema, R"([[1,null],[3,null]])"); +} + +TEST(ProjectBatchTest, EmptyRowSelection) { + auto schema = MakeFullSchema(); + auto batch = MakeBatch(*schema, R"([[1,"a",1.0],[2,"b",2.0]])"); + std::vector alive = {}; + + ExpectProjectBatch(*batch, alive, *schema, *schema, R"([])"); +} + +TEST(ProjectBatchTest, ProjectionRejectsNestedPruning) { + auto input_schema = Schema(std::vector{ + SchemaField::MakeOptional(1, "person", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "age", int32()), + })), + }); + auto output_schema = Schema(std::vector{ + SchemaField::MakeOptional(1, "person", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", string()), + })), + }); + + auto projection = ProjectionContext::Make(input_schema, output_schema, nullptr); + + EXPECT_THAT(projection, IsError(ErrorKind::kInvalidArgument)); +} + +} // namespace iceberg::internal diff --git a/src/iceberg/test/file_scan_task_reader_test.cc b/src/iceberg/test/file_scan_task_reader_test.cc new file mode 100644 index 000000000..1630a108a --- /dev/null +++ b/src/iceberg/test/file_scan_task_reader_test.cc @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/file_scan_task_reader.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/arrow/arrow_register.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/data/equality_delete_writer.h" +#include "iceberg/data/position_delete_writer.h" +#include "iceberg/file_format.h" +#include "iceberg/file_reader.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/parquet/parquet_register.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/table_scan.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/temp_file_test_base.h" +#include "iceberg/type.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +struct ExportedBatch { + ArrowSchema schema{}; + ArrowArray array{}; + + ~ExportedBatch() { + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } + + ExportedBatch() = default; + ExportedBatch(const ExportedBatch&) = delete; + ExportedBatch& operator=(const ExportedBatch&) = delete; + + ExportedBatch(ExportedBatch&& other) noexcept + : schema(other.schema), array(other.array) { + other.schema.release = nullptr; + other.array.release = nullptr; + } + ExportedBatch& operator=(ExportedBatch&& other) noexcept = delete; +}; + +} // namespace + +class FileScanTaskReaderTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { + arrow::RegisterAll(); + parquet::RegisterAll(); + } + + void SetUp() override { + TempFileTestBase::SetUp(); + file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO(); + partition_spec_ = PartitionSpec::Unpartitioned(); + table_schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "category", string())}, + /*schema_id=*/2); + projected_schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}, + table_schema_->schema_id()); + } + + Result MakeBatch(const Schema& schema, + const std::string& json_data) const { + ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(schema)); + auto struct_type = ::arrow::struct_(arrow_schema->fields()); + auto array_result = ::arrow::json::ArrayFromJSONString(struct_type, json_data); + if (!array_result.ok()) { + return UnknownError(array_result.status().ToString()); + } + + ExportedBatch batch; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &batch.schema)); + auto export_status = + ::arrow::ExportArray(*array_result.MoveValueUnsafe(), &batch.array); + if (!export_status.ok()) { + return UnknownError(export_status.ToString()); + } + return std::move(batch); + } + + Result> MakeArrowSchema(const Schema& schema) const { + ArrowSchema c_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &c_schema)); + auto arrow_schema_result = ::arrow::ImportSchema(&c_schema); + if (!arrow_schema_result.ok()) { + return UnknownError(arrow_schema_result.status().ToString()); + } + return arrow_schema_result.MoveValueUnsafe(); + } + + Result> MakeRecordBatch( + const std::shared_ptr<::arrow::Schema>& arrow_schema, + const std::string& json_data) const { + auto struct_type = ::arrow::struct_(arrow_schema->fields()); + auto array_result = ::arrow::json::ArrayFromJSONString(struct_type, json_data); + if (!array_result.ok()) { + return UnknownError(array_result.status().ToString()); + } + + auto batch_result = + ::arrow::RecordBatch::FromStructArray(array_result.MoveValueUnsafe()); + if (!batch_result.ok()) { + return UnknownError(batch_result.status().ToString()); + } + return batch_result.MoveValueUnsafe(); + } + + Result CreateParquetDataFile(std::shared_ptr schema, + const std::string& json_data, + int64_t row_group_size = 1024) { + auto path = CreateNewTempFilePathWithSuffix(".parquet"); + + ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(*schema)); + ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeRecordBatch(arrow_schema, json_data)); + auto table_result = + ::arrow::Table::FromRecordBatches(arrow_schema, {std::move(batch)}); + if (!table_result.ok()) { + return UnknownError(table_result.status().ToString()); + } + + ICEBERG_ASSIGN_OR_RAISE(auto outfile, arrow::OpenArrowOutputStream(file_io_, path)); + auto write_status = ::parquet::arrow::WriteTable(*table_result.MoveValueUnsafe(), + ::arrow::default_memory_pool(), + outfile, row_group_size); + if (!write_status.ok()) { + return UnknownError(write_status.ToString()); + } + if (auto close_status = outfile->Close(); !close_status.ok()) { + return UnknownError(close_status.ToString()); + } + return path; + } + + Result CreateParquetDataFile(std::shared_ptr schema, + const std::vector& json_batches, + int64_t max_row_group_length) { + ICEBERG_PRECHECK(!json_batches.empty(), "Parquet data file must have a batch"); + + auto path = CreateNewTempFilePathWithSuffix(".parquet"); + ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(*schema)); + ICEBERG_ASSIGN_OR_RAISE(auto outfile, arrow::OpenArrowOutputStream(file_io_, path)); + + auto properties_builder = ::parquet::WriterProperties::Builder(); + auto writer_properties = + properties_builder.compression(::parquet::Compression::UNCOMPRESSED) + ->max_row_group_length(max_row_group_length) + ->build(); + auto writer_result = ::parquet::arrow::FileWriter::Open( + *arrow_schema, ::arrow::default_memory_pool(), outfile, writer_properties); + if (!writer_result.ok()) { + return UnknownError(writer_result.status().ToString()); + } + auto writer = writer_result.MoveValueUnsafe(); + + for (const auto& json_batch : json_batches) { + ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeRecordBatch(arrow_schema, json_batch)); + if (auto write_status = writer->WriteRecordBatch(*batch); !write_status.ok()) { + return UnknownError(write_status.ToString()); + } + } + if (auto close_status = writer->Close(); !close_status.ok()) { + return UnknownError(close_status.ToString()); + } + ICEBERG_PRECHECK( + writer->metadata()->num_row_groups() == static_cast(json_batches.size()), + "Expected {} Parquet row groups, got {}", json_batches.size(), + writer->metadata()->num_row_groups()); + if (auto close_status = outfile->Close(); !close_status.ok()) { + return UnknownError(close_status.ToString()); + } + return path; + } + + Result> MakeDataFile(std::shared_ptr schema, + const std::string& json_data, + int64_t record_count = 3) { + ICEBERG_ASSIGN_OR_RAISE(auto path, + CreateParquetDataFile(std::move(schema), json_data)); + ICEBERG_ASSIGN_OR_RAISE(auto input_file, file_io_->NewInputFile(path)); + ICEBERG_ASSIGN_OR_RAISE(auto size, input_file->Size()); + return std::make_shared(DataFile{ + .content = DataFile::Content::kData, + .file_path = path, + .file_format = FileFormatType::kParquet, + .record_count = record_count, + .file_size_in_bytes = size, + }); + } + + Result> MakeDataFile( + std::shared_ptr schema, const std::vector& json_batches, + int64_t record_count, int64_t max_row_group_length) { + ICEBERG_ASSIGN_OR_RAISE( + auto path, + CreateParquetDataFile(std::move(schema), json_batches, max_row_group_length)); + ICEBERG_ASSIGN_OR_RAISE(auto input_file, file_io_->NewInputFile(path)); + ICEBERG_ASSIGN_OR_RAISE(auto size, input_file->Size()); + return std::make_shared(DataFile{ + .content = DataFile::Content::kData, + .file_path = path, + .file_format = FileFormatType::kParquet, + .record_count = record_count, + .file_size_in_bytes = size, + }); + } + + Result> MakePositionDeleteFile( + const std::string& path, const std::vector& positions, + const std::string& data_path) { + PositionDeleteWriterOptions options{ + .path = path, + .schema = table_schema_, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .flush_threshold = 10000, + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + + ICEBERG_ASSIGN_OR_RAISE(auto writer, PositionDeleteWriter::Make(options)); + for (int64_t pos : positions) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteDelete(data_path, pos)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata()); + return metadata.data_files[0]; + } + + Result> MakeEqualityDeleteFile( + const std::string& path, std::shared_ptr schema, + const std::string& json_data, std::vector equality_field_ids) { + EqualityDeleteWriterOptions options{ + .path = path, + .schema = schema, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .equality_field_ids = std::move(equality_field_ids), + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + + ICEBERG_ASSIGN_OR_RAISE(auto writer, EqualityDeleteWriter::Make(options)); + ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeBatch(*schema, json_data)); + ICEBERG_RETURN_UNEXPECTED(writer->Write(&batch.array)); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata()); + return metadata.data_files[0]; + } + + void VerifyStream(struct ArrowArrayStream* stream, std::string_view expected_json) { + auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie(); + + auto result = record_batch_reader->Next(); + ASSERT_TRUE(result.ok()) << result.status().message(); + auto actual_batch = result.ValueOrDie(); + ASSERT_NE(actual_batch, nullptr) << "Stream is exhausted but expected more data."; + + auto struct_type = ::arrow::struct_(actual_batch->schema()->fields()); + auto expected_array = + ::arrow::json::ArrayFromJSONString(struct_type, expected_json).ValueOrDie(); + auto expected_batch = + ::arrow::RecordBatch::FromStructArray(expected_array).ValueOrDie(); + + ASSERT_TRUE(actual_batch->Equals(*expected_batch)) + << "Actual batch:\n" + << actual_batch->ToString() << "\nExpected batch:\n" + << expected_batch->ToString(); + + result = record_batch_reader->Next(); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(result.ValueOrDie(), nullptr) << "Reader returned an extra batch."; + } + + void VerifyDataReaderBatchLengths(const DataFile& data_file, + const std::vector& expected_lengths, + ReaderProperties properties = {}) { + ReaderOptions options{ + .path = data_file.file_path, + .length = static_cast(data_file.file_size_in_bytes), + .io = file_io_, + .projection = table_schema_, + .properties = std::move(properties), + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, + ReaderFactoryRegistry::Open(data_file.file_format, options)); + + for (int64_t expected_length : expected_lengths) { + ICEBERG_UNWRAP_OR_FAIL(auto maybe_batch, reader->Next()); + ASSERT_TRUE(maybe_batch.has_value()) << "Reader is exhausted too early."; + + ArrowArray batch = std::move(maybe_batch.value()); + internal::ArrowArrayGuard batch_guard(&batch); + ASSERT_EQ(batch.length, expected_length); + } + + ICEBERG_UNWRAP_OR_FAIL(auto maybe_batch, reader->Next()); + ASSERT_FALSE(maybe_batch.has_value()) << "Reader returned an extra batch."; + ASSERT_THAT(reader->Close(), IsOk()); + } + + std::shared_ptr file_io_; + std::shared_ptr partition_spec_; + std::shared_ptr table_schema_; + std::shared_ptr projected_schema_; +}; + +TEST_F(FileScanTaskReaderTest, OpenWithoutDeletesReadsProjectedSchema) { + ICEBERG_UNWRAP_OR_FAIL( + auto data_file, + MakeDataFile(table_schema_, + R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])")); + FileScanTask task(data_file); + + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = table_schema_, + .schemas = {table_schema_}, + .projected_schema = projected_schema_, + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(options))); + auto stream_result = reader->Open(task); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyStream(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")); +} + +TEST_F(FileScanTaskReaderTest, OpenWithPositionDeletesFiltersRowsAndPrunesPos) { + ICEBERG_UNWRAP_OR_FAIL( + auto data_file, + MakeDataFile(table_schema_, + R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])")); + ICEBERG_UNWRAP_OR_FAIL( + auto pos_delete, MakePositionDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), + {1}, data_file->file_path)); + FileScanTask task(data_file, {pos_delete}); + + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = table_schema_, + .schemas = {table_schema_}, + .projected_schema = projected_schema_, + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(options))); + auto stream_result = reader->Open(task); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])")); +} + +TEST_F(FileScanTaskReaderTest, OpenWithEqualityDeletesAddsAndPrunesDeleteOnlyColumns) { + ICEBERG_UNWRAP_OR_FAIL( + auto data_file, + MakeDataFile(table_schema_, + R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])")); + ICEBERG_UNWRAP_OR_FAIL( + auto eq_delete, + MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), table_schema_, + R"([[0, "unused", "red"]])", {3})); + FileScanTask task(data_file, {eq_delete}); + + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = table_schema_, + .schemas = {table_schema_}, + .projected_schema = projected_schema_, + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(options))); + auto stream_result = reader->Open(task); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])")); +} + +TEST_F(FileScanTaskReaderTest, OpenWithEqualityDeletesKeepsInputBatchWhenAllRowsAlive) { + ICEBERG_UNWRAP_OR_FAIL( + auto data_file, + MakeDataFile(table_schema_, + R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])")); + ICEBERG_UNWRAP_OR_FAIL( + auto eq_delete, + MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), table_schema_, + R"([[99, "unused", "unused"]])", {1})); + FileScanTask task(data_file, {eq_delete}); + + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = table_schema_, + .schemas = {table_schema_}, + .projected_schema = projected_schema_, + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(options))); + auto stream_result = reader->Open(task); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyStream(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")); +} + +TEST_F(FileScanTaskReaderTest, OpenWithSchemasResolvesDroppedEqualityField) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}, + /*schema_id=*/2); + auto old_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(4, "dropped_value", string())}, + /*schema_id=*/1); + ICEBERG_UNWRAP_OR_FAIL( + auto data_file, + MakeDataFile(old_schema, + R"([[1, "Foo", "keep"], [2, "Bar", "gone"], [3, "Baz", "keep"]])")); + ICEBERG_UNWRAP_OR_FAIL( + auto eq_delete, + MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), old_schema, + R"([[0, "unused", "gone"]])", {4})); + FileScanTask task(data_file, {eq_delete}); + + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = current_schema, + .schemas = {current_schema, old_schema}, + .projected_schema = current_schema, + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(options))); + auto stream_result = reader->Open(task); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])")); +} + +TEST_F(FileScanTaskReaderTest, OpenWithMixedDeletesSkipsFullyDeletedBatches) { + ICEBERG_UNWRAP_OR_FAIL( + auto data_file, + MakeDataFile(table_schema_, + std::vector{R"([[1, "Foo", "blue"], [2, "Bar", "red"]])", + R"([[3, "Baz", "green"]])"}, + /*record_count=*/3, /*max_row_group_length=*/2)); + // Parquet can coalesce row groups when the reader batch size is larger. + ReaderProperties properties; + properties.Set(ReaderProperties::kBatchSize, int64_t{2}); + ASSERT_NO_FATAL_FAILURE(VerifyDataReaderBatchLengths(*data_file, {2, 1}, properties)); + ICEBERG_UNWRAP_OR_FAIL( + auto pos_delete, MakePositionDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), + {0, 1}, data_file->file_path)); + ICEBERG_UNWRAP_OR_FAIL( + auto eq_delete, + MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), table_schema_, + R"([[0, "unused", "yellow"]])", {3})); + FileScanTask task(data_file, {pos_delete, eq_delete}); + + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = table_schema_, + .schemas = {table_schema_}, + .projected_schema = projected_schema_, + .properties = properties.configs(), + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(options))); + auto stream_result = reader->Open(task); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[3, "Baz"]])")); +} + +} // namespace iceberg diff --git a/src/iceberg/test/file_scan_task_test.cc b/src/iceberg/test/file_scan_task_test.cc index 55bc6a110..cb945ba1f 100644 --- a/src/iceberg/test/file_scan_task_test.cc +++ b/src/iceberg/test/file_scan_task_test.cc @@ -28,6 +28,7 @@ #include #include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/data/file_scan_task_reader.h" #include "iceberg/file_format.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/parquet/parquet_register.h" @@ -109,6 +110,20 @@ class FileScanTaskTest : public TempFileTestBase { return data_file; } + Result OpenTask(const FileScanTask& task, + std::shared_ptr projected_schema) { + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = current_schema, + .projected_schema = std::move(projected_schema), + }; + ICEBERG_ASSIGN_OR_RAISE(auto reader, FileScanTaskReader::Make(std::move(options))); + return reader->Open(task); + } + // Helper method to verify the content of the next batch from an ArrowArrayStream. void VerifyStreamNextBatch(struct ArrowArrayStream* stream, std::string_view expected_json) { @@ -154,9 +169,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(file_io_, projected_schema); - ASSERT_THAT(stream_result, IsOk()); - auto stream = std::move(stream_result.value()); + ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema)); ASSERT_NO_FATAL_FAILURE( VerifyStreamNextBatch(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")); @@ -171,9 +184,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(file_io_, projected_schema); - ASSERT_THAT(stream_result, IsOk()); - auto stream = std::move(stream_result.value()); + ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema)); ASSERT_NO_FATAL_FAILURE( VerifyStreamNextBatch(&stream, R"([["Foo", null], ["Bar", null], ["Baz", null]])")); @@ -188,9 +199,7 @@ TEST_F(FileScanTaskTest, ReadEmptyFile) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(file_io_, projected_schema); - ASSERT_THAT(stream_result, IsOk()); - auto stream = std::move(stream_result.value()); + ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema)); // The stream should be immediately exhausted ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream)); From 46035d8c3c93325862eae40ce5612442b80e60ed Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 20 May 2026 23:25:43 +0800 Subject: [PATCH 2/2] fix: address scan task reader review comments --- src/iceberg/arrow/arrow_c_data_util.cc | 3 ++- src/iceberg/arrow_c_data_util.cc | 28 ++++++++++++----------- src/iceberg/arrow_c_data_util_internal.h | 12 +++++++--- src/iceberg/data/file_scan_task_reader.cc | 2 ++ 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/iceberg/arrow/arrow_c_data_util.cc b/src/iceberg/arrow/arrow_c_data_util.cc index cb2f066a3..ea52d64c0 100644 --- a/src/iceberg/arrow/arrow_c_data_util.cc +++ b/src/iceberg/arrow/arrow_c_data_util.cc @@ -88,6 +88,7 @@ Result ProjectBatchArrowCompute(ArrowArray* input_batch, ::arrow::ImportRecordBatch(input_batch, state->input_schema)); const int32_t empty_index = 0; + // Buffer::Wrap needs a valid pointer even when the zero-length buffer is never read. const int32_t* row_indices_data = row_indices.empty() ? &empty_index : row_indices.data(); auto index_array = std::make_shared<::arrow::Int32Array>( @@ -96,7 +97,7 @@ Result ProjectBatchArrowCompute(ArrowArray* input_batch, std::vector> output_columns; output_columns.reserve(projection.selected_field_indices().size()); - for (int input_index : projection.selected_field_indices()) { + for (int32_t input_index : projection.selected_field_indices()) { ICEBERG_PRECHECK(input_index >= 0 && input_index < input_record_batch->num_columns(), "Input field index {} out of range for batch with {} columns", input_index, input_record_batch->num_columns()); diff --git a/src/iceberg/arrow_c_data_util.cc b/src/iceberg/arrow_c_data_util.cc index 8f27961c4..a1d765d2c 100644 --- a/src/iceberg/arrow_c_data_util.cc +++ b/src/iceberg/arrow_c_data_util.cc @@ -63,10 +63,10 @@ ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() { return g_project_batch_function; } -Result> BuildSelectedFieldIndices( +Result> BuildSelectedFieldIndices( std::span input_fields, std::span output_fields) { - std::vector selected_field_indices; + std::vector selected_field_indices; selected_field_indices.reserve(output_fields.size()); for (const auto& output_field : output_fields) { @@ -80,9 +80,10 @@ Result> BuildSelectedFieldIndices( output_field.field_id(), input_field.type()->ToString(), output_field.type()->ToString()); } - ICEBERG_PRECHECK(input_index <= static_cast(std::numeric_limits::max()), - "Input field index {} exceeds int range", input_index); - selected_field_indices.push_back(static_cast(input_index)); + ICEBERG_PRECHECK( + input_index <= static_cast(std::numeric_limits::max()), + "Input field index {} exceeds int32 range", input_index); + selected_field_indices.push_back(static_cast(input_index)); } return selected_field_indices; @@ -294,7 +295,7 @@ const ArrowSchema& ProjectionContext::output_arrow_schema() const { return output_arrow_schema_; } -std::span ProjectionContext::selected_field_indices() const { +std::span ProjectionContext::selected_field_indices() const { return selected_field_indices_; } @@ -329,11 +330,10 @@ auto ProjectionContext::ResolveProjectBatchFunction() namespace { -Result ProjectBatchNanoarrow(const ArrowSchema& input_arrow_schema, - const ArrowArray& input_batch, - std::span row_indices, - const ArrowSchema& output_arrow_schema, - std::span selected_field_indices) { +Result ProjectBatchNanoarrow( + const ArrowSchema& input_arrow_schema, const ArrowArray& input_batch, + std::span row_indices, const ArrowSchema& output_arrow_schema, + std::span selected_field_indices) { ArrowArrayView input_view; ArrowError error; ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( @@ -342,7 +342,7 @@ Result ProjectBatchNanoarrow(const ArrowSchema& input_arrow_schema, ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( ArrowArrayViewSetArray(&input_view, &input_batch, &error), error); ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( - ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_FULL, &error), + ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_DEFAULT, &error), error); ArrowArray output_array; @@ -359,7 +359,7 @@ Result ProjectBatchNanoarrow(const ArrowSchema& input_arrow_schema, input_batch.length); for (size_t output_index = 0; output_index < selected_field_indices.size(); ++output_index) { - const int input_index = selected_field_indices[output_index]; + const int32_t input_index = selected_field_indices[output_index]; ICEBERG_RETURN_UNEXPECTED(AppendValue(*input_arrow_schema.children[input_index], *input_batch.children[input_index], *input_view.children[input_index], row_index, @@ -384,6 +384,8 @@ Result ProjectBatch(ArrowArray* input_batch, auto project_batch_function = projection.project_batch_function(); if (project_batch_function != nullptr) { + // ProjectBatch owns input_batch. Arrow-backed implementations import it and clear + // release, so input_batch_guard becomes a no-op instead of double-releasing. return project_batch_function(input_batch, row_indices, projection); } diff --git a/src/iceberg/arrow_c_data_util_internal.h b/src/iceberg/arrow_c_data_util_internal.h index 435aab155..e02db29a6 100644 --- a/src/iceberg/arrow_c_data_util_internal.h +++ b/src/iceberg/arrow_c_data_util_internal.h @@ -27,7 +27,6 @@ #include #include #include -#include #include #include @@ -39,6 +38,8 @@ namespace iceberg { /// \brief Cached state for ProjectBatch over one input/output schema pair. +/// +/// Exported because this internal utility is shared across library translation units. class ICEBERG_EXPORT ProjectionContext { public: using ProjectBatchState = std::shared_ptr; @@ -49,6 +50,9 @@ class ICEBERG_EXPORT ProjectionContext { -> Result; /// \brief Register a custom implementation for ProjectBatch. + /// + /// Registration is process-wide. If multiple implementations are registered, + /// the last non-null implementation wins. static void RegisterProjectBatchFunction(ProjectBatchFunction project_batch_function); /// \brief Returns true when a custom implementation has been registered. @@ -87,7 +91,7 @@ class ICEBERG_EXPORT ProjectionContext { const ArrowSchema& output_arrow_schema() const; - std::span selected_field_indices() const; + std::span selected_field_indices() const; ProjectBatchFunction project_batch_function() const; @@ -96,9 +100,11 @@ class ICEBERG_EXPORT ProjectionContext { private: ProjectionContext() = default; + // Raw schema pointers are borrowed from caller-owned schemas. FileScanTaskReader + // keeps those schema objects alive in the same stream source that owns this context. const Schema* input_schema_ = nullptr; const Schema* output_schema_ = nullptr; - std::vector selected_field_indices_; + std::vector selected_field_indices_; ArrowSchema input_arrow_schema_{}; ArrowSchema output_arrow_schema_{}; ProjectBatchFunction project_batch_function_ = nullptr; diff --git a/src/iceberg/data/file_scan_task_reader.cc b/src/iceberg/data/file_scan_task_reader.cc index 6deeb0723..7076486ac 100644 --- a/src/iceberg/data/file_scan_task_reader.cc +++ b/src/iceberg/data/file_scan_task_reader.cc @@ -84,6 +84,7 @@ class MergeOnReadStreamSource { Result> Next() { if (!cached_schema_.has_value()) { + // File readers expose one stable Arrow schema for every batch in the stream. ICEBERG_ASSIGN_OR_RAISE(cached_schema_, reader_->Schema()); } ArrowSchema& input_arrow_schema = cached_schema_.value(); @@ -181,6 +182,7 @@ class FileScanTaskReader::Impl { auto required_schema = delete_filter->RequiredSchema(); auto project_batch_function = ProjectionContext::ResolveProjectBatchFunction(); + // ProjectionContext borrows schemas that are kept in MergeOnReadStreamSource. ICEBERG_ASSIGN_OR_RAISE(auto projection_context, ProjectionContext::Make(*required_schema, *projected_schema_, project_batch_function));