diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs
index 0e41bbe7..e60e0657 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -17,6 +17,7 @@ use crate::client::WriterClient;
 use crate::client::admin::FlussAdmin;
+use crate::client::lookup::LookupClient;
 use crate::client::metadata::Metadata;
 use crate::client::table::FlussTable;
 use crate::config::Config;
@@ -32,6 +33,7 @@ pub struct FlussConnection {
     network_connects: Arc,
     args: Config,
     writer_client: RwLock<Option<Arc<WriterClient>>>,
+    lookup_client: RwLock<Option<Arc<LookupClient>>>,
 }
@@ -48,6 +50,7 @@ impl FlussConnection {
             network_connects: connections.clone(),
             args: arg.clone(),
             writer_client: Default::default(),
+            lookup_client: Default::default(),
         })
     }
@@ -78,6 +81,18 @@ impl FlussConnection {
         Ok(client)
     }
+
+    /// Gets or creates a lookup client for batched lookup operations.
+    pub fn get_or_create_lookup_client(&self) -> Arc<LookupClient> {
+        if let Some(client) = self.lookup_client.read().as_ref() {
+            return client.clone();
+        }
+
+        // If it does not exist yet, create a new one
+        let client = Arc::new(LookupClient::new(&self.args, self.metadata.clone()));
+        *self.lookup_client.write() = Some(client.clone());
+        client
+    }
+
     pub async fn get_table(&self, table_path: &TablePath) -> Result> {
         self.metadata.update_table_metadata(table_path).await?;
         let table_info = self.metadata.get_cluster().get_table(table_path).clone();
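`get_or_create_lookup_client` takes the read lock, and only on a miss takes the write lock to create the client. Two callers that miss concurrently can therefore each build a `LookupClient`, with the second overwriting the first. Below is a minimal sketch of the same pattern with a re-check under the write lock, using `std::sync::RwLock` and a hypothetical `Client` type rather than the crate's actual lock and client; it is an illustration, not the PR's code.

```rust
use std::sync::{Arc, RwLock};

struct Client;

struct Connection {
    client: RwLock<Option<Arc<Client>>>,
}

impl Connection {
    fn get_or_create_client(&self) -> Arc<Client> {
        // Fast path: the client already exists.
        if let Some(client) = self.client.read().unwrap().as_ref() {
            return client.clone();
        }
        // Slow path: take the write lock and re-check before creating,
        // so racing callers end up sharing one instance.
        let mut slot = self.client.write().unwrap();
        if let Some(client) = slot.as_ref() {
            return client.clone();
        }
        let client = Arc::new(Client);
        *slot = Some(client.clone());
        client
    }
}

fn main() {
    let conn = Connection { client: RwLock::new(None) };
    let a = conn.get_or_create_client();
    let b = conn.get_or_create_client();
    assert!(Arc::ptr_eq(&a, &b)); // both callers share the same instance
}
```

The overwrite in the PR's version is mostly benign, but each extra `LookupClient` spawns its own background sender task, so the re-check avoids that duplication.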
diff --git a/crates/fluss/src/client/lookup/lookup_client.rs b/crates/fluss/src/client/lookup/lookup_client.rs
new file mode 100644
index 00000000..2c52d910
--- /dev/null
+++ b/crates/fluss/src/client/lookup/lookup_client.rs
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup client that batches multiple lookups together for improved throughput.
+//!
+//! This client achieves parity with the Java client by:
+//! - Queuing lookup operations instead of sending them immediately
+//! - Batching multiple lookups to the same server/bucket
+//! - Running a background sender task to process batches
+
+use super::lookup_queue::{
+    DEFAULT_LOOKUP_BATCH_TIMEOUT_MS, DEFAULT_LOOKUP_MAX_BATCH_SIZE, DEFAULT_LOOKUP_QUEUE_SIZE,
+};
+use super::lookup_sender::{DEFAULT_LOOKUP_MAX_INFLIGHT_REQUESTS, DEFAULT_LOOKUP_MAX_RETRIES};
+use super::{LookupQuery, LookupQueue};
+use crate::client::lookup::lookup_sender::LookupSender;
+use crate::client::metadata::Metadata;
+use crate::config::Config;
+use crate::error::{Error, Result};
+use crate::metadata::{TableBucket, TablePath};
+use log::{debug, error};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+/// A client that looks up values from the server with batching support.
+///
+/// The lookup client uses a queue and background sender to batch multiple
+/// lookup operations together, reducing network round trips and improving
+/// throughput.
+///
+/// # Example
+///
+/// ```ignore
+/// let lookup_client = LookupClient::new(config, metadata);
+/// let result = lookup_client.lookup(table_path, table_bucket, key_bytes).await?;
+/// ```
+pub struct LookupClient {
+    /// Channel to send lookup requests to the queue
+    lookup_tx: mpsc::Sender<LookupQuery>,
+    /// Handle to the sender task
+    sender_handle: JoinHandle<()>,
+    /// Shutdown signal sender
+    shutdown_tx: mpsc::Sender<()>,
+}
+
+impl LookupClient {
+    /// Creates a new lookup client.
+    pub fn new(config: &Config, metadata: Arc<Metadata>) -> Self {
+        // Extract configuration values (use defaults if not configured)
+        let queue_size = config
+            .lookup_queue_size
+            .unwrap_or(DEFAULT_LOOKUP_QUEUE_SIZE);
+        let max_batch_size = config
+            .lookup_max_batch_size
+            .unwrap_or(DEFAULT_LOOKUP_MAX_BATCH_SIZE);
+        let batch_timeout_ms = config
+            .lookup_batch_timeout_ms
+            .unwrap_or(DEFAULT_LOOKUP_BATCH_TIMEOUT_MS);
+        let max_inflight = config
+            .lookup_max_inflight_requests
+            .unwrap_or(DEFAULT_LOOKUP_MAX_INFLIGHT_REQUESTS);
+        let max_retries = config
+            .lookup_max_retries
+            .unwrap_or(DEFAULT_LOOKUP_MAX_RETRIES);
+
+        // Create queue and channels
+        let (queue, lookup_tx, re_enqueue_tx) =
+            LookupQueue::new(queue_size, max_batch_size, batch_timeout_ms);
+
+        // Create sender
+        let mut sender =
+            LookupSender::new(metadata, queue, re_enqueue_tx, max_inflight, max_retries);
+
+        // Create shutdown channel
+        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
+
+        // Spawn sender task
+        let sender_handle = tokio::spawn(async move {
+            tokio::select! {
+                _ = sender.run() => {
+                    debug!("Lookup sender completed");
+                }
+                _ = shutdown_rx.recv() => {
+                    debug!("Lookup sender received shutdown signal");
+                    sender.initiate_close();
+                }
+            }
+        });
+
+        Self {
+            lookup_tx,
+            sender_handle,
+            shutdown_tx,
+        }
+    }
+
+    /// Looks up a value by its primary key.
+    ///
+    /// This method queues the lookup operation and returns a future that will
+    /// complete when the server responds. Multiple lookups may be batched together
+    /// for improved throughput.
+    ///
+    /// # Arguments
+    /// * `table_path` - The table path
+    /// * `table_bucket` - The table bucket
+    /// * `key_bytes` - The encoded primary key bytes
+    ///
+    /// # Returns
+    /// * `Ok(Some(bytes))` - The value bytes if found
+    /// * `Ok(None)` - If the key was not found
+    /// * `Err(Error)` - If the lookup fails
+    pub async fn lookup(
+        &self,
+        table_path: TablePath,
+        table_bucket: TableBucket,
+        key_bytes: Vec<u8>,
+    ) -> Result<Option<Vec<u8>>> {
+        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
+
+        let query = LookupQuery::new(table_path, table_bucket, key_bytes, result_tx);
+
+        // Send to queue
+        self.lookup_tx
+            .send(query)
+            .await
+            .map_err(|_| Error::UnexpectedError {
+                message: "Failed to queue lookup: channel closed".to_string(),
+                source: None,
+            })?;
+
+        // Wait for result
+        result_rx.await.map_err(|_| Error::UnexpectedError {
+            message: "Lookup result channel closed".to_string(),
+            source: None,
+        })?
+    }
+
+    /// Closes the lookup client gracefully.
+    ///
+    /// This will:
+    /// 1. Stop accepting new lookups
+    /// 2. Process remaining lookups in the queue
+    /// 3. Wait for the sender to complete
+    pub async fn close(&self, timeout: Duration) {
+        debug!("Closing lookup client");
+
+        // Send shutdown signal
+        let _ = self.shutdown_tx.send(()).await;
+
+        // We only hold a shared reference to the JoinHandle, so we cannot await
+        // it here; instead poll `is_finished` until the timeout elapses.
+        let deadline = tokio::time::Instant::now() + timeout;
+        while !self.sender_handle.is_finished() && tokio::time::Instant::now() < deadline {
+            tokio::time::sleep(Duration::from_millis(10)).await;
+        }
+
+        if !self.sender_handle.is_finished() {
+            error!("Lookup sender did not complete within timeout");
+        }
+
+        debug!("Lookup client closed");
+    }
+}
+
+impl Drop for LookupClient {
+    fn drop(&mut self) {
+        // Abort the sender task on drop
+        self.sender_handle.abort();
+    }
+}
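Because `close` only has `&self`, it cannot consume the `JoinHandle`, which is why the loop above polls `is_finished`. A sketch of the alternative, not part of this diff, is to store the handle in a `tokio::sync::Mutex<Option<JoinHandle<()>>>` so a shared reference can take it and await it; the `Worker` type here is hypothetical.

```rust
use std::time::Duration;
use tokio::{sync::Mutex, task::JoinHandle};

struct Worker {
    handle: Mutex<Option<JoinHandle<()>>>,
}

impl Worker {
    fn spawn() -> Self {
        let handle = tokio::spawn(async {
            // ... background loop would run here ...
        });
        Self { handle: Mutex::new(Some(handle)) }
    }

    async fn close(&self, timeout: Duration) {
        // Take ownership of the handle so it can be awaited with a timeout.
        if let Some(handle) = self.handle.lock().await.take() {
            if tokio::time::timeout(timeout, handle).await.is_err() {
                eprintln!("worker did not stop within {timeout:?}");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let worker = Worker::spawn();
    worker.close(Duration::from_secs(1)).await;
}
```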
diff --git a/crates/fluss/src/client/lookup/lookup_query.rs b/crates/fluss/src/client/lookup/lookup_query.rs
new file mode 100644
index 00000000..4090b65f
--- /dev/null
+++ b/crates/fluss/src/client/lookup/lookup_query.rs
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup query representation for batching lookup operations.
+
+use crate::metadata::{TableBucket, TablePath};
+use std::sync::atomic::{AtomicI32, Ordering};
+use tokio::sync::oneshot;
+
+/// Represents a single lookup query that will be batched and sent to the server.
+///
+/// Each `LookupQuery` contains:
+/// - The table path and bucket for routing
+/// - The encoded key bytes to look up
+/// - A oneshot channel for returning the result
+/// - A retry counter for handling transient failures
+pub struct LookupQuery {
+    /// The table path for this lookup
+    #[allow(dead_code)]
+    table_path: TablePath,
+    /// The table bucket for this lookup
+    table_bucket: TableBucket,
+    /// The encoded primary key bytes
+    key: Vec<u8>,
+    /// Channel to send the result back to the caller
+    result_tx: Option<oneshot::Sender<Result<Option<Vec<u8>>, crate::error::Error>>>,
+    /// Number of retry attempts
+    retries: AtomicI32,
+}
+
+impl LookupQuery {
+    /// Creates a new lookup query.
+    pub fn new(
+        table_path: TablePath,
+        table_bucket: TableBucket,
+        key: Vec<u8>,
+        result_tx: oneshot::Sender<Result<Option<Vec<u8>>, crate::error::Error>>,
+    ) -> Self {
+        Self {
+            table_path,
+            table_bucket,
+            key,
+            result_tx: Some(result_tx),
+            retries: AtomicI32::new(0),
+        }
+    }
+
+    /// Returns the table path.
+    #[allow(dead_code)]
+    pub fn table_path(&self) -> &TablePath {
+        &self.table_path
+    }
+
+    /// Returns the table bucket.
+    pub fn table_bucket(&self) -> &TableBucket {
+        &self.table_bucket
+    }
+
+    /// Returns the encoded key bytes.
+    #[allow(dead_code)]
+    pub fn key(&self) -> &[u8] {
+        &self.key
+    }
+
+    /// Takes ownership of the key bytes.
+    pub fn take_key(&mut self) -> Vec<u8> {
+        std::mem::take(&mut self.key)
+    }
+
+    /// Returns the current retry count.
+    pub fn retries(&self) -> i32 {
+        self.retries.load(Ordering::Acquire)
+    }
+
+    /// Increments the retry counter.
+    pub fn increment_retries(&self) {
+        self.retries.fetch_add(1, Ordering::AcqRel);
+    }
+
+    /// Completes the lookup with a result.
+    pub fn complete(&mut self, result: Result<Option<Vec<u8>>, crate::error::Error>) {
+        if let Some(tx) = self.result_tx.take() {
+            let _ = tx.send(result);
+        }
+    }
+
+    /// Returns true if the result has already been sent.
+    pub fn is_done(&self) -> bool {
+        self.result_tx.is_none()
+    }
+}
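The `Option` around `result_tx` is what makes `complete` single-shot: `take()` moves the sender out on the first call, and `is_done` just checks whether it is gone. A stripped-down sketch of that handshake with a plain tokio `oneshot` channel; the `Pending` type is illustrative and not the crate's `LookupQuery`.

```rust
use tokio::sync::oneshot;

struct Pending {
    result_tx: Option<oneshot::Sender<Option<Vec<u8>>>>,
}

impl Pending {
    fn complete(&mut self, value: Option<Vec<u8>>) {
        if let Some(tx) = self.result_tx.take() {
            let _ = tx.send(value); // ignore a caller that gave up waiting
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let mut pending = Pending { result_tx: Some(tx) };

    // A worker (here inline) resolves the pending lookup exactly once.
    pending.complete(Some(b"value".to_vec()));
    pending.complete(None); // second call is a no-op

    assert_eq!(rx.await.unwrap(), Some(b"value".to_vec()));
}
```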
diff --git a/crates/fluss/src/client/lookup/lookup_queue.rs b/crates/fluss/src/client/lookup/lookup_queue.rs
new file mode 100644
index 00000000..f165566a
--- /dev/null
+++ b/crates/fluss/src/client/lookup/lookup_queue.rs
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup queue for buffering pending lookup operations.
+//!
+//! This queue buffers lookup operations and provides batched draining
+//! to improve throughput by reducing network round trips.
+
+use super::LookupQuery;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::mpsc;
+use tokio::time::timeout;
+
+/// Default queue capacity for pending lookups.
+pub const DEFAULT_LOOKUP_QUEUE_SIZE: usize = 25600;
+
+/// Default maximum batch size for lookups.
+pub const DEFAULT_LOOKUP_MAX_BATCH_SIZE: usize = 128;
+
+/// Default timeout for batching lookups (in milliseconds).
+pub const DEFAULT_LOOKUP_BATCH_TIMEOUT_MS: u64 = 100;
+
+/// A queue that buffers pending lookup operations and provides batched draining.
+///
+/// The queue supports two types of entries:
+/// - New lookups from client calls
+/// - Re-enqueued lookups from retry logic
+///
+/// Re-enqueued lookups are prioritized over new lookups to ensure fair processing.
+pub struct LookupQueue {
+    /// Whether the queue is closed
+    closed: AtomicBool,
+    /// Channel for new lookup requests
+    #[allow(dead_code)]
+    lookup_tx: mpsc::Sender<LookupQuery>,
+    /// Channel for receiving lookup requests
+    lookup_rx: mpsc::Receiver<LookupQuery>,
+    /// Channel for re-enqueued lookups (for retries)
+    #[allow(dead_code)]
+    re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+    /// Channel for receiving re-enqueued lookups
+    re_enqueue_rx: mpsc::UnboundedReceiver<LookupQuery>,
+    /// Maximum batch size for draining
+    max_batch_size: usize,
+    /// Timeout for batch collection
+    batch_timeout: Duration,
+}
+
+impl LookupQueue {
+    /// Creates a new lookup queue with the specified configuration.
+    pub fn new(
+        queue_size: usize,
+        max_batch_size: usize,
+        batch_timeout_ms: u64,
+    ) -> (
+        Self,
+        mpsc::Sender<LookupQuery>,
+        mpsc::UnboundedSender<LookupQuery>,
+    ) {
+        let (lookup_tx, lookup_rx) = mpsc::channel(queue_size);
+        let (re_enqueue_tx, re_enqueue_rx) = mpsc::unbounded_channel();
+
+        let queue = Self {
+            closed: AtomicBool::new(false),
+            lookup_tx: lookup_tx.clone(),
+            lookup_rx,
+            re_enqueue_tx: re_enqueue_tx.clone(),
+            re_enqueue_rx,
+            max_batch_size,
+            batch_timeout: Duration::from_millis(batch_timeout_ms),
+        };
+
+        (queue, lookup_tx, re_enqueue_tx)
+    }
+
+    /// Creates a new lookup queue with default configuration.
+    #[allow(dead_code)]
+    pub fn with_defaults() -> (
+        Self,
+        mpsc::Sender<LookupQuery>,
+        mpsc::UnboundedSender<LookupQuery>,
+    ) {
+        Self::new(
+            DEFAULT_LOOKUP_QUEUE_SIZE,
+            DEFAULT_LOOKUP_MAX_BATCH_SIZE,
+            DEFAULT_LOOKUP_BATCH_TIMEOUT_MS,
+        )
+    }
+
+    /// Drains a batch of lookup queries from the queue.
+    ///
+    /// This method will:
+    /// 1. First drain re-enqueued lookups (prioritized for retry fairness)
+    /// 2. Then drain new lookups until batch is full or timeout is reached
+    ///
+    /// Returns an empty vector if no lookups are available within the timeout.
+    pub async fn drain(&mut self) -> Vec<LookupQuery> {
+        let mut lookups = Vec::with_capacity(self.max_batch_size);
+        let deadline = tokio::time::Instant::now() + self.batch_timeout;
+
+        // First, drain re-enqueued lookups (prioritized)
+        while lookups.len() < self.max_batch_size {
+            match self.re_enqueue_rx.try_recv() {
+                Ok(lookup) => lookups.push(lookup),
+                Err(_) => break,
+            }
+        }
+
+        // Then drain from main queue
+        while lookups.len() < self.max_batch_size {
+            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
+            if remaining.is_zero() {
+                break;
+            }
+
+            match timeout(remaining, self.lookup_rx.recv()).await {
+                Ok(Some(lookup)) => {
+                    lookups.push(lookup);
+                    // Try to drain more without waiting
+                    while lookups.len() < self.max_batch_size {
+                        match self.lookup_rx.try_recv() {
+                            Ok(lookup) => lookups.push(lookup),
+                            Err(_) => break,
+                        }
+                    }
+                }
+                Ok(None) => break, // Channel closed
+                Err(_) => break,   // Timeout
+            }
+        }
+
+        lookups
+    }
+
+    /// Drains all remaining lookups from the queue (used during shutdown).
+    pub fn drain_all(&mut self) -> Vec<LookupQuery> {
+        let mut lookups = Vec::new();
+
+        // Drain re-enqueued lookups
+        while let Ok(lookup) = self.re_enqueue_rx.try_recv() {
+            lookups.push(lookup);
+        }
+
+        // Drain main queue
+        while let Ok(lookup) = self.lookup_rx.try_recv() {
+            lookups.push(lookup);
+        }
+
+        lookups
+    }
+
+    /// Returns true if there are undrained lookups in the queue.
+    pub fn has_undrained(&self) -> bool {
+        // Check if channels have pending messages
+        // Note: This is an approximation as we can't directly check channel size
+        !self.closed.load(Ordering::Acquire)
+    }
+
+    /// Closes the queue, preventing new lookups from being added.
+    pub fn close(&self) {
+        self.closed.store(true, Ordering::Release);
+    }
+
+    /// Returns true if the queue is closed.
+    #[allow(dead_code)]
+    pub fn is_closed(&self) -> bool {
+        self.closed.load(Ordering::Acquire)
+    }
+}
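`LookupQueue::drain` implements a deadline-bounded batch: block for the first item up to the batch timeout, then greedily pull whatever else is already queued, up to the batch size. The same shape, reduced to plain tokio channels and `u32` items (nothing here is the crate's API):

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{Instant, timeout};

async fn drain_batch(rx: &mut mpsc::Receiver<u32>, max: usize, wait: Duration) -> Vec<u32> {
    let mut batch = Vec::with_capacity(max);
    let deadline = Instant::now() + wait;
    while batch.len() < max {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match timeout(remaining, rx.recv()).await {
            Ok(Some(item)) => {
                batch.push(item);
                // Greedily take anything else that is already buffered.
                while batch.len() < max {
                    match rx.try_recv() {
                        Ok(item) => batch.push(item),
                        Err(_) => break,
                    }
                }
            }
            Ok(None) | Err(_) => break, // channel closed or deadline reached
        }
    }
    batch
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(16);
    for i in 0..5u32 {
        tx.send(i).await.unwrap();
    }
    let batch = drain_batch(&mut rx, 128, Duration::from_millis(100)).await;
    assert_eq!(batch, vec![0, 1, 2, 3, 4]);
}
```

The PR's version additionally drains re-enqueued retries first, which only adds a second `try_recv` loop in front of this one.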
diff --git a/crates/fluss/src/client/lookup/lookup_sender.rs b/crates/fluss/src/client/lookup/lookup_sender.rs
new file mode 100644
index 00000000..8a55dd89
--- /dev/null
+++ b/crates/fluss/src/client/lookup/lookup_sender.rs
@@ -0,0 +1,436 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup sender that processes batched lookup requests.
+//!
+//! The sender runs as a background task, draining lookups from the queue,
+//! grouping them by destination server, and sending batched requests.
+
+use super::{LookupQuery, LookupQueue};
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::metadata::TableBucket;
+use crate::proto::LookupResponse;
+use crate::rpc::message::LookupRequest;
+use log::{debug, error, warn};
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::{Semaphore, mpsc};
+
+/// Default maximum number of in-flight requests.
+pub const DEFAULT_LOOKUP_MAX_INFLIGHT_REQUESTS: usize = 128;
+
+/// Default maximum number of retries.
+pub const DEFAULT_LOOKUP_MAX_RETRIES: i32 = i32::MAX;
+
+/// Lookup sender that batches and sends lookup requests.
+///
+/// The sender:
+/// 1. Drains lookups from the queue
+/// 2. Groups lookups by destination server (leader)
+/// 3. Further groups by table bucket
+/// 4. Sends batched requests to each server
+/// 5. Handles responses and retries on failure
+pub struct LookupSender {
+    /// Metadata for leader lookup
+    metadata: Arc<Metadata>,
+    /// The lookup queue to drain from
+    queue: LookupQueue,
+    /// Channel to re-enqueue failed lookups
+    re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+    /// Semaphore to limit in-flight requests
+    inflight_semaphore: Arc<Semaphore>,
+    /// Maximum number of retries
+    max_retries: i32,
+    /// Whether the sender is running
+    running: AtomicBool,
+    /// Whether to force close (abandon pending lookups)
+    force_close: AtomicBool,
+}
+
+/// A batch of lookups going to the same table bucket.
+struct LookupBatch {
+    table_bucket: TableBucket,
+    lookups: Vec<LookupQuery>,
+    keys: Vec<Vec<u8>>,
+}
+
+impl LookupBatch {
+    fn new(table_bucket: TableBucket) -> Self {
+        Self {
+            table_bucket,
+            lookups: Vec::new(),
+            keys: Vec::new(),
+        }
+    }
+
+    fn add_lookup(&mut self, mut lookup: LookupQuery) {
+        self.keys.push(lookup.take_key());
+        self.lookups.push(lookup);
+    }
+
+    fn complete(&mut self, values: Vec<Option<Vec<u8>>>) {
+        if values.len() != self.lookups.len() {
+            let err_msg = format!(
+                "The number of return values ({}) does not match the number of lookups ({})",
+                values.len(),
+                self.lookups.len()
+            );
+            for lookup in &mut self.lookups {
+                lookup.complete(Err(Error::UnexpectedError {
+                    message: err_msg.clone(),
+                    source: None,
+                }));
+            }
+            return;
+        }
+
+        for (lookup, value) in self.lookups.iter_mut().zip(values.into_iter()) {
+            lookup.complete(Ok(value));
+        }
+    }
+
+    fn complete_exceptionally(&mut self, error_msg: &str) {
+        for lookup in &mut self.lookups {
+            lookup.complete(Err(Error::UnexpectedError {
+                message: error_msg.to_string(),
+                source: None,
+            }));
+        }
+    }
+}
+
+impl LookupSender {
+    /// Creates a new lookup sender.
+    pub fn new(
+        metadata: Arc<Metadata>,
+        queue: LookupQueue,
+        re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+        max_inflight_requests: usize,
+        max_retries: i32,
+    ) -> Self {
+        Self {
+            metadata,
+            queue,
+            re_enqueue_tx,
+            inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
+            max_retries,
+            running: AtomicBool::new(true),
+            force_close: AtomicBool::new(false),
+        }
+    }
+
+    /// Runs the sender loop.
+    pub async fn run(&mut self) {
+        debug!("Starting Fluss lookup sender");
+
+        while self.running.load(Ordering::Acquire) {
+            if let Err(e) = self.run_once(false).await {
+                error!("Error in lookup sender: {}", e);
+            }
+        }
+
+        debug!("Beginning shutdown of lookup sender, sending remaining lookups");
+
+        // Process remaining lookups during shutdown
+        if !self.force_close.load(Ordering::Acquire) && self.queue.has_undrained() {
+            if let Err(e) = self.run_once(true).await {
+                error!("Error during lookup sender shutdown: {}", e);
+            }
+        }
+
+        debug!("Lookup sender shutdown complete");
+    }
+
+    /// Runs a single iteration of the sender loop.
+    async fn run_once(&mut self, drain_all: bool) -> Result<()> {
+        let lookups = if drain_all {
+            self.queue.drain_all()
+        } else {
+            self.queue.drain().await
+        };
+
+        self.send_lookups(lookups).await
+    }
+
+    /// Groups and sends lookups to appropriate servers.
+    async fn send_lookups(&self, lookups: Vec<LookupQuery>) -> Result<()> {
+        if lookups.is_empty() {
+            return Ok(());
+        }
+
+        // Group by leader
+        let lookup_batches = self.group_by_leader(lookups);
+
+        if lookup_batches.is_empty() && !self.queue.has_undrained() {
+            // No lookups to send and queue is empty, sleep to avoid busy loop
+            tokio::time::sleep(Duration::from_millis(100)).await;
+            return Ok(());
+        }
+
+        // Send batches to each destination
+        for (destination, batches) in lookup_batches {
+            self.send_lookup_request(destination, batches).await;
+        }
+
+        Ok(())
+    }
+
+    /// Groups lookups by leader server.
+    fn group_by_leader(
+        &self,
+        lookups: Vec<LookupQuery>,
+    ) -> HashMap<i32, HashMap<TableBucket, LookupBatch>> {
+        let cluster = self.metadata.get_cluster();
+        let mut batches_by_leader: HashMap<i32, HashMap<TableBucket, LookupBatch>> = HashMap::new();
+
+        for lookup in lookups {
+            let table_bucket = lookup.table_bucket().clone();
+
+            // Find leader for this bucket
+            let leader = match cluster.leader_for(&table_bucket) {
+                Some(leader) => leader.id(),
+                None => {
+                    warn!(
+                        "No leader found for table bucket {} during lookup",
+                        table_bucket
+                    );
+                    self.re_enqueue_lookup(lookup);
+                    continue;
+                }
+            };
+
+            batches_by_leader
+                .entry(leader)
+                .or_default()
+                .entry(table_bucket.clone())
+                .or_insert_with(|| LookupBatch::new(table_bucket))
+                .add_lookup(lookup);
+        }
+
+        batches_by_leader
+    }
+
+    /// Sends lookup requests to a specific destination server.
+    async fn send_lookup_request(
+        &self,
+        destination: i32,
+        batches_by_bucket: HashMap<TableBucket, LookupBatch>,
+    ) {
+        // Group by table_id for request batching
+        let mut batches_by_table: HashMap<i64, Vec<LookupBatch>> = HashMap::new();
+        for (table_bucket, batch) in batches_by_bucket {
+            batches_by_table
+                .entry(table_bucket.table_id())
+                .or_default()
+                .push(batch);
+        }
+
+        let cluster = self.metadata.get_cluster();
+        let tablet_server = match cluster.get_tablet_server(destination) {
+            Some(server) => server.clone(),
+            None => {
+                let err_msg = format!("Server {} is not found in metadata cache", destination);
+                for batches in batches_by_table.into_values() {
+                    for mut batch in batches {
+                        self.handle_lookup_error(&err_msg, true, &mut batch);
+                    }
+                }
+                return;
+            }
+        };
+
+        let connection = match self.metadata.get_connection(&tablet_server).await {
+            Ok(conn) => conn,
+            Err(e) => {
+                let err_msg = format!("Failed to get connection to server {}: {}", destination, e);
+                for batches in batches_by_table.into_values() {
+                    for mut batch in batches {
+                        self.handle_lookup_error(&err_msg, true, &mut batch);
+                    }
+                }
+                return;
+            }
+        };
+
+        // Send requests for each table
+        for (table_id, mut batches) in batches_by_table {
+            // Build the request with all buckets for this table
+            let mut all_keys_by_bucket: Vec<(i32, Option<i64>, Vec<Vec<u8>>)> = Vec::new();
+            for batch in &batches {
+                all_keys_by_bucket.push((
+                    batch.table_bucket.bucket_id(),
+                    batch.table_bucket.partition_id(),
+                    batch.keys.clone(),
+                ));
+            }
+
+            // Create lookup request for all buckets in this table
+            let request = LookupRequest::new_batched(table_id, all_keys_by_bucket);
+
+            // Acquire semaphore permit
+            let _permit = match self.inflight_semaphore.clone().acquire_owned().await {
+                Ok(permit) => permit,
+                Err(_) => {
+                    error!("Semaphore closed during lookup");
+                    for batch in &mut batches {
+                        batch.complete_exceptionally("Lookup sender shutdown");
+                    }
+                    return;
+                }
+            };
+
+            // Send request and handle response
+            match connection.request(request).await {
+                Ok(response) => {
+                    self.handle_lookup_response(destination, response, &mut batches);
+                }
+                Err(e) => {
+                    let err_msg = format!("Lookup request failed: {}", e);
+                    let is_retriable = Self::is_retriable_error(&e);
+                    for batch in &mut batches {
+                        self.handle_lookup_error(&err_msg, is_retriable, batch);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Checks if an error is retriable.
+    fn is_retriable_error(error: &Error) -> bool {
+        match error {
+            Error::LeaderNotAvailable { .. } => true,
+            Error::FlussAPIError { api_error } => {
+                // Retry on retriable error codes (e.g., leader not available, not leader)
+                api_error.code == 1 || api_error.code == 2 || api_error.code == 3
+            }
+            _ => false,
+        }
+    }
+
+    /// Handles the lookup response.
+    fn handle_lookup_response(
+        &self,
+        destination: i32,
+        response: LookupResponse,
+        batches: &mut [LookupBatch],
+    ) {
+        // Create a map from bucket_id to batch index for quick lookup
+        let bucket_to_index: HashMap<i32, usize> = batches
+            .iter()
+            .enumerate()
+            .map(|(idx, batch)| (batch.table_bucket.bucket_id(), idx))
+            .collect();
+
+        for bucket_resp in response.buckets_resp {
+            let bucket_id = bucket_resp.bucket_id;
+            if let Some(&batch_idx) = bucket_to_index.get(&bucket_id) {
+                let batch = &mut batches[batch_idx];
+
+                // Check for errors
+                if let Some(error_code) = bucket_resp.error_code {
+                    if error_code != 0 {
+                        let err_msg = format!(
+                            "Lookup error for bucket {}: code={}, message={}",
+                            bucket_id,
+                            error_code,
+                            bucket_resp.error_message.unwrap_or_default()
+                        );
+                        // Retry on retriable error codes (e.g., leader not available)
+                        let is_retriable = error_code == 1 || error_code == 2 || error_code == 3;
+                        self.handle_lookup_error(&err_msg, is_retriable, batch);
+                        continue;
+                    }
+                }
+
+                // Extract values
+                let values: Vec<Option<Vec<u8>>> = bucket_resp
+                    .values
+                    .into_iter()
+                    .map(|pb_value| pb_value.values)
+                    .collect();
+
+                batch.complete(values);
+            } else {
+                warn!(
+                    "Received response for unknown bucket {} from server {}",
+                    bucket_id, destination
+                );
+            }
+        }
+    }
+
+    /// Handles lookup errors with retry logic.
+    fn handle_lookup_error(&self, error_msg: &str, is_retriable: bool, batch: &mut LookupBatch) {
+        let mut lookups_to_retry = Vec::new();
+        let mut lookups_to_complete = Vec::new();
+
+        for lookup in batch.lookups.drain(..) {
+            if is_retriable && lookup.retries() < self.max_retries && !lookup.is_done() {
+                warn!(
+                    "Lookup error for bucket {}, retrying ({} attempts left): {}",
+                    batch.table_bucket,
+                    self.max_retries - lookup.retries(),
+                    error_msg
+                );
+                lookups_to_retry.push(lookup);
+            } else {
+                lookups_to_complete.push(lookup);
+            }
+        }
+
+        // Re-enqueue retriable lookups
+        for lookup in lookups_to_retry {
+            lookup.increment_retries();
+            self.re_enqueue_lookup(lookup);
+        }
+
+        // Complete non-retriable lookups with error
+        for mut lookup in lookups_to_complete {
+            warn!(
+                "Lookup failed for bucket {}: {}",
+                batch.table_bucket, error_msg
+            );
+            lookup.complete(Err(Error::UnexpectedError {
+                message: error_msg.to_string(),
+                source: None,
+            }));
+        }
+    }
+
+    /// Re-enqueues a lookup for retry.
+    fn re_enqueue_lookup(&self, lookup: LookupQuery) {
+        if let Err(e) = self.re_enqueue_tx.send(lookup) {
+            error!("Failed to re-enqueue lookup: {}", e);
+        }
+    }
+
+    /// Initiates graceful shutdown of the sender.
+    pub fn initiate_close(&self) {
+        self.queue.close();
+        self.running.store(false, Ordering::Release);
+    }
+
+    /// Forces immediate shutdown, abandoning pending lookups.
+    #[allow(dead_code)]
+    pub fn force_close(&self) {
+        self.force_close.store(true, Ordering::Release);
+        self.initiate_close();
+    }
+}
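Note that the sender awaits each batched request while holding the owned semaphore permit, so within one `LookupSender` the requests are effectively serialized; the in-flight limit only starts to matter if sending is ever spawned per request. A sketch of that spawned variant with plain tokio, where `fake_request` stands in for the real RPC call and is not part of this diff:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn fake_request(id: usize) -> usize {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    id
}

#[tokio::main]
async fn main() {
    let max_inflight = 4;
    let semaphore = Arc::new(Semaphore::new(max_inflight));
    let mut tasks = Vec::new();

    for id in 0..16 {
        // Acquire before spawning so the loop itself applies backpressure.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        tasks.push(tokio::spawn(async move {
            let response = fake_request(id).await;
            drop(permit); // free the slot as soon as the response arrives
            response
        }));
    }

    for task in tasks {
        task.await.unwrap();
    }
}
```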
diff --git a/crates/fluss/src/client/lookup/mod.rs b/crates/fluss/src/client/lookup/mod.rs
new file mode 100644
index 00000000..74c59854
--- /dev/null
+++ b/crates/fluss/src/client/lookup/mod.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup client implementation with batching and queuing support.
+//!
+//! This module provides a high-throughput lookup client that batches multiple
+//! lookup operations together to reduce network round trips, achieving parity
+//! with the Java client implementation.
+//!
+//! # Architecture
+//!
+//! The lookup client consists of:
+//! - [`LookupClient`]: The main client that accepts lookup requests and manages the sender
+//! - [`LookupQueue`]: A bounded queue that buffers pending lookup operations
+//! - [`LookupSender`]: A background task that drains lookups from the queue and sends them in batches
+//!
+//! # Example
+//!
+//! ```ignore
+//! let lookup_client = LookupClient::new(config, metadata);
+//! let future = lookup_client.lookup(table_path, table_bucket, key_bytes);
+//! let result = future.await?;
+//! ```
+
+mod lookup_client;
+mod lookup_query;
+mod lookup_queue;
+mod lookup_sender;
+
+pub use lookup_client::LookupClient;
+pub(crate) use lookup_query::LookupQuery;
+pub(crate) use lookup_queue::LookupQueue;
diff --git a/crates/fluss/src/client/mod.rs b/crates/fluss/src/client/mod.rs
index cff218b3..77c30203 100644
--- a/crates/fluss/src/client/mod.rs
+++ b/crates/fluss/src/client/mod.rs
@@ -18,6 +18,7 @@ mod admin;
 mod connection;
 mod credentials;
+pub mod lookup;
 mod metadata;
 mod table;
 mod write;
@@ -25,6 +26,7 @@ pub use admin::*;
 pub use connection::*;
 pub use credentials::*;
+pub use lookup::LookupClient;
 pub use metadata::*;
 pub use table::*;
 pub use write::*;
diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs
index 4e89176a..982539ab 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -17,15 +17,14 @@ use crate::bucketing::BucketingFunction;
 use crate::client::connection::FlussConnection;
+use crate::client::lookup::LookupClient;
 use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
-use crate::metadata::{RowType, TableBucket, TableInfo};
+use crate::metadata::{RowType, TableBucket, TableInfo, TablePath};
 use crate::record::kv::SCHEMA_ID_LENGTH;
 use crate::row::InternalRow;
 use crate::row::compacted::CompactedRow;
 use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
-use crate::rpc::ApiError;
-use crate::rpc::message::LookupRequest;
 use std::sync::Arc;
@@ -106,6 +105,7 @@ pub struct TableLookup<'a> {
     conn: &'a FlussConnection,
     table_info: TableInfo,
+    #[allow(dead_code)]
     metadata: Arc<Metadata>,
 }
@@ -126,6 +126,12 @@ impl<'a> TableLookup<'a> {
     ///
     /// The lookuper will automatically encode the key and compute the bucket
     /// for each lookup using the appropriate bucketing function.
+    ///
+    /// # Batching
+    ///
+    /// The lookuper uses a shared `LookupClient` that batches multiple lookup
+    /// operations together to reduce network round trips. This achieves parity
+    /// with the Java client implementation for improved throughput.
     pub fn create_lookuper(self) -> Result<Lookuper<'a>> {
         let num_buckets = self.table_info.get_num_buckets();
@@ -141,13 +147,17 @@ impl<'a> TableLookup<'a> {
             &data_lake_format,
         )?;
 
+        // Get or create the shared lookup client
+        let lookup_client = self.conn.get_or_create_lookup_client();
+
         Ok(Lookuper {
-            conn: self.conn,
+            table_path: self.table_info.get_table_path().clone(),
             table_info: self.table_info,
-            metadata: self.metadata,
+            lookup_client,
             bucketing_function,
             key_encoder,
             num_buckets,
+            _marker: std::marker::PhantomData,
         })
     }
 }
@@ -155,7 +165,13 @@ impl<'a> TableLookup<'a> {
 /// Performs key-based lookups against a primary key table.
 ///
 /// The `Lookuper` automatically encodes the lookup key, computes the target
-/// bucket, finds the appropriate tablet server, and retrieves the value.
+/// bucket, and retrieves the value using the batched `LookupClient`.
+///
+/// # Batching
+///
+/// Multiple lookup operations are batched together by the underlying
+/// `LookupClient` to reduce network round trips. This provides significantly
+/// higher throughput compared to sending individual requests.
 ///
 /// # Example
 /// ```ignore
 /// let mut lookuper = table.new_lookup().create_lookuper()?;
 /// let result = lookuper.lookup(&key_row).await?;
 /// ```
 // TODO: Support partitioned tables (extract partition from key)
 pub struct Lookuper<'a> {
-    conn: &'a FlussConnection,
+    table_path: TablePath,
     table_info: TableInfo,
-    metadata: Arc<Metadata>,
+    lookup_client: Arc<LookupClient>,
     bucketing_function: Box<dyn BucketingFunction>,
     key_encoder: Box<dyn KeyEncoder>,
     num_buckets: i32,
+    // Keep the lifetime parameter for API compatibility
+    _marker: std::marker::PhantomData<&'a ()>,
 }
 
 impl<'a> Lookuper<'a> {
     /// Looks up a value by its primary key.
     ///
     /// The key is encoded and the bucket is automatically computed using
-    /// the table's bucketing function.
+    /// the table's bucketing function. The lookup is queued and batched
+    /// with other lookups for improved throughput.
     ///
     /// # Arguments
     /// * `row` - The row containing the primary key field values
@@ -186,7 +205,6 @@ impl<'a> Lookuper<'a> {
     /// * `Ok(LookupResult)` - The lookup result (may be empty if key not found)
     /// * `Err(Error)` - If the lookup fails
     pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult<'_>> {
-        // todo: support batch lookup
         // Encode the key from the row
         let encoded_key = self.key_encoder.encode_key(row)?;
         let key_bytes = encoded_key.to_vec();
@@ -199,58 +217,19 @@
         let table_id = self.table_info.get_table_id();
         let table_bucket = TableBucket::new(table_id, bucket_id);
-        // Find the leader for this bucket
-        let cluster = self.metadata.get_cluster();
-        let leader =
-            cluster
-                .leader_for(&table_bucket)
-                .ok_or_else(|| Error::LeaderNotAvailable {
-                    message: format!("No leader found for table bucket: {table_bucket}"),
-                })?;
-
-        // Get connection to the tablet server
-        let tablet_server =
-            cluster
-                .get_tablet_server(leader.id())
-                .ok_or_else(|| Error::LeaderNotAvailable {
-                    message: format!(
-                        "Tablet server {} is not found in metadata cache",
-                        leader.id()
-                    ),
-                })?;
-
-        let connections = self.conn.get_connections();
-        let connection = connections.get_connection(tablet_server).await?;
-
-        // Send lookup request
-        let request = LookupRequest::new(table_id, None, bucket_id, vec![key_bytes]);
-        let response = connection.request(request).await?;
-
-        // Extract the values from response
-        if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
-            // Check for errors
-            if let Some(error_code) = bucket_resp.error_code {
-                if error_code != 0 {
-                    return Err(Error::FlussAPIError {
-                        api_error: ApiError {
-                            code: error_code,
-                            message: bucket_resp.error_message.unwrap_or_default(),
-                        },
-                    });
-                }
-            }
-
-            // Collect all values
-            let rows: Vec<Vec<u8>> = bucket_resp
-                .values
-                .into_iter()
-                .filter_map(|pb_value| pb_value.values)
-                .collect();
-
-            return Ok(LookupResult::new(rows, self.table_info.row_type()));
+        // Use the batched lookup client
+        let result = self
+            .lookup_client
+            .lookup(self.table_path.clone(), table_bucket, key_bytes)
+            .await?;
+
+        match result {
+            Some(value_bytes) => Ok(LookupResult::new(
+                vec![value_bytes],
+                self.table_info.row_type(),
+            )),
+            None => Ok(LookupResult::empty(self.table_info.row_type())),
         }
-
-        Ok(LookupResult::empty(self.table_info.row_type()))
     }
 
     /// Returns a reference to the table info.
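Putting the pieces together, the caller-facing flow is unchanged: obtain a lookuper from a table and call `lookup`; batching now happens inside the shared `LookupClient` owned by the connection. The usage sketch below is in the same spirit as the crate's `ignore` doc examples above; the import paths assume the crate is consumed as `fluss` with the re-exports shown in `client/mod.rs`, it only uses the calls visible in this diff, and it needs a running cluster, so it is illustrative rather than a standalone test.

```rust
use fluss::client::FlussConnection;
use fluss::error::Result;
use fluss::metadata::TablePath;
use fluss::row::InternalRow;

/// Look up a single key row. Concurrent callers share the connection's
/// LookupClient, so their lookups are merged into batched RPCs per bucket leader.
async fn lookup_one(
    conn: &FlussConnection,
    table_path: &TablePath,
    key_row: &dyn InternalRow,
) -> Result<()> {
    let table = conn.get_table(table_path).await?;
    let mut lookuper = table.new_lookup().create_lookuper()?;

    // The result may be empty if the key is not present.
    let _result = lookuper.lookup(key_row).await?;
    Ok(())
}
```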
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 705e241d..b7594321 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -46,6 +46,36 @@ pub struct Config {
     /// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM)
     #[arg(long, default_value_t = 3)]
     pub scanner_remote_log_download_threads: usize,
+
+    /// Maximum number of pending lookup operations
+    /// Default: 25600 (matching Java CLIENT_LOOKUP_QUEUE_SIZE)
+    #[arg(long)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub lookup_queue_size: Option<usize>,
+
+    /// Maximum number of lookup operations merged into one lookup request
+    /// Default: 128 (matching Java CLIENT_LOOKUP_MAX_BATCH_SIZE)
+    #[arg(long)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub lookup_max_batch_size: Option<usize>,
+
+    /// Maximum time to wait for the lookup batch to fill (in milliseconds)
+    /// Default: 100 (matching Java CLIENT_LOOKUP_BATCH_TIMEOUT)
+    #[arg(long)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub lookup_batch_timeout_ms: Option<u64>,
+
+    /// Maximum number of unacknowledged lookup requests
+    /// Default: 128 (matching Java CLIENT_LOOKUP_MAX_INFLIGHT_SIZE)
+    #[arg(long)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub lookup_max_inflight_requests: Option<usize>,
+
+    /// Maximum number of lookup retries
+    /// Default: i32::MAX (matching Java CLIENT_LOOKUP_MAX_RETRIES)
+    #[arg(long)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub lookup_max_retries: Option<i32>,
 }
 
 impl Default for Config {
@@ -58,6 +88,11 @@ impl Default for Config {
             writer_batch_size: 2 * 1024 * 1024,
             scanner_remote_log_prefetch_num: 4,
             scanner_remote_log_download_threads: 3,
+            lookup_queue_size: None,
+            lookup_max_batch_size: None,
+            lookup_batch_timeout_ms: None,
+            lookup_max_inflight_requests: None,
+            lookup_max_retries: None,
         }
     }
 }
diff --git a/crates/fluss/src/rpc/message/lookup.rs b/crates/fluss/src/rpc/message/lookup.rs
index 3de47d64..2f7d267c 100644
--- a/crates/fluss/src/rpc/message/lookup.rs
+++ b/crates/fluss/src/rpc/message/lookup.rs
@@ -53,6 +53,33 @@ impl LookupRequest {
             inner_request: request,
         }
     }
+
+    /// Creates a new batched lookup request with multiple buckets.
+    ///
+    /// # Arguments
+    /// * `table_id` - The table ID
+    /// * `buckets` - A list of (bucket_id, partition_id, keys) tuples
+    pub fn new_batched(table_id: i64, buckets: Vec<(i32, Option<i64>, Vec<Vec<u8>>)>) -> Self {
+        let buckets_req: Vec<proto::PbLookupReqForBucket> = buckets
+            .into_iter()
+            .map(
+                |(bucket_id, partition_id, keys)| proto::PbLookupReqForBucket {
+                    partition_id,
+                    bucket_id,
+                    key: keys,
+                },
+            )
+            .collect();
+
+        let request = proto::LookupRequest {
+            table_id,
+            buckets_req,
+        };
+
+        Self {
+            inner_request: request,
+        }
+    }
 }
 
 impl RequestBody for LookupRequest {
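The new options are all optional; unset fields fall back to the defaults listed in the doc comments (queue 25600, batch 128, timeout 100 ms, in-flight 128, retries `i32::MAX`). Below is a sketch of overriding a few of them with struct-update syntax, assuming the config path is `fluss::config::Config`, matching the `crate::config::Config` used in the diff.

```rust
use fluss::config::Config;

fn lookup_tuned_config() -> Config {
    Config {
        // Merge up to 256 keys into one lookup request.
        lookup_max_batch_size: Some(256),
        // Flush a partial batch after 20 ms instead of the default 100 ms.
        lookup_batch_timeout_ms: Some(20),
        // Allow at most 64 unacknowledged lookup requests.
        lookup_max_inflight_requests: Some(64),
        ..Config::default()
    }
}
```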