diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py index bc14fd86e4b0..3790dfce04e8 100644 --- a/paimon-python/pypaimon/manifest/manifest_list_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py @@ -40,6 +40,7 @@ def __init__(self, table): self.file_io = self.table.file_io def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]: + """Read base + delta manifest lists for full file state.""" if snapshot is None: return [] manifest_files = [] diff --git a/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py b/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py new file mode 100644 index 000000000000..6ae3b4068a98 --- /dev/null +++ b/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""ChangelogFollowUpScanner for tables with changelog-producer settings.""" + +from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner +from pypaimon.snapshot.snapshot import Snapshot + + +class ChangelogFollowUpScanner(FollowUpScanner): + """Scans any commit that has a changelog_manifest_list.""" + + def should_scan(self, snapshot: Snapshot) -> bool: + changelog_list = snapshot.changelog_manifest_list + return changelog_list is not None and changelog_list != "" diff --git a/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py b/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py new file mode 100644 index 000000000000..bb9b6a92bccd --- /dev/null +++ b/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""DeltaFollowUpScanner for append-only streaming reads.""" + +from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner +from pypaimon.snapshot.snapshot import Snapshot + + +class DeltaFollowUpScanner(FollowUpScanner): + """Scans only APPEND commits; skips compaction and maintenance.""" + + def should_scan(self, snapshot: Snapshot) -> bool: + return snapshot.commit_kind == "APPEND" diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index e56c568b95a3..d5e543b2fc8d 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -15,38 +15,39 @@ See the License for the specific language governing permissions and limitations under the License. """ +import logging import os import time -import logging -from typing import List, Optional, Dict, Set, Callable +from typing import Callable, Dict, List, Optional, Set logger = logging.getLogger(__name__) from pypaimon.common.predicate import Predicate from pypaimon.globalindex import ScoredGlobalIndexResult -from pypaimon.table.source.deletion_file import DeletionFile from pypaimon.manifest.index_manifest_file import IndexManifestFile from pypaimon.manifest.manifest_file_manager import ManifestFileManager from pypaimon.manifest.manifest_list_manager import ManifestListManager from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta +from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions from pypaimon.read.plan import Plan -from pypaimon.read.push_down_utils import ( - remove_row_id_filter, - trim_and_transform_predicate, -) -from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator -from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator -from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator +from pypaimon.read.push_down_utils import (remove_row_id_filter, + trim_and_transform_predicate) +from pypaimon.read.scanner.append_table_split_generator import \ + AppendTableSplitGenerator +from pypaimon.read.scanner.data_evolution_split_generator import \ + DataEvolutionSplitGenerator +from pypaimon.read.scanner.primary_key_table_split_generator import \ + PrimaryKeyTableSplitGenerator from pypaimon.read.split import DataSplit from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.bucket_mode import BucketMode -from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions +from pypaimon.table.source.deletion_file import DeletionFile def _row_ranges_from_predicate(predicate: Optional[Predicate]) -> Optional[List]: - from pypaimon.utils.range import Range from pypaimon.table.special_fields import SpecialFields + from pypaimon.utils.range import Range if predicate is None: return None @@ -167,7 +168,10 @@ def __init__( manifest_scanner: Callable[[], List[ManifestFileMeta]], predicate: Optional[Predicate] = None, limit: Optional[int] = None, - vector_search: Optional['VectorSearch'] = None + vector_search: Optional['VectorSearch'] = None, + shard_index: Optional[int] = None, + shard_count: Optional[int] = None, + bucket_filter: Optional[Callable[[int], bool]] = None ): from pypaimon.table.file_store_table import FileStoreTable @@ -178,6 +182,11 @@ def __init__( self.limit = limit self.vector_search = vector_search + # Bucket-level sharding for parallel consumption + self._shard_index = shard_index + self._shard_count = shard_count + self._bucket_filter = bucket_filter + self.snapshot_manager = SnapshotManager(table) self.manifest_list_manager = ManifestListManager(table) self.manifest_file_manager = ManifestFileManager(table) @@ -299,9 +308,8 @@ def plan_files(self) -> List[ManifestEntry]: def _eval_global_index(self): from pypaimon.globalindex.global_index_result import GlobalIndexResult - from pypaimon.globalindex.global_index_scan_builder import ( + from pypaimon.globalindex.global_index_scan_builder import \ GlobalIndexScanBuilder - ) from pypaimon.utils.range import Range # No filter and no vector search - nothing to evaluate @@ -361,7 +369,7 @@ def _eval_global_index(self): return result def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]: - max_workers = max(8, self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)) + max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8) manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)] return self.manifest_file_manager.read_entries_parallel( manifest_files, @@ -414,6 +422,13 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: return False if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition): return False + # Apply bucket-level sharding for parallel consumption + if self._shard_index is not None and self._shard_count is not None: + if entry.bucket % self._shard_count != self._shard_index: + return False + elif self._bucket_filter is not None: + if not self._bucket_filter(entry.bucket): + return False # Get SimpleStatsEvolution for this schema evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id) diff --git a/paimon-python/pypaimon/read/scanner/follow_up_scanner.py b/paimon-python/pypaimon/read/scanner/follow_up_scanner.py new file mode 100644 index 000000000000..cace582298ca --- /dev/null +++ b/paimon-python/pypaimon/read/scanner/follow_up_scanner.py @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""FollowUpScanner interface for streaming table scans.""" + +from abc import ABC, abstractmethod + +from pypaimon.snapshot.snapshot import Snapshot + + +class FollowUpScanner(ABC): + """Determines which snapshots to scan after the initial streaming scan.""" + + @abstractmethod + def should_scan(self, snapshot: Snapshot) -> bool: + ... diff --git a/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py new file mode 100644 index 000000000000..61ec66e03545 --- /dev/null +++ b/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py @@ -0,0 +1,100 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os +from typing import List, Set, Tuple + +from pypaimon.manifest.manifest_file_manager import ManifestFileManager +from pypaimon.manifest.manifest_list_manager import ManifestListManager +from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.read.plan import Plan +from pypaimon.read.scanner.append_table_split_generator import \ + AppendTableSplitGenerator +from pypaimon.snapshot.snapshot import Snapshot + + +class IncrementalDiffScanner: + """ + Scan files added between two snapshots via set-diff. + + More efficient than reading N delta_manifest_lists when many intermediate + snapshots are compaction-only. + """ + + def __init__(self, table): + from pypaimon.table.file_store_table import FileStoreTable + + self.table: FileStoreTable = table + self.manifest_list_manager = ManifestListManager(table) + self.manifest_file_manager = ManifestFileManager(table) + + options = self.table.options + self.target_split_size = options.source_split_target_size() + self.open_file_cost = options.source_split_open_file_cost() + + def scan(self, start_snapshot: Snapshot, end_snapshot: Snapshot) -> Plan: + """Scan files added between start (exclusive) and end (inclusive) snapshots.""" + added_entries = self.compute_diff(start_snapshot, end_snapshot) + + if not added_entries: + return Plan([]) + + split_generator = AppendTableSplitGenerator( + self.table, + self.target_split_size, + self.open_file_cost, + {} # No deletion files for incremental diff + ) + + splits = split_generator.create_splits(added_entries) + return Plan(splits) + + def compute_diff( + self, + start_snapshot: Snapshot, + end_snapshot: Snapshot + ) -> List[ManifestEntry]: + """Return files present in end_snapshot but absent from start_snapshot.""" + start_manifest_files = self.manifest_list_manager.read_all(start_snapshot) + end_manifest_files = self.manifest_list_manager.read_all(end_snapshot) + + max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8) + + start_entries = self.manifest_file_manager.read_entries_parallel( + start_manifest_files, + max_workers=max_workers + ) + end_entries = self.manifest_file_manager.read_entries_parallel( + end_manifest_files, + max_workers=max_workers + ) + + start_keys: Set[Tuple] = {self._entry_key(e) for e in start_entries} + + added_entries = [ + entry for entry in end_entries + if self._entry_key(entry) not in start_keys + ] + + return added_entries + + def _entry_key(self, entry: ManifestEntry) -> Tuple: + return ( + tuple(entry.partition.values), + entry.bucket, + entry.file.file_name + ) diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 5206147f80a8..28e5a9f5baee 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -198,7 +198,7 @@ def to_ray( You needn't manually set this in most cases. **read_args: Additional kwargs passed to the datasource. For example, ``per_task_row_limit`` (Ray 2.52.0+). - + See `Ray Data API `_ for details. """ diff --git a/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py b/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py new file mode 100644 index 000000000000..2c959276c31f --- /dev/null +++ b/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py @@ -0,0 +1,51 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""Tests for ChangelogFollowUpScanner.""" + +import unittest +from unittest.mock import Mock + +from pypaimon.read.scanner.changelog_follow_up_scanner import \ + ChangelogFollowUpScanner + + +class ChangelogFollowUpScannerTest(unittest.TestCase): + """Tests for ChangelogFollowUpScanner.""" + + def test_should_scan_any_commit_with_changelog(self): + """Scanner scans based on changelog_manifest_list, regardless of commit kind.""" + scanner = ChangelogFollowUpScanner() + for kind in ("APPEND", "COMPACT"): + snapshot = Mock(changelog_manifest_list=f"changelog-manifest-{kind}") + self.assertTrue(scanner.should_scan(snapshot), kind) + + def test_should_skip_when_no_changelog(self): + """Scanner should skip when changelog_manifest_list is None.""" + scanner = ChangelogFollowUpScanner() + snapshot = Mock(changelog_manifest_list=None) + self.assertFalse(scanner.should_scan(snapshot)) + + def test_should_skip_for_empty_string(self): + """Empty string should be treated as no changelog.""" + scanner = ChangelogFollowUpScanner() + snapshot = Mock(changelog_manifest_list="") + self.assertFalse(scanner.should_scan(snapshot)) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/follow_up_scanner_test.py b/paimon-python/pypaimon/tests/follow_up_scanner_test.py new file mode 100644 index 000000000000..e7b6bcd4dd6a --- /dev/null +++ b/paimon-python/pypaimon/tests/follow_up_scanner_test.py @@ -0,0 +1,59 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""Tests for FollowUpScanner implementations.""" + +import unittest +from unittest.mock import Mock + +from pypaimon.read.scanner.delta_follow_up_scanner import DeltaFollowUpScanner +from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner + + +class FollowUpScannerInterfaceTest(unittest.TestCase): + """Test that FollowUpScanner interface is properly defined.""" + + def test_follow_up_scanner_is_abstract(self): + """FollowUpScanner should be an abstract base class.""" + with self.assertRaises(TypeError): + FollowUpScanner() + + +class DeltaFollowUpScannerTest(unittest.TestCase): + """Tests for DeltaFollowUpScanner which handles APPEND commits only.""" + + def setUp(self): + self.scanner = DeltaFollowUpScanner() + + def test_should_scan_returns_true_for_append_commit(self): + """DeltaFollowUpScanner should scan APPEND commits.""" + snapshot = Mock() + snapshot.commit_kind = "APPEND" + + result = self.scanner.should_scan(snapshot) + + self.assertTrue(result) + + def test_should_scan_returns_false_for_non_append_commits(self): + """DeltaFollowUpScanner should skip non-APPEND commits.""" + for kind in ("COMPACT", "OVERWRITE", "EXPIRE", "ANALYZE"): + snapshot = Mock(commit_kind=kind) + self.assertFalse(self.scanner.should_scan(snapshot), kind) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py b/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py new file mode 100644 index 000000000000..d5feba2b7de6 --- /dev/null +++ b/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py @@ -0,0 +1,390 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""Tests for IncrementalDiffScanner.""" + +import unittest +from unittest.mock import Mock, patch + + +def _create_mock_table(): + table = Mock() + table.is_primary_key_table = False + table.options = Mock() + table.options.source_split_target_size.return_value = 128 * 1024 * 1024 + table.options.source_split_open_file_cost.return_value = 4 * 1024 * 1024 + table.options.scan_manifest_parallelism.return_value = 8 + table.partition_keys = [] + return table + + +def _create_mock_entry(partition_values, bucket, filename): + entry = Mock() + entry.partition = Mock() + entry.partition.values = partition_values + entry.bucket = bucket + entry.total_buckets = 1 + entry.file = Mock() + entry.file.file_name = filename + entry.file.file_size = 1024 + entry.file.row_count = 100 + entry.kind = 0 # ADD + return entry + + +def _create_mock_snapshot(snapshot_id, commit_kind="APPEND", + base_manifest="base", delta_manifest="delta"): + snapshot = Mock() + snapshot.id = snapshot_id + snapshot.commit_kind = commit_kind + snapshot.base_manifest_list = f"{base_manifest}-{snapshot_id}" + snapshot.delta_manifest_list = f"{delta_manifest}-{snapshot_id}" + return snapshot + + +class IncrementalDiffScannerTest(unittest.TestCase): + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_diff_returns_only_added_files(self, MockManifestFileManager, MockManifestListManager): + """Files in end but not in start should be returned.""" + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + start_entries = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + ] + end_entries = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + _create_mock_entry([], 0, "file3.parquet"), + _create_mock_entry([], 0, "file4.parquet"), + ] + + mock_mlm = MockManifestListManager.return_value + mock_mfm = MockManifestFileManager.return_value + + mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")] + mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries] + + scanner = IncrementalDiffScanner(table) + start_snapshot = _create_mock_snapshot(1) + end_snapshot = _create_mock_snapshot(5) + + added_entries = scanner.compute_diff(start_snapshot, end_snapshot) + + added_filenames = {e.file.file_name for e in added_entries} + self.assertEqual(added_filenames, {"file3.parquet", "file4.parquet"}) + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_diff_handles_empty_start(self, MockManifestFileManager, MockManifestListManager): + """When start is empty, all end files should be returned.""" + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + start_entries = [] + end_entries = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + ] + + mock_mlm = MockManifestListManager.return_value + mock_mfm = MockManifestFileManager.return_value + mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")] + mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries] + + scanner = IncrementalDiffScanner(table) + start_snapshot = _create_mock_snapshot(0) + end_snapshot = _create_mock_snapshot(5) + + added_entries = scanner.compute_diff(start_snapshot, end_snapshot) + + added_filenames = {e.file.file_name for e in added_entries} + self.assertEqual(added_filenames, {"file1.parquet", "file2.parquet"}) + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_diff_handles_same_snapshots(self, MockManifestFileManager, MockManifestListManager): + """When start == end (same files), diff should return empty.""" + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + entries = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + ] + + mock_mlm = MockManifestListManager.return_value + mock_mfm = MockManifestFileManager.return_value + mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")] + mock_mfm.read_entries_parallel.side_effect = [entries, entries] + + scanner = IncrementalDiffScanner(table) + snapshot = _create_mock_snapshot(5) + + added_entries = scanner.compute_diff(snapshot, snapshot) + + self.assertEqual(len(added_entries), 0) + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_diff_groups_by_partition_bucket(self, MockManifestFileManager, MockManifestListManager): + """Diff should be computed per (partition, bucket).""" + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + start_entries = [ + _create_mock_entry([1], 0, "file1.parquet"), # p1, bucket 0 + _create_mock_entry([1], 1, "file3.parquet"), # p1, bucket 1 + ] + end_entries = [ + _create_mock_entry([1], 0, "file1.parquet"), # p1, bucket 0 + _create_mock_entry([1], 0, "file2.parquet"), # p1, bucket 0 - NEW + _create_mock_entry([1], 1, "file3.parquet"), # p1, bucket 1 + ] + + mock_mlm = MockManifestListManager.return_value + mock_mfm = MockManifestFileManager.return_value + mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")] + mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries] + + scanner = IncrementalDiffScanner(table) + start_snapshot = _create_mock_snapshot(1) + end_snapshot = _create_mock_snapshot(5) + + added_entries = scanner.compute_diff(start_snapshot, end_snapshot) + + self.assertEqual(len(added_entries), 1) + self.assertEqual(added_entries[0].file.file_name, "file2.parquet") + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_diff_handles_file_deletions(self, MockManifestFileManager, MockManifestListManager): + """Files in start but not in end (deleted) should NOT be returned.""" + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + start_entries = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + _create_mock_entry([], 0, "file3.parquet"), + ] + end_entries = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + ] + + mock_mlm = MockManifestListManager.return_value + mock_mfm = MockManifestFileManager.return_value + mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")] + mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries] + + scanner = IncrementalDiffScanner(table) + start_snapshot = _create_mock_snapshot(1) + end_snapshot = _create_mock_snapshot(5) + + added_entries = scanner.compute_diff(start_snapshot, end_snapshot) + + self.assertEqual(len(added_entries), 0) + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_diff_creates_correct_splits(self, MockManifestFileManager, MockManifestListManager): + """scan() should return a Plan with correct DataSplits.""" + from pypaimon.read.plan import Plan + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + start_entries = [] + end_entries = [ + _create_mock_entry([], 0, "file1.parquet"), + ] + + mock_mlm = MockManifestListManager.return_value + mock_mfm = MockManifestFileManager.return_value + mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")] + mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries] + + scanner = IncrementalDiffScanner(table) + start_snapshot = _create_mock_snapshot(0) + end_snapshot = _create_mock_snapshot(5) + + plan = scanner.scan(start_snapshot, end_snapshot) + + self.assertIsInstance(plan, Plan) + splits = plan.splits() + self.assertGreater(len(splits), 0) + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_entry_key_uniqueness(self, MockManifestFileManager, MockManifestListManager): + """Entry key should uniquely identify a file by (partition, bucket, filename).""" + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + scanner = IncrementalDiffScanner(table) + + # Same file in different partitions should have different keys + entry1 = _create_mock_entry([1], 0, "file.parquet") + entry2 = _create_mock_entry([2], 0, "file.parquet") + self.assertNotEqual(scanner._entry_key(entry1), scanner._entry_key(entry2)) + + # Same file in different buckets should have different keys + entry3 = _create_mock_entry([1], 0, "file.parquet") + entry4 = _create_mock_entry([1], 1, "file.parquet") + self.assertNotEqual(scanner._entry_key(entry3), scanner._entry_key(entry4)) + + # Same partition/bucket/filename should have same key + entry5 = _create_mock_entry([1], 0, "file.parquet") + entry6 = _create_mock_entry([1], 0, "file.parquet") + self.assertEqual(scanner._entry_key(entry5), scanner._entry_key(entry6)) + + +class IncrementalDiffIntegrationTest(unittest.TestCase): + """Integration tests comparing diff vs delta approaches.""" + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_diff_equals_delta_results(self, MockManifestFileManager, MockManifestListManager): + """Diff and delta approaches should return the same added files.""" + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + base_entries_1 = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + ] + + base_entries_5 = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + _create_mock_entry([], 0, "file3.parquet"), + _create_mock_entry([], 0, "file4.parquet"), + _create_mock_entry([], 0, "file5.parquet"), + ] + + # Delta manifests for each snapshot + delta_2 = [_create_mock_entry([], 0, "file3.parquet")] + delta_3 = [] # COMPACT - no new files + delta_4 = [_create_mock_entry([], 0, "file4.parquet")] + delta_5 = [_create_mock_entry([], 0, "file5.parquet")] + + mock_mlm = MockManifestListManager.return_value + mock_mfm = MockManifestFileManager.return_value + + def mock_read_all(snapshot): + if snapshot.id == 1: + return [Mock(file_name="all-manifest-1")] + elif snapshot.id == 5: + return [Mock(file_name="all-manifest-5")] + return [] + + mock_mlm.read_all.side_effect = mock_read_all + + def mock_read_entries(manifests, *args, **kwargs): + if manifests and manifests[0].file_name == "all-manifest-1": + return base_entries_1 + elif manifests and manifests[0].file_name == "all-manifest-5": + return base_entries_5 + return [] + + mock_mfm.read_entries_parallel.side_effect = mock_read_entries + + scanner = IncrementalDiffScanner(table) + start_snapshot = _create_mock_snapshot(1) + end_snapshot = _create_mock_snapshot(5) + + diff_entries = scanner.compute_diff(start_snapshot, end_snapshot) + diff_filenames = {e.file.file_name for e in diff_entries} + + delta_filenames = { + e.file.file_name for entries in [delta_2, delta_3, delta_4, delta_5] + for e in entries + } + + self.assertEqual(diff_filenames, delta_filenames) + self.assertEqual(diff_filenames, {"file3.parquet", "file4.parquet", "file5.parquet"}) + + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager') + @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager') + def test_diff_handles_compaction_correctly(self, MockManifestFileManager, MockManifestListManager): + """Compaction merging file1+file2 into file3 should return only file3.""" + from pypaimon.read.scanner.incremental_diff_scanner import \ + IncrementalDiffScanner + + table = _create_mock_table() + + base_entries_1 = [ + _create_mock_entry([], 0, "file1.parquet"), + _create_mock_entry([], 0, "file2.parquet"), + ] + + base_entries_5 = [ + _create_mock_entry([], 0, "file3.parquet"), # Compacted result + ] + + mock_mlm = MockManifestListManager.return_value + mock_mfm = MockManifestFileManager.return_value + + def mock_read_all(snapshot): + if snapshot.id == 1: + return [Mock(file_name="all-manifest-1")] + elif snapshot.id == 5: + return [Mock(file_name="all-manifest-5")] + return [] + + mock_mlm.read_all.side_effect = mock_read_all + + def mock_read_entries(manifests, *args, **kwargs): + if manifests and manifests[0].file_name == "all-manifest-1": + return base_entries_1 + elif manifests and manifests[0].file_name == "all-manifest-5": + return base_entries_5 + return [] + + mock_mfm.read_entries_parallel.side_effect = mock_read_entries + + scanner = IncrementalDiffScanner(table) + start_snapshot = _create_mock_snapshot(1) + end_snapshot = _create_mock_snapshot(5) + + diff_entries = scanner.compute_diff(start_snapshot, end_snapshot) + diff_filenames = {e.file.file_name for e in diff_entries} + + self.assertEqual(diff_filenames, {"file3.parquet"}) + + +if __name__ == '__main__': + unittest.main()