From d72fe4ef8094dd0c03be41e8a0c621a5cb9aaccd Mon Sep 17 00:00:00 2001 From: SkylerLin <44233950+linguoxuan@users.noreply.github.com> Date: Thu, 29 Jan 2026 11:36:49 +0800 Subject: [PATCH] feat: support batch lookup --- crates/fluss/src/client/connection.rs | 27 ++ .../fluss/src/client/lookup/lookup_client.rs | 194 ++++++++ .../fluss/src/client/lookup/lookup_query.rs | 92 ++++ .../fluss/src/client/lookup/lookup_queue.rs | 130 +++++ .../fluss/src/client/lookup/lookup_sender.rs | 444 ++++++++++++++++++ crates/fluss/src/client/lookup/mod.rs | 39 ++ crates/fluss/src/client/mod.rs | 2 + crates/fluss/src/client/table/lookup.rs | 96 ++-- crates/fluss/src/config.rs | 30 ++ crates/fluss/src/rpc/fluss_api_error.rs | 19 + crates/fluss/src/rpc/message/lookup.rs | 23 + crates/fluss/tests/integration/kv_table.rs | 251 ++++++++++ 12 files changed, 1283 insertions(+), 64 deletions(-) create mode 100644 crates/fluss/src/client/lookup/lookup_client.rs create mode 100644 crates/fluss/src/client/lookup/lookup_query.rs create mode 100644 crates/fluss/src/client/lookup/lookup_queue.rs create mode 100644 crates/fluss/src/client/lookup/lookup_sender.rs create mode 100644 crates/fluss/src/client/lookup/mod.rs diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index a19dbd2f..fce54b68 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -17,6 +17,7 @@ use crate::client::WriterClient; use crate::client::admin::FlussAdmin; +use crate::client::lookup::LookupClient; use crate::client::metadata::Metadata; use crate::client::table::FlussTable; use crate::config::Config; @@ -32,6 +33,7 @@ pub struct FlussConnection { network_connects: Arc, args: Config, writer_client: RwLock>>, + lookup_client: RwLock>>, } impl FlussConnection { @@ -48,6 +50,7 @@ impl FlussConnection { network_connects: connections.clone(), args: arg.clone(), writer_client: Default::default(), + lookup_client: Default::default(), }) } @@ -90,6 
+93,30 @@ impl FlussConnection { Ok(new_client) } + /// Gets or creates a lookup client for batched lookup operations. + pub fn get_or_create_lookup_client(&self) -> Result> { + // 1. Fast path: Attempt to acquire a read lock to check if the client already exists. + if let Some(client) = self.lookup_client.read().as_ref() { + return Ok(client.clone()); + } + + // 2. Slow path: Acquire the write lock. + let mut lookup_guard = self.lookup_client.write(); + + // 3. Double-check: Another thread might have initialized the client + // while this thread was waiting for the write lock. + if let Some(client) = lookup_guard.as_ref() { + return Ok(client.clone()); + } + + // 4. Initialize the client since we are certain it doesn't exist yet. + let new_client = Arc::new(LookupClient::new(&self.args, self.metadata.clone())); + + // 5. Store and return the newly created client. + *lookup_guard = Some(new_client.clone()); + Ok(new_client) + } + pub async fn get_table(&self, table_path: &TablePath) -> Result> { self.metadata.update_table_metadata(table_path).await?; let table_info = self.metadata.get_cluster().get_table(table_path)?.clone(); diff --git a/crates/fluss/src/client/lookup/lookup_client.rs b/crates/fluss/src/client/lookup/lookup_client.rs new file mode 100644 index 00000000..b6e49754 --- /dev/null +++ b/crates/fluss/src/client/lookup/lookup_client.rs @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Lookup client that batches multiple lookups together for improved throughput. +//! +//! This client achieves parity with the Java client by: +//! - Queuing lookup operations instead of sending them immediately +//! - Batching multiple lookups to the same server/bucket +//! - Running a background sender task to process batches + +use super::{LookupQuery, LookupQueue}; +use crate::client::lookup::lookup_sender::LookupSender; +use crate::client::metadata::Metadata; +use crate::config::Config; +use crate::error::{Error, Result}; +use crate::metadata::{TableBucket, TablePath}; +use log::{debug, error}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +/// A client that lookups values from the server with batching support. +/// +/// The lookup client uses a queue and background sender to batch multiple +/// lookup operations together, reducing network round trips and improving +/// throughput. +/// +/// # Example +/// +/// ```ignore +/// let lookup_client = LookupClient::new(config, metadata); +/// let result = lookup_client.lookup(table_path, table_bucket, key_bytes).await?; +/// ``` +pub struct LookupClient { + /// Channel to send lookup requests to the queue + lookup_tx: mpsc::Sender, + /// Handle to the sender task + sender_handle: Option>, + /// Shutdown signal sender + shutdown_tx: mpsc::Sender<()>, + /// Whether the client is closed + closed: AtomicBool, +} + +impl LookupClient { + /// Creates a new lookup client. 
+    ///
+    /// Builds the lookup queue and the sender from the `lookup_*` settings in
+    /// `config` and starts the sender loop as a background task.
+    ///
+    /// # Panics
+    /// Panics when called outside of a Tokio runtime, because the sender loop is
+    /// started with `tokio::spawn`.
+    pub fn new(config: &Config, metadata: Arc<Metadata>) -> Self {
+        // Queue drained by the sender, plus the two channels that feed it.
+        let (queue, lookup_tx, re_enqueue_tx) = LookupQueue::new(
+            config.lookup_queue_size,
+            config.lookup_max_batch_size,
+            config.lookup_batch_timeout_ms,
+        );
+
+        let mut sender = LookupSender::new(
+            metadata,
+            queue,
+            re_enqueue_tx,
+            config.lookup_max_inflight_requests,
+            config.lookup_max_retries,
+        );
+
+        // Capacity 1 suffices: `close` consumes the client, so at most one
+        // shutdown signal is ever sent.
+        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
+
+        let sender_handle = tokio::spawn(async move {
+            tokio::select! {
+                _ = sender.run() => {
+                    debug!("Lookup sender completed");
+                }
+                _ = shutdown_rx.recv() => {
+                    debug!("Lookup sender received shutdown signal");
+                    sender.initiate_close();
+                    // `select!` has already dropped the in-progress `run()` future, so
+                    // the shutdown phase at the end of `run()` would never execute.
+                    // Re-enter `run()`: with the running flag cleared it skips the main
+                    // loop and only sends the lookups that are still queued, so their
+                    // callers are not abandoned on a graceful close.
+                    sender.run().await;
+                }
+            }
+        });
+
+        Self {
+            lookup_tx,
+            sender_handle: Some(sender_handle),
+            shutdown_tx,
+            closed: AtomicBool::new(false),
+        }
+    }
+
+    /// Looks up a value by its primary key.
+    ///
+    /// This method queues the lookup operation and returns a future that will
+    /// complete when the server responds. Multiple lookups may be batched together
+    /// for improved throughput.
+    ///
+    /// # Arguments
+    /// * `table_path` - The table path
+    /// * `table_bucket` - The table bucket
+    /// * `key_bytes` - The encoded primary key bytes
+    ///
+    /// # Returns
+    /// * `Ok(Some(bytes))` - The value bytes if found
+    /// * `Ok(None)` - If the key was not found
+    /// * `Err(Error)` - If the lookup fails
+    pub async fn lookup(
+        &self,
+        table_path: TablePath,
+        table_bucket: TableBucket,
+        key_bytes: Vec<u8>,
+    ) -> Result<Option<Vec<u8>>> {
+        // Every failure of the queueing machinery is reported as `UnexpectedError`.
+        let unexpected = |message: &str| Error::UnexpectedError {
+            message: message.to_string(),
+            source: None,
+        };
+
+        // Refuse new work once the closed flag has been set.
+        if self.closed.load(Ordering::Acquire) {
+            return Err(unexpected("Lookup client is closed"));
+        }
+
+        // The sender task reports the outcome of this query through the oneshot channel.
+        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
+        let query = LookupQuery::new(table_path, table_bucket, key_bytes, result_tx);
+
+        // Hand the query to the queue; the send fails only when the queue side is gone.
+        if self.lookup_tx.send(query).await.is_err() {
+            return Err(unexpected("Failed to queue lookup: channel closed"));
+        }
+
+        // A dropped result sender means the query was abandoned before completion.
+        match result_rx.await {
+            Ok(outcome) => outcome,
+            Err(_) => Err(unexpected("Lookup result channel closed")),
+        }
+    }
+
+    /// Closes the lookup client gracefully.
+    ///
+    /// Signals the sender task to stop and waits up to `timeout` for it to finish;
+    /// when the timeout elapses first, the task is aborted.
+    pub async fn close(mut self, timeout: Duration) {
+        debug!("Closing lookup client");
+
+        // Set the flag first so that `lookup` rejects new requests.
+        self.closed.store(true, Ordering::Release);
+
+        // Best effort: the send fails only when the sender task is already gone.
+        let _ = self.shutdown_tx.send(()).await;
+
+        match self.sender_handle.take() {
+            None => {
+                debug!("Lookup client was already closed or never initialized properly.");
+            }
+            Some(handle) => {
+                debug!("Waiting for sender task to complete...");
+                // Grab an abort handle up front: `timeout` takes the join handle by value.
+                let abort_handle = handle.abort_handle();
+                let outcome = tokio::time::timeout(timeout, handle).await;
+
+                match outcome {
+                    Err(_elapsed) => {
+                        error!("Lookup sender task did not complete within timeout. Forcing shutdown.");
+                        abort_handle.abort();
+                    }
+                    Ok(Err(join_error)) => {
+                        error!("Lookup sender task panicked: {:?}", join_error);
+                    }
+                    Ok(Ok(())) => {
+                        debug!("Lookup sender task completed gracefully.");
+                    }
+                }
+            }
+        }
+
+        debug!("Lookup client closed");
+    }
+}
+
+impl Drop for LookupClient {
+    fn drop(&mut self) {
+        // `close()` takes the handle out, so a handle that is still present means the
+        // sender task was never shut down; abort it so it does not outlive the client.
+        if let Some(handle) = self.sender_handle.take() {
+            handle.abort();
+        }
+    }
+}
diff --git a/crates/fluss/src/client/lookup/lookup_query.rs b/crates/fluss/src/client/lookup/lookup_query.rs
new file mode 100644
index 00000000..c7c0038e
--- /dev/null
+++ b/crates/fluss/src/client/lookup/lookup_query.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup query representation for batching lookup operations.
+
+use crate::metadata::{TableBucket, TablePath};
+use std::sync::atomic::{AtomicI32, Ordering};
+use tokio::sync::oneshot;
+
+/// Represents a single lookup query that will be batched and sent to the server.
+pub struct LookupQuery {
+    /// The table path for this lookup
+    table_path: TablePath,
+    /// The table bucket for this lookup
+    table_bucket: TableBucket,
+    /// The encoded primary key bytes
+    key: Vec<u8>,
+    /// Channel to send the result back to the caller; `None` once a result was sent
+    result_tx: Option<oneshot::Sender<Result<Option<Vec<u8>>, crate::error::Error>>>,
+    /// Number of retry attempts
+    retries: AtomicI32,
+}
+
+impl LookupQuery {
+    /// Creates a new lookup query with a retry count of zero.
+    pub fn new(
+        table_path: TablePath,
+        table_bucket: TableBucket,
+        key: Vec<u8>,
+        result_tx: oneshot::Sender<Result<Option<Vec<u8>>, crate::error::Error>>,
+    ) -> Self {
+        Self {
+            table_path,
+            table_bucket,
+            key,
+            result_tx: Some(result_tx),
+            retries: AtomicI32::new(0),
+        }
+    }
+
+    /// Returns the table path.
+    #[allow(dead_code)]
+    pub fn table_path(&self) -> &TablePath {
+        &self.table_path
+    }
+
+    /// Returns the table bucket.
+    pub fn table_bucket(&self) -> &TableBucket {
+        &self.table_bucket
+    }
+
+    /// Returns the encoded key bytes.
+    pub fn key(&self) -> &[u8] {
+        &self.key
+    }
+
+    /// Returns the current retry count.
+    pub fn retries(&self) -> i32 {
+        self.retries.load(Ordering::Acquire)
+    }
+
+    /// Increments the retry counter.
+    pub fn increment_retries(&self) {
+        self.retries.fetch_add(1, Ordering::AcqRel);
+    }
+
+    /// Completes the lookup with a result.
+    ///
+    /// Only the first call delivers a result; later calls are no-ops. A failed
+    /// send (the caller stopped waiting) is ignored on purpose.
+    pub fn complete(&mut self, result: Result<Option<Vec<u8>>, crate::error::Error>) {
+        if let Some(tx) = self.result_tx.take() {
+            let _ = tx.send(result);
+        }
+    }
+
+    /// Returns true if no result needs to be delivered anymore.
+    ///
+    /// That is the case when a result has already been sent, or when the caller
+    /// dropped its receiving end. Reporting an abandoned query as done keeps it
+    /// from being retried again and again on behalf of nobody.
+    pub fn is_done(&self) -> bool {
+        match &self.result_tx {
+            None => true,
+            Some(tx) => tx.is_closed(),
+        }
+    }
+}
diff --git a/crates/fluss/src/client/lookup/lookup_queue.rs b/crates/fluss/src/client/lookup/lookup_queue.rs
new file mode 100644
index 00000000..6b693c97
--- /dev/null
+++ b/crates/fluss/src/client/lookup/lookup_queue.rs
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Lookup queue for buffering pending lookup operations. +//! +//! This queue buffers lookup operations and provides batched draining +//! to improve throughput by reducing network round trips. + +use super::LookupQuery; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::time::timeout; + +/// A queue that buffers pending lookup operations and provides batched draining. +/// +/// The queue supports two types of entries: +/// - New lookups from client calls +/// - Re-enqueued lookups from retry logic +/// +/// Re-enqueued lookups are prioritized over new lookups to ensure fair processing. +pub struct LookupQueue { + /// Channel for receiving lookup requests + lookup_rx: mpsc::Receiver, + /// Channel for receiving re-enqueued lookups + re_enqueue_rx: mpsc::UnboundedReceiver, + /// Maximum batch size for draining + max_batch_size: usize, + /// Timeout for batch collection + batch_timeout: Duration, +} + +impl LookupQueue { + /// Creates a new lookup queue with the specified configuration. 
+    ///
+    /// Returns the queue together with the bounded sender for new lookups and the
+    /// unbounded sender for re-enqueued lookups.
+    ///
+    /// A value of zero for `queue_size`, `max_batch_size` or `batch_timeout_ms` is
+    /// raised to 1 so that a misconfiguration cannot panic or stall the queue.
+    pub fn new(
+        queue_size: usize,
+        max_batch_size: usize,
+        batch_timeout_ms: u64,
+    ) -> (
+        Self,
+        mpsc::Sender<LookupQuery>,
+        mpsc::UnboundedSender<LookupQuery>,
+    ) {
+        // `mpsc::channel` panics when asked for a capacity of zero.
+        let (lookup_tx, lookup_rx) = mpsc::channel(queue_size.max(1));
+        let (re_enqueue_tx, re_enqueue_rx) = mpsc::unbounded_channel();
+
+        let queue = Self {
+            lookup_rx,
+            re_enqueue_rx,
+            // With a zero batch size or a zero timeout, `drain` would return an empty
+            // batch without ever reading the main queue, so no lookup would be sent.
+            max_batch_size: max_batch_size.max(1),
+            batch_timeout: Duration::from_millis(batch_timeout_ms.max(1)),
+        };
+
+        (queue, lookup_tx, re_enqueue_tx)
+    }
+
+    /// Drains a batch of lookup queries from the queue.
+    ///
+    /// Re-enqueued lookups are taken first, without waiting. The main queue is then
+    /// read until the batch holds `max_batch_size` entries, the batch timeout
+    /// (measured from the start of this call) expires, or the channel is closed.
+    /// The returned batch may be empty.
+    pub async fn drain(&mut self) -> Vec<LookupQuery> {
+        let limit = self.max_batch_size;
+        let mut lookups = Vec::with_capacity(limit);
+        let deadline = tokio::time::Instant::now() + self.batch_timeout;
+
+        // Retries go first so they are not starved by new lookups.
+        while lookups.len() < limit {
+            match self.re_enqueue_rx.try_recv() {
+                Ok(lookup) => lookups.push(lookup),
+                Err(_) => break,
+            }
+        }
+
+        while lookups.len() < limit {
+            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
+            if remaining.is_zero() {
+                break;
+            }
+
+            // Wait for the next lookup, but never past the deadline.
+            match timeout(remaining, self.lookup_rx.recv()).await {
+                Ok(Some(lookup)) => lookups.push(lookup),
+                // `Ok(None)`: channel closed. `Err(_)`: deadline reached.
+                Ok(None) | Err(_) => break,
+            }
+
+            // Take whatever else is already buffered without waiting.
+            while lookups.len() < limit {
+                match self.lookup_rx.try_recv() {
+                    Ok(lookup) => lookups.push(lookup),
+                    Err(_) => break,
+                }
+            }
+        }
+
+        lookups
+    }
+
+    /// Drains all remaining lookups from the queue.
+    ///
+    /// Never waits and ignores the batch size limit; re-enqueued lookups come
+    /// first in the returned vector.
+    pub fn drain_all(&mut self) -> Vec<LookupQuery> {
+        let mut lookups = Vec::new();
+
+        while let Ok(lookup) = self.re_enqueue_rx.try_recv() {
+            lookups.push(lookup);
+        }
+
+        while let Ok(lookup) = self.lookup_rx.try_recv() {
+            lookups.push(lookup);
+        }
+
+        lookups
+    }
+
+    /// Returns true if there are undrained lookups in the queue.
+ pub fn has_undrained(&self) -> bool { + !self.lookup_rx.is_empty() || !self.re_enqueue_rx.is_empty() + } +} diff --git a/crates/fluss/src/client/lookup/lookup_sender.rs b/crates/fluss/src/client/lookup/lookup_sender.rs new file mode 100644 index 00000000..2f8daf2d --- /dev/null +++ b/crates/fluss/src/client/lookup/lookup_sender.rs @@ -0,0 +1,444 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Lookup sender that processes batched lookup requests. +//! +//! The sender runs as a background task, draining lookups from the queue, +//! grouping them by destination server, and sending batched requests. + +use super::{LookupQuery, LookupQueue}; +use crate::client::metadata::Metadata; +use crate::error::{Error, FlussError, Result}; +use crate::metadata::TableBucket; +use crate::proto::LookupResponse; +use crate::rpc::message::LookupRequest; +use log::{debug, error, warn}; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; +use tokio::sync::{Semaphore, mpsc}; + +/// Lookup sender that batches and sends lookup requests. 
+pub struct LookupSender {
+    /// Cluster metadata, used to find bucket leaders and server connections
+    metadata: Arc<Metadata>,
+    /// Queue the sender drains its work from
+    queue: LookupQueue,
+    /// Sending side of the queue's retry channel
+    re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+    /// Permits bounding the number of in-flight requests
+    inflight_semaphore: Arc<Semaphore>,
+    /// Retry budget per lookup
+    max_retries: i32,
+    /// Cleared to make the sender loop stop
+    running: AtomicBool,
+    /// Set to skip sending the remaining lookups on shutdown
+    force_close: AtomicBool,
+}
+
+/// Lookups for one table bucket, together with the keys to put into the request.
+struct LookupBatch {
+    table_bucket: TableBucket,
+    lookups: Vec<LookupQuery>,
+    keys: Vec<Vec<u8>>,
+}
+
+impl LookupBatch {
+    /// Creates an empty batch for `table_bucket`.
+    fn new(table_bucket: TableBucket) -> Self {
+        Self {
+            table_bucket,
+            lookups: Vec::new(),
+            keys: Vec::new(),
+        }
+    }
+
+    /// Appends a lookup and a copy of its key, keeping both vectors index-aligned.
+    fn add_lookup(&mut self, lookup: LookupQuery) {
+        let key = lookup.key().to_vec();
+        self.keys.push(key);
+        self.lookups.push(lookup);
+    }
+
+    /// Completes the lookups with `values`, matched by position.
+    ///
+    /// When the number of values differs from the number of lookups, every lookup
+    /// is failed instead.
+    fn complete(&mut self, values: Vec<Option<Vec<u8>>>) {
+        if values.len() == self.lookups.len() {
+            for (lookup, value) in self.lookups.iter_mut().zip(values) {
+                lookup.complete(Ok(value));
+            }
+            return;
+        }
+
+        let err_msg = format!(
+            "The number of return values ({}) does not match the number of lookups ({})",
+            values.len(),
+            self.lookups.len()
+        );
+        self.complete_exceptionally(&err_msg);
+    }
+
+    /// Fails every lookup in the batch with an `UnexpectedError` carrying `error_msg`.
+    fn complete_exceptionally(&mut self, error_msg: &str) {
+        self.lookups.iter_mut().for_each(|lookup| {
+            lookup.complete(Err(Error::UnexpectedError {
+                message: error_msg.to_string(),
+                source: None,
+            }))
+        });
+    }
+}
+
+impl LookupSender {
+    /// Creates a new lookup sender.
+    ///
+    /// `max_inflight_requests` is clamped to `1..=Semaphore::MAX_PERMITS` so that a
+    /// misconfigured value can neither block every request nor panic.
+    pub fn new(
+        metadata: Arc<Metadata>,
+        queue: LookupQueue,
+        re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+        max_inflight_requests: usize,
+        max_retries: i32,
+    ) -> Self {
+        // With zero permits every permit acquisition would wait forever, and
+        // `Semaphore::new` panics above `Semaphore::MAX_PERMITS`.
+        let permits = max_inflight_requests.clamp(1, Semaphore::MAX_PERMITS);
+
+        Self {
+            metadata,
+            queue,
+            re_enqueue_tx,
+            inflight_semaphore: Arc::new(Semaphore::new(permits)),
+            max_retries,
+            running: AtomicBool::new(true),
+            force_close: AtomicBool::new(false),
+        }
+    }
+
+    /// Runs the sender loop.
+    ///
+    /// Repeats `run_once` until the running flag is cleared. Afterwards, unless a
+    /// forced close was requested, the lookups still sitting in the queue are sent
+    /// in one final pass.
+    pub async fn run(&mut self) {
+        debug!("Starting Fluss lookup sender");
+
+        while self.running.load(Ordering::Acquire) {
+            if let Err(e) = self.run_once(false).await {
+                error!("Error in lookup sender: {}", e);
+            }
+        }
+
+        debug!("Beginning shutdown of lookup sender, sending remaining lookups");
+
+        let flush_pending =
+            !self.force_close.load(Ordering::Acquire) && self.queue.has_undrained();
+        if flush_pending {
+            if let Err(e) = self.run_once(true).await {
+                error!("Error during lookup sender shutdown: {}", e);
+            }
+        }
+
+        debug!("Lookup sender shutdown complete");
+    }
+
+    /// Runs a single iteration of the sender loop.
+    ///
+    /// With `drain_all` set, everything queued is taken at once without waiting;
+    /// otherwise one batch is collected with the queue's size and timeout limits.
+    async fn run_once(&mut self, drain_all: bool) -> Result<()> {
+        let lookups = if drain_all {
+            self.queue.drain_all()
+        } else {
+            self.queue.drain().await
+        };
+
+        self.send_lookups(lookups).await
+    }
+
+    /// Groups and sends lookups to appropriate servers.
+    async fn send_lookups(&self, lookups: Vec<LookupQuery>) -> Result<()> {
+        if lookups.is_empty() {
+            return Ok(());
+        }
+
+        let lookup_batches = self.group_by_leader(lookups);
+
+        // Nothing could be routed and nothing else is waiting: pause briefly so the
+        // sender loop does not spin.
+        if lookup_batches.is_empty() && !self.queue.has_undrained() {
+            tokio::time::sleep(Duration::from_millis(100)).await;
+            return Ok(());
+        }
+
+        // Destinations are served one after another.
+        for (destination, batches) in lookup_batches {
+            self.send_lookup_request(destination, batches).await;
+        }
+
+        Ok(())
+    }
+
+    /// Groups lookups by leader server.
+    ///
+    /// Returns, per leader server id, the batches for that server keyed by table
+    /// bucket. A lookup whose bucket has no known leader is passed to
+    /// `handle_lookup_error` as a retriable failure, so it consumes one retry and
+    /// is failed once `max_retries` is used up instead of being re-enqueued forever.
+    fn group_by_leader(
+        &self,
+        lookups: Vec<LookupQuery>,
+    ) -> HashMap<i32, HashMap<TableBucket, LookupBatch>> {
+        let cluster = self.metadata.get_cluster();
+        let mut batches_by_leader: HashMap<i32, HashMap<TableBucket, LookupBatch>> =
+            HashMap::new();
+
+        for lookup in lookups {
+            let table_bucket = lookup.table_bucket().clone();
+
+            let leader = match cluster.leader_for(&table_bucket) {
+                Some(leader) => leader.id(),
+                None => {
+                    let err_msg = format!(
+                        "No leader found for table bucket {} during lookup",
+                        table_bucket
+                    );
+                    // Wrap the lookup in a one-element batch so that it goes through
+                    // the same retry accounting as every other retriable failure.
+                    let mut orphan = LookupBatch::new(table_bucket);
+                    orphan.lookups.push(lookup);
+                    self.handle_lookup_error(&err_msg, true, &mut orphan);
+                    continue;
+                }
+            };
+
+            batches_by_leader
+                .entry(leader)
+                .or_default()
+                .entry(table_bucket.clone())
+                .or_insert_with(|| LookupBatch::new(table_bucket))
+                .add_lookup(lookup);
+        }
+
+        batches_by_leader
+    }
+
+    /// Sends lookup requests to a specific destination server.
+    ///
+    /// The batches are regrouped by table and one request is sent per table, one
+    /// after another. When the server or a connection to it cannot be obtained,
+    /// all batches are handed to `handle_lookup_error` as retriable failures.
+    async fn send_lookup_request(
+        &self,
+        destination: i32,
+        batches_by_bucket: HashMap<TableBucket, LookupBatch>,
+    ) {
+        // One request is built per table, so regroup the bucket batches by table id.
+        let mut batches_by_table: HashMap<_, Vec<LookupBatch>> = HashMap::new();
+        for (table_bucket, batch) in batches_by_bucket {
+            batches_by_table
+                .entry(table_bucket.table_id())
+                .or_default()
+                .push(batch);
+        }
+
+        let cluster = self.metadata.get_cluster();
+        let tablet_server = match cluster.get_tablet_server(destination) {
+            Some(server) => server.clone(),
+            None => {
+                let err_msg = format!("Server {} is not found in metadata cache", destination);
+                for mut batch in batches_by_table.into_values().flatten() {
+                    self.handle_lookup_error(&err_msg, true, &mut batch);
+                }
+                return;
+            }
+        };
+
+        let connection = match self.metadata.get_connection(&tablet_server).await {
+            Ok(conn) => conn,
+            Err(e) => {
+                let err_msg = format!("Failed to get connection to server {}: {}", destination, e);
+                for mut batch in batches_by_table.into_values().flatten() {
+                    self.handle_lookup_error(&err_msg, true, &mut batch);
+                }
+                return;
+            }
+        };
+
+        for (table_id, mut batches) in batches_by_table {
+            // Move the keys out of each batch instead of cloning them. Every lookup
+            // still owns its key, so a retried lookup can be batched again.
+            let all_keys_by_bucket: Vec<_> = batches
+                .iter_mut()
+                .map(|batch| {
+                    (
+                        batch.table_bucket.bucket_id(),
+                        batch.table_bucket.partition_id(),
+                        std::mem::take(&mut batch.keys),
+                    )
+                })
+                .collect();
+
+            let request = LookupRequest::new_batched(table_id, all_keys_by_bucket);
+
+            // The permit is held until the response for this table has been handled.
+            let _permit = match self.inflight_semaphore.clone().acquire_owned().await {
+                Ok(permit) => permit,
+                Err(_) => {
+                    error!("Semaphore closed during lookup");
+                    for batch in &mut batches {
+                        batch.complete_exceptionally("Lookup sender shutdown");
+                    }
+                    return;
+                }
+            };
+
+            match connection.request(request).await {
+                Ok(response) => {
+                    self.handle_lookup_response(destination, response, &mut batches);
+                }
+                Err(e) => {
+                    let err_msg = format!("Lookup request failed: {}", e);
+                    let is_retriable = Self::is_retriable_error(&e);
+                    for batch in &mut batches {
+                        self.handle_lookup_error(&err_msg, is_retriable, batch);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Checks if an error is retriable.
+    ///
+    /// `LeaderNotAvailable` is always retriable, an API error is retriable when the
+    /// `FlussError` for its code says so, and every other error is not.
+    fn is_retriable_error(error: &Error) -> bool {
+        match error {
+            Error::LeaderNotAvailable { .. } => true,
+            Error::FlussAPIError { api_error } => {
+                FlussError::for_code(api_error.code).is_retriable()
+            }
+            _ => false,
+        }
+    }
+
+    /// Handles the lookup response.
+    ///
+    /// Each bucket entry of `response` is matched to its batch by bucket id. An
+    /// entry with an error code is handed to `handle_lookup_error`; otherwise the
+    /// batch is completed with the returned values. Batches without any entry in
+    /// the response are treated as retriable failures.
+    fn handle_lookup_response(
+        &self,
+        destination: i32,
+        response: LookupResponse,
+        batches: &mut [LookupBatch],
+    ) {
+        // Bucket id -> position of its batch in `batches`.
+        // NOTE(review): the index uses the bucket id only, so two batches of the same
+        // table that share a bucket id but differ in partition would collide here.
+        // Confirm before partitioned tables are supported.
+        let mut bucket_to_index: HashMap<_, usize> = HashMap::with_capacity(batches.len());
+        for (idx, batch) in batches.iter().enumerate() {
+            bucket_to_index.insert(batch.table_bucket.bucket_id(), idx);
+        }
+
+        // Marks the batches for which the server sent an entry.
+        let mut answered = vec![false; batches.len()];
+
+        for bucket_resp in response.buckets_resp {
+            let bucket_id = bucket_resp.bucket_id;
+            let batch_idx = match bucket_to_index.get(&bucket_id) {
+                Some(&idx) => idx,
+                None => {
+                    error!(
+                        "Received response for unknown bucket {} from server {}",
+                        bucket_id, destination
+                    );
+                    continue;
+                }
+            };
+            answered[batch_idx] = true;
+            let batch = &mut batches[batch_idx];
+
+            // An error code other than `FlussError::None` fails the whole bucket.
+            if let Some(error_code) = bucket_resp.error_code {
+                let fluss_error = FlussError::for_code(error_code);
+                if fluss_error != FlussError::None {
+                    let err_msg = format!(
+                        "Lookup error for bucket {}: code={}, message={}",
+                        bucket_id,
+                        error_code,
+                        bucket_resp.error_message.unwrap_or_default()
+                    );
+                    self.handle_lookup_error(&err_msg, fluss_error.is_retriable(), batch);
+                    continue;
+                }
+            }
+
+            // One optional value per requested key, in response order.
+            let values: Vec<Option<Vec<u8>>> = bucket_resp
+                .values
+                .into_iter()
+                .map(|pb_value| pb_value.values)
+                .collect();
+
+            batch.complete(values);
+        }
+
+        // Retry the lookups of every batch the server did not answer.
+        let unanswered = batches
+            .iter_mut()
+            .zip(answered)
+            .filter(|(_, was_answered)| !was_answered);
+        for (batch, _) in unanswered {
+            if batch.lookups.is_empty() {
+                continue;
+            }
+            let err_msg = format!(
+                "Bucket {} response missing from server {}",
+                batch.table_bucket.bucket_id(),
+                destination
+            );
+            self.handle_lookup_error(&err_msg, true, batch);
+        }
+    }
+
+    /// Handles lookup errors with retry logic.
+    ///
+    /// Empties `batch`. A lookup is re-enqueued when the error is retriable, its
+    /// retry budget is not used up and it is not done yet; every other lookup is
+    /// completed with an `UnexpectedError` carrying `error_msg`.
+    fn handle_lookup_error(&self, error_msg: &str, is_retriable: bool, batch: &mut LookupBatch) {
+        let (lookups_to_retry, lookups_to_complete): (Vec<_>, Vec<_>) =
+            batch.lookups.drain(..).partition(|lookup| {
+                is_retriable && lookup.retries() < self.max_retries && !lookup.is_done()
+            });
+
+        // Log first, while the retry counters still hold their old values.
+        for lookup in &lookups_to_retry {
+            warn!(
+                "Lookup error for bucket {}, retrying ({} attempts left): {}",
+                batch.table_bucket,
+                self.max_retries - lookup.retries(),
+                error_msg
+            );
+        }
+
+        // Count the attempt, then hand the lookup back to the queue.
+        for lookup in lookups_to_retry {
+            lookup.increment_retries();
+            self.re_enqueue_lookup(lookup);
+        }
+
+        // Everything else fails for good.
+        for mut lookup in lookups_to_complete {
+            warn!(
+                "Lookup failed for bucket {}: {}",
+                batch.table_bucket, error_msg
+            );
+            lookup.complete(Err(Error::UnexpectedError {
+                message: error_msg.to_string(),
+                source: None,
+            }));
+        }
+    }
+
+    /// Re-enqueues a lookup for retry.
+    ///
+    /// A failed send is only logged; the lookup is dropped in that case.
+    fn re_enqueue_lookup(&self, lookup: LookupQuery) {
+        if let Err(e) = self.re_enqueue_tx.send(lookup) {
+            error!("Failed to re-enqueue lookup: {}", e);
+        }
+    }
+
+    /// Initiates graceful shutdown of the sender.
+    ///
+    /// Clears the running flag that the loop in `run` checks before each iteration.
+    pub fn initiate_close(&mut self) {
+        self.running.store(false, Ordering::Release);
+    }
+
+    /// Forces immediate shutdown, abandoning pending lookups.
+    #[allow(dead_code)]
+    pub fn force_close(&mut self) {
+        // Set the force flag before stopping the loop, so that the shutdown phase
+        // of `run` sees it and skips sending the remaining lookups.
+        self.force_close.store(true, Ordering::Release);
+        self.initiate_close();
+    }
+}
diff --git a/crates/fluss/src/client/lookup/mod.rs b/crates/fluss/src/client/lookup/mod.rs
new file mode 100644
index 00000000..f96aa2ed
--- /dev/null
+++ b/crates/fluss/src/client/lookup/mod.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Lookup client implementation with batching and queuing support. +//! +//! This module provides a high-throughput lookup client that batches multiple +//! lookup operations together to reduce network round trips, achieving parity +//! with the Java client implementation. +//! +//! # Example +//! +//! ```ignore +//! let lookup_client = LookupClient::new(config, metadata); +//! let future = lookup_client.lookup(table_path, table_bucket, key_bytes); +//! let result = future.await?; +//! 
``` + +mod lookup_client; +mod lookup_query; +mod lookup_queue; +mod lookup_sender; + +pub use lookup_client::LookupClient; +pub(crate) use lookup_query::LookupQuery; +pub(crate) use lookup_queue::LookupQueue; diff --git a/crates/fluss/src/client/mod.rs b/crates/fluss/src/client/mod.rs index cff218b3..77c30203 100644 --- a/crates/fluss/src/client/mod.rs +++ b/crates/fluss/src/client/mod.rs @@ -18,6 +18,7 @@ mod admin; mod connection; mod credentials; +pub mod lookup; mod metadata; mod table; mod write; @@ -25,6 +26,7 @@ mod write; pub use admin::*; pub use connection::*; pub use credentials::*; +pub use lookup::LookupClient; pub use metadata::*; pub use table::*; pub use write::*; diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs index 4e89176a..8e4eb59a 100644 --- a/crates/fluss/src/client/table/lookup.rs +++ b/crates/fluss/src/client/table/lookup.rs @@ -17,15 +17,14 @@ use crate::bucketing::BucketingFunction; use crate::client::connection::FlussConnection; +use crate::client::lookup::LookupClient; use crate::client::metadata::Metadata; use crate::error::{Error, Result}; -use crate::metadata::{RowType, TableBucket, TableInfo}; +use crate::metadata::{RowType, TableBucket, TableInfo, TablePath}; use crate::record::kv::SCHEMA_ID_LENGTH; use crate::row::InternalRow; use crate::row::compacted::CompactedRow; use crate::row::encode::{KeyEncoder, KeyEncoderFactory}; -use crate::rpc::ApiError; -use crate::rpc::message::LookupRequest; use std::sync::Arc; /// The result of a lookup operation. @@ -106,6 +105,7 @@ impl<'a> LookupResult<'a> { pub struct TableLookup<'a> { conn: &'a FlussConnection, table_info: TableInfo, + #[allow(dead_code)] metadata: Arc, } @@ -126,7 +126,11 @@ impl<'a> TableLookup<'a> { /// /// The lookuper will automatically encode the key and compute the bucket /// for each lookup using the appropriate bucketing function. 
- pub fn create_lookuper(self) -> Result> { + /// + /// The lookuper uses a shared `LookupClient` that batches multiple lookup + /// operations together to reduce network round trips. This achieves parity + /// with the Java client implementation for improved throughput. + pub fn create_lookuper(self) -> Result { let num_buckets = self.table_info.get_num_buckets(); // Get data lake format from table config for bucketing function @@ -141,10 +145,13 @@ impl<'a> TableLookup<'a> { &data_lake_format, )?; + // Get or create the shared lookup client + let lookup_client = self.conn.get_or_create_lookup_client()?; + Ok(Lookuper { - conn: self.conn, + table_path: self.table_info.get_table_path().clone(), table_info: self.table_info, - metadata: self.metadata, + lookup_client, bucketing_function, key_encoder, num_buckets, @@ -155,7 +162,7 @@ impl<'a> TableLookup<'a> { /// Performs key-based lookups against a primary key table. /// /// The `Lookuper` automatically encodes the lookup key, computes the target -/// bucket, finds the appropriate tablet server, and retrieves the value. +/// bucket, and retrieves the value using the batched `LookupClient`. /// /// # Example /// ```ignore @@ -164,20 +171,21 @@ impl<'a> TableLookup<'a> { /// let result = lookuper.lookup(&row).await?; /// ``` // TODO: Support partitioned tables (extract partition from key) -pub struct Lookuper<'a> { - conn: &'a FlussConnection, +pub struct Lookuper { + table_path: TablePath, table_info: TableInfo, - metadata: Arc, + lookup_client: Arc, bucketing_function: Box, key_encoder: Box, num_buckets: i32, } -impl<'a> Lookuper<'a> { +impl Lookuper { /// Looks up a value by its primary key. /// /// The key is encoded and the bucket is automatically computed using - /// the table's bucketing function. + /// the table's bucketing function. The lookup is queued and batched + /// with other lookups for improved throughput. 
/// /// # Arguments /// * `row` - The row containing the primary key field values @@ -186,7 +194,6 @@ impl<'a> Lookuper<'a> { /// * `Ok(LookupResult)` - The lookup result (may be empty if key not found) /// * `Err(Error)` - If the lookup fails pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result> { - // todo: support batch lookup // Encode the key from the row let encoded_key = self.key_encoder.encode_key(row)?; let key_bytes = encoded_key.to_vec(); @@ -199,58 +206,19 @@ impl<'a> Lookuper<'a> { let table_id = self.table_info.get_table_id(); let table_bucket = TableBucket::new(table_id, bucket_id); - // Find the leader for this bucket - let cluster = self.metadata.get_cluster(); - let leader = - cluster - .leader_for(&table_bucket) - .ok_or_else(|| Error::LeaderNotAvailable { - message: format!("No leader found for table bucket: {table_bucket}"), - })?; - - // Get connection to the tablet server - let tablet_server = - cluster - .get_tablet_server(leader.id()) - .ok_or_else(|| Error::LeaderNotAvailable { - message: format!( - "Tablet server {} is not found in metadata cache", - leader.id() - ), - })?; - - let connections = self.conn.get_connections(); - let connection = connections.get_connection(tablet_server).await?; - - // Send lookup request - let request = LookupRequest::new(table_id, None, bucket_id, vec![key_bytes]); - let response = connection.request(request).await?; - - // Extract the values from response - if let Some(bucket_resp) = response.buckets_resp.into_iter().next() { - // Check for errors - if let Some(error_code) = bucket_resp.error_code { - if error_code != 0 { - return Err(Error::FlussAPIError { - api_error: ApiError { - code: error_code, - message: bucket_resp.error_message.unwrap_or_default(), - }, - }); - } - } - - // Collect all values - let rows: Vec> = bucket_resp - .values - .into_iter() - .filter_map(|pb_value| pb_value.values) - .collect(); - - return Ok(LookupResult::new(rows, self.table_info.row_type())); + // Use the 
batched lookup client + let result = self + .lookup_client + .lookup(self.table_path.clone(), table_bucket, key_bytes) + .await?; + + match result { + Some(value_bytes) => Ok(LookupResult::new( + vec![value_bytes], + self.table_info.row_type(), + )), + None => Ok(LookupResult::empty(self.table_info.row_type())), } - - Ok(LookupResult::empty(self.table_info.row_type())) } /// Returns a reference to the table info. diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 705e241d..36036219 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -46,6 +46,31 @@ pub struct Config { /// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM) #[arg(long, default_value_t = 3)] pub scanner_remote_log_download_threads: usize, + + /// Maximum number of pending lookup operations + /// Default: 25600 (matching Java CLIENT_LOOKUP_QUEUE_SIZE) + #[arg(long, default_value_t = 25600)] + pub lookup_queue_size: usize, + + /// Maximum batch size of merging lookup operations to one lookup request + /// Default: 128 (matching Java CLIENT_LOOKUP_MAX_BATCH_SIZE) + #[arg(long, default_value_t = 128)] + pub lookup_max_batch_size: usize, + + /// Maximum time to wait for the lookup batch to fill (in milliseconds) + /// Default: 100 (matching Java CLIENT_LOOKUP_BATCH_TIMEOUT) + #[arg(long, default_value_t = 100)] + pub lookup_batch_timeout_ms: u64, + + /// Maximum number of unacknowledged lookup requests + /// Default: 128 (matching Java CLIENT_LOOKUP_MAX_INFLIGHT_SIZE) + #[arg(long, default_value_t = 128)] + pub lookup_max_inflight_requests: usize, + + /// Maximum number of lookup retries + /// Default: i32::MAX (matching Java CLIENT_LOOKUP_MAX_RETRIES) + #[arg(long, default_value_t = i32::MAX)] + pub lookup_max_retries: i32, } impl Default for Config { @@ -58,6 +83,11 @@ impl Default for Config { writer_batch_size: 2 * 1024 * 1024, scanner_remote_log_prefetch_num: 4, scanner_remote_log_download_threads: 3, + lookup_queue_size: 25600, + 
lookup_max_batch_size: 128, + lookup_batch_timeout_ms: 100, + lookup_max_inflight_requests: 128, + lookup_max_retries: i32::MAX, } } } diff --git a/crates/fluss/src/rpc/fluss_api_error.rs b/crates/fluss/src/rpc/fluss_api_error.rs index a501b997..d413b143 100644 --- a/crates/fluss/src/rpc/fluss_api_error.rs +++ b/crates/fluss/src/rpc/fluss_api_error.rs @@ -283,6 +283,25 @@ impl FlussError { } } + /// Returns true if this error is retriable. + /// Based on Java client's RetriableException hierarchy. + pub fn is_retriable(&self) -> bool { + matches!( + self, + FlussError::NetworkException + | FlussError::NotLeaderOrFollower + | FlussError::UnknownTableOrBucketException + | FlussError::LeaderNotAvailableException + | FlussError::CorruptMessage + | FlussError::CorruptRecordException + | FlussError::RequestTimeOut + | FlussError::StorageException + | FlussError::NotEnoughReplicasAfterAppendException + | FlussError::NotEnoughReplicasException + | FlussError::SchemaNotExist + ) + } + /// Get the FlussError for the given error code. /// Returns `UnknownServerError` if the code is not recognized. pub fn for_code(code: i32) -> Self { diff --git a/crates/fluss/src/rpc/message/lookup.rs b/crates/fluss/src/rpc/message/lookup.rs index 3de47d64..7e22d7fc 100644 --- a/crates/fluss/src/rpc/message/lookup.rs +++ b/crates/fluss/src/rpc/message/lookup.rs @@ -53,6 +53,29 @@ impl LookupRequest { inner_request: request, } } + + /// Creates a new batched lookup request with multiple buckets. 
+ pub fn new_batched(table_id: i64, buckets: Vec<(i32, Option, Vec>)>) -> Self { + let buckets_req: Vec = buckets + .into_iter() + .map( + |(bucket_id, partition_id, keys)| proto::PbLookupReqForBucket { + partition_id, + bucket_id, + key: keys, + }, + ) + .collect(); + + let request = proto::LookupRequest { + table_id, + buckets_req, + }; + + Self { + inner_request: request, + } + } } impl RequestBody for LookupRequest { diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index a4f29617..722a716b 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -685,4 +685,255 @@ mod kv_table_test { .await .expect("Failed to drop table"); } + + /// Integration test for concurrent batched lookups. + #[tokio::test] + async fn batched_concurrent_lookups() { + use futures::stream::{FuturesUnordered, StreamExt}; + + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new("fluss".to_string(), "test_batched_lookups".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("value", DataTypes::bigint()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + // Insert 100 records + let num_records = 100i32; + for i in 0..num_records { + let mut row = GenericRow::new(3); + row.set_field(0, i); + 
row.set_field(1, format!("name_{}", i)); + row.set_field(2, (i * 100) as i64); + upsert_writer.upsert(&row).await.expect("Failed to upsert"); + } + + // Create multiple lookupers for concurrent lookups + let num_lookups = 50i32; + let mut lookupers: Vec<_> = (0..num_lookups) + .map(|_| { + table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper") + }) + .collect(); + + // Run all lookups concurrently + let mut futures = FuturesUnordered::new(); + for (i, lookuper) in lookupers.iter_mut().enumerate() { + let id = i as i32; + futures.push(async move { + let key = make_key(id); + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + + assert_eq!(row.get_int(0), id, "id mismatch for key {}", id); + assert_eq!( + row.get_string(1), + format!("name_{}", id), + "name mismatch for key {}", + id + ); + assert_eq!( + row.get_long(2), + (id * 100) as i64, + "value mismatch for key {}", + id + ); + id + }); + } + + // Collect all results + let mut results = Vec::with_capacity(num_lookups as usize); + while let Some(id) = futures.next().await { + results.push(id); + } + + // Verify all lookups completed successfully + assert_eq!( + results.len(), + num_lookups as usize, + "Not all lookups completed" + ); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + + /// Integration test for lookups with mixed existing and non-existing keys. 
+ #[tokio::test] + async fn lookups_mixed_existing_and_nonexisting_keys() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new("fluss".to_string(), "test_batched_mixed_keys".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("data", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + // Insert only even-numbered records (0, 2, 4, 6, ...) 
+ for i in (0..20).step_by(2) { + let mut row = GenericRow::new(2); + row.set_field(0, i as i32); + row.set_field(1, format!("data_{}", i)); + upsert_writer.upsert(&row).await.expect("Failed to upsert"); + } + + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + // Lookup all keys 0-19 (half exist, half don't) + for i in 0..20 { + let mut key = GenericRow::new(2); + key.set_field(0, i as i32); + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let row_opt = result.get_single_row().expect("Failed to get row"); + + if i % 2 == 0 { + // Even keys should exist + let row = row_opt.unwrap_or_else(|| panic!("Row {} should exist", i)); + assert_eq!(row.get_int(0), i as i32, "id mismatch"); + assert_eq!(row.get_string(1), format!("data_{}", i), "data mismatch"); + } else { + // Odd keys should not exist + assert!(row_opt.is_none(), "Row {} should not exist", i); + } + } + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + + /// Integration test for lookups with repeated keys. + /// Multiple lookups for the same key should all return the correct result. 
+ #[tokio::test] + async fn lookups_same_key_multiple_times() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = + TablePath::new("fluss".to_string(), "test_repeated_key_lookups".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("counter", DataTypes::bigint()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + // Insert a single record + let target_id = 42i32; + let target_counter = 12345i64; + let mut row = GenericRow::new(2); + row.set_field(0, target_id); + row.set_field(1, target_counter); + upsert_writer.upsert(&row).await.expect("Failed to upsert"); + + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + // Perform 100 lookups for the same key + let num_lookups = 100; + for _ in 0..num_lookups { + let mut key = GenericRow::new(2); + key.set_field(0, target_id); + + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + + assert_eq!(row.get_int(0), target_id, "id mismatch"); + assert_eq!(row.get_long(1), target_counter, "counter mismatch"); + } + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } }