diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 220b8585c..8af717b25 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -262,14 +262,8 @@ Result> ManifestGroup::Entries() { Result> ManifestGroup::MakeReader( const ManifestFile& manifest) { - auto spec_it = specs_by_id_.find(manifest.partition_spec_id); - if (spec_it == specs_by_id_.end()) { - return InvalidArgument("Partition spec {} not found for manifest {}", - manifest.partition_spec_id, manifest.manifest_path); - } - ICEBERG_ASSIGN_OR_RAISE(auto reader, - ManifestReader::Make(manifest, io_, schema_, spec_it->second)); + ManifestReader::Make(manifest, io_, schema_, specs_by_id_)); reader->FilterRows(data_filter_) .FilterPartitions(partition_filter_) diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 53100b236..7747e2be3 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -998,6 +999,19 @@ Result> ManifestReader::Make( manifest.first_row_id); } +Result> ManifestReader::Make( + const ManifestFile& manifest, std::shared_ptr file_io, + std::shared_ptr schema, + const std::unordered_map>& specs_by_id) { + auto spec_it = specs_by_id.find(manifest.partition_spec_id); + if (spec_it == specs_by_id.end() || spec_it->second == nullptr) { + return InvalidArgument("Partition spec {} not found for manifest {}", + manifest.partition_spec_id, manifest.manifest_path); + } + auto spec = spec_it->second; + return Make(manifest, std::move(file_io), std::move(schema), std::move(spec)); +} + Result> ManifestReader::Make( std::string_view manifest_location, std::optional manifest_length, std::shared_ptr file_io, std::shared_ptr schema, diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h index 1a1420216..42c56e1c2 100644 --- 
a/src/iceberg/manifest/manifest_reader.h +++ b/src/iceberg/manifest/manifest_reader.h @@ -92,6 +92,17 @@ class ICEBERG_EXPORT ManifestReader { const ManifestFile& manifest, std::shared_ptr file_io, std::shared_ptr schema, std::shared_ptr spec); + /// \brief Creates a reader for a manifest file using specs keyed by ID. + /// \param manifest A ManifestFile object containing metadata about the manifest. + /// \param file_io File IO implementation to use. + /// \param schema Schema used to bind the partition type. + /// \param specs_by_id Mapping of partition spec ID to PartitionSpec. + /// \return A Result containing the reader or an error. + static Result> Make( + const ManifestFile& manifest, std::shared_ptr file_io, + std::shared_ptr schema, + const std::unordered_map>& specs_by_id); + /// \brief Creates a reader for a manifest file. /// \param manifest_location Path to the manifest file. /// \param manifest_length Length of the manifest file. diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc index 4dcc72d6c..3a99b0009 100644 --- a/src/iceberg/test/expire_snapshots_test.cc +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -28,6 +28,9 @@ #include "iceberg/avro/avro_register.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" #include "iceberg/statistics_file.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" @@ -135,6 +138,13 @@ class ExpireSnapshotsCleanupTest : public UpdateTestBase { return manifest_result.value(); } + ManifestFile AssignManifestSequenceNumber(ManifestFile manifest, + int64_t sequence_number) const { + manifest.sequence_number = sequence_number; + manifest.min_sequence_number = sequence_number; + return manifest; + } + ManifestFile WriteDeleteManifest(const std::string& path, int64_t snapshot_id, std::vector entries) { auto writer_result 
= ManifestWriter::MakeWriter( @@ -227,6 +237,15 @@ TEST_F(ExpireSnapshotsTest, ExpireById) { EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004); } +TEST_F(ExpireSnapshotsTest, ExpireByIdOverridesRetainLast) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->RetainLast(2); + update->ExpireSnapshotId(3051729675574597004); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_THAT(result.snapshot_ids_to_remove, testing::ElementsAre(3051729675574597004)); +} + TEST_F(ExpireSnapshotsTest, ExpireOlderThan) { struct TestCase { int64_t expire_older_than; @@ -243,6 +262,30 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) { } } +TEST_F(ExpireSnapshotsCleanupTest, RetainsUnreferencedSnapshotAtExpireThreshold) { + const int64_t unreferenced_snapshot_id = 4055729675574597004; + const int64_t expire_at_ms = 1515100955770; + + auto metadata = ReloadMetadata(); + metadata->snapshots.push_back(std::make_shared(Snapshot{ + .snapshot_id = unreferenced_snapshot_id, + .parent_snapshot_id = std::nullopt, + .sequence_number = 2, + .timestamp_ms = TimePointMsFromUnixMs(expire_at_ms), + .manifest_list = table_location_ + "/metadata/unreferenced.avro", + .summary = {{SnapshotSummaryFields::kOperation, "append"}}, + .schema_id = metadata->current_schema_id, + })); + RewriteTable(std::move(metadata)); + + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireOlderThan(expire_at_ms); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_THAT(result.snapshot_ids_to_remove, + testing::Not(testing::Contains(unreferenced_snapshot_id))); +} + TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); @@ -350,6 +393,8 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + // Force 
the reachable path. + update->ExpireSnapshotId(kExpiredSnapshotId); update->DeleteWith( [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); @@ -388,6 +433,7 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireSnapshotId(kExpiredSnapshotId); update->DeleteWith( [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); @@ -573,4 +619,216 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) { EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path))); } +TEST_F(ExpireSnapshotsCleanupTest, IncrementalDispatchPreservesAncestorAddedFiles) { + const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; + const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-manifest-list.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-manifest-list.avro"; + + auto expired_data_manifest = WriteDataManifest( + expired_data_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(expired_data_file_path))}); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, + {expired_data_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_THAT(deleted_files, 
testing::Contains(expired_data_manifest_path)); + EXPECT_THAT(deleted_files, testing::Contains(expired_manifest_list_path)); + EXPECT_THAT(deleted_files, testing::Not(testing::Contains(expired_data_file_path))); +} + +TEST_F(ExpireSnapshotsCleanupTest, IncrementalDeletesExpiredDeletedEntries) { + const auto deleted_data_file_path = + table_location_ + "/data/deleted-by-expired.parquet"; + const auto delete_manifest_path = + table_location_ + "/metadata/expired-delete-entry.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-deleted-entry-ml.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-deleted-entry-ml.avro"; + + auto delete_manifest = WriteDataManifest( + delete_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kDeleted, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(deleted_data_file_path))}); + delete_manifest = + AssignManifestSequenceNumber(std::move(delete_manifest), kExpiredSequenceNumber); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {delete_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {delete_manifest}); + RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_THAT(deleted_files, testing::Contains(deleted_data_file_path)); + EXPECT_THAT(deleted_files, testing::Contains(expired_manifest_list_path)); + EXPECT_THAT(deleted_files, testing::Not(testing::Contains(delete_manifest_path))); +} + +TEST_F(ExpireSnapshotsCleanupTest, ReachableDispatchDeletesUnreachableData) { + const auto expired_data_file_path = table_location_ + 
"/data/expired-data.parquet"; + const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-manifest-list.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-manifest-list.avro"; + + auto expired_data_manifest = WriteDataManifest( + expired_data_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(expired_data_file_path))}); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, + {expired_data_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireSnapshotId(kExpiredSnapshotId); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path, + expired_data_manifest_path, + expired_manifest_list_path)); +} + +TEST_F(ExpireSnapshotsCleanupTest, IncrementalSkipsCherryPickedSnapshotCleanup) { + const auto picked_data_file_path = table_location_ + "/data/picked-data.parquet"; + const auto picked_manifest_path = table_location_ + "/metadata/picked-data.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-picked-ml.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-picked-ml.avro"; + + auto picked_manifest = WriteDataManifest( + picked_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + 
MakeDataFile(picked_data_file_path))}); + picked_manifest = + AssignManifestSequenceNumber(std::move(picked_manifest), kExpiredSequenceNumber); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {picked_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {picked_manifest}); + + auto metadata = ReloadMetadata(); + ASSERT_EQ(metadata->snapshots.size(), 2); + metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path; + metadata->snapshots.at(1)->manifest_list = current_manifest_list_path; + metadata->snapshots.at(1)->summary[SnapshotSummaryFields::kSourceSnapshotId] = + std::to_string(kExpiredSnapshotId); + RewriteTable(std::move(metadata)); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_TRUE(deleted_files.empty()); + auto committed_metadata = ReloadMetadata(); + EXPECT_EQ(committed_metadata->snapshots.size(), 1); + EXPECT_EQ(committed_metadata->snapshots.at(0)->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpiredSpec) { + const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; + const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-manifest-list.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-manifest-list.avro"; + + auto expired_data_manifest = WriteDataManifest( + expired_data_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(expired_data_file_path))}); + 
WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, + {expired_data_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + + auto metadata = ReloadMetadata(); + ASSERT_EQ(metadata->snapshots.size(), 2); + metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path; + metadata->snapshots.at(1)->manifest_list = current_manifest_list_path; + ICEBERG_UNWRAP_OR_FAIL(auto retained_spec, PartitionSpec::Make(/*spec_id=*/1, {})); + metadata->partition_specs.push_back( + std::shared_ptr(std::move(retained_spec))); + metadata->default_spec_id = 1; + ICEBERG_UNWRAP_OR_FAIL( + auto retained_schema, + Schema::Make(std::vector{SchemaField::MakeRequired(2, "y", int64()), + SchemaField::MakeRequired(3, "z", int64())}, + /*schema_id=*/2, std::vector{})); + metadata->schemas.push_back(std::shared_ptr(std::move(retained_schema))); + metadata->current_schema_id = 2; + metadata->snapshots.at(1)->schema_id = 2; + RewriteTable(std::move(metadata)); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireSnapshotId(kExpiredSnapshotId); + update->CleanExpiredMetadata(true); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path, + expired_manifest_list_path)); + EXPECT_THAT(deleted_files, testing::Not(testing::Contains(expired_data_file_path))); +} + +TEST_F(ExpireSnapshotsCleanupTest, CommitIgnoresMalformedSourceSnapshotIdCleanup) { + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-malformed-ml.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-malformed-ml.avro"; + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + 
/*parent_snapshot_id=*/0, kExpiredSequenceNumber, {}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + + auto metadata = ReloadMetadata(); + ASSERT_EQ(metadata->snapshots.size(), 2); + metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path; + metadata->snapshots.at(1)->manifest_list = current_manifest_list_path; + metadata->snapshots.at(1)->summary[SnapshotSummaryFields::kSourceSnapshotId] = + "not-a-number"; + RewriteTable(std::move(metadata)); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_TRUE(deleted_files.empty()); + auto committed_metadata = ReloadMetadata(); + EXPECT_EQ(committed_metadata->snapshots.size(), 1); + EXPECT_EQ(committed_metadata->snapshots.at(0)->snapshot_id, kCurrentSnapshotId); +} + } // namespace iceberg diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index ce65882c9..8e109d085 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -23,14 +23,16 @@ #include #include #include -#include #include +#include #include +#include #include #include "iceberg/file_io.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_reader.h" +#include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/statistics_file.h" @@ -40,18 +42,23 @@ #include "iceberg/util/error_collector.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/string_util.h" namespace iceberg { namespace { -Result> MakeManifestReader( +Result> MakeManifestReader( const ManifestFile& manifest, const std::shared_ptr& file_io, const TableMetadata& metadata) { + // TODO(gangwu): Build manifest 
file schemas from PartitionSpec::RawPartitionType + // with UnknownType for dropped source fields instead of requiring the table schema + // to bind every partition source field. Until then, cleanup fails closed when + // historical specs cannot bind to the metadata schema. ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); - ICEBERG_ASSIGN_OR_RAISE(auto spec, - metadata.PartitionSpecById(manifest.partition_spec_id)); - return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec)); + TableMetadataCache metadata_cache(&metadata); + ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById()); + return ManifestReader::Make(manifest, file_io, std::move(schema), specs_by_id.get()); } /// \brief Abstract strategy for cleaning up files after snapshot expiration. @@ -67,14 +74,30 @@ class FileCleanupStrategy { /// /// \param metadata_before_expiration Table metadata before expiration. /// \param metadata_after_expiration Table metadata after expiration. - /// \param expired_snapshot_ids Snapshot IDs that were expired during this operation. /// \param level Controls which types of files are eligible for deletion. virtual Status CleanFiles(const TableMetadata& metadata_before_expiration, const TableMetadata& metadata_after_expiration, - const std::unordered_set& expired_snapshot_ids, CleanupLevel level) = 0; protected: + /// \brief Snapshot IDs present in `before` but not in `after`. 
+ static std::unordered_set ExpiredSnapshotIds(const TableMetadata& before, + const TableMetadata& after) { + std::unordered_set after_ids; + after_ids.reserve(after.snapshots.size()); + for (const auto& s : after.snapshots) { + if (s) after_ids.insert(s->snapshot_id); + } + std::unordered_set expired; + expired.reserve(before.snapshots.size()); + for (const auto& s : before.snapshots) { + if (s && !after_ids.contains(s->snapshot_id)) { + expired.insert(s->snapshot_id); + } + } + return expired; + } + /// \brief Delete a single file void DeleteFile(const std::string& path) { try { @@ -84,11 +107,11 @@ class FileCleanupStrategy { std::ignore = file_io_->DeleteFile(path); } } catch (...) { - /// TODO(shangxinli): add retry + // TODO(shangxinli): add retry } } - /// TODO(shangxinli): Add bulk deletion + // TODO(shangxinli): Add bulk deletion void DeleteFiles(const std::unordered_set& paths) { for (const auto& path : paths) { DeleteFile(path); @@ -149,8 +172,10 @@ class ReachableFileCleanup : public FileCleanupStrategy { Status CleanFiles(const TableMetadata& metadata_before_expiration, const TableMetadata& metadata_after_expiration, - const std::unordered_set& expired_snapshot_ids, CleanupLevel level) override { + const auto expired_snapshot_ids = + ExpiredSnapshotIds(metadata_before_expiration, metadata_after_expiration); + std::unordered_set retained_snapshot_ids; for (const auto& snapshot : metadata_after_expiration.snapshots) { if (snapshot) { @@ -182,7 +207,7 @@ class ReachableFileCleanup : public FileCleanupStrategy { if (level == CleanupLevel::kAll) { // Deleting data files auto data_files_to_delete = FindDataFilesToDelete( - metadata_after_expiration, manifests_to_delete, current_manifests); + metadata_before_expiration, manifests_to_delete, current_manifests); DeleteFiles(data_files_to_delete); } @@ -195,8 +220,7 @@ class ReachableFileCleanup : public FileCleanupStrategy { DeleteFiles(manifest_lists_to_delete); // Deleting statistics files - if 
(HasAnyStatisticsFiles(metadata_before_expiration) || - HasAnyStatisticsFiles(metadata_after_expiration)) { + if (HasAnyStatisticsFiles(metadata_before_expiration)) { DeleteFiles( StatisticsFilesToDelete(metadata_before_expiration, metadata_after_expiration)); } @@ -262,7 +286,7 @@ class ReachableFileCleanup : public FileCleanupStrategy { "Cannot read data file paths from a delete manifest: {}", manifest.manifest_path); - /// TODO(shangxinli): optimize by only reading file paths + // TODO(shangxinli): optimize by only reading file paths ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeManifestReader(manifest, file_io_, metadata)); ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); @@ -331,6 +355,296 @@ class ReachableFileCleanup : public FileCleanupStrategy { } }; +/// \brief Incremental file cleanup strategy for simple linear-ancestry expirations. +/// +/// Only safe when: +/// * No snapshot IDs were explicitly listed for expiration. +/// * No removed snapshots lived outside the current main ancestry. +/// * No retained snapshots live outside the current main ancestry. +/// +/// Each manifest is attributed to its writer snapshot via added_snapshot_id, so +/// two snapshot passes are enough -- one over retained snapshots to learn which +/// manifests are still live, one over expired snapshots to learn which manifests, +/// manifest lists, and data files to drop. Cherry-pick protection via +/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was +/// logically introduced by a snapshot whose changes are still present in the +/// current state under a different id. +/// +/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support. 
+class IncrementalFileCleanup : public FileCleanupStrategy { + public: + using FileCleanupStrategy::FileCleanupStrategy; + + Status CleanFiles(const TableMetadata& metadata_before_expiration, + const TableMetadata& metadata_after_expiration, + CleanupLevel level) override { + const auto expired_snapshot_ids = + ExpiredSnapshotIds(metadata_before_expiration, metadata_after_expiration); + if (expired_snapshot_ids.empty()) { + return {}; + } + + std::unordered_set valid_ids; + valid_ids.reserve(metadata_after_expiration.snapshots.size()); + for (const auto& snapshot : metadata_after_expiration.snapshots) { + if (snapshot) { + valid_ids.insert(snapshot->snapshot_id); + } + } + + auto current_result = metadata_before_expiration.SnapshotById( + metadata_before_expiration.current_snapshot_id); + if (!current_result.has_value() || current_result.value() == nullptr) { + return {}; + } + + // Only delete files removed by ancestors of the current table state. + auto ancestors_result = SnapshotUtil::AncestorsOf( + current_result.value()->snapshot_id, [&metadata_before_expiration](int64_t id) { + return metadata_before_expiration.SnapshotById(id); + }); + if (!ancestors_result.has_value()) { + return {}; + } + std::unordered_set ancestor_ids; + ancestor_ids.reserve(ancestors_result.value().size()); + for (const auto& ancestor : ancestors_result.value()) { + if (ancestor) ancestor_ids.insert(ancestor->snapshot_id); + } + + // Protect snapshots whose changes were picked into the current ancestry. 
+ std::unordered_set picked_ancestor_snapshot_ids; + picked_ancestor_snapshot_ids.reserve(ancestor_ids.size()); + for (const auto& ancestor : ancestors_result.value()) { + if (!ancestor) continue; + const auto& summary = ancestor->summary; + auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId); + if (it == summary.end()) continue; + ICEBERG_ASSIGN_OR_RAISE(auto source_id, + StringUtils::ParseNumber(it->second)); + picked_ancestor_snapshot_ids.insert(source_id); + } + + // Find manifests still referenced by a valid snapshot but written by an + // expired snapshot. Their deleted entries point at data files now safe to + // remove and become candidates for manifests_to_scan below. + std::unordered_set valid_manifests; + std::unordered_set manifests_to_scan; + manifests_to_scan.reserve(expired_snapshot_ids.size()); + for (const auto& snapshot : metadata_after_expiration.snapshots) { + if (!snapshot) continue; + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io_); + if (!manifests_result.has_value()) continue; // best-effort + auto manifests = std::move(manifests_result).value(); + for (auto& manifest : manifests) { + valid_manifests.insert(manifest.manifest_path); + + int64_t writer_id = manifest.added_snapshot_id; + bool from_valid_snapshots = valid_ids.contains(writer_id); + bool is_from_ancestor = ancestor_ids.contains(writer_id); + bool is_picked = picked_ancestor_snapshot_ids.contains(writer_id); + if (!from_valid_snapshots && (is_from_ancestor || is_picked) && + manifest.has_deleted_files()) { + manifests_to_scan.insert(std::move(manifest)); + } + } + } + + // Find manifests that were only referenced by snapshots that have expired, + // and split them by what kind of cleanup they need: + // - manifests_to_delete: not referenced by any retained snapshot; + // - manifests_to_scan: from a current-state ancestor and has deleted + // entries (data files now safe to drop); + // - manifests_to_revert: 
written by an expiring non-ancestor snapshot + // and contains added entries -- those data files were never adopted. + std::unordered_set manifest_lists_to_delete; + manifest_lists_to_delete.reserve(expired_snapshot_ids.size()); + std::unordered_set manifests_to_delete; + manifests_to_delete.reserve(expired_snapshot_ids.size()); + std::unordered_set manifests_to_revert; + manifests_to_revert.reserve(expired_snapshot_ids.size()); + for (const auto& snapshot : metadata_before_expiration.snapshots) { + if (!snapshot) continue; + int64_t snapshot_id = snapshot->snapshot_id; + if (valid_ids.contains(snapshot_id)) continue; + + // Skip cherry-picked snapshots; the picked snapshot owns its cleanup. + if (picked_ancestor_snapshot_ids.contains(snapshot_id)) { + continue; + } + + int64_t source_snapshot_id = -1; + auto src_it = snapshot->summary.find(SnapshotSummaryFields::kSourceSnapshotId); + if (src_it != snapshot->summary.end()) { + auto source_snapshot_id_result = + StringUtils::ParseNumber(src_it->second); + if (!source_snapshot_id_result.has_value()) { + continue; + } + source_snapshot_id = source_snapshot_id_result.value(); + } + // If this commit was cherry-picked from a still-live snapshot, skip it. 
+ if (ancestor_ids.contains(source_snapshot_id) || + picked_ancestor_snapshot_ids.contains(source_snapshot_id)) { + continue; + } + + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io_); + if (!manifests_result.has_value()) { + continue; + } + + auto manifests = std::move(manifests_result).value(); + for (auto& manifest : manifests) { + if (valid_manifests.contains(manifest.manifest_path)) continue; + manifests_to_delete.insert(manifest.manifest_path); + + int64_t writer_id = manifest.added_snapshot_id; + bool is_from_ancestor = ancestor_ids.contains(writer_id); + bool is_from_expiring_snapshot = expired_snapshot_ids.contains(writer_id); + + if (is_from_ancestor && manifest.has_deleted_files()) { + manifests_to_scan.insert(std::move(manifest)); + } else if (!is_from_ancestor && is_from_expiring_snapshot && + manifest.has_added_files()) { + // The writer must be known-expired so missing history cannot make + // an ancestor look like a reverted snapshot. + manifests_to_revert.insert(std::move(manifest)); + } + } + if (!snapshot->manifest_list.empty()) { + manifest_lists_to_delete.insert(snapshot->manifest_list); + } + } + + // Deleting data files + if (level == CleanupLevel::kAll) { + // Manifests may reference partition specs that were pruned during expiration + // when CleanExpiredMetadata is enabled, so resolve schemas/specs against the + // pre-expiration metadata. 
+ auto files_to_delete = FindFilesToDelete( + metadata_before_expiration, manifests_to_scan, manifests_to_revert, valid_ids); + DeleteFiles(files_to_delete); + } + + // Deleting manifest files + DeleteFiles(manifests_to_delete); + + // Deleting manifest-list files + DeleteFiles(manifest_lists_to_delete); + + // Deleting statistics files + if (HasAnyStatisticsFiles(metadata_before_expiration)) { + DeleteFiles( + StatisticsFilesToDelete(metadata_before_expiration, metadata_after_expiration)); + } + + return {}; + } + + private: + /// \brief Resolve the data files that the incremental pass identified for deletion. + /// + /// For manifests_to_scan, read DELETED entries whose snapshot id is no longer valid. + /// For manifests_to_revert, read every ADDED entry. + std::unordered_set FindFilesToDelete( + const TableMetadata& metadata, + const std::unordered_set& manifests_to_scan, + const std::unordered_set& manifests_to_revert, + const std::unordered_set& valid_ids) { + std::unordered_set files_to_delete; + + for (const auto& manifest : manifests_to_scan) { + auto reader_result = MakeManifestReader(manifest, file_io_, metadata); + if (!reader_result.has_value()) continue; + auto entries_result = reader_result.value()->Entries(); + if (!entries_result.has_value()) continue; + for (const auto& entry : entries_result.value()) { + if (entry.status == ManifestStatus::kDeleted && entry.snapshot_id.has_value() && + !valid_ids.contains(entry.snapshot_id.value()) && entry.data_file) { + files_to_delete.insert(entry.data_file->file_path); + } + } + } + + for (const auto& manifest : manifests_to_revert) { + auto reader_result = MakeManifestReader(manifest, file_io_, metadata); + if (!reader_result.has_value()) continue; + auto entries_result = reader_result.value()->Entries(); + if (!entries_result.has_value()) continue; + for (const auto& entry : entries_result.value()) { + if (entry.status == ManifestStatus::kAdded && entry.data_file) { + 
files_to_delete.insert(entry.data_file->file_path); + } + } + } + + return files_to_delete; + } +}; + +/// \brief True if any retained snapshot sits outside the current main ancestry. +bool HasNonMainSnapshots(const TableMetadata& metadata) { + auto current_result = metadata.SnapshotById(metadata.current_snapshot_id); + if (!current_result.has_value() || current_result.value() == nullptr) { + return !metadata.snapshots.empty(); + } + auto ancestors_result = SnapshotUtil::AncestorsOf( + current_result.value()->snapshot_id, + [&metadata](int64_t id) { return metadata.SnapshotById(id); }); + if (!ancestors_result.has_value()) { + return true; + } + std::unordered_set main_ancestors; + for (const auto& a : ancestors_result.value()) { + if (a) main_ancestors.insert(a->snapshot_id); + } + for (const auto& snapshot : metadata.snapshots) { + if (snapshot && !main_ancestors.contains(snapshot->snapshot_id)) { + return true; + } + } + return false; +} + +/// \brief True if any expired snapshot lived outside the current main ancestry. +/// +/// When `before` has no current snapshot, the main-ancestor set is empty, so any +/// removed snapshot counts as "non-main" and the function returns true. This guards the +/// dispatch in Finalize() against picking incremental cleanup when the before-state +/// has snapshots but no current pointer. 
+bool HasRemovedNonMainAncestors(const TableMetadata& before, const TableMetadata& after) { + std::unordered_set main_ancestors; + auto current_result = before.SnapshotById(before.current_snapshot_id); + if (current_result.has_value() && current_result.value() != nullptr) { + auto ancestors_result = SnapshotUtil::AncestorsOf( + current_result.value()->snapshot_id, + [&before](int64_t id) { return before.SnapshotById(id); }); + if (!ancestors_result.has_value()) { + return true; + } + for (const auto& a : ancestors_result.value()) { + if (a) main_ancestors.insert(a->snapshot_id); + } + } + std::unordered_set after_ids; + after_ids.reserve(after.snapshots.size()); + for (const auto& s : after.snapshots) { + if (s) after_ids.insert(s->snapshot_id); + } + for (const auto& snapshot : before.snapshots) { + if (!snapshot) continue; + bool removed = !after_ids.contains(snapshot->snapshot_id); + bool in_main = main_ancestors.contains(snapshot->snapshot_id); + if (removed && !in_main) { + return true; + } + } + return false; +} + } // namespace Result> ExpireSnapshots::Make( @@ -464,7 +778,7 @@ Result> ExpireSnapshots::UnreferencedSnapshotIdsToRe for (const auto& snapshot : base().snapshots) { ICEBERG_DCHECK(snapshot != nullptr, "Snapshot is null"); if (!referenced_ids.contains(snapshot->snapshot_id) && - snapshot->timestamp_ms > default_expire_older_than_) { + snapshot->timestamp_ms >= default_expire_older_than_) { // unreferenced and not old enough to be expired ids_to_retain.insert(snapshot->snapshot_id); } @@ -528,6 +842,8 @@ Result ExpireSnapshots::Apply() { ICEBERG_PRECHECK(!retained_id_to_refs.contains(id), "Cannot expire {}. 
Still referenced by refs", id); } + std::unordered_set explicit_snapshot_ids(snapshot_ids_to_expire_.begin(), + snapshot_ids_to_expire_.end()); ICEBERG_ASSIGN_OR_RAISE(auto all_branch_snapshot_ids, ComputeAllBranchSnapshotIdsToRetain(retained_refs)); ICEBERG_ASSIGN_OR_RAISE(auto unreferenced_snapshot_ids, @@ -544,8 +860,10 @@ Result ExpireSnapshots::Apply() { result.refs_to_remove.push_back(key_to_ref.first); } }); - std::ranges::for_each(base.snapshots, [&ids_to_retain, &result](const auto& snapshot) { - if (snapshot && !ids_to_retain.contains(snapshot->snapshot_id)) { + std::ranges::for_each(base.snapshots, [&explicit_snapshot_ids, &ids_to_retain, + &result](const auto& snapshot) { + if (snapshot && (explicit_snapshot_ids.contains(snapshot->snapshot_id) || + !ids_to_retain.contains(snapshot->snapshot_id))) { result.snapshot_ids_to_remove.push_back(snapshot->snapshot_id); } }); @@ -608,16 +926,24 @@ Status ExpireSnapshots::Finalize(Result commit_result) { auto metadata_before_expiration_ptr = apply_result_->metadata_before_expiration; const TableMetadata& metadata_before_expiration = *metadata_before_expiration_ptr; const TableMetadata& metadata_after_expiration = *commit_result.value(); - std::unordered_set expired_ids(apply_result_->snapshot_ids_to_remove.begin(), - apply_result_->snapshot_ids_to_remove.end()); apply_result_.reset(); - // File cleanup is best-effort: log and continue on individual file deletion failures - ReachableFileCleanup strategy(ctx_->table->io(), delete_func_); - return strategy.CleanFiles(metadata_before_expiration, metadata_after_expiration, - expired_ids, cleanup_level_); + // Pick incremental cleanup when the expiration is a simple linear-ancestry walk: + // no explicit snapshot IDs, no removed snapshots outside main ancestry, and no + // retained snapshots outside main ancestry. 
+ const bool can_use_incremental = + !specified_snapshot_id_ && + !HasRemovedNonMainAncestors(metadata_before_expiration, + metadata_after_expiration) && + !HasNonMainSnapshots(metadata_after_expiration); + + if (can_use_incremental) { + return IncrementalFileCleanup(ctx_->table->io(), delete_func_) + .CleanFiles(metadata_before_expiration, metadata_after_expiration, + cleanup_level_); + } + return ReachableFileCleanup(ctx_->table->io(), delete_func_) + .CleanFiles(metadata_before_expiration, metadata_after_expiration, cleanup_level_); } -// TODO(shangxinli): add IncrementalFileCleanup strategy for linear ancestry optimization. - } // namespace iceberg diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index 7c1588aa5..a5b6e3b32 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include