From f6ecfb0b1a3d6721d94b44bbfa75d9bd06dfe925 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Mon, 12 Jan 2026 02:59:40 +0000 Subject: [PATCH 1/2] [ISSUE-153] Add blocking poll into python bindings --- bindings/python/example/example.py | 21 ++++++++++++ bindings/python/src/table.rs | 54 ++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 730416b..cc7b7ad 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -224,6 +224,27 @@ async def main(): # TODO: support to_duckdb() + # Test the new poll() method for incremental reading + print("\n--- Testing poll() method ---") + log_scanner.subscribe(None, None) + + # Poll with a timeout of 5000ms (5 seconds) + # Note: poll() returns an empty table (not an error) on timeout + try: + poll_result = log_scanner.poll(5000) + print(f"Number of rows: {poll_result.num_rows}") + + if poll_result.num_rows > 0: + poll_df = poll_result.to_pandas() + print(f"Polled data:\n{poll_df}") + else: + print("Empty result (no records available)") + # Empty table still has schema + print(f"Schema: {poll_result.schema}") + + except Exception as e: + print(f"Error during poll: {e}") + except Exception as e: print(f"Error during scanning: {e}") diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index b56a29d..039e0f2 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -20,6 +20,7 @@ use crate::*; use arrow::array::RecordBatch; use arrow_pyarrow::{FromPyArrow, ToPyArrow}; use fluss::client::EARLIEST_OFFSET; +use fluss::record::to_arrow_schema; use fluss::rpc::message::OffsetSpec; use pyo3::types::IntoPyDict; use pyo3_async_runtimes::tokio::future_into_py; @@ -1006,6 +1007,59 @@ impl LogScanner { Ok(df) } + /// Poll for new records with the specified timeout + /// + /// Args: + /// timeout_ms: Timeout in milliseconds to wait for records + /// + /// Returns: + /// PyArrow Table containing the polled records + /// + /// Note: + /// - Returns an empty table (with correct schema) if no records are available + /// - When timeout expires, returns an empty table (NOT an error) + fn poll(&self, py: Python, timeout_ms: i64) -> PyResult> { + use std::time::Duration; + + if timeout_ms < 0 { + return Err(FlussError::new_err(format!( + "timeout_ms must be non-negative, got: {timeout_ms}" + ))); + } + + let timeout = Duration::from_millis(timeout_ms as u64); + let scan_records = py + .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await })) + .map_err(|e| FlussError::new_err(e.to_string()))?; + + // Convert records to Arrow batches per bucket + let mut arrow_batches = Vec::new(); + for (_bucket, records) in scan_records.into_records_by_buckets() { + let mut batches = Utils::convert_scan_records_to_arrow(records); + arrow_batches.append(&mut batches); + } + if arrow_batches.is_empty() { + return self.create_empty_table(py); + } + + Utils::combine_batches_to_table(py, arrow_batches) + } + + /// Create an empty PyArrow table with the correct schema + fn create_empty_table(&self, py: Python) -> PyResult> { + let arrow_schema = to_arrow_schema(self.table_info.get_row_type()); + let py_schema = arrow_schema + .to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; + + let pyarrow = py.import("pyarrow")?; + let empty_table = pyarrow + .getattr("Table")? + .call_method1("from_batches", (vec![] as Vec>, py_schema))?; + + Ok(empty_table.into()) + } + fn __repr__(&self) -> String { format!("LogScanner(table={})", self.table_info.table_path) } From 0333dd5dfe38fe03f6710adb7c0aabe86d87b857 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 25 Jan 2026 11:14:41 +0100 Subject: [PATCH 2/2] use poll_batches --- bindings/python/example/example.py | 1 + bindings/python/src/table.rs | 102 ++++++++++++++++++----------- 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index cc7b7ad..5d0302e 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -226,6 +226,7 @@ async def main(): # Test the new poll() method for incremental reading print("\n--- Testing poll() method ---") + # Reset subscription to start from the beginning log_scanner.subscribe(None, None) # Poll with a timeout of 5000ms (5 seconds) diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 039e0f2..9e9b518 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -24,7 +24,9 @@ use fluss::record::to_arrow_schema; use fluss::rpc::message::OffsetSpec; use pyo3::types::IntoPyDict; use pyo3_async_runtimes::tokio::future_into_py; +use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; // Time conversion constants const MILLIS_PER_SECOND: i64 = 1_000; @@ -187,7 +189,7 @@ impl FlussTable { } let rust_scanner = table_scan - .create_log_scanner() + .create_record_batch_log_scanner() .map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?; let admin = conn @@ -889,7 +891,7 @@ fn get_type_name(value: &Bound) -> String { /// Scanner for reading log data from a Fluss table #[pyclass] pub struct LogScanner { - inner: fcore::client::LogScanner, + inner: fcore::client::RecordBatchLogScanner, admin: fcore::client::FlussAdmin, table_info: fcore::metadata::TableInfo, #[allow(dead_code)] @@ -934,9 +936,6 @@ impl LogScanner { /// Convert all data to Arrow Table fn to_arrow(&self, py: Python) -> PyResult> { - use std::collections::HashMap; - use std::time::Duration; - let mut all_batches = Vec::new(); let num_buckets = self.table_info.get_num_buckets(); @@ -961,31 +960,56 @@ impl LogScanner { .block_on(async { self.inner.poll(Duration::from_millis(500)).await }); match batch_result { - Ok(scan_records) => { - let mut result_records: Vec = vec![]; - for (bucket, records) in scan_records.into_records_by_buckets() { - let stopping_offset = stopping_offsets.get(&bucket.bucket_id()); - - if stopping_offset.is_none() { - // not to include this bucket, skip records for this bucket - // since we already reach end offset for this bucket - continue; - } - if let Some(last_record) = records.last() { - let offset = last_record.offset(); - result_records.extend(records); - if offset >= stopping_offset.unwrap() - 1 { - stopping_offsets.remove(&bucket.bucket_id()); + Ok(scan_batches) => { + for scan_batch in scan_batches { + let bucket_id = scan_batch.bucket().bucket_id(); + + // Extract stopping_offset once to avoid double unwrap + let stop_exclusive = match stopping_offsets.get(&bucket_id) { + Some(&offset) => offset, + None => { + // we already reached end offset for this bucket + continue; } - } - } + }; + + // Compute the inclusive last offset we want (stop_exclusive - 1) + let stop_inclusive = match stop_exclusive.checked_sub(1) { + Some(v) => v, + None => { + // stop_exclusive was 0 or negative - nothing to read + stopping_offsets.remove(&bucket_id); + continue; + } + }; + + let base_offset = scan_batch.base_offset(); + let last_offset = scan_batch.last_offset(); + + // Check if we need to slice this batch to avoid overshoot + let batch = if last_offset > stop_inclusive { + // This batch extends past our stopping point + // Slice to only include records up to stop_inclusive + let records_to_keep = (stop_inclusive - base_offset + 1) as usize; + + let full_batch = scan_batch.into_batch(); + let actual_rows = full_batch.num_rows(); - if !result_records.is_empty() { - let arrow_batch = Utils::convert_scan_records_to_arrow(result_records); - all_batches.extend(arrow_batch); + full_batch.slice(0, records_to_keep.min(actual_rows)) + } else { + // This batch is entirely before our stopping point + scan_batch.into_batch() + }; + + all_batches.push(Arc::new(batch)); + + // Remove this bucket if we've reached or passed the stopping offset + if last_offset >= stop_inclusive { + stopping_offsets.remove(&bucket_id); + } } - // we have reach end offsets of all bucket + // we have reached end offsets of all buckets if stopping_offsets.is_empty() { break; } @@ -1019,8 +1043,6 @@ impl LogScanner { /// - Returns an empty table (with correct schema) if no records are available /// - When timeout expires, returns an empty table (NOT an error) fn poll(&self, py: Python, timeout_ms: i64) -> PyResult> { - use std::time::Duration; - if timeout_ms < 0 { return Err(FlussError::new_err(format!( "timeout_ms must be non-negative, got: {timeout_ms}" @@ -1028,27 +1050,29 @@ impl LogScanner { } let timeout = Duration::from_millis(timeout_ms as u64); - let scan_records = py + let scan_batches = py .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await })) .map_err(|e| FlussError::new_err(e.to_string()))?; - // Convert records to Arrow batches per bucket - let mut arrow_batches = Vec::new(); - for (_bucket, records) in scan_records.into_records_by_buckets() { - let mut batches = Utils::convert_scan_records_to_arrow(records); - arrow_batches.append(&mut batches); - } - if arrow_batches.is_empty() { + // Convert ScanBatch to Arrow batches + if scan_batches.is_empty() { return self.create_empty_table(py); } + let arrow_batches: Vec<_> = scan_batches + .into_iter() + .map(|scan_batch| Arc::new(scan_batch.into_batch())) + .collect(); + Utils::combine_batches_to_table(py, arrow_batches) } /// Create an empty PyArrow table with the correct schema fn create_empty_table(&self, py: Python) -> PyResult> { - let arrow_schema = to_arrow_schema(self.table_info.get_row_type()); + let arrow_schema = to_arrow_schema(self.table_info.get_row_type()) + .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: {e}")))?; let py_schema = arrow_schema + .as_ref() .to_pyarrow(py) .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; @@ -1066,9 +1090,9 @@ impl LogScanner { } impl LogScanner { - /// Create LogScanner from core LogScanner + /// Create LogScanner from core RecordBatchLogScanner pub fn from_core( - inner_scanner: fcore::client::LogScanner, + inner_scanner: fcore::client::RecordBatchLogScanner, admin: fcore::client::FlussAdmin, table_info: fcore::metadata::TableInfo, ) -> Self {