17 changes: 17 additions & 0 deletions paimon-python/pypaimon/compact/__init__.py
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
17 changes: 17 additions & 0 deletions paimon-python/pypaimon/compact/coordinator/__init__.py
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
@@ -0,0 +1,164 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from collections import defaultdict
from typing import Dict, List, Optional, Tuple

from pypaimon.common.predicate import Predicate
from pypaimon.compact.coordinator.coordinator import CompactCoordinator
from pypaimon.compact.options import CompactOptions
from pypaimon.compact.task.append_compact_task import AppendCompactTask
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.scanner.file_scanner import FileScanner


class AppendCompactCoordinator(CompactCoordinator):
"""Plan compaction tasks for append-only tables (HASH_FIXED or BUCKET_UNAWARE).

Per (partition, bucket) we filter to files smaller than the table's
target_file_size and bin-pack them into compaction tasks using the same
algorithm as Java AppendCompactCoordinator.SubCoordinator.pack:

1. Sort candidates by file size ascending (smaller files lead, so the
packer has the most flexibility to grow a bin without immediately
overshooting).
2. Walk the sorted list adding each file to the current bin, accruing
(file_size + open_file_cost) — open_file_cost mirrors Java's per-file
IO weight so a bin of many tiny files drains earlier than naive
size accounting would suggest.
3. Drain a bin as soon as it has >1 file AND its weighted size hits
target_file_size * 2. The ×2 is Java's hardcoded constant: each
task should produce roughly two target-sized output files, which
amortizes task setup cost while keeping output sizes predictable.
4. The trailing bin is emitted only if it has at least min_file_num
files; smaller tails are dropped to avoid spending an entire task
on a couple of files that will pick up company on the next plan.

full_compaction=True relaxes both the size filter (large files also
enter packing) and the trailing-bin threshold (any non-empty tail
is emitted), matching the user-level intent of "rewrite this bucket
regardless of current shape".
"""

def __init__(
self,
table,
compact_options: Optional[CompactOptions] = None,
partition_predicate: Optional[Predicate] = None,
):
if table.is_primary_key_table:
raise ValueError(
"AppendCompactCoordinator only handles append-only tables; "
"use the merge-tree coordinator for primary-key tables."
)
self.table = table
self.options = compact_options or CompactOptions()
self.partition_predicate = partition_predicate

def plan(self) -> List[AppendCompactTask]:
manifest_entries = self._scan_live_files()
if not manifest_entries:
return []

# Reduce the manifest entry stream to (partition, bucket) → live files.
# We trust the manifest scanner to have already merged ADD/DELETE
# entries; whatever survives here is currently in the snapshot.
bucket_files: Dict[Tuple[Tuple, int], List[DataFileMeta]] = defaultdict(list)
for entry in manifest_entries:
key = (tuple(entry.partition.values), entry.bucket)
bucket_files[key].append(entry.file)

target_file_size = self.table.options.target_file_size(False)
open_file_cost = self.table.options.source_split_open_file_cost()

tasks: List[AppendCompactTask] = []
for (partition, bucket), files in bucket_files.items():
for chunk in self._pick_files_for_bucket(files, target_file_size, open_file_cost):
tasks.append(
AppendCompactTask(
partition=partition,
bucket=bucket,
files=chunk,
table=self.table,
)
)
return tasks

def _pick_files_for_bucket(
self,
files: List[DataFileMeta],
target_file_size: int,
open_file_cost: int,
) -> List[List[DataFileMeta]]:
"""Bin-pack one bucket's files into compaction tasks.

Mirrors org.apache.paimon.append.AppendCompactCoordinator
.SubCoordinator.pack — see class docstring for the reasoning behind
the size-based packing, the open_file_cost weight, and the
target_file_size * 2 drain threshold.
"""
if self.options.full_compaction:
candidates = list(files)
else:
# Files already at or above target size aren't worth rewriting —
# the output would be near-identical and we'd burn IO for it.
candidates = [f for f in files if f.file_size < target_file_size]

if not candidates:
return []

candidates.sort(key=lambda f: f.file_size)

chunks: List[List[DataFileMeta]] = []
bin_files: List[DataFileMeta] = []
bin_size = 0
drain_threshold = target_file_size * 2
for f in candidates:
bin_files.append(f)
bin_size += f.file_size + open_file_cost
if len(bin_files) > 1 and bin_size >= drain_threshold:
chunks.append(bin_files)
bin_files = []
bin_size = 0

# Trailing bin: under full_compaction any non-empty tail ships;
# otherwise we require min_file_num files so a tiny tail waits for
# company on the next plan instead of paying task overhead now.
min_tail = 1 if self.options.full_compaction else self.options.min_file_num
if len(bin_files) >= min_tail:
chunks.append(bin_files)
return chunks

def _scan_live_files(self):
"""Read manifest entries from the latest snapshot, applying partition filter."""
snapshot = self.table.snapshot_manager().get_latest_snapshot()
if snapshot is None:
return []

from pypaimon.manifest.manifest_list_manager import ManifestListManager
manifest_list_manager = ManifestListManager(self.table)

def manifest_scanner():
return manifest_list_manager.read_all(snapshot), snapshot

scanner = FileScanner(
self.table,
manifest_scanner,
partition_predicate=self.partition_predicate,
)
return scanner.plan_files()
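
Below is a standalone, runnable sketch of the pack loop above. FakeFile is a hypothetical stand-in for DataFileMeta, and the 128 MiB target, 4 MiB open-file cost, and min_file_num of 3 are illustrative values, not defaults read from any table:

from dataclasses import dataclass
from typing import List


@dataclass
class FakeFile:
    """Hypothetical stand-in for DataFileMeta; only file_size matters here."""
    file_size: int


def pack(files: List[FakeFile], target_file_size: int,
         open_file_cost: int, min_file_num: int) -> List[List[FakeFile]]:
    # Same loop as _pick_files_for_bucket: sort ascending, accrue the
    # weighted size, drain at 2x target, keep the tail only if big enough.
    chunks, bin_files, bin_size = [], [], 0
    for f in sorted(files, key=lambda f: f.file_size):
        bin_files.append(f)
        bin_size += f.file_size + open_file_cost
        if len(bin_files) > 1 and bin_size >= target_file_size * 2:
            chunks.append(bin_files)
            bin_files, bin_size = [], 0
    if len(bin_files) >= min_file_num:
        chunks.append(bin_files)
    return chunks


MiB = 1024 * 1024
sizes = [16, 16, 32, 100, 120, 120]  # MiB, all under the 128 MiB target
chunks = pack([FakeFile(s * MiB) for s in sizes], 128 * MiB, 4 * MiB, 3)
# Weighted size crosses 256 MiB on the fifth file (16+16+32+100+120 plus
# five 4 MiB open costs = 304 MiB), so that bin drains as one task; the
# lone 120 MiB tail is below min_file_num and waits for the next plan.
assert [len(c) for c in chunks] == [5]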
40 changes: 40 additions & 0 deletions paimon-python/pypaimon/compact/coordinator/coordinator.py
@@ -0,0 +1,40 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from abc import ABC, abstractmethod
from typing import List

from pypaimon.compact.task.compact_task import CompactTask


class CompactCoordinator(ABC):
"""Driver-side planner that turns a snapshot into a list of CompactTask.

The coordinator runs once per CompactJob, in the driver, and **does not**
rewrite data itself. Splitting plan() from task.run() lets us pin the
snapshot scan to a single process (no concurrent manifest re-reads) while
still letting the executor distribute the resulting tasks however it likes.
"""

@abstractmethod
def plan(self) -> List[CompactTask]:
"""Return a possibly-empty list of compact tasks for the current snapshot.

An empty list means there is nothing worth compacting at this moment;
the job should commit nothing rather than produce an empty snapshot.
"""
@@ -0,0 +1,153 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Driver-side planner for primary-key (merge-tree) compaction."""

from collections import defaultdict
from typing import Dict, List, Optional, Tuple

from pypaimon.common.predicate import Predicate
from pypaimon.compact.coordinator.coordinator import CompactCoordinator
from pypaimon.compact.levels import Levels
from pypaimon.compact.options import CompactOptions
from pypaimon.compact.strategy.strategy import (CompactStrategy,
pick_full_compaction)
from pypaimon.compact.strategy.universal_compaction import UniversalCompaction
from pypaimon.compact.task.merge_tree_compact_task import MergeTreeCompactTask
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.reader.sort_merge_reader import builtin_key_comparator
from pypaimon.read.scanner.file_scanner import FileScanner

DEFAULT_NUM_LEVELS = 5


class MergeTreeCompactCoordinator(CompactCoordinator):
"""Plan one MergeTreeCompactTask per (partition, bucket) that the strategy says to compact.

The coordinator builds a Levels object per bucket and asks
strategy.pick(...) which sorted runs to combine. full_compaction=True
bypasses the strategy entirely and picks every file in every bucket.
"""

def __init__(
self,
table,
compact_options: Optional[CompactOptions] = None,
partition_predicate: Optional[Predicate] = None,
strategy: Optional[CompactStrategy] = None,
):
if not table.is_primary_key_table:
raise ValueError(
"MergeTreeCompactCoordinator only handles primary-key tables; "
"use AppendCompactCoordinator for append-only tables."
)
self.table = table
self.options = compact_options or CompactOptions()
self.partition_predicate = partition_predicate
self.num_levels = self._resolve_num_levels()
self.strategy = strategy or self._default_strategy()
self.key_comparator = builtin_key_comparator(self.table.trimmed_primary_keys_fields)

def plan(self) -> List[MergeTreeCompactTask]:
manifest_entries = self._scan_live_files()
if not manifest_entries:
return []

bucket_files: Dict[Tuple[Tuple, int], List[DataFileMeta]] = defaultdict(list)
for entry in manifest_entries:
key = (tuple(entry.partition.values), entry.bucket)
bucket_files[key].append(entry.file)

tasks: List[MergeTreeCompactTask] = []
for (partition, bucket), files in bucket_files.items():
levels = Levels(self.key_comparator, files, self.num_levels)
unit = self._pick_unit(levels)
if unit is None:
continue
drop_delete = self._should_drop_delete(unit, levels)
tasks.append(
MergeTreeCompactTask(
partition=partition,
bucket=bucket,
files=unit.files,
output_level=unit.output_level,
drop_delete=drop_delete,
table=self.table,
)
)
return tasks

# ---- internals ---------------------------------------------------------

def _pick_unit(self, levels: Levels):
runs = levels.level_sorted_runs()
if self.options.full_compaction:
return pick_full_compaction(levels.number_of_levels(), runs)
return self.strategy.pick(levels.number_of_levels(), runs)

def _should_drop_delete(self, unit, levels: Levels) -> bool:
# Mirrors MergeTreeCompactManager.triggerCompaction's dropDelete rule:
# we may drop retract rows only when nothing older could need them, i.e.
# we are writing to a level >= the highest non-empty level (and never
# to L0, which by definition can have older data above and below).
if unit.output_level == 0:
return False
return unit.output_level >= levels.non_empty_highest_level()

def _resolve_num_levels(self) -> int:
# Java reads num-levels off CoreOptions; pypaimon's CoreOptions doesn't
# surface it as a typed accessor yet, so we read the raw map and fall
# back to Java's CoreOptions.NUM_LEVELS default (5). Any input file
# already at a higher level wins during Levels construction anyway.
raw = self._raw_options_map().get("num-levels")
return int(raw) if raw is not None else DEFAULT_NUM_LEVELS

def _default_strategy(self) -> CompactStrategy:
raw = self._raw_options_map()
max_size_amp = int(raw.get("compaction.max-size-amplification-percent") or 200)
size_ratio = int(raw.get("compaction.size-ratio") or 1)
trigger = int(raw.get("num-sorted-run.compaction-trigger") or 5)
return UniversalCompaction(
max_size_amp=max_size_amp,
size_ratio=size_ratio,
num_run_compaction_trigger=trigger,
)

def _raw_options_map(self) -> dict:
opts = self.table.options.options
if hasattr(opts, "to_map"):
return opts.to_map()
return dict(opts) if opts else {}

def _scan_live_files(self):
snapshot = self.table.snapshot_manager().get_latest_snapshot()
if snapshot is None:
return []

from pypaimon.manifest.manifest_list_manager import ManifestListManager
manifest_list_manager = ManifestListManager(self.table)

def manifest_scanner():
return manifest_list_manager.read_all(snapshot), snapshot

scanner = FileScanner(
self.table,
manifest_scanner,
partition_predicate=self.partition_predicate,
)
return scanner.plan_files()
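
The drop-delete rule is the subtlest part of this planner, so here is a self-contained restatement of it under assumed level numbers (files on L0, L2 and L4, making 4 the highest non-empty level):

def should_drop_delete(output_level: int, highest_non_empty_level: int) -> bool:
    # Same predicate as MergeTreeCompactCoordinator._should_drop_delete.
    return output_level != 0 and output_level >= highest_non_empty_level


assert should_drop_delete(0, 4) is False  # L0 output: older data may surround it
assert should_drop_delete(3, 4) is False  # L4 beneath could still need retractions
assert should_drop_delete(4, 4) is True   # bottom-most output: safe to drop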