Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
set(ICEBERG_SOURCES
arrow_c_data_util.cc
arrow_c_data_guard_internal.cc
catalog/memory/in_memory_catalog.cc
delete_file_index.cc
Expand Down Expand Up @@ -164,6 +165,7 @@ set(ICEBERG_DATA_SOURCES
data/delete_filter.cc
data/delete_loader.cc
data/equality_delete_writer.cc
data/file_scan_task_reader.cc
data/position_delete_writer.cc
data/writer.cc
deletes/position_delete_index.cc
Expand Down Expand Up @@ -220,9 +222,10 @@ add_subdirectory(util)

if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES
arrow/arrow_c_data_util.cc
arrow/arrow_io.cc
arrow/s3/arrow_s3_file_io.cc
arrow/arrow_io_register.cc
arrow/arrow_register.cc
arrow/metadata_column_util.cc
avro/avro_data_util.cc
avro/avro_direct_decoder.cc
Expand Down
131 changes: 131 additions & 0 deletions src/iceberg/arrow/arrow_c_data_util.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <cstdint>
#include <memory>
#include <mutex>
#include <span>
#include <utility>
#include <vector>

#include <arrow/array/array_primitive.h>
#include <arrow/buffer.h>
#include <arrow/c/bridge.h>
#include <arrow/compute/api_vector.h>
#include <arrow/record_batch.h>
#include <nanoarrow/nanoarrow.h>

#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/nanoarrow_status_internal.h"
#include "iceberg/arrow_c_data_guard_internal.h"
#include "iceberg/arrow_c_data_util_internal.h"
#include "iceberg/result.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

struct ArrowProjectBatchState {
std::shared_ptr<::arrow::Schema> input_schema;
std::shared_ptr<::arrow::Schema> output_schema;
};

Result<std::shared_ptr<::arrow::Schema>> ImportArrowSchema(
const ArrowSchema& arrow_schema) {
ArrowSchema schema_copy;
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowSchemaDeepCopy(&arrow_schema, &schema_copy));
internal::ArrowSchemaGuard schema_copy_guard(&schema_copy);

ICEBERG_ARROW_ASSIGN_OR_RETURN(auto schema, ::arrow::ImportSchema(&schema_copy));
return schema;
}

Result<std::shared_ptr<ArrowProjectBatchState>> GetArrowProjectBatchState(
ProjectionContext& projection) {
auto state =
std::static_pointer_cast<ArrowProjectBatchState>(projection.project_batch_state());
if (state != nullptr) {
return state;
}

ICEBERG_ASSIGN_OR_RAISE(auto input_schema,
ImportArrowSchema(projection.input_arrow_schema()));
ICEBERG_ASSIGN_OR_RAISE(auto output_schema,
ImportArrowSchema(projection.output_arrow_schema()));

state = std::make_shared<ArrowProjectBatchState>(
ArrowProjectBatchState{.input_schema = std::move(input_schema),
.output_schema = std::move(output_schema)});
projection.project_batch_state() = state;
return state;
}

Result<ArrowArray> ProjectBatchArrowCompute(ArrowArray* input_batch,
std::span<const int32_t> row_indices,
ProjectionContext& projection) {
ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
ICEBERG_ASSIGN_OR_RAISE(auto state, GetArrowProjectBatchState(projection));

ICEBERG_ARROW_ASSIGN_OR_RETURN(
auto input_record_batch,
::arrow::ImportRecordBatch(input_batch, state->input_schema));

const int32_t empty_index = 0;
// Buffer::Wrap needs a valid pointer even when the zero-length buffer is never read.
const int32_t* row_indices_data =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: empty_index is a bit misleading — it is just a non-null pointer to satisfy Buffer::Wrap when row_indices is empty (the value 0 is never actually used). A brief comment would clarify intent, e.g.:

// Provide a valid pointer for Buffer::Wrap; the array length is 0 so this is never read.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the Buffer::Wrap empty-input comment.

row_indices.empty() ? &empty_index : row_indices.data();
auto index_array = std::make_shared<::arrow::Int32Array>(
static_cast<int64_t>(row_indices.size()),
::arrow::Buffer::Wrap(row_indices_data, row_indices.size()));

std::vector<std::shared_ptr<::arrow::Array>> output_columns;
output_columns.reserve(projection.selected_field_indices().size());
for (int32_t input_index : projection.selected_field_indices()) {
ICEBERG_PRECHECK(input_index >= 0 && input_index < input_record_batch->num_columns(),
"Input field index {} out of range for batch with {} columns",
input_index, input_record_batch->num_columns());
ICEBERG_ARROW_ASSIGN_OR_RETURN(
auto taken_column,
::arrow::compute::Take(*input_record_batch->column(input_index), *index_array));
output_columns.push_back(std::move(taken_column));
}

auto output_record_batch = ::arrow::RecordBatch::Make(
state->output_schema, static_cast<int64_t>(row_indices.size()),
std::move(output_columns));

ArrowArray output_array;
ICEBERG_ARROW_RETURN_NOT_OK(
::arrow::ExportRecordBatch(*output_record_batch, &output_array));
internal::ArrowArrayGuard output_array_guard(&output_array);

return std::exchange(output_array, ArrowArray{});
}

} // namespace

void RegisterArrowProjectBatch() {
static std::once_flag flag;
std::call_once(flag, []() {
ProjectionContext::RegisterProjectBatchFunction(&ProjectBatchArrowCompute);
});
}

} // namespace iceberg
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "iceberg/arrow/arrow_io_register.h"
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/arrow/arrow_register.h"

#include <mutex>
#include <string>
#include <unordered_map>

#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/file_io_registry.h"

namespace iceberg {
void RegisterArrowProjectBatch();
}

namespace iceberg::arrow {

namespace {
Expand All @@ -43,8 +50,6 @@ void RegisterS3FileIO() {
#endif
}

} // namespace

void EnsureArrowFileIOsRegistered() {
static std::once_flag flag;
std::call_once(flag, []() {
Expand All @@ -58,4 +63,11 @@ void EnsureArrowFileIOsRegistered() {
return true;
}();

} // namespace

void RegisterAll() {
EnsureArrowFileIOsRegistered();
::iceberg::RegisterArrowProjectBatch();
}

} // namespace iceberg::arrow
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

#pragma once

/// \file iceberg/arrow/arrow_io_register.h
/// \brief Provide functions to register Arrow FileIO implementations.
/// \file iceberg/arrow/arrow_register.h
/// \brief Provide functions to register Arrow bundle integrations.

#include "iceberg/iceberg_bundle_export.h"

namespace iceberg::arrow {

/// \brief Register built-in Arrow FileIO implementations into the FileIORegistry.
/// \brief Register Arrow FileIOs and Arrow-backed C Data utilities.
///
/// This operation is idempotent and safe to call multiple times.
ICEBERG_BUNDLE_EXPORT void EnsureArrowFileIOsRegistered();
ICEBERG_BUNDLE_EXPORT void RegisterAll();

} // namespace iceberg::arrow
Loading
Loading