From c5645c8acb23f2362d45ad549a027f13e189a972 Mon Sep 17 00:00:00 2001
From: Anton Borisov
Date: Sat, 24 Jan 2026 23:58:49 +0100
Subject: [PATCH 1/2] [TASK-179] Add more info to RecordBatches

---
 .../src/client/table/log_fetch_buffer.rs | 28 ++++---
 crates/fluss/src/client/table/scanner.rs | 44 +++++++----
 crates/fluss/src/record/mod.rs           | 77 +++++++++++++++++++
 3 files changed, 120 insertions(+), 29 deletions(-)
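Usage note: a minimal consumer-side sketch of the new batch API. Only
RecordBatchLogScanner::poll and the ScanBatch accessors come from this
patch; the async context, the already-constructed `scanner` handle, and the
`process` helper are assumptions for illustration:

    use std::time::Duration;

    // Poll returns Arrow batches wrapped with bucket and offset metadata.
    let batches = scanner.poll(Duration::from_secs(5)).await?;
    for sb in batches {
        // Each ScanBatch knows which bucket it came from and which
        // offsets it covers, without iterating individual rows.
        println!(
            "bucket={:?} offsets={}..={} rows={}",
            sb.bucket(),
            sb.base_offset(),
            sb.last_offset(),
            sb.num_records()
        );
        let batch = sb.batch(); // &RecordBatch with the column data
        process(batch); // hypothetical downstream handler
        // Persisting sb.last_offset() + 1 gives the offset to resume from.
    }
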
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 4a64eda2..b529806f 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -65,7 +65,7 @@ pub trait CompletedFetch: Send + Sync {
     fn fetch_error_context(&self) -> Option<&FetchErrorContext>;
     fn take_error(&mut self) -> Option<Error>;
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
-    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>>;
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>>;
     fn is_consumed(&self) -> bool;
     fn records_read(&self) -> usize;
     fn drain(&mut self);
@@ -476,8 +476,9 @@ impl DefaultCompletedFetch {
             source: None,
         }
     }
-    /// Get the next batch directly without row iteration
-    fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
+    /// Get the next batch with its base offset.
+    /// Returns (RecordBatch, base_offset) where base_offset is the offset of the first record.
+    fn next_fetched_batch(&mut self) -> Result<Option<(RecordBatch, i64)>> {
         loop {
             let Some(log_batch_result) = self.log_record_batch.next() else {
                 self.drain();
@@ -492,20 +493,23 @@
                 continue;
             }
 
-            // Truncate batch
-            let base_offset = log_batch.base_log_offset();
-            if self.next_fetch_offset > base_offset {
-                let skip_count = (self.next_fetch_offset - base_offset) as usize;
+            // Calculate the effective base offset for this batch
+            let log_base_offset = log_batch.base_log_offset();
+            let effective_base_offset = if self.next_fetch_offset > log_base_offset {
+                let skip_count = (self.next_fetch_offset - log_base_offset) as usize;
                 if skip_count >= record_batch.num_rows() {
                     continue;
                 }
                 // Slice the batch to skip the first skip_count rows
                 record_batch = record_batch.slice(skip_count, record_batch.num_rows() - skip_count);
-            }
+                self.next_fetch_offset
+            } else {
+                log_base_offset
+            };
 
             self.next_fetch_offset = log_batch.next_log_offset();
             self.records_read += record_batch.num_rows();
-            return Ok(Some(record_batch));
+            return Ok(Some((record_batch, effective_base_offset)));
         }
     }
 }
@@ -585,7 +589,7 @@ impl CompletedFetch for DefaultCompletedFetch {
         Ok(scan_records)
     }
 
-    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>> {
         if let Some(error) = self.error.take() {
             return Err(error);
         }
@@ -607,7 +611,7 @@
 
         for _ in 0..max_batches {
             match self.next_fetched_batch()? {
-                Some(batch) => batches.push(batch),
+                Some(batch_with_offset) => batches.push(batch_with_offset),
                 None => break,
             }
         }
@@ -692,7 +696,7 @@ impl CompletedFetch for RemoteCompletedFetch {
         self.inner.fetch_records(max_records)
     }
 
-    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>> {
         self.inner.fetch_batches(max_batches)
     }
 
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 61ed56e1..dbebe1ac 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::RecordBatch;
 use arrow_schema::SchemaRef;
 use log::{debug, warn};
 use parking_lot::{Mutex, RwLock};
@@ -39,7 +38,9 @@ use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
 use crate::error::{ApiError, Error, FlussError, Result};
 use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
 use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
-use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
+use crate::record::{
+    LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords, to_arrow_schema,
+};
 use crate::rpc::{RpcClient, RpcError, message};
 use crate::util::FairBucketStatusMap;
 
@@ -380,7 +381,7 @@ impl LogScannerInner {
         self.log_fetcher.collect_fetches()
     }
 
-    async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+    async fn poll_batches(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
         let start = Instant::now();
         let deadline = start + timeout;
 
@@ -410,7 +411,7 @@
         }
     }
 
-    async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
+    async fn poll_for_batches(&self) -> Result<Vec<ScanBatch>> {
         let result = self.log_fetcher.collect_batches()?;
         if !result.is_empty() {
             return Ok(result);
@@ -438,7 +439,8 @@ impl LogScanner {
 }
 
 // Implementation for RecordBatchLogScanner (batches mode)
 impl RecordBatchLogScanner {
-    pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+    /// Poll for batches with metadata (bucket and offset information).
+    pub async fn poll(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
         self.inner.poll_batches(timeout).await
     }
@@ -1134,13 +1136,13 @@
         }
     }
 
-    /// Collect completed fetches as RecordBatches
-    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+    /// Collect completed fetches as ScanBatches (with bucket and offset metadata)
+    fn collect_batches(&self) -> Result<Vec<ScanBatch>> {
         // Limit memory usage with both batch count and byte size constraints.
         // Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
         const MAX_BATCHES: usize = 100;
         const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
 
-        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut result: Vec<ScanBatch> = Vec::new();
         let mut batches_remaining = MAX_BATCHES;
         let mut bytes_consumed: usize = 0;
@@ -1150,17 +1152,19 @@
 
             match next_in_line {
                 Some(mut next_fetch) if !next_fetch.is_consumed() => {
-                    let batches =
+                    let scan_batches =
                         self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
-                    let batch_count = batches.len();
+                    let batch_count = scan_batches.len();
 
-                    if !batches.is_empty() {
+                    if !scan_batches.is_empty() {
                         // Track bytes consumed (soft cap - may exceed by one fetch)
-                        let batch_bytes: usize =
-                            batches.iter().map(|b| b.get_array_memory_size()).sum();
+                        let batch_bytes: usize = scan_batches
+                            .iter()
+                            .map(|sb| sb.batch().get_array_memory_size())
+                            .sum();
                         bytes_consumed += batch_bytes;
 
-                        result.extend(batches);
+                        result.extend(scan_batches);
                         batches_remaining = batches_remaining.saturating_sub(batch_count);
                     }
 
@@ -1214,7 +1218,7 @@
         &self,
         next_in_line_fetch: &mut Box<dyn CompletedFetch>,
         max_batches: usize,
-    ) -> Result<Vec<RecordBatch>> {
+    ) -> Result<Vec<ScanBatch>> {
         let table_bucket = next_in_line_fetch.table_bucket().clone();
         let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
 
@@ -1230,7 +1234,7 @@
         let fetch_offset = next_in_line_fetch.next_fetch_offset();
 
         if fetch_offset == current_offset {
-            let batches = next_in_line_fetch.fetch_batches(max_batches)?;
+            let batches_with_offsets = next_in_line_fetch.fetch_batches(max_batches)?;
             let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
 
             if next_fetch_offset > current_offset {
@@ -1238,7 +1242,13 @@
                     .update_offset(&table_bucket, next_fetch_offset);
             }
 
-            Ok(batches)
+            // Convert to ScanBatch with bucket info
+            Ok(batches_with_offsets
+                .into_iter()
+                .map(|(batch, base_offset)| {
+                    ScanBatch::new(table_bucket.clone(), batch, base_offset)
+                })
+                .collect())
         } else {
             warn!(
                 "Ignoring fetched batches for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index 94997e8d..8438b16d 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -17,6 +17,7 @@
 
 use crate::metadata::TableBucket;
 use crate::row::ColumnarRow;
+use ::arrow::array::RecordBatch;
 use core::fmt;
 use std::collections::HashMap;
 
@@ -170,6 +171,60 @@
     }
 }
 
+/// A batch of records with metadata about bucket and offsets.
+///
+/// This is the batch-level equivalent of [`ScanRecord`], providing efficient
+/// access to Arrow RecordBatches while preserving the bucket and offset information
+/// needed for tracking consumption progress.
+#[derive(Debug, Clone)]
+pub struct ScanBatch {
+    /// The bucket this batch belongs to
+    bucket: TableBucket,
+    /// The Arrow RecordBatch containing the data
+    batch: RecordBatch,
+    /// Offset of the first record in this batch
+    base_offset: i64,
+}
+
+impl ScanBatch {
+    pub fn new(bucket: TableBucket, batch: RecordBatch, base_offset: i64) -> Self {
+        Self {
+            bucket,
+            batch,
+            base_offset,
+        }
+    }
+
+    pub fn bucket(&self) -> &TableBucket {
+        &self.bucket
+    }
+
+    pub fn batch(&self) -> &RecordBatch {
+        &self.batch
+    }
+
+    pub fn into_batch(self) -> RecordBatch {
+        self.batch
+    }
+
+    pub fn base_offset(&self) -> i64 {
+        self.base_offset
+    }
+
+    pub fn num_records(&self) -> usize {
+        self.batch.num_rows()
+    }
+
+    /// Returns the offset of the last record in this batch.
+    pub fn last_offset(&self) -> i64 {
+        if self.batch.num_rows() == 0 {
+            self.base_offset - 1
+        } else {
+            self.base_offset + self.batch.num_rows() as i64 - 1
+        }
+    }
+}
+
 impl IntoIterator for ScanRecords {
     type Item = ScanRecord;
     type IntoIter = std::vec::IntoIter<ScanRecord>;
@@ -243,4 +298,26 @@ mod tests {
         assert_eq!(record.timestamp(), -1);
         assert_eq!(record.change_type(), &ChangeType::Insert);
     }
+
+    #[test]
+    fn scan_batch_last_offset() {
+        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
+        let bucket = TableBucket::new(1, 0);
+
+        // Batch with 3 records starting at offset 100 -> last_offset = 102
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )
+        .unwrap();
+        let scan_batch = ScanBatch::new(bucket.clone(), batch, 100);
+        assert_eq!(scan_batch.num_records(), 3);
+        assert_eq!(scan_batch.last_offset(), 102);
+
+        // Empty batch -> last_offset = base_offset - 1
+        let empty_batch = RecordBatch::new_empty(schema);
+        let empty_scan_batch = ScanBatch::new(bucket, empty_batch, 100);
+        assert_eq!(empty_scan_batch.num_records(), 0);
+        assert_eq!(empty_scan_batch.last_offset(), 99);
+    }
 }

From 7951287c026050c5fff6d2a69d7b21e6722672b9 Mon Sep 17 00:00:00 2001
From: Anton Borisov
Date: Sun, 25 Jan 2026 07:24:20 +0100
Subject: [PATCH 2/2] fix CI/CD: adapt integration tests to the ScanBatch API

---
 crates/fluss/tests/integration/table.rs | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
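Note on the test changes: poll() now yields Vec<ScanBatch> instead of bare
Arrow RecordBatches, so the tests reach the columns through ScanBatch::batch().
A sketch of the per-bucket bookkeeping this metadata enables, assuming
TableBucket implements Hash + Eq (the map and loop are illustrative, not
code from this patch):

    use std::collections::HashMap;

    // Record the next offset to consume for every bucket seen in a poll.
    // last_offset() returns base_offset - 1 for an empty batch, so
    // last_offset() + 1 is the correct resume point in both cases.
    let mut next_offsets: HashMap<TableBucket, i64> = HashMap::new();
    for sb in &batches {
        next_offsets.insert(sb.bucket().clone(), sb.last_offset() + 1);
    }
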
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index ef73b568..046ec023 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -498,8 +498,10 @@ mod table_test {
         let all_ids: Vec<i32> = batches
             .iter()
             .flat_map(|b| {
-                (0..b.num_rows()).map(|i| {
-                    b.column(0)
+                let batch = b.batch();
+                (0..batch.num_rows()).map(move |i| {
+                    batch
+                        .column(0)
                         .as_any()
                         .downcast_ref::<Int32Array>()
                         .unwrap()
@@ -523,8 +525,10 @@ mod table_test {
         let new_ids: Vec<i32> = more
             .iter()
             .flat_map(|b| {
-                (0..b.num_rows()).map(|i| {
-                    b.column(0)
+                let batch = b.batch();
+                (0..batch.num_rows()).map(move |i| {
+                    batch
+                        .column(0)
                         .as_any()
                         .downcast_ref::<Int32Array>()
                         .unwrap()
@@ -544,8 +548,10 @@ mod table_test {
         let trunc_ids: Vec<i32> = trunc_batches
             .iter()
             .flat_map(|b| {
-                (0..b.num_rows()).map(|i| {
-                    b.column(0)
+                let batch = b.batch();
+                (0..batch.num_rows()).map(move |i| {
+                    batch
+                        .column(0)
                         .as_any()
                         .downcast_ref::<Int32Array>()
                         .unwrap()
@@ -568,6 +574,6 @@ mod table_test {
         let proj_batches = proj.poll(Duration::from_secs(10)).await.unwrap();
 
         // Projected batch should have 1 column (id), not 2 (id, name)
-        assert_eq!(proj_batches[0].num_columns(), 1);
+        assert_eq!(proj_batches[0].batch().num_columns(), 1);
     }
 }