Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions src/iceberg/manifest/manifest_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,8 @@ Result<std::vector<ManifestEntry>> ManifestGroup::Entries() {

Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
const ManifestFile& manifest) {
auto spec_it = specs_by_id_.find(manifest.partition_spec_id);
if (spec_it == specs_by_id_.end()) {
return InvalidArgument("Partition spec {} not found for manifest {}",
manifest.partition_spec_id, manifest.manifest_path);
}

ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, spec_it->second));
ManifestReader::Make(manifest, io_, schema_, specs_by_id_));

reader->FilterRows(data_filter_)
.FilterPartitions(partition_filter_)
Expand Down
14 changes: 14 additions & 0 deletions src/iceberg/manifest/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <algorithm>
#include <memory>
#include <optional>
#include <ranges>
#include <type_traits>
#include <unordered_set>
Expand Down Expand Up @@ -998,6 +999,19 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
manifest.first_row_id);
}

// Creates a reader for `manifest`, resolving its partition spec from `specs_by_id`
// via `manifest.partition_spec_id` and delegating to the overload that accepts a
// single PartitionSpec.
//
// Returns InvalidArgument when the ID has no entry in the map or the entry is null.
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
    const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
    std::shared_ptr<Schema> schema,
    const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs_by_id) {
  if (const auto found = specs_by_id.find(manifest.partition_spec_id);
      found != specs_by_id.end() && found->second != nullptr) {
    // The map stays untouched: the delegated overload receives its own copy of
    // the shared_ptr.
    return Make(manifest, std::move(file_io), std::move(schema), found->second);
  }

  return InvalidArgument("Partition spec {} not found for manifest {}",
                         manifest.partition_spec_id, manifest.manifest_path);
}

Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
std::string_view manifest_location, std::optional<int64_t> manifest_length,
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
Expand Down
11 changes: 11 additions & 0 deletions src/iceberg/manifest/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ class ICEBERG_EXPORT ManifestReader {
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);

/// \brief Creates a reader for a manifest file using specs keyed by ID.
///
/// The partition spec is looked up in `specs_by_id` under
/// `manifest.partition_spec_id`.
/// \param manifest A ManifestFile object containing metadata about the manifest.
/// \param file_io File IO implementation to use.
/// \param schema Schema used to bind the partition type.
/// \param specs_by_id Mapping of partition spec ID to PartitionSpec.
/// \return A Result containing the reader, or an InvalidArgument error when
/// `specs_by_id` has no entry for the manifest's partition spec ID or the entry
/// is null.
static Result<std::unique_ptr<ManifestReader>> Make(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs_by_id);

/// \brief Creates a reader for a manifest file.
/// \param manifest_location Path to the manifest file.
/// \param manifest_length Length of the manifest file.
Expand Down
258 changes: 258 additions & 0 deletions src/iceberg/test/expire_snapshots_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#include "iceberg/avro/avro_register.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table_metadata.h"
#include "iceberg/test/matchers.h"
Expand Down Expand Up @@ -135,6 +138,13 @@ class ExpireSnapshotsCleanupTest : public UpdateTestBase {
return manifest_result.value();
}

// Returns a copy of `manifest` whose `sequence_number` and `min_sequence_number`
// are both set to `sequence_number`.
ManifestFile AssignManifestSequenceNumber(ManifestFile manifest,
                                          int64_t sequence_number) const {
  // `manifest` is taken by value, so only the local copy is modified.
  manifest.min_sequence_number = sequence_number;
  manifest.sequence_number = sequence_number;
  return manifest;
}

