Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 173 additions & 13 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,89 @@ Status TableScanContext::Validate() const {
return {};
}

bool TableScanContext::IsScanCurrentLineage() const {
return !from_snapshot_id.has_value() && !to_snapshot_id.has_value();
}

Result<int64_t> TableScanContext::ToSnapshotIdInclusive(
const TableMetadata& metadata) const {
// Get the branch's current snapshot ID if branch is set
std::shared_ptr<Snapshot> branch_snapshot;
if (!branch.empty()) {
auto iter = metadata.refs.find(branch);
ICEBERG_CHECK(iter != metadata.refs.end() && iter->second != nullptr,
"Cannot find branch: {}", branch);
ICEBERG_ASSIGN_OR_RAISE(branch_snapshot,
metadata.SnapshotById(iter->second->snapshot_id));
}

if (to_snapshot_id.has_value()) {
int64_t to_snapshot_id_value = to_snapshot_id.value();

if (branch_snapshot != nullptr) {
// Validate `to_snapshot_id` is on the current branch
ICEBERG_ASSIGN_OR_RAISE(
bool is_ancestor,
SnapshotUtil::IsAncestorOf(metadata, branch_snapshot->snapshot_id,
to_snapshot_id_value));
ICEBERG_CHECK(is_ancestor,
"End snapshot is not a valid snapshot on the current branch: {}",
branch);
}

return to_snapshot_id_value;
}

// If to_snapshot_id is not set, use branch's current snapshot if branch is set
if (branch_snapshot != nullptr) {
return branch_snapshot->snapshot_id;
}

// Get current snapshot from table's current snapshot
std::shared_ptr<Snapshot> current_snapshot;
ICEBERG_ASSIGN_OR_RAISE(current_snapshot, metadata.Snapshot());
ICEBERG_CHECK(current_snapshot != nullptr,
"End snapshot is not set and table has no current snapshot");
return current_snapshot->snapshot_id;
}

Result<std::optional<int64_t>> TableScanContext::FromSnapshotIdExclusive(
const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const {
if (!from_snapshot_id.has_value()) {
return std::nullopt;
}

int64_t from_snapshot_id_value = from_snapshot_id.value();

// Validate `from_snapshot_id` is an ancestor of `to_snapshot_id_inclusive`
if (from_snapshot_id_inclusive) {
ICEBERG_ASSIGN_OR_RAISE(bool is_ancestor,
SnapshotUtil::IsAncestorOf(metadata, to_snapshot_id_inclusive,
from_snapshot_id_value));
ICEBERG_CHECK(
is_ancestor,
"Starting snapshot (inclusive) {} is not an ancestor of end snapshot {}",
from_snapshot_id_value, to_snapshot_id_inclusive);

// For inclusive behavior, return the parent snapshot ID (can be nullopt)
ICEBERG_ASSIGN_OR_RAISE(auto from_snapshot,
metadata.SnapshotById(from_snapshot_id_value));
return from_snapshot->parent_snapshot_id;
}

// Validate there is an ancestor of `to_snapshot_id_inclusive` where parent is
// `from_snapshot_id`
ICEBERG_ASSIGN_OR_RAISE(bool is_parent_ancestor, SnapshotUtil::IsParentAncestorOf(
metadata, to_snapshot_id_inclusive,
from_snapshot_id_value));
ICEBERG_CHECK(
is_parent_ancestor,
"Starting snapshot (exclusive) {} is not a parent ancestor of end snapshot {}",
from_snapshot_id_value, to_snapshot_id_inclusive);

return from_snapshot_id_value;
}

} // namespace internal

ScanTask::~ScanTask() = default;
Expand Down Expand Up @@ -340,10 +423,15 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
int64_t from_snapshot_id, bool inclusive)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
if (inclusive) {
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
metadata_->SnapshotById(from_snapshot_id));
}
this->context_.from_snapshot_id = from_snapshot_id;
this->context_.from_snapshot_id_inclusive = inclusive;
return *this;
}

