From 230a0b67465195a966bb4c80a0c22eebf4f665d5 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 13 Mar 2026 22:40:29 +0800 Subject: [PATCH] refactor: introduce TransactionContext to decouple Transaction and PendingUpdate Add TransactionContext to own the shared state (table, metadata_builder, kind) between Transaction and PendingUpdate. Both now hold a shared_ptr instead of PendingUpdate holding a weak_ptr. This fixes three issues: - pending_updates_ was weak_ptr, so dropping a PendingUpdate would silently break Finalize/retry; now Transaction holds shared_ptr - Table::New*() no longer creates a temporary Transaction; it creates a TransactionContext directly and passes it to the PendingUpdate, removing the circular dependency - Transaction::MetadataFileLocation had its branches swapped (an empty kWriteMetadataLocation property was used as the path prefix, and a non-empty one was ignored); the logic now lives in TransactionContext::MetadataFileLocation with the condition corrected Also clean up related redundancy: - Hoist Transaction::Kind to a standalone enum class TransactionKind - Remove Transaction::kind_ (duplicate of ctx_->kind) - Remove auto_commit machinery; PendingUpdate::Commit() now calls txn->Commit() explicitly on the table-created path - TransactionContext::Make returns Result to propagate null table errors --- .../catalog/memory/in_memory_catalog.cc | 3 +- src/iceberg/catalog/rest/rest_catalog.cc | 3 +- src/iceberg/table.cc | 53 +++-- .../test/update_partition_spec_test.cc | 10 +- src/iceberg/transaction.cc | 195 ++++++++++-------- src/iceberg/transaction.h | 57 +++-- src/iceberg/type_fwd.h | 1 + src/iceberg/update/expire_snapshots.cc | 13 +- src/iceberg/update/expire_snapshots.h | 4 +- src/iceberg/update/fast_append.cc | 18 +- src/iceberg/update/fast_append.h | 4 +- src/iceberg/update/pending_update.cc | 29 ++- src/iceberg/update/pending_update.h | 4 +- src/iceberg/update/set_snapshot.cc | 13 +- src/iceberg/update/set_snapshot.h | 4 +- src/iceberg/update/snapshot_manager.cc | 3 +- src/iceberg/update/snapshot_update.cc | 36 ++-- src/iceberg/update/snapshot_update.h | 2 +- src/iceberg/update/update_location.cc | 11 +- src/iceberg/update/update_location.h | 4 +- src/iceberg/update/update_partition_spec.cc | 
12 +- src/iceberg/update/update_partition_spec.h | 4 +- .../update/update_partition_statistics.cc | 12 +- .../update/update_partition_statistics.h | 4 +- src/iceberg/update/update_properties.cc | 11 +- src/iceberg/update/update_properties.h | 4 +- src/iceberg/update/update_schema.cc | 13 +- src/iceberg/update/update_schema.h | 4 +- .../update/update_snapshot_reference.cc | 14 +- .../update/update_snapshot_reference.h | 4 +- src/iceberg/update/update_sort_order.cc | 11 +- src/iceberg/update/update_sort_order.h | 4 +- src/iceberg/update/update_statistics.cc | 11 +- src/iceberg/update/update_statistics.h | 4 +- 34 files changed, 314 insertions(+), 265 deletions(-) diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index 8a082aad1..7cd02ac73 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -500,8 +500,7 @@ Result> InMemoryCatalog::StageCreateTable( ICEBERG_ASSIGN_OR_RAISE( auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_, shared_from_this())); - return Transaction::Make(std::move(table), Transaction::Kind::kCreate, - /* auto_commit */ false); + return Transaction::Make(std::move(table), TransactionKind::kCreate); } Result InMemoryCatalog::TableExists(const TableIdentifier& identifier) const { diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 94c6b1e4e..42dc8659a 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -384,8 +384,7 @@ Result> RestCatalog::StageCreateTable( StagedTable::Make(identifier, std::move(result.metadata), std::move(result.metadata_location), file_io_, shared_from_this())); - return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate, - /*auto_commit=*/false); + return Transaction::Make(std::move(staged_table), TransactionKind::kCreate); } Status 
RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) { diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 72190855f..2f2753f38 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -32,11 +32,16 @@ #include "iceberg/table_scan.h" #include "iceberg/transaction.h" #include "iceberg/update/expire_snapshots.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" +#include "iceberg/update/update_location.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_partition_statistics.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" +#include "iceberg/update/update_snapshot_reference.h" +#include "iceberg/update/update_sort_order.h" #include "iceberg/update/update_statistics.h" #include "iceberg/util/macros.h" @@ -166,71 +171,61 @@ Table::NewIncrementalChangelogScan() const { Result> Table::NewTransaction() { // Create a brand new transaction object for the table. Users are expected to commit the // transaction manually. 
- return Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/false); + return Transaction::Make(shared_from_this(), TransactionKind::kUpdate); } Result> Table::NewUpdatePartitionSpec() { ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewUpdatePartitionSpec(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return UpdatePartitionSpec::Make(std::move(ctx)); } Result> Table::NewUpdateProperties() { ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewUpdateProperties(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return UpdateProperties::Make(std::move(ctx)); } Result> Table::NewUpdateSortOrder() { ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewUpdateSortOrder(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return UpdateSortOrder::Make(std::move(ctx)); } Result> Table::NewUpdateSchema() { ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewUpdateSchema(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return UpdateSchema::Make(std::move(ctx)); } Result> Table::NewExpireSnapshots() { ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewExpireSnapshots(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return ExpireSnapshots::Make(std::move(ctx)); } Result> Table::NewUpdateLocation() { 
ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewUpdateLocation(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return UpdateLocation::Make(std::move(ctx)); } Result> Table::NewFastAppend() { ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewFastAppend(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return FastAppend::Make(name().name, std::move(ctx)); } Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewUpdateStatistics(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return UpdateStatistics::Make(std::move(ctx)); } Result> Table::NewUpdatePartitionStatistics() { ICEBERG_ASSIGN_OR_RAISE( - auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, - /*auto_commit=*/true)); - return transaction->NewUpdatePartitionStatistics(); + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return UpdatePartitionStatistics::Make(std::move(ctx)); } Result> Table::NewSnapshotManager() { diff --git a/src/iceberg/test/update_partition_spec_test.cc b/src/iceberg/test/update_partition_spec_test.cc index fc316aae2..632c4a55e 100644 --- a/src/iceberg/test/update_partition_spec_test.cc +++ b/src/iceberg/test/update_partition_spec_test.cc @@ -232,14 +232,12 @@ class UpdatePartitionSpecTest : public ::testing::TestWithParam { // Helper to create UpdatePartitionSpec from a table std::shared_ptr CreateUpdateFromTable( std::shared_ptr table) { - auto transaction_result = - Transaction::Make(table, Transaction::Kind::kUpdate, 
/*auto_commit=*/false); - if (!transaction_result.has_value()) { - ADD_FAILURE() << "Failed to create transaction: " - << transaction_result.error().message; + auto ctx_result = TransactionContext::Make(table, TransactionKind::kUpdate); + if (!ctx_result.has_value()) { + ADD_FAILURE() << "Failed to create context: " << ctx_result.error().message; return nullptr; } - auto update_result = UpdatePartitionSpec::Make(transaction_result.value()); + auto update_result = UpdatePartitionSpec::Make(std::move(ctx_result.value())); if (!update_result.has_value()) { ADD_FAILURE() << "Failed to create UpdatePartitionSpec: " << update_result.error().message; diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 58b0daf9b..bf4ac426b 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -19,6 +19,7 @@ */ #include "iceberg/transaction.h" +#include #include #include @@ -52,50 +53,85 @@ namespace iceberg { -Transaction::Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, - std::unique_ptr metadata_builder) - : table_(std::move(table)), - kind_(kind), - auto_commit_(auto_commit), - metadata_builder_(std::move(metadata_builder)) {} +// --------------------------------------------------------------------------- +// TransactionContext +// --------------------------------------------------------------------------- + +TransactionContext::TransactionContext() = default; +TransactionContext::~TransactionContext() = default; + +Result> TransactionContext::Make( + std::shared_ptr
table, TransactionKind kind) { + ICEBERG_PRECHECK(table != nullptr, "Table cannot be null"); + auto ctx = std::make_shared(); + ctx->kind = kind; + ctx->table = std::move(table); + if (kind == TransactionKind::kCreate) { + ctx->metadata_builder = TableMetadataBuilder::BuildFromEmpty(); + std::ignore = ctx->metadata_builder->ApplyChangesForCreate(*ctx->table->metadata()); + } else { + ctx->metadata_builder = TableMetadataBuilder::BuildFrom(ctx->table->metadata().get()); + } + return ctx; +} + +const TableMetadata* TransactionContext::base() const { return metadata_builder->base(); } + +const TableMetadata& TransactionContext::current() const { + return metadata_builder->current(); +} + +std::string TransactionContext::MetadataFileLocation(std::string_view filename) const { + const auto metadata_location = + current().properties.Get(TableProperties::kWriteMetadataLocation); + if (metadata_location.empty()) { + return std::format("{}/metadata/{}", current().location, filename); + } + return std::format("{}/{}", LocationUtil::StripTrailingSlash(metadata_location), + filename); +} + +// --------------------------------------------------------------------------- +// Transaction +// --------------------------------------------------------------------------- + +Transaction::Transaction(std::shared_ptr ctx) + : ctx_(std::move(ctx)) {} Transaction::~Transaction() = default; Result> Transaction::Make(std::shared_ptr
table, - Kind kind, bool auto_commit) { + TransactionKind kind) { ICEBERG_PRECHECK(table && table->catalog(), "Table and catalog cannot be null"); + ICEBERG_ASSIGN_OR_RAISE(auto ctx, TransactionContext::Make(std::move(table), kind)); + auto txn = std::shared_ptr(new Transaction(ctx)); + ctx->transaction = std::weak_ptr(txn); + return txn; +} - std::unique_ptr metadata_builder; - if (kind == Kind::kCreate) { - metadata_builder = TableMetadataBuilder::BuildFromEmpty(); - std::ignore = metadata_builder->ApplyChangesForCreate(*table->metadata()); - } else { - metadata_builder = TableMetadataBuilder::BuildFrom(table->metadata().get()); - } - - return std::shared_ptr( - new Transaction(std::move(table), kind, auto_commit, std::move(metadata_builder))); +Result> Transaction::Make( + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "TransactionContext cannot be null"); + auto txn = std::shared_ptr(new Transaction(ctx)); + ctx->transaction = std::weak_ptr(txn); + return txn; } -const TableMetadata* Transaction::base() const { return metadata_builder_->base(); } +const std::shared_ptr
& Transaction::table() const { return ctx_->table; } + +const TableMetadata* Transaction::base() const { return ctx_->base(); } -const TableMetadata& Transaction::current() const { return metadata_builder_->current(); } +const TableMetadata& Transaction::current() const { return ctx_->current(); } std::string Transaction::MetadataFileLocation(std::string_view filename) const { - const auto metadata_location = - current().properties.Get(TableProperties::kWriteMetadataLocation); - if (metadata_location.empty()) { - return std::format("{}/{}", LocationUtil::StripTrailingSlash(metadata_location), - filename); - } - return std::format("{}/metadata/{}", current().location, filename); + return ctx_->MetadataFileLocation(filename); } Status Transaction::AddUpdate(const std::shared_ptr& update) { ICEBERG_CHECK(last_update_committed_, "Cannot add update when previous update is not committed"); - pending_updates_.emplace_back(std::weak_ptr(update)); + pending_updates_.push_back(update); last_update_committed_ = false; return {}; } @@ -153,52 +189,48 @@ Status Transaction::Apply(PendingUpdate& update) { last_update_committed_ = true; - if (auto_commit_) { - ICEBERG_RETURN_UNEXPECTED(Commit()); - } - return {}; } Status Transaction::ApplyExpireSnapshots(ExpireSnapshots& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); if (!result.snapshot_ids_to_remove.empty()) { - metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove)); + ctx_->metadata_builder->RemoveSnapshots(std::move(result.snapshot_ids_to_remove)); } if (!result.refs_to_remove.empty()) { for (const auto& ref_name : result.refs_to_remove) { - metadata_builder_->RemoveRef(ref_name); + ctx_->metadata_builder->RemoveRef(ref_name); } } if (!result.partition_spec_ids_to_remove.empty()) { - metadata_builder_->RemovePartitionSpecs( + ctx_->metadata_builder->RemovePartitionSpecs( std::move(result.partition_spec_ids_to_remove)); } if (!result.schema_ids_to_remove.empty()) { - 
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove)); + ctx_->metadata_builder->RemoveSchemas(std::move(result.schema_ids_to_remove)); } return {}; } Status Transaction::ApplySetSnapshot(SetSnapshot& update) { ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, update.Apply()); - metadata_builder_->SetBranchSnapshot(snapshot_id, - std::string(SnapshotRef::kMainBranch)); + ctx_->metadata_builder->SetBranchSnapshot(snapshot_id, + std::string(SnapshotRef::kMainBranch)); return {}; } Status Transaction::ApplyUpdateLocation(UpdateLocation& update) { ICEBERG_ASSIGN_OR_RAISE(auto location, update.Apply()); - metadata_builder_->SetLocation(location); + ctx_->metadata_builder->SetLocation(location); return {}; } Status Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); if (result.set_as_default) { - metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec)); + ctx_->metadata_builder->SetDefaultPartitionSpec(std::move(result.spec)); } else { - metadata_builder_->AddPartitionSpec(std::move(result.spec)); + ctx_->metadata_builder->AddPartitionSpec(std::move(result.spec)); } return {}; } @@ -206,30 +238,30 @@ Status Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) { Status Transaction::ApplyUpdateProperties(UpdateProperties& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); if (!result.updates.empty()) { - metadata_builder_->SetProperties(std::move(result.updates)); + ctx_->metadata_builder->SetProperties(std::move(result.updates)); } if (!result.removals.empty()) { - metadata_builder_->RemoveProperties(std::move(result.removals)); + ctx_->metadata_builder->RemoveProperties(std::move(result.removals)); } if (result.format_version.has_value()) { - metadata_builder_->UpgradeFormatVersion(result.format_version.value()); + ctx_->metadata_builder->UpgradeFormatVersion(result.format_version.value()); } return {}; } Status 
Transaction::ApplyUpdateSchema(UpdateSchema& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); - metadata_builder_->SetCurrentSchema(std::move(result.schema), - result.new_last_column_id); + ctx_->metadata_builder->SetCurrentSchema(std::move(result.schema), + result.new_last_column_id); if (!result.updated_props.empty()) { - metadata_builder_->SetProperties(result.updated_props); + ctx_->metadata_builder->SetProperties(result.updated_props); } return {}; } Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) { - const auto& base = metadata_builder_->current(); + const auto& base = ctx_->metadata_builder->current(); ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); @@ -252,14 +284,14 @@ Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) { } for (const auto& change : temp_update->changes()) { - change->ApplyTo(*metadata_builder_); + change->ApplyTo(*ctx_->metadata_builder); } // If the table UUID is missing, add it here. the UUID will be re-created each time // this operation retries to ensure that if a concurrent operation assigns the UUID, // this operation will not fail. 
if (base.table_uuid.empty()) { - metadata_builder_->AssignUUID(); + ctx_->metadata_builder->AssignUUID(); } return {}; } @@ -267,27 +299,27 @@ Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) { Status Transaction::ApplyUpdateSnapshotReference(UpdateSnapshotReference& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); for (const auto& name : result.to_remove) { - metadata_builder_->RemoveRef(name); + ctx_->metadata_builder->RemoveRef(name); } for (auto&& [name, ref] : result.to_set) { - metadata_builder_->SetRef(std::move(name), std::move(ref)); + ctx_->metadata_builder->SetRef(std::move(name), std::move(ref)); } return {}; } Status Transaction::ApplyUpdateSortOrder(UpdateSortOrder& update) { ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update.Apply()); - metadata_builder_->SetDefaultSortOrder(std::move(sort_order)); + ctx_->metadata_builder->SetDefaultSortOrder(std::move(sort_order)); return {}; } Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); for (auto&& [_, stat_file] : result.to_set) { - metadata_builder_->SetStatistics(std::move(stat_file)); + ctx_->metadata_builder->SetStatistics(std::move(stat_file)); } for (const auto& snapshot_id : result.to_remove) { - metadata_builder_->RemoveStatistics(snapshot_id); + ctx_->metadata_builder->RemoveStatistics(snapshot_id); } return {}; } @@ -295,10 +327,10 @@ Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) { Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); for (auto&& [_, partition_stat_file] : result.to_set) { - metadata_builder_->SetPartitionStatistics(std::move(partition_stat_file)); + ctx_->metadata_builder->SetPartitionStatistics(std::move(partition_stat_file)); } for (const auto& snapshot_id : result.to_remove) { - metadata_builder_->RemovePartitionStatistics(snapshot_id); + 
ctx_->metadata_builder->RemovePartitionStatistics(snapshot_id); } return {}; } @@ -308,104 +340,103 @@ Result> Transaction::Commit() { ICEBERG_CHECK(last_update_committed_, "Cannot commit transaction when previous update is not committed"); - const auto& updates = metadata_builder_->changes(); + const auto& updates = ctx_->metadata_builder->changes(); if (updates.empty()) { committed_ = true; - return table_; + return ctx_->table; } std::vector> requirements; - switch (kind_) { - case Kind::kCreate: { + switch (ctx_->kind) { + case TransactionKind::kCreate: { ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates)); } break; - case Kind::kUpdate: { - ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable( - *metadata_builder_->base(), updates)); + case TransactionKind::kUpdate: { + ICEBERG_ASSIGN_OR_RAISE( + requirements, + TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(), updates)); } break; } // XXX: we should handle commit failure and retry here. auto commit_result = - table_->catalog()->UpdateTable(table_->name(), requirements, updates); + ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, updates); for (const auto& update : pending_updates_) { - if (auto update_ptr = update.lock()) { - std::ignore = update_ptr->Finalize(commit_result.has_value() - ? std::nullopt - : std::make_optional(commit_result.error())); - } + std::ignore = update->Finalize(commit_result.has_value() + ? 
std::nullopt + : std::make_optional(commit_result.error())); } ICEBERG_RETURN_UNEXPECTED(commit_result); // Mark as committed and update table reference committed_ = true; - table_ = std::move(commit_result.value()); + ctx_->table = std::move(commit_result.value()); - return table_; + return ctx_->table; } Result> Transaction::NewUpdatePartitionSpec() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_spec, - UpdatePartitionSpec::Make(shared_from_this())); + UpdatePartitionSpec::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_spec)); return update_spec; } Result> Transaction::NewUpdateProperties() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_properties, - UpdateProperties::Make(shared_from_this())); + UpdateProperties::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_properties)); return update_properties; } Result> Transaction::NewUpdateSortOrder() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_sort_order, - UpdateSortOrder::Make(shared_from_this())); + UpdateSortOrder::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_sort_order)); return update_sort_order; } Result> Transaction::NewUpdateSchema() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_schema, - UpdateSchema::Make(shared_from_this())); + UpdateSchema::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_schema)); return update_schema; } Result> Transaction::NewExpireSnapshots() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr expire_snapshots, - ExpireSnapshots::Make(shared_from_this())); + ExpireSnapshots::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(expire_snapshots)); return expire_snapshots; } Result> Transaction::NewUpdateLocation() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_location, - UpdateLocation::Make(shared_from_this())); + UpdateLocation::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_location)); return update_location; } Result> Transaction::NewSetSnapshot() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr set_snapshot, - 
SetSnapshot::Make(shared_from_this())); + SetSnapshot::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot)); return set_snapshot; } Result> Transaction::NewFastAppend() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr fast_append, - FastAppend::Make(table_->name().name, shared_from_this())); + FastAppend::Make(ctx_->table->name().name, ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(fast_append)); return fast_append; } Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, - UpdateStatistics::Make(shared_from_this())); + UpdateStatistics::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_statistics)); return update_statistics; } @@ -414,7 +445,7 @@ Result> Transaction::NewUpdatePartitionStatistics() { ICEBERG_ASSIGN_OR_RAISE( std::shared_ptr update_partition_statistics, - UpdatePartitionStatistics::Make(shared_from_this())); + UpdatePartitionStatistics::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_partition_statistics)); return update_partition_statistics; } @@ -422,7 +453,7 @@ Transaction::NewUpdatePartitionStatistics() { Result> Transaction::NewUpdateSnapshotReference() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_ref, - UpdateSnapshotReference::Make(shared_from_this())); + UpdateSnapshotReference::Make(ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_ref)); return update_ref; } diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 438054b51..ec8c4db0a 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -20,7 +20,11 @@ #pragma once +#include #include +#include +#include +#include #include #include "iceberg/iceberg_export.h" @@ -29,19 +33,24 @@ namespace iceberg { +/// \brief Whether a transaction creates a new table or updates an existing one. 
+enum class TransactionKind : uint8_t { kCreate, kUpdate }; + /// \brief A transaction for performing multiple updates to a table class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this { public: - enum class Kind : uint8_t { kCreate, kUpdate }; - ~Transaction(); /// \brief Create a new transaction static Result> Make(std::shared_ptr
table, - Kind kind, bool auto_commit); + TransactionKind kind); + + /// \brief Create a transaction from an existing context (used by PendingUpdate::Commit) + static Result> Make( + std::shared_ptr ctx); /// \brief Return the Table that this transaction will update - const std::shared_ptr
& table() const { return table_; } + const std::shared_ptr
& table() const; /// \brief Returns the base metadata without any changes const TableMetadata* base() const; @@ -109,8 +118,7 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateSnapshotReference(); private: - Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, - std::unique_ptr metadata_builder); + explicit Transaction(std::shared_ptr ctx); Status AddUpdate(const std::shared_ptr& update); @@ -133,22 +141,35 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this table_; - // The kind of this transaction. - const Kind kind_; - // Whether to auto-commit the transaction when updates are applied. - // This is useful when a temporary transaction is created for a single operation. - bool auto_commit_; + // Shared context owning the table, metadata builder, and kind. + std::shared_ptr ctx_; + // Keep track of all created pending updates. + std::vector> pending_updates_; // To make the state simple, we require updates are added and committed in order. bool last_update_committed_ = true; // Tracks if transaction has been committed to prevent double-commit bool committed_ = false; - // Keep track of all created pending updates. Use weak_ptr to avoid circular references. - // This is useful to retry failed updates. - std::vector> pending_updates_; - // Accumulated updates from all pending updates. - std::unique_ptr metadata_builder_; +}; + +/// \brief Shared context between Transaction and PendingUpdate instances. +class ICEBERG_EXPORT TransactionContext { + public: + TransactionContext(); + ~TransactionContext(); + + static Result> Make(std::shared_ptr
table, + TransactionKind kind); + + const TableMetadata* base() const; + const TableMetadata& current() const; + std::string MetadataFileLocation(std::string_view filename) const; + + std::shared_ptr
table; + std::unique_ptr metadata_builder; + TransactionKind kind; + // If PendingUpdate is created directly from Table, this is nullopt; + // otherwise, it holds a weak pointer to the Transaction that created it. + std::optional> transaction; }; } // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 8d6824eed..b81e5d27e 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -201,6 +201,7 @@ class TableUpdate; class TableRequirement; class TableUpdateContext; class Transaction; +class TransactionContext; /// \brief Update family. class ExpireSnapshots; diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index 68cf08caf..722ae7a42 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -38,14 +38,13 @@ namespace iceberg { Result> ExpireSnapshots::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create ExpireSnapshots without a transaction"); - return std::shared_ptr(new ExpireSnapshots(std::move(transaction))); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create ExpireSnapshots without a context"); + return std::shared_ptr(new ExpireSnapshots(std::move(ctx))); } -ExpireSnapshots::ExpireSnapshots(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)), +ExpireSnapshots::ExpireSnapshots(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)), current_time_ms_(CurrentTimePointMs()), default_max_ref_age_ms_(base().properties.Get(TableProperties::kMaxRefAgeMs)), default_min_num_snapshots_( @@ -263,7 +262,7 @@ Result ExpireSnapshots::Apply() { ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id)); SnapshotCache snapshot_cache(snapshot.get()); ICEBERG_ASSIGN_OR_RAISE(auto manifests, - snapshot_cache.Manifests(transaction_->table()->io())); + snapshot_cache.Manifests(ctx_->table->io())); for (const auto& manifest : manifests) { 
reachable_specs.insert(manifest.partition_spec_id); } diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index 17a4d8b0f..bc05d810d 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -64,7 +64,7 @@ enum class CleanupLevel : uint8_t { class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~ExpireSnapshots() override; @@ -143,7 +143,7 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { Result Apply(); private: - explicit ExpireSnapshots(std::shared_ptr transaction); + explicit ExpireSnapshots(std::shared_ptr ctx); using SnapshotToRef = std::unordered_map>; diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc index 3c132a407..d08f497cf 100644 --- a/src/iceberg/update/fast_append.cc +++ b/src/iceberg/update/fast_append.cc @@ -36,16 +36,15 @@ namespace iceberg { Result> FastAppend::Make( - std::string table_name, std::shared_ptr transaction) { + std::string table_name, std::shared_ptr ctx) { ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create FastAppend without a transaction"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create FastAppend without a context"); return std::unique_ptr( - new FastAppend(std::move(table_name), std::move(transaction))); + new FastAppend(std::move(table_name), std::move(ctx))); } -FastAppend::FastAppend(std::string table_name, std::shared_ptr transaction) - : SnapshotUpdate(std::move(transaction)), table_name_(std::move(table_name)) {} +FastAppend::FastAppend(std::string table_name, std::shared_ptr ctx) + : SnapshotUpdate(std::move(ctx)), table_name_(std::move(table_name)) {} FastAppend& FastAppend::AppendFile(const std::shared_ptr& file) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); @@ -118,7 +117,7 @@ Result> 
FastAppend::Apply( if (snapshot != nullptr) { auto cached_snapshot = SnapshotCache(snapshot.get()); ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests, - cached_snapshot.Manifests(transaction_->table()->io())); + cached_snapshot.Manifests(ctx_->table->io())); manifests.insert(manifests.end(), snapshot_manifests.begin(), snapshot_manifests.end()); } @@ -179,9 +178,8 @@ Result FastAppend::CopyManifest(const ManifestFile& manifest) { int64_t snapshot_id = SnapshotId(); // Copy the manifest with the new snapshot ID. - return CopyAppendManifest(manifest, transaction_->table()->io(), schema, spec, - snapshot_id, new_manifest_path, current.format_version, - &summary_); + return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, snapshot_id, + new_manifest_path, current.format_version, &summary_); } Result> FastAppend::WriteNewManifests() { diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h index 7f5cbb097..580fa4722 100644 --- a/src/iceberg/update/fast_append.h +++ b/src/iceberg/update/fast_append.h @@ -47,7 +47,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { /// \param transaction The transaction to use for this update /// \return A Result containing the FastAppend instance or an error static Result> Make( - std::string table_name, std::shared_ptr transaction); + std::string table_name, std::shared_ptr ctx); /// \brief Append a data file to this update. /// @@ -76,7 +76,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { bool CleanupAfterCommit() const override; private: - explicit FastAppend(std::string table_name, std::shared_ptr transaction); + explicit FastAppend(std::string table_name, std::shared_ptr ctx); /// \brief Get the partition spec by spec ID. 
Result> Spec(int32_t spec_id); diff --git a/src/iceberg/update/pending_update.cc b/src/iceberg/update/pending_update.cc index e55a93dfd..2d5247d29 100644 --- a/src/iceberg/update/pending_update.cc +++ b/src/iceberg/update/pending_update.cc @@ -20,20 +20,41 @@ #include "iceberg/update/pending_update.h" #include "iceberg/transaction.h" +#include "iceberg/util/macros.h" namespace iceberg { -PendingUpdate::PendingUpdate(std::shared_ptr transaction) - : transaction_(std::move(transaction)) {} +PendingUpdate::PendingUpdate(std::shared_ptr ctx) + : ctx_(std::move(ctx)) {} PendingUpdate::~PendingUpdate() = default; -Status PendingUpdate::Commit() { return transaction_->Apply(*this); } +Status PendingUpdate::Commit() { + if (!ctx_->transaction) { + // Table-created path: no transaction exists yet, create a temporary one. + ICEBERG_ASSIGN_OR_RAISE(auto txn, Transaction::Make(ctx_)); + Status status = txn->Apply(*this); + if (status.has_value()) { + auto commit_result = txn->Commit(); + if (!commit_result.has_value()) { + status = std::unexpected(commit_result.error()); + } + } + std::ignore = + Finalize(status.has_value() ? 
std::nullopt : std::make_optional(status.error())); + return status; + } + auto txn = ctx_->transaction->lock(); + if (!txn) { + return CommitFailed("Transaction has been destroyed"); + } + return txn->Apply(*this); +} Status PendingUpdate::Finalize([[maybe_unused]] std::optional commit_error) { return {}; } -const TableMetadata& PendingUpdate::base() const { return transaction_->current(); } +const TableMetadata& PendingUpdate::base() const { return ctx_->current(); } } // namespace iceberg diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index f44812a85..f67be18cd 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -84,11 +84,11 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { ~PendingUpdate() override; protected: - explicit PendingUpdate(std::shared_ptr transaction); + explicit PendingUpdate(std::shared_ptr ctx); const TableMetadata& base() const; - std::shared_ptr transaction_; + std::shared_ptr ctx_; }; } // namespace iceberg diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc index 7258bc4d2..79662890b 100644 --- a/src/iceberg/update/set_snapshot.cc +++ b/src/iceberg/update/set_snapshot.cc @@ -34,14 +34,13 @@ namespace iceberg { Result> SetSnapshot::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create SetSnapshot without a transaction"); - return std::shared_ptr(new SetSnapshot(std::move(transaction))); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create SetSnapshot without a context"); + return std::shared_ptr(new SetSnapshot(std::move(ctx))); } -SetSnapshot::SetSnapshot(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) {} +SetSnapshot::SetSnapshot(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)) {} SetSnapshot::~SetSnapshot() = default; @@ -89,7 +88,7 @@ SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) { Result 
SetSnapshot::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - const TableMetadata& base_metadata = transaction_->current(); + const TableMetadata& base_metadata = ctx_->current(); // If no target snapshot was configured, return current state (NOOP) if (!target_snapshot_id_.has_value()) { diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h index 1ad399605..6aeb92652 100644 --- a/src/iceberg/update/set_snapshot.h +++ b/src/iceberg/update/set_snapshot.h @@ -37,7 +37,7 @@ namespace iceberg { class ICEBERG_EXPORT SetSnapshot : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~SetSnapshot() override; @@ -56,7 +56,7 @@ class ICEBERG_EXPORT SetSnapshot : public PendingUpdate { Result Apply(); private: - explicit SetSnapshot(std::shared_ptr transaction); + explicit SetSnapshot(std::shared_ptr ctx); /// \brief Find the latest snapshot whose timestamp is before the provided timestamp. /// diff --git a/src/iceberg/update/snapshot_manager.cc b/src/iceberg/update/snapshot_manager.cc index d882dd320..55f73072e 100644 --- a/src/iceberg/update/snapshot_manager.cc +++ b/src/iceberg/update/snapshot_manager.cc @@ -34,8 +34,7 @@ Result> SnapshotManager::Make( std::shared_ptr
table) { ICEBERG_PRECHECK(table != nullptr, "Invalid input table: null"); ICEBERG_ASSIGN_OR_RAISE(auto transaction, - Transaction::Make(std::move(table), Transaction::Kind::kUpdate, - /*auto_commit=*/false)); + Transaction::Make(std::move(table), TransactionKind::kUpdate)); return std::shared_ptr( new SnapshotManager(std::move(transaction), /*is_external_transaction=*/false)); } diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index b44682561..3e5792667 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -154,8 +154,8 @@ Result AddMetadata(const ManifestFile& manifest, std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)), +SnapshotUpdate::SnapshotUpdate(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)), can_inherit_snapshot_id_( base().format_version > 1 || base().properties.Get(TableProperties::kSnapshotIdInheritanceEnabled)), @@ -176,11 +176,10 @@ Result> SnapshotUpdate::WriteDataManifests( RollingManifestWriter rolling_writer( [this, spec, schema = std::move(current_schema), snapshot_id = SnapshotId()]() -> Result> { - return ManifestWriter::MakeWriter(base().format_version, snapshot_id, - ManifestPath(), transaction_->table()->io(), - std::move(spec), std::move(schema), - ManifestContent::kData, - /*first_row_id=*/base().next_row_id); + return ManifestWriter::MakeWriter( + base().format_version, snapshot_id, ManifestPath(), ctx_->table->io(), + std::move(spec), std::move(schema), ManifestContent::kData, + /*first_row_id=*/base().next_row_id); }, target_manifest_size_bytes_); @@ -203,10 +202,9 @@ Result> SnapshotUpdate::WriteDeleteManifests( RollingManifestWriter rolling_writer( [this, spec, schema = std::move(current_schema), snapshot_id = SnapshotId()]() -> Result> { - return ManifestWriter::MakeWriter(base().format_version, snapshot_id, - ManifestPath(), transaction_->table()->io(), - std::move(spec), std::move(schema), - ManifestContent::kDeletes); 
+ return ManifestWriter::MakeWriter( + base().format_version, snapshot_id, ManifestPath(), ctx_->table->io(), + std::move(spec), std::move(schema), ManifestContent::kDeletes); }, target_manifest_size_bytes_); @@ -245,8 +243,7 @@ Result SnapshotUpdate::Apply() { continue; } // TODO(xxx): read in parallel and cache enriched manifests for retries - ICEBERG_ASSIGN_OR_RAISE(manifest, - AddMetadata(manifest, transaction_->table()->io(), base())); + ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), base())); } std::string manifest_list_path = ManifestListPath(); @@ -254,8 +251,8 @@ Result SnapshotUpdate::Apply() { ICEBERG_ASSIGN_OR_RAISE( auto writer, ManifestListWriter::MakeWriter(base().format_version, SnapshotId(), parent_snapshot_id, manifest_list_path, - transaction_->table()->io(), - sequence_number, base().next_row_id)); + ctx_->table->io(), sequence_number, + base().next_row_id)); ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests)); ICEBERG_RETURN_UNEXPECTED(writer->Close()); @@ -313,8 +310,7 @@ Status SnapshotUpdate::Finalize(std::optional commit_error) { ICEBERG_CHECK(staged_snapshot_ != nullptr, "Staged snapshot is null during finalize after commit"); auto cached_snapshot = SnapshotCache(staged_snapshot_.get()); - ICEBERG_ASSIGN_OR_RAISE(auto manifests, - cached_snapshot.Manifests(transaction_->table()->io())); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, cached_snapshot.Manifests(ctx_->table->io())); CleanUncommitted(manifests | std::views::transform([](const auto& manifest) { return manifest.manifest_path; }) | @@ -391,7 +387,7 @@ void SnapshotUpdate::CleanAll() { Status SnapshotUpdate::DeleteFile(const std::string& path) { static const auto kDefaultDeleteFunc = [this](const std::string& path) { - return this->transaction_->table()->io()->DeleteFile(path); + return this->ctx_->table->io()->DeleteFile(path); }; if (delete_func_) { return delete_func_(path); @@ -406,14 +402,14 @@ std::string SnapshotUpdate::ManifestListPath() { int64_t 
snapshot_id = SnapshotId(); std::string filename = std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_); - return transaction_->MetadataFileLocation(filename); + return ctx_->MetadataFileLocation(filename); } std::string SnapshotUpdate::ManifestPath() { // Generate manifest path // Format: {metadata_location}/{uuid}-m{manifest_count}.avro std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count_++); - return transaction_->MetadataFileLocation(filename); + return ctx_->MetadataFileLocation(filename); } } // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index fdbb2660d..284d1d2db 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -121,7 +121,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { Status Finalize(std::optional commit_error) override; protected: - explicit SnapshotUpdate(std::shared_ptr transaction); + explicit SnapshotUpdate(std::shared_ptr ctx); /// \brief Write data manifests for the given data files /// diff --git a/src/iceberg/update/update_location.cc b/src/iceberg/update/update_location.cc index c82a138fc..064bebc8c 100644 --- a/src/iceberg/update/update_location.cc +++ b/src/iceberg/update/update_location.cc @@ -30,14 +30,13 @@ namespace iceberg { Result> UpdateLocation::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create UpdateLocation without a transaction"); - return std::shared_ptr(new UpdateLocation(std::move(transaction))); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateLocation without a context"); + return std::shared_ptr(new UpdateLocation(std::move(ctx))); } -UpdateLocation::UpdateLocation(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) {} +UpdateLocation::UpdateLocation(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)) {} UpdateLocation::~UpdateLocation() = default; diff 
--git a/src/iceberg/update/update_location.h b/src/iceberg/update/update_location.h index 891853e9e..33864380e 100644 --- a/src/iceberg/update/update_location.h +++ b/src/iceberg/update/update_location.h @@ -34,7 +34,7 @@ namespace iceberg { class ICEBERG_EXPORT UpdateLocation : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~UpdateLocation() override; @@ -50,7 +50,7 @@ class ICEBERG_EXPORT UpdateLocation : public PendingUpdate { Result Apply(); private: - explicit UpdateLocation(std::shared_ptr transaction); + explicit UpdateLocation(std::shared_ptr ctx); std::string location_; }; diff --git a/src/iceberg/update/update_partition_spec.cc b/src/iceberg/update/update_partition_spec.cc index 812540f2b..2be6a59a5 100644 --- a/src/iceberg/update/update_partition_spec.cc +++ b/src/iceberg/update/update_partition_spec.cc @@ -37,15 +37,13 @@ namespace iceberg { Result> UpdatePartitionSpec::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create UpdatePartitionSpec without transaction"); - return std::shared_ptr( - new UpdatePartitionSpec(std::move(transaction))); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdatePartitionSpec without context"); + return std::shared_ptr(new UpdatePartitionSpec(std::move(ctx))); } -UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) { +UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)) { format_version_ = base().format_version; // Get the current/default partition spec diff --git a/src/iceberg/update/update_partition_spec.h b/src/iceberg/update/update_partition_spec.h index 1eab425df..6b3dd40ee 100644 --- a/src/iceberg/update/update_partition_spec.h +++ b/src/iceberg/update/update_partition_spec.h @@ -44,7 +44,7 @@ namespace iceberg { class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate { 
public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~UpdatePartitionSpec() override; @@ -107,7 +107,7 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate { Result Apply(); private: - explicit UpdatePartitionSpec(std::shared_ptr transaction); + explicit UpdatePartitionSpec(std::shared_ptr ctx); /// \brief Pair of source ID and transform string for indexing. using TransformKey = std::pair; diff --git a/src/iceberg/update/update_partition_statistics.cc b/src/iceberg/update/update_partition_statistics.cc index 2c06c0ce1..3a5ab4f8a 100644 --- a/src/iceberg/update/update_partition_statistics.cc +++ b/src/iceberg/update/update_partition_statistics.cc @@ -32,16 +32,16 @@ namespace iceberg { Result> UpdatePartitionStatistics::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create UpdatePartitionStatistics without a transaction"); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, + "Cannot create UpdatePartitionStatistics without a context"); return std::shared_ptr( - new UpdatePartitionStatistics(std::move(transaction))); + new UpdatePartitionStatistics(std::move(ctx))); } UpdatePartitionStatistics::UpdatePartitionStatistics( - std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) {} + std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)) {} UpdatePartitionStatistics::~UpdatePartitionStatistics() = default; diff --git a/src/iceberg/update/update_partition_statistics.h b/src/iceberg/update/update_partition_statistics.h index 740fe214e..bdaf5a703 100644 --- a/src/iceberg/update/update_partition_statistics.h +++ b/src/iceberg/update/update_partition_statistics.h @@ -39,7 +39,7 @@ namespace iceberg { class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~UpdatePartitionStatistics() override; @@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdatePartitionStatistics : 
public PendingUpdate { Result Apply(); private: - explicit UpdatePartitionStatistics(std::shared_ptr transaction); + explicit UpdatePartitionStatistics(std::shared_ptr ctx); std::unordered_map> partition_statistics_to_set_; diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index e6fe7a603..1837ab009 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -34,14 +34,13 @@ namespace iceberg { Result> UpdateProperties::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create UpdateProperties without a transaction"); - return std::shared_ptr(new UpdateProperties(std::move(transaction))); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateProperties without a context"); + return std::shared_ptr(new UpdateProperties(std::move(ctx))); } -UpdateProperties::UpdateProperties(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) {} +UpdateProperties::UpdateProperties(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)) {} UpdateProperties::~UpdateProperties() = default; diff --git a/src/iceberg/update/update_properties.h b/src/iceberg/update/update_properties.h index ec9ab796e..491a55678 100644 --- a/src/iceberg/update/update_properties.h +++ b/src/iceberg/update/update_properties.h @@ -39,7 +39,7 @@ namespace iceberg { class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~UpdateProperties() override; @@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { Result Apply(); private: - explicit UpdateProperties(std::shared_ptr transaction); + explicit UpdateProperties(std::shared_ptr ctx); std::unordered_map updates_; std::unordered_set removals_; diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc index 3fdce409f..1f35781fa 100644 --- 
a/src/iceberg/update/update_schema.cc +++ b/src/iceberg/update/update_schema.cc @@ -279,15 +279,14 @@ std::vector ApplyChangesVisitor::MoveFields( } // namespace Result> UpdateSchema::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create UpdateSchema without transaction"); - return std::shared_ptr(new UpdateSchema(std::move(transaction))); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateSchema without context"); + return std::shared_ptr(new UpdateSchema(std::move(ctx))); } -UpdateSchema::UpdateSchema(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) { - const TableMetadata& base_metadata = transaction_->current(); +UpdateSchema::UpdateSchema(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)) { + const TableMetadata& base_metadata = ctx_->current(); auto schema_result = base_metadata.Schema(); if (!schema_result.has_value()) { diff --git a/src/iceberg/update/update_schema.h b/src/iceberg/update/update_schema.h index 2223c0b81..564a03df1 100644 --- a/src/iceberg/update/update_schema.h +++ b/src/iceberg/update/update_schema.h @@ -48,7 +48,7 @@ namespace iceberg { class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~UpdateSchema() override; @@ -348,7 +348,7 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { Result Apply(); private: - explicit UpdateSchema(std::shared_ptr transaction); + explicit UpdateSchema(std::shared_ptr ctx); /// \brief Internal implementation for adding a column with full control. 
/// diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc index 923f0c8df..908962ecd 100644 --- a/src/iceberg/update/update_snapshot_reference.cc +++ b/src/iceberg/update/update_snapshot_reference.cc @@ -33,15 +33,15 @@ namespace iceberg { Result> UpdateSnapshotReference::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create UpdateSnapshotReference without a transaction"); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, + "Cannot create UpdateSnapshotReference without a context"); return std::shared_ptr( - new UpdateSnapshotReference(std::move(transaction))); + new UpdateSnapshotReference(std::move(ctx))); } -UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)), updated_refs_(base().refs) {} +UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)), updated_refs_(base().refs) {} UpdateSnapshotReference::~UpdateSnapshotReference() = default; @@ -143,7 +143,7 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal( if (fast_forward) { // Fast-forward is valid only when the current branch (from) is an ancestor of the // target (to), i.e. we are moving forward in history. 
- const auto& base_metadata = transaction_->current(); + const auto& base_metadata = ctx_->current(); ICEBERG_BUILDER_ASSIGN_OR_RETURN( auto from_is_ancestor_of_to, SnapshotUtil::IsAncestorOf( diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h index e13f5bfa9..7d061ea3b 100644 --- a/src/iceberg/update/update_snapshot_reference.h +++ b/src/iceberg/update/update_snapshot_reference.h @@ -39,7 +39,7 @@ namespace iceberg { class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~UpdateSnapshotReference() override; @@ -145,7 +145,7 @@ class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { Result Apply(); private: - explicit UpdateSnapshotReference(std::shared_ptr transaction); + explicit UpdateSnapshotReference(std::shared_ptr ctx); UpdateSnapshotReference& ReplaceBranchInternal(const std::string& from, const std::string& to, diff --git a/src/iceberg/update/update_sort_order.cc b/src/iceberg/update/update_sort_order.cc index c5c7be322..8086b903f 100644 --- a/src/iceberg/update/update_sort_order.cc +++ b/src/iceberg/update/update_sort_order.cc @@ -34,14 +34,13 @@ namespace iceberg { Result> UpdateSortOrder::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create UpdateSortOrder without a transaction"); - return std::shared_ptr(new UpdateSortOrder(std::move(transaction))); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateSortOrder without a context"); + return std::shared_ptr(new UpdateSortOrder(std::move(ctx))); } -UpdateSortOrder::UpdateSortOrder(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) {} +UpdateSortOrder::UpdateSortOrder(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)) {} UpdateSortOrder::~UpdateSortOrder() = default; diff --git a/src/iceberg/update/update_sort_order.h 
b/src/iceberg/update/update_sort_order.h index 364a70f62..53fe927ef 100644 --- a/src/iceberg/update/update_sort_order.h +++ b/src/iceberg/update/update_sort_order.h @@ -37,7 +37,7 @@ namespace iceberg { class ICEBERG_EXPORT UpdateSortOrder : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); ~UpdateSortOrder() override; @@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdateSortOrder : public PendingUpdate { Result> Apply(); private: - explicit UpdateSortOrder(std::shared_ptr transaction); + explicit UpdateSortOrder(std::shared_ptr ctx); std::vector sort_fields_; bool case_sensitive_ = true; diff --git a/src/iceberg/update/update_statistics.cc b/src/iceberg/update/update_statistics.cc index 461453369..afa6ea0c6 100644 --- a/src/iceberg/update/update_statistics.cc +++ b/src/iceberg/update/update_statistics.cc @@ -32,14 +32,13 @@ namespace iceberg { Result> UpdateStatistics::Make( - std::shared_ptr transaction) { - ICEBERG_PRECHECK(transaction != nullptr, - "Cannot create UpdateStatistics without a transaction"); - return std::shared_ptr(new UpdateStatistics(std::move(transaction))); + std::shared_ptr ctx) { + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateStatistics without a context"); + return std::shared_ptr(new UpdateStatistics(std::move(ctx))); } -UpdateStatistics::UpdateStatistics(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) {} +UpdateStatistics::UpdateStatistics(std::shared_ptr ctx) + : PendingUpdate(std::move(ctx)) {} UpdateStatistics::~UpdateStatistics() = default; diff --git a/src/iceberg/update/update_statistics.h b/src/iceberg/update/update_statistics.h index 55e50fb1b..6441c02a3 100644 --- a/src/iceberg/update/update_statistics.h +++ b/src/iceberg/update/update_statistics.h @@ -39,7 +39,7 @@ namespace iceberg { class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate { public: static Result> Make( - std::shared_ptr transaction); + std::shared_ptr ctx); 
~UpdateStatistics() override; @@ -70,7 +70,7 @@ class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate { Result Apply(); private: - explicit UpdateStatistics(std::shared_ptr transaction); + explicit UpdateStatistics(std::shared_ptr ctx); std::unordered_map> statistics_to_set_; };