ManifestFile WriteDeleteManifest(const std::string& path, int64_t snapshot_id,
std::vector<ManifestEntry> entries) {
auto writer_result = ManifestWriter::MakeWriter(
Expand Down Expand Up @@ -227,6 +237,15 @@ TEST_F(ExpireSnapshotsTest, ExpireById) {
EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
}

// Configures RetainLast(2) together with an explicit ExpireSnapshotId and expects
// Apply() to report exactly that snapshot ID for removal.
TEST_F(ExpireSnapshotsTest, ExpireByIdOverridesRetainLast) {
  const int64_t snapshot_to_expire = 3051729675574597004;

  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->RetainLast(2);
  expire->ExpireSnapshotId(snapshot_to_expire);

  ICEBERG_UNWRAP_OR_FAIL(auto applied, expire->Apply());
  EXPECT_THAT(applied.snapshot_ids_to_remove, testing::ElementsAre(snapshot_to_expire));
}

TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
struct TestCase {
int64_t expire_older_than;
Expand All @@ -243,6 +262,30 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
}
}

// Appends an extra snapshot to the metadata's snapshot list whose timestamp is
// exactly the value later passed to ExpireOlderThan, and expects Apply() not to
// list that snapshot for removal.
TEST_F(ExpireSnapshotsCleanupTest, RetainsUnreferencedSnapshotAtExpireThreshold) {
  const int64_t threshold_ms = 1515100955770;
  const int64_t extra_snapshot_id = 4055729675574597004;

  auto table_metadata = ReloadMetadata();
  Snapshot extra_snapshot{
      .snapshot_id = extra_snapshot_id,
      .parent_snapshot_id = std::nullopt,
      .sequence_number = 2,
      .timestamp_ms = TimePointMsFromUnixMs(threshold_ms),
      .manifest_list = table_location_ + "/metadata/unreferenced.avro",
      .summary = {{SnapshotSummaryFields::kOperation, "append"}},
      .schema_id = table_metadata->current_schema_id,
  };
  table_metadata->snapshots.push_back(
      std::make_shared<Snapshot>(std::move(extra_snapshot)));
  RewriteTable(std::move(table_metadata));

  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->ExpireOlderThan(threshold_ms);

  ICEBERG_UNWRAP_OR_FAIL(auto applied, expire->Apply());
  EXPECT_THAT(applied.snapshot_ids_to_remove,
              testing::Not(testing::Contains(extra_snapshot_id)));
}

TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
Expand Down Expand Up @@ -350,6 +393,8 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) {

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
// Force the reachable path.
update->ExpireSnapshotId(kExpiredSnapshotId);
Comment thread
wgtmac marked this conversation as resolved.
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });

Expand Down Expand Up @@ -388,6 +433,7 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->ExpireSnapshotId(kExpiredSnapshotId);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });

Expand Down Expand Up @@ -573,4 +619,216 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) {
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
}

// The expired snapshot's manifest list holds one data manifest with a single
// kAdded entry; the current snapshot's manifest list is empty. No snapshot ID is
// expired explicitly. After Commit() the manifest and the expired manifest list
// must be deleted, while the data file itself must not be.
TEST_F(ExpireSnapshotsCleanupTest, IncrementalDispatchPreservesAncestorAddedFiles) {
  const std::string data_path = table_location_ + "/data/expired-data.parquet";
  const std::string manifest_path = table_location_ + "/metadata/expired-data.avro";
  const std::string expired_list_path =
      table_location_ + "/metadata/expired-manifest-list.avro";
  const std::string current_list_path =
      table_location_ + "/metadata/current-manifest-list.avro";

  auto manifest = WriteDataManifest(
      manifest_path, kExpiredSnapshotId,
      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
                 MakeDataFile(data_path))});
  WriteManifestList(expired_list_path, kExpiredSnapshotId,
                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {manifest});
  WriteManifestList(current_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
                    kCurrentSequenceNumber, {});
  RewriteTableWithManifestLists(expired_list_path, current_list_path);

  // Record every path handed to the delete callback instead of deleting it.
  std::vector<std::string> deleted;
  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->DeleteWith([&deleted](const std::string& path) { deleted.push_back(path); });

  EXPECT_THAT(expire->Commit(), IsOk());
  EXPECT_THAT(deleted, testing::Contains(manifest_path));
  EXPECT_THAT(deleted, testing::Contains(expired_list_path));
  EXPECT_THAT(deleted, testing::Not(testing::Contains(data_path)));
}

// A single manifest holding one kDeleted entry is listed by both the expired and
// the current snapshot. No snapshot ID is expired explicitly. After Commit() the
// data file named by the deleted entry and the expired manifest list must be
// deleted, while the shared manifest must not be.
TEST_F(ExpireSnapshotsCleanupTest, IncrementalDeletesExpiredDeletedEntries) {
  const std::string data_path = table_location_ + "/data/deleted-by-expired.parquet";
  const std::string manifest_path =
      table_location_ + "/metadata/expired-delete-entry.avro";
  const std::string expired_list_path =
      table_location_ + "/metadata/expired-deleted-entry-ml.avro";
  const std::string current_list_path =
      table_location_ + "/metadata/current-deleted-entry-ml.avro";

  auto shared_manifest = WriteDataManifest(
      manifest_path, kExpiredSnapshotId,
      {MakeEntry(ManifestStatus::kDeleted, kExpiredSnapshotId, kExpiredSequenceNumber,
                 MakeDataFile(data_path))});
  // Stamp the manifest with the expired snapshot's sequence number before it is
  // written into the manifest lists.
  shared_manifest =
      AssignManifestSequenceNumber(std::move(shared_manifest), kExpiredSequenceNumber);
  WriteManifestList(expired_list_path, kExpiredSnapshotId,
                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {shared_manifest});
  WriteManifestList(current_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
                    kCurrentSequenceNumber, {shared_manifest});
  RewriteTableWithManifestLists(expired_list_path, current_list_path);

  // Record every path handed to the delete callback instead of deleting it.
  std::vector<std::string> deleted;
  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->DeleteWith([&deleted](const std::string& path) { deleted.push_back(path); });

  EXPECT_THAT(expire->Commit(), IsOk());
  EXPECT_THAT(deleted, testing::Contains(data_path));
  EXPECT_THAT(deleted, testing::Contains(expired_list_path));
  EXPECT_THAT(deleted, testing::Not(testing::Contains(manifest_path)));
}