Expand All @@ -352,31 +440,45 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
const std::string& ref, bool inclusive)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
return *this;
auto iter = metadata_->refs.find(ref);
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref);
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
"Ref {} is not a tag", ref);
return FromSnapshot(iter->second->snapshot_id, inclusive);
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t to_snapshot_id)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(to_snapshot_id));
context_.to_snapshot_id = to_snapshot_id;
return *this;
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const std::string& ref)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
return *this;
auto iter = metadata_->refs.find(ref);
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref);
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
"Ref {} is not a tag", ref);
return ToSnapshot(iter->second->snapshot_id);
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
const std::string& branch)
requires IsIncrementalScan<ScanType>
{
auto iter = metadata_->refs.find(branch);
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", branch);
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", branch);
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch,
"Ref {} is not a branch", branch);
context_.branch = branch;
return *this;
}
Expand Down Expand Up @@ -539,17 +641,75 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
// IncrementalAppendScan implementation

Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
[[maybe_unused]] std::shared_ptr<Schema> schema,
[[maybe_unused]] std::shared_ptr<FileIO> io,
[[maybe_unused]] internal::TableScanContext context) {
return NotImplemented("IncrementalAppendScan is not implemented");
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> io, internal::TableScanContext context) {
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
return std::unique_ptr<IncrementalAppendScan>(new IncrementalAppendScan(
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
}

Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
int64_t to_snapshot_id_inclusive) const {
return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
ICEBERG_ASSIGN_OR_RAISE(
auto ancestors_snapshots,
SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
from_snapshot_id_exclusive));

std::vector<std::shared_ptr<Snapshot>> append_snapshots;
std::ranges::copy_if(ancestors_snapshots, std::back_inserter(append_snapshots),
[](const auto& snapshot) {
return snapshot != nullptr &&
snapshot->Operation().has_value() &&
snapshot->Operation().value() == DataOperation::kAppend;
});
if (append_snapshots.empty()) {
return std::vector<std::shared_ptr<FileScanTask>>{};
}

std::unordered_set<int64_t> snapshot_ids;
std::ranges::transform(append_snapshots,
std::inserter(snapshot_ids, snapshot_ids.end()),
[](const auto& snapshot) { return snapshot->snapshot_id; });

std::vector<ManifestFile> data_manifests;
for (const auto& snapshot : append_snapshots) {
SnapshotCache snapshot_cache(snapshot.get());
ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_));
std::ranges::copy_if(manifests, std::back_inserter(data_manifests),
[&snapshot_ids](const ManifestFile& manifest) {
return snapshot_ids.contains(manifest.added_snapshot_id);
});
}
if (data_manifests.empty()) {
return std::vector<std::shared_ptr<FileScanTask>>{};
}

TableMetadataCache metadata_cache(metadata_.get());
ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());

ICEBERG_ASSIGN_OR_RAISE(
auto manifest_group,
ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests), {}));

manifest_group->CaseSensitive(context_.case_sensitive)
.Select(ScanColumns())
.FilterData(filter())
.FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
return entry.snapshot_id.has_value() &&
snapshot_ids.contains(entry.snapshot_id.value()) &&
entry.status == ManifestStatus::kAdded;
})
.IgnoreDeleted()
.ColumnsToKeepStats(context_.columns_to_keep_stats);

if (context_.ignore_residuals) {
manifest_group->IgnoreResiduals();
}

return manifest_group->PlanFiles();
}

// IncrementalChangelogScan implementation
Expand Down
42 changes: 39 additions & 3 deletions src/iceberg/table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "iceberg/arrow_c_data.h"
#include "iceberg/result.h"
#include "iceberg/table_metadata.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/error_collector.h"

Expand Down Expand Up @@ -132,6 +133,17 @@ struct TableScanContext {

// Validate the context parameters to see if they have conflicts.
[[nodiscard]] Status Validate() const;

/// \brief Returns true if this scan is a current lineage scan, which means it does not
/// specify from/to snapshot IDs.
bool IsScanCurrentLineage() const;

/// \brief Get the snapshot ID to scan up to (inclusive) based on the context.
Result<int64_t> ToSnapshotIdInclusive(const TableMetadata& metadata) const;

/// \brief Get the snapshot ID to scan from (exclusive) based on the context.
Result<std::optional<int64_t>> FromSnapshotIdExclusive(
const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const;
};

} // namespace internal
Expand Down Expand Up @@ -361,9 +373,7 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan {

/// \brief Plans the scan tasks by resolving manifests and data files.
/// \return A Result containing scan tasks or an error.
Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const {
return NotImplemented("IncrementalScan::PlanFiles is not implemented");
}
Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const;

protected:
virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles(
Expand All @@ -373,6 +383,26 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan {
using TableScan::TableScan;
};

// Template method implementation (must be in header for MSVC)
template <typename ScanTaskType>
Result<std::vector<std::shared_ptr<ScanTaskType>>>
IncrementalScan<ScanTaskType>::PlanFiles() const {
if (context_.IsScanCurrentLineage()) {
ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, metadata_->Snapshot());
if (current_snapshot == nullptr) {
return std::vector<std::shared_ptr<ScanTaskType>>{};
}
}

ICEBERG_ASSIGN_OR_RAISE(int64_t to_snapshot_id_inclusive,
context_.ToSnapshotIdInclusive(*metadata_));
ICEBERG_ASSIGN_OR_RAISE(
std::optional<int64_t> from_snapshot_id_exclusive,
context_.FromSnapshotIdExclusive(*metadata_, to_snapshot_id_inclusive));

return PlanFiles(from_snapshot_id_exclusive, to_snapshot_id_inclusive);
}

/// \brief A scan that reads data files added between snapshots (incremental appends).
class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask> {
public:
Expand All @@ -383,6 +413,9 @@ class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask

~IncrementalAppendScan() override = default;

// Bring the public PlanFiles() from base class into scope
using IncrementalScan<FileScanTask>::PlanFiles;

protected:
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
Expand All @@ -402,6 +435,9 @@ class ICEBERG_EXPORT IncrementalChangelogScan

~IncrementalChangelogScan() override = default;

// Bring the public PlanFiles() from base class into scope
using IncrementalScan<ChangelogScanTask>::PlanFiles;

protected:
Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
file_scan_task_test.cc
incremental_append_scan_test.cc
table_scan_test.cc)

add_iceberg_test(table_update_test
Expand Down
Loading
Loading