From 8070eec7efabc10d2f9dea22f6996eef81b7f96f Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Thu, 30 Apr 2026 02:20:42 +0800
Subject: [PATCH 1/8] [python] Implement partial-update merge engine in pypaimon
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

``MergeEngine.PARTIAL_UPDATE`` is exposed in ``core_options.py`` and accepts ``merge-engine: partial-update`` as a table option, but the read path never reads that option — ``sort_merge_reader.py`` hardcodes ``DeduplicateMergeFunction()``. So a user who creates a PK table with ``merge-engine: partial-update`` and writes overlapping rows whose non-null columns differ gets silently deduplicated results instead of the expected per-field merge: their data is wrong, with no error or warning. The same is true for ``aggregation`` and ``first-row`` — both are silently degraded to dedupe today.

This change ports the core ``PartialUpdateMergeFunction`` semantics from Java (paimon-core/.../mergetree/compact/PartialUpdateMergeFunction.java) and wires the Python read path to dispatch on ``merge-engine``:

* New ``pypaimon/read/reader/partial_update_merge_function.py``: on each ``add(kv)`` copy non-null fields of ``kv.value`` into an accumulator; ``get_result()`` returns a fresh KeyValue with the merged row. Result is built into a brand-new tuple so the merge output is decoupled from upstream's reused KeyValue instances.
* ``SortMergeReaderWithMinHeap.__init__`` gains an optional ``merge_function`` kwarg; default still ``DeduplicateMergeFunction()`` so any direct callers (none in-tree) are unchanged.
* ``MergeFileSplitRead.section_reader_supplier`` selects the merge function based on ``self.table.options.merge_engine()``:
      DEDUPLICATE -> DeduplicateMergeFunction (unchanged)
      PARTIAL_UPDATE -> PartialUpdateMergeFunction
      AGGREGATE / FIRST_ROW -> NotImplementedError (was silent dedupe)

Out of scope, intentionally:

* Per-field aggregator overrides (``fields.<field-name>.aggregate-function``)
* Sequence-group support (``fields.<field-name>.sequence-group``)
* ``ignore-delete`` / ``partial-update.remove-record-on-*`` options
* AGGREGATE / FIRST_ROW merge engine implementations

DELETE / UPDATE_BEFORE rows raise ``NotImplementedError`` at ``add()`` time so we can't silently corrupt data with a half-implemented contract.

Tests:

* ``test_partial_update_merge_function.py`` — 11 unit cases covering single insert, two-way overlapping merges, three-way merges, later-null-does-not-clobber, reset between keys, get_result-before-any-add, UPDATE_AFTER acceptance, DELETE / UPDATE_BEFORE refusal, and result decoupling from input kv (proves we're not aliasing upstream's reused KeyValue).
* ``test_partial_update_e2e.py`` — 8 cases: two-write merge, three-write merge, disjoint keys unaffected, later-non-null wins, later-null preserves earlier value, deduplicate engine unchanged (regression), and aggregation / first-row raise NotImplementedError.

Verified by checking out ``origin/master``'s ``sort_merge_reader.py`` / ``split_read.py`` and rerunning ``test_partial_update_e2e.py``: master fails the 4 partial-update merge cases (silent dedupe) and the 2 aggregation / first-row "raises" cases (silent dedupe instead of raising); fix passes all 8.
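Illustration of the user-visible change, condensed from ``test_partial_update_e2e.py`` (the warehouse path and table name below are placeholders; every API call mirrors what the new e2e test exercises):

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema

    catalog = CatalogFactory.create({'warehouse': '/tmp/pu-demo'})
    catalog.create_database('default', True)
    pa_schema = pa.schema([
        pa.field('id', pa.int64(), nullable=False),
        ('a', pa.string()),
        ('b', pa.string()),
    ])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        primary_keys=['id'],
        options={'bucket': '1', 'merge-engine': 'partial-update'},
    )
    catalog.create_table('default.demo', schema, False)
    table = catalog.get_table('default.demo')

    # Two separate commits, each filling a different column of the same key.
    for row in ({'id': 1, 'a': 'A', 'b': None},
                {'id': 1, 'a': None, 'b': 'B'}):
        wb = table.new_batch_write_builder()
        w = wb.new_write()
        c = wb.new_commit()
        w.write_arrow(pa.Table.from_pylist([row], schema=pa_schema))
        c.commit(w.prepare_commit())
        w.close()
        c.close()

    rb = table.new_read_builder()
    splits = rb.new_scan().plan().splits()
    print(rb.new_read().to_arrow(splits).to_pylist())
    # before this patch: [{'id': 1, 'a': None, 'b': 'B'}]  (silent dedupe)
    # after this patch:  [{'id': 1, 'a': 'A', 'b': 'B'}]   (per-field merge)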
--- .../reader/partial_update_merge_function.py | 119 ++++++++++ .../pypaimon/read/reader/sort_merge_reader.py | 10 +- paimon-python/pypaimon/read/split_read.py | 35 ++- .../pypaimon/tests/test_partial_update_e2e.py | 219 ++++++++++++++++++ .../test_partial_update_merge_function.py | 177 ++++++++++++++ 5 files changed, 555 insertions(+), 5 deletions(-) create mode 100644 paimon-python/pypaimon/read/reader/partial_update_merge_function.py create mode 100644 paimon-python/pypaimon/tests/test_partial_update_e2e.py create mode 100644 paimon-python/pypaimon/tests/test_partial_update_merge_function.py diff --git a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py new file mode 100644 index 000000000000..12e29e33eb8d --- /dev/null +++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py @@ -0,0 +1,119 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +""" +Python port of Java's ``PartialUpdateMergeFunction`` +(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ +PartialUpdateMergeFunction.java``). + +The merge function used by the ``partial-update`` merge engine on PK +tables: rows sharing a primary key are merged left-to-right, taking the +latest non-null value per non-PK field. ``DeduplicateMergeFunction`` +keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets +later writes "fill in" fields the earlier writes left null, so users +can write the same logical record across multiple commits with +different sets of non-null columns. + +This is the **core merge semantics only**. The Java implementation also +supports per-field aggregator overrides (``fields..aggregate- +function``), sequence groups (``fields..sequence-group``), +``ignore-delete``, and ``partial-update.remove-record-on-*`` options. +None of those are implemented yet; non-INSERT row kinds raise +``NotImplementedError`` at ``add`` time so we never silently corrupt +data with a half-implemented contract. +""" + +from typing import Any, List, Optional + +from pypaimon.table.row.key_value import KeyValue +from pypaimon.table.row.row_kind import RowKind + + +class PartialUpdateMergeFunction: + """A MergeFunction where the key is the primary key (unique) and the + value is merged across all rows for that key by taking the latest + non-null value per non-PK field. + + Mirrors the ``MergeFunction`` protocol used by ``SortMergeReader``: + ``reset`` (between groups of same-key rows), ``add`` (one row at a + time, oldest to newest), ``get_result`` (after the group is + exhausted). 
+ """ + + def __init__(self, key_arity: int, value_arity: int): + self._key_arity = key_arity + self._value_arity = value_arity + # Lazily allocated on first add(); ``None`` means "no rows yet". + self._accumulator: Optional[List[Any]] = None + # Reference to the most recently added kv. We use it only to + # propagate the key + sequence_number into the result row, and we + # snapshot those two values into a fresh tuple in ``get_result()`` + # so the result is not aliased to upstream's reused KeyValue. + self._latest_kv: Optional[KeyValue] = None + + def reset(self) -> None: + self._accumulator = None + self._latest_kv = None + + def add(self, kv: KeyValue) -> None: + row_kind_byte = kv.value_row_kind_byte + if not RowKind.is_add_byte(row_kind_byte): + # DELETE / UPDATE_BEFORE need ignore-delete or + # partial-update.remove-record-on-delete to be set in Java; + # neither option is wired up in pypaimon yet, so refuse the + # row rather than silently swallow it. + raise NotImplementedError( + "PartialUpdateMergeFunction received a {} row; this " + "Python port does not yet implement the ignore-delete / " + "partial-update.remove-record-on-delete options. Use the " + "Java client for tables that produce DELETE / " + "UPDATE_BEFORE rows.".format(RowKind(row_kind_byte).to_string()) + ) + + if self._accumulator is None: + self._accumulator = [ + kv.value.get_field(i) for i in range(self._value_arity) + ] + else: + for i in range(self._value_arity): + v = kv.value.get_field(i) + if v is not None: + self._accumulator[i] = v + self._latest_kv = kv + + def get_result(self) -> Optional[KeyValue]: + if self._accumulator is None or self._latest_kv is None: + return None + + kv = self._latest_kv + # Snapshot the key as a fresh tuple — we cannot keep a reference + # to ``kv`` because upstream readers (e.g. KeyValueWrapReader) + # reuse a single KeyValue instance and mutate its underlying + # row_tuple between calls. Building a fresh tuple here means the + # result we return is decoupled from any subsequent iteration. + key_values = tuple( + kv.key.get_field(i) for i in range(self._key_arity) + ) + result_row = key_values + ( + kv.sequence_number, + RowKind.INSERT.value, + ) + tuple(self._accumulator) + + result = KeyValue(self._key_arity, self._value_arity) + result.replace(result_row) + return result diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py b/paimon-python/pypaimon/read/reader/sort_merge_reader.py index aedd593b702b..2dcf152e601b 100644 --- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py +++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py @@ -30,9 +30,15 @@ class SortMergeReaderWithMinHeap(RecordReader): """SortMergeReader implemented with min-heap.""" - def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema): + def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema, + merge_function: Optional[Any] = None): self.next_batch_readers = list(readers) - self.merge_function = DeduplicateMergeFunction() + # Default to dedupe so callers that don't pass a merge_function + # keep their old behaviour. The merge engine dispatch lives in + # ``MergeFileSplitRead.section_reader_supplier`` for the read + # path; tests or other ad-hoc callers can pass a different + # implementation here. 
+ self.merge_function = merge_function if merge_function is not None else DeduplicateMergeFunction() if schema.partition_keys: trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not in schema.partition_keys] diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 9a82b0000a85..0b3d2fe7fca2 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -21,7 +21,7 @@ from functools import partial from typing import Callable, List, Optional, Tuple -from pypaimon.common.options.core_options import CoreOptions +from pypaimon.common.options.core_options import CoreOptions, MergeEngine from pypaimon.common.predicate import Predicate from pypaimon.deletionvectors import ApplyDeletionVectorReader from pypaimon.deletionvectors.deletion_vector import DeletionVector @@ -54,7 +54,10 @@ KeyValueUnwrapRecordReader from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader from pypaimon.read.reader.shard_batch_reader import ShardBatchReader -from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction +from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction, + SortMergeReaderWithMinHeap) from pypaimon.read.push_down_utils import _get_all_fields from pypaimon.read.split import Split from pypaimon.read.sliced_split import SlicedSplit @@ -482,7 +485,33 @@ def section_reader_supplier(self, section: List[SortedRun]) -> RecordReader: supplier = partial(self.kv_reader_supplier, file, self.deletion_file_readers.get(file.file_name, None)) data_readers.append(supplier) readers.append(ConcatRecordReader(data_readers)) - return SortMergeReaderWithMinHeap(readers, self.table.table_schema) + merge_function = self._build_merge_function() + return SortMergeReaderWithMinHeap( + readers, self.table.table_schema, merge_function=merge_function) + + def _build_merge_function(self): + """Pick the right MergeFunction implementation for the table's + ``merge-engine`` option. ``DEDUPLICATE`` is the default and the + only engine supported on the Python read path historically; + ``PARTIAL_UPDATE`` is now wired up to its dedicated + implementation. The remaining engines (``AGGREGATE`` / + ``FIRST_ROW``) used to silently degrade to dedupe — that quietly + produced wrong data — so we now raise an explicit + ``NotImplementedError`` instead, until they're ported. + """ + engine = self.table.options.merge_engine() + if engine == MergeEngine.DEDUPLICATE: + return DeduplicateMergeFunction() + if engine == MergeEngine.PARTIAL_UPDATE: + return PartialUpdateMergeFunction( + key_arity=len(self.trimmed_primary_key), + value_arity=self.value_arity, + ) + raise NotImplementedError( + "merge-engine '{}' is not implemented in pypaimon yet " + "(supported: deduplicate, partial-update). 
Use the Java " + "client or open an issue to track support.".format(engine.value) + ) def create_reader(self) -> RecordReader: # Create a dict mapping data file name to deletion file reader method diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py new file mode 100644 index 000000000000..e56536a044ef --- /dev/null +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -0,0 +1,219 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""End-to-end tests for the ``partial-update`` merge engine. + +Each test creates a PK table with ``merge-engine`` set to a particular +value, writes one or more batches, and reads back. Partial-update reads +must merge non-null fields across batches; ``deduplicate`` must keep +the latest row only; ``aggregation`` and ``first-row`` must raise +``NotImplementedError`` (until they are ported), since silently +treating them as deduplicate would corrupt the user's data. +""" + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema + + +class PartialUpdateMergeEngineE2ETest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', True) + + cls.pa_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + ('a', pa.string()), + ('b', pa.string()), + ('c', pa.string()), + ]) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def _create_pk_table(self, table_name, merge_engine='partial-update'): + # bucket=1 so all rows for any PK land in the same bucket; this is + # what forces the read path through SortMergeReader instead of the + # raw_convertible / single-file fast path. partial-update merging + # only happens inside SortMergeReader. 
+ schema = Schema.from_pyarrow_schema( + self.pa_schema, + primary_keys=['id'], + options={ + 'bucket': '1', + 'merge-engine': merge_engine, + }, + ) + full = 'default.{}'.format(table_name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full) + + def _write(self, table, rows): + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + + def _read(self, table): + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + if not splits: + return [] + return sorted( + rb.new_read().to_arrow(splits).to_pylist(), + key=lambda r: r['id'], + ) + + # -- partial-update happy path --------------------------------------- + + def test_partial_update_two_writes_merges_non_null(self): + """Two writes against the same PK with disjoint non-null columns + must merge into a single row that has both columns populated. + """ + table = self._create_pk_table('two_writes') + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + + def test_partial_update_three_writes_merges_left_to_right(self): + """Three overlapping writes — each filling in a different column — + compose into the union of non-null fields. + """ + table = self._create_pk_table('three_writes') + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': 'C'}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + + def test_partial_update_disjoint_keys_unaffected(self): + """Three rows with disjoint PKs must all appear unchanged in the + output — partial-update only merges rows that share a PK. + """ + table = self._create_pk_table('disjoint_keys') + self._write(table, [ + {'id': 1, 'a': 'A1', 'b': None, 'c': None}, + {'id': 2, 'a': None, 'b': 'B2', 'c': None}, + {'id': 3, 'a': None, 'b': None, 'c': 'C3'}, + ]) + + self.assertEqual( + self._read(table), + [ + {'id': 1, 'a': 'A1', 'b': None, 'c': None}, + {'id': 2, 'a': None, 'b': 'B2', 'c': None}, + {'id': 3, 'a': None, 'b': None, 'c': 'C3'}, + ], + ) + + def test_partial_update_later_value_wins_over_earlier_non_null(self): + """When two writes both supply a non-null value for the same + column, the later value wins (latest non-null per field). + """ + table = self._create_pk_table('later_wins') + self._write(table, [{'id': 1, 'a': 'old', 'b': 'keep', 'c': None}]) + self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': 'fill'}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'new', 'b': 'keep', 'c': 'fill'}], + ) + + def test_partial_update_later_null_does_not_clobber_earlier_value(self): + """A later write with NULL for a column does NOT overwrite an + earlier non-null value for that column. 
+ """ + table = self._create_pk_table('null_no_clobber') + self._write(table, [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}]) + self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + + # -- deduplicate (regression) ---------------------------------------- + + def test_deduplicate_engine_unchanged(self): + """The default ``deduplicate`` engine must keep the latest row + intact, including its NULLs — exactly the pre-PR behaviour. + """ + table = self._create_pk_table('dedupe', merge_engine='deduplicate') + self._write(table, [{'id': 1, 'a': 'old', 'b': 'old-b', 'c': 'old-c'}]) + self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'new', 'b': None, 'c': None}], + ) + + # -- engines we don't support yet ------------------------------------ + + def test_aggregation_engine_raises_not_implemented(self): + """Until ``aggregation`` is ported, reading an aggregation table + must raise rather than silently produce dedupe results.""" + table = self._create_pk_table('agg_unsupported', + merge_engine='aggregation') + self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}]) + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + with self.assertRaises(NotImplementedError) as cm: + rb.new_read().to_arrow(splits) + self.assertIn('aggregation', str(cm.exception)) + + def test_first_row_engine_raises_not_implemented(self): + """Until ``first-row`` is ported, reading a first-row table must + raise rather than silently produce dedupe results.""" + table = self._create_pk_table('first_row_unsupported', + merge_engine='first-row') + self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}]) + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + with self.assertRaises(NotImplementedError) as cm: + rb.new_read().to_arrow(splits) + self.assertIn('first-row', str(cm.exception)) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py new file mode 100644 index 000000000000..11e187766c48 --- /dev/null +++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py @@ -0,0 +1,177 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Direct unit tests for ``PartialUpdateMergeFunction``. 
+ +Drives the merge function with synthetic ``KeyValue`` instances so the +contract is pinned down without going through the full read pipeline. +The end-to-end behaviour on real PK tables is exercised separately in +``test_partial_update_e2e.py``. +""" + +import unittest + +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction +from pypaimon.table.row.key_value import KeyValue +from pypaimon.table.row.row_kind import RowKind + + +def _kv(key, seq, row_kind, value): + """Build a fresh KeyValue for a (key, sequence, row_kind, value) tuple. + + ``key`` and ``value`` are tuples of primitives — the helper handles + layout (key, seq, row_kind_byte, value) so individual tests can stay + focused on the merge semantics. + """ + kv = KeyValue(key_arity=len(key), value_arity=len(value)) + kv.replace(tuple(key) + (seq, row_kind.value) + tuple(value)) + return kv + + +def _result_value(kv): + """Extract the value tuple out of a KeyValue produced by get_result().""" + return tuple(kv.value.get_field(i) for i in range(kv.value_arity)) + + +def _result_key(kv): + return tuple(kv.key.get_field(i) for i in range(kv.key_arity)) + + +class PartialUpdateMergeFunctionTest(unittest.TestCase): + + def test_single_insert_returns_value(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + result = mf.get_result() + + self.assertIsNotNone(result) + self.assertEqual(_result_key(result), (1,)) + self.assertEqual(_result_value(result), ('a', 'x')) + self.assertEqual(result.sequence_number, 100) + self.assertEqual(result.value_row_kind_byte, RowKind.INSERT.value) + + def test_second_insert_overwrites_non_null(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.INSERT, ('b', None))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('b', None)) + # Sequence number tracks the latest add(). 
+ self.assertEqual(result.sequence_number, 101) + + def test_second_insert_fills_in_null(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'x'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_third_insert_continues_merge(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=3) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None, None))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'b', None))) + mf.add(_kv((1,), 102, RowKind.INSERT, (None, None, 'c'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'b', 'c')) + + def test_later_null_does_not_clobber_earlier_value(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, None))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_reset_between_keys(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + first = mf.get_result() + self.assertEqual(_result_key(first), (1,)) + self.assertEqual(_result_value(first), ('a', 'x')) + + mf.reset() + mf.add(_kv((2,), 200, RowKind.INSERT, ('b', 'y'))) + second = mf.get_result() + self.assertEqual(_result_key(second), (2,)) + self.assertEqual(_result_value(second), ('b', 'y')) + + def test_get_result_before_any_add_returns_none(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + self.assertIsNone(mf.get_result()) + + def test_update_after_is_treated_as_insert(self): + # Java's PartialUpdate accepts UPDATE_AFTER alongside INSERT in + # non-sequence-group mode (both are "add" kinds). Mirror that. + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.UPDATE_AFTER, (None, 'x'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_delete_row_raises_not_implemented(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + with self.assertRaises(NotImplementedError) as cm: + mf.add(_kv((1,), 101, RowKind.DELETE, (None, None))) + self.assertIn('DELETE', str(cm.exception)) + self.assertIn('ignore-delete', str(cm.exception)) + + def test_update_before_row_raises_not_implemented(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + with self.assertRaises(NotImplementedError) as cm: + mf.add(_kv((1,), 100, RowKind.UPDATE_BEFORE, (None, None))) + self.assertIn('UPDATE_BEFORE', str(cm.exception)) + + def test_result_is_decoupled_from_input_kv(self): + """The merge function must build a fresh result tuple — upstream + readers reuse a single KeyValue instance and call ``replace`` on + each iteration, so holding a reference to the input is unsafe. + """ + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + kv = _kv((1,), 100, RowKind.INSERT, ('a', 'x')) + mf.add(kv) + result = mf.get_result() + + # Mutate the input's underlying tuple to simulate a reused + # KeyValue being rebound to a different row. + kv.replace((999, 999, RowKind.INSERT.value, 'evil', 'evil')) + + # The previously-returned result must NOT be affected. 
+ self.assertEqual(_result_key(result), (1,)) + self.assertEqual(_result_value(result), ('a', 'x')) + + +if __name__ == '__main__': + unittest.main() From 70b9682231d4635a82ae37dde4d56bd9926fb9e7 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 22:23:54 +0800 Subject: [PATCH 2/8] [python] partial-update: refuse to run on tables that configure out-of-scope options Address review on r3168491328: previously `_build_merge_function()` dispatched on `merge-engine: partial-update` alone, so a table that ALSO configured sequence-group / per-field aggregator / ignore-delete / partial-update.remove-record-on-* would fall into the simple PartialUpdateMergeFunction and silently drop those semantics -- exactly the same silent-corruption pattern this PR exists to close, just reshaped from "silent dedupe" to "silent half-partial-update". Now the PARTIAL_UPDATE branch first scans the table options for any of the unsupported keys: * fields..sequence-group * fields..aggregate-function * fields.default-aggregate-function * ignore-delete (and the partial-update./first-row./deduplicate. prefixed aliases) when truthy * partial-update.remove-record-on-delete when truthy * partial-update.remove-record-on-sequence-group when truthy If any are set, raise NotImplementedError naming every offending key so the user can either drop them or escalate. Same shape as the existing AGGREGATE / FIRST_ROW raise. Tests: 7 new e2e cases in test_partial_update_e2e.py, one per option plus a regression case asserting `ignore-delete: false` (explicitly disabled) still passes through to the merge function. --- paimon-python/pypaimon/read/split_read.py | 67 +++++++++++++ .../pypaimon/tests/test_partial_update_e2e.py | 97 ++++++++++++++++++- 2 files changed, 159 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 0b3d2fe7fca2..faf4fb3683a1 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -498,11 +498,32 @@ def _build_merge_function(self): ``FIRST_ROW``) used to silently degrade to dedupe — that quietly produced wrong data — so we now raise an explicit ``NotImplementedError`` instead, until they're ported. + + For ``PARTIAL_UPDATE``, we also refuse to run when the table + configures any option whose semantics this port does not yet + implement (sequence-group, per-field aggregator overrides, + ignore-delete and friends). Without this guard those options + would be silently ignored and produce subtly wrong results — + the same anti-pattern this PR exists to close. """ engine = self.table.options.merge_engine() if engine == MergeEngine.DEDUPLICATE: return DeduplicateMergeFunction() if engine == MergeEngine.PARTIAL_UPDATE: + unsupported = self._partial_update_unsupported_options() + if unsupported: + raise NotImplementedError( + "merge-engine 'partial-update' is enabled together " + "with options that pypaimon does not yet implement: " + "{}. The supported subset is per-key last-non-null " + "merge with no sequence-group, no per-field " + "aggregator override, no ignore-delete and no " + "partial-update.remove-record-on-* flags. 
Use the " + "Java client for the full feature set, or open an " + "issue to track Python support.".format( + ", ".join(sorted(unsupported)) + ) + ) return PartialUpdateMergeFunction( key_arity=len(self.trimmed_primary_key), value_arity=self.value_arity, @@ -513,6 +534,52 @@ def _build_merge_function(self): "client or open an issue to track support.".format(engine.value) ) + # Boolean-valued options that, when truthy, opt the table into + # behaviour the Python PartialUpdateMergeFunction does not implement. + # Mirrors org.apache.paimon.CoreOptions and the fallback keys in + # PartialUpdateMergeFunction.java. + _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = ( + "ignore-delete", + "partial-update.ignore-delete", + "first-row.ignore-delete", + "deduplicate.ignore-delete", + "partial-update.remove-record-on-delete", + "partial-update.remove-record-on-sequence-group", + ) + _FIELDS_PREFIX = "fields." + _FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group" + _FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function" + _DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function" + + def _partial_update_unsupported_options(self): + """Return the set of option keys configured on this table that + ``PartialUpdateMergeFunction`` does not yet support. Empty set + means we can safely run the simple last-non-null merge. + """ + flagged = set() + raw = self.table.options.options.to_map() + for key, value in raw.items(): + if (key in self._PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS + and self._option_is_truthy(value)): + flagged.add(key) + elif key == self._DEFAULT_AGGREGATE_FUNCTION_KEY: + flagged.add(key) + elif key.startswith(self._FIELDS_PREFIX) and ( + key.endswith(self._FIELD_SEQUENCE_GROUP_SUFFIX) + or key.endswith(self._FIELD_AGGREGATE_FUNCTION_SUFFIX)): + flagged.add(key) + return flagged + + @staticmethod + def _option_is_truthy(raw): + if raw is None: + return False + if isinstance(raw, bool): + return raw + if isinstance(raw, str): + return raw.strip().lower() in ("true", "1", "yes", "on") + return bool(raw) + def create_reader(self) -> RecordReader: # Create a dict mapping data file name to deletion file reader method self._genarate_deletion_file_readers() diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py index e56536a044ef..a7a29970a89c 100644 --- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -56,18 +56,22 @@ def setUpClass(cls): def tearDownClass(cls): shutil.rmtree(cls.tempdir, ignore_errors=True) - def _create_pk_table(self, table_name, merge_engine='partial-update'): + def _create_pk_table(self, table_name, merge_engine='partial-update', + extra_options=None): # bucket=1 so all rows for any PK land in the same bucket; this is # what forces the read path through SortMergeReader instead of the # raw_convertible / single-file fast path. partial-update merging # only happens inside SortMergeReader. 
+ options = { + 'bucket': '1', + 'merge-engine': merge_engine, + } + if extra_options: + options.update(extra_options) schema = Schema.from_pyarrow_schema( self.pa_schema, primary_keys=['id'], - options={ - 'bucket': '1', - 'merge-engine': merge_engine, - }, + options=options, ) full = 'default.{}'.format(table_name) self.catalog.create_table(full, schema, False) @@ -214,6 +218,89 @@ def test_first_row_engine_raises_not_implemented(self): rb.new_read().to_arrow(splits) self.assertIn('first-row', str(cm.exception)) + # -- partial-update + out-of-scope option combinations --------------- + # + # When a user pairs ``merge-engine: partial-update`` with any option + # this port doesn't implement (sequence-group, per-field aggregator + # override, ignore-delete, partial-update.remove-record-on-*), we + # must raise rather than silently run the simple last-non-null merge + # — otherwise we'd reproduce the same silent-corruption pattern this + # PR exists to close. + + def _assert_partial_update_unsupported(self, table_name, extra_options, + expected_keys): + table = self._create_pk_table( + table_name, extra_options=extra_options) + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + with self.assertRaises(NotImplementedError) as cm: + rb.new_read().to_arrow(splits) + msg = str(cm.exception) + self.assertIn("partial-update", msg) + for key in expected_keys: + self.assertIn(key, msg, + "expected option key '{}' in error: {}".format(key, msg)) + + def test_partial_update_with_sequence_group_raises(self): + self._assert_partial_update_unsupported( + 'pu_seq_group', + {'fields.b.sequence-group': 'a'}, + ['fields.b.sequence-group'], + ) + + def test_partial_update_with_field_aggregate_function_raises(self): + self._assert_partial_update_unsupported( + 'pu_field_agg', + {'fields.a.aggregate-function': 'last_non_null_value'}, + ['fields.a.aggregate-function'], + ) + + def test_partial_update_with_default_aggregate_function_raises(self): + self._assert_partial_update_unsupported( + 'pu_default_agg', + {'fields.default-aggregate-function': 'last_non_null_value'}, + ['fields.default-aggregate-function'], + ) + + def test_partial_update_with_ignore_delete_raises(self): + self._assert_partial_update_unsupported( + 'pu_ignore_delete', + {'ignore-delete': 'true'}, + ['ignore-delete'], + ) + + def test_partial_update_with_remove_record_on_delete_raises(self): + self._assert_partial_update_unsupported( + 'pu_rrod', + {'partial-update.remove-record-on-delete': 'true'}, + ['partial-update.remove-record-on-delete'], + ) + + def test_partial_update_with_remove_record_on_sequence_group_raises(self): + self._assert_partial_update_unsupported( + 'pu_rrosg', + {'partial-update.remove-record-on-sequence-group': 'true'}, + ['partial-update.remove-record-on-sequence-group'], + ) + + def test_partial_update_with_explicit_ignore_delete_false_does_not_raise(self): + """Explicitly setting ignore-delete=false is equivalent to leaving + it unset and must not trip the guard.""" + table = self._create_pk_table( + 'pu_ignore_delete_false', + extra_options={'ignore-delete': 'false'}, + ) + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + if __name__ == '__main__': unittest.main() From 
1c4b1f0c297092d5914f2a51c10db0af9a6a07db Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 22:39:12 +0800 Subject: [PATCH 3/8] [python] partial-update: enforce NOT NULL inputs to mirror Java updateNonNullFields Java PartialUpdateMergeFunction.updateNonNullFields (line 177-188) raises IllegalArgumentException when an input field is null and the schema marks that field NOT NULL. The Python port previously absorbed such inputs silently, letting writes whose first value was null on a NOT NULL field land null in the accumulator. Changes: * PartialUpdateMergeFunction.__init__ takes an optional `nullables` list parallel to value indices. When given, every add() checks each null input against `nullables[i]` and raises ValueError on a NOT NULL field, matching Java semantics on every row (not just the first). When omitted, behaviour is unchanged (back-compat for direct callers). * MergeFileSplitRead snapshots the raw value-side schema as `value_fields` before _create_key_value_fields wraps it, then hands `[f.type.nullable for f in self.value_fields]` to the merge function. * Five new unit cases in test_partial_update_merge_function.py: first row null on NOT NULL raises, subsequent row null on NOT NULL raises, null on nullable field is absorbed, length-mismatch nullables raises, omitting nullables preserves the previous lenient behaviour. Result: with the existing guard in _build_merge_function (which refuses out-of-scope options) and the NOT NULL enforcement here, the simple last-non-null path is now feature-equivalent to Java's updateNonNullFields + getResult on the supported subset. --- .../reader/partial_update_merge_function.py | 33 +++++++++---- paimon-python/pypaimon/read/split_read.py | 5 ++ .../test_partial_update_merge_function.py | 49 +++++++++++++++++++ 3 files changed, 78 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py index 12e29e33eb8d..978b48011c54 100644 --- a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py +++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py @@ -55,9 +55,20 @@ class PartialUpdateMergeFunction: exhausted). """ - def __init__(self, key_arity: int, value_arity: int): + def __init__(self, key_arity: int, value_arity: int, + nullables: Optional[List[bool]] = None): self._key_arity = key_arity self._value_arity = value_arity + # Per-value-field nullable flags, parallel to value indices. When + # ``None``, no nullability check runs (preserves the contract for + # direct callers that don't have schema info handy). When given, + # mirrors Java's ``updateNonNullFields`` check: a null input on a + # NOT NULL field raises rather than being silently absorbed. + if nullables is not None and len(nullables) != value_arity: + raise ValueError( + "nullables length {} does not match value_arity {}".format( + len(nullables), value_arity)) + self._nullables = nullables # Lazily allocated on first add(); ``None`` means "no rows yet". self._accumulator: Optional[List[Any]] = None # Reference to the most recently added kv. 
We use it only to @@ -85,15 +96,19 @@ def add(self, kv: KeyValue) -> None: "UPDATE_BEFORE rows.".format(RowKind(row_kind_byte).to_string()) ) + # Mirror Java's reset() + updateNonNullFields(): the accumulator + # starts as all-null (equivalent to ``new GenericRow(arity)``) and + # each add() writes non-null inputs; null inputs are absorbed — + # except when the schema marks the field NOT NULL, in which case + # we raise to match Java's IllegalArgumentException check. if self._accumulator is None: - self._accumulator = [ - kv.value.get_field(i) for i in range(self._value_arity) - ] - else: - for i in range(self._value_arity): - v = kv.value.get_field(i) - if v is not None: - self._accumulator[i] = v + self._accumulator = [None] * self._value_arity + for i in range(self._value_arity): + v = kv.value.get_field(i) + if v is not None: + self._accumulator[i] = v + elif self._nullables is not None and not self._nullables[i]: + raise ValueError("Field {} can not be null".format(i)) self._latest_kv = kv def get_result(self) -> Optional[KeyValue]: diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index faf4fb3683a1..ea2e8cf4b103 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -101,6 +101,10 @@ def __init__( self.split = split self.row_tracking_enabled = row_tracking_enabled self.value_arity = len(read_type) + # Snapshot the raw value-side schema before _create_key_value_fields + # wraps it, so MergeFileSplitRead can hand per-value-field nullable + # flags to merge functions that mirror Java's NOT-NULL check. + self.value_fields = list(read_type) self.trimmed_primary_key = self.table.trimmed_primary_keys self.read_fields = read_type @@ -527,6 +531,7 @@ def _build_merge_function(self): return PartialUpdateMergeFunction( key_arity=len(self.trimmed_primary_key), value_arity=self.value_arity, + nullables=[f.type.nullable for f in self.value_fields], ) raise NotImplementedError( "merge-engine '{}' is not implemented in pypaimon yet " diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py index 11e187766c48..60dfc7198dfb 100644 --- a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py +++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py @@ -172,6 +172,55 @@ def test_result_is_decoupled_from_input_kv(self): self.assertEqual(_result_key(result), (1,)) self.assertEqual(_result_value(result), ('a', 'x')) + # -- NOT-NULL input validation (mirrors Java's updateNonNullFields) ---- + + def test_first_insert_with_null_for_not_null_field_raises(self): + """If the very first row writes null to a NOT NULL field, raise — + same input-validation Java does in updateNonNullFields().""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, False]) + mf.reset() + with self.assertRaises(ValueError) as cm: + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None))) + self.assertIn("Field 1", str(cm.exception)) + + def test_subsequent_insert_with_null_for_not_null_field_raises(self): + """A later null on a NOT NULL field must also raise — Java checks + on every add(), not just the first one.""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, False]) + mf.reset() + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x'))) + with self.assertRaises(ValueError) as cm: + mf.add(_kv((1,), 2, RowKind.INSERT, (None, None))) + self.assertIn("Field 1", 
str(cm.exception)) + + def test_null_for_nullable_field_is_absorbed(self): + """A null input on a nullable field is silently absorbed (existing + accumulator value wins) — the standard partial-update semantic.""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, True]) + mf.reset() + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x'))) + mf.add(_kv((1,), 2, RowKind.INSERT, (None, 'y'))) + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'y')) + + def test_nullables_length_mismatch_raises(self): + with self.assertRaises(ValueError): + PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True]) + + def test_no_nullables_arg_skips_check(self): + """Backward-compat: callers that don't pass ``nullables`` get the + previous behaviour (no NOT-NULL validation).""" + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + # Would have raised had we declared the second field NOT NULL. + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None))) + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', None)) + if __name__ == '__main__': unittest.main() From 0d58859d04c84b823b65194aab10e1a193e41487 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 15:41:09 +0800 Subject: [PATCH 4/8] [python] partial-update: cover same-commit multi write_arrow as expectedFailure Reviewer asked to cover rows that land in the same data file -- multiple write_arrow() calls before a single prepare_commit(). Adding the cases revealed the writer-side / read-side gap upstream of this PR: KeyValueDataWriter._merge_data only does concat+sort (no merge function applied), so the flushed file holds duplicate primary keys; on read, _build_split_from_pack treats any single-file group as raw_convertible and routes through the fast path, skipping SortMergeReader and the merge-engine dispatch this PR adds. Fixing it requires either a merge buffer in KeyValueDataWriter (mirroring Java SortBufferWriteBuffer / MergeTreeWriter) or a tighter raw_convertible check that proves intra-file PK uniqueness -- both are write-path / scan-path restructuring outside this read-side merge-engine port. The two new cases are kept as unittest.expectedFailure so the gap stays visible and converts to passing regressions when the writer-side fix lands. --- .../pypaimon/tests/test_partial_update_e2e.py | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py index a7a29970a89c..2f567918ec3a 100644 --- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -88,6 +88,24 @@ def _write(self, table, rows): w.close() c.close() + def _write_many(self, table, batches): + """Multiple ``write_arrow`` calls inside a single ``prepare_commit``. + + Mirrors the reviewer's question: rows that land in the same + underlying data file must still go through the merge-engine + dispatch; in-writer merging cannot silently degrade to dedupe. 
+ """ + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + for rows in batches: + w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + def _read(self, table): rb = table.new_read_builder() splits = rb.new_scan().plan().splits() @@ -173,6 +191,76 @@ def test_partial_update_later_null_does_not_clobber_earlier_value(self): [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], ) + # -- single-commit, multiple write_arrow calls ----------------------- + # + # Reviewer concern (#7745): rows from multiple ``write_arrow`` calls + # inside a single ``prepare_commit`` may land in the same data file + # and bypass the merge-engine dispatch we added in + # ``MergeFileSplitRead._build_merge_function``. Verified: they do. + # + # Root cause is upstream of this PR. ``KeyValueDataWriter._merge_data`` + # simply ``concat + sort``s incoming batches without applying any + # merge function, so the flushed file holds multiple rows for the + # same primary key -- violating the Java LSM invariant "PK is unique + # within a file". On the read side, ``_build_split_from_pack`` then + # marks any single-file group as ``raw_convertible=True`` + # (split_generator.py:99-100), which routes the split through the + # raw-convertible fast path and skips ``SortMergeReader`` entirely. + # The merge-engine dispatch this PR adds only fires inside + # ``SortMergeReader``, so partial-update semantics are lost. + # + # Fixing this requires either (a) giving ``KeyValueDataWriter`` a + # merge buffer that applies the merge function during flush + # (mirrors Java ``SortBufferWriteBuffer`` / ``MergeTreeWriter``), + # or (b) tightening ``raw_convertible`` to require proof that the + # file contains no duplicate keys. Both are write-/scan-path + # restructuring, well outside the scope of this read-side + # merge-engine port. Tracked for a follow-up PR. + # + # The two cases below are kept as ``expectedFailure`` so the gap + # is visible and will turn into a passing regression once the + # writer-side fix lands. + + @unittest.expectedFailure + def test_partial_update_two_write_arrows_single_commit(self): + """Two ``write_arrow`` calls + one ``prepare_commit``: each + carries a disjoint non-null field; result must be the per-field + merge. + + Currently fails: see module-level note above. The flushed file + keeps both rows verbatim and the read split goes through the + raw-convertible fast path, so neither dedupe nor partial-update + merge runs. + """ + table = self._create_pk_table('two_writes_single_commit') + self._write_many(table, [ + [{'id': 1, 'a': 'A', 'b': None, 'c': None}], + [{'id': 1, 'a': None, 'b': 'B', 'c': None}], + ]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + + @unittest.expectedFailure + def test_partial_update_three_write_arrows_single_commit(self): + """Three ``write_arrow`` calls in a single commit must compose + into the union of non-null fields. Same expected-failure + condition as the two-write case above. 
+ """ + table = self._create_pk_table('three_writes_single_commit') + self._write_many(table, [ + [{'id': 1, 'a': 'A', 'b': None, 'c': None}], + [{'id': 1, 'a': None, 'b': 'B', 'c': None}], + [{'id': 1, 'a': None, 'b': None, 'c': 'C'}], + ]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + # -- deduplicate (regression) ---------------------------------------- def test_deduplicate_engine_unchanged(self): From c784f0c87832d1a49b86eb0d3c67a263788e026a Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 17:25:32 +0800 Subject: [PATCH 5/8] [python] Extract MergeFunction dispatch into a shared module Move DeduplicateMergeFunction (previously embedded at the end of sort_merge_reader.py) into its own module pypaimon/read/reader/ deduplicate_merge_function.py so it can be reused outside the read path. Add pypaimon/common/merge_engine_dispatch.py with a single build_merge_function entry point and partial_update_unsupported_options helper, both lifted verbatim from MergeFileSplitRead so the dispatch has exactly one implementation. MergeFileSplitRead._build_merge_function shrinks to a thin wrapper. This keeps the read path's behaviour byte-identical and prepares the write path (next commit) to pick its merge function through the same dispatch. --- .../pypaimon/common/merge_engine_dispatch.py | 129 ++++++++++++++++++ .../read/reader/deduplicate_merge_function.py | 50 +++++++ .../pypaimon/read/reader/sort_merge_reader.py | 18 +-- paimon-python/pypaimon/read/split_read.py | 107 ++------------- 4 files changed, 195 insertions(+), 109 deletions(-) create mode 100644 paimon-python/pypaimon/common/merge_engine_dispatch.py create mode 100644 paimon-python/pypaimon/read/reader/deduplicate_merge_function.py diff --git a/paimon-python/pypaimon/common/merge_engine_dispatch.py b/paimon-python/pypaimon/common/merge_engine_dispatch.py new file mode 100644 index 000000000000..2bdcf4a8b24a --- /dev/null +++ b/paimon-python/pypaimon/common/merge_engine_dispatch.py @@ -0,0 +1,129 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Centralised merge-engine dispatch. + +Both the read path (``MergeFileSplitRead``) and the write path +(``KeyValueDataWriter``'s in-memory merge buffer) need to pick a +``MergeFunction`` based on the table's ``merge-engine`` option. This +module is the single source of truth so the two sides cannot drift. + +Mirrors Java ``MergeFunctionFactory`` (paimon-core/.../mergetree/ +compact/MergeFunctionFactory.java). 
+""" + +from typing import List + +from pypaimon.common.options.core_options import MergeEngine +from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction + + +# Boolean-valued options that, when truthy, opt the table into +# behaviour the Python PartialUpdateMergeFunction does not implement. +# Mirrors org.apache.paimon.CoreOptions and the fallback keys in +# PartialUpdateMergeFunction.java. +_PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = ( + "ignore-delete", + "partial-update.ignore-delete", + "first-row.ignore-delete", + "deduplicate.ignore-delete", + "partial-update.remove-record-on-delete", + "partial-update.remove-record-on-sequence-group", +) +_FIELDS_PREFIX = "fields." +_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group" +_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function" +_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function" + + +def build_merge_function( + *, + engine: MergeEngine, + raw_options: dict, + key_arity: int, + value_arity: int, + value_field_nullables: List[bool], +): + """Pick the MergeFunction for the table's ``merge-engine`` option. + + ``engine`` and ``raw_options`` come from the table's ``CoreOptions`` + (typically ``table.options.merge_engine()`` and + ``table.options.options.to_map()``). ``key_arity`` / ``value_arity`` + / ``value_field_nullables`` describe the value-side schema the + caller wants the merge function to operate on -- for the read path + this is the projected read schema, for the write path it's the full + table schema (minus primary keys). + """ + if engine == MergeEngine.DEDUPLICATE: + return DeduplicateMergeFunction() + if engine == MergeEngine.PARTIAL_UPDATE: + unsupported = partial_update_unsupported_options(raw_options) + if unsupported: + raise NotImplementedError( + "merge-engine 'partial-update' is enabled together with " + "options that pypaimon does not yet implement: {}. The " + "supported subset is per-key last-non-null merge with " + "no sequence-group, no per-field aggregator override, " + "no ignore-delete and no partial-update.remove-record-on-* " + "flags. Use the Java client for the full feature set, or " + "open an issue to track Python support.".format( + ", ".join(sorted(unsupported)) + ) + ) + return PartialUpdateMergeFunction( + key_arity=key_arity, + value_arity=value_arity, + nullables=list(value_field_nullables), + ) + raise NotImplementedError( + "merge-engine '{}' is not implemented in pypaimon yet " + "(supported: deduplicate, partial-update). Use the Java " + "client or open an issue to track support.".format(engine.value) + ) + + +def partial_update_unsupported_options(raw_options: dict): + """Return the set of option keys this table sets that + ``PartialUpdateMergeFunction`` does not yet support. Empty set + means we can safely run the simple last-non-null merge. 
+ """ + flagged = set() + for key, value in raw_options.items(): + if (key in _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS + and _option_is_truthy(value)): + flagged.add(key) + elif key == _DEFAULT_AGGREGATE_FUNCTION_KEY: + flagged.add(key) + elif key.startswith(_FIELDS_PREFIX) and ( + key.endswith(_FIELD_SEQUENCE_GROUP_SUFFIX) + or key.endswith(_FIELD_AGGREGATE_FUNCTION_SUFFIX)): + flagged.add(key) + return flagged + + +def _option_is_truthy(raw): + if raw is None: + return False + if isinstance(raw, bool): + return raw + if isinstance(raw, str): + return raw.strip().lower() in ("true", "1", "yes", "on") + return bool(raw) diff --git a/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py b/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py new file mode 100644 index 000000000000..5b669aae5564 --- /dev/null +++ b/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py @@ -0,0 +1,50 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Default merge function for primary-key tables. + +Mirrors Java ``DeduplicateMergeFunction`` -- for a run of KVs sharing +the same primary key, keep only the one with the highest sequence +number (by virtue of ``add`` being called in sequence-number order). +""" + +from typing import Optional + +from pypaimon.table.row.key_value import KeyValue + + +class DeduplicateMergeFunction: + """Keep only the latest KV per primary key. + + Used by both the read path (``SortMergeReaderWithMinHeap``) and the + write path (``KeyValueDataWriter`` in-memory merge buffer) -- the + latter is what enforces the LSM "PK unique within a file" + invariant on flush. 
+ """ + + def __init__(self): + self.latest_kv: Optional[KeyValue] = None + + def reset(self) -> None: + self.latest_kv = None + + def add(self, kv: KeyValue) -> None: + self.latest_kv = kv + + def get_result(self) -> Optional[KeyValue]: + return self.latest_kv diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py b/paimon-python/pypaimon/read/reader/sort_merge_reader.py index 2dcf152e601b..0904a6fb46f2 100644 --- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py +++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py @@ -19,6 +19,8 @@ import heapq from typing import Any, Callable, List, Optional +from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction from pypaimon.read.reader.iface.record_iterator import RecordIterator from pypaimon.read.reader.iface.record_reader import RecordReader from pypaimon.schema.data_types import DataField, Keyword @@ -130,22 +132,6 @@ def _next_impl(self): return True -class DeduplicateMergeFunction: - """A MergeFunction where key is primary key (unique) and value is the full record, only keep the latest one.""" - - def __init__(self): - self.latest_kv = None - - def reset(self) -> None: - self.latest_kv = None - - def add(self, kv: KeyValue): - self.latest_kv = kv - - def get_result(self) -> Optional[KeyValue]: - return self.latest_kv - - class Element: def __init__(self, kv: KeyValue, iterator: RecordIterator[KeyValue], reader: RecordReader[KeyValue]): self.kv = kv diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index ea2e8cf4b103..eae686342799 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -21,7 +21,8 @@ from functools import partial from typing import Callable, List, Optional, Tuple -from pypaimon.common.options.core_options import CoreOptions, MergeEngine +from pypaimon.common.merge_engine_dispatch import build_merge_function +from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.deletionvectors import ApplyDeletionVectorReader from pypaimon.deletionvectors.deletion_vector import DeletionVector @@ -54,10 +55,7 @@ KeyValueUnwrapRecordReader from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader from pypaimon.read.reader.shard_batch_reader import ShardBatchReader -from pypaimon.read.reader.partial_update_merge_function import \ - PartialUpdateMergeFunction -from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction, - SortMergeReaderWithMinHeap) +from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap from pypaimon.read.push_down_utils import _get_all_fields from pypaimon.read.split import Split from pypaimon.read.sliced_split import SlicedSplit @@ -494,96 +492,19 @@ def section_reader_supplier(self, section: List[SortedRun]) -> RecordReader: readers, self.table.table_schema, merge_function=merge_function) def _build_merge_function(self): - """Pick the right MergeFunction implementation for the table's - ``merge-engine`` option. ``DEDUPLICATE`` is the default and the - only engine supported on the Python read path historically; - ``PARTIAL_UPDATE`` is now wired up to its dedicated - implementation. The remaining engines (``AGGREGATE`` / - ``FIRST_ROW``) used to silently degrade to dedupe — that quietly - produced wrong data — so we now raise an explicit - ``NotImplementedError`` instead, until they're ported. 
- - For ``PARTIAL_UPDATE``, we also refuse to run when the table - configures any option whose semantics this port does not yet - implement (sequence-group, per-field aggregator overrides, - ignore-delete and friends). Without this guard those options - would be silently ignored and produce subtly wrong results — - the same anti-pattern this PR exists to close. - """ - engine = self.table.options.merge_engine() - if engine == MergeEngine.DEDUPLICATE: - return DeduplicateMergeFunction() - if engine == MergeEngine.PARTIAL_UPDATE: - unsupported = self._partial_update_unsupported_options() - if unsupported: - raise NotImplementedError( - "merge-engine 'partial-update' is enabled together " - "with options that pypaimon does not yet implement: " - "{}. The supported subset is per-key last-non-null " - "merge with no sequence-group, no per-field " - "aggregator override, no ignore-delete and no " - "partial-update.remove-record-on-* flags. Use the " - "Java client for the full feature set, or open an " - "issue to track Python support.".format( - ", ".join(sorted(unsupported)) - ) - ) - return PartialUpdateMergeFunction( - key_arity=len(self.trimmed_primary_key), - value_arity=self.value_arity, - nullables=[f.type.nullable for f in self.value_fields], - ) - raise NotImplementedError( - "merge-engine '{}' is not implemented in pypaimon yet " - "(supported: deduplicate, partial-update). Use the Java " - "client or open an issue to track support.".format(engine.value) - ) + """Pick the MergeFunction for the table's ``merge-engine`` option. - # Boolean-valued options that, when truthy, opt the table into - # behaviour the Python PartialUpdateMergeFunction does not implement. - # Mirrors org.apache.paimon.CoreOptions and the fallback keys in - # PartialUpdateMergeFunction.java. - _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = ( - "ignore-delete", - "partial-update.ignore-delete", - "first-row.ignore-delete", - "deduplicate.ignore-delete", - "partial-update.remove-record-on-delete", - "partial-update.remove-record-on-sequence-group", - ) - _FIELDS_PREFIX = "fields." - _FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group" - _FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function" - _DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function" - - def _partial_update_unsupported_options(self): - """Return the set of option keys configured on this table that - ``PartialUpdateMergeFunction`` does not yet support. Empty set - means we can safely run the simple last-non-null merge. + Delegates to the shared dispatch in + ``pypaimon.common.merge_engine_dispatch`` so the read path and + the in-memory merge buffer on the write path cannot drift. 
""" - flagged = set() - raw = self.table.options.options.to_map() - for key, value in raw.items(): - if (key in self._PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS - and self._option_is_truthy(value)): - flagged.add(key) - elif key == self._DEFAULT_AGGREGATE_FUNCTION_KEY: - flagged.add(key) - elif key.startswith(self._FIELDS_PREFIX) and ( - key.endswith(self._FIELD_SEQUENCE_GROUP_SUFFIX) - or key.endswith(self._FIELD_AGGREGATE_FUNCTION_SUFFIX)): - flagged.add(key) - return flagged - - @staticmethod - def _option_is_truthy(raw): - if raw is None: - return False - if isinstance(raw, bool): - return raw - if isinstance(raw, str): - return raw.strip().lower() in ("true", "1", "yes", "on") - return bool(raw) + return build_merge_function( + engine=self.table.options.merge_engine(), + raw_options=self.table.options.options.to_map(), + key_arity=len(self.trimmed_primary_key), + value_arity=self.value_arity, + value_field_nullables=[f.type.nullable for f in self.value_fields], + ) def create_reader(self) -> RecordReader: # Create a dict mapping data file name to deletion file reader method From cd50d7e56ee94eb07f93b12f62284c1354a0f3b3 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 17:25:50 +0800 Subject: [PATCH 6/8] [python] Add in-memory merge buffer to KeyValueDataWriter KeyValueDataWriter._merge_data previously did concat + sort only -- no merge function was ever applied -- so a primary-key flush could emit a single data file containing two or more rows for the same primary key. The read-side raw_convertible fast path (split_generator.py:99-100) treats single-file PK splits as "merge-free" and skips SortMergeReader, which produced silent multi-row-per-PK results in master regardless of merge engine. Mirror Java MergeTreeWriter.flushWriteBuffer + SortBufferWriteBuffer.MergeIterator.advanceIfNeeded (paimon-core/.../mergetree/SortBufferWriteBuffer.java:163-293): fold each run of equal-PK rows in the sorted pending buffer through the table's MergeFunction (reset + add + get_result) before writing to the file. The flushed file therefore satisfies the LSM "PK unique within a file" invariant the read side relies on. FileStoreWrite._build_pk_merge_function picks the merge function through the shared dispatch added in the previous commit. PARTIAL_UPDATE with out-of-scope options keeps the explicit raise introduced in #7745 -- silently degrading there would reintroduce the same data-quality risk this PR exists to close. Wholly unsupported engines (aggregation / first-row) fall back to DeduplicateMergeFunction on the write side so the file still keeps the LSM invariant; the read side's dispatch still raises, so users get an explicit error before observing wrong-engine data. Sister to #7745. Closes the writer-side gap that #7745's expectedFailure cases exposed. 
--- .../pypaimon/write/file_store_write.py | 68 ++++++++++- .../write/writer/key_value_data_writer.py | 106 +++++++++++++++++- 2 files changed, 172 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 75b1d3a7d708..2c49f4104101 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -77,7 +77,8 @@ def max_seq_number(): partition=partition, bucket=bucket, max_seq_number=max_seq_number(), - options=options) + options=options, + merge_function=self._build_pk_merge_function()) else: seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number() return AppendOnlyDataWriter( @@ -89,6 +90,71 @@ def max_seq_number(): write_cols=self.write_cols ) + def _build_pk_merge_function(self): + """Build the merge function for the in-memory write buffer. + + Shares ``merge_engine_dispatch.build_merge_function`` with the + read path so the supported engines (deduplicate, partial-update + with no out-of-scope options) cannot drift between sides. + + For wholly unsupported engines (``aggregation`` / ``first-row``) + we fall back to ``DeduplicateMergeFunction`` here so the writer + still maintains the LSM "PK unique within a file" invariant. + The read path's dispatch still raises ``NotImplementedError``, + so the user gets an explicit error before they observe wrong- + engine data; the fallback only narrows the damage to "file is + deduped, not aggregated" rather than the silent multi-row-per- + PK corruption that existed pre-PR. + + Partial-update with out-of-scope options (sequence-group, + per-field aggregator, ignore-delete, remove-record-on-*) does + **not** fall back -- silently degrading to dedupe there is a + live corruption pattern this PR exists to close, so we re-raise + and let the writer fail explicitly at flush time. + + The value-side schema must match the layout + ``KeyValueDataWriter`` flushes -- ``_add_system_fields`` keeps + every original user column on the value side (the primary keys + are duplicated as ``_KEY_`` columns to the left of the + value side). So ``value_arity`` here is ``len(table.fields)``, + not ``len(table.fields) - len(primary_keys)``. + """ + from pypaimon.common.merge_engine_dispatch import ( + build_merge_function, partial_update_unsupported_options) + from pypaimon.common.options.core_options import MergeEngine + from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction + + engine = self.options.merge_engine() + raw_options = self.options.options.to_map() + + # PARTIAL_UPDATE + out-of-scope option: never silently fall + # back -- forward the read-side error verbatim so writes fail + # at flush time rather than corrupt the file. 
+ if engine == MergeEngine.PARTIAL_UPDATE \ + and partial_update_unsupported_options(raw_options): + return build_merge_function( + engine=engine, raw_options=raw_options, + key_arity=len(self.table.trimmed_primary_keys), + value_arity=len(self.table.table_schema.fields), + value_field_nullables=[ + f.type.nullable for f in self.table.table_schema.fields], + ) + + all_value_fields = self.table.table_schema.fields + try: + return build_merge_function( + engine=engine, raw_options=raw_options, + key_arity=len(self.table.trimmed_primary_keys), + value_arity=len(all_value_fields), + value_field_nullables=[ + f.type.nullable for f in all_value_fields], + ) + except NotImplementedError: + # Wholly unsupported engine -- maintain PK uniqueness via + # dedupe; read-side will still raise. + return DeduplicateMergeFunction() + def _has_blob_columns(self) -> bool: """Check if the table schema contains blob columns.""" for field in self.table.table_schema.fields: diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py index 1b2dde7d1dfb..1159448096d5 100644 --- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py +++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py @@ -16,14 +16,38 @@ # limitations under the License. ################################################################################ +from typing import List + import pyarrow as pa import pyarrow.compute as pc +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction +from pypaimon.table.row.key_value import KeyValue from pypaimon.write.writer.data_writer import DataWriter class KeyValueDataWriter(DataWriter): - """Data writer for primary key tables with system fields and sorting.""" + """Data writer for primary key tables with system fields and sorting. + + On flush, applies the table's ``MergeFunction`` to fold rows that + share a primary key down to a single row, mirroring Java + ``SortBufferWriteBuffer.forEach`` / + ``MergeIterator.advanceIfNeeded`` (paimon-core/.../mergetree/ + SortBufferWriteBuffer.java). This is what enforces the LSM "PK + unique within a file" invariant the read-side ``raw_convertible`` + fast path relies on. + """ + + def __init__(self, table, partition, bucket, max_seq_number, + options=None, write_cols=None, merge_function=None): + super().__init__(table, partition, bucket, max_seq_number, + options, write_cols) + # Defaults to deduplicate so direct callers (tests / future code + # paths that don't go through FileStoreWrite) don't accidentally + # skip the merge step entirely. + self._merge_function = merge_function or DeduplicateMergeFunction() def _process_data(self, data: pa.RecordBatch) -> pa.Table: enhanced_data = self._add_system_fields(data) @@ -33,6 +57,86 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: combined = pa.concat_tables([existing_data, new_data]) return self._sort_by_primary_key(combined) + def prepare_commit(self) -> List[DataFileMeta]: + if self.pending_data is not None and self.pending_data.num_rows > 0: + self.pending_data = self._merge_pending_by_pk(self.pending_data) + return super().prepare_commit() + + def _check_and_roll_if_needed(self): + # Mirror Java MergeTreeWriter: collapse same-PK runs *before* + # slicing for size, so each flushed file individually maintains + # PK uniqueness even when a single buffer spans multiple files. 
+ if self.pending_data is not None and self.pending_data.num_rows > 0: + self.pending_data = self._merge_pending_by_pk(self.pending_data) + super()._check_and_roll_if_needed() + + def _merge_pending_by_pk(self, data: pa.Table) -> pa.Table: + """Fold same-PK runs in ``data`` using ``self._merge_function``. + + Mirrors Java ``MergeIterator.advanceIfNeeded`` + (SortBufferWriteBuffer.java:241-268). ``data`` is required to + already be sorted by ``(primary_key, _SEQUENCE_NUMBER)`` -- + ``_process_data`` / ``_merge_data`` enforce that. + """ + n = data.num_rows + if n < 2: + # Single-row buffer cannot have duplicates; sidestep the + # row-by-row pyarrow round-trip in the common streaming case. + return data + + rows = data.to_pylist() + col_names = data.schema.names + key_arity = len(self.trimmed_primary_keys) + # System fields sit at indices [key_arity, key_arity + 1] (the + # _SEQUENCE_NUMBER and _VALUE_KIND columns added by + # _add_system_fields). Everything to the right is the value side. + value_arity = len(col_names) - key_arity - 2 + + merged_rows: List[dict] = [] + i = 0 + while i < n: + j = i + first_key = self._key_tuple(rows[i], col_names, key_arity) + while j < n and \ + self._key_tuple(rows[j], col_names, key_arity) == first_key: + j += 1 + run = rows[i:j] + self._merge_function.reset() + for r in run: + kv = KeyValue(key_arity, value_arity) + kv.replace(self._row_to_tuple(r, col_names)) + self._merge_function.add(kv) + result_kv = self._merge_function.get_result() + if result_kv is not None: + merged_rows.append( + self._kv_to_row(result_kv, col_names, + key_arity, value_arity)) + i = j + + if not merged_rows: + return data.slice(0, 0) + return pa.Table.from_pylist(merged_rows, schema=data.schema) + + @staticmethod + def _key_tuple(row: dict, col_names: List[str], key_arity: int) -> tuple: + return tuple(row[col_names[i]] for i in range(key_arity)) + + @staticmethod + def _row_to_tuple(row: dict, col_names: List[str]) -> tuple: + return tuple(row[name] for name in col_names) + + @staticmethod + def _kv_to_row(kv: KeyValue, col_names: List[str], + key_arity: int, value_arity: int) -> dict: + out = {} + for i in range(key_arity): + out[col_names[i]] = kv.key.get_field(i) + out[col_names[key_arity]] = kv.sequence_number + out[col_names[key_arity + 1]] = kv.value_row_kind_byte + for i in range(value_arity): + out[col_names[key_arity + 2 + i]] = kv.value.get_field(i) + return out + def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch: """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND.""" num_rows = data.num_rows From bb3aef82377fa9a76155a27e2282342b0fcdb9a3 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 17:26:04 +0800 Subject: [PATCH 7/8] [python] partial-update: tests covering the writer-side merge buffer Drop the unittest.expectedFailure decorators on the two same-commit multi write_arrow cases #7745 added: the writer-side merge buffer now folds same-PK runs before flush, so they pass. Add a same-commit deduplicate regression (test_deduplicate_two_write_arrows_single_commit) so the bug master silently returned both rows on the default merge engine cannot come back undetected. Add unit coverage in tests/test_write_merge_buffer.py exercising KeyValueDataWriter._merge_pending_by_pk directly with synthetic pa.Table inputs: dedupe collapse / disjoint keys / partial-update fold across two and three writes / later-null does not clobber / empty buffer / single-row fast path / get_result returning None drops the run. 
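For context, this is the shape of the synthetic input those unit tests drive
through ``_merge_pending_by_pk``: the column layout mirrors what
``_add_system_fields`` emits and matches the schema declared in the new test
file; the two rows below are purely illustrative:

    import pyarrow as pa

    schema = pa.schema([
        pa.field('_KEY_id', pa.int64(), nullable=False),
        pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
        pa.field('_VALUE_KIND', pa.int8(), nullable=False),
        pa.field('id', pa.int64(), nullable=False),
        pa.field('a', pa.string()),
        pa.field('b', pa.string()),
    ])
    # Two rows sharing PK id=1, already sorted by (PK, sequence number).
    data = pa.Table.from_pylist([
        {'_KEY_id': 1, '_SEQUENCE_NUMBER': 1, '_VALUE_KIND': 0,
         'id': 1, 'a': 'old', 'b': None},
        {'_KEY_id': 1, '_SEQUENCE_NUMBER': 2, '_VALUE_KIND': 0,
         'id': 1, 'a': 'new', 'b': None},
    ], schema=schema)
    # Under deduplicate the buffer collapses to the single seq=2 row;
    # under partial-update it merges non-null fields per column instead.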
Adjust two e2e cases that previously asserted a NotImplementedError at read time: with the writer-side dispatch in place the unsupported- engine fallback runs at write time but read still raises, and the partial-update + unsupported-option cases now surface their NotImplementedError on the writer's first flush. Update the assertion sites accordingly. reader_primary_key_test.test_pk_multi_write_once_commit drops a TODO that explicitly documented the missing merge: with the writer-side fold now in place, user_id=2's two writes deduplicate to the latest row, so the expected table no longer contains a duplicate PK. --- .../pypaimon/tests/reader_primary_key_test.py | 13 +- .../pypaimon/tests/test_partial_update_e2e.py | 92 ++++----- .../pypaimon/tests/test_write_merge_buffer.py | 192 ++++++++++++++++++ 3 files changed, 243 insertions(+), 54 deletions(-) create mode 100644 paimon-python/pypaimon/tests/test_write_merge_buffer.py diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py index 541dabe895b8..6baddc5f38cb 100644 --- a/paimon-python/pypaimon/tests/reader_primary_key_test.py +++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py @@ -226,12 +226,15 @@ def test_pk_multi_write_once_commit(self): read_builder = table.new_read_builder() actual = self._read_test_table(read_builder).sort_by('user_id') - # TODO support pk merge feature when multiple write + # The in-memory merge buffer in KeyValueDataWriter folds the + # two writes for user_id=2 down to the latest row before flush + # (default merge engine is deduplicate), so the PK appears once + # with the second batch's value. expected = pa.Table.from_pydict({ - 'user_id': [1, 2, 2, 3, 4, 5, 7, 8], - 'item_id': [1001, 1002, 1002, 1003, 1004, 1005, 1007, 1008], - 'behavior': ['a', 'b', 'b-new', 'c', None, 'e', 'g', 'h'], - 'dt': ['p1', 'p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'], + 'user_id': [1, 2, 3, 4, 5, 7, 8], + 'item_id': [1001, 1002, 1003, 1004, 1005, 1007, 1008], + 'behavior': ['a', 'b-new', 'c', None, 'e', 'g', 'h'], + 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'], }, schema=self.pa_schema) self.assertEqual(actual, expected) diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py index 2f567918ec3a..abebe91cf3a1 100644 --- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -193,44 +193,16 @@ def test_partial_update_later_null_does_not_clobber_earlier_value(self): # -- single-commit, multiple write_arrow calls ----------------------- # - # Reviewer concern (#7745): rows from multiple ``write_arrow`` calls - # inside a single ``prepare_commit`` may land in the same data file - # and bypass the merge-engine dispatch we added in - # ``MergeFileSplitRead._build_merge_function``. Verified: they do. - # - # Root cause is upstream of this PR. ``KeyValueDataWriter._merge_data`` - # simply ``concat + sort``s incoming batches without applying any - # merge function, so the flushed file holds multiple rows for the - # same primary key -- violating the Java LSM invariant "PK is unique - # within a file". On the read side, ``_build_split_from_pack`` then - # marks any single-file group as ``raw_convertible=True`` - # (split_generator.py:99-100), which routes the split through the - # raw-convertible fast path and skips ``SortMergeReader`` entirely. 
- # The merge-engine dispatch this PR adds only fires inside - # ``SortMergeReader``, so partial-update semantics are lost. - # - # Fixing this requires either (a) giving ``KeyValueDataWriter`` a - # merge buffer that applies the merge function during flush - # (mirrors Java ``SortBufferWriteBuffer`` / ``MergeTreeWriter``), - # or (b) tightening ``raw_convertible`` to require proof that the - # file contains no duplicate keys. Both are write-/scan-path - # restructuring, well outside the scope of this read-side - # merge-engine port. Tracked for a follow-up PR. - # - # The two cases below are kept as ``expectedFailure`` so the gap - # is visible and will turn into a passing regression once the - # writer-side fix lands. + # The in-memory merge buffer added to ``KeyValueDataWriter`` runs + # the merge function on flush, so rows from multiple ``write_arrow`` + # calls that share a primary key are folded into a single row before + # the data file is written. The flushed file therefore satisfies the + # LSM "PK unique within a file" invariant the read-side + # ``raw_convertible`` fast path relies on. - @unittest.expectedFailure def test_partial_update_two_write_arrows_single_commit(self): """Two ``write_arrow`` calls + one ``prepare_commit``: each - carries a disjoint non-null field; result must be the per-field - merge. - - Currently fails: see module-level note above. The flushed file - keeps both rows verbatim and the read split goes through the - raw-convertible fast path, so neither dedupe nor partial-update - merge runs. + carries a disjoint non-null field; result is the per-field merge. """ table = self._create_pk_table('two_writes_single_commit') self._write_many(table, [ @@ -243,11 +215,9 @@ def test_partial_update_two_write_arrows_single_commit(self): [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], ) - @unittest.expectedFailure def test_partial_update_three_write_arrows_single_commit(self): - """Three ``write_arrow`` calls in a single commit must compose - into the union of non-null fields. Same expected-failure - condition as the two-write case above. + """Three ``write_arrow`` calls in a single commit compose into + the union of non-null fields. """ table = self._create_pk_table('three_writes_single_commit') self._write_many(table, [ @@ -276,11 +246,38 @@ def test_deduplicate_engine_unchanged(self): [{'id': 1, 'a': 'new', 'b': None, 'c': None}], ) + def test_deduplicate_two_write_arrows_single_commit(self): + """Pre-PR master silently returned both rows because the + flushed file held two records sharing a primary key. With the + in-memory merge buffer in place, ``deduplicate`` collapses + same-PK rows in a single commit too -- LSM "PK unique within a + file" invariant restored. + """ + table = self._create_pk_table( + 'dedupe_two_writes_single_commit', + merge_engine='deduplicate', + ) + self._write_many(table, [ + [{'id': 1, 'a': 'first', 'b': 'old', 'c': None}], + [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}], + ]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}], + ) + # -- engines we don't support yet ------------------------------------ def test_aggregation_engine_raises_not_implemented(self): - """Until ``aggregation`` is ported, reading an aggregation table - must raise rather than silently produce dedupe results.""" + """Until ``aggregation`` is ported, reading from an aggregation + table must raise rather than silently produce dedupe results. 
+ + The write path's in-memory merge buffer falls back to dedupe + for wholly unsupported engines so the file still keeps the LSM + "PK unique within a file" invariant -- but the read-side + dispatch still raises explicitly, which is what protects users. + """ table = self._create_pk_table('agg_unsupported', merge_engine='aggregation') self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}]) @@ -293,8 +290,7 @@ def test_aggregation_engine_raises_not_implemented(self): self.assertIn('aggregation', str(cm.exception)) def test_first_row_engine_raises_not_implemented(self): - """Until ``first-row`` is ported, reading a first-row table must - raise rather than silently produce dedupe results.""" + """Same as the aggregation case above for ``first-row``.""" table = self._create_pk_table('first_row_unsupported', merge_engine='first-row') self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}]) @@ -317,15 +313,13 @@ def test_first_row_engine_raises_not_implemented(self): def _assert_partial_update_unsupported(self, table_name, extra_options, expected_keys): + # Shared dispatch runs at write time too, so the unsupported- + # option error surfaces on the first ``write_arrow`` flush + # rather than waiting for read. table = self._create_pk_table( table_name, extra_options=extra_options) - self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) - self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) - - rb = table.new_read_builder() - splits = rb.new_scan().plan().splits() with self.assertRaises(NotImplementedError) as cm: - rb.new_read().to_arrow(splits) + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) msg = str(cm.exception) self.assertIn("partial-update", msg) for key in expected_keys: diff --git a/paimon-python/pypaimon/tests/test_write_merge_buffer.py b/paimon-python/pypaimon/tests/test_write_merge_buffer.py new file mode 100644 index 000000000000..1f95ae5cd6f0 --- /dev/null +++ b/paimon-python/pypaimon/tests/test_write_merge_buffer.py @@ -0,0 +1,192 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Unit tests for ``KeyValueDataWriter._merge_pending_by_pk``. + +Drives the in-memory merge buffer with synthetic ``pa.Table`` inputs +to pin down the merge-function dispatch independently of the rest of +the catalog/write stack. Mirrors the per-key fold loop in Java +``SortBufferWriteBuffer.MergeIterator.advanceIfNeeded``. 
+""" + +import unittest +from unittest.mock import Mock + +import pyarrow as pa + +from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction +from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter + + +# Layout matches what ``KeyValueDataWriter._add_system_fields`` emits: +# ``[_KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND, id, a, b]``. ``id`` is +# duplicated on the value side because the value layout in Paimon's +# row tuple includes every original column. +_SCHEMA = pa.schema([ + pa.field('_KEY_id', pa.int64(), nullable=False), + pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False), + pa.field('_VALUE_KIND', pa.int8(), nullable=False), + pa.field('id', pa.int64(), nullable=False), + pa.field('a', pa.string()), + pa.field('b', pa.string()), +]) + + +def _row(pk, seq, a, b): + return { + '_KEY_id': pk, + '_SEQUENCE_NUMBER': seq, + '_VALUE_KIND': 0, + 'id': pk, + 'a': a, + 'b': b, + } + + +class _Harness(KeyValueDataWriter): + """Bypass ``DataWriter.__init__`` to keep the tests focused on the + merge step. ``_merge_pending_by_pk`` only needs ``trimmed_primary_keys`` + and ``_merge_function``. + """ + + def __init__(self, merge_function): + self.trimmed_primary_keys = ['id'] + self._merge_function = merge_function + + +class WriteMergeBufferTest(unittest.TestCase): + + # -- deduplicate ------------------------------------------------------ + + def test_dedupe_collapses_same_pk_run_to_latest(self): + writer = _Harness(DeduplicateMergeFunction()) + data = pa.Table.from_pylist( + [_row(1, 1, 'old', None), _row(1, 2, 'new', None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 1) + self.assertEqual( + out.to_pylist(), + [_row(1, 2, 'new', None)], + ) + + def test_dedupe_keeps_disjoint_keys(self): + writer = _Harness(DeduplicateMergeFunction()) + data = pa.Table.from_pylist( + [_row(1, 1, 'A', None), + _row(2, 2, 'B', None), + _row(3, 3, 'C', None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 3) + self.assertEqual( + sorted(out.to_pylist(), key=lambda r: r['id']), + [_row(1, 1, 'A', None), + _row(2, 2, 'B', None), + _row(3, 3, 'C', None)], + ) + + # -- partial-update --------------------------------------------------- + + def _partial_update(self): + # 4 value-side columns (id, a, b — but the layout has id duplicated + # so value_arity is 3 here: id + a + b). 
+ return PartialUpdateMergeFunction( + key_arity=1, value_arity=3, nullables=[True, True, True]) + + def test_partial_update_merges_non_null_per_field(self): + writer = _Harness(self._partial_update()) + data = pa.Table.from_pylist( + [_row(1, 1, 'A', None), _row(1, 2, None, 'B')], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 1) + self.assertEqual(out.to_pylist(), [_row(1, 2, 'A', 'B')]) + + def test_partial_update_three_writes_compose(self): + writer = _Harness(self._partial_update()) + data = pa.Table.from_pylist( + [_row(1, 1, 'A', None), + _row(1, 2, None, 'B'), + _row(1, 3, None, None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.to_pylist(), [_row(1, 3, 'A', 'B')]) + + def test_partial_update_later_null_does_not_clobber_earlier_value(self): + writer = _Harness(self._partial_update()) + data = pa.Table.from_pylist( + [_row(1, 1, 'KEEP', 'B'), _row(1, 2, None, None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.to_pylist(), [_row(1, 2, 'KEEP', 'B')]) + + # -- edge cases ------------------------------------------------------- + + def test_empty_buffer_returns_empty(self): + writer = _Harness(DeduplicateMergeFunction()) + empty = pa.Table.from_pylist([], schema=_SCHEMA) + out = writer._merge_pending_by_pk(empty) + self.assertEqual(out.num_rows, 0) + + def test_single_row_buffer_skips_merge(self): + # Mock to confirm the merge function isn't invoked: a single + # row cannot have duplicates, so we sidestep the to_pylist + # round-trip. + mock_mf = Mock() + writer = _Harness(mock_mf) + data = pa.Table.from_pylist( + [_row(1, 1, 'X', None)], schema=_SCHEMA) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 1) + mock_mf.reset.assert_not_called() + mock_mf.add.assert_not_called() + mock_mf.get_result.assert_not_called() + + def test_get_result_none_drops_pk_run(self): + # Future-proof: contract says ``get_result`` returning ``None`` + # means the entire PK group should be dropped. + class DropAll: + def reset(self): + pass + + def add(self, _): + pass + + def get_result(self): + return None + + writer = _Harness(DropAll()) + data = pa.Table.from_pylist( + [_row(1, 1, 'A', None), _row(1, 2, 'B', None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 0) + + +if __name__ == '__main__': + unittest.main() From f3493d9fa600ce2b4fde47706f640476a127ce38 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 17:47:18 +0800 Subject: [PATCH 8/8] [python] pk-write merge buffer: tighten review feedback * Reject ``with_write_type`` on PK tables in ``_build_pk_merge_function`` with a clear NotImplementedError. Without this guard the buffer layout (column subset on the value side) and the merge function's arity (full table) drift apart and crash with IndexError on flush. * Replace ``except NotImplementedError`` with an explicit ``MergeEngine.AGGREGATE / FIRST_ROW`` check so a future engine that legitimately raises won't be silently swallowed. * Tighten three docstrings/comments: "fail at flush time" and "first write_arrow flush" misnamed the timing -- the dispatch fires inside ``FileStoreWrite._create_data_writer``, not on flush; and the partial-update unit test comment said "4 value-side columns" while listing three. 
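A toy illustration of the arity drift the new ``with_write_type`` guard refuses
up front (plain tuples standing in for the value side of real ``KeyValue`` rows;
the helper below is illustrative, not pypaimon code):

    # Merge step built against the full table: three value columns.
    FULL_VALUE_ARITY = 3

    def last_non_null(acc, row):
        # Conceptual partial-update step over a full-arity value row.
        return tuple(
            row[i] if row[i] is not None else acc[i]
            for i in range(FULL_VALUE_ARITY)
        )

    acc = ('a0', 'b0', 'c0')          # accumulator, full arity
    subset_row = ('A', None)          # with_write_type wrote only 2 of 3 columns
    try:
        last_non_null(acc, subset_row)
    except IndexError:
        # Exactly the flush-time failure the explicit raise now prevents.
        pass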
--- .../pypaimon/tests/test_partial_update_e2e.py | 3 +- .../pypaimon/tests/test_write_merge_buffer.py | 6 +- .../pypaimon/write/file_store_write.py | 69 ++++++++++++------- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py index abebe91cf3a1..eab697a1af67 100644 --- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -314,7 +314,8 @@ def test_first_row_engine_raises_not_implemented(self): def _assert_partial_update_unsupported(self, table_name, extra_options, expected_keys): # Shared dispatch runs at write time too, so the unsupported- - # option error surfaces on the first ``write_arrow`` flush + # option error surfaces inside the first ``write_arrow`` call + # (when ``FileStoreWrite._create_data_writer`` first runs) # rather than waiting for read. table = self._create_pk_table( table_name, extra_options=extra_options) diff --git a/paimon-python/pypaimon/tests/test_write_merge_buffer.py b/paimon-python/pypaimon/tests/test_write_merge_buffer.py index 1f95ae5cd6f0..40e49d539be2 100644 --- a/paimon-python/pypaimon/tests/test_write_merge_buffer.py +++ b/paimon-python/pypaimon/tests/test_write_merge_buffer.py @@ -109,8 +109,10 @@ def test_dedupe_keeps_disjoint_keys(self): # -- partial-update --------------------------------------------------- def _partial_update(self): - # 4 value-side columns (id, a, b — but the layout has id duplicated - # so value_arity is 3 here: id + a + b). + # Value-side carries 3 columns (id, a, b). The PK column ``id`` + # is duplicated into the value side so partial-update can apply + # last-non-null semantics uniformly across every original + # user column. return PartialUpdateMergeFunction( key_arity=1, value_arity=3, nullables=[True, True, True]) diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 2c49f4104101..cda50e673017 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -98,19 +98,32 @@ def _build_pk_merge_function(self): with no out-of-scope options) cannot drift between sides. For wholly unsupported engines (``aggregation`` / ``first-row``) - we fall back to ``DeduplicateMergeFunction`` here so the writer - still maintains the LSM "PK unique within a file" invariant. - The read path's dispatch still raises ``NotImplementedError``, - so the user gets an explicit error before they observe wrong- - engine data; the fallback only narrows the damage to "file is - deduped, not aggregated" rather than the silent multi-row-per- - PK corruption that existed pre-PR. + the writer falls back to ``DeduplicateMergeFunction`` so the + flushed file still maintains the LSM "PK unique within a file" + invariant. The read path's dispatch still raises + ``NotImplementedError``, so the user gets an explicit error + before they observe wrong-engine data; the fallback only + narrows the damage to "file is deduped, not aggregated" + rather than the silent multi-row-per-PK corruption that + existed pre-PR. Partial-update with out-of-scope options (sequence-group, per-field aggregator, ignore-delete, remove-record-on-*) does - **not** fall back -- silently degrading to dedupe there is a - live corruption pattern this PR exists to close, so we re-raise - and let the writer fail explicitly at flush time. 
+ **not** fall back: ``partial_update_unsupported_options`` sees + the configured keys and re-raises, so the first + ``write_arrow`` call (where ``_create_data_writer`` first runs) + surfaces the error. Silently degrading to dedupe there is the + same live corruption pattern this PR exists to close. + + ``with_write_type`` (column-subset writes) on a PK table is + also rejected here. The buffer layout + ``_add_system_fields`` produces would carry only the subset + on the value side, while a ``MergeFunction`` such as + ``PartialUpdateMergeFunction`` is built against the full table + arity -- the two sides would mismatch on + ``KeyValue.value.get_field`` and raise ``IndexError`` at + flush time. Refusing it explicitly avoids that obscure failure + and keeps the supported surface narrow. The value-side schema must match the layout ``KeyValueDataWriter`` flushes -- ``_add_system_fields`` keeps @@ -128,9 +141,18 @@ def _build_pk_merge_function(self): engine = self.options.merge_engine() raw_options = self.options.options.to_map() + if self.write_cols is not None: + raise NotImplementedError( + "with_write_type is not yet supported on primary-key " + "tables: the writer-side merge buffer assumes the " + "input batch carries the full table schema. Drop the " + "with_write_type call or write the missing columns as " + "nulls in the input batch." + ) + # PARTIAL_UPDATE + out-of-scope option: never silently fall # back -- forward the read-side error verbatim so writes fail - # at flush time rather than corrupt the file. + # before the first flush rather than corrupt the file. if engine == MergeEngine.PARTIAL_UPDATE \ and partial_update_unsupported_options(raw_options): return build_merge_function( @@ -141,20 +163,21 @@ def _build_pk_merge_function(self): f.type.nullable for f in self.table.table_schema.fields], ) - all_value_fields = self.table.table_schema.fields - try: - return build_merge_function( - engine=engine, raw_options=raw_options, - key_arity=len(self.table.trimmed_primary_keys), - value_arity=len(all_value_fields), - value_field_nullables=[ - f.type.nullable for f in all_value_fields], - ) - except NotImplementedError: - # Wholly unsupported engine -- maintain PK uniqueness via - # dedupe; read-side will still raise. + # Catch the dispatch's "wholly unsupported engine" raise only + # for the engines we know are out of scope today; any other + # NotImplementedError is a bug we want to surface, not swallow. + if engine in (MergeEngine.AGGREGATE, MergeEngine.FIRST_ROW): return DeduplicateMergeFunction() + all_value_fields = self.table.table_schema.fields + return build_merge_function( + engine=engine, raw_options=raw_options, + key_arity=len(self.table.trimmed_primary_keys), + value_arity=len(all_value_fields), + value_field_nullables=[ + f.type.nullable for f in all_value_fields], + ) + def _has_blob_columns(self) -> bool: """Check if the table schema contains blob columns.""" for field in self.table.table_schema.fields: