From 854b7ed1f675b55367d501b6e898afd43ca34040 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 11 Jun 2026 17:18:22 +0200 Subject: [PATCH 1/4] fix: Recover from black-holed Redis connection after node drain When the Kubernetes node running the Redis master is drained, the established TCP connection turns into a black hole: packets are silently dropped and no FIN/RST ever arrives. The only error trino-lb sees is a response timeout, which the redis crate's ConnectionManager classifies as RetryImmediately rather than a dropped connection, so it never reconnects and keeps reusing the dead socket. Every command then times out for hours until the liveness probe restarts the pod (#109). This only reproduced on drain (not pod delete, which closes the socket cleanly and triggers a reconnect) and under load, since active traffic keeps in-flight requests on the wedged connection. Fix this in trino-lb-persistence/src/redis/mod.rs: - Add a Reconnectable wrapper (RwLock> + rebuild factory + single-flight guard). reconnect() only rebuilds when the connection a failing command used is still current (Arc::ptr_eq generation check), so a burst of failures rebuilds the connection exactly once. - Route every Redis operation through a run() combinator that rebuilds the connection when a command times out. Dropped-connection errors are left to ConnectionManager's own reconnect, so this only covers the timeout gap. - Enable TCP keepalive (5s/2s/3 probes) and TCP_USER_TIMEOUT (12s) on the Redis socket via ConnectionInfo::set_tcp_settings (single-node) and ClusterClientBuilder::tcp_settings (cluster). These are re-applied on every reconnect and let the kernel tear down the dead socket faster. Reachable via redis::io::tcp, so no new dependency is needed. ConnectionManagerConfig does not expose TCP settings (even on redis-rs main), hence routing them through ConnectionInfo / the cluster builder. Add unit tests for the Reconnectable coordination logic (replace, stale-handle no-op, concurrent single-flight). End-to-end black-hole recovery is not covered by automated tests and still needs an integration or manual node-drain test. fixes #109 --- CHANGELOG.md | 9 +- trino-lb-persistence/Cargo.toml | 3 + trino-lb-persistence/src/redis/mod.rs | 459 ++++++++++++++++++++------ 3 files changed, 368 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fd1328..9f33937 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,14 @@ All notable changes to this project will be documented in this file. - Handle Redis connection errors (e.g. broken pipe during master failover) gracefully instead of panicking. Previously, `get_queued_query_count` would `.unwrap()` on the Redis result, causing a panic that poisoned the metrics `RwLock`, cascading into further panics and leaving pods unresponsive ([#111]). - +- Recover from a black-holed Redis connection instead of hanging until the liveness probe restarts the pod. + When the node running the Redis master is drained, the TCP connection silently stops delivering packets + without ever being closed, so the only error trino-lb sees is a response timeout. The redis crate's + `ConnectionManager` does not reconnect on timeouts (only on dropped-connection errors), so it kept reusing + the dead connection forever. trino-lb now enables TCP keepalive / `TCP_USER_TIMEOUT` on the Redis socket and + rebuilds the connection itself when a command times out ([#109]). + +[#109]: https://github.com/stackabletech/trino-lb/issues/109 [#111]: https://github.com/stackabletech/trino-lb/pull/111 ## [0.6.0] - 2026-02-17 diff --git a/trino-lb-persistence/Cargo.toml b/trino-lb-persistence/Cargo.toml index 1c8bd61..287a444 100644 --- a/trino-lb-persistence/Cargo.toml +++ b/trino-lb-persistence/Cargo.toml @@ -25,3 +25,6 @@ tokio.workspace = true tracing.workspace = true trait-variant.workspace = true url.workspace = true + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/trino-lb-persistence/src/redis/mod.rs b/trino-lb-persistence/src/redis/mod.rs index cdf36a2..7a8858d 100644 --- a/trino-lb-persistence/src/redis/mod.rs +++ b/trino-lb-persistence/src/redis/mod.rs @@ -1,18 +1,21 @@ use std::{ fmt::Debug, + future::Future, num::TryFromIntError, + sync::{Arc, PoisonError, RwLock}, time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}, }; -use futures::{TryFutureExt, future::try_join_all}; +use futures::future::{BoxFuture, try_join_all}; use redis::{ - AsyncCommands, Client, RedisError, Script, + AsyncCommands, Client, IntoConnectionInfo, RedisError, Script, aio::{ConnectionManager, ConnectionManagerConfig, MultiplexedConnection}, cluster::{ClusterClientBuilder, ClusterConfig}, cluster_async::ClusterConnection, + io::tcp::{TcpSettings, socket2::TcpKeepalive}, }; use snafu::{OptionExt, ResultExt, Snafu}; -use tracing::{Instrument, debug, debug_span, info, instrument}; +use tracing::{Instrument, debug, debug_span, info, instrument, warn}; use trino_lb_core::{ TrinoClusterName, TrinoLbQueryId, TrinoQueryId, config::RedisConfig, @@ -26,6 +29,33 @@ use crate::Persistence; const REDIS_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); const REDIS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); +// TCP keepalive / user-timeout settings for the Redis socket. +// +// The redis crate's [`ConnectionManager`] only reconnects when a command fails with a +// dropped-connection error (broken pipe, connection reset, ...). When the Kubernetes node running +// the Redis master is drained, the established TCP connection turns into a black hole: packets are +// silently dropped and no FIN/RST ever arrives, so the only error trino-lb ever sees is a response +// *timeout* - which the redis crate does not treat as a reason to reconnect. Without these socket +// options the connection would stay wedged until the liveness probe restarts the pod (see issue +// #109). Enabling TCP keepalive (and TCP_USER_TIMEOUT) lets the kernel tear the dead socket down on +// its own, shrinking the detection window; the [`Reconnectable`] wrapper then rebuilds it. +const REDIS_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(5); +const REDIS_TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(2); +const REDIS_TCP_KEEPALIVE_RETRIES: u32 = 3; +const REDIS_TCP_USER_TIMEOUT: Duration = Duration::from_secs(12); + +/// TCP settings applied to every Redis connection (and every reconnect), see the constants above. +fn tcp_settings() -> TcpSettings { + TcpSettings::default() + .set_keepalive( + TcpKeepalive::new() + .with_time(REDIS_TCP_KEEPALIVE_TIME) + .with_interval(REDIS_TCP_KEEPALIVE_INTERVAL) + .with_retries(REDIS_TCP_KEEPALIVE_RETRIES), + ) + .set_user_timeout(REDIS_TCP_USER_TIMEOUT) +} + const LAST_QUERY_COUNT_FETCHER_UPDATE_KEY: &str = "lastQueryCountFetcherUpdate"; const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard(); @@ -129,6 +159,73 @@ pub enum Error { }, } +/// Builds a fresh Redis connection. Used by [`Reconnectable`] to rebuild a connection that went bad. +type ConnectionFactory = + Arc BoxFuture<'static, Result> + Send + Sync>; + +/// A Redis connection that can be transparently rebuilt when it goes bad. +/// +/// The redis crate's [`ConnectionManager`] only reconnects on dropped-connection errors, not on +/// timeouts, so a black-holed connection (e.g. after a Redis node drain, see issue #109 and the +/// note on [`REDIS_TCP_KEEPALIVE_TIME`]) would otherwise time out forever. This wrapper lets us +/// rebuild the underlying connection ourselves; the policy for *when* to rebuild lives in +/// [`RedisPersistence::run`]. +struct Reconnectable { + current: RwLock>, + factory: ConnectionFactory, + /// Guarantees only a single rebuild runs at a time. + reconnecting: tokio::sync::Mutex<()>, +} + +impl Reconnectable +where + T: Send + Sync + 'static, +{ + fn new(initial: T, factory: ConnectionFactory) -> Self { + Self { + current: RwLock::new(Arc::new(initial)), + factory, + reconnecting: tokio::sync::Mutex::new(()), + } + } + + /// Returns the connection currently in use. + fn current(&self) -> Arc { + Arc::clone(&self.current.read().unwrap_or_else(PoisonError::into_inner)) + } + + /// Rebuilds the connection, replacing the one currently in use. + /// + /// `used` is the connection the failing command ran on. This is a no-op if another rebuild is + /// already in progress, or if the connection has already been replaced since `used` was + /// obtained (i.e. another task already noticed the failure and rebuilt it). This keeps a burst + /// of failing commands from rebuilding the connection over and over. + async fn reconnect(&self, used: &Arc) { + let Ok(_guard) = self.reconnecting.try_lock() else { + // Another task is already rebuilding the connection. + return; + }; + + if !Arc::ptr_eq(&self.current(), used) { + // The connection was already replaced while we waited for the lock. + return; + } + + match (self.factory)().await { + Ok(connection) => { + *self.current.write().unwrap_or_else(PoisonError::into_inner) = Arc::new(connection); + info!("Successfully rebuilt the Redis connection"); + } + Err(error) => { + warn!( + ?error, + "Failed to rebuild the Redis connection, will retry on the next failing command" + ); + } + } + } +} + /// This Redis implementation works against Redis clusters. It uses a single connection that is shared between all /// operations for best performance. However, this makes atomic operations hard as their are some pitfalls regarding /// `WATCH` in combination with `MULTI` and `EXEC` documented @@ -140,7 +237,7 @@ pub struct RedisPersistence where R: AsyncCommands + Clone, { - connection: R, + connection: Reconnectable, compare_and_set_script: Script, /// Sometimes we need to do stuff for all cluster groups, so we need to store them to iterate over them @@ -158,14 +255,31 @@ impl RedisPersistence { .set_connection_timeout(Some(REDIS_CONNECTION_TIMEOUT)) .set_response_timeout(Some(REDIS_RESPONSE_TIMEOUT)); - let client = Client::open(config.endpoint.as_str()).context(CreateClientSnafu)?; - let connection = client - .get_connection_manager_with_config(redis_config) - .await - .context(CreateClientSnafu)?; + // The TCP settings live on the `ConnectionInfo` (not on `ConnectionManagerConfig`, which + // does not expose them), and are re-applied on every reconnect the manager performs. + let connection_info = config + .endpoint + .as_str() + .into_connection_info() + .context(CreateClientSnafu)? + .set_tcp_settings(tcp_settings()); + let client = Client::open(connection_info).context(CreateClientSnafu)?; + + let factory: ConnectionFactory = { + let client = client.clone(); + Arc::new(move || { + let client = client.clone(); + let redis_config = redis_config.clone(); + Box::pin(async move { + client.get_connection_manager_with_config(redis_config).await + }) + }) + }; + + let connection = factory().await.context(CreateClientSnafu)?; Ok(Self { - connection, + connection: Reconnectable::new(connection, factory), compare_and_set_script: compare_and_set_script(), cluster_groups, }) @@ -184,15 +298,23 @@ impl RedisPersistence> { .set_response_timeout(REDIS_RESPONSE_TIMEOUT); let client = ClusterClientBuilder::new([config.endpoint.as_str()]) + .tcp_settings(tcp_settings()) .build() .context(CreateClientSnafu)?; - let connection = client - .get_async_connection_with_config(redis_config) - .await - .context(CreateClientSnafu)?; + + let factory: ConnectionFactory> = { + let client = client.clone(); + Arc::new(move || { + let client = client.clone(); + let redis_config = redis_config.clone(); + Box::pin(async move { client.get_async_connection_with_config(redis_config).await }) + }) + }; + + let connection = factory().await.context(CreateClientSnafu)?; Ok(Self { - connection, + connection: Reconnectable::new(connection, factory), compare_and_set_script: compare_and_set_script(), cluster_groups, }) @@ -201,27 +323,28 @@ impl RedisPersistence> { impl Persistence for RedisPersistence where - R: AsyncCommands + Clone, + R: AsyncCommands + Clone + Send + Sync + 'static, { #[instrument(skip(self, queued_query))] async fn store_queued_query(&self, queued_query: QueuedQuery) -> Result<(), super::Error> { let key = queued_query_key(&queued_query.id); + let set_name = queued_query_set_name(&queued_query.cluster_group); let value = bincode::serde::encode_to_vec(&queued_query, BINCODE_CONFIG) .context(SerializeToBinarySnafu)?; - let mut connection_1 = self.connection(); - let mut connection_2 = self.connection(); - // We can't use a pipe here, as we otherwise get "Received crossed slots in pipeline - CrossSlot" - tokio::try_join!( - connection_1 - .set::<_, _, ()>(key, value) - .map_err(|err| Error::WriteToRedis { source: err }), - connection_2 - .sadd::<_, _, ()>(queued_query_set_name(&queued_query.cluster_group), key) - .map_err(|err| Error::WriteToRedis { source: err }), - )?; + self.run(|connection| async move { + let mut connection_1 = connection.clone(); + let mut connection_2 = connection; + tokio::try_join!( + connection_1.set::<_, _, ()>(key, value), + connection_2.sadd::<_, _, ()>(&set_name, key), + )?; + Ok(()) + }) + .await + .context(WriteToRedisSnafu)?; Ok(()) } @@ -233,8 +356,7 @@ where ) -> Result { let key = queued_query_key(queued_query_id); let value: Vec = self - .connection() - .get(key) + .run(|mut connection| async move { connection.get(key).await }) .await .context(ReadFromRedisSnafu)?; @@ -246,14 +368,16 @@ where #[instrument(skip(self, queued_query))] async fn remove_queued_query(&self, queued_query: &QueuedQuery) -> Result<(), super::Error> { let key = queued_query_key(&queued_query.id); - let mut connection = self.connection(); + let set_name = queued_query_set_name(&queued_query.cluster_group); // We can't use a pipe here, as we otherwise get "Received crossed slots in pipeline - CrossSlot" - let _: () = connection - .srem(queued_query_set_name(&queued_query.cluster_group), key) - .await - .context(WriteToRedisSnafu)?; - let _: () = connection.del(key).await.context(WriteToRedisSnafu)?; + self.run(|mut connection| async move { + let _: () = connection.srem(&set_name, key).await?; + let _: () = connection.del(key).await?; + Ok(()) + }) + .await + .context(WriteToRedisSnafu)?; Ok(()) } @@ -264,11 +388,12 @@ where let value = bincode::serde::encode_to_vec(&query, BINCODE_CONFIG) .context(SerializeToBinarySnafu)?; - let _: () = self - .connection() - .set(key, value) - .await - .context(WriteToRedisSnafu)?; + self.run(|mut connection| async move { + let _: () = connection.set(key, value).await?; + Ok(()) + }) + .await + .context(WriteToRedisSnafu)?; Ok(()) } @@ -277,8 +402,7 @@ where async fn load_query(&self, query_id: &TrinoQueryId) -> Result { let key = query_key(query_id); let value: Vec = self - .connection() - .get(key) + .run(|mut connection| async move { connection.get(key).await }) .await .context(ReadFromRedisSnafu)?; @@ -290,11 +414,12 @@ where #[instrument(skip(self))] async fn remove_query(&self, query_id: &TrinoQueryId) -> Result<(), super::Error> { let key = query_key(query_id); - let _: () = self - .connection() - .del(key) - .await - .context(DeleteFromRedisSnafu)?; + self.run(|mut connection| async move { + let _: () = connection.del(key).await?; + Ok(()) + }) + .await + .context(DeleteFromRedisSnafu)?; Ok(()) } @@ -306,12 +431,18 @@ where max_allowed_count: u64, ) -> Result { let key = cluster_query_counter_key(cluster_name); - let mut connection = self.connection(); loop { - let current = connection - .get::<_, Option>(&key) - .instrument(debug_span!("get current value")) + let current = self + .run(|mut connection| { + let key = &key; + async move { + connection + .get::<_, Option>(key) + .instrument(debug_span!("get current value")) + .await + } + }) .await .context(ReadFromRedisSnafu)? .unwrap_or_default(); @@ -327,13 +458,20 @@ where return Ok(false); } + let script = self.compare_and_set_script.clone(); let response: u8 = self - .compare_and_set_script - .key(&key) - .arg(current) - .arg(current + 1) - .invoke_async(&mut connection) - .instrument(debug_span!("invoking compare-and-set lua script")) + .run(|mut connection| { + let key = &key; + async move { + script + .key(key) + .arg(current) + .arg(current + 1) + .invoke_async(&mut connection) + .instrument(debug_span!("invoking compare-and-set lua script")) + .await + } + }) .await .context(ExecuteCASScriptSnafu)?; @@ -359,12 +497,18 @@ where cluster_name: &TrinoClusterName, ) -> Result<(), super::Error> { let key = cluster_query_counter_key(cluster_name); - let mut connection = self.connection(); loop { - let current = connection - .get::<_, Option>(&key) - .instrument(debug_span!("get current value")) + let current = self + .run(|mut connection| { + let key = &key; + async move { + connection + .get::<_, Option>(key) + .instrument(debug_span!("get current value")) + .await + } + }) .await .context(ReadFromRedisSnafu)? .unwrap_or_default(); @@ -374,13 +518,20 @@ where return Ok(()); } + let script = self.compare_and_set_script.clone(); let response: u8 = self - .compare_and_set_script - .key(&key) - .arg(current) - .arg(current - 1) - .invoke_async(&mut connection) - .instrument(debug_span!("invoking compare-and-set lua script")) + .run(|mut connection| { + let key = &key; + async move { + script + .key(key) + .arg(current) + .arg(current - 1) + .invoke_async(&mut connection) + .instrument(debug_span!("invoking compare-and-set lua script")) + .await + } + }) .await .context(ExecuteCASScriptSnafu)?; @@ -404,11 +555,12 @@ where ) -> Result<(), super::Error> { let key = cluster_query_counter_key(cluster_name); - let _: () = self - .connection() - .set(key, count) - .await - .context(SetClusterQueryCountSnafu { cluster_name })?; + self.run(|mut connection| async move { + let _: () = connection.set(&key, count).await?; + Ok(()) + }) + .await + .context(SetClusterQueryCountSnafu { cluster_name })?; Ok(()) } @@ -420,8 +572,7 @@ where ) -> Result { let key = cluster_query_counter_key(cluster_name); Ok(self - .connection() - .get::<_, Option>(key) + .run(|mut connection| async move { connection.get::<_, Option>(&key).await }) .await .context(ReadClusterQueryCountSnafu { cluster_name })? // There can be the case this function is called before `inc_cluster_queries`, so the number of queries is 0 in this case. @@ -430,9 +581,9 @@ where #[instrument(skip(self))] async fn get_queued_query_count(&self, cluster_group: &str) -> Result { + let set_name = queued_query_set_name(cluster_group); Ok(self - .connection() - .scard::<_, Option>(queued_query_set_name(cluster_group)) + .run(|mut connection| async move { connection.scard::<_, Option>(&set_name).await }) .await .context(GetQueuedQueryCountSnafu { cluster_group })? // The set might not be there yet, as no queries have been queued for this cluster group so far. @@ -455,8 +606,11 @@ where #[instrument(skip(self))] async fn get_last_query_count_fetcher_update(&self) -> Result { let ms = self - .connection() - .get::<_, Option>(LAST_QUERY_COUNT_FETCHER_UPDATE_KEY) + .run(|mut connection| async move { + connection + .get::<_, Option>(LAST_QUERY_COUNT_FETCHER_UPDATE_KEY) + .await + }) .await .context(GetLastQueryCountFetcherUpdateSnafu)? // There can be the case this function is called before `set_last_query_count_fetcher_update`, so we can @@ -479,11 +633,12 @@ where .try_into() .context(ConvertElapsedTimeSinceLastUpdateToMillisSnafu)?; - let _: () = self - .connection() - .set(LAST_QUERY_COUNT_FETCHER_UPDATE_KEY, ms) - .await - .context(SetLastQueryCountFetcherUpdateSnafu)?; + self.run(|mut connection| async move { + let _: () = connection.set(LAST_QUERY_COUNT_FETCHER_UPDATE_KEY, ms).await?; + Ok(()) + }) + .await + .context(SetLastQueryCountFetcherUpdateSnafu)?; Ok(()) } @@ -498,11 +653,12 @@ where let value = bincode::serde::encode_to_vec(state, BINCODE_CONFIG).context(SerializeToBinarySnafu)?; - let _: () = self - .connection() - .set(key, value) - .await - .context(SetClusterStateSnafu)?; + self.run(|mut connection| async move { + let _: () = connection.set(&key, value).await?; + Ok(()) + }) + .await + .context(SetClusterStateSnafu)?; Ok(()) } @@ -515,8 +671,7 @@ where let key = cluster_state_key(cluster_name); let cluster_state: Option> = self - .connection() - .get(key) + .run(|mut connection| async move { connection.get(&key).await }) .await .context(GetClusterStateSnafu)?; @@ -533,10 +688,29 @@ where impl RedisPersistence where - R: AsyncCommands + Clone, + R: AsyncCommands + Clone + Send + Sync + 'static, { - fn connection(&self) -> R { - self.connection.clone() + /// Runs a single Redis operation on the current connection, rebuilding the connection if the + /// operation times out. + /// + /// The redis crate's [`ConnectionManager`] reconnects on dropped-connection errors but not on + /// timeouts, so a black-holed connection (see the note on [`REDIS_TCP_KEEPALIVE_TIME`]) would + /// otherwise time out on every command indefinitely. A timeout on a healthy Redis is already a + /// sign that something is wrong, so rebuilding the connection is the right reaction either way; + /// [`Reconnectable::reconnect`] makes sure we only rebuild once per dead connection. + async fn run(&self, operation: F) -> Result + where + F: FnOnce(R) -> Fut, + Fut: Future>, + { + let connection = self.connection.current(); + let result = operation((*connection).clone()).await; + if let Err(error) = &result + && error.is_timeout() + { + self.connection.reconnect(&connection).await; + } + result } #[instrument(skip(self))] @@ -545,17 +719,27 @@ where cluster_group: &str, not_accessed_after: &SystemTime, ) -> Result { - let mut connection = self.connection(); + // We can't route the `sscan` through `run`, as the returned iterator borrows the + // connection for the whole loop. We rebuild the connection by hand on a timeout instead. + let connection = self.connection.current(); + let mut scan_connection = (*connection).clone(); let mut removed = 0; - if let Ok(mut queued) = connection.sscan(queued_query_set_name(cluster_group)).await { - // TODO: Await `load_queued_query` in parallel (if possible) or add them to a Vec to bulk-delete afterwards - while let Some(key) = queued.next_item().await { - let key = key.with_context(|_| ListQueuedQueriesSnafu { cluster_group })?; - let queued_query = self.load_queued_query(&key).await?; - if &queued_query.last_accessed < not_accessed_after { - self.remove_queued_query(&queued_query).await?; - removed += 1; + match scan_connection.sscan(queued_query_set_name(cluster_group)).await { + Ok(mut queued) => { + // TODO: Await `load_queued_query` in parallel (if possible) or add them to a Vec to bulk-delete afterwards + while let Some(key) = queued.next_item().await { + let key = key.with_context(|_| ListQueuedQueriesSnafu { cluster_group })?; + let queued_query = self.load_queued_query(&key).await?; + if &queued_query.last_accessed < not_accessed_after { + self.remove_queued_query(&queued_query).await?; + removed += 1; + } + } + } + Err(error) => { + if error.is_timeout() { + self.connection.reconnect(&connection).await; } } } @@ -610,3 +794,74 @@ fn compare_and_set_script() -> Script { ", ) } + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicU32, Ordering}; + + use super::*; + + /// Returns a [`Reconnectable`] over a `u32` plus a counter of how many times it was rebuilt. + /// Every rebuild yields once (so concurrent rebuild attempts interleave deterministically) and + /// then returns the new rebuild count as the connection value. + fn counting_reconnectable() -> (Arc, Reconnectable) { + let rebuilds = Arc::new(AtomicU32::new(0)); + + let factory_rebuilds = Arc::clone(&rebuilds); + let factory: ConnectionFactory = Arc::new(move || { + let rebuilds = Arc::clone(&factory_rebuilds); + Box::pin(async move { + tokio::task::yield_now().await; + Ok(rebuilds.fetch_add(1, Ordering::SeqCst) + 1) + }) + }); + + (rebuilds, Reconnectable::new(0, factory)) + } + + #[tokio::test] + async fn reconnect_replaces_the_connection() { + let (rebuilds, reconnectable) = counting_reconnectable(); + + let used = reconnectable.current(); + assert_eq!(*used, 0); + + reconnectable.reconnect(&used).await; + + assert_eq!(rebuilds.load(Ordering::SeqCst), 1); + assert_eq!(*reconnectable.current(), 1); + } + + #[tokio::test] + async fn reconnect_is_a_noop_for_an_already_replaced_connection() { + let (rebuilds, reconnectable) = counting_reconnectable(); + + let stale = reconnectable.current(); + reconnectable.reconnect(&stale).await; + assert_eq!(*reconnectable.current(), 1); + + // The handle is now stale (the connection was replaced); reconnecting with it must not + // rebuild again, otherwise a burst of failing commands would rebuild over and over. + reconnectable.reconnect(&stale).await; + assert_eq!(rebuilds.load(Ordering::SeqCst), 1); + assert_eq!(*reconnectable.current(), 1); + } + + #[tokio::test] + async fn concurrent_reconnects_rebuild_only_once() { + let (rebuilds, reconnectable) = counting_reconnectable(); + let reconnectable = Arc::new(reconnectable); + + let used = reconnectable.current(); + let attempts = (0..8).map(|_| { + let reconnectable = Arc::clone(&reconnectable); + let used = Arc::clone(&used); + async move { reconnectable.reconnect(&used).await } + }); + futures::future::join_all(attempts).await; + + // Single-flight: many concurrent failures on the same connection rebuild it exactly once. + assert_eq!(rebuilds.load(Ordering::SeqCst), 1); + assert_eq!(*reconnectable.current(), 1); + } +} From ccd482a02d7f3cd5ff7f40f2e4520b580b3d4574 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Fri, 12 Jun 2026 11:44:06 +0200 Subject: [PATCH 2/4] refactor: Detect black-holed Redis via background health check The #109 fix recovers a black-holed Redis connection (node drain -> only response timeouts, which ConnectionManager never reconnects on) by rebuilding the connection through a Reconnectable wrapper. The trigger for that rebuild was a run() combinator that every Redis operation had to be wrapped in, so it could observe a timeout and call reconnect(). That trigger was the expensive part: it forced all ~15 call sites into `self.run(|conn| async move { ... })` closures, dragged Future/FnOnce bounds through the call path, and still could not cover the sscan path (its iterator borrows the connection for the whole loop), which had to reconnect by hand. The detection policy ended up smeared across every operation. Move detection into a single background task instead. spawn_health_check PINGs Redis every 5s and calls the same Reconnectable::reconnect on any ping failure; the connection's own response timeout makes a PING on a black-holed socket fail rather than hang. All call sites revert to the original `self.connection()` form and the sscan special case disappears. The diff against the pre-fix baseline shrinks from +368/-103 to +268/-23 on redis/mod.rs - almost entirely call-site churn that is no longer needed. Trade-offs: - Detection is now proactive and time-based (ping interval + response timeout, ~5-15s after a drain) instead of riding on the next failing business operation. It also heals while the pod is idle, which the run() approach could not. Worst-case recovery latency is comparable. - A single failed ping triggers a rebuild. This is the usual health-check semantics and the rebuild is single-flight + Arc::ptr_eq guarded so it happens at most once per dead connection, but it is slightly more eager than rebuilding only on a failed real operation. If churn on a flaky- but-alive Redis ever shows up, gate the rebuild behind N consecutive failed pings. - The task is spawn-and-forget (no JoinHandle). The persistence layer lives for the whole process, so there is nothing to abort; adding lifecycle machinery would be dead weight. Reconnectable, the ConnectionFactory closures, the TCP keepalive / TCP_USER_TIMEOUT settings, and the Reconnectable unit tests are unchanged. Real black-hole recovery is still not covered by automated tests and needs an integration (TCP proxy) or manual node-drain test. --- CHANGELOG.md | 2 +- trino-lb-persistence/src/redis/mod.rs | 318 +++++++++++++------------- 2 files changed, 155 insertions(+), 165 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f33937..220aa9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ All notable changes to this project will be documented in this file. without ever being closed, so the only error trino-lb sees is a response timeout. The redis crate's `ConnectionManager` does not reconnect on timeouts (only on dropped-connection errors), so it kept reusing the dead connection forever. trino-lb now enables TCP keepalive / `TCP_USER_TIMEOUT` on the Redis socket and - rebuilds the connection itself when a command times out ([#109]). + runs a background health check that rebuilds the connection itself when a periodic ping fails ([#109]). [#109]: https://github.com/stackabletech/trino-lb/issues/109 [#111]: https://github.com/stackabletech/trino-lb/pull/111 diff --git a/trino-lb-persistence/src/redis/mod.rs b/trino-lb-persistence/src/redis/mod.rs index 7a8858d..c9cd1c9 100644 --- a/trino-lb-persistence/src/redis/mod.rs +++ b/trino-lb-persistence/src/redis/mod.rs @@ -1,12 +1,14 @@ use std::{ fmt::Debug, - future::Future, num::TryFromIntError, sync::{Arc, PoisonError, RwLock}, time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}, }; -use futures::future::{BoxFuture, try_join_all}; +use futures::{ + TryFutureExt, + future::{BoxFuture, try_join_all}, +}; use redis::{ AsyncCommands, Client, IntoConnectionInfo, RedisError, Script, aio::{ConnectionManager, ConnectionManagerConfig, MultiplexedConnection}, @@ -44,6 +46,13 @@ const REDIS_TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(2); const REDIS_TCP_KEEPALIVE_RETRIES: u32 = 3; const REDIS_TCP_USER_TIMEOUT: Duration = Duration::from_secs(12); +/// How often the background health check pings Redis to detect a black-holed connection (see the +/// note on [`REDIS_TCP_KEEPALIVE_TIME`]) and rebuild it via [`Reconnectable`]. Such a connection +/// only ever produces response *timeouts*, which the redis crate does not treat as a reason to +/// reconnect, so without this proactive check it would stay wedged until the liveness probe +/// restarts the pod (see issue #109). +const REDIS_HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(5); + /// TCP settings applied to every Redis connection (and every reconnect), see the constants above. fn tcp_settings() -> TcpSettings { TcpSettings::default() @@ -168,8 +177,8 @@ type ConnectionFactory = /// The redis crate's [`ConnectionManager`] only reconnects on dropped-connection errors, not on /// timeouts, so a black-holed connection (e.g. after a Redis node drain, see issue #109 and the /// note on [`REDIS_TCP_KEEPALIVE_TIME`]) would otherwise time out forever. This wrapper lets us -/// rebuild the underlying connection ourselves; the policy for *when* to rebuild lives in -/// [`RedisPersistence::run`]. +/// rebuild the underlying connection ourselves; the background health check in +/// [`RedisPersistence::spawn_health_check`] decides *when* to rebuild. struct Reconnectable { current: RwLock>, factory: ConnectionFactory, @@ -196,10 +205,10 @@ where /// Rebuilds the connection, replacing the one currently in use. /// - /// `used` is the connection the failing command ran on. This is a no-op if another rebuild is + /// `used` is the connection that was found to be bad. This is a no-op if another rebuild is /// already in progress, or if the connection has already been replaced since `used` was - /// obtained (i.e. another task already noticed the failure and rebuilt it). This keeps a burst - /// of failing commands from rebuilding the connection over and over. + /// obtained (i.e. another rebuild already happened). This keeps overlapping failure reports + /// from rebuilding the connection over and over. async fn reconnect(&self, used: &Arc) { let Ok(_guard) = self.reconnecting.try_lock() else { // Another task is already rebuilding the connection. @@ -219,7 +228,7 @@ where Err(error) => { warn!( ?error, - "Failed to rebuild the Redis connection, will retry on the next failing command" + "Failed to rebuild the Redis connection, will retry on the next health check" ); } } @@ -237,7 +246,7 @@ pub struct RedisPersistence where R: AsyncCommands + Clone, { - connection: Reconnectable, + connection: Arc>, compare_and_set_script: Script, /// Sometimes we need to do stuff for all cluster groups, so we need to store them to iterate over them @@ -270,19 +279,17 @@ impl RedisPersistence { Arc::new(move || { let client = client.clone(); let redis_config = redis_config.clone(); - Box::pin(async move { - client.get_connection_manager_with_config(redis_config).await - }) + Box::pin(async move { client.get_connection_manager_with_config(redis_config).await }) }) }; let connection = factory().await.context(CreateClientSnafu)?; - Ok(Self { - connection: Reconnectable::new(connection, factory), - compare_and_set_script: compare_and_set_script(), + Ok(Self::new_with_connection( + connection, + factory, cluster_groups, - }) + )) } } @@ -313,11 +320,11 @@ impl RedisPersistence> { let connection = factory().await.context(CreateClientSnafu)?; - Ok(Self { - connection: Reconnectable::new(connection, factory), - compare_and_set_script: compare_and_set_script(), + Ok(Self::new_with_connection( + connection, + factory, cluster_groups, - }) + )) } } @@ -328,23 +335,22 @@ where #[instrument(skip(self, queued_query))] async fn store_queued_query(&self, queued_query: QueuedQuery) -> Result<(), super::Error> { let key = queued_query_key(&queued_query.id); - let set_name = queued_query_set_name(&queued_query.cluster_group); let value = bincode::serde::encode_to_vec(&queued_query, BINCODE_CONFIG) .context(SerializeToBinarySnafu)?; + let mut connection_1 = self.connection(); + let mut connection_2 = self.connection(); + // We can't use a pipe here, as we otherwise get "Received crossed slots in pipeline - CrossSlot" - self.run(|connection| async move { - let mut connection_1 = connection.clone(); - let mut connection_2 = connection; - tokio::try_join!( - connection_1.set::<_, _, ()>(key, value), - connection_2.sadd::<_, _, ()>(&set_name, key), - )?; - Ok(()) - }) - .await - .context(WriteToRedisSnafu)?; + tokio::try_join!( + connection_1 + .set::<_, _, ()>(key, value) + .map_err(|err| Error::WriteToRedis { source: err }), + connection_2 + .sadd::<_, _, ()>(queued_query_set_name(&queued_query.cluster_group), key) + .map_err(|err| Error::WriteToRedis { source: err }), + )?; Ok(()) } @@ -356,7 +362,8 @@ where ) -> Result { let key = queued_query_key(queued_query_id); let value: Vec = self - .run(|mut connection| async move { connection.get(key).await }) + .connection() + .get(key) .await .context(ReadFromRedisSnafu)?; @@ -368,16 +375,14 @@ where #[instrument(skip(self, queued_query))] async fn remove_queued_query(&self, queued_query: &QueuedQuery) -> Result<(), super::Error> { let key = queued_query_key(&queued_query.id); - let set_name = queued_query_set_name(&queued_query.cluster_group); + let mut connection = self.connection(); // We can't use a pipe here, as we otherwise get "Received crossed slots in pipeline - CrossSlot" - self.run(|mut connection| async move { - let _: () = connection.srem(&set_name, key).await?; - let _: () = connection.del(key).await?; - Ok(()) - }) - .await - .context(WriteToRedisSnafu)?; + let _: () = connection + .srem(queued_query_set_name(&queued_query.cluster_group), key) + .await + .context(WriteToRedisSnafu)?; + let _: () = connection.del(key).await.context(WriteToRedisSnafu)?; Ok(()) } @@ -388,12 +393,11 @@ where let value = bincode::serde::encode_to_vec(&query, BINCODE_CONFIG) .context(SerializeToBinarySnafu)?; - self.run(|mut connection| async move { - let _: () = connection.set(key, value).await?; - Ok(()) - }) - .await - .context(WriteToRedisSnafu)?; + let _: () = self + .connection() + .set(key, value) + .await + .context(WriteToRedisSnafu)?; Ok(()) } @@ -402,7 +406,8 @@ where async fn load_query(&self, query_id: &TrinoQueryId) -> Result { let key = query_key(query_id); let value: Vec = self - .run(|mut connection| async move { connection.get(key).await }) + .connection() + .get(key) .await .context(ReadFromRedisSnafu)?; @@ -414,12 +419,11 @@ where #[instrument(skip(self))] async fn remove_query(&self, query_id: &TrinoQueryId) -> Result<(), super::Error> { let key = query_key(query_id); - self.run(|mut connection| async move { - let _: () = connection.del(key).await?; - Ok(()) - }) - .await - .context(DeleteFromRedisSnafu)?; + let _: () = self + .connection() + .del(key) + .await + .context(DeleteFromRedisSnafu)?; Ok(()) } @@ -431,18 +435,12 @@ where max_allowed_count: u64, ) -> Result { let key = cluster_query_counter_key(cluster_name); + let mut connection = self.connection(); loop { - let current = self - .run(|mut connection| { - let key = &key; - async move { - connection - .get::<_, Option>(key) - .instrument(debug_span!("get current value")) - .await - } - }) + let current = connection + .get::<_, Option>(&key) + .instrument(debug_span!("get current value")) .await .context(ReadFromRedisSnafu)? .unwrap_or_default(); @@ -458,20 +456,13 @@ where return Ok(false); } - let script = self.compare_and_set_script.clone(); let response: u8 = self - .run(|mut connection| { - let key = &key; - async move { - script - .key(key) - .arg(current) - .arg(current + 1) - .invoke_async(&mut connection) - .instrument(debug_span!("invoking compare-and-set lua script")) - .await - } - }) + .compare_and_set_script + .key(&key) + .arg(current) + .arg(current + 1) + .invoke_async(&mut connection) + .instrument(debug_span!("invoking compare-and-set lua script")) .await .context(ExecuteCASScriptSnafu)?; @@ -497,18 +488,12 @@ where cluster_name: &TrinoClusterName, ) -> Result<(), super::Error> { let key = cluster_query_counter_key(cluster_name); + let mut connection = self.connection(); loop { - let current = self - .run(|mut connection| { - let key = &key; - async move { - connection - .get::<_, Option>(key) - .instrument(debug_span!("get current value")) - .await - } - }) + let current = connection + .get::<_, Option>(&key) + .instrument(debug_span!("get current value")) .await .context(ReadFromRedisSnafu)? .unwrap_or_default(); @@ -518,20 +503,13 @@ where return Ok(()); } - let script = self.compare_and_set_script.clone(); let response: u8 = self - .run(|mut connection| { - let key = &key; - async move { - script - .key(key) - .arg(current) - .arg(current - 1) - .invoke_async(&mut connection) - .instrument(debug_span!("invoking compare-and-set lua script")) - .await - } - }) + .compare_and_set_script + .key(&key) + .arg(current) + .arg(current - 1) + .invoke_async(&mut connection) + .instrument(debug_span!("invoking compare-and-set lua script")) .await .context(ExecuteCASScriptSnafu)?; @@ -555,12 +533,11 @@ where ) -> Result<(), super::Error> { let key = cluster_query_counter_key(cluster_name); - self.run(|mut connection| async move { - let _: () = connection.set(&key, count).await?; - Ok(()) - }) - .await - .context(SetClusterQueryCountSnafu { cluster_name })?; + let _: () = self + .connection() + .set(key, count) + .await + .context(SetClusterQueryCountSnafu { cluster_name })?; Ok(()) } @@ -572,7 +549,8 @@ where ) -> Result { let key = cluster_query_counter_key(cluster_name); Ok(self - .run(|mut connection| async move { connection.get::<_, Option>(&key).await }) + .connection() + .get::<_, Option>(key) .await .context(ReadClusterQueryCountSnafu { cluster_name })? // There can be the case this function is called before `inc_cluster_queries`, so the number of queries is 0 in this case. @@ -581,9 +559,9 @@ where #[instrument(skip(self))] async fn get_queued_query_count(&self, cluster_group: &str) -> Result { - let set_name = queued_query_set_name(cluster_group); Ok(self - .run(|mut connection| async move { connection.scard::<_, Option>(&set_name).await }) + .connection() + .scard::<_, Option>(queued_query_set_name(cluster_group)) .await .context(GetQueuedQueryCountSnafu { cluster_group })? // The set might not be there yet, as no queries have been queued for this cluster group so far. @@ -606,11 +584,8 @@ where #[instrument(skip(self))] async fn get_last_query_count_fetcher_update(&self) -> Result { let ms = self - .run(|mut connection| async move { - connection - .get::<_, Option>(LAST_QUERY_COUNT_FETCHER_UPDATE_KEY) - .await - }) + .connection() + .get::<_, Option>(LAST_QUERY_COUNT_FETCHER_UPDATE_KEY) .await .context(GetLastQueryCountFetcherUpdateSnafu)? // There can be the case this function is called before `set_last_query_count_fetcher_update`, so we can @@ -633,12 +608,11 @@ where .try_into() .context(ConvertElapsedTimeSinceLastUpdateToMillisSnafu)?; - self.run(|mut connection| async move { - let _: () = connection.set(LAST_QUERY_COUNT_FETCHER_UPDATE_KEY, ms).await?; - Ok(()) - }) - .await - .context(SetLastQueryCountFetcherUpdateSnafu)?; + let _: () = self + .connection() + .set(LAST_QUERY_COUNT_FETCHER_UPDATE_KEY, ms) + .await + .context(SetLastQueryCountFetcherUpdateSnafu)?; Ok(()) } @@ -653,12 +627,11 @@ where let value = bincode::serde::encode_to_vec(state, BINCODE_CONFIG).context(SerializeToBinarySnafu)?; - self.run(|mut connection| async move { - let _: () = connection.set(&key, value).await?; - Ok(()) - }) - .await - .context(SetClusterStateSnafu)?; + let _: () = self + .connection() + .set(key, value) + .await + .context(SetClusterStateSnafu)?; Ok(()) } @@ -671,7 +644,8 @@ where let key = cluster_state_key(cluster_name); let cluster_state: Option> = self - .run(|mut connection| async move { connection.get(&key).await }) + .connection() + .get(key) .await .context(GetClusterStateSnafu)?; @@ -690,27 +664,53 @@ impl RedisPersistence where R: AsyncCommands + Clone + Send + Sync + 'static, { - /// Runs a single Redis operation on the current connection, rebuilding the connection if the - /// operation times out. + /// Wraps the initial connection in a [`Reconnectable`] and starts the background health check + /// that rebuilds it when it goes bad. + fn new_with_connection( + connection: R, + factory: ConnectionFactory, + cluster_groups: Vec, + ) -> Self { + let connection = Arc::new(Reconnectable::new(connection, factory)); + Self::spawn_health_check(Arc::clone(&connection)); + + Self { + connection, + compare_and_set_script: compare_and_set_script(), + cluster_groups, + } + } + + fn connection(&self) -> R { + (*self.connection.current()).clone() + } + + /// Spawns a background task that periodically pings Redis and rebuilds the connection if the + /// ping fails. /// /// The redis crate's [`ConnectionManager`] reconnects on dropped-connection errors but not on /// timeouts, so a black-holed connection (see the note on [`REDIS_TCP_KEEPALIVE_TIME`]) would - /// otherwise time out on every command indefinitely. A timeout on a healthy Redis is already a - /// sign that something is wrong, so rebuilding the connection is the right reaction either way; + /// otherwise time out on every command indefinitely. A periodic ping detects this - and heals + /// it even while there is no other traffic - without having to wrap every individual operation. /// [`Reconnectable::reconnect`] makes sure we only rebuild once per dead connection. - async fn run(&self, operation: F) -> Result - where - F: FnOnce(R) -> Fut, - Fut: Future>, - { - let connection = self.connection.current(); - let result = operation((*connection).clone()).await; - if let Err(error) = &result - && error.is_timeout() - { - self.connection.reconnect(&connection).await; - } - result + fn spawn_health_check(connection: Arc>) { + tokio::spawn(async move { + loop { + tokio::time::sleep(REDIS_HEALTH_CHECK_INTERVAL).await; + + let current = connection.current(); + let mut ping_connection = (*current).clone(); + // The connection carries its own response timeout, so a black-holed socket makes + // this `PING` fail rather than hang forever. + let ping: Result<(), RedisError> = + redis::cmd("PING").query_async(&mut ping_connection).await; + + if let Err(error) = ping { + warn!(?error, "Redis health check failed, rebuilding the connection"); + connection.reconnect(¤t).await; + } + } + }); } #[instrument(skip(self))] @@ -719,27 +719,17 @@ where cluster_group: &str, not_accessed_after: &SystemTime, ) -> Result { - // We can't route the `sscan` through `run`, as the returned iterator borrows the - // connection for the whole loop. We rebuild the connection by hand on a timeout instead. - let connection = self.connection.current(); - let mut scan_connection = (*connection).clone(); + let mut connection = self.connection(); let mut removed = 0; - match scan_connection.sscan(queued_query_set_name(cluster_group)).await { - Ok(mut queued) => { - // TODO: Await `load_queued_query` in parallel (if possible) or add them to a Vec to bulk-delete afterwards - while let Some(key) = queued.next_item().await { - let key = key.with_context(|_| ListQueuedQueriesSnafu { cluster_group })?; - let queued_query = self.load_queued_query(&key).await?; - if &queued_query.last_accessed < not_accessed_after { - self.remove_queued_query(&queued_query).await?; - removed += 1; - } - } - } - Err(error) => { - if error.is_timeout() { - self.connection.reconnect(&connection).await; + if let Ok(mut queued) = connection.sscan(queued_query_set_name(cluster_group)).await { + // TODO: Await `load_queued_query` in parallel (if possible) or add them to a Vec to bulk-delete afterwards + while let Some(key) = queued.next_item().await { + let key = key.with_context(|_| ListQueuedQueriesSnafu { cluster_group })?; + let queued_query = self.load_queued_query(&key).await?; + if &queued_query.last_accessed < not_accessed_after { + self.remove_queued_query(&queued_query).await?; + removed += 1; } } } @@ -841,7 +831,7 @@ mod tests { assert_eq!(*reconnectable.current(), 1); // The handle is now stale (the connection was replaced); reconnecting with it must not - // rebuild again, otherwise a burst of failing commands would rebuild over and over. + // rebuild again, otherwise repeated failure reports would rebuild over and over. reconnectable.reconnect(&stale).await; assert_eq!(rebuilds.load(Ordering::SeqCst), 1); assert_eq!(*reconnectable.current(), 1); From 9af108c3648162ad23c22ba4e79981f8d6d12a74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Wed, 24 Jun 2026 09:33:32 +0200 Subject: [PATCH 3/4] rust fmt When the Kubernetes node running the Redis master is drained, the established TCP connection turns into a black hole: packets are silently dropped and no FIN/RST ever arrives. The only error trino-lb sees is a response timeout, which the redis crate's ConnectionManager classifies as RetryImmediately rather than a dropped connection, so it never reconnects and keeps reusing the dead socket. Every command then times out for hours until the liveness probe restarts the pod (#109). This only reproduced on drain (not pod delete, which closes the socket cleanly and triggers a reconnect) and under load, since active traffic keeps in-flight requests on the wedged connection. Fix this in trino-lb-persistence/src/redis/mod.rs: - Add a Reconnectable wrapper (RwLock> + rebuild factory + single-flight guard). reconnect() only rebuilds when the connection a failing command used is still current (Arc::ptr_eq generation check), so a burst of failures rebuilds the connection exactly once. - Route every Redis operation through a run() combinator that rebuilds the connection when a command times out. Dropped-connection errors are left to ConnectionManager's own reconnect, so this only covers the timeout gap. - Enable TCP keepalive (5s/2s/3 probes) and TCP_USER_TIMEOUT (12s) on the Redis socket via ConnectionInfo::set_tcp_settings (single-node) and ClusterClientBuilder::tcp_settings (cluster). These are re-applied on every reconnect and let the kernel tear down the dead socket faster. Reachable via redis::io::tcp, so no new dependency is needed. ConnectionManagerConfig does not expose TCP settings (even on redis-rs main), hence routing them through ConnectionInfo / the cluster builder. Add unit tests for the Reconnectable coordination logic (replace, stale-handle no-op, concurrent single-flight). End-to-end black-hole recovery is not covered by automated tests and still needs an integration or manual node-drain test. fixes #109 --- README.md | 2 ++ trino-lb-persistence/src/redis/mod.rs | 14 +++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 9c05de1..86a2f64 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,8 @@ This starts trino-lb listening on . You can submit a test-query using [trino-cli](https://trino.io/docs/current/client/cli.html) by calling the following command and entering `select 42;`: + + ```bash ~/Downloads/trino-cli-* --server http://127.0.0.1:8080 trino> select 42; diff --git a/trino-lb-persistence/src/redis/mod.rs b/trino-lb-persistence/src/redis/mod.rs index c9cd1c9..c15b969 100644 --- a/trino-lb-persistence/src/redis/mod.rs +++ b/trino-lb-persistence/src/redis/mod.rs @@ -222,7 +222,8 @@ where match (self.factory)().await { Ok(connection) => { - *self.current.write().unwrap_or_else(PoisonError::into_inner) = Arc::new(connection); + *self.current.write().unwrap_or_else(PoisonError::into_inner) = + Arc::new(connection); info!("Successfully rebuilt the Redis connection"); } Err(error) => { @@ -279,7 +280,11 @@ impl RedisPersistence { Arc::new(move || { let client = client.clone(); let redis_config = redis_config.clone(); - Box::pin(async move { client.get_connection_manager_with_config(redis_config).await }) + Box::pin(async move { + client + .get_connection_manager_with_config(redis_config) + .await + }) }) }; @@ -706,7 +711,10 @@ where redis::cmd("PING").query_async(&mut ping_connection).await; if let Err(error) = ping { - warn!(?error, "Redis health check failed, rebuilding the connection"); + warn!( + ?error, + "Redis health check failed, rebuilding the connection" + ); connection.reconnect(¤t).await; } } From b7e387947bffad3828e371e3b250eead8cd23c1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Wed, 24 Jun 2026 09:42:33 +0200 Subject: [PATCH 4/4] markdownlint --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 86a2f64..9c05de1 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,6 @@ This starts trino-lb listening on . You can submit a test-query using [trino-cli](https://trino.io/docs/current/client/cli.html) by calling the following command and entering `select 42;`: - - ```bash ~/Downloads/trino-cli-* --server http://127.0.0.1:8080 trino> select 42;