Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions src/iceberg/data/data_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "iceberg/file_writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/partition_spec.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand All @@ -43,18 +44,11 @@ class DataWriter::Impl {
return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
}

Status Write(ArrowArray* data) {
ICEBERG_DCHECK(writer_, "Writer not initialized");
return writer_->Write(data);
}
Status Write(ArrowArray* data) { return writer_->Write(data); }

Result<int64_t> Length() const {
ICEBERG_DCHECK(writer_, "Writer not initialized");
return writer_->length();
}
Result<int64_t> Length() const { return writer_->length(); }

Status Close() {
ICEBERG_DCHECK(writer_, "Writer not initialized");
if (closed_) {
// Idempotent: no-op if already closed
return {};
Expand Down Expand Up @@ -100,6 +94,8 @@ class DataWriter::Impl {
.upper_bounds = std::move(upper_bounds_map),
.split_offsets = std::move(split_offsets),
.sort_order_id = options_.sort_order_id,
.partition_spec_id =
options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt,
});

FileWriter::WriteResult result;
Expand Down
113 changes: 108 additions & 5 deletions src/iceberg/data/equality_delete_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,127 @@

#include "iceberg/data/equality_delete_writer.h"

#include <map>

#include "iceberg/file_writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/partition_spec.h"
#include "iceberg/util/macros.h"

namespace iceberg {

/// \brief Private implementation of EqualityDeleteWriter.
///
/// Owns the writer options and the underlying format-specific Writer, and tracks
/// whether that writer has been closed.
class EqualityDeleteWriter::Impl {
 public:
  /// \brief Open the underlying file writer and wrap it in a new Impl.
  ///
  /// \param options Options describing the delete file to write.
  /// \return The new Impl, or the error returned by WriterFactoryRegistry::Open.
  static Result<std::unique_ptr<Impl>> Make(EqualityDeleteWriterOptions options) {
    WriterOptions file_writer_options{
        .path = options.path,
        .schema = options.schema,
        .io = options.io,
        .properties = WriterProperties::FromMap(options.properties),
    };

    ICEBERG_ASSIGN_OR_RAISE(
        auto file_writer,
        WriterFactoryRegistry::Open(options.format, file_writer_options));

    return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(file_writer)));
  }

  /// \brief Forward a batch to the underlying writer.
  Status Write(ArrowArray* data) {
    return writer_->Write(data);
  }

  /// \brief Report the length as seen by the underlying writer.
  Result<int64_t> Length() const {
    return writer_->length();
  }

  /// \brief Close the underlying writer.
  ///
  /// Calling this again after a successful close is a no-op. The writer is only
  /// marked closed once the underlying Close() has succeeded.
  Status Close() {
    if (!closed_) {
      ICEBERG_RETURN_UNEXPECTED(writer_->Close());
      closed_ = true;
    }
    return {};
  }

  /// \brief Build the DataFile entry describing the written equality delete file.
  ///
  /// Must be called after Close(); otherwise the ICEBERG_CHECK below fails.
  ///
  /// \return A WriteResult holding exactly one DataFile whose content is
  /// kEqualityDeletes.
  Result<FileWriter::WriteResult> Metadata() {
    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");

    ICEBERG_ASSIGN_OR_RAISE(auto file_metrics, writer_->metrics());
    ICEBERG_ASSIGN_OR_RAISE(auto file_size, writer_->length());
    auto offsets = writer_->split_offsets();

    // Bounds are stored on the DataFile in serialized binary form.
    ICEBERG_ASSIGN_OR_RAISE(auto lower_bounds_map,
                            SerializeBounds(file_metrics.lower_bounds));
    ICEBERG_ASSIGN_OR_RAISE(auto upper_bounds_map,
                            SerializeBounds(file_metrics.upper_bounds));

    // TODO(anyone): add encryption key metadata for encrypted delete files
    auto delete_file = std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kEqualityDeletes,
        .file_path = options_.path,
        .file_format = options_.format,
        .partition = options_.partition,
        // -1 is recorded when the writer metrics carry no row count.
        .record_count = file_metrics.row_count.value_or(-1),
        .file_size_in_bytes = file_size,
        .column_sizes = {file_metrics.column_sizes.begin(),
                         file_metrics.column_sizes.end()},
        .value_counts = {file_metrics.value_counts.begin(),
                         file_metrics.value_counts.end()},
        .null_value_counts = {file_metrics.null_value_counts.begin(),
                              file_metrics.null_value_counts.end()},
        .nan_value_counts = {file_metrics.nan_value_counts.begin(),
                             file_metrics.nan_value_counts.end()},
        .lower_bounds = std::move(lower_bounds_map),
        .upper_bounds = std::move(upper_bounds_map),
        .split_offsets = std::move(offsets),
        .equality_ids = options_.equality_field_ids,
        .sort_order_id = options_.sort_order_id,
        // The spec id is only recorded when a partition spec was supplied.
        .partition_spec_id =
            options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt,
    });

    FileWriter::WriteResult write_result;
    write_result.data_files.push_back(std::move(delete_file));
    return write_result;
  }

  /// \brief View over the equality field ids this writer was configured with.
  std::span<const int32_t> equality_field_ids() const {
    return options_.equality_field_ids;
  }

 private:
  Impl(EqualityDeleteWriterOptions options, std::unique_ptr<Writer> writer)
      : options_(std::move(options)), writer_(std::move(writer)) {}

  /// \brief Serialize every bound literal, keyed by its column id.
  ///
  /// Stops at, and returns, the first serialization error.
  template <typename BoundsMap>
  static Result<std::map<int32_t, std::vector<uint8_t>>> SerializeBounds(
      const BoundsMap& bounds) {
    std::map<int32_t, std::vector<uint8_t>> serialized_bounds;
    for (const auto& [column_id, bound] : bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto bytes, bound.Serialize());
      serialized_bounds[column_id] = std::move(bytes);
    }
    return serialized_bounds;
  }

  EqualityDeleteWriterOptions options_;
  std::unique_ptr<Writer> writer_;
  bool closed_ = false;
};

