Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions paimon-python/pypaimon/manifest/manifest_list_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(self, table):
self.file_io = self.table.file_io

def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]:
"""Read base + delta manifest lists for full file state."""
if snapshot is None:
return []
manifest_files = []
Expand Down
29 changes: 29 additions & 0 deletions paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""ChangelogFollowUpScanner for tables with changelog-producer settings."""

from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
from pypaimon.snapshot.snapshot import Snapshot


class ChangelogFollowUpScanner(FollowUpScanner):
"""Scans any commit that has a changelog_manifest_list."""

def should_scan(self, snapshot: Snapshot) -> bool:
changelog_list = snapshot.changelog_manifest_list
return changelog_list is not None and changelog_list != ""
28 changes: 28 additions & 0 deletions paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""DeltaFollowUpScanner for append-only streaming reads."""

from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
from pypaimon.snapshot.snapshot import Snapshot


class DeltaFollowUpScanner(FollowUpScanner):
"""Scans only APPEND commits; skips compaction and maintenance."""

def should_scan(self, snapshot: Snapshot) -> bool:
return snapshot.commit_kind == "APPEND"
47 changes: 31 additions & 16 deletions paimon-python/pypaimon/read/scanner/file_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,39 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
import os
import time
import logging
from typing import List, Optional, Dict, Set, Callable
from typing import Callable, Dict, List, Optional, Set

logger = logging.getLogger(__name__)

from pypaimon.common.predicate import Predicate
from pypaimon.globalindex import ScoredGlobalIndexResult
from pypaimon.table.source.deletion_file import DeletionFile
from pypaimon.manifest.index_manifest_file import IndexManifestFile
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
from pypaimon.read.plan import Plan
from pypaimon.read.push_down_utils import (
remove_row_id_filter,
trim_and_transform_predicate,
)
from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator
from pypaimon.read.push_down_utils import (remove_row_id_filter,
trim_and_transform_predicate)
from pypaimon.read.scanner.append_table_split_generator import \
AppendTableSplitGenerator
from pypaimon.read.scanner.data_evolution_split_generator import \
DataEvolutionSplitGenerator
from pypaimon.read.scanner.primary_key_table_split_generator import \
PrimaryKeyTableSplitGenerator
from pypaimon.read.split import DataSplit
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
from pypaimon.table.source.deletion_file import DeletionFile


def _row_ranges_from_predicate(predicate: Optional[Predicate]) -> Optional[List]:
from pypaimon.utils.range import Range
from pypaimon.table.special_fields import SpecialFields
from pypaimon.utils.range import Range

if predicate is None:
return None
Expand Down Expand Up @@ -167,7 +168,10 @@ def __init__(
manifest_scanner: Callable[[], List[ManifestFileMeta]],
predicate: Optional[Predicate] = None,
limit: Optional[int] = None,
vector_search: Optional['VectorSearch'] = None
vector_search: Optional['VectorSearch'] = None,
shard_index: Optional[int] = None,
shard_count: Optional[int] = None,
bucket_filter: Optional[Callable[[int], bool]] = None
):
from pypaimon.table.file_store_table import FileStoreTable

Expand All @@ -178,6 +182,11 @@ def __init__(
self.limit = limit
self.vector_search = vector_search

# Bucket-level sharding for parallel consumption
self._shard_index = shard_index
self._shard_count = shard_count
self._bucket_filter = bucket_filter

self.snapshot_manager = SnapshotManager(table)
self.manifest_list_manager = ManifestListManager(table)
self.manifest_file_manager = ManifestFileManager(table)
Expand Down Expand Up @@ -299,9 +308,8 @@ def plan_files(self) -> List[ManifestEntry]:

def _eval_global_index(self):
from pypaimon.globalindex.global_index_result import GlobalIndexResult
from pypaimon.globalindex.global_index_scan_builder import (
from pypaimon.globalindex.global_index_scan_builder import \
GlobalIndexScanBuilder
)
from pypaimon.utils.range import Range

# No filter and no vector search - nothing to evaluate
Expand Down Expand Up @@ -361,7 +369,7 @@ def _eval_global_index(self):
return result

def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
max_workers = max(8, self.table.options.scan_manifest_parallelism(os.cpu_count() or 8))
max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)]
return self.manifest_file_manager.read_entries_parallel(
manifest_files,
Expand Down Expand Up @@ -414,6 +422,13 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
return False
if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
return False
# Apply bucket-level sharding for parallel consumption
if self._shard_index is not None and self._shard_count is not None:
if entry.bucket % self._shard_count != self._shard_index:
return False
elif self._bucket_filter is not None:
if not self._bucket_filter(entry.bucket):
return False
# Get SimpleStatsEvolution for this schema
evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id)

