From 6f3eaf36f3bac73c139a1578a4a6352f3aba6383 Mon Sep 17 00:00:00 2001 From: shangxinli Date: Sat, 28 Feb 2026 09:56:59 -0800 Subject: [PATCH 1/3] feat: Implement EqualityDeleteWriter for equality delete files Implement the EqualityDeleteWriter following the same PIMPL pattern as DataWriter. The writer accepts Arrow data matching the equality delete schema (columns for the equality field values) and produces metadata with content=kEqualityDeletes, equality_ids set from options, and sort_order_id propagated from options. --- src/iceberg/data/equality_delete_writer.cc | 117 ++++++++++++++++- src/iceberg/data/equality_delete_writer.h | 6 + src/iceberg/test/data_writer_test.cc | 139 +++++++++++++++++++++ 3 files changed, 257 insertions(+), 5 deletions(-) diff --git a/src/iceberg/data/equality_delete_writer.cc b/src/iceberg/data/equality_delete_writer.cc index 3edb942cb..b379962ad 100644 --- a/src/iceberg/data/equality_delete_writer.cc +++ b/src/iceberg/data/equality_delete_writer.cc @@ -19,24 +19,131 @@ #include "iceberg/data/equality_delete_writer.h" +#include + +#include "iceberg/file_writer.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/util/macros.h" + namespace iceberg { class EqualityDeleteWriter::Impl { public: + static Result> Make(EqualityDeleteWriterOptions options) { + WriterOptions writer_options{ + .path = options.path, + .schema = options.schema, + .io = options.io, + .properties = WriterProperties::FromMap(options.properties), + }; + + ICEBERG_ASSIGN_OR_RAISE(auto writer, + WriterFactoryRegistry::Open(options.format, writer_options)); + + return std::unique_ptr(new Impl(std::move(options), std::move(writer))); + } + + Status Write(ArrowArray* data) { + ICEBERG_DCHECK(writer_, "Writer not initialized"); + return writer_->Write(data); + } + + Result Length() const { + ICEBERG_DCHECK(writer_, "Writer not initialized"); + return writer_->length(); + } + + Status Close() { + ICEBERG_DCHECK(writer_, "Writer not initialized"); + if 
(closed_) { + // Idempotent: no-op if already closed + return {}; + } + ICEBERG_RETURN_UNEXPECTED(writer_->Close()); + closed_ = true; + return {}; + } + + Result Metadata() { + ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer"); + + ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics()); + ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length()); + auto split_offsets = writer_->split_offsets(); + + // Serialize literal bounds to binary format + std::map> lower_bounds_map; + for (const auto& [col_id, literal] : metrics.lower_bounds) { + ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize()); + lower_bounds_map[col_id] = std::move(serialized); + } + std::map> upper_bounds_map; + for (const auto& [col_id, literal] : metrics.upper_bounds) { + ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize()); + upper_bounds_map[col_id] = std::move(serialized); + } + + auto data_file = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = options_.path, + .file_format = options_.format, + .partition = options_.partition, + .record_count = metrics.row_count.value_or(-1), + .file_size_in_bytes = length, + .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()}, + .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()}, + .null_value_counts = {metrics.null_value_counts.begin(), + metrics.null_value_counts.end()}, + .nan_value_counts = {metrics.nan_value_counts.begin(), + metrics.nan_value_counts.end()}, + .lower_bounds = std::move(lower_bounds_map), + .upper_bounds = std::move(upper_bounds_map), + .split_offsets = std::move(split_offsets), + .equality_ids = options_.equality_field_ids, + .sort_order_id = options_.sort_order_id, + }); + + FileWriter::WriteResult result; + result.data_files.push_back(std::move(data_file)); + return result; + } + + std::span equality_field_ids() const { + return options_.equality_field_ids; + } + + private: + Impl(EqualityDeleteWriterOptions 
options, std::unique_ptr writer) + : options_(std::move(options)), writer_(std::move(writer)) {} + + EqualityDeleteWriterOptions options_; + std::unique_ptr writer_; + bool closed_ = false; }; +EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr impl) + : impl_(std::move(impl)) {} + EqualityDeleteWriter::~EqualityDeleteWriter() = default; -Status EqualityDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); } +Result> EqualityDeleteWriter::Make( + const EqualityDeleteWriterOptions& options) { + ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options)); + return std::unique_ptr(new EqualityDeleteWriter(std::move(impl))); +} + +Status EqualityDeleteWriter::Write(ArrowArray* data) { return impl_->Write(data); } -Result EqualityDeleteWriter::Length() const { return NotImplemented(""); } +Result EqualityDeleteWriter::Length() const { return impl_->Length(); } -Status EqualityDeleteWriter::Close() { return NotImplemented(""); } +Status EqualityDeleteWriter::Close() { return impl_->Close(); } Result EqualityDeleteWriter::Metadata() { - return NotImplemented(""); + return impl_->Metadata(); } -std::span EqualityDeleteWriter::equality_field_ids() const { return {}; } +std::span EqualityDeleteWriter::equality_field_ids() const { + return impl_->equality_field_ids(); +} } // namespace iceberg diff --git a/src/iceberg/data/equality_delete_writer.h b/src/iceberg/data/equality_delete_writer.h index 9de4918dc..4556adc92 100644 --- a/src/iceberg/data/equality_delete_writer.h +++ b/src/iceberg/data/equality_delete_writer.h @@ -57,6 +57,10 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter { public: ~EqualityDeleteWriter() override; + /// \brief Create a new EqualityDeleteWriter instance. 
+ static Result> Make( + const EqualityDeleteWriterOptions& options); + Status Write(ArrowArray* data) override; Result Length() const override; Status Close() override; @@ -67,6 +71,8 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter { private: class Impl; std::unique_ptr impl_; + + explicit EqualityDeleteWriter(std::unique_ptr impl); }; } // namespace iceberg diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index f7778faf3..a3a8fc088 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -27,6 +27,7 @@ #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_register.h" +#include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/file_format.h" #include "iceberg/manifest/manifest_entry.h" @@ -423,4 +424,142 @@ TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) { EXPECT_GT(data_file->file_size_in_bytes, 0); } +class EqualityDeleteWriterTest : public DataWriterTest { + protected: + EqualityDeleteWriterOptions MakeDeleteOptions( + std::vector equality_field_ids = {1, 2}, + std::optional sort_order_id = std::nullopt) { + return EqualityDeleteWriterOptions{ + .path = "test_eq_deletes.parquet", + .schema = schema_, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .equality_field_ids = std::move(equality_field_ids), + .sort_order_id = sort_order_id, + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + } + + void WriteTestDataToEqualityWriter(EqualityDeleteWriter* writer) { + auto test_data = CreateTestData(); + ArrowArray arrow_array; + ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok()); + ASSERT_THAT(writer->Write(&arrow_array), IsOk()); + } +}; + +TEST_F(EqualityDeleteWriterTest, WriteAndClose) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + 
ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + + auto length_result = writer->Length(); + ASSERT_THAT(length_result, IsOk()); + EXPECT_GT(length_result.value(), 0); + + ASSERT_THAT(writer->Close(), IsOk()); +} + +TEST_F(EqualityDeleteWriterTest, MetadataAfterClose) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + + const auto& write_result = metadata_result.value(); + ASSERT_EQ(write_result.data_files.size(), 1); + + const auto& data_file = write_result.data_files[0]; + EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes); + EXPECT_EQ(data_file->file_path, "test_eq_deletes.parquet"); + EXPECT_EQ(data_file->file_format, FileFormatType::kParquet); + EXPECT_GT(data_file->file_size_in_bytes, 0); + + // Partition spec id must be set + ASSERT_TRUE(data_file->partition_spec_id.has_value()); + EXPECT_EQ(data_file->partition_spec_id.value(), PartitionSpec::kInitialSpecId); + + // Equality field ids must be set + ASSERT_EQ(data_file->equality_ids.size(), 2); + EXPECT_EQ(data_file->equality_ids[0], 1); + EXPECT_EQ(data_file->equality_ids[1], 2); +} + +TEST_F(EqualityDeleteWriterTest, MetadataBeforeCloseReturnsError) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(metadata_result, + HasErrorMessage("Cannot get metadata before closing the writer")); +} + +TEST_F(EqualityDeleteWriterTest, CloseIsIdempotent) { + auto writer_result = 
EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + + ASSERT_THAT(writer->Close(), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); +} + +TEST_F(EqualityDeleteWriterTest, SortOrderIdInMetadata) { + const int32_t sort_order_id = 7; + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions({1}, sort_order_id)); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + const auto& data_file = metadata_result.value().data_files[0]; + ASSERT_TRUE(data_file->sort_order_id.has_value()); + EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id); +} + +TEST_F(EqualityDeleteWriterTest, EqualityFieldIdsAccessor) { + std::vector field_ids = {1, 2, 3}; + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions(field_ids)); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + auto ids = writer->equality_field_ids(); + ASSERT_EQ(ids.size(), 3); + EXPECT_EQ(ids[0], 1); + EXPECT_EQ(ids[1], 2); + EXPECT_EQ(ids[2], 3); +} + +TEST_F(EqualityDeleteWriterTest, WriteMultipleBatches) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + WriteTestDataToEqualityWriter(writer.get()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + const auto& data_file = metadata_result.value().data_files[0]; + EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes); + EXPECT_GT(data_file->file_size_in_bytes, 0); +} + } // 
namespace iceberg From 3f22e26aefcd4b2d2fdd05e12bd205ade9d2031b Mon Sep 17 00:00:00 2001 From: shangxinli Date: Fri, 13 Mar 2026 17:19:02 -0700 Subject: [PATCH 2/3] fix: Address PR review comments for EqualityDeleteWriter - Add TODO comment for encryption key_metadata field - Populate partition_spec_id from spec in DataFile metadata - Add test assertion for partition_spec_id --- src/iceberg/data/equality_delete_writer.cc | 3 +++ src/iceberg/data/equality_delete_writer.h | 1 + 2 files changed, 4 insertions(+) diff --git a/src/iceberg/data/equality_delete_writer.cc b/src/iceberg/data/equality_delete_writer.cc index b379962ad..fb57a763e 100644 --- a/src/iceberg/data/equality_delete_writer.cc +++ b/src/iceberg/data/equality_delete_writer.cc @@ -23,6 +23,7 @@ #include "iceberg/file_writer.h" #include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -101,6 +102,8 @@ class EqualityDeleteWriter::Impl { .split_offsets = std::move(split_offsets), .equality_ids = options_.equality_field_ids, .sort_order_id = options_.sort_order_id, + .partition_spec_id = + options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt, }); FileWriter::WriteResult result; diff --git a/src/iceberg/data/equality_delete_writer.h b/src/iceberg/data/equality_delete_writer.h index 4556adc92..df0bda9ec 100644 --- a/src/iceberg/data/equality_delete_writer.h +++ b/src/iceberg/data/equality_delete_writer.h @@ -50,6 +50,7 @@ struct ICEBERG_EXPORT EqualityDeleteWriterOptions { std::vector equality_field_ids; std::optional sort_order_id; std::unordered_map properties; + // TODO: add key_metadata for encryption }; /// \brief Writer for Iceberg equality delete files. 
From 770145733e86c179badbb7ec7b802001a1a5fcd6 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sun, 15 Mar 2026 10:54:49 +0800 Subject: [PATCH 3/3] rebase and resolve conflicts --- src/iceberg/data/data_writer.cc | 14 +++++--------- src/iceberg/data/equality_delete_writer.cc | 13 +++---------- src/iceberg/data/equality_delete_writer.h | 2 +- src/iceberg/data/position_delete_writer.cc | 3 +++ src/iceberg/type_fwd.h | 12 +++++++----- 5 files changed, 19 insertions(+), 25 deletions(-) diff --git a/src/iceberg/data/data_writer.cc b/src/iceberg/data/data_writer.cc index b00465bb3..420471280 100644 --- a/src/iceberg/data/data_writer.cc +++ b/src/iceberg/data/data_writer.cc @@ -23,6 +23,7 @@ #include "iceberg/file_writer.h" #include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -43,18 +44,11 @@ class DataWriter::Impl { return std::unique_ptr(new Impl(std::move(options), std::move(writer))); } - Status Write(ArrowArray* data) { - ICEBERG_DCHECK(writer_, "Writer not initialized"); - return writer_->Write(data); - } + Status Write(ArrowArray* data) { return writer_->Write(data); } - Result Length() const { - ICEBERG_DCHECK(writer_, "Writer not initialized"); - return writer_->length(); - } + Result Length() const { return writer_->length(); } Status Close() { - ICEBERG_DCHECK(writer_, "Writer not initialized"); if (closed_) { // Idempotent: no-op if already closed return {}; @@ -100,6 +94,8 @@ class DataWriter::Impl { .upper_bounds = std::move(upper_bounds_map), .split_offsets = std::move(split_offsets), .sort_order_id = options_.sort_order_id, + .partition_spec_id = + options_.spec ? 
std::make_optional(options_.spec->spec_id()) : std::nullopt, }); FileWriter::WriteResult result; diff --git a/src/iceberg/data/equality_delete_writer.cc b/src/iceberg/data/equality_delete_writer.cc index fb57a763e..6604c03e4 100644 --- a/src/iceberg/data/equality_delete_writer.cc +++ b/src/iceberg/data/equality_delete_writer.cc @@ -44,20 +44,12 @@ class EqualityDeleteWriter::Impl { return std::unique_ptr(new Impl(std::move(options), std::move(writer))); } - Status Write(ArrowArray* data) { - ICEBERG_DCHECK(writer_, "Writer not initialized"); - return writer_->Write(data); - } + Status Write(ArrowArray* data) { return writer_->Write(data); } - Result Length() const { - ICEBERG_DCHECK(writer_, "Writer not initialized"); - return writer_->length(); - } + Result Length() const { return writer_->length(); } Status Close() { - ICEBERG_DCHECK(writer_, "Writer not initialized"); if (closed_) { - // Idempotent: no-op if already closed return {}; } ICEBERG_RETURN_UNEXPECTED(writer_->Close()); @@ -84,6 +76,7 @@ class EqualityDeleteWriter::Impl { upper_bounds_map[col_id] = std::move(serialized); } + // TODO(anyone): add encryption key metadata for encrypted delete files auto data_file = std::make_shared(DataFile{ .content = DataFile::Content::kEqualityDeletes, .file_path = options_.path, diff --git a/src/iceberg/data/equality_delete_writer.h b/src/iceberg/data/equality_delete_writer.h index df0bda9ec..d1728a481 100644 --- a/src/iceberg/data/equality_delete_writer.h +++ b/src/iceberg/data/equality_delete_writer.h @@ -50,7 +50,7 @@ struct ICEBERG_EXPORT EqualityDeleteWriterOptions { std::vector equality_field_ids; std::optional sort_order_id; std::unordered_map properties; - // TODO: add key_metadata for encryption + // TODO(anyone): add key_metadata for encryption }; /// \brief Writer for Iceberg equality delete files. 
diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc index 9fae9c2b4..3238dc506 100644 --- a/src/iceberg/data/position_delete_writer.cc +++ b/src/iceberg/data/position_delete_writer.cc @@ -30,6 +30,7 @@ #include "iceberg/file_writer.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/metadata_columns.h" +#include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/util/macros.h" @@ -154,6 +155,8 @@ class PositionDeleteWriter::Impl { .split_offsets = std::move(split_offsets), .sort_order_id = std::nullopt, .referenced_data_file = std::move(referenced_data_file), + .partition_spec_id = + options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt, }); FileWriter::WriteResult result; diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 8d6824eed..cad3e969a 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -142,20 +142,22 @@ class ManifestEvaluator; class ResidualEvaluator; class StrictMetricsEvaluator; -/// \brief Scan. +/// \brief Scan task. class ChangelogScanTask; -class DataTableScan; class FileScanTask; +class ScanTask; + +/// \brief Table scan. +class DataTableScan; template class IncrementalScan; class IncrementalAppendScan; class IncrementalChangelogScan; -class ScanTask; class TableScan; + +/// \brief Scan builder. template class TableScanBuilder; - -// Type aliases for incremental scan builders using DataTableScanBuilder = TableScanBuilder; using IncrementalAppendScanBuilder = TableScanBuilder; using IncrementalChangelogScanBuilder = TableScanBuilder;