// Takes ownership of an already-constructed implementation object.
EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr<Impl> impl)
: impl_(std::move(impl)) {}

// Defaulted here, where Impl is a complete type: impl_ is a std::unique_ptr<Impl>
// and the header only forward-declares Impl.
EqualityDeleteWriter::~EqualityDeleteWriter() = default;

Status EqualityDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); }
// Factory for EqualityDeleteWriter.
//
// Builds the Impl (which opens the underlying file writer) and wraps it in a new
// EqualityDeleteWriter. Any error from Impl::Make is returned to the caller.
Result<std::unique_ptr<EqualityDeleteWriter>> EqualityDeleteWriter::Make(
const EqualityDeleteWriterOptions& options) {
// Impl::Make takes its options by value, so `options` is copied here.
ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
// Plain `new` rather than std::make_unique: the constructor is declared private
// in the header, so only this class can invoke it.
return std::unique_ptr<EqualityDeleteWriter>(new EqualityDeleteWriter(std::move(impl)));
}

Status EqualityDeleteWriter::Write(ArrowArray* data) { return impl_->Write(data); }

Result<int64_t> EqualityDeleteWriter::Length() const { return NotImplemented(""); }
Result<int64_t> EqualityDeleteWriter::Length() const { return impl_->Length(); }

Status EqualityDeleteWriter::Close() { return NotImplemented(""); }
Status EqualityDeleteWriter::Close() { return impl_->Close(); }

Result<FileWriter::WriteResult> EqualityDeleteWriter::Metadata() {
return NotImplemented("");
return impl_->Metadata();
}

std::span<const int32_t> EqualityDeleteWriter::equality_field_ids() const { return {}; }
std::span<const int32_t> EqualityDeleteWriter::equality_field_ids() const {
return impl_->equality_field_ids();
}

} // namespace iceberg
7 changes: 7 additions & 0 deletions src/iceberg/data/equality_delete_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ struct ICEBERG_EXPORT EqualityDeleteWriterOptions {
std::vector<int32_t> equality_field_ids;
std::optional<int32_t> sort_order_id;
std::unordered_map<std::string, std::string> properties;
// TODO(anyone): add key_metadata for encryption
};

/// \brief Writer for Iceberg equality delete files.
class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
public:
~EqualityDeleteWriter() override;

