From 9a7f53abae7d087d8f0620c1d01bf7d7619b8258 Mon Sep 17 00:00:00 2001 From: shangxinli Date: Sat, 28 Feb 2026 09:37:22 -0800 Subject: [PATCH 1/4] feat: Implement PositionDeleteWriter for position delete files Implement the PositionDeleteWriter following the same PIMPL pattern as DataWriter. The writer supports both buffered WriteDelete(file_path, pos) calls and direct Write(ArrowArray*) for pre-formed batches. Metadata reports content=kPositionDeletes with sort_order_id=nullopt per spec, and tracks referenced_data_file when all deletes target a single file. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/iceberg/data/position_delete_writer.cc | 187 ++++++++++++++++++++- src/iceberg/data/position_delete_writer.h | 6 + src/iceberg/test/data_writer_test.cc | 132 +++++++++++++++ 3 files changed, 320 insertions(+), 5 deletions(-) diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc index f58741083..6b9fdd224 100644 --- a/src/iceberg/data/position_delete_writer.cc +++ b/src/iceberg/data/position_delete_writer.cc @@ -19,26 +19,203 @@ #include "iceberg/data/position_delete_writer.h" +#include +#include +#include + +#include + +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/file_writer.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/metadata_columns.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/macros.h" + namespace iceberg { class PositionDeleteWriter::Impl { public: + static Result> Make(PositionDeleteWriterOptions options) { + // Build the position delete schema with file_path and pos columns + std::vector fields; + fields.push_back(MetadataColumns::kDeleteFilePath); + fields.push_back(MetadataColumns::kDeleteFilePos); + + auto delete_schema = std::make_shared(std::move(fields)); + + WriterOptions writer_options{ + .path = options.path, + .schema = delete_schema, + .io = options.io, + .properties = WriterProperties::FromMap(options.properties), + }; + + ICEBERG_ASSIGN_OR_RAISE(auto writer, + WriterFactoryRegistry::Open(options.format, writer_options)); + + return std::unique_ptr( + new Impl(std::move(options), std::move(delete_schema), std::move(writer))); + } + + Status Write(ArrowArray* data) { + ICEBERG_DCHECK(writer_, "Writer not initialized"); + return writer_->Write(data); + } + + Status WriteDelete(std::string_view file_path, int64_t pos) { + ICEBERG_DCHECK(writer_, "Writer not initialized"); + buffered_paths_.emplace_back(file_path); + buffered_positions_.push_back(pos); + referenced_paths_.emplace(file_path); + + if (static_cast(buffered_paths_.size()) >= kFlushThreshold) { + return FlushBuffer(); + } + return {}; + } + + Result Length() const { + ICEBERG_DCHECK(writer_, "Writer not initialized"); + return writer_->length(); + } + + Status Close() { + ICEBERG_DCHECK(writer_, "Writer not initialized"); + if (closed_) { + return {}; + } + if (!buffered_paths_.empty()) { + ICEBERG_RETURN_UNEXPECTED(FlushBuffer()); + } + ICEBERG_RETURN_UNEXPECTED(writer_->Close()); + closed_ = true; + return {}; + } + + Result Metadata() { + ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer"); + + ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics()); + ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length()); + auto split_offsets = writer_->split_offsets(); + + // Serialize literal bounds to binary format + std::map> lower_bounds_map; + for (const auto& [col_id, literal] : metrics.lower_bounds) { + ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize()); + lower_bounds_map[col_id] = std::move(serialized); + } + std::map> upper_bounds_map; + for (const auto& [col_id, literal] : metrics.upper_bounds) { + ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize()); + upper_bounds_map[col_id] = std::move(serialized); + } + + // Set referenced_data_file if all deletes reference the same data file + std::optional referenced_data_file; + if (referenced_paths_.size() == 1) { + referenced_data_file = *referenced_paths_.begin(); + } + + auto data_file = std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = options_.path, + .file_format = options_.format, + .partition = options_.partition, + .record_count = metrics.row_count.value_or(-1), + .file_size_in_bytes = length, + .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()}, + .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()}, + .null_value_counts = {metrics.null_value_counts.begin(), + metrics.null_value_counts.end()}, + .nan_value_counts = {metrics.nan_value_counts.begin(), + metrics.nan_value_counts.end()}, + .lower_bounds = std::move(lower_bounds_map), + .upper_bounds = std::move(upper_bounds_map), + .split_offsets = std::move(split_offsets), + .sort_order_id = std::nullopt, + .referenced_data_file = std::move(referenced_data_file), + }); + + FileWriter::WriteResult result; + result.data_files.push_back(std::move(data_file)); + return result; + } + + private: + static constexpr int64_t kFlushThreshold = 1000; + + Impl(PositionDeleteWriterOptions options, std::shared_ptr delete_schema, + std::unique_ptr writer) + : options_(std::move(options)), + delete_schema_(std::move(delete_schema)), + writer_(std::move(writer)) {} + + Status FlushBuffer() { + ArrowSchema arrow_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema)); + + ArrowArray array; + ArrowError error; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&array)); + + for (size_t i = 0; i < buffered_paths_.size(); ++i) { + ArrowStringView path_view(buffered_paths_[i].data(), + static_cast(buffered_paths_[i].size())); + ICEBERG_NANOARROW_RETURN_UNEXPECTED( + ArrowArrayAppendString(array.children[0], path_view)); + ICEBERG_NANOARROW_RETURN_UNEXPECTED( + ArrowArrayAppendInt(array.children[1], buffered_positions_[i])); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array)); + } + + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayFinishBuildingDefault(&array, &error), error); + + ICEBERG_RETURN_UNEXPECTED(writer_->Write(&array)); + + buffered_paths_.clear(); + buffered_positions_.clear(); + arrow_schema.release(&arrow_schema); + return {}; + } + + PositionDeleteWriterOptions options_; + std::shared_ptr delete_schema_; + std::unique_ptr writer_; + bool closed_ = false; + std::vector buffered_paths_; + std::vector buffered_positions_; + std::set referenced_paths_; }; +PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr impl) + : impl_(std::move(impl)) {} + PositionDeleteWriter::~PositionDeleteWriter() = default; -Status PositionDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); } +Result> PositionDeleteWriter::Make( + const PositionDeleteWriterOptions& options) { + ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options)); + return std::unique_ptr(new PositionDeleteWriter(std::move(impl))); +} + +Status PositionDeleteWriter::Write(ArrowArray* data) { return impl_->Write(data); } Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) { - return NotImplemented(""); + return impl_->WriteDelete(file_path, pos); } -Result PositionDeleteWriter::Length() const { return NotImplemented(""); } +Result PositionDeleteWriter::Length() const { return impl_->Length(); } -Status PositionDeleteWriter::Close() { return NotImplemented(""); } +Status PositionDeleteWriter::Close() { return impl_->Close(); } Result PositionDeleteWriter::Metadata() { - return NotImplemented(""); + return impl_->Metadata(); } } // namespace iceberg diff --git a/src/iceberg/data/position_delete_writer.h b/src/iceberg/data/position_delete_writer.h index c660812cd..76bcfc577 100644 --- a/src/iceberg/data/position_delete_writer.h +++ b/src/iceberg/data/position_delete_writer.h @@ -55,6 +55,10 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter { public: ~PositionDeleteWriter() override; + /// \brief Create a new PositionDeleteWriter instance. + static Result> Make( + const PositionDeleteWriterOptions& options); + Status Write(ArrowArray* data) override; Status WriteDelete(std::string_view file_path, int64_t pos); Result Length() const override; @@ -64,6 +68,8 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter { private: class Impl; std::unique_ptr impl_; + + explicit PositionDeleteWriter(std::unique_ptr impl); }; } // namespace iceberg diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index 7671e7fe1..0648bb36d 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -27,8 +27,10 @@ #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_register.h" +#include "iceberg/data/position_delete_writer.h" #include "iceberg/file_format.h" #include "iceberg/manifest/manifest_entry.h" +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_register.h" #include "iceberg/partition_spec.h" #include "iceberg/row/partition_values.h" @@ -264,4 +266,134 @@ TEST_F(DataWriterTest, WriteMultipleBatches) { EXPECT_GT(data_file->file_size_in_bytes, 0); } +class PositionDeleteWriterTest : public DataWriterTest { + protected: + PositionDeleteWriterOptions MakeDeleteOptions() { + return PositionDeleteWriterOptions{ + .path = "test_deletes.parquet", + .schema = schema_, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + } + + std::shared_ptr<::arrow::Array> CreatePositionDeleteData() { + auto delete_schema = std::make_shared(std::vector{ + MetadataColumns::kDeleteFilePath, MetadataColumns::kDeleteFilePos}); + + ArrowSchema arrow_c_schema; + ICEBERG_THROW_NOT_OK(ToArrowSchema(*delete_schema, &arrow_c_schema)); + auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + return ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_type->fields()), + R"([["data_file_1.parquet", 0], ["data_file_1.parquet", 5], ["data_file_1.parquet", 10]])") + .ValueOrDie(); + } +}; + +TEST_F(PositionDeleteWriterTest, WriteDeleteAndClose) { + auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk()); + ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk()); + ASSERT_THAT(writer->WriteDelete("data_file.parquet", 10), IsOk()); + + ASSERT_THAT(writer->Close(), IsOk()); + + auto length_result = writer->Length(); + ASSERT_THAT(length_result, IsOk()); + EXPECT_GT(length_result.value(), 0); +} + +TEST_F(PositionDeleteWriterTest, MetadataAfterClose) { + auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk()); + ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + + const auto& write_result = metadata_result.value(); + ASSERT_EQ(write_result.data_files.size(), 1); + + const auto& data_file = write_result.data_files[0]; + EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes); + EXPECT_EQ(data_file->file_path, "test_deletes.parquet"); + EXPECT_EQ(data_file->file_format, FileFormatType::kParquet); + EXPECT_GT(data_file->file_size_in_bytes, 0); + EXPECT_FALSE(data_file->sort_order_id.has_value()); +} + +TEST_F(PositionDeleteWriterTest, MetadataBeforeCloseReturnsError) { + auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(metadata_result, + HasErrorMessage("Cannot get metadata before closing the writer")); +} + +TEST_F(PositionDeleteWriterTest, CloseIsIdempotent) { + auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk()); + + ASSERT_THAT(writer->Close(), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); +} + +TEST_F(PositionDeleteWriterTest, WriteMultipleDeletes) { + auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + for (int64_t i = 0; i < 100; ++i) { + ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk()); + } + + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + + const auto& data_file = metadata_result.value().data_files[0]; + EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes); + EXPECT_GT(data_file->file_size_in_bytes, 0); +} + +TEST_F(PositionDeleteWriterTest, WriteBatchData) { + auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + auto test_data = CreatePositionDeleteData(); + ArrowArray arrow_array; + ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok()); + ASSERT_THAT(writer->Write(&arrow_array), IsOk()); + + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + + const auto& data_file = metadata_result.value().data_files[0]; + EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes); + EXPECT_GT(data_file->file_size_in_bytes, 0); +} + } // namespace iceberg From e683ce27df7e98548b724727156ab2cc01afd6d6 Mon Sep 17 00:00:00 2001 From: shangxinli Date: Wed, 11 Mar 2026 08:30:10 -0700 Subject: [PATCH 2/4] address review: make flush threshold configurable and add test Make kFlushThreshold configurable via PositionDeleteWriterOptions with a default of 1000. Add AutoFlushOnThreshold test that uses a small threshold to verify the automatic flush logic. --- src/iceberg/data/position_delete_writer.cc | 4 +-- src/iceberg/data/position_delete_writer.h | 1 + src/iceberg/test/data_writer_test.cc | 29 +++++++++++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc index 6b9fdd224..97db5dc07 100644 --- a/src/iceberg/data/position_delete_writer.cc +++ b/src/iceberg/data/position_delete_writer.cc @@ -70,7 +70,7 @@ class PositionDeleteWriter::Impl { buffered_positions_.push_back(pos); referenced_paths_.emplace(file_path); - if (static_cast(buffered_paths_.size()) >= kFlushThreshold) { + if (static_cast(buffered_paths_.size()) >= options_.flush_threshold) { return FlushBuffer(); } return {}; @@ -145,8 +145,6 @@ class PositionDeleteWriter::Impl { } private: - static constexpr int64_t kFlushThreshold = 1000; - Impl(PositionDeleteWriterOptions options, std::shared_ptr delete_schema, std::unique_ptr writer) : options_(std::move(options)), diff --git a/src/iceberg/data/position_delete_writer.h b/src/iceberg/data/position_delete_writer.h index 76bcfc577..5c255f8d0 100644 --- a/src/iceberg/data/position_delete_writer.h +++ b/src/iceberg/data/position_delete_writer.h @@ -47,6 +47,7 @@ struct ICEBERG_EXPORT PositionDeleteWriterOptions { FileFormatType format = FileFormatType::kParquet; std::shared_ptr io; std::shared_ptr row_schema; // Optional row data schema + int64_t flush_threshold = 1000; // Number of buffered deletes before auto-flush std::unordered_map properties; }; diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index 0648bb36d..f7778faf3 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -268,7 +268,7 @@ TEST_F(DataWriterTest, WriteMultipleBatches) { class PositionDeleteWriterTest : public DataWriterTest { protected: - PositionDeleteWriterOptions MakeDeleteOptions() { + PositionDeleteWriterOptions MakeDeleteOptions(int64_t flush_threshold = 1000) { return PositionDeleteWriterOptions{ .path = "test_deletes.parquet", .schema = schema_, @@ -276,6 +276,7 @@ class PositionDeleteWriterTest : public DataWriterTest { .partition = PartitionValues{}, .format = FileFormatType::kParquet, .io = file_io_, + .flush_threshold = flush_threshold, .properties = {{"write.parquet.compression-codec", "uncompressed"}}, }; } @@ -396,4 +397,30 @@ TEST_F(PositionDeleteWriterTest, WriteBatchData) { EXPECT_GT(data_file->file_size_in_bytes, 0); } +TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) { + // Use a small flush threshold to trigger automatic flush + const int64_t flush_threshold = 5; + auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions(flush_threshold)); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + // Write more deletes than the threshold to trigger auto-flush + for (int64_t i = 0; i < 12; ++i) { + ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk()); + } + + // Length should be > 0 since auto-flush should have written data + auto length_result = writer->Length(); + ASSERT_THAT(length_result, IsOk()); + EXPECT_GT(length_result.value(), 0); + + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + const auto& data_file = metadata_result.value().data_files[0]; + EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes); + EXPECT_GT(data_file->file_size_in_bytes, 0); +} + } // namespace iceberg From 425bd5dc7bdf428d401806cf52a4c18691480f4c Mon Sep 17 00:00:00 2001 From: shangxinli Date: Fri, 13 Mar 2026 17:02:20 -0700 Subject: [PATCH 3/4] address review: RAII guards, metrics filtering, and TODOs - Use ArrowSchemaGuard/ArrowArrayGuard in FlushBuffer for memory safety on early returns, fixing potential leaks when nanoarrow macros fail - Fix guards to handle already-consumed arrays (null release check) - Filter out value_counts/null_value_counts/nan_value_counts for delete metadata columns (file_path, pos) to match Java parity; also drop bounds when referencing multiple data files - Add TODO for extracting paths from ArrowArray in Write() to update referenced_paths_ for batch writes - Add TODO for row_schema support in position deletes (V2 spec) --- src/iceberg/arrow_c_data_guard_internal.cc | 4 +-- src/iceberg/data/position_delete_writer.cc | 29 ++++++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/iceberg/arrow_c_data_guard_internal.cc b/src/iceberg/arrow_c_data_guard_internal.cc index 5fb3f9fa5..fd4217076 100644 --- a/src/iceberg/arrow_c_data_guard_internal.cc +++ b/src/iceberg/arrow_c_data_guard_internal.cc @@ -22,13 +22,13 @@ namespace iceberg::internal { ArrowArrayGuard::~ArrowArrayGuard() { - if (array_ != nullptr) { + if (array_ != nullptr && array_->release != nullptr) { ArrowArrayRelease(array_); } } ArrowSchemaGuard::~ArrowSchemaGuard() { - if (schema_ != nullptr) { + if (schema_ != nullptr && schema_->release != nullptr) { ArrowSchemaRelease(schema_); } } diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc index 97db5dc07..f4b82d36b 100644 --- a/src/iceberg/data/position_delete_writer.cc +++ b/src/iceberg/data/position_delete_writer.cc @@ -26,6 +26,7 @@ #include #include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" #include "iceberg/file_writer.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/metadata_columns.h" @@ -38,7 +39,8 @@ namespace iceberg { class PositionDeleteWriter::Impl { public: static Result> Make(PositionDeleteWriterOptions options) { - // Build the position delete schema with file_path and pos columns + // TODO: Support writing row data if options.row_schema is provided. + // The V2 spec allows position deletes to optionally include the deleted row. std::vector fields; fields.push_back(MetadataColumns::kDeleteFilePath); fields.push_back(MetadataColumns::kDeleteFilePos); @@ -61,6 +63,8 @@ class PositionDeleteWriter::Impl { Status Write(ArrowArray* data) { ICEBERG_DCHECK(writer_, "Writer not initialized"); + // TODO: Extract file paths from ArrowArray to update referenced_paths_ so that + // Metadata() can correctly populate referenced_data_file for batch writes. return writer_->Write(data); } @@ -101,6 +105,26 @@ class PositionDeleteWriter::Impl { ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length()); auto split_offsets = writer_->split_offsets(); + // Filter out metrics for delete metadata columns (file_path, pos) to avoid + // bloating the manifest, matching Java's PositionDeleteWriter behavior. + // Always remove field counts; also remove bounds when referencing multiple files. + const auto path_id = MetadataColumns::kDeleteFilePathColumnId; + const auto pos_id = MetadataColumns::kDeleteFilePosColumnId; + + metrics.value_counts.erase(path_id); + metrics.value_counts.erase(pos_id); + metrics.null_value_counts.erase(path_id); + metrics.null_value_counts.erase(pos_id); + metrics.nan_value_counts.erase(path_id); + metrics.nan_value_counts.erase(pos_id); + + if (referenced_paths_.size() > 1) { + metrics.lower_bounds.erase(path_id); + metrics.lower_bounds.erase(pos_id); + metrics.upper_bounds.erase(path_id); + metrics.upper_bounds.erase(pos_id); + } + // Serialize literal bounds to binary format std::map> lower_bounds_map; for (const auto& [col_id, literal] : metrics.lower_bounds) { @@ -154,11 +178,13 @@ class PositionDeleteWriter::Impl { Status FlushBuffer() { ArrowSchema arrow_schema; ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema)); + internal::ArrowSchemaGuard schema_guard(&arrow_schema); ArrowArray array; ArrowError error; ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error); + internal::ArrowArrayGuard array_guard(&array); ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&array)); for (size_t i = 0; i < buffered_paths_.size(); ++i) { @@ -178,7 +204,6 @@ class PositionDeleteWriter::Impl { buffered_paths_.clear(); buffered_positions_.clear(); - arrow_schema.release(&arrow_schema); return {}; } From 035f1713dc8013d06f0b1ae718f59f96f87d65a7 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sun, 15 Mar 2026 10:07:13 +0800 Subject: [PATCH 4/4] minor fix --- src/iceberg/data/position_delete_writer.cc | 27 ++++++++-------------- src/iceberg/data/position_delete_writer.h | 3 +-- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc index f4b82d36b..9fae9c2b4 100644 --- a/src/iceberg/data/position_delete_writer.cc +++ b/src/iceberg/data/position_delete_writer.cc @@ -39,13 +39,10 @@ namespace iceberg { class PositionDeleteWriter::Impl { public: static Result> Make(PositionDeleteWriterOptions options) { - // TODO: Support writing row data if options.row_schema is provided. - // The V2 spec allows position deletes to optionally include the deleted row. - std::vector fields; - fields.push_back(MetadataColumns::kDeleteFilePath); - fields.push_back(MetadataColumns::kDeleteFilePos); - - auto delete_schema = std::make_shared(std::move(fields)); + auto delete_schema = std::make_shared(std::vector{ + MetadataColumns::kDeleteFilePath, + MetadataColumns::kDeleteFilePos, + }); WriterOptions writer_options{ .path = options.path, @@ -62,31 +59,27 @@ class PositionDeleteWriter::Impl { } Status Write(ArrowArray* data) { - ICEBERG_DCHECK(writer_, "Writer not initialized"); - // TODO: Extract file paths from ArrowArray to update referenced_paths_ so that - // Metadata() can correctly populate referenced_data_file for batch writes. + ICEBERG_PRECHECK(buffered_paths_.empty(), + "Cannot write batch data when there are buffered deletes."); + // TODO(anyone): Extract file paths from ArrowArray to update referenced_paths_. return writer_->Write(data); } Status WriteDelete(std::string_view file_path, int64_t pos) { - ICEBERG_DCHECK(writer_, "Writer not initialized"); + // TODO(anyone): check if the sort order of file_path and pos observes the spec. buffered_paths_.emplace_back(file_path); buffered_positions_.push_back(pos); referenced_paths_.emplace(file_path); - if (static_cast(buffered_paths_.size()) >= options_.flush_threshold) { + if (buffered_paths_.size() >= options_.flush_threshold) { return FlushBuffer(); } return {}; } - Result Length() const { - ICEBERG_DCHECK(writer_, "Writer not initialized"); - return writer_->length(); - } + Result Length() const { return writer_->length(); } Status Close() { - ICEBERG_DCHECK(writer_, "Writer not initialized"); if (closed_) { return {}; } diff --git a/src/iceberg/data/position_delete_writer.h b/src/iceberg/data/position_delete_writer.h index 5c255f8d0..de7a2c3f3 100644 --- a/src/iceberg/data/position_delete_writer.h +++ b/src/iceberg/data/position_delete_writer.h @@ -46,8 +46,7 @@ struct ICEBERG_EXPORT PositionDeleteWriterOptions { PartitionValues partition; FileFormatType format = FileFormatType::kParquet; std::shared_ptr io; - std::shared_ptr row_schema; // Optional row data schema - int64_t flush_threshold = 1000; // Number of buffered deletes before auto-flush + int64_t flush_threshold = 1000; // Number of buffered deletes before auto-flush std::unordered_map properties; };