diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 730416b..5d0302e 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -224,6 +224,28 @@ async def main():
 
         # TODO: support to_duckdb()
 
+        # Test the new poll() method for incremental reading
+        print("\n--- Testing poll() method ---")
+        # Reset subscription to start from the beginning
+        log_scanner.subscribe(None, None)
+
+        # Poll with a timeout of 5000ms (5 seconds)
+        # Note: poll() returns an empty table (not an error) on timeout
+        try:
+            poll_result = log_scanner.poll(5000)
+            print(f"Number of rows: {poll_result.num_rows}")
+
+            if poll_result.num_rows > 0:
+                poll_df = poll_result.to_pandas()
+                print(f"Polled data:\n{poll_df}")
+            else:
+                print("Empty result (no records available)")
+                # An empty table still has a schema
+                print(f"Schema: {poll_result.schema}")
+
+        except Exception as e:
+            print(f"Error during poll: {e}")
+
     except Exception as e:
         print(f"Error during scanning: {e}")
 
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index b56a29d..9e9b518 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -20,10 +20,13 @@ use crate::*;
 use arrow::array::RecordBatch;
 use arrow_pyarrow::{FromPyArrow, ToPyArrow};
 use fluss::client::EARLIEST_OFFSET;
+use fluss::record::to_arrow_schema;
 use fluss::rpc::message::OffsetSpec;
 use pyo3::types::IntoPyDict;
 use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Duration;
 
 // Time conversion constants
 const MILLIS_PER_SECOND: i64 = 1_000;
@@ -186,7 +189,7 @@ impl FlussTable {
         }
 
         let rust_scanner = table_scan
-            .create_log_scanner()
+            .create_record_batch_log_scanner()
             .map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?;
 
         let admin = conn
@@ -888,7 +891,7 @@ fn get_type_name(value: &Bound<'_, PyAny>) -> String {
 /// Scanner for reading log data from a Fluss table
 #[pyclass]
 pub struct LogScanner {
-    inner: fcore::client::LogScanner,
+    inner: fcore::client::RecordBatchLogScanner,
     admin: fcore::client::FlussAdmin,
     table_info: fcore::metadata::TableInfo,
     #[allow(dead_code)]
@@ -933,9 +936,6 @@ impl LogScanner {
 
     /// Convert all data to Arrow Table
    fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
-        use std::collections::HashMap;
-        use std::time::Duration;
-
         let mut all_batches = Vec::new();
         let num_buckets = self.table_info.get_num_buckets();
 
@@ -960,31 +960,56 @@ impl LogScanner {
                 .block_on(async { self.inner.poll(Duration::from_millis(500)).await });
 
             match batch_result {
-                Ok(scan_records) => {
-                    let mut result_records: Vec<ScanRecord> = vec![];
-                    for (bucket, records) in scan_records.into_records_by_buckets() {
-                        let stopping_offset = stopping_offsets.get(&bucket.bucket_id());
-
-                        if stopping_offset.is_none() {
-                            // not to include this bucket, skip records for this bucket
-                            // since we already reach end offset for this bucket
-                            continue;
-                        }
-                        if let Some(last_record) = records.last() {
-                            let offset = last_record.offset();
-                            result_records.extend(records);
-                            if offset >= stopping_offset.unwrap() - 1 {
-                                stopping_offsets.remove(&bucket.bucket_id());
+                Ok(scan_batches) => {
+                    for scan_batch in scan_batches {
+                        let bucket_id = scan_batch.bucket().bucket_id();
+
+                        // Extract the stopping offset once to avoid a double unwrap
+                        let stop_exclusive = match stopping_offsets.get(&bucket_id) {
+                            Some(&offset) => offset,
+                            None => {
+                                // we already reached the end offset for this bucket
+                                continue;
                             }
-                        }
-                    }
+                        };
+
+                        // Compute the inclusive last offset we want (stop_exclusive - 1)
+                        let stop_inclusive = match stop_exclusive.checked_sub(1) {
+                            Some(v) => v,
+                            None => {
+                                // stop_exclusive was 0 or negative - nothing to read
+                                stopping_offsets.remove(&bucket_id);
+                                continue;
+                            }
+                        };
+
+                        let base_offset = scan_batch.base_offset();
+                        let last_offset = scan_batch.last_offset();
+
+                        // Check if we need to slice this batch to avoid overshoot
+                        let batch = if last_offset > stop_inclusive {
+                            // This batch extends past our stopping point;
+                            // slice to only include records up to stop_inclusive
+                            let records_to_keep = (stop_inclusive - base_offset + 1) as usize;
+
+                            let full_batch = scan_batch.into_batch();
+                            let actual_rows = full_batch.num_rows();
+
+                            full_batch.slice(0, records_to_keep.min(actual_rows))
+                        } else {
+                            // This batch is entirely before our stopping point
+                            scan_batch.into_batch()
+                        };
-
-                    if !result_records.is_empty() {
-                        let arrow_batch = Utils::convert_scan_records_to_arrow(result_records);
-                        all_batches.extend(arrow_batch);
+                        all_batches.push(Arc::new(batch));
+
+                        // Remove this bucket if we've reached or passed the stopping offset
+                        if last_offset >= stop_inclusive {
+                            stopping_offsets.remove(&bucket_id);
+                        }
                     }
-                    // we have reach end offsets of all bucket
+                    // we have reached end offsets of all buckets
                    if stopping_offsets.is_empty() {
                         break;
                     }
@@ -1006,15 +1031,68 @@ impl LogScanner {
         Ok(df)
     }
 
+    /// Poll for new records with the specified timeout
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds to wait for records
+    ///
+    /// Returns:
+    ///     PyArrow Table containing the polled records
+    ///
+    /// Note:
+    ///     - Returns an empty table (with the correct schema) if no records are available
+    ///     - When the timeout expires, returns an empty table (NOT an error)
+    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
+        if timeout_ms < 0 {
+            return Err(FlussError::new_err(format!(
+                "timeout_ms must be non-negative, got: {timeout_ms}"
+            )));
+        }
+
+        let timeout = Duration::from_millis(timeout_ms as u64);
+        let scan_batches = py
+            .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await }))
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        // Convert each ScanBatch to an Arrow batch
+        if scan_batches.is_empty() {
+            return self.create_empty_table(py);
+        }
+
+        let arrow_batches: Vec<_> = scan_batches
+            .into_iter()
+            .map(|scan_batch| Arc::new(scan_batch.into_batch()))
+            .collect();
+
+        Utils::combine_batches_to_table(py, arrow_batches)
+    }
+
+    /// Create an empty PyArrow table with the correct schema
+    fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> {
+        let arrow_schema = to_arrow_schema(self.table_info.get_row_type())
+            .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: {e}")))?;
+        let py_schema = arrow_schema
+            .as_ref()
+            .to_pyarrow(py)
+            .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?;
+
+        let pyarrow = py.import("pyarrow")?;
+        let empty_table = pyarrow
+            .getattr("Table")?
+            .call_method1("from_batches", (vec![] as Vec<Py<PyAny>>, py_schema))?;
+
+        Ok(empty_table.into())
+    }
+
     fn __repr__(&self) -> String {
         format!("LogScanner(table={})", self.table_info.table_path)
     }
 }
 
 impl LogScanner {
-    /// Create LogScanner from core LogScanner
+    /// Create LogScanner from the core RecordBatchLogScanner
     pub fn from_core(
-        inner_scanner: fcore::client::LogScanner,
+        inner_scanner: fcore::client::RecordBatchLogScanner,
         admin: fcore::client::FlussAdmin,
         table_info: fcore::metadata::TableInfo,
     ) -> Self {
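
Usage note (commentary, not part of the patch): the sketch below shows how the new poll() API might be driven from Python, assuming a `log_scanner` obtained as in example.py; the bounded retry loop is illustrative only and not part of this change.

    # A minimal sketch, assuming `log_scanner` was created as in example.py.
    # poll() returns a pyarrow.Table; on timeout it is empty, not an error.
    log_scanner.subscribe(None, None)  # reset to read from the beginning

    for _ in range(10):  # illustrative bound on the number of polls
        table = log_scanner.poll(1000)  # wait up to 1000 ms for records
        if table.num_rows == 0:
            break  # nothing arrived within the timeout window
        print(table.to_pandas())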