Expand Down
30 changes: 30 additions & 0 deletions paimon-python/pypaimon/read/scanner/follow_up_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""FollowUpScanner interface for streaming table scans."""

from abc import ABC, abstractmethod

from pypaimon.snapshot.snapshot import Snapshot


class FollowUpScanner(ABC):
"""Determines which snapshots to scan after the initial streaming scan."""

@abstractmethod
def should_scan(self, snapshot: Snapshot) -> bool:
...
100 changes: 100 additions & 0 deletions paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
from typing import List, Set, Tuple

from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.plan import Plan
from pypaimon.read.scanner.append_table_split_generator import \
AppendTableSplitGenerator
from pypaimon.snapshot.snapshot import Snapshot


class IncrementalDiffScanner:
"""
Scan files added between two snapshots via set-diff.

More efficient than reading N delta_manifest_lists when many intermediate
snapshots are compaction-only.
"""

def __init__(self, table):
from pypaimon.table.file_store_table import FileStoreTable

self.table: FileStoreTable = table
self.manifest_list_manager = ManifestListManager(table)
self.manifest_file_manager = ManifestFileManager(table)

options = self.table.options
self.target_split_size = options.source_split_target_size()
self.open_file_cost = options.source_split_open_file_cost()

def scan(self, start_snapshot: Snapshot, end_snapshot: Snapshot) -> Plan:
"""Scan files added between start (exclusive) and end (inclusive) snapshots."""
added_entries = self.compute_diff(start_snapshot, end_snapshot)

if not added_entries:
return Plan([])

split_generator = AppendTableSplitGenerator(
self.table,
self.target_split_size,
self.open_file_cost,
{} # No deletion files for incremental diff
)

splits = split_generator.create_splits(added_entries)
return Plan(splits)

def compute_diff(
self,
start_snapshot: Snapshot,
end_snapshot: Snapshot
) -> List[ManifestEntry]:
"""Return files present in end_snapshot but absent from start_snapshot."""
start_manifest_files = self.manifest_list_manager.read_all(start_snapshot)
end_manifest_files = self.manifest_list_manager.read_all(end_snapshot)

max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)

start_entries = self.manifest_file_manager.read_entries_parallel(
start_manifest_files,
max_workers=max_workers
)
end_entries = self.manifest_file_manager.read_entries_parallel(
end_manifest_files,
max_workers=max_workers
)

start_keys: Set[Tuple] = {self._entry_key(e) for e in start_entries}

added_entries = [
entry for entry in end_entries
if self._entry_key(entry) not in start_keys
]

return added_entries

def _entry_key(self, entry: ManifestEntry) -> Tuple:
return (
tuple(entry.partition.values),
entry.bucket,
entry.file.file_name
)
2 changes: 1 addition & 1 deletion paimon-python/pypaimon/read/table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def to_ray(
You needn't manually set this in most cases.
**read_args: Additional kwargs passed to the datasource.
For example, ``per_task_row_limit`` (Ray 2.52.0+).
See `Ray Data API <https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html>`_
for details.
"""
Expand Down
51 changes: 51 additions & 0 deletions paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Tests for ChangelogFollowUpScanner."""

import unittest
from unittest.mock import Mock

from pypaimon.read.scanner.changelog_follow_up_scanner import \
ChangelogFollowUpScanner


class ChangelogFollowUpScannerTest(unittest.TestCase):
"""Tests for ChangelogFollowUpScanner."""

def test_should_scan_any_commit_with_changelog(self):
"""Scanner scans based on changelog_manifest_list, regardless of commit kind."""
scanner = ChangelogFollowUpScanner()
for kind in ("APPEND", "COMPACT"):
snapshot = Mock(changelog_manifest_list=f"changelog-manifest-{kind}")
self.assertTrue(scanner.should_scan(snapshot), kind)

def test_should_skip_when_no_changelog(self):
"""Scanner should skip when changelog_manifest_list is None."""
scanner = ChangelogFollowUpScanner()
snapshot = Mock(changelog_manifest_list=None)
self.assertFalse(scanner.should_scan(snapshot))

def test_should_skip_for_empty_string(self):
"""Empty string should be treated as no changelog."""
scanner = ChangelogFollowUpScanner()
snapshot = Mock(changelog_manifest_list="")
self.assertFalse(scanner.should_scan(snapshot))


if __name__ == '__main__':
unittest.main()
Loading