From a1681ac101145f6227bb654462e5be3837b17db8 Mon Sep 17 00:00:00 2001
From: Toby Cole
Date: Tue, 10 Mar 2026 15:25:57 +0000
Subject: [PATCH] [python] Add row kind support for TableRead

Add include_row_kind option to TableRead that prepends a _row_kind
string column to Arrow output. For RecordBatchReader (append-only
tables) all rows default to "+I"; for RowIterator (primary-key tables)
row kind is read per-row via OffsetRow.get_row_kind().

The feature is opt-in (include_row_kind=False by default) so existing
read paths are unaffected. StreamReadBuilder in the next PR will enable
it for changelog/streaming reads.

Co-Authored-By: Claude Opus 4.6
---
 paimon-python/pypaimon/read/table_read.py | 74 +++++++++++++++++++++--
 1 file changed, 70 insertions(+), 4 deletions(-)

diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 28e5a9f5baee..2e21189092fa 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -30,16 +30,25 @@
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 from pypaimon.table.row.offset_row import OffsetRow
 
+ROW_KIND_COLUMN = "_row_kind"
+
 
 class TableRead:
     """Implementation of TableRead for native Python reading."""
 
-    def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField]):
+    def __init__(
+        self,
+        table,
+        predicate: Optional[Predicate],
+        read_type: List[DataField],
+        include_row_kind: bool = False
+    ):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.read_type = read_type
+        self.include_row_kind = include_row_kind
 
     def to_iterator(self, splits: List[Split]) -> Iterator:
         def _record_generator():
@@ -55,9 +64,17 @@ def _record_generator():
 
     def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
         schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
+        if self.include_row_kind:
+            schema = self._add_row_kind_to_schema(schema)
         batch_iterator = self._arrow_batch_generator(splits, schema)
         return pyarrow.ipc.RecordBatchReader.from_batches(schema, batch_iterator)
 
+    @staticmethod
+    def _add_row_kind_to_schema(schema: pyarrow.Schema) -> pyarrow.Schema:
+        """Add _row_kind column to the schema as the first column."""
+        row_kind_field = pyarrow.field(ROW_KIND_COLUMN, pyarrow.string())
+        return pyarrow.schema([row_kind_field] + list(schema))
+
     @staticmethod
     def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
         if batch.schema.names == target_schema.names:
@@ -79,6 +96,9 @@
     def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]:
         batch_reader = self.to_arrow_batch_reader(splits)
         schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
+        if self.include_row_kind:
+            schema = self._add_row_kind_to_schema(schema)
+
         table_list = []
         for batch in iter(batch_reader.read_next_batch, None):
             if batch.num_rows == 0:
@@ -146,26 +166,72 @@ def _arrow_batch_generator(self, splits: List[Split], schema: pyarrow.Schema) ->
             reader = self._create_split_read(split).create_reader()
             try:
                 if isinstance(reader, RecordBatchReader):
-                    yield from iter(reader.read_arrow_batch, None)
+                    # Add row kind column if requested (default to +I for RecordBatchReader)
+                    if self.include_row_kind:
+                        for batch in iter(reader.read_arrow_batch, None):
+                            yield self._add_row_kind_column_to_batch(batch, "+I")
+                    else:
+                        yield from iter(reader.read_arrow_batch, None)
                 else:
                     row_tuple_chunk = []
+                    row_kind_chunk = []
                     for row_iterator in iter(reader.read_batch, None):
                         for row in iter(row_iterator.next, None):
                             if not isinstance(row, OffsetRow):
                                 raise TypeError(f"Expected OffsetRow, but got {type(row).__name__}")
                             row_tuple_chunk.append(row.row_tuple[row.offset: row.offset + row.arity])
+                            if self.include_row_kind:
+                                row_kind_chunk.append(row.get_row_kind().to_string())
                             if len(row_tuple_chunk) >= chunk_size:
-                                batch = self.convert_rows_to_arrow_batch(row_tuple_chunk, schema)
+                                batch = self._convert_rows_to_arrow_batch_with_row_kind(
+                                    row_tuple_chunk, row_kind_chunk, schema
+                                )
                                 yield batch
                                 row_tuple_chunk = []
+                                row_kind_chunk = []
                     if row_tuple_chunk:
-                        batch = self.convert_rows_to_arrow_batch(row_tuple_chunk, schema)
+                        batch = self._convert_rows_to_arrow_batch_with_row_kind(
+                            row_tuple_chunk, row_kind_chunk, schema
+                        )
                         yield batch
             finally:
                 reader.close()
 
+    def _convert_rows_to_arrow_batch_with_row_kind(
+        self,
+        row_tuples: List[tuple],
+        row_kinds: List[str],
+        schema: pyarrow.Schema
+    ) -> pyarrow.RecordBatch:
+        """Convert rows to Arrow batch, optionally including row kind column."""
+        if not self.include_row_kind or not row_kinds:
+            # No row kind - use original schema (without _row_kind column)
+            data_schema = schema
+            columns_data = zip(*row_tuples)
+            pydict = {name: list(column) for name, column in zip(data_schema.names, columns_data)}
+        else:
+            # Include row kind as first column
+            # Schema already has _row_kind as first field
+            data_field_names = [f.name for f in schema if f.name != ROW_KIND_COLUMN]
+            columns_data = zip(*row_tuples)
+            pydict = {ROW_KIND_COLUMN: row_kinds}
+            for name, column in zip(data_field_names, columns_data):
+                pydict[name] = list(column)
+        return pyarrow.RecordBatch.from_pydict(pydict, schema=schema)
+
+    def _add_row_kind_column_to_batch(
+        self,
+        batch: pyarrow.RecordBatch,
+        default_row_kind: str = "+I"
+    ) -> pyarrow.RecordBatch:
+        """Add a _row_kind column to an existing batch."""
+        row_kind_array = pyarrow.array([default_row_kind] * batch.num_rows, type=pyarrow.string())
+        new_schema = self._add_row_kind_to_schema(batch.schema)
+        columns = [row_kind_array] + [batch.column(i) for i in range(batch.num_columns)]
+        return pyarrow.RecordBatch.from_arrays(columns, schema=new_schema)
+
     def to_pandas(self, splits: List[Split]) -> pandas.DataFrame:
         arrow_table = self.to_arrow(splits)
         return arrow_table.to_pandas()
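For illustration, the two Arrow helpers in the patch can be exercised with
plain pyarrow outside of TableRead. The sketch below mirrors the bodies of
_add_row_kind_to_schema and _add_row_kind_column_to_batch on a hand-built
batch; the free-function names and the sample data are hypothetical, only
the helper logic comes from the diff:

    import pyarrow

    ROW_KIND_COLUMN = "_row_kind"

    # Mirrors TableRead._add_row_kind_to_schema: prepend a string
    # _row_kind field to an existing Arrow schema.
    def add_row_kind_to_schema(schema: pyarrow.Schema) -> pyarrow.Schema:
        row_kind_field = pyarrow.field(ROW_KIND_COLUMN, pyarrow.string())
        return pyarrow.schema([row_kind_field] + list(schema))

    # Mirrors TableRead._add_row_kind_column_to_batch: tag every row of an
    # append-only batch with the default kind "+I".
    def add_row_kind_column_to_batch(batch, default_row_kind="+I"):
        row_kind_array = pyarrow.array(
            [default_row_kind] * batch.num_rows, type=pyarrow.string())
        new_schema = add_row_kind_to_schema(batch.schema)
        columns = [row_kind_array] + [batch.column(i) for i in range(batch.num_columns)]
        return pyarrow.RecordBatch.from_arrays(columns, schema=new_schema)

    batch = pyarrow.RecordBatch.from_pydict({"id": [1, 2], "name": ["a", "b"]})
    tagged = add_row_kind_column_to_batch(batch)
    print(tagged.schema.names)           # ['_row_kind', 'id', 'name']
    print(tagged.column(0).to_pylist())  # ['+I', '+I']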
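The per-row path for primary-key tables can be sketched the same way:
_convert_rows_to_arrow_batch_with_row_kind fills _row_kind from the kinds
collected via OffsetRow.get_row_kind() and builds the data columns by
transposing the row tuples. The row tuples, kinds, and function name below
are again made up for illustration:

    import pyarrow

    ROW_KIND_COLUMN = "_row_kind"

    def rows_to_batch_with_row_kind(row_tuples, row_kinds, schema):
        # schema already has _row_kind as its first field; fill it from
        # row_kinds, then fill the remaining fields column-by-column from
        # the transposed row tuples.
        data_field_names = [f.name for f in schema if f.name != ROW_KIND_COLUMN]
        pydict = {ROW_KIND_COLUMN: row_kinds}
        for name, column in zip(data_field_names, zip(*row_tuples)):
            pydict[name] = list(column)
        return pyarrow.RecordBatch.from_pydict(pydict, schema=schema)

    schema = pyarrow.schema([
        pyarrow.field(ROW_KIND_COLUMN, pyarrow.string()),
        pyarrow.field("id", pyarrow.int64()),
        pyarrow.field("name", pyarrow.string()),
    ])
    batch = rows_to_batch_with_row_kind(
        [(1, "a"), (2, "b")], ["+I", "-D"], schema)
    print(batch.to_pydict())
    # {'_row_kind': ['+I', '-D'], 'id': [1, 2], 'name': ['a', 'b']}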