/// \brief Create a new EqualityDeleteWriter instance.
static Result<std::unique_ptr<EqualityDeleteWriter>> Make(
const EqualityDeleteWriterOptions& options);

Status Write(ArrowArray* data) override;
Result<int64_t> Length() const override;
Status Close() override;
Expand All @@ -67,6 +72,8 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
private:
class Impl;
std::unique_ptr<Impl> impl_;

explicit EqualityDeleteWriter(std::unique_ptr<Impl> impl);
};

} // namespace iceberg
3 changes: 3 additions & 0 deletions src/iceberg/data/position_delete_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "iceberg/file_writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"
Expand Down Expand Up @@ -154,6 +155,8 @@ class PositionDeleteWriter::Impl {
.split_offsets = std::move(split_offsets),
.sort_order_id = std::nullopt,
.referenced_data_file = std::move(referenced_data_file),
.partition_spec_id =
options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt,
});

FileWriter::WriteResult result;
Expand Down
139 changes: 139 additions & 0 deletions src/iceberg/test/data_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/data/equality_delete_writer.h"
#include "iceberg/data/position_delete_writer.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
Expand Down Expand Up @@ -423,4 +424,142 @@ TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) {
EXPECT_GT(data_file->file_size_in_bytes, 0);
}

class EqualityDeleteWriterTest : public DataWriterTest {
protected:
EqualityDeleteWriterOptions MakeDeleteOptions(
std::vector<int32_t> equality_field_ids = {1, 2},
std::optional<int32_t> sort_order_id = std::nullopt) {
return EqualityDeleteWriterOptions{
.path = "test_eq_deletes.parquet",
.schema = schema_,
.spec = partition_spec_,
.partition = PartitionValues{},
.format = FileFormatType::kParquet,
.io = file_io_,
.equality_field_ids = std::move(equality_field_ids),
.sort_order_id = sort_order_id,
.properties = {{"write.parquet.compression-codec", "uncompressed"}},
};
}

void WriteTestDataToEqualityWriter(EqualityDeleteWriter* writer) {
auto test_data = CreateTestData();
ArrowArray arrow_array;
ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
ASSERT_THAT(writer->Write(&arrow_array), IsOk());
}
};

// Writing one batch yields a positive length, and the writer then closes cleanly.
TEST_F(EqualityDeleteWriterTest, WriteAndClose) {
  auto maybe_writer = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(maybe_writer, IsOk());
  auto delete_writer = std::move(maybe_writer.value());

  WriteTestDataToEqualityWriter(delete_writer.get());

  auto bytes_written = delete_writer->Length();
  ASSERT_THAT(bytes_written, IsOk());
  EXPECT_GT(bytes_written.value(), 0);

  ASSERT_THAT(delete_writer->Close(), IsOk());
}

// After Close(), Metadata() describes a single equality delete file that carries
// the configured path, format, partition spec id and equality field ids.
TEST_F(EqualityDeleteWriterTest, MetadataAfterClose) {
  auto maybe_writer = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(maybe_writer, IsOk());
  auto delete_writer = std::move(maybe_writer.value());

  WriteTestDataToEqualityWriter(delete_writer.get());
  ASSERT_THAT(delete_writer->Close(), IsOk());

  auto maybe_metadata = delete_writer->Metadata();
  ASSERT_THAT(maybe_metadata, IsOk());

  const auto& produced = maybe_metadata.value();
  ASSERT_EQ(produced.data_files.size(), 1);

  const auto& delete_file = produced.data_files[0];
  EXPECT_EQ(delete_file->content, DataFile::Content::kEqualityDeletes);
  EXPECT_EQ(delete_file->file_path, "test_eq_deletes.parquet");
  EXPECT_EQ(delete_file->file_format, FileFormatType::kParquet);
  EXPECT_GT(delete_file->file_size_in_bytes, 0);

  // The partition spec id is taken from the spec passed in the options.
  ASSERT_TRUE(delete_file->partition_spec_id.has_value());
  EXPECT_EQ(delete_file->partition_spec_id.value(), PartitionSpec::kInitialSpecId);

  // The default options configure equality field ids {1, 2}.
  ASSERT_EQ(delete_file->equality_ids.size(), 2);
  EXPECT_EQ(delete_file->equality_ids[0], 1);
  EXPECT_EQ(delete_file->equality_ids[1], 2);
}

