diff --git a/src/iceberg/data/data_writer.cc b/src/iceberg/data/data_writer.cc index b00465bb3..420471280 100644 --- a/src/iceberg/data/data_writer.cc +++ b/src/iceberg/data/data_writer.cc @@ -23,6 +23,7 @@ #include "iceberg/file_writer.h" #include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -43,18 +44,11 @@ class DataWriter::Impl { return std::unique_ptr(new Impl(std::move(options), std::move(writer))); } - Status Write(ArrowArray* data) { - ICEBERG_DCHECK(writer_, "Writer not initialized"); - return writer_->Write(data); - } + Status Write(ArrowArray* data) { return writer_->Write(data); } - Result Length() const { - ICEBERG_DCHECK(writer_, "Writer not initialized"); - return writer_->length(); - } + Result Length() const { return writer_->length(); } Status Close() { - ICEBERG_DCHECK(writer_, "Writer not initialized"); if (closed_) { // Idempotent: no-op if already closed return {}; @@ -100,6 +94,8 @@ class DataWriter::Impl { .upper_bounds = std::move(upper_bounds_map), .split_offsets = std::move(split_offsets), .sort_order_id = options_.sort_order_id, + .partition_spec_id = + options_.spec ? 
std::make_optional(options_.spec->spec_id()) : std::nullopt, }); FileWriter::WriteResult result; diff --git a/src/iceberg/data/equality_delete_writer.cc b/src/iceberg/data/equality_delete_writer.cc index 3edb942cb..6604c03e4 100644 --- a/src/iceberg/data/equality_delete_writer.cc +++ b/src/iceberg/data/equality_delete_writer.cc @@ -19,24 +19,127 @@ #include "iceberg/data/equality_delete_writer.h" +#include + +#include "iceberg/file_writer.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/util/macros.h" + namespace iceberg { class EqualityDeleteWriter::Impl { public: + static Result> Make(EqualityDeleteWriterOptions options) { + WriterOptions writer_options{ + .path = options.path, + .schema = options.schema, + .io = options.io, + .properties = WriterProperties::FromMap(options.properties), + }; + + ICEBERG_ASSIGN_OR_RAISE(auto writer, + WriterFactoryRegistry::Open(options.format, writer_options)); + + return std::unique_ptr(new Impl(std::move(options), std::move(writer))); + } + + Status Write(ArrowArray* data) { return writer_->Write(data); } + + Result Length() const { return writer_->length(); } + + Status Close() { + if (closed_) { + return {}; + } + ICEBERG_RETURN_UNEXPECTED(writer_->Close()); + closed_ = true; + return {}; + } + + Result Metadata() { + ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer"); + + ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics()); + ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length()); + auto split_offsets = writer_->split_offsets(); + + // Serialize literal bounds to binary format + std::map> lower_bounds_map; + for (const auto& [col_id, literal] : metrics.lower_bounds) { + ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize()); + lower_bounds_map[col_id] = std::move(serialized); + } + std::map> upper_bounds_map; + for (const auto& [col_id, literal] : metrics.upper_bounds) { + ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize()); 
+ upper_bounds_map[col_id] = std::move(serialized); + } + + // TODO(anyone): add encryption key metadata for encrypted delete files + auto data_file = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = options_.path, + .file_format = options_.format, + .partition = options_.partition, + .record_count = metrics.row_count.value_or(-1), + .file_size_in_bytes = length, + .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()}, + .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()}, + .null_value_counts = {metrics.null_value_counts.begin(), + metrics.null_value_counts.end()}, + .nan_value_counts = {metrics.nan_value_counts.begin(), + metrics.nan_value_counts.end()}, + .lower_bounds = std::move(lower_bounds_map), + .upper_bounds = std::move(upper_bounds_map), + .split_offsets = std::move(split_offsets), + .equality_ids = options_.equality_field_ids, + .sort_order_id = options_.sort_order_id, + .partition_spec_id = + options_.spec ? 
std::make_optional(options_.spec->spec_id()) : std::nullopt, + }); + + FileWriter::WriteResult result; + result.data_files.push_back(std::move(data_file)); + return result; + } + + std::span equality_field_ids() const { + return options_.equality_field_ids; + } + + private: + Impl(EqualityDeleteWriterOptions options, std::unique_ptr writer) + : options_(std::move(options)), writer_(std::move(writer)) {} + + EqualityDeleteWriterOptions options_; + std::unique_ptr writer_; + bool closed_ = false; }; +EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr impl) + : impl_(std::move(impl)) {} + EqualityDeleteWriter::~EqualityDeleteWriter() = default; -Status EqualityDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); } +Result> EqualityDeleteWriter::Make( + const EqualityDeleteWriterOptions& options) { + ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options)); + return std::unique_ptr(new EqualityDeleteWriter(std::move(impl))); +} + +Status EqualityDeleteWriter::Write(ArrowArray* data) { return impl_->Write(data); } -Result EqualityDeleteWriter::Length() const { return NotImplemented(""); } +Result EqualityDeleteWriter::Length() const { return impl_->Length(); } -Status EqualityDeleteWriter::Close() { return NotImplemented(""); } +Status EqualityDeleteWriter::Close() { return impl_->Close(); } Result EqualityDeleteWriter::Metadata() { - return NotImplemented(""); + return impl_->Metadata(); } -std::span EqualityDeleteWriter::equality_field_ids() const { return {}; } +std::span EqualityDeleteWriter::equality_field_ids() const { + return impl_->equality_field_ids(); +} } // namespace iceberg diff --git a/src/iceberg/data/equality_delete_writer.h b/src/iceberg/data/equality_delete_writer.h index 9de4918dc..d1728a481 100644 --- a/src/iceberg/data/equality_delete_writer.h +++ b/src/iceberg/data/equality_delete_writer.h @@ -50,6 +50,7 @@ struct ICEBERG_EXPORT EqualityDeleteWriterOptions { std::vector equality_field_ids; std::optional sort_order_id; 
std::unordered_map properties; + // TODO(anyone): add key_metadata for encryption }; /// \brief Writer for Iceberg equality delete files. @@ -57,6 +58,10 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter { public: ~EqualityDeleteWriter() override; + /// \brief Create a new EqualityDeleteWriter instance. + static Result> Make( + const EqualityDeleteWriterOptions& options); + Status Write(ArrowArray* data) override; Result Length() const override; Status Close() override; @@ -67,6 +72,8 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter { private: class Impl; std::unique_ptr impl_; + + explicit EqualityDeleteWriter(std::unique_ptr impl); }; } // namespace iceberg diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc index 9fae9c2b4..3238dc506 100644 --- a/src/iceberg/data/position_delete_writer.cc +++ b/src/iceberg/data/position_delete_writer.cc @@ -30,6 +30,7 @@ #include "iceberg/file_writer.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/metadata_columns.h" +#include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/util/macros.h" @@ -154,6 +155,8 @@ class PositionDeleteWriter::Impl { .split_offsets = std::move(split_offsets), .sort_order_id = std::nullopt, .referenced_data_file = std::move(referenced_data_file), + .partition_spec_id = + options_.spec ? 
std::make_optional(options_.spec->spec_id()) : std::nullopt, }); FileWriter::WriteResult result; diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index f7778faf3..a3a8fc088 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -27,6 +27,7 @@ #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_register.h" +#include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/file_format.h" #include "iceberg/manifest/manifest_entry.h" @@ -423,4 +424,142 @@ TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) { EXPECT_GT(data_file->file_size_in_bytes, 0); } +class EqualityDeleteWriterTest : public DataWriterTest { + protected: + EqualityDeleteWriterOptions MakeDeleteOptions( + std::vector equality_field_ids = {1, 2}, + std::optional sort_order_id = std::nullopt) { + return EqualityDeleteWriterOptions{ + .path = "test_eq_deletes.parquet", + .schema = schema_, + .spec = partition_spec_, + .partition = PartitionValues{}, + .format = FileFormatType::kParquet, + .io = file_io_, + .equality_field_ids = std::move(equality_field_ids), + .sort_order_id = sort_order_id, + .properties = {{"write.parquet.compression-codec", "uncompressed"}}, + }; + } + + void WriteTestDataToEqualityWriter(EqualityDeleteWriter* writer) { + auto test_data = CreateTestData(); + ArrowArray arrow_array; + ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok()); + ASSERT_THAT(writer->Write(&arrow_array), IsOk()); + } +}; + +TEST_F(EqualityDeleteWriterTest, WriteAndClose) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + + auto length_result = writer->Length(); + ASSERT_THAT(length_result, IsOk()); + EXPECT_GT(length_result.value(), 0); + + ASSERT_THAT(writer->Close(), IsOk()); 
+} + +TEST_F(EqualityDeleteWriterTest, MetadataAfterClose) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + + const auto& write_result = metadata_result.value(); + ASSERT_EQ(write_result.data_files.size(), 1); + + const auto& data_file = write_result.data_files[0]; + EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes); + EXPECT_EQ(data_file->file_path, "test_eq_deletes.parquet"); + EXPECT_EQ(data_file->file_format, FileFormatType::kParquet); + EXPECT_GT(data_file->file_size_in_bytes, 0); + + // Partition spec id must be set + ASSERT_TRUE(data_file->partition_spec_id.has_value()); + EXPECT_EQ(data_file->partition_spec_id.value(), PartitionSpec::kInitialSpecId); + + // Equality field ids must be set + ASSERT_EQ(data_file->equality_ids.size(), 2); + EXPECT_EQ(data_file->equality_ids[0], 1); + EXPECT_EQ(data_file->equality_ids[1], 2); +} + +TEST_F(EqualityDeleteWriterTest, MetadataBeforeCloseReturnsError) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(metadata_result, + HasErrorMessage("Cannot get metadata before closing the writer")); +} + +TEST_F(EqualityDeleteWriterTest, CloseIsIdempotent) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + + ASSERT_THAT(writer->Close(), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); +} + 
+TEST_F(EqualityDeleteWriterTest, SortOrderIdInMetadata) { + const int32_t sort_order_id = 7; + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions({1}, sort_order_id)); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + const auto& data_file = metadata_result.value().data_files[0]; + ASSERT_TRUE(data_file->sort_order_id.has_value()); + EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id); +} + +TEST_F(EqualityDeleteWriterTest, EqualityFieldIdsAccessor) { + std::vector field_ids = {1, 2, 3}; + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions(field_ids)); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + auto ids = writer->equality_field_ids(); + ASSERT_EQ(ids.size(), 3); + EXPECT_EQ(ids[0], 1); + EXPECT_EQ(ids[1], 2); + EXPECT_EQ(ids[2], 3); +} + +TEST_F(EqualityDeleteWriterTest, WriteMultipleBatches) { + auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions()); + ASSERT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + WriteTestDataToEqualityWriter(writer.get()); + WriteTestDataToEqualityWriter(writer.get()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto metadata_result = writer->Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + const auto& data_file = metadata_result.value().data_files[0]; + EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes); + EXPECT_GT(data_file->file_size_in_bytes, 0); +} + } // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 8d6824eed..cad3e969a 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -142,20 +142,22 @@ class ManifestEvaluator; class ResidualEvaluator; class StrictMetricsEvaluator; -/// \brief Scan. +/// \brief Scan task. 
class ChangelogScanTask; -class DataTableScan; class FileScanTask; +class ScanTask; + +/// \brief Table scan. +class DataTableScan; template class IncrementalScan; class IncrementalAppendScan; class IncrementalChangelogScan; -class ScanTask; class TableScan; + +/// \brief Scan builder. template class TableScanBuilder; - -// Type aliases for incremental scan builders using DataTableScanBuilder = TableScanBuilder; using IncrementalAppendScanBuilder = TableScanBuilder; using IncrementalChangelogScanBuilder = TableScanBuilder;