// Same table layout as a single kAdded data manifest listed only by the expired
// snapshot, but here the expired snapshot is selected explicitly through
// ExpireSnapshotId. After Commit() exactly three paths must have been deleted:
// the data file, its manifest, and the expired manifest list.
TEST_F(ExpireSnapshotsCleanupTest, ReachableDispatchDeletesUnreachableData) {
  const std::string data_path = table_location_ + "/data/expired-data.parquet";
  const std::string manifest_path = table_location_ + "/metadata/expired-data.avro";
  const std::string expired_list_path =
      table_location_ + "/metadata/expired-manifest-list.avro";
  const std::string current_list_path =
      table_location_ + "/metadata/current-manifest-list.avro";

  auto manifest = WriteDataManifest(
      manifest_path, kExpiredSnapshotId,
      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
                 MakeDataFile(data_path))});
  WriteManifestList(expired_list_path, kExpiredSnapshotId,
                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {manifest});
  WriteManifestList(current_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
                    kCurrentSequenceNumber, {});
  RewriteTableWithManifestLists(expired_list_path, current_list_path);

  // Record every path handed to the delete callback instead of deleting it.
  std::vector<std::string> deleted;
  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->ExpireSnapshotId(kExpiredSnapshotId);
  expire->DeleteWith([&deleted](const std::string& path) { deleted.push_back(path); });

  EXPECT_THAT(expire->Commit(), IsOk());
  EXPECT_THAT(deleted, testing::UnorderedElementsAre(data_path, manifest_path,
                                                     expired_list_path));
}

// The current snapshot's summary names the expired snapshot under
// kSourceSnapshotId, and both snapshots list the same manifest. After Commit()
// no file may have been passed to the delete callback, and only the current
// snapshot may remain in the committed metadata.
TEST_F(ExpireSnapshotsCleanupTest, IncrementalSkipsCherryPickedSnapshotCleanup) {
  const std::string data_path = table_location_ + "/data/picked-data.parquet";
  const std::string manifest_path = table_location_ + "/metadata/picked-data.avro";
  const std::string expired_list_path =
      table_location_ + "/metadata/expired-picked-ml.avro";
  const std::string current_list_path =
      table_location_ + "/metadata/current-picked-ml.avro";

  auto shared_manifest = WriteDataManifest(
      manifest_path, kExpiredSnapshotId,
      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
                 MakeDataFile(data_path))});
  shared_manifest =
      AssignManifestSequenceNumber(std::move(shared_manifest), kExpiredSequenceNumber);
  WriteManifestList(expired_list_path, kExpiredSnapshotId,
                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {shared_manifest});
  WriteManifestList(current_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
                    kCurrentSequenceNumber, {shared_manifest});

  // Point both snapshots at the freshly written manifest lists and tag the
  // second one with the first one's ID as its source snapshot.
  auto table_metadata = ReloadMetadata();
  ASSERT_EQ(table_metadata->snapshots.size(), 2);
  auto& expired_snapshot = table_metadata->snapshots.at(0);
  auto& current_snapshot = table_metadata->snapshots.at(1);
  expired_snapshot->manifest_list = expired_list_path;
  current_snapshot->manifest_list = current_list_path;
  current_snapshot->summary[SnapshotSummaryFields::kSourceSnapshotId] =
      std::to_string(kExpiredSnapshotId);
  RewriteTable(std::move(table_metadata));

  // Record every path handed to the delete callback instead of deleting it.
  std::vector<std::string> deleted;
  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->DeleteWith([&deleted](const std::string& path) { deleted.push_back(path); });

  EXPECT_THAT(expire->Commit(), IsOk());
  EXPECT_TRUE(deleted.empty());

  auto after_commit = ReloadMetadata();
  EXPECT_EQ(after_commit->snapshots.size(), 1);
  EXPECT_EQ(after_commit->snapshots.at(0)->snapshot_id, kCurrentSnapshotId);
}

