[python] Add compaction module with Ray distributed executor #7771
Draft
TheR1sing3un wants to merge 12 commits into apache:master from
Conversation
Lay the protocol-level groundwork for upcoming compaction work:
- CommitMessage gains compact_before / compact_after fields so a single
  message can carry both the deletion and the addition of files in a
  compact result.
- FileStoreCommit emits ADD entries for compact_after and DELETE entries
  for compact_before; commit() auto-selects COMPACT kind when no
  new_files are present, and a dedicated commit_compact() helper
  enforces COMPACT-only semantics with no row-id assignment (a sketch
  follows this message).
- DataFileMeta exposes a to_dict / from_dict round trip plus
  tagged-value encoding (bytes, decimal, datetime, date, time,
  Timestamp) so file metas can be shipped JSON-safely between processes.
- New CommitMessageSerializer wraps the JSON form for use as a
  CompactTask payload (Phase 4 will consume it from the Ray executor).
No write/read behavior changes for existing callers.
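A minimal self-contained sketch of the commit-kind rule; the class layout is a stand-in modeled on the description above, not pypaimon's actual API:

```python
# Stand-in model of the commit-kind selection described above. The real
# pypaimon classes differ in detail; treat this as pseudocode that runs.
from dataclasses import dataclass, field
from typing import List

@dataclass
class CommitMessage:
    partition: tuple
    bucket: int
    new_files: List[str] = field(default_factory=list)
    compact_before: List[str] = field(default_factory=list)
    compact_after: List[str] = field(default_factory=list)

def select_commit_kind(messages: List[CommitMessage]) -> str:
    # commit() auto-selects COMPACT when no message carries new_files.
    return "APPEND" if any(m.new_files for m in messages) else "COMPACT"

msg = CommitMessage(
    partition=("2024-01-01",), bucket=0,
    compact_before=["f1.parquet", "f2.parquet"],  # emitted as DELETE entries
    compact_after=["f3.parquet"],                 # emitted as ADD entries
)
assert select_commit_kind([msg]) == "COMPACT"
```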
… values
Promote encode_value/decode_value to the public DataFileMeta API and
reuse them for CommitMessage.partition. Without this, partitions
containing DATE/DECIMAL/bytes/Timestamp would crash json.dumps once
Phase 4 ships CommitMessage payloads through Ray workers.
Tests: round-trip date/Decimal/bytes and Timestamp partition tuples.
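For illustration, a tagged-value round trip in the spirit of encode_value / decode_value; the tag strings and envelope shape here are assumptions, not the shipped wire format:

```python
# Illustrative tagged-value round trip; tag names and the exact set of
# supported types (e.g. time, Timestamp) are assumptions.
import json
from datetime import date, datetime
from decimal import Decimal

def encode_value(v):
    # Wrap JSON-unfriendly types in a (tag, printable form) envelope.
    if isinstance(v, bytes):
        return {"t": "bytes", "v": v.hex()}
    if isinstance(v, Decimal):
        return {"t": "decimal", "v": str(v)}
    if isinstance(v, datetime):          # check before date: subclass
        return {"t": "datetime", "v": v.isoformat()}
    if isinstance(v, date):
        return {"t": "date", "v": v.isoformat()}
    return v  # JSON-native values pass through untouched

def decode_value(v):
    if isinstance(v, dict) and "t" in v:
        return {
            "bytes": bytes.fromhex,
            "decimal": Decimal,
            "datetime": datetime.fromisoformat,
            "date": date.fromisoformat,
        }[v["t"]](v["v"])
    return v

partition = (date(2024, 1, 1), Decimal("3.14"), b"\x00\x01")
wire = json.dumps([encode_value(v) for v in partition])
assert tuple(decode_value(v) for v in json.loads(wire)) == partition
```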
End-to-end Append-only compaction in-process, exposed as
table.new_compact_job(...).execute(). The plumbing follows the same
Coordinator → Executor → Driver-commit shape Spark uses, so plugging
in a Ray backend in Phase 4 only swaps out the executor.
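For orientation, hypothetical driver-side usage; the CatalogFactory import path and option dict are assumptions, and the new_compact_job call follows this description rather than a published signature:

```python
# Hypothetical driver-side usage of the in-process path; the import
# path, catalog options, and table name are assumptions.
from pypaimon import CatalogFactory

catalog = CatalogFactory.create({"warehouse": "/tmp/warehouse"})
table = catalog.get_table("default.orders")

# Default LocalExecutor: plan, rewrite, and commit all in this process.
table.new_compact_job().execute()
```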
New compact package layout (kept stable as Phase 3 will plug PK):
pypaimon/compact/
options.py
coordinator/{coordinator.py, append_compact_coordinator.py}
task/{compact_task.py, append_compact_task.py}
rewriter/{rewriter.py, append_compact_rewriter.py}
executor/{executor.py, local_executor.py}
job/compact_job.py
Behavior:
- Coordinator scans the latest snapshot via FileScanner.plan_files,
groups by (partition, bucket), filters out already target_file_size+
files, and chunks each bucket at max_file_num. full_compaction=True
rewrites every file regardless of size or count.
- Rewriter feeds files batch-by-batch into AppendOnlyDataWriter so the
writer's existing target_file_size rolling produces correctly sized
output without a separate rolling layer.
- AppendCompactTask captures the in-process FileStoreTable directly;
to_dict/from_dict are stubbed and raise — Phase 4 will fill them in
once Ray needs serialization.
- CompactJob assembles CommitMessage(compact_before, compact_after)
from each task and calls FileStoreCommit.commit_compact for a single
atomic snapshot tagged COMPACT.
Tests cover threshold behavior, full_compaction override, max_file_num
chunking, PK rejection, partitioned/unpartitioned e2e (file count
shrinks, data identity preserved, snapshot kind=COMPACT), and the
no-op path.
- Rewriter: stop mutating manifest-owned DataFileMeta. Resolve the read
  path locally each iteration, preferring external_path (matching
  SplitRead.file_reader_supplier) over file_path, and never write back.
- Rewriter: seed sequence_generator per bucket_mode (0 for
  BUCKET_UNAWARE, max(input.max_seq) for HASH_FIXED), matching
  FileStoreWrite._create_data_writer instead of always using max.
- Rewriter: abort the AppendOnlyDataWriter on failure so partial output
  files don't leak when an executor raises mid-rewrite.
- CompactOptions: validate min_file_num >= 1 and
  max_file_num >= min_file_num at construction so misconfiguration fails
  loudly instead of being silently rounded up (sketched after this
  list).
- AppendCompactCoordinator: drop the silent max(min, max) rescue and
  document that the trailing chunk below min_file_num is intentionally
  dropped (deferred to a future change).
- CompactTask: align the docstring with reality: JSON serialization is
  declared on the base class, but concrete subclasses may defer it until
  distributed execution arrives in Phase 4.
Tests: rewriter must not mutate input metadata; rewriter must abort
output on failure; CompactOptions validation. All 15 compact tests plus
60 commit/manifest/scanner regression tests pass.
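A minimal sketch of that construction-time validation, assuming a frozen dataclass layout (field defaults are illustrative):

```python
# Sketch of fail-loud option validation; the exact class layout and
# defaults are assumptions.
from dataclasses import dataclass

@dataclass(frozen=True)
class CompactOptions:
    min_file_num: int = 5
    max_file_num: int = 50
    full_compaction: bool = False

    def __post_init__(self):
        # Fail at construction instead of silently rounding values up.
        if self.min_file_num < 1:
            raise ValueError(f"min_file_num must be >= 1, got {self.min_file_num}")
        if self.max_file_num < self.min_file_num:
            raise ValueError(
                f"max_file_num ({self.max_file_num}) must be >= "
                f"min_file_num ({self.min_file_num})"
            )

CompactOptions(min_file_num=5, max_file_num=50)    # ok
# CompactOptions(min_file_num=10, max_file_num=5)  # raises ValueError
```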
…strategy
End-to-end primary-key compaction in-process. table.new_compact_job(...)
on a PK table now plans a MergeTreeCompactTask per (partition, bucket)
that is eligible under UniversalCompaction's three-stage decision
(size-amp / size-ratio / file-num), rewrites it via SortMergeReader +
MergeFunction, and commits the result with snapshot kind=COMPACT.
New modules:
pypaimon/compact/levels.py
Direct port of Java mergetree.Levels: L0 ordered by maxSeq DESC,
L1..N hold one SortedRun each, update() routes per-level (see the
sketch after this list).
pypaimon/compact/strategy/
compact_unit.py + strategy.py + universal_compaction.py — full
Universal Compaction algorithm (size-amp, size-ratio, file-num,
force-pick-L0). EarlyFullCompaction / OffPeak left for later.
pypaimon/compact/rewriter/merge_tree_rolling_writer.py
Subclass of DataWriter that consumes pre-merged KV batches; rewrites
each appended file's metadata with the strategy's output_level, the
actual min/max sequence numbers and retract count.
pypaimon/compact/rewriter/merge_tree_compact_rewriter.py
Drives IntervalPartition → per-section ConcatRecordReader →
SortMergeReader (with the table's MergeFunction) → optional
DropDeleteRecordReader → buffered RecordBatch → rolling writer.
pypaimon/compact/coordinator/merge_tree_compact_coordinator.py
Per-(partition, bucket) Levels build + strategy.pick + drop_delete
rule (output_level >= non_empty_highest_level).
pypaimon/compact/task/merge_tree_compact_task.py
Carries one CompactUnit; assembles CommitMessage(compact_before,
compact_after) for the driver to commit atomically.
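A condensed, runnable model of the Levels invariant referenced above; plain dicts stand in for SortedRun / DataFileMeta, so this is a sketch of the routing rule only:

```python
# Simplified Levels model: level 0 keeps individual files newest-first;
# every higher level holds at most one sorted run. The real port tracks
# SortedRun objects and richer metadata.
class Levels:
    def __init__(self, num_levels: int):
        self.level0 = []                                  # max_seq DESC
        self.runs = [[] for _ in range(num_levels - 1)]   # one run each

    def update(self, file_meta, level: int):
        if level < 0 or level >= 1 + len(self.runs):
            raise ValueError(f"level {level} out of range")
        if level == 0:
            self.level0.append(file_meta)
            self.level0.sort(key=lambda f: f["max_seq"], reverse=True)
        else:
            self.runs[level - 1].append(file_meta)

levels = Levels(num_levels=4)
levels.update({"name": "a", "max_seq": 10}, level=0)
levels.update({"name": "b", "max_seq": 20}, level=0)
assert [f["name"] for f in levels.level0] == ["b", "a"]
```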
Read path:
pypaimon/read/reader/merge_function.py
Abstract MergeFunction + Factory; DeduplicateMergeFunction migrated
from sort_merge_reader.py. PartialUpdate / Aggregate / FirstRow are
stubbed so configured tables fail loudly with a Phase 6 message
(the merge-function shape is sketched after this section).
SortMergeReaderWithMinHeap accepts an optional merge_function
(default DeduplicateMergeFunction → existing read path unchanged).
KeyValue.row_tuple exposes the underlying physical tuple so the
rewriter can buffer KVs back into a RecordBatch.
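A sketch of the merge-function contract described above; the reset/add/get_result method names are modeled on Java's MergeFunction and are assumptions about the Python port:

```python
# Assumed shape of the abstract merge function plus the migrated
# deduplicate behavior: keep only the highest-sequence KV per key.
from abc import ABC, abstractmethod

class MergeFunction(ABC):
    @abstractmethod
    def reset(self): ...
    @abstractmethod
    def add(self, kv): ...
    @abstractmethod
    def get_result(self): ...

class DeduplicateMergeFunction(MergeFunction):
    def reset(self):
        self.latest = None

    def add(self, kv):
        # SortMergeReader feeds KVs for one key in ascending sequence
        # order, so the last KV added wins.
        self.latest = kv

    def get_result(self):
        return self.latest

f = DeduplicateMergeFunction()
f.reset()
for kv in [("k", 1, "old"), ("k", 2, "new")]:
    f.add(kv)
assert f.get_result() == ("k", 2, "new")
```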
CompactJob now routes PK tables to MergeTreeCompactCoordinator.
Tests: 19 unit (Levels semantics, UniversalCompaction trigger
algorithm, MergeFunction registry + stubs) + 2 PK e2e (full-compaction
dedup keeps latest values & promotes level; below-trigger no-op).
99-test combined regression on commit/manifest/scanner/reader paths.
- Rewriter: count_retract_rows now matches RowKind.is_add_byte (only
  UPDATE_BEFORE=1 and DELETE=3 are retracts). The previous != 0 check
  wrongly inflated delete_row_count by counting UPDATE_AFTER rows, which
  would skew downstream size-amplification estimates and metrics (see
  the snippet after this list).
- Levels.update: reject out-of-range levels with a clear ValueError
  instead of letting an IndexError leak from _update_level when a buggy
  strategy hands back an output_level above number_of_levels().
- Extract build_kv_file_fields() to split_read.py and consume it from
  both SplitRead._create_key_value_fields and the merge-tree rewriter,
  so the on-disk KV file schema layout (key cols / seq / kind / value)
  cannot drift between read and compact paths.
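For reference, the corrected retract predicate, using Paimon's RowKind byte order (INSERT=0, UPDATE_BEFORE=1, UPDATE_AFTER=2, DELETE=3):

```python
# Retract counting per the fix above: only -U (1) and -D (3) retract.
UPDATE_BEFORE, DELETE = 1, 3

def count_retract_rows(row_kinds: list[int]) -> int:
    return sum(1 for k in row_kinds if k in (UPDATE_BEFORE, DELETE))

# +I, -U, +U, -D: the old `kind != 0` check counted 3; correct is 2.
assert count_retract_rows([0, 1, 2, 3]) == 2
```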
Compact jobs can now run their work on Ray.
table.new_compact_job(..., executor=RayExecutor(), catalog_options=...,
table_identifier=...).execute() plans on the driver, ships
JSON-serialized CompactTask payloads through ray.remote, rebuilds the
FileStoreTable inside each worker via the configured catalog, runs the
rewriter, and returns CommitMessages back to the driver for one atomic
commit.
CompactTask base class:
- with_table_loader(catalog_options, table_identifier) attaches the spec
  a worker uses to rebuild its table.
- to_dict / from_dict are now concrete: a base envelope holding type +
  loader spec + payload, with subclasses owning _to_payload /
  _from_payload. CompactTask.deserialize(payload) returns the right
  subclass via the registry.
- _resolve_table_via_loader() centralizes the catalog rebuild so
  subclasses share a single in-process-vs-distributed branch.
AppendCompactTask / MergeTreeCompactTask:
- Replace the Phase 3 NotImplementedError stubs with real payload
  encoders that round-trip files via DataFileMeta.to_dict and partition
  tuples via encode_value/decode_value (handles DATE / Decimal /
  Timestamp partition columns).
- _resolve_table prefers the in-process table when LocalExecutor
  attached one and falls back to the loader otherwise.
CompactJob:
- Accepts catalog_options + table_identifier and propagates them onto
  every task before dispatch when present. The LocalExecutor path is
  unchanged.
RayExecutor:
- Top-level _run_task_payload worker so Ray pickling stays cheap and
  worker code can't capture driver state (the dispatch pattern is
  sketched after this message).
- ray.init only when not already initialized; respects ray_init_args.
- num_cpus_per_task + ray_remote_args expose the usual Ray knobs.
DataFileMeta serialization:
- Tolerate manifest-side BinaryRow (lazy-decoded) in addition to
  GenericRow, and pyarrow Array-like null_counts. Without this the Ray
  round trip fails on files that were just produced by the writer.
setup.py already declared ray as an optional extra
(pip install pypaimon[ray]); no packaging changes required.
Tests:
- compact_task_serde_test (5 tests): round-trip Append + MergeTree
  payloads with loader spec and non-JSON-native partitions; a clear
  error when neither table nor loader was attached; unknown-type
  rejection in the registry.
- ray_executor_test (1 test): end-to-end append-only compaction via a
  real ray.init(local_mode=True), asserting commit_kind=COMPACT and
  data identity. Skipped automatically if ray isn't installed.
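A stripped-down model of that dispatch pattern; the function bodies are stand-ins (the real tasks round-trip the CompactTask JSON envelope), but the Ray calls themselves are the standard API:

```python
# Sketch of the RayExecutor dispatch pattern: a module-level remote
# function, so pickling ships only the payload and never driver state.
import json
import ray

@ray.remote
def _run_task_payload(payload: str) -> str:
    task = json.loads(payload)  # stand-in for CompactTask.deserialize
    # ... rebuild the table via the loader spec, run the rewriter ...
    return json.dumps({"task": task["type"], "status": "done"})

def execute(payloads, num_cpus_per_task=1, ray_init_args=None):
    if not ray.is_initialized():        # init only when needed
        ray.init(**(ray_init_args or {}))
    refs = [
        _run_task_payload.options(num_cpus=num_cpus_per_task).remote(p)
        for p in payloads
    ]
    return [json.loads(r) for r in ray.get(refs)]
```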
- CompactJob.table_identifier default uses Identifier.get_full_name()
instead of str(identifier). Identifier is a dataclass with no custom
__str__, so str(...) returns its repr ("Identifier(database='db',
...)") and Identifier.from_string would refuse to parse that on the
worker side. The default path was untested in Phase 4 (e2e passed
only because the test explicitly passed table_identifier=...) — this
fixup also drops that explicit kwarg from the e2e so the default is
exercised.
- RayExecutor module imports the AppendCompactTask / MergeTreeCompactTask
  modules at the top level so their @register_compact_task side effects
  populate the task registry inside Ray worker processes (the registry
  pattern is sketched after this list). Without this, a real
  (non-local_mode) Ray cluster would unpickle _run_task_payload in a
  fresh process whose registry is empty, and CompactTask.deserialize
  would raise "Unknown CompactTask type".
- MergeTreeCompactTask docstring updated — it no longer says
"Phase 4 will plumb the loader fields" since Phase 4 already did.
…ctIncrement
Restructure CommitMessage to mirror
org.apache.paimon.table.sink.CommitMessageImpl exactly: instead of
dropping new_files / compact_before / compact_after onto CommitMessage
as flat fields, package them inside DataIncrement and CompactIncrement
value objects that match their Java counterparts field-for-field. This
makes Python and Java messages structurally identical and gives later
phases a single, unambiguous slot to plug deletion vectors, changelog
files, and global index deltas into without inventing parallel field
names.
New value objects (sketched after this message):
- DataIncrement(new_files, deleted_files, changelog_files,
  new_index_files, deleted_index_files): a direct port of
  org.apache.paimon.io.DataIncrement.
- CompactIncrement(compact_before, compact_after, changelog_files,
  new_index_files, deleted_index_files): a direct port of
  org.apache.paimon.io.CompactIncrement.
CommitMessage now holds (partition, bucket, total_buckets,
data_increment, compact_increment, check_from_snapshot). Convenience
properties (new_files, compact_before, compact_after, changelog_files,
...) keep call sites readable without leaking the increment shape.
Migration:
- FileStoreWrite.prepare_commit, TableUpdate.prepare_commit,
  AppendCompactTask.run, and MergeTreeCompactTask.run all build their
  CommitMessage through DataIncrement / CompactIncrement and now also
  populate total_buckets the way Java does.
- The CommitMessageSerializer wire format bumps to version=2 and
  round-trips the full increment shape, including index file lists.
  IndexFileMeta serialization covers identity fields only; dv_ranges /
  global_index_meta will be wired up alongside the deletion-vector and
  changelog phases.
Tests updated to construct messages via increments. No behavior changes
for the existing commit / read paths: FileStoreCommit still reads
message.new_files / compact_before / compact_after through the new
convenience properties.
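A sketch of the value-object layering; field names follow the commit message, while defaults and typing are assumptions (check_from_snapshot omitted for brevity):

```python
# Value objects mirroring the Java increments described above.
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple

@dataclass
class DataIncrement:
    new_files: List[Any] = field(default_factory=list)
    deleted_files: List[Any] = field(default_factory=list)
    changelog_files: List[Any] = field(default_factory=list)
    new_index_files: List[Any] = field(default_factory=list)
    deleted_index_files: List[Any] = field(default_factory=list)

@dataclass
class CompactIncrement:
    compact_before: List[Any] = field(default_factory=list)
    compact_after: List[Any] = field(default_factory=list)
    changelog_files: List[Any] = field(default_factory=list)
    new_index_files: List[Any] = field(default_factory=list)
    deleted_index_files: List[Any] = field(default_factory=list)

@dataclass
class CommitMessage:
    partition: Tuple
    bucket: int
    total_buckets: Optional[int]
    data_increment: DataIncrement
    compact_increment: CompactIncrement

    @property
    def compact_before(self):
        # Convenience property: call sites stay readable without
        # leaking the increment shape.
        return self.compact_increment.compact_before
```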
This PR has not landed yet, so there is no on-disk / cross-process payload from a prior version to stay compatible with — VERSION still denotes "first shipped wire format". Bump it once we actually need to break compat with a released version.
Replace the count-based chunking in _pick_files_for_bucket with the
size-based bin-packing algorithm Java's
AppendCompactCoordinator.SubCoordinator.pack uses, so plans produced by
the Python coordinator match Java's task shape on the same input (the
packing loop is sketched after this message):
- Sort candidates by file_size ascending instead of by sequence number,
  so smaller files lead and the packer has the most room to grow each
  bin before overshooting.
- Drain a bin as soon as it has >1 file AND its weighted size hits
  target_file_size * 2. The hardcoded ×2 is Java's "each task should
  yield ~2 target-sized output files" constant.
- Account for source.split.open-file-cost in bin size, matching Java's
  per-file IO weight: a bucket of many tiny files now fans out into
  several tasks instead of being packed into one giant task.
- The trailing bin emits only when it has at least min_file_num files;
  shorter tails wait for company on the next plan. full_compaction=True
  drops that minimum to 1 so a "rewrite this bucket" intent always
  produces at least one task.
CompactOptions:
- Drop max_file_num: Java has no such concept, and size-based packing
  caps each task at ~2x target naturally.
- Drop the now-irrelevant max >= min check; the only invariant left is
  min_file_num >= 1.
Tests:
- New append_compact_packing_test (9 cases) drives the algorithm
  directly with hand-built DataFileMeta lists, mirroring the kind of
  coverage Java's AppendCompactCoordinatorTest has for pack().
- E2E coordinator/rewriter/Ray tests now zero
  source.split.open-file-cost on their tiny test tables (the default
  4 MB would dominate the 1 KB parquet files and trigger spurious
  mid-loop drains).
- Drop test_chunks_when_exceeding_max_file_num (max_file_num is gone)
  in favor of test_many_small_files_pack_into_single_task, which
  documents the realistic tiny-file behavior.
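A simplified, runnable model of the packing loop; (name, size) pairs stand in for DataFileMeta, and the per-file weighting by open-file cost follows the description above:

```python
# Size-based bin packing per the description above, in simplified form.
# Constants mirror the commit message: drain at 2x target, weight each
# file by at least the open-file cost.
def pack(files, target_file_size, open_file_cost, min_file_num,
         full_compaction=False):
    bins, current, weight = [], [], 0
    # Smaller files first: the packer has the most room to grow a bin.
    for name, size in sorted(files, key=lambda f: f[1]):
        current.append(name)
        weight += max(size, open_file_cost)  # per-file IO weight
        if len(current) > 1 and weight >= 2 * target_file_size:
            bins.append(current)
            current, weight = [], 0
    # Trailing bin waits for company unless a full rewrite was asked.
    tail_min = 1 if full_compaction else min_file_num
    if len(current) >= tail_min:
        bins.append(current)
    return bins

tiny = [(f"f{i}", 1_000) for i in range(10)]
# With open-file cost zeroed, ten 1 KB files stay in a single task.
assert pack(tiny, target_file_size=128_000_000, open_file_cost=0,
            min_file_num=5) == [[f"f{i}" for i in range(10)]]
```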
… Java
Java BaseAppendFileStoreWrite.compactRewrite seeds its
RowDataRollingFileWriter with
LongCounter(toCompact[0].minSequenceNumber()) and increments per row
written. Each rolled output file therefore carries a precise
[first_row_seq, last_row_seq] range, and the union across all output
files is contiguous: [seed, seed + total_input_rows - 1] (see the
sketch after this message).
The previous Python rewriter:
- seeded the writer with bucket-mode-dependent values (UNAWARE → 0,
  HASH_FIXED → max(input.max_seq)) which had no Java analog;
- never advanced sequence_generator.current, so every committed file
  ended up with min_seq == max_seq, i.e. compact output threw away the
  per-row seq information Java preserves.
This commit introduces AppendCompactRollingWriter, an
AppendOnlyDataWriter subclass that:
- treats sequence_generator.start as Java's "next-to-assign" counter,
  so a slice of N rows is laid out as [seq_start, seq_start + N - 1];
- works around the base SequenceGenerator's off-by-one quirk by setting
  current = seq_end before super()._write_data_to_file (so the parent
  reads min/max as seq_start/seq_end exactly) and bumping both fields
  to seq_end + 1 afterwards (so the next slice picks up where this one
  ended);
- stamps file_source = COMPACT on the just-appended DataFileMeta, the
  same shape MergeTreeRollingWriter uses on the PK side, instead of
  mutating it back in the rewriter.
Rewriter:
- seed_seq = files[0].min_sequence_number, matching Java's
  toCompact.get(0).minSequenceNumber();
- drops the bucket-mode-dependent _initial_max_seq helper.
Tests:
- new test_output_seq_range_starts_at_input0_min_seq_and_spans_total_rows
  enforces the Java contract: per-file (max - min + 1 == row_count) and
  cross-file (no gaps/overlaps, range = [seed, seed + total - 1]);
- new test_output_files_tagged_compact_source verifies file_source is
  set by the writer, not the rewriter.
Out of scope (still NOTEd in the rewriter docstring): Java's reader
path runs through ReadForCompact for schema-evolution + DV awareness;
pypaimon still reads parquet directly. Both will be wired up alongside
the broader schema-evolution / deletion-vector phases.
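A small model of the sequence-range contract the tests enforce; it captures the counter arithmetic only, not the writer subclass:

```python
# Model of the contiguous sequence-range contract: each output slice of
# N rows occupies [next, next + N - 1], and the union across slices has
# no gaps or overlaps.
def assign_seq_ranges(seed: int, slice_row_counts: list[int]):
    ranges, nxt = [], seed
    for n in slice_row_counts:
        ranges.append((nxt, nxt + n - 1))  # min_seq, max_seq of one file
        nxt += n                           # next slice picks up here
    return ranges

ranges = assign_seq_ranges(seed=100, slice_row_counts=[3, 5, 2])
assert ranges == [(100, 102), (103, 107), (108, 109)]
# Per-file: max - min + 1 == row_count; cross-file: contiguous union.
assert ranges[-1][1] == 100 + (3 + 5 + 2) - 1
```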
Summary
Brings Apache Paimon's compaction story to pypaimon end-to-end:
- CommitMessage with compact_before / compact_after; FileStoreCommit
  emits ADD + DELETE manifest entries, and a new commit_compact() helper
  produces snapshots with commit_kind=COMPACT. DataFileMeta gains a
  JSON-friendly to_dict / from_dict and a CommitMessageSerializer for
  cross-process transport.
- table.new_compact_job(...).execute(), plumbed through a
  Coordinator → Task → Executor → Driver-commit shape that mirrors
  Spark's CompactProcedure. Ships a LocalExecutor for in-process / test
  usage.
- Levels + UniversalCompaction (size-amp / size-ratio / file-num
  three-stage decision). New abstract MergeFunction + factory;
  DeduplicateMergeFunction migrated, PartialUpdate / Aggregate /
  FirstRow stubbed so configured tables fail loudly with a Phase 6
  message.
- RayExecutor wires the same CompactJob to Ray. The driver serializes
  each CompactTask to JSON (table + payload + catalog loader spec),
  workers rebuild their FileStoreTable via the catalog and run the
  rewriter, and the driver collects messages for one atomic commit.
Each phase landed as a separate commit, with a follow-up *-fixup commit
addressing the review findings inline. Eight commits total: a single PR
keeps the design coherent for review while commits stay small enough to
walk through.
Out of scope (later PRs):
- python -m pypaimon.compact.entrypoint for ray job submit
- PartialUpdate / Aggregate / FirstRow MergeFunction bodies
Plan / design doc: `/Users/lcy/.claude/plans/paimon-compaction-java-spark-python-com-cached-cosmos.md` (local).
Test plan