// Metadata() on a writer that has not been closed fails with a validation error.
TEST_F(EqualityDeleteWriterTest, MetadataBeforeCloseReturnsError) {
  auto maybe_writer = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(maybe_writer, IsOk());
  auto delete_writer = std::move(maybe_writer.value());

  auto premature_metadata = delete_writer->Metadata();
  ASSERT_THAT(premature_metadata, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(premature_metadata,
              HasErrorMessage("Cannot get metadata before closing the writer"));
}

// Close() may be called repeatedly; every call must succeed.
TEST_F(EqualityDeleteWriterTest, CloseIsIdempotent) {
  auto maybe_writer = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(maybe_writer, IsOk());
  auto delete_writer = std::move(maybe_writer.value());

  WriteTestDataToEqualityWriter(delete_writer.get());

  constexpr int kCloseCalls = 3;
  for (int call = 0; call < kCloseCalls; ++call) {
    ASSERT_THAT(delete_writer->Close(), IsOk());
  }
}

// The sort order id passed in the options is recorded on the resulting DataFile.
TEST_F(EqualityDeleteWriterTest, SortOrderIdInMetadata) {
  const int32_t sort_order_id = 7;
  auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions({1}, sort_order_id));
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());

  WriteTestDataToEqualityWriter(writer.get());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata_result = writer->Metadata();
  ASSERT_THAT(metadata_result, IsOk());

  // Fail the test instead of indexing out of bounds if no file was reported.
  const auto& data_files = metadata_result.value().data_files;
  ASSERT_EQ(data_files.size(), 1);

  const auto& data_file = data_files[0];
  ASSERT_TRUE(data_file->sort_order_id.has_value());
  EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id);
}

// equality_field_ids() exposes the ids the writer was configured with, in order.
TEST_F(EqualityDeleteWriterTest, EqualityFieldIdsAccessor) {
  std::vector<int32_t> configured_ids = {1, 2, 3};
  auto maybe_writer = EqualityDeleteWriter::Make(MakeDeleteOptions(configured_ids));
  ASSERT_THAT(maybe_writer, IsOk());
  auto delete_writer = std::move(maybe_writer.value());

  auto reported_ids = delete_writer->equality_field_ids();
  ASSERT_EQ(reported_ids.size(), 3);
  EXPECT_EQ(reported_ids[0], 1);
  EXPECT_EQ(reported_ids[1], 2);
  EXPECT_EQ(reported_ids[2], 3);
}

// Two batches written through the same writer still produce one non-empty
// equality delete file.
TEST_F(EqualityDeleteWriterTest, WriteMultipleBatches) {
  auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());

  WriteTestDataToEqualityWriter(writer.get());
  WriteTestDataToEqualityWriter(writer.get());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata_result = writer->Metadata();
  ASSERT_THAT(metadata_result, IsOk());

  // Fail the test instead of indexing out of bounds if no file was reported.
  const auto& data_files = metadata_result.value().data_files;
  ASSERT_EQ(data_files.size(), 1);

  const auto& data_file = data_files[0];
  EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes);
  EXPECT_GT(data_file->file_size_in_bytes, 0);
}

} // namespace iceberg
12 changes: 7 additions & 5 deletions src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,22 @@ class ManifestEvaluator;
class ResidualEvaluator;
class StrictMetricsEvaluator;

/// \brief Scan.
/// \brief Scan task.
class ChangelogScanTask;
class DataTableScan;
class FileScanTask;
class ScanTask;

/// \brief Table scan.
class DataTableScan;
template <typename ScanTaskType>
class IncrementalScan;
class IncrementalAppendScan;
class IncrementalChangelogScan;
class ScanTask;
class TableScan;

/// \brief Scan builder.
template <typename ScanType>
class TableScanBuilder;

// Type aliases for scan builders
using DataTableScanBuilder = TableScanBuilder<DataTableScan>;
using IncrementalAppendScanBuilder = TableScanBuilder<IncrementalAppendScan>;
using IncrementalChangelogScanBuilder = TableScanBuilder<IncrementalChangelogScan>;
Expand Down
Loading