// The table is rewritten so that a new partition spec (ID 1) is the default and
// a new schema (ID 2, fields "y" and "z") is current and used by the remaining
// snapshot. The expired snapshot is then removed explicitly with
// CleanExpiredMetadata(true). After Commit() the expired manifest and manifest
// list must be deleted, but the data file referenced by that manifest must not.
TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpiredSpec) {
  const std::string data_path = table_location_ + "/data/expired-data.parquet";
  const std::string manifest_path = table_location_ + "/metadata/expired-data.avro";
  const std::string expired_list_path =
      table_location_ + "/metadata/expired-manifest-list.avro";
  const std::string current_list_path =
      table_location_ + "/metadata/current-manifest-list.avro";

  auto manifest = WriteDataManifest(
      manifest_path, kExpiredSnapshotId,
      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
                 MakeDataFile(data_path))});
  WriteManifestList(expired_list_path, kExpiredSnapshotId,
                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {manifest});
  WriteManifestList(current_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
                    kCurrentSequenceNumber, {});

  auto table_metadata = ReloadMetadata();
  ASSERT_EQ(table_metadata->snapshots.size(), 2);
  auto& expired_snapshot = table_metadata->snapshots.at(0);
  auto& current_snapshot = table_metadata->snapshots.at(1);
  expired_snapshot->manifest_list = expired_list_path;
  current_snapshot->manifest_list = current_list_path;

  // Register spec 1 (no fields) and make it the table default.
  ICEBERG_UNWRAP_OR_FAIL(auto new_spec, PartitionSpec::Make(/*spec_id=*/1, {}));
  table_metadata->partition_specs.push_back(
      std::shared_ptr<PartitionSpec>(std::move(new_spec)));
  table_metadata->default_spec_id = 1;

  // Register schema 2 and make it current for the table and the kept snapshot.
  ICEBERG_UNWRAP_OR_FAIL(
      auto new_schema,
      Schema::Make(std::vector<SchemaField>{SchemaField::MakeRequired(2, "y", int64()),
                                            SchemaField::MakeRequired(3, "z", int64())},
                   /*schema_id=*/2, std::vector<int32_t>{}));
  table_metadata->schemas.push_back(std::shared_ptr<Schema>(std::move(new_schema)));
  table_metadata->current_schema_id = 2;
  current_snapshot->schema_id = 2;
  RewriteTable(std::move(table_metadata));

  // Record every path handed to the delete callback instead of deleting it.
  std::vector<std::string> deleted;
  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->ExpireSnapshotId(kExpiredSnapshotId);
  expire->CleanExpiredMetadata(true);
  expire->DeleteWith([&deleted](const std::string& path) { deleted.push_back(path); });

  EXPECT_THAT(expire->Commit(), IsOk());
  EXPECT_THAT(deleted, testing::UnorderedElementsAre(manifest_path, expired_list_path));
  EXPECT_THAT(deleted, testing::Not(testing::Contains(data_path)));
}

// Both snapshots get empty manifest lists, and the current snapshot's summary
// carries the non-numeric value "not-a-number" under kSourceSnapshotId. Commit()
// must still succeed, no file may be passed to the delete callback, and only the
// current snapshot may remain in the committed metadata.
TEST_F(ExpireSnapshotsCleanupTest, CommitIgnoresMalformedSourceSnapshotIdCleanup) {
  const std::string expired_list_path =
      table_location_ + "/metadata/expired-malformed-ml.avro";
  const std::string current_list_path =
      table_location_ + "/metadata/current-malformed-ml.avro";
  WriteManifestList(expired_list_path, kExpiredSnapshotId,
                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {});
  WriteManifestList(current_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
                    kCurrentSequenceNumber, {});

  auto table_metadata = ReloadMetadata();
  ASSERT_EQ(table_metadata->snapshots.size(), 2);
  auto& expired_snapshot = table_metadata->snapshots.at(0);
  auto& current_snapshot = table_metadata->snapshots.at(1);
  expired_snapshot->manifest_list = expired_list_path;
  current_snapshot->manifest_list = current_list_path;
  current_snapshot->summary[SnapshotSummaryFields::kSourceSnapshotId] = "not-a-number";
  RewriteTable(std::move(table_metadata));

  // Record every path handed to the delete callback instead of deleting it.
  std::vector<std::string> deleted;
  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->DeleteWith([&deleted](const std::string& path) { deleted.push_back(path); });

  EXPECT_THAT(expire->Commit(), IsOk());
  EXPECT_TRUE(deleted.empty());

  auto after_commit = ReloadMetadata();
  EXPECT_EQ(after_commit->snapshots.size(), 1);
  EXPECT_EQ(after_commit->snapshots.at(0)->snapshot_id, kCurrentSnapshotId);
}

} // namespace iceberg
Loading
Loading