74 changes: 70 additions & 4 deletions paimon-python/pypaimon/read/table_read.py
@@ -30,16 +30,25 @@
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.offset_row import OffsetRow

ROW_KIND_COLUMN = "_row_kind"


class TableRead:
"""Implementation of TableRead for native Python reading."""

def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField]):
def __init__(
self,
table,
predicate: Optional[Predicate],
read_type: List[DataField],
include_row_kind: bool = False
):
from pypaimon.table.file_store_table import FileStoreTable

self.table: FileStoreTable = table
self.predicate = predicate
self.read_type = read_type
self.include_row_kind = include_row_kind

def to_iterator(self, splits: List[Split]) -> Iterator:
def _record_generator():
@@ -55,9 +64,17 @@ def _record_generator():

def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
if self.include_row_kind:
schema = self._add_row_kind_to_schema(schema)
batch_iterator = self._arrow_batch_generator(splits, schema)
return pyarrow.ipc.RecordBatchReader.from_batches(schema, batch_iterator)

@staticmethod
def _add_row_kind_to_schema(schema: pyarrow.Schema) -> pyarrow.Schema:
"""Add _row_kind column to the schema as the first column."""
row_kind_field = pyarrow.field(ROW_KIND_COLUMN, pyarrow.string())
return pyarrow.schema([row_kind_field] + list(schema))

@staticmethod
def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
if batch.schema.names == target_schema.names:
@@ -79,6 +96,9 @@ def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]:
batch_reader = self.to_arrow_batch_reader(splits)

schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
if self.include_row_kind:
schema = self._add_row_kind_to_schema(schema)

table_list = []
for batch in iter(batch_reader.read_next_batch, None):
if batch.num_rows == 0:
@@ -146,26 +166,72 @@ def _arrow_batch_generator(self, splits: List[Split], schema: pyarrow.Schema) ->
reader = self._create_split_read(split).create_reader()
try:
if isinstance(reader, RecordBatchReader):
yield from iter(reader.read_arrow_batch, None)
# Add row kind column if requested (RecordBatchReader rows default to +I)
if self.include_row_kind:
for batch in iter(reader.read_arrow_batch, None):
yield self._add_row_kind_column_to_batch(batch, "+I")
else:
yield from iter(reader.read_arrow_batch, None)
else:
row_tuple_chunk = []
row_kind_chunk = []
for row_iterator in iter(reader.read_batch, None):
for row in iter(row_iterator.next, None):
if not isinstance(row, OffsetRow):
raise TypeError(f"Expected OffsetRow, but got {type(row).__name__}")
row_tuple_chunk.append(row.row_tuple[row.offset: row.offset + row.arity])
if self.include_row_kind:
row_kind_chunk.append(row.get_row_kind().to_string())

if len(row_tuple_chunk) >= chunk_size:
batch = self.convert_rows_to_arrow_batch(row_tuple_chunk, schema)
batch = self._convert_rows_to_arrow_batch_with_row_kind(
row_tuple_chunk, row_kind_chunk, schema
)
yield batch
row_tuple_chunk = []
row_kind_chunk = []

if row_tuple_chunk:
batch = self.convert_rows_to_arrow_batch(row_tuple_chunk, schema)
batch = self._convert_rows_to_arrow_batch_with_row_kind(
row_tuple_chunk, row_kind_chunk, schema
)
yield batch
finally:
reader.close()

def _convert_rows_to_arrow_batch_with_row_kind(
self,
row_tuples: List[tuple],
row_kinds: List[str],
schema: pyarrow.Schema
) -> pyarrow.RecordBatch:
"""Convert rows to Arrow batch, optionally including row kind column."""
if not self.include_row_kind or not row_kinds:
# No row kind - use original schema (without _row_kind column)
data_schema = schema
columns_data = zip(*row_tuples)
pydict = {name: list(column) for name, column in zip(data_schema.names, columns_data)}
else:
# Include row kind as first column
# Schema already has _row_kind as first field
data_field_names = [f.name for f in schema if f.name != ROW_KIND_COLUMN]
columns_data = zip(*row_tuples)
pydict = {ROW_KIND_COLUMN: row_kinds}
for name, column in zip(data_field_names, columns_data):
pydict[name] = list(column)
return pyarrow.RecordBatch.from_pydict(pydict, schema=schema)

def _add_row_kind_column_to_batch(
self,
batch: pyarrow.RecordBatch,
default_row_kind: str = "+I"
) -> pyarrow.RecordBatch:
"""Add a _row_kind column to an existing batch."""
row_kind_array = pyarrow.array([default_row_kind] * batch.num_rows, type=pyarrow.string())
new_schema = self._add_row_kind_to_schema(batch.schema)
columns = [row_kind_array] + [batch.column(i) for i in range(batch.num_columns)]
return pyarrow.RecordBatch.from_arrays(columns, schema=new_schema)

def to_pandas(self, splits: List[Split]) -> pandas.DataFrame:
arrow_table = self.to_arrow(splits)
return arrow_table.to_pandas()
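For reference, a minimal standalone sketch of the column-prepending mechanics that `_add_row_kind_to_schema` and `_add_row_kind_column_to_batch` implement, using only `pyarrow`. The sample data is made up; the field name and "+I" default follow the diff:

```python
import pyarrow as pa

ROW_KIND_COLUMN = "_row_kind"

# A small stand-in for a batch produced by a split read.
batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Mirror _add_row_kind_to_schema: prepend _row_kind as the first field.
row_kind_field = pa.field(ROW_KIND_COLUMN, pa.string())
new_schema = pa.schema([row_kind_field] + list(batch.schema))

# Mirror _add_row_kind_column_to_batch: default every row to "+I" (insert).
row_kind_array = pa.array(["+I"] * batch.num_rows, type=pa.string())
columns = [row_kind_array] + [batch.column(i) for i in range(batch.num_columns)]
augmented = pa.RecordBatch.from_arrays(columns, schema=new_schema)

print(augmented.to_pydict())
# {'_row_kind': ['+I', '+I', '+I'], 'id': [1, 2, 3], 'name': ['a', 'b', 'c']}
```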
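And a hedged end-to-end usage sketch. Only the `TableRead` constructor signature and `to_arrow` come from this diff; `read_fields` and the table/splits wiring are assumptions and may look different in pypaimon:

```python
# Assumptions: `table` is a FileStoreTable, `read_fields` is the
# List[DataField] projection to read, and `splits` came from a scan plan.
# None of those steps are part of this diff.
read = TableRead(
    table,
    predicate=None,
    read_type=read_fields,
    include_row_kind=True,  # new flag from this PR
)

arrow_table = read.to_arrow(splits)
# _row_kind arrives as the first column, holding Paimon row-kind strings
# such as "+I", "-U", "+U", and "-D".
print(arrow_table.schema.names[0])  # "_row_kind"
```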