diff --git a/paimon-python/pypaimon/common/merge_engine_dispatch.py b/paimon-python/pypaimon/common/merge_engine_dispatch.py new file mode 100644 index 000000000000..2bdcf4a8b24a --- /dev/null +++ b/paimon-python/pypaimon/common/merge_engine_dispatch.py @@ -0,0 +1,129 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Centralised merge-engine dispatch. + +Both the read path (``MergeFileSplitRead``) and the write path +(``KeyValueDataWriter``'s in-memory merge buffer) need to pick a +``MergeFunction`` based on the table's ``merge-engine`` option. This +module is the single source of truth so the two sides cannot drift. + +Mirrors Java ``MergeFunctionFactory`` (paimon-core/.../mergetree/ +compact/MergeFunctionFactory.java). +""" + +from typing import List + +from pypaimon.common.options.core_options import MergeEngine +from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction + + +# Boolean-valued options that, when truthy, opt the table into +# behaviour the Python PartialUpdateMergeFunction does not implement. +# Mirrors org.apache.paimon.CoreOptions and the fallback keys in +# PartialUpdateMergeFunction.java. +_PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = ( + "ignore-delete", + "partial-update.ignore-delete", + "first-row.ignore-delete", + "deduplicate.ignore-delete", + "partial-update.remove-record-on-delete", + "partial-update.remove-record-on-sequence-group", +) +_FIELDS_PREFIX = "fields." +_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group" +_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function" +_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function" + + +def build_merge_function( + *, + engine: MergeEngine, + raw_options: dict, + key_arity: int, + value_arity: int, + value_field_nullables: List[bool], +): + """Pick the MergeFunction for the table's ``merge-engine`` option. + + ``engine`` and ``raw_options`` come from the table's ``CoreOptions`` + (typically ``table.options.merge_engine()`` and + ``table.options.options.to_map()``). ``key_arity`` / ``value_arity`` + / ``value_field_nullables`` describe the value-side schema the + caller wants the merge function to operate on -- for the read path + this is the projected read schema, for the write path it's the full + table schema (minus primary keys). 
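+
+    A minimal caller sketch (mirroring the call sites in
+    ``MergeFileSplitRead._build_merge_function`` and
+    ``FileStoreWrite._build_pk_merge_function``; ``read_fields`` here is a
+    stand-in for whichever value-side field list the caller uses)::
+
+        merge_fn = build_merge_function(
+            engine=table.options.merge_engine(),
+            raw_options=table.options.options.to_map(),
+            key_arity=len(table.trimmed_primary_keys),
+            value_arity=len(read_fields),
+            value_field_nullables=[f.type.nullable for f in read_fields],
+        )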
+ """ + if engine == MergeEngine.DEDUPLICATE: + return DeduplicateMergeFunction() + if engine == MergeEngine.PARTIAL_UPDATE: + unsupported = partial_update_unsupported_options(raw_options) + if unsupported: + raise NotImplementedError( + "merge-engine 'partial-update' is enabled together with " + "options that pypaimon does not yet implement: {}. The " + "supported subset is per-key last-non-null merge with " + "no sequence-group, no per-field aggregator override, " + "no ignore-delete and no partial-update.remove-record-on-* " + "flags. Use the Java client for the full feature set, or " + "open an issue to track Python support.".format( + ", ".join(sorted(unsupported)) + ) + ) + return PartialUpdateMergeFunction( + key_arity=key_arity, + value_arity=value_arity, + nullables=list(value_field_nullables), + ) + raise NotImplementedError( + "merge-engine '{}' is not implemented in pypaimon yet " + "(supported: deduplicate, partial-update). Use the Java " + "client or open an issue to track support.".format(engine.value) + ) + + +def partial_update_unsupported_options(raw_options: dict): + """Return the set of option keys this table sets that + ``PartialUpdateMergeFunction`` does not yet support. Empty set + means we can safely run the simple last-non-null merge. + """ + flagged = set() + for key, value in raw_options.items(): + if (key in _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS + and _option_is_truthy(value)): + flagged.add(key) + elif key == _DEFAULT_AGGREGATE_FUNCTION_KEY: + flagged.add(key) + elif key.startswith(_FIELDS_PREFIX) and ( + key.endswith(_FIELD_SEQUENCE_GROUP_SUFFIX) + or key.endswith(_FIELD_AGGREGATE_FUNCTION_SUFFIX)): + flagged.add(key) + return flagged + + +def _option_is_truthy(raw): + if raw is None: + return False + if isinstance(raw, bool): + return raw + if isinstance(raw, str): + return raw.strip().lower() in ("true", "1", "yes", "on") + return bool(raw) diff --git a/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py b/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py new file mode 100644 index 000000000000..5b669aae5564 --- /dev/null +++ b/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py @@ -0,0 +1,50 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Default merge function for primary-key tables. + +Mirrors Java ``DeduplicateMergeFunction`` -- for a run of KVs sharing +the same primary key, keep only the one with the highest sequence +number (by virtue of ``add`` being called in sequence-number order). 
+""" + +from typing import Optional + +from pypaimon.table.row.key_value import KeyValue + + +class DeduplicateMergeFunction: + """Keep only the latest KV per primary key. + + Used by both the read path (``SortMergeReaderWithMinHeap``) and the + write path (``KeyValueDataWriter`` in-memory merge buffer) -- the + latter is what enforces the LSM "PK unique within a file" + invariant on flush. + """ + + def __init__(self): + self.latest_kv: Optional[KeyValue] = None + + def reset(self) -> None: + self.latest_kv = None + + def add(self, kv: KeyValue) -> None: + self.latest_kv = kv + + def get_result(self) -> Optional[KeyValue]: + return self.latest_kv diff --git a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py new file mode 100644 index 000000000000..978b48011c54 --- /dev/null +++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py @@ -0,0 +1,134 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +""" +Python port of Java's ``PartialUpdateMergeFunction`` +(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ +PartialUpdateMergeFunction.java``). + +The merge function used by the ``partial-update`` merge engine on PK +tables: rows sharing a primary key are merged left-to-right, taking the +latest non-null value per non-PK field. ``DeduplicateMergeFunction`` +keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets +later writes "fill in" fields the earlier writes left null, so users +can write the same logical record across multiple commits with +different sets of non-null columns. + +This is the **core merge semantics only**. The Java implementation also +supports per-field aggregator overrides (``fields..aggregate- +function``), sequence groups (``fields..sequence-group``), +``ignore-delete``, and ``partial-update.remove-record-on-*`` options. +None of those are implemented yet; non-INSERT row kinds raise +``NotImplementedError`` at ``add`` time so we never silently corrupt +data with a half-implemented contract. +""" + +from typing import Any, List, Optional + +from pypaimon.table.row.key_value import KeyValue +from pypaimon.table.row.row_kind import RowKind + + +class PartialUpdateMergeFunction: + """A MergeFunction where the key is the primary key (unique) and the + value is merged across all rows for that key by taking the latest + non-null value per non-PK field. 
+ + Mirrors the ``MergeFunction`` protocol used by ``SortMergeReader``: + ``reset`` (between groups of same-key rows), ``add`` (one row at a + time, oldest to newest), ``get_result`` (after the group is + exhausted). + """ + + def __init__(self, key_arity: int, value_arity: int, + nullables: Optional[List[bool]] = None): + self._key_arity = key_arity + self._value_arity = value_arity + # Per-value-field nullable flags, parallel to value indices. When + # ``None``, no nullability check runs (preserves the contract for + # direct callers that don't have schema info handy). When given, + # mirrors Java's ``updateNonNullFields`` check: a null input on a + # NOT NULL field raises rather than being silently absorbed. + if nullables is not None and len(nullables) != value_arity: + raise ValueError( + "nullables length {} does not match value_arity {}".format( + len(nullables), value_arity)) + self._nullables = nullables + # Lazily allocated on first add(); ``None`` means "no rows yet". + self._accumulator: Optional[List[Any]] = None + # Reference to the most recently added kv. We use it only to + # propagate the key + sequence_number into the result row, and we + # snapshot those two values into a fresh tuple in ``get_result()`` + # so the result is not aliased to upstream's reused KeyValue. + self._latest_kv: Optional[KeyValue] = None + + def reset(self) -> None: + self._accumulator = None + self._latest_kv = None + + def add(self, kv: KeyValue) -> None: + row_kind_byte = kv.value_row_kind_byte + if not RowKind.is_add_byte(row_kind_byte): + # DELETE / UPDATE_BEFORE need ignore-delete or + # partial-update.remove-record-on-delete to be set in Java; + # neither option is wired up in pypaimon yet, so refuse the + # row rather than silently swallow it. + raise NotImplementedError( + "PartialUpdateMergeFunction received a {} row; this " + "Python port does not yet implement the ignore-delete / " + "partial-update.remove-record-on-delete options. Use the " + "Java client for tables that produce DELETE / " + "UPDATE_BEFORE rows.".format(RowKind(row_kind_byte).to_string()) + ) + + # Mirror Java's reset() + updateNonNullFields(): the accumulator + # starts as all-null (equivalent to ``new GenericRow(arity)``) and + # each add() writes non-null inputs; null inputs are absorbed — + # except when the schema marks the field NOT NULL, in which case + # we raise to match Java's IllegalArgumentException check. + if self._accumulator is None: + self._accumulator = [None] * self._value_arity + for i in range(self._value_arity): + v = kv.value.get_field(i) + if v is not None: + self._accumulator[i] = v + elif self._nullables is not None and not self._nullables[i]: + raise ValueError("Field {} can not be null".format(i)) + self._latest_kv = kv + + def get_result(self) -> Optional[KeyValue]: + if self._accumulator is None or self._latest_kv is None: + return None + + kv = self._latest_kv + # Snapshot the key as a fresh tuple — we cannot keep a reference + # to ``kv`` because upstream readers (e.g. KeyValueWrapReader) + # reuse a single KeyValue instance and mutate its underlying + # row_tuple between calls. Building a fresh tuple here means the + # result we return is decoupled from any subsequent iteration. 
+ key_values = tuple( + kv.key.get_field(i) for i in range(self._key_arity) + ) + result_row = key_values + ( + kv.sequence_number, + RowKind.INSERT.value, + ) + tuple(self._accumulator) + + result = KeyValue(self._key_arity, self._value_arity) + result.replace(result_row) + return result diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py b/paimon-python/pypaimon/read/reader/sort_merge_reader.py index aedd593b702b..0904a6fb46f2 100644 --- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py +++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py @@ -19,6 +19,8 @@ import heapq from typing import Any, Callable, List, Optional +from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction from pypaimon.read.reader.iface.record_iterator import RecordIterator from pypaimon.read.reader.iface.record_reader import RecordReader from pypaimon.schema.data_types import DataField, Keyword @@ -30,9 +32,15 @@ class SortMergeReaderWithMinHeap(RecordReader): """SortMergeReader implemented with min-heap.""" - def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema): + def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema, + merge_function: Optional[Any] = None): self.next_batch_readers = list(readers) - self.merge_function = DeduplicateMergeFunction() + # Default to dedupe so callers that don't pass a merge_function + # keep their old behaviour. The merge engine dispatch lives in + # ``MergeFileSplitRead.section_reader_supplier`` for the read + # path; tests or other ad-hoc callers can pass a different + # implementation here. + self.merge_function = merge_function if merge_function is not None else DeduplicateMergeFunction() if schema.partition_keys: trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not in schema.partition_keys] @@ -124,22 +132,6 @@ def _next_impl(self): return True -class DeduplicateMergeFunction: - """A MergeFunction where key is primary key (unique) and value is the full record, only keep the latest one.""" - - def __init__(self): - self.latest_kv = None - - def reset(self) -> None: - self.latest_kv = None - - def add(self, kv: KeyValue): - self.latest_kv = kv - - def get_result(self) -> Optional[KeyValue]: - return self.latest_kv - - class Element: def __init__(self, kv: KeyValue, iterator: RecordIterator[KeyValue], reader: RecordReader[KeyValue]): self.kv = kv diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 9a82b0000a85..eae686342799 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -21,6 +21,7 @@ from functools import partial from typing import Callable, List, Optional, Tuple +from pypaimon.common.merge_engine_dispatch import build_merge_function from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.deletionvectors import ApplyDeletionVectorReader @@ -98,6 +99,10 @@ def __init__( self.split = split self.row_tracking_enabled = row_tracking_enabled self.value_arity = len(read_type) + # Snapshot the raw value-side schema before _create_key_value_fields + # wraps it, so MergeFileSplitRead can hand per-value-field nullable + # flags to merge functions that mirror Java's NOT-NULL check. 
+ self.value_fields = list(read_type) self.trimmed_primary_key = self.table.trimmed_primary_keys self.read_fields = read_type @@ -482,7 +487,24 @@ def section_reader_supplier(self, section: List[SortedRun]) -> RecordReader: supplier = partial(self.kv_reader_supplier, file, self.deletion_file_readers.get(file.file_name, None)) data_readers.append(supplier) readers.append(ConcatRecordReader(data_readers)) - return SortMergeReaderWithMinHeap(readers, self.table.table_schema) + merge_function = self._build_merge_function() + return SortMergeReaderWithMinHeap( + readers, self.table.table_schema, merge_function=merge_function) + + def _build_merge_function(self): + """Pick the MergeFunction for the table's ``merge-engine`` option. + + Delegates to the shared dispatch in + ``pypaimon.common.merge_engine_dispatch`` so the read path and + the in-memory merge buffer on the write path cannot drift. + """ + return build_merge_function( + engine=self.table.options.merge_engine(), + raw_options=self.table.options.options.to_map(), + key_arity=len(self.trimmed_primary_key), + value_arity=self.value_arity, + value_field_nullables=[f.type.nullable for f in self.value_fields], + ) def create_reader(self) -> RecordReader: # Create a dict mapping data file name to deletion file reader method diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py index 541dabe895b8..6baddc5f38cb 100644 --- a/paimon-python/pypaimon/tests/reader_primary_key_test.py +++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py @@ -226,12 +226,15 @@ def test_pk_multi_write_once_commit(self): read_builder = table.new_read_builder() actual = self._read_test_table(read_builder).sort_by('user_id') - # TODO support pk merge feature when multiple write + # The in-memory merge buffer in KeyValueDataWriter folds the + # two writes for user_id=2 down to the latest row before flush + # (default merge engine is deduplicate), so the PK appears once + # with the second batch's value. expected = pa.Table.from_pydict({ - 'user_id': [1, 2, 2, 3, 4, 5, 7, 8], - 'item_id': [1001, 1002, 1002, 1003, 1004, 1005, 1007, 1008], - 'behavior': ['a', 'b', 'b-new', 'c', None, 'e', 'g', 'h'], - 'dt': ['p1', 'p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'], + 'user_id': [1, 2, 3, 4, 5, 7, 8], + 'item_id': [1001, 1002, 1003, 1004, 1005, 1007, 1008], + 'behavior': ['a', 'b-new', 'c', None, 'e', 'g', 'h'], + 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'], }, schema=self.pa_schema) self.assertEqual(actual, expected) diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py new file mode 100644 index 000000000000..eab697a1af67 --- /dev/null +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -0,0 +1,389 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""End-to-end tests for the ``partial-update`` merge engine. + +Each test creates a PK table with ``merge-engine`` set to a particular +value, writes one or more batches, and reads back. Partial-update reads +must merge non-null fields across batches; ``deduplicate`` must keep +the latest row only; ``aggregation`` and ``first-row`` must raise +``NotImplementedError`` (until they are ported), since silently +treating them as deduplicate would corrupt the user's data. +""" + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema + + +class PartialUpdateMergeEngineE2ETest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', True) + + cls.pa_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + ('a', pa.string()), + ('b', pa.string()), + ('c', pa.string()), + ]) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def _create_pk_table(self, table_name, merge_engine='partial-update', + extra_options=None): + # bucket=1 so all rows for any PK land in the same bucket; this is + # what forces the read path through SortMergeReader instead of the + # raw_convertible / single-file fast path. partial-update merging + # only happens inside SortMergeReader. + options = { + 'bucket': '1', + 'merge-engine': merge_engine, + } + if extra_options: + options.update(extra_options) + schema = Schema.from_pyarrow_schema( + self.pa_schema, + primary_keys=['id'], + options=options, + ) + full = 'default.{}'.format(table_name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full) + + def _write(self, table, rows): + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + + def _write_many(self, table, batches): + """Multiple ``write_arrow`` calls inside a single ``prepare_commit``. + + Mirrors the reviewer's question: rows that land in the same + underlying data file must still go through the merge-engine + dispatch; in-writer merging cannot silently degrade to dedupe. 
+ """ + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + for rows in batches: + w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + + def _read(self, table): + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + if not splits: + return [] + return sorted( + rb.new_read().to_arrow(splits).to_pylist(), + key=lambda r: r['id'], + ) + + # -- partial-update happy path --------------------------------------- + + def test_partial_update_two_writes_merges_non_null(self): + """Two writes against the same PK with disjoint non-null columns + must merge into a single row that has both columns populated. + """ + table = self._create_pk_table('two_writes') + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + + def test_partial_update_three_writes_merges_left_to_right(self): + """Three overlapping writes — each filling in a different column — + compose into the union of non-null fields. + """ + table = self._create_pk_table('three_writes') + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': 'C'}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + + def test_partial_update_disjoint_keys_unaffected(self): + """Three rows with disjoint PKs must all appear unchanged in the + output — partial-update only merges rows that share a PK. + """ + table = self._create_pk_table('disjoint_keys') + self._write(table, [ + {'id': 1, 'a': 'A1', 'b': None, 'c': None}, + {'id': 2, 'a': None, 'b': 'B2', 'c': None}, + {'id': 3, 'a': None, 'b': None, 'c': 'C3'}, + ]) + + self.assertEqual( + self._read(table), + [ + {'id': 1, 'a': 'A1', 'b': None, 'c': None}, + {'id': 2, 'a': None, 'b': 'B2', 'c': None}, + {'id': 3, 'a': None, 'b': None, 'c': 'C3'}, + ], + ) + + def test_partial_update_later_value_wins_over_earlier_non_null(self): + """When two writes both supply a non-null value for the same + column, the later value wins (latest non-null per field). + """ + table = self._create_pk_table('later_wins') + self._write(table, [{'id': 1, 'a': 'old', 'b': 'keep', 'c': None}]) + self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': 'fill'}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'new', 'b': 'keep', 'c': 'fill'}], + ) + + def test_partial_update_later_null_does_not_clobber_earlier_value(self): + """A later write with NULL for a column does NOT overwrite an + earlier non-null value for that column. + """ + table = self._create_pk_table('null_no_clobber') + self._write(table, [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}]) + self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + + # -- single-commit, multiple write_arrow calls ----------------------- + # + # The in-memory merge buffer added to ``KeyValueDataWriter`` runs + # the merge function on flush, so rows from multiple ``write_arrow`` + # calls that share a primary key are folded into a single row before + # the data file is written. 
The flushed file therefore satisfies the + # LSM "PK unique within a file" invariant the read-side + # ``raw_convertible`` fast path relies on. + + def test_partial_update_two_write_arrows_single_commit(self): + """Two ``write_arrow`` calls + one ``prepare_commit``: each + carries a disjoint non-null field; result is the per-field merge. + """ + table = self._create_pk_table('two_writes_single_commit') + self._write_many(table, [ + [{'id': 1, 'a': 'A', 'b': None, 'c': None}], + [{'id': 1, 'a': None, 'b': 'B', 'c': None}], + ]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + + def test_partial_update_three_write_arrows_single_commit(self): + """Three ``write_arrow`` calls in a single commit compose into + the union of non-null fields. + """ + table = self._create_pk_table('three_writes_single_commit') + self._write_many(table, [ + [{'id': 1, 'a': 'A', 'b': None, 'c': None}], + [{'id': 1, 'a': None, 'b': 'B', 'c': None}], + [{'id': 1, 'a': None, 'b': None, 'c': 'C'}], + ]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + + # -- deduplicate (regression) ---------------------------------------- + + def test_deduplicate_engine_unchanged(self): + """The default ``deduplicate`` engine must keep the latest row + intact, including its NULLs — exactly the pre-PR behaviour. + """ + table = self._create_pk_table('dedupe', merge_engine='deduplicate') + self._write(table, [{'id': 1, 'a': 'old', 'b': 'old-b', 'c': 'old-c'}]) + self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'new', 'b': None, 'c': None}], + ) + + def test_deduplicate_two_write_arrows_single_commit(self): + """Pre-PR master silently returned both rows because the + flushed file held two records sharing a primary key. With the + in-memory merge buffer in place, ``deduplicate`` collapses + same-PK rows in a single commit too -- LSM "PK unique within a + file" invariant restored. + """ + table = self._create_pk_table( + 'dedupe_two_writes_single_commit', + merge_engine='deduplicate', + ) + self._write_many(table, [ + [{'id': 1, 'a': 'first', 'b': 'old', 'c': None}], + [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}], + ]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}], + ) + + # -- engines we don't support yet ------------------------------------ + + def test_aggregation_engine_raises_not_implemented(self): + """Until ``aggregation`` is ported, reading from an aggregation + table must raise rather than silently produce dedupe results. + + The write path's in-memory merge buffer falls back to dedupe + for wholly unsupported engines so the file still keeps the LSM + "PK unique within a file" invariant -- but the read-side + dispatch still raises explicitly, which is what protects users. 
+ """ + table = self._create_pk_table('agg_unsupported', + merge_engine='aggregation') + self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}]) + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + with self.assertRaises(NotImplementedError) as cm: + rb.new_read().to_arrow(splits) + self.assertIn('aggregation', str(cm.exception)) + + def test_first_row_engine_raises_not_implemented(self): + """Same as the aggregation case above for ``first-row``.""" + table = self._create_pk_table('first_row_unsupported', + merge_engine='first-row') + self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}]) + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + with self.assertRaises(NotImplementedError) as cm: + rb.new_read().to_arrow(splits) + self.assertIn('first-row', str(cm.exception)) + + # -- partial-update + out-of-scope option combinations --------------- + # + # When a user pairs ``merge-engine: partial-update`` with any option + # this port doesn't implement (sequence-group, per-field aggregator + # override, ignore-delete, partial-update.remove-record-on-*), we + # must raise rather than silently run the simple last-non-null merge + # — otherwise we'd reproduce the same silent-corruption pattern this + # PR exists to close. + + def _assert_partial_update_unsupported(self, table_name, extra_options, + expected_keys): + # Shared dispatch runs at write time too, so the unsupported- + # option error surfaces inside the first ``write_arrow`` call + # (when ``FileStoreWrite._create_data_writer`` first runs) + # rather than waiting for read. + table = self._create_pk_table( + table_name, extra_options=extra_options) + with self.assertRaises(NotImplementedError) as cm: + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + msg = str(cm.exception) + self.assertIn("partial-update", msg) + for key in expected_keys: + self.assertIn(key, msg, + "expected option key '{}' in error: {}".format(key, msg)) + + def test_partial_update_with_sequence_group_raises(self): + self._assert_partial_update_unsupported( + 'pu_seq_group', + {'fields.b.sequence-group': 'a'}, + ['fields.b.sequence-group'], + ) + + def test_partial_update_with_field_aggregate_function_raises(self): + self._assert_partial_update_unsupported( + 'pu_field_agg', + {'fields.a.aggregate-function': 'last_non_null_value'}, + ['fields.a.aggregate-function'], + ) + + def test_partial_update_with_default_aggregate_function_raises(self): + self._assert_partial_update_unsupported( + 'pu_default_agg', + {'fields.default-aggregate-function': 'last_non_null_value'}, + ['fields.default-aggregate-function'], + ) + + def test_partial_update_with_ignore_delete_raises(self): + self._assert_partial_update_unsupported( + 'pu_ignore_delete', + {'ignore-delete': 'true'}, + ['ignore-delete'], + ) + + def test_partial_update_with_remove_record_on_delete_raises(self): + self._assert_partial_update_unsupported( + 'pu_rrod', + {'partial-update.remove-record-on-delete': 'true'}, + ['partial-update.remove-record-on-delete'], + ) + + def test_partial_update_with_remove_record_on_sequence_group_raises(self): + self._assert_partial_update_unsupported( + 'pu_rrosg', + {'partial-update.remove-record-on-sequence-group': 'true'}, + ['partial-update.remove-record-on-sequence-group'], + ) + + def test_partial_update_with_explicit_ignore_delete_false_does_not_raise(self): + 
"""Explicitly setting ignore-delete=false is equivalent to leaving + it unset and must not trip the guard.""" + table = self._create_pk_table( + 'pu_ignore_delete_false', + extra_options={'ignore-delete': 'false'}, + ) + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py new file mode 100644 index 000000000000..60dfc7198dfb --- /dev/null +++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py @@ -0,0 +1,226 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Direct unit tests for ``PartialUpdateMergeFunction``. + +Drives the merge function with synthetic ``KeyValue`` instances so the +contract is pinned down without going through the full read pipeline. +The end-to-end behaviour on real PK tables is exercised separately in +``test_partial_update_e2e.py``. +""" + +import unittest + +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction +from pypaimon.table.row.key_value import KeyValue +from pypaimon.table.row.row_kind import RowKind + + +def _kv(key, seq, row_kind, value): + """Build a fresh KeyValue for a (key, sequence, row_kind, value) tuple. + + ``key`` and ``value`` are tuples of primitives — the helper handles + layout (key, seq, row_kind_byte, value) so individual tests can stay + focused on the merge semantics. 
+ """ + kv = KeyValue(key_arity=len(key), value_arity=len(value)) + kv.replace(tuple(key) + (seq, row_kind.value) + tuple(value)) + return kv + + +def _result_value(kv): + """Extract the value tuple out of a KeyValue produced by get_result().""" + return tuple(kv.value.get_field(i) for i in range(kv.value_arity)) + + +def _result_key(kv): + return tuple(kv.key.get_field(i) for i in range(kv.key_arity)) + + +class PartialUpdateMergeFunctionTest(unittest.TestCase): + + def test_single_insert_returns_value(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + result = mf.get_result() + + self.assertIsNotNone(result) + self.assertEqual(_result_key(result), (1,)) + self.assertEqual(_result_value(result), ('a', 'x')) + self.assertEqual(result.sequence_number, 100) + self.assertEqual(result.value_row_kind_byte, RowKind.INSERT.value) + + def test_second_insert_overwrites_non_null(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.INSERT, ('b', None))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('b', None)) + # Sequence number tracks the latest add(). + self.assertEqual(result.sequence_number, 101) + + def test_second_insert_fills_in_null(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'x'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_third_insert_continues_merge(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=3) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None, None))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'b', None))) + mf.add(_kv((1,), 102, RowKind.INSERT, (None, None, 'c'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'b', 'c')) + + def test_later_null_does_not_clobber_earlier_value(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, None))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_reset_between_keys(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + first = mf.get_result() + self.assertEqual(_result_key(first), (1,)) + self.assertEqual(_result_value(first), ('a', 'x')) + + mf.reset() + mf.add(_kv((2,), 200, RowKind.INSERT, ('b', 'y'))) + second = mf.get_result() + self.assertEqual(_result_key(second), (2,)) + self.assertEqual(_result_value(second), ('b', 'y')) + + def test_get_result_before_any_add_returns_none(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + self.assertIsNone(mf.get_result()) + + def test_update_after_is_treated_as_insert(self): + # Java's PartialUpdate accepts UPDATE_AFTER alongside INSERT in + # non-sequence-group mode (both are "add" kinds). Mirror that. 
+ mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.UPDATE_AFTER, (None, 'x'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_delete_row_raises_not_implemented(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + with self.assertRaises(NotImplementedError) as cm: + mf.add(_kv((1,), 101, RowKind.DELETE, (None, None))) + self.assertIn('DELETE', str(cm.exception)) + self.assertIn('ignore-delete', str(cm.exception)) + + def test_update_before_row_raises_not_implemented(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + with self.assertRaises(NotImplementedError) as cm: + mf.add(_kv((1,), 100, RowKind.UPDATE_BEFORE, (None, None))) + self.assertIn('UPDATE_BEFORE', str(cm.exception)) + + def test_result_is_decoupled_from_input_kv(self): + """The merge function must build a fresh result tuple — upstream + readers reuse a single KeyValue instance and call ``replace`` on + each iteration, so holding a reference to the input is unsafe. + """ + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + kv = _kv((1,), 100, RowKind.INSERT, ('a', 'x')) + mf.add(kv) + result = mf.get_result() + + # Mutate the input's underlying tuple to simulate a reused + # KeyValue being rebound to a different row. + kv.replace((999, 999, RowKind.INSERT.value, 'evil', 'evil')) + + # The previously-returned result must NOT be affected. + self.assertEqual(_result_key(result), (1,)) + self.assertEqual(_result_value(result), ('a', 'x')) + + # -- NOT-NULL input validation (mirrors Java's updateNonNullFields) ---- + + def test_first_insert_with_null_for_not_null_field_raises(self): + """If the very first row writes null to a NOT NULL field, raise — + same input-validation Java does in updateNonNullFields().""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, False]) + mf.reset() + with self.assertRaises(ValueError) as cm: + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None))) + self.assertIn("Field 1", str(cm.exception)) + + def test_subsequent_insert_with_null_for_not_null_field_raises(self): + """A later null on a NOT NULL field must also raise — Java checks + on every add(), not just the first one.""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, False]) + mf.reset() + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x'))) + with self.assertRaises(ValueError) as cm: + mf.add(_kv((1,), 2, RowKind.INSERT, (None, None))) + self.assertIn("Field 1", str(cm.exception)) + + def test_null_for_nullable_field_is_absorbed(self): + """A null input on a nullable field is silently absorbed (existing + accumulator value wins) — the standard partial-update semantic.""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, True]) + mf.reset() + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x'))) + mf.add(_kv((1,), 2, RowKind.INSERT, (None, 'y'))) + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'y')) + + def test_nullables_length_mismatch_raises(self): + with self.assertRaises(ValueError): + PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True]) + + def test_no_nullables_arg_skips_check(self): + """Backward-compat: callers that don't pass ``nullables`` get the + previous behaviour (no NOT-NULL validation).""" + mf = 
PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + # Would have raised had we declared the second field NOT NULL. + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None))) + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', None)) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/test_write_merge_buffer.py b/paimon-python/pypaimon/tests/test_write_merge_buffer.py new file mode 100644 index 000000000000..40e49d539be2 --- /dev/null +++ b/paimon-python/pypaimon/tests/test_write_merge_buffer.py @@ -0,0 +1,194 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Unit tests for ``KeyValueDataWriter._merge_pending_by_pk``. + +Drives the in-memory merge buffer with synthetic ``pa.Table`` inputs +to pin down the merge-function dispatch independently of the rest of +the catalog/write stack. Mirrors the per-key fold loop in Java +``SortBufferWriteBuffer.MergeIterator.advanceIfNeeded``. +""" + +import unittest +from unittest.mock import Mock + +import pyarrow as pa + +from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction +from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter + + +# Layout matches what ``KeyValueDataWriter._add_system_fields`` emits: +# ``[_KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND, id, a, b]``. ``id`` is +# duplicated on the value side because the value layout in Paimon's +# row tuple includes every original column. +_SCHEMA = pa.schema([ + pa.field('_KEY_id', pa.int64(), nullable=False), + pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False), + pa.field('_VALUE_KIND', pa.int8(), nullable=False), + pa.field('id', pa.int64(), nullable=False), + pa.field('a', pa.string()), + pa.field('b', pa.string()), +]) + + +def _row(pk, seq, a, b): + return { + '_KEY_id': pk, + '_SEQUENCE_NUMBER': seq, + '_VALUE_KIND': 0, + 'id': pk, + 'a': a, + 'b': b, + } + + +class _Harness(KeyValueDataWriter): + """Bypass ``DataWriter.__init__`` to keep the tests focused on the + merge step. ``_merge_pending_by_pk`` only needs ``trimmed_primary_keys`` + and ``_merge_function``. 
+ """ + + def __init__(self, merge_function): + self.trimmed_primary_keys = ['id'] + self._merge_function = merge_function + + +class WriteMergeBufferTest(unittest.TestCase): + + # -- deduplicate ------------------------------------------------------ + + def test_dedupe_collapses_same_pk_run_to_latest(self): + writer = _Harness(DeduplicateMergeFunction()) + data = pa.Table.from_pylist( + [_row(1, 1, 'old', None), _row(1, 2, 'new', None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 1) + self.assertEqual( + out.to_pylist(), + [_row(1, 2, 'new', None)], + ) + + def test_dedupe_keeps_disjoint_keys(self): + writer = _Harness(DeduplicateMergeFunction()) + data = pa.Table.from_pylist( + [_row(1, 1, 'A', None), + _row(2, 2, 'B', None), + _row(3, 3, 'C', None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 3) + self.assertEqual( + sorted(out.to_pylist(), key=lambda r: r['id']), + [_row(1, 1, 'A', None), + _row(2, 2, 'B', None), + _row(3, 3, 'C', None)], + ) + + # -- partial-update --------------------------------------------------- + + def _partial_update(self): + # Value-side carries 3 columns (id, a, b). The PK column ``id`` + # is duplicated into the value side so partial-update can apply + # last-non-null semantics uniformly across every original + # user column. + return PartialUpdateMergeFunction( + key_arity=1, value_arity=3, nullables=[True, True, True]) + + def test_partial_update_merges_non_null_per_field(self): + writer = _Harness(self._partial_update()) + data = pa.Table.from_pylist( + [_row(1, 1, 'A', None), _row(1, 2, None, 'B')], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 1) + self.assertEqual(out.to_pylist(), [_row(1, 2, 'A', 'B')]) + + def test_partial_update_three_writes_compose(self): + writer = _Harness(self._partial_update()) + data = pa.Table.from_pylist( + [_row(1, 1, 'A', None), + _row(1, 2, None, 'B'), + _row(1, 3, None, None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.to_pylist(), [_row(1, 3, 'A', 'B')]) + + def test_partial_update_later_null_does_not_clobber_earlier_value(self): + writer = _Harness(self._partial_update()) + data = pa.Table.from_pylist( + [_row(1, 1, 'KEEP', 'B'), _row(1, 2, None, None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.to_pylist(), [_row(1, 2, 'KEEP', 'B')]) + + # -- edge cases ------------------------------------------------------- + + def test_empty_buffer_returns_empty(self): + writer = _Harness(DeduplicateMergeFunction()) + empty = pa.Table.from_pylist([], schema=_SCHEMA) + out = writer._merge_pending_by_pk(empty) + self.assertEqual(out.num_rows, 0) + + def test_single_row_buffer_skips_merge(self): + # Mock to confirm the merge function isn't invoked: a single + # row cannot have duplicates, so we sidestep the to_pylist + # round-trip. + mock_mf = Mock() + writer = _Harness(mock_mf) + data = pa.Table.from_pylist( + [_row(1, 1, 'X', None)], schema=_SCHEMA) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 1) + mock_mf.reset.assert_not_called() + mock_mf.add.assert_not_called() + mock_mf.get_result.assert_not_called() + + def test_get_result_none_drops_pk_run(self): + # Future-proof: contract says ``get_result`` returning ``None`` + # means the entire PK group should be dropped. 
+ class DropAll: + def reset(self): + pass + + def add(self, _): + pass + + def get_result(self): + return None + + writer = _Harness(DropAll()) + data = pa.Table.from_pylist( + [_row(1, 1, 'A', None), _row(1, 2, 'B', None)], + schema=_SCHEMA, + ) + out = writer._merge_pending_by_pk(data) + self.assertEqual(out.num_rows, 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 75b1d3a7d708..cda50e673017 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -77,7 +77,8 @@ def max_seq_number(): partition=partition, bucket=bucket, max_seq_number=max_seq_number(), - options=options) + options=options, + merge_function=self._build_pk_merge_function()) else: seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number() return AppendOnlyDataWriter( @@ -89,6 +90,94 @@ def max_seq_number(): write_cols=self.write_cols ) + def _build_pk_merge_function(self): + """Build the merge function for the in-memory write buffer. + + Shares ``merge_engine_dispatch.build_merge_function`` with the + read path so the supported engines (deduplicate, partial-update + with no out-of-scope options) cannot drift between sides. + + For wholly unsupported engines (``aggregation`` / ``first-row``) + the writer falls back to ``DeduplicateMergeFunction`` so the + flushed file still maintains the LSM "PK unique within a file" + invariant. The read path's dispatch still raises + ``NotImplementedError``, so the user gets an explicit error + before they observe wrong-engine data; the fallback only + narrows the damage to "file is deduped, not aggregated" + rather than the silent multi-row-per-PK corruption that + existed pre-PR. + + Partial-update with out-of-scope options (sequence-group, + per-field aggregator, ignore-delete, remove-record-on-*) does + **not** fall back: ``partial_update_unsupported_options`` sees + the configured keys and re-raises, so the first + ``write_arrow`` call (where ``_create_data_writer`` first runs) + surfaces the error. Silently degrading to dedupe there is the + same live corruption pattern this PR exists to close. + + ``with_write_type`` (column-subset writes) on a PK table is + also rejected here. The buffer layout + ``_add_system_fields`` produces would carry only the subset + on the value side, while a ``MergeFunction`` such as + ``PartialUpdateMergeFunction`` is built against the full table + arity -- the two sides would mismatch on + ``KeyValue.value.get_field`` and raise ``IndexError`` at + flush time. Refusing it explicitly avoids that obscure failure + and keeps the supported surface narrow. + + The value-side schema must match the layout + ``KeyValueDataWriter`` flushes -- ``_add_system_fields`` keeps + every original user column on the value side (the primary keys + are duplicated as ``_KEY_`` columns to the left of the + value side). So ``value_arity`` here is ``len(table.fields)``, + not ``len(table.fields) - len(primary_keys)``. 
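+
+        For example, for a table ``(id PK, a, b)`` the flushed layout is
+        ``[_KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND, id, a, b]``, so the
+        merge function is built with ``key_arity=1`` and ``value_arity=3``
+        (``id``, ``a``, ``b``), not ``value_arity=2``.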
+ """ + from pypaimon.common.merge_engine_dispatch import ( + build_merge_function, partial_update_unsupported_options) + from pypaimon.common.options.core_options import MergeEngine + from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction + + engine = self.options.merge_engine() + raw_options = self.options.options.to_map() + + if self.write_cols is not None: + raise NotImplementedError( + "with_write_type is not yet supported on primary-key " + "tables: the writer-side merge buffer assumes the " + "input batch carries the full table schema. Drop the " + "with_write_type call or write the missing columns as " + "nulls in the input batch." + ) + + # PARTIAL_UPDATE + out-of-scope option: never silently fall + # back -- forward the read-side error verbatim so writes fail + # before the first flush rather than corrupt the file. + if engine == MergeEngine.PARTIAL_UPDATE \ + and partial_update_unsupported_options(raw_options): + return build_merge_function( + engine=engine, raw_options=raw_options, + key_arity=len(self.table.trimmed_primary_keys), + value_arity=len(self.table.table_schema.fields), + value_field_nullables=[ + f.type.nullable for f in self.table.table_schema.fields], + ) + + # Catch the dispatch's "wholly unsupported engine" raise only + # for the engines we know are out of scope today; any other + # NotImplementedError is a bug we want to surface, not swallow. + if engine in (MergeEngine.AGGREGATE, MergeEngine.FIRST_ROW): + return DeduplicateMergeFunction() + + all_value_fields = self.table.table_schema.fields + return build_merge_function( + engine=engine, raw_options=raw_options, + key_arity=len(self.table.trimmed_primary_keys), + value_arity=len(all_value_fields), + value_field_nullables=[ + f.type.nullable for f in all_value_fields], + ) + def _has_blob_columns(self) -> bool: """Check if the table schema contains blob columns.""" for field in self.table.table_schema.fields: diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py index 1b2dde7d1dfb..1159448096d5 100644 --- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py +++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py @@ -16,14 +16,38 @@ # limitations under the License. ################################################################################ +from typing import List + import pyarrow as pa import pyarrow.compute as pc +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.read.reader.deduplicate_merge_function import \ + DeduplicateMergeFunction +from pypaimon.table.row.key_value import KeyValue from pypaimon.write.writer.data_writer import DataWriter class KeyValueDataWriter(DataWriter): - """Data writer for primary key tables with system fields and sorting.""" + """Data writer for primary key tables with system fields and sorting. + + On flush, applies the table's ``MergeFunction`` to fold rows that + share a primary key down to a single row, mirroring Java + ``SortBufferWriteBuffer.forEach`` / + ``MergeIterator.advanceIfNeeded`` (paimon-core/.../mergetree/ + SortBufferWriteBuffer.java). This is what enforces the LSM "PK + unique within a file" invariant the read-side ``raw_convertible`` + fast path relies on. 
+ """ + + def __init__(self, table, partition, bucket, max_seq_number, + options=None, write_cols=None, merge_function=None): + super().__init__(table, partition, bucket, max_seq_number, + options, write_cols) + # Defaults to deduplicate so direct callers (tests / future code + # paths that don't go through FileStoreWrite) don't accidentally + # skip the merge step entirely. + self._merge_function = merge_function or DeduplicateMergeFunction() def _process_data(self, data: pa.RecordBatch) -> pa.Table: enhanced_data = self._add_system_fields(data) @@ -33,6 +57,86 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: combined = pa.concat_tables([existing_data, new_data]) return self._sort_by_primary_key(combined) + def prepare_commit(self) -> List[DataFileMeta]: + if self.pending_data is not None and self.pending_data.num_rows > 0: + self.pending_data = self._merge_pending_by_pk(self.pending_data) + return super().prepare_commit() + + def _check_and_roll_if_needed(self): + # Mirror Java MergeTreeWriter: collapse same-PK runs *before* + # slicing for size, so each flushed file individually maintains + # PK uniqueness even when a single buffer spans multiple files. + if self.pending_data is not None and self.pending_data.num_rows > 0: + self.pending_data = self._merge_pending_by_pk(self.pending_data) + super()._check_and_roll_if_needed() + + def _merge_pending_by_pk(self, data: pa.Table) -> pa.Table: + """Fold same-PK runs in ``data`` using ``self._merge_function``. + + Mirrors Java ``MergeIterator.advanceIfNeeded`` + (SortBufferWriteBuffer.java:241-268). ``data`` is required to + already be sorted by ``(primary_key, _SEQUENCE_NUMBER)`` -- + ``_process_data`` / ``_merge_data`` enforce that. + """ + n = data.num_rows + if n < 2: + # Single-row buffer cannot have duplicates; sidestep the + # row-by-row pyarrow round-trip in the common streaming case. + return data + + rows = data.to_pylist() + col_names = data.schema.names + key_arity = len(self.trimmed_primary_keys) + # System fields sit at indices [key_arity, key_arity + 1] (the + # _SEQUENCE_NUMBER and _VALUE_KIND columns added by + # _add_system_fields). Everything to the right is the value side. 
+ value_arity = len(col_names) - key_arity - 2 + + merged_rows: List[dict] = [] + i = 0 + while i < n: + j = i + first_key = self._key_tuple(rows[i], col_names, key_arity) + while j < n and \ + self._key_tuple(rows[j], col_names, key_arity) == first_key: + j += 1 + run = rows[i:j] + self._merge_function.reset() + for r in run: + kv = KeyValue(key_arity, value_arity) + kv.replace(self._row_to_tuple(r, col_names)) + self._merge_function.add(kv) + result_kv = self._merge_function.get_result() + if result_kv is not None: + merged_rows.append( + self._kv_to_row(result_kv, col_names, + key_arity, value_arity)) + i = j + + if not merged_rows: + return data.slice(0, 0) + return pa.Table.from_pylist(merged_rows, schema=data.schema) + + @staticmethod + def _key_tuple(row: dict, col_names: List[str], key_arity: int) -> tuple: + return tuple(row[col_names[i]] for i in range(key_arity)) + + @staticmethod + def _row_to_tuple(row: dict, col_names: List[str]) -> tuple: + return tuple(row[name] for name in col_names) + + @staticmethod + def _kv_to_row(kv: KeyValue, col_names: List[str], + key_arity: int, value_arity: int) -> dict: + out = {} + for i in range(key_arity): + out[col_names[i]] = kv.key.get_field(i) + out[col_names[key_arity]] = kv.sequence_number + out[col_names[key_arity + 1]] = kv.value_row_kind_byte + for i in range(value_arity): + out[col_names[key_arity + 2 + i]] = kv.value.get_field(i) + return out + def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch: """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND.""" num_rows = data.num_rows