diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 937af94b1..648746a87 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -171,6 +171,89 @@ Status TableScanContext::Validate() const { return {}; } +bool TableScanContext::IsScanCurrentLineage() const { + return !from_snapshot_id.has_value() && !to_snapshot_id.has_value(); +} + +Result TableScanContext::ToSnapshotIdInclusive( + const TableMetadata& metadata) const { + // Get the branch's current snapshot ID if branch is set + std::shared_ptr branch_snapshot; + if (!branch.empty()) { + auto iter = metadata.refs.find(branch); + ICEBERG_CHECK(iter != metadata.refs.end() && iter->second != nullptr, + "Cannot find branch: {}", branch); + ICEBERG_ASSIGN_OR_RAISE(branch_snapshot, + metadata.SnapshotById(iter->second->snapshot_id)); + } + + if (to_snapshot_id.has_value()) { + int64_t to_snapshot_id_value = to_snapshot_id.value(); + + if (branch_snapshot != nullptr) { + // Validate `to_snapshot_id` is on the current branch + ICEBERG_ASSIGN_OR_RAISE( + bool is_ancestor, + SnapshotUtil::IsAncestorOf(metadata, branch_snapshot->snapshot_id, + to_snapshot_id_value)); + ICEBERG_CHECK(is_ancestor, + "End snapshot is not a valid snapshot on the current branch: {}", + branch); + } + + return to_snapshot_id_value; + } + + // If to_snapshot_id is not set, use branch's current snapshot if branch is set + if (branch_snapshot != nullptr) { + return branch_snapshot->snapshot_id; + } + + // Get current snapshot from table's current snapshot + std::shared_ptr current_snapshot; + ICEBERG_ASSIGN_OR_RAISE(current_snapshot, metadata.Snapshot()); + ICEBERG_CHECK(current_snapshot != nullptr, + "End snapshot is not set and table has no current snapshot"); + return current_snapshot->snapshot_id; +} + +Result> TableScanContext::FromSnapshotIdExclusive( + const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const { + if (!from_snapshot_id.has_value()) { + return std::nullopt; + } + + int64_t from_snapshot_id_value = from_snapshot_id.value(); + + // Validate `from_snapshot_id` is an ancestor of `to_snapshot_id_inclusive` + if (from_snapshot_id_inclusive) { + ICEBERG_ASSIGN_OR_RAISE(bool is_ancestor, + SnapshotUtil::IsAncestorOf(metadata, to_snapshot_id_inclusive, + from_snapshot_id_value)); + ICEBERG_CHECK( + is_ancestor, + "Starting snapshot (inclusive) {} is not an ancestor of end snapshot {}", + from_snapshot_id_value, to_snapshot_id_inclusive); + + // For inclusive behavior, return the parent snapshot ID (can be nullopt) + ICEBERG_ASSIGN_OR_RAISE(auto from_snapshot, + metadata.SnapshotById(from_snapshot_id_value)); + return from_snapshot->parent_snapshot_id; + } + + // Validate there is an ancestor of `to_snapshot_id_inclusive` where parent is + // `from_snapshot_id` + ICEBERG_ASSIGN_OR_RAISE(bool is_parent_ancestor, SnapshotUtil::IsParentAncestorOf( + metadata, to_snapshot_id_inclusive, + from_snapshot_id_value)); + ICEBERG_CHECK( + is_parent_ancestor, + "Starting snapshot (exclusive) {} is not a parent ancestor of end snapshot {}", + from_snapshot_id_value, to_snapshot_id_inclusive); + + return from_snapshot_id_value; +} + } // namespace internal ScanTask::~ScanTask() = default; @@ -340,10 +423,15 @@ TableScanBuilder& TableScanBuilder::AsOfTime( template TableScanBuilder& TableScanBuilder::FromSnapshot( - [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) + int64_t from_snapshot_id, bool inclusive) requires IsIncrementalScan { - AddError(NotImplemented("Incremental scan is not implemented")); + if (inclusive) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, + metadata_->SnapshotById(from_snapshot_id)); + } + this->context_.from_snapshot_id = from_snapshot_id; + this->context_.from_snapshot_id_inclusive = inclusive; return *this; } @@ -352,15 +440,20 @@ TableScanBuilder& TableScanBuilder::FromSnapshot( const std::string& ref, bool inclusive) requires IsIncrementalScan { - AddError(NotImplemented("Incremental scan is not implemented")); - return *this; + auto iter = metadata_->refs.find(ref); + ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref); + ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref); + ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag, + "Ref {} is not a tag", ref); + return FromSnapshot(iter->second->snapshot_id, inclusive); } template TableScanBuilder& TableScanBuilder::ToSnapshot(int64_t to_snapshot_id) requires IsIncrementalScan { - AddError(NotImplemented("Incremental scan is not implemented")); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(to_snapshot_id)); + context_.to_snapshot_id = to_snapshot_id; return *this; } @@ -368,8 +461,12 @@ template TableScanBuilder& TableScanBuilder::ToSnapshot(const std::string& ref) requires IsIncrementalScan { - AddError(NotImplemented("Incremental scan is not implemented")); - return *this; + auto iter = metadata_->refs.find(ref); + ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref); + ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref); + ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag, + "Ref {} is not a tag", ref); + return ToSnapshot(iter->second->snapshot_id); } template @@ -377,6 +474,11 @@ TableScanBuilder& TableScanBuilder::UseBranch( const std::string& branch) requires IsIncrementalScan { + auto iter = metadata_->refs.find(branch); + ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", branch); + ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", branch); + ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch, + "Ref {} is not a branch", branch); context_.branch = branch; return *this; } @@ -539,17 +641,75 @@ Result>> DataTableScan::PlanFiles() co // IncrementalAppendScan implementation Result> IncrementalAppendScan::Make( - [[maybe_unused]] std::shared_ptr metadata, - [[maybe_unused]] std::shared_ptr schema, - [[maybe_unused]] std::shared_ptr io, - [[maybe_unused]] internal::TableScanContext context) { - return NotImplemented("IncrementalAppendScan is not implemented"); + std::shared_ptr metadata, std::shared_ptr schema, + std::shared_ptr io, internal::TableScanContext context) { + ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null"); + ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null"); + ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); + return std::unique_ptr(new IncrementalAppendScan( + std::move(metadata), std::move(schema), std::move(io), std::move(context))); } Result>> IncrementalAppendScan::PlanFiles( std::optional from_snapshot_id_exclusive, int64_t to_snapshot_id_inclusive) const { - return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented"); + ICEBERG_ASSIGN_OR_RAISE( + auto ancestors_snapshots, + SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive, + from_snapshot_id_exclusive)); + + std::vector> append_snapshots; + std::ranges::copy_if(ancestors_snapshots, std::back_inserter(append_snapshots), + [](const auto& snapshot) { + return snapshot != nullptr && + snapshot->Operation().has_value() && + snapshot->Operation().value() == DataOperation::kAppend; + }); + if (append_snapshots.empty()) { + return std::vector>{}; + } + + std::unordered_set snapshot_ids; + std::ranges::transform(append_snapshots, + std::inserter(snapshot_ids, snapshot_ids.end()), + [](const auto& snapshot) { return snapshot->snapshot_id; }); + + std::vector data_manifests; + for (const auto& snapshot : append_snapshots) { + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_)); + std::ranges::copy_if(manifests, std::back_inserter(data_manifests), + [&snapshot_ids](const ManifestFile& manifest) { + return snapshot_ids.contains(manifest.added_snapshot_id); + }); + } + if (data_manifests.empty()) { + return std::vector>{}; + } + + TableMetadataCache metadata_cache(metadata_.get()); + ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById()); + + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_group, + ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests), {})); + + manifest_group->CaseSensitive(context_.case_sensitive) + .Select(ScanColumns()) + .FilterData(filter()) + .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) { + return entry.snapshot_id.has_value() && + snapshot_ids.contains(entry.snapshot_id.value()) && + entry.status == ManifestStatus::kAdded; + }) + .IgnoreDeleted() + .ColumnsToKeepStats(context_.columns_to_keep_stats); + + if (context_.ignore_residuals) { + manifest_group->IgnoreResiduals(); + } + + return manifest_group->PlanFiles(); } // IncrementalChangelogScan implementation diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index d5bf6b4a5..b35d57914 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -29,6 +29,7 @@ #include "iceberg/arrow_c_data.h" #include "iceberg/result.h" +#include "iceberg/table_metadata.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" @@ -132,6 +133,17 @@ struct TableScanContext { // Validate the context parameters to see if they have conflicts. [[nodiscard]] Status Validate() const; + + /// \brief Returns true if this scan is a current lineage scan, which means it does not + /// specify from/to snapshot IDs. + bool IsScanCurrentLineage() const; + + /// \brief Get the snapshot ID to scan up to (inclusive) based on the context. + Result ToSnapshotIdInclusive(const TableMetadata& metadata) const; + + /// \brief Get the snapshot ID to scan from (exclusive) based on the context. + Result> FromSnapshotIdExclusive( + const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const; }; } // namespace internal @@ -361,9 +373,7 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan { /// \brief Plans the scan tasks by resolving manifests and data files. /// \return A Result containing scan tasks or an error. - Result>> PlanFiles() const { - return NotImplemented("IncrementalScan::PlanFiles is not implemented"); - } + Result>> PlanFiles() const; protected: virtual Result>> PlanFiles( @@ -373,6 +383,26 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan { using TableScan::TableScan; }; +// Template method implementation (must be in header for MSVC) +template +Result>> +IncrementalScan::PlanFiles() const { + if (context_.IsScanCurrentLineage()) { + ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, metadata_->Snapshot()); + if (current_snapshot == nullptr) { + return std::vector>{}; + } + } + + ICEBERG_ASSIGN_OR_RAISE(int64_t to_snapshot_id_inclusive, + context_.ToSnapshotIdInclusive(*metadata_)); + ICEBERG_ASSIGN_OR_RAISE( + std::optional from_snapshot_id_exclusive, + context_.FromSnapshotIdExclusive(*metadata_, to_snapshot_id_inclusive)); + + return PlanFiles(from_snapshot_id_exclusive, to_snapshot_id_inclusive); +} + /// \brief A scan that reads data files added between snapshots (incremental appends). class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan { public: @@ -383,6 +413,9 @@ class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan::PlanFiles; + protected: Result>> PlanFiles( std::optional from_snapshot_id_exclusive, @@ -402,6 +435,9 @@ class ICEBERG_EXPORT IncrementalChangelogScan ~IncrementalChangelogScan() override = default; + // Bring the public PlanFiles() from base class into scope + using IncrementalScan::PlanFiles; + protected: Result>> PlanFiles( std::optional from_snapshot_id_exclusive, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fdd88888e..c62d53473 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES file_scan_task_test.cc + incremental_append_scan_test.cc table_scan_test.cc) add_iceberg_test(table_update_test diff --git a/src/iceberg/test/incremental_append_scan_test.cc b/src/iceberg/test/incremental_append_scan_test.cc new file mode 100644 index 000000000..559aedd81 --- /dev/null +++ b/src/iceberg/test/incremental_append_scan_test.cc @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include +#include + +#include "iceberg/test/scan_test_base.h" + +namespace iceberg { + +class IncrementalAppendScanTest : public ScanTestBase {}; + +TEST_P(IncrementalAppendScanTest, FromSnapshotInclusive) { + auto version = GetParam(); + + // Create 3 snapshots, each appending one file + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + auto snapshot_b = + MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"}); + auto snapshot_c = + MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_c.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_b, snapshot_c}, 3000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}}); + + // Test: from_snapshot_inclusive(snapshot_a) should return 3 files (A, B, C) + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(1000L, /*inclusive=*/true); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 3); + EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre( + "/path/to/file_a.parquet", "/path/to/file_b.parquet", + "/path/to/file_c.parquet")); + } + + // Test: from_snapshot_inclusive(snapshot_a).to_snapshot(snapshot_c) should return 3 + // files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(1000L, /*inclusive=*/true).ToSnapshot(3000L); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 3); + } +} + +TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithNonExistingRef) { + auto metadata = MakeTableMetadata({}, -1L); + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("non_existing_ref", /*inclusive=*/true); + EXPECT_THAT(builder->Build(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Cannot find ref: non_existing_ref"))); +} + +TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithTag) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + auto snapshot_b = MakeAppendSnapshot( + version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"}); + auto snapshot_current = MakeAppendSnapshot( + version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", "/path/to/file_a2.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_b, snapshot_current}, 3000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}, + {"t1", std::make_shared( + SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})}, + {"t2", std::make_shared( + SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})}}); + + // Test: from_snapshot_inclusive(t1) should return 5 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("t1", /*inclusive=*/true); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 5); + } + + // Test: from_snapshot_inclusive(t1).to_snapshot(t2) should return 3 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("t1", /*inclusive=*/true).ToSnapshot("t2"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 3); + } +} + +TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithBranchShouldFail) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a}, 1000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})}, + {"b1", std::make_shared(SnapshotRef{ + .snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})}}); + + // Test: from_snapshot_inclusive(branch_name) should fail + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("b1", /*inclusive=*/true); + EXPECT_THAT(builder->Build(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Ref b1 is not a tag"))); + } + + // Test: to_snapshot(branch_name) should fail + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(1000L, /*inclusive=*/true).ToSnapshot("b1"); + EXPECT_THAT(builder->Build(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Ref b1 is not a tag"))); + } +} + +TEST_P(IncrementalAppendScanTest, UseBranch) { + auto version = GetParam(); + + // Common ancestor + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + // Main branch snapshots + auto snapshot_main_b = MakeAppendSnapshot( + version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"}); + auto snapshot_current = MakeAppendSnapshot( + version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", "/path/to/file_a2.parquet"}); + // Branch b1 snapshots + auto snapshot_branch_b = + MakeAppendSnapshot(version, 4000L, 1000L, 2L, {"/path/to/file_c_branch.parquet"}); + auto snapshot_branch_c = + MakeAppendSnapshot(version, 5000L, 4000L, 3L, {"/path/to/file_c_branch2.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_main_b, snapshot_current, snapshot_branch_b, + snapshot_branch_c}, + 3000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}, + {"t1", std::make_shared( + SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})}, + {"t2", std::make_shared( + SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})}, + {"b1", std::make_shared(SnapshotRef{ + .snapshot_id = 5000L, .retention = SnapshotRef::Branch{}})}}); + + // Test: from_snapshot_inclusive(t1) on main should return 5 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("t1", /*inclusive=*/true); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 5); + } + + // Test: from_snapshot_inclusive(t1).use_branch(b1) should return 3 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("t1", /*inclusive=*/true).UseBranch("b1"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 3); + } + + // Test: to_snapshot(snapshot_branch_b).use_branch(b1) should return 2 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot(4000L).UseBranch("b1"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + } + + // Test: to_snapshot(snapshot_branch_c).use_branch(b1) should return 3 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot(5000L).UseBranch("b1"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 3); + } + + // Test: from_snapshot_exclusive(t1).to_snapshot(snapshot_branch_b).use_branch(b1) + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("t1", /*inclusive=*/false).ToSnapshot(4000L).UseBranch("b1"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1); + } +} + +TEST_P(IncrementalAppendScanTest, UseBranchWithTagShouldFail) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a}, 1000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})}, + {"t1", std::make_shared( + SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})}}); + + // Test: use_branch(tag_name) should fail + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(1000L, /*inclusive=*/true).UseBranch("t1"); + EXPECT_THAT(builder->Build(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Ref t1 is not a branch"))); +} + +TEST_P(IncrementalAppendScanTest, UseBranchWithInvalidSnapshotShouldFail) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + auto snapshot_main_b = MakeAppendSnapshot( + version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"}); + auto snapshot_branch_b = + MakeAppendSnapshot(version, 3000L, 1000L, 2L, {"/path/to/file_d.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_main_b, snapshot_branch_b}, 2000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})}, + {"b1", std::make_shared(SnapshotRef{ + .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}}); + + // Test: to_snapshot(snapshot_main_b).use_branch(b1) should fail + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot(2000L).UseBranch("b1"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + EXPECT_THAT( + scan->PlanFiles(), + ::testing::AllOf( + IsError(ErrorKind::kValidationFailed), + HasErrorMessage( + "End snapshot is not a valid snapshot on the current branch: b1"))); + } + + // Test: from_snapshot_inclusive(snapshot_main_b).use_branch(b1) should fail + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(2000L, /*inclusive=*/true).UseBranch("b1"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + EXPECT_THAT( + scan->PlanFiles(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Starting snapshot (inclusive) 2000 is not an " + "ancestor of end snapshot 3000"))); + } +} + +TEST_P(IncrementalAppendScanTest, UseBranchWithNonExistingRef) { + auto metadata = MakeTableMetadata({}, -1L); + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->UseBranch("non_existing_ref"); + EXPECT_THAT(builder->Build(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Cannot find ref: non_existing_ref"))); +} + +TEST_P(IncrementalAppendScanTest, FromSnapshotExclusive) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + auto snapshot_b = + MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"}); + auto snapshot_c = + MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_c.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_b, snapshot_c}, 3000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}}); + + // Test: from_snapshot_exclusive(snapshot_a) should return 2 files (B, C) + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(1000L, /*inclusive=*/false); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), + testing::UnorderedElementsAre("/path/to/file_b.parquet", + "/path/to/file_c.parquet")); + } + + // Test: from_snapshot_exclusive(snapshot_a).to_snapshot(snapshot_b) should return 1 + // file (B) + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(1000L, /*inclusive=*/false).ToSnapshot(2000L); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1); + EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/file_b.parquet"); + } +} + +TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithNonExistingRef) { + auto metadata = MakeTableMetadata({}, -1L); + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("nonExistingRef", /*inclusive=*/false); + EXPECT_THAT(builder->Build(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Cannot find ref: nonExistingRef"))); +} + +TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithTag) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + auto snapshot_b = MakeAppendSnapshot( + version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"}); + auto snapshot_current = MakeAppendSnapshot( + version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", "/path/to/file_a2.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_b, snapshot_current}, 3000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}, + {"t1", std::make_shared( + SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})}, + {"t2", std::make_shared( + SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})}}); + + // Test: from_snapshot_exclusive(t1) should return 4 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("t1", /*inclusive=*/false); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 4); + } + + // Test: from_snapshot_exclusive(t1).to_snapshot(t2) should return 2 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("t1", /*inclusive=*/false).ToSnapshot("t2"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + } +} + +TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithBranchShouldFail) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a}, 1000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})}, + {"b1", std::make_shared(SnapshotRef{ + .snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})}}); + + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot("b1", /*inclusive=*/false); + EXPECT_THAT(builder->Build(), ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Ref b1 is not a tag"))); +} + +TEST_P(IncrementalAppendScanTest, ToSnapshot) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + auto snapshot_b = + MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"}); + auto snapshot_c = + MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_c.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_b, snapshot_c}, 3000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}}); + + // Test: to_snapshot(snapshot_b) should return 2 files (A, B) + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot(2000L); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), + testing::UnorderedElementsAre("/path/to/file_a.parquet", + "/path/to/file_b.parquet")); + } +} + +TEST_P(IncrementalAppendScanTest, ToSnapshotWithTag) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + auto snapshot_b = + MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"}); + auto snapshot_current = + MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_b2.parquet"}); + auto snapshot_branch_b = + MakeAppendSnapshot(version, 4000L, 2000L, 3L, {"/path/to/file_c.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_b, snapshot_current, snapshot_branch_b}, 3000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}, + {"b1", std::make_shared( + SnapshotRef{.snapshot_id = 4000L, .retention = SnapshotRef::Branch{}})}, + {"t1", std::make_shared( + SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})}, + {"t2", std::make_shared( + SnapshotRef{.snapshot_id = 4000L, .retention = SnapshotRef::Tag{}})}}); + + // Test: to_snapshot(t1) should return 2 files + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot("t1"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + } + + // Test: to_snapshot(t2) should return 3 files (on branch b1) + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot("t2"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 3); + } +} + +TEST_P(IncrementalAppendScanTest, ToSnapshotWithNonExistingRef) { + auto metadata = MakeTableMetadata({}, -1L); + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot("non_existing_ref"); + EXPECT_THAT(builder->Build(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Cannot find ref: non_existing_ref"))); +} + +TEST_P(IncrementalAppendScanTest, ToSnapshotWithBranchShouldFail) { + auto version = GetParam(); + + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + auto snapshot_b = + MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"}); + + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_b}, 2000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})}, + {"b1", std::make_shared(SnapshotRef{ + .snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})}}); + + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot("b1"); + EXPECT_THAT(builder->Build(), ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Ref b1 is not a tag"))); +} + +TEST_P(IncrementalAppendScanTest, MultipleRootSnapshots) { + auto version = GetParam(); + + // Snapshot A (will be "expired" by not having it as parent of C) + auto snapshot_a = + MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"}); + // Snapshot B (staged, orphaned - not an ancestor of main branch) + auto snapshot_b = + MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"}); + // Snapshot C (new root after A is expired) + auto snapshot_c = + MakeAppendSnapshot(version, 3000L, std::nullopt, 3L, {"/path/to/file_c.parquet"}); + // Snapshot D + auto snapshot_d = + MakeAppendSnapshot(version, 4000L, 3000L, 4L, {"/path/to/file_d.parquet"}); + + // Note: snapshot_a is kept in metadata but not in the parent chain of C/D + // This simulates expiring snapshot A, creating two root snapshots (B and C) + auto metadata = MakeTableMetadata( + {snapshot_a, snapshot_b, snapshot_c, snapshot_d}, 4000L, + {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 4000L, .retention = SnapshotRef::Branch{}})}}); + + // Test: to_snapshot(snapshot_d) should discover snapshots C and D only + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->ToSnapshot(4000L); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), + testing::UnorderedElementsAre("/path/to/file_c.parquet", + "/path/to/file_d.parquet")); + } + + // Test: from_snapshot_exclusive(snapshot_b).to_snapshot(snapshot_d) should fail + // because B is not a parent ancestor of D + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(2000L, /*inclusive=*/false).ToSnapshot(4000L); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + EXPECT_THAT( + scan->PlanFiles(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Starting snapshot (exclusive) 2000 is not a " + "parent ancestor of end snapshot 4000"))); + } + + // Test: from_snapshot_inclusive(snapshot_b).to_snapshot(snapshot_d) should fail + // because B is not an ancestor of D + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + IncrementalAppendScanBuilder::Make(metadata, file_io_)); + builder->FromSnapshot(2000L, /*inclusive=*/true).ToSnapshot(4000L); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + EXPECT_THAT( + scan->PlanFiles(), + ::testing::AllOf(IsError(ErrorKind::kValidationFailed), + HasErrorMessage("Starting snapshot (inclusive) 2000 is not an " + "ancestor of end snapshot 4000"))); + } +} + +INSTANTIATE_TEST_SUITE_P(IncrementalAppendScanVersions, IncrementalAppendScanTest, + testing::Values(1, 2, 3)); + +} // namespace iceberg diff --git a/src/iceberg/test/scan_test_base.h b/src/iceberg/test/scan_test_base.h new file mode 100644 index 000000000..65a4e0531 --- /dev/null +++ b/src/iceberg/test/scan_test_base.h @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_scan.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" + +namespace iceberg { + +/// \brief Base class for scan-related tests providing common test utilities. +/// +/// This class provides common setup and helper functions for testing +/// TableScan and IncrementalScan implementations. +class ScanTestBase : public testing::TestWithParam { + protected: + void SetUp() override { + avro::RegisterAll(); + + file_io_ = arrow::MakeMockFileIO(); + schema_ = std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/1, "id", int32()), + SchemaField::MakeRequired(/*field_id=*/2, "data", string())}); + unpartitioned_spec_ = PartitionSpec::Unpartitioned(); + + ICEBERG_UNWRAP_OR_FAIL( + partitioned_spec_, + PartitionSpec::Make( + /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000, + "data_bucket_16_2", Transform::Bucket(16))})); + } + + /// \brief Generate a unique manifest file path. + std::string MakeManifestPath() { + return std::format("manifest-{}-{}.avro", manifest_counter_++, + std::chrono::system_clock::now().time_since_epoch().count()); + } + + /// \brief Generate a unique manifest list file path. + std::string MakeManifestListPath() { + return std::format("manifest-list-{}-{}.avro", manifest_list_counter_++, + std::chrono::system_clock::now().time_since_epoch().count()); + } + + /// \brief Create a manifest entry. + ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id, + int64_t sequence_number, std::shared_ptr file) { + return ManifestEntry{ + .status = status, + .snapshot_id = snapshot_id, + .sequence_number = sequence_number, + .file_sequence_number = sequence_number, + .data_file = std::move(file), + }; + } + + /// \brief Write a data manifest file. + ManifestFile WriteDataManifest( + int8_t format_version, int64_t snapshot_id, std::vector entries, + std::shared_ptr spec = PartitionSpec::Unpartitioned()) { + const std::string manifest_path = MakeManifestPath(); + auto writer_result = ManifestWriter::MakeWriter( + format_version, snapshot_id, manifest_path, file_io_, spec, schema_, + ManifestContent::kData, + /*first_row_id=*/format_version >= 3 ? std::optional(0L) : std::nullopt); + + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + for (const auto& entry : entries) { + EXPECT_THAT(writer->WriteEntry(entry), IsOk()); + } + + EXPECT_THAT(writer->Close(), IsOk()); + auto manifest_result = writer->ToManifestFile(); + EXPECT_THAT(manifest_result, IsOk()); + return std::move(manifest_result.value()); + } + + /// \brief Write a delete manifest file. + ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id, + std::vector entries, + std::shared_ptr spec) { + const std::string manifest_path = MakeManifestPath(); + auto writer_result = + ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_, + spec, schema_, ManifestContent::kDeletes); + + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + for (const auto& entry : entries) { + EXPECT_THAT(writer->WriteEntry(entry), IsOk()); + } + + EXPECT_THAT(writer->Close(), IsOk()); + auto manifest_result = writer->ToManifestFile(); + EXPECT_THAT(manifest_result, IsOk()); + return std::move(manifest_result.value()); + } + + /// \brief Write a manifest list file. + std::string WriteManifestList(int8_t format_version, int64_t snapshot_id, + int64_t parent_snapshot_id, int64_t sequence_number, + const std::vector& manifests) { + const std::string manifest_list_path = MakeManifestListPath(); + + auto writer_result = ManifestListWriter::MakeWriter( + format_version, snapshot_id, parent_snapshot_id, manifest_list_path, file_io_, + /*sequence_number=*/format_version >= 2 ? std::optional(sequence_number) + : std::nullopt, + /*first_row_id=*/format_version >= 3 ? std::optional(0L) : std::nullopt); + + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + EXPECT_THAT(writer->AddAll(manifests), IsOk()); + EXPECT_THAT(writer->Close(), IsOk()); + + return manifest_list_path; + } + + /// \brief Extract file paths from scan tasks. + static std::vector GetPaths( + const std::vector>& tasks) { + return tasks | std::views::transform([](const auto& task) { + return task->data_file()->file_path; + }) | + std::ranges::to>(); + } + + /// \brief Create table metadata with the given snapshots. + std::shared_ptr MakeTableMetadata( + const std::vector>& snapshots, + int64_t current_snapshot_id, + const std::unordered_map>& refs = {}, + std::shared_ptr default_spec = nullptr) { + TimePointMs timestamp_ms = TimePointMsFromUnixMs(1609459200000L); + int64_t last_seq = snapshots.empty() ? 0L : snapshots.back()->sequence_number; + auto effective_spec = default_spec ? default_spec : unpartitioned_spec_; + + return std::make_shared(TableMetadata{ + .format_version = GetParam(), + .table_uuid = "test-table-uuid", + .location = "/tmp/table", + .last_sequence_number = last_seq, + .last_updated_ms = timestamp_ms, + .last_column_id = 2, + .schemas = {schema_}, + .current_schema_id = schema_->schema_id(), + .partition_specs = {partitioned_spec_, unpartitioned_spec_}, + .default_spec_id = effective_spec->spec_id(), + .last_partition_id = 1000, + .current_snapshot_id = current_snapshot_id, + .snapshots = snapshots, + .snapshot_log = {}, + .default_sort_order_id = 0, + .refs = refs, + }); + } + + /// \brief Create a data file with optional partition values. + std::shared_ptr MakeDataFile( + const std::string& path, + PartitionValues partition = PartitionValues(std::vector{}), + std::shared_ptr spec = nullptr, int64_t record_count = 1) { + auto effective_spec = spec ? spec : unpartitioned_spec_; + return std::make_shared(DataFile{ + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = std::move(partition), + .record_count = record_count, + .file_size_in_bytes = 10, + .sort_order_id = 0, + .partition_spec_id = effective_spec->spec_id(), + }); + } + + /// \brief Create an append snapshot with the given files (string paths). + std::shared_ptr MakeAppendSnapshot( + int8_t format_version, int64_t snapshot_id, + std::optional parent_snapshot_id, int64_t sequence_number, + const std::vector& added_files, + std::shared_ptr spec = nullptr) { + std::vector> files_with_partitions; + for (const auto& path : added_files) { + files_with_partitions.emplace_back(path, kEmptyPartition); + } + return MakeAppendSnapshotWithPartitionValues(format_version, snapshot_id, + parent_snapshot_id, sequence_number, + files_with_partitions, spec); + } + + /// \brief Create an append snapshot with the given files (with partition values). + std::shared_ptr MakeAppendSnapshotWithPartitionValues( + int8_t format_version, int64_t snapshot_id, + std::optional parent_snapshot_id, int64_t sequence_number, + const std::vector>& added_files, + std::shared_ptr spec = nullptr) { + auto effective_spec = spec ? spec : unpartitioned_spec_; + std::vector entries; + entries.reserve(added_files.size()); + for (const auto& [path, partition] : added_files) { + auto file = MakeDataFile(path, partition, effective_spec); + entries.push_back( + MakeEntry(ManifestStatus::kAdded, snapshot_id, sequence_number, file)); + } + + auto manifest = WriteDataManifest(format_version, snapshot_id, std::move(entries), + effective_spec); + int64_t parent_id = parent_snapshot_id.value_or(0L); + auto manifest_list = WriteManifestList(format_version, snapshot_id, parent_id, + sequence_number, {manifest}); + TimePointMs timestamp_ms = + TimePointMsFromUnixMs(1609459200000L + sequence_number * 1000); + return std::make_shared(Snapshot{ + .snapshot_id = snapshot_id, + .parent_snapshot_id = parent_snapshot_id, + .sequence_number = sequence_number, + .timestamp_ms = timestamp_ms, + .manifest_list = manifest_list, + .summary = {{"operation", "append"}}, + .schema_id = schema_->schema_id(), + }); + } + + std::shared_ptr file_io_; + std::shared_ptr schema_; + std::shared_ptr partitioned_spec_; + std::shared_ptr unpartitioned_spec_; + + private: + int manifest_counter_ = 0; + int manifest_list_counter_ = 0; + constexpr static PartitionValues kEmptyPartition{}; +}; + +} // namespace iceberg diff --git a/src/iceberg/test/table_scan_test.cc b/src/iceberg/test/table_scan_test.cc index d9fc1e927..e4a3d21f4 100644 --- a/src/iceberg/test/table_scan_test.cc +++ b/src/iceberg/test/table_scan_test.cc @@ -17,53 +17,26 @@ * under the License. */ -#include "iceberg/table_scan.h" - -#include -#include #include #include #include +#include #include #include #include -#include "iceberg/arrow/arrow_file_io.h" -#include "iceberg/avro/avro_register.h" #include "iceberg/expression/expressions.h" -#include "iceberg/manifest/manifest_entry.h" -#include "iceberg/manifest/manifest_list.h" -#include "iceberg/manifest/manifest_reader.h" -#include "iceberg/manifest/manifest_writer.h" -#include "iceberg/partition_spec.h" -#include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" -#include "iceberg/test/matchers.h" -#include "iceberg/transform.h" -#include "iceberg/type.h" -#include "iceberg/util/timepoint.h" +#include "iceberg/test/scan_test_base.h" namespace iceberg { -class TableScanTest : public testing::TestWithParam { +class TableScanTest : public ScanTestBase { protected: void SetUp() override { - avro::RegisterAll(); - - file_io_ = arrow::MakeMockFileIO(); - schema_ = std::make_shared(std::vector{ - SchemaField::MakeRequired(/*field_id=*/1, "id", int32()), - SchemaField::MakeRequired(/*field_id=*/2, "data", string())}); - unpartitioned_spec_ = PartitionSpec::Unpartitioned(); - - ICEBERG_UNWRAP_OR_FAIL( - partitioned_spec_, - PartitionSpec::Make( - /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000, - "data_bucket_16_2", Transform::Bucket(16))})); - + ScanTestBase::SetUp(); MakeTableMetadata(); } @@ -134,12 +107,6 @@ class TableScanTest : public testing::TestWithParam { }); } - std::string MakeManifestPath() { - static int counter = 0; - return std::format("manifest-{}-{}.avro", counter++, - std::chrono::system_clock::now().time_since_epoch().count()); - } - std::shared_ptr MakeDataFile(const std::string& path, const PartitionValues& partition, int32_t spec_id, int64_t record_count = 1, @@ -164,84 +131,12 @@ class TableScanTest : public testing::TestWithParam { return file; } - ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id, - int64_t sequence_number, std::shared_ptr file) { - return ManifestEntry{ - .status = status, - .snapshot_id = snapshot_id, - .sequence_number = sequence_number, - .file_sequence_number = sequence_number, - .data_file = std::move(file), - }; - } - - ManifestFile WriteDataManifest(int8_t format_version, int64_t snapshot_id, - std::vector entries, - std::shared_ptr spec) { - const std::string manifest_path = MakeManifestPath(); - auto writer_result = ManifestWriter::MakeWriter( - format_version, snapshot_id, manifest_path, file_io_, spec, schema_, - ManifestContent::kData, - /*first_row_id=*/format_version >= 3 ? std::optional(0L) : std::nullopt); - - EXPECT_THAT(writer_result, IsOk()); - auto writer = std::move(writer_result.value()); - - for (const auto& entry : entries) { - EXPECT_THAT(writer->WriteEntry(entry), IsOk()); - } - - EXPECT_THAT(writer->Close(), IsOk()); - auto manifest_result = writer->ToManifestFile(); - EXPECT_THAT(manifest_result, IsOk()); - return std::move(manifest_result.value()); - } - - ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id, - std::vector entries, - std::shared_ptr spec) { - const std::string manifest_path = MakeManifestPath(); - auto writer_result = - ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_, - spec, schema_, ManifestContent::kDeletes); - - EXPECT_THAT(writer_result, IsOk()); - auto writer = std::move(writer_result.value()); - - for (const auto& entry : entries) { - EXPECT_THAT(writer->WriteEntry(entry), IsOk()); - } - - EXPECT_THAT(writer->Close(), IsOk()); - auto manifest_result = writer->ToManifestFile(); - EXPECT_THAT(manifest_result, IsOk()); - return std::move(manifest_result.value()); - } - - std::string MakeManifestListPath() { - static int counter = 0; - return std::format("manifest-list-{}-{}.avro", counter++, - std::chrono::system_clock::now().time_since_epoch().count()); - } - std::string WriteManifestList(int8_t format_version, int64_t snapshot_id, int64_t sequence_number, const std::vector& manifests) { - const std::string manifest_list_path = MakeManifestListPath(); - constexpr int64_t kParentSnapshotId = 0L; - - auto writer_result = ManifestListWriter::MakeWriter( - format_version, snapshot_id, kParentSnapshotId, manifest_list_path, file_io_, - /*sequence_number=*/format_version >= 2 ? std::optional(sequence_number) - : std::nullopt, - /*first_row_id=*/format_version >= 3 ? std::optional(0L) : std::nullopt); - - EXPECT_THAT(writer_result, IsOk()); - auto writer = std::move(writer_result.value()); - EXPECT_THAT(writer->AddAll(manifests), IsOk()); - EXPECT_THAT(writer->Close(), IsOk()); - - return manifest_list_path; + return ScanTestBase::WriteManifestList(format_version, snapshot_id, + 0L /*parent_snapshot_id*/, sequence_number, + manifests); } std::unordered_map> GetSpecsById() { @@ -249,18 +144,6 @@ class TableScanTest : public testing::TestWithParam { {unpartitioned_spec_->spec_id(), unpartitioned_spec_}}; } - static std::vector GetPaths( - const std::vector>& tasks) { - return tasks | std::views::transform([](const auto& task) { - return task->data_file()->file_path; - }) | - std::ranges::to>(); - } - - std::shared_ptr file_io_; - std::shared_ptr schema_; - std::shared_ptr partitioned_spec_; - std::shared_ptr unpartitioned_spec_; std::shared_ptr table_metadata_; };