From 2b24bd4c823810de212281a753a2279b56cbf681 Mon Sep 17 00:00:00 2001 From: Nathanael Aninweze Date: Wed, 15 Apr 2026 19:41:11 -0400 Subject: [PATCH 1/4] Handle Flush ('H') extended-protocol message postgres.js (and likely other drivers) sends Flush between Parse/Describe and Bind/Execute when describeFirst is enabled (the default for parameterized non-prepared queries). pgcat was logging 'Unexpected code: H' and dropping the message, leaving the client hung waiting for the ParameterDescription/RowDescription response that never arrived. Add a Flush arm that drains the buffered extended-protocol messages into self.buffer (same as Sync), appends the Flush byte, sends to the server, forwards the server response, and keeps the server checked out so the subsequent Bind/Execute/Sync can complete on the same backend. --- src/client.rs | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/src/client.rs b/src/client.rs index c72e9d2a..c8539a3e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1347,6 +1347,131 @@ where .push_back(ExtendedProtocolData::create_new_close(message, close)); } + // Flush + // Frontend asks the server to push pending responses + // without ending the extended-query sequence. The + // server stays in extended-query mode (no + // ReadyForQuery) and we keep the server checked out. + // Used by drivers like postgres.js between + // Parse/Describe and Bind/Execute (`describeFirst`). + 'H' => { + debug!("Flushing buffered extended-protocol messages to server"); + + // Drain the buffered extended-protocol messages + // into self.buffer the same way Sync does, but + // without sending Sync — append the Flush byte + // instead so the server pushes responses but + // stays in extended-query state. + while let Some(protocol_data) = + self.extended_protocol_data_buffer.pop_front() + { + match protocol_data { + ExtendedProtocolData::Parse { data, metadata } => { + let (parse, hash) = match metadata { + Some(metadata) => metadata, + None => { + let first_char_in_name = *data.get(5).unwrap_or(&0); + if first_char_in_name != 0 { + server.mark_dirty(); + } + self.buffer.put(&data[..]); + continue; + } + }; + + if server.has_prepared_statement(&parse.name) { + self.response_message_queue_buffer.put(parse_complete()); + } else { + self.register_parse_to_server_cache( + false, &hash, &parse, &pool, server, &address, + ) + .await?; + self.buffer.put(&data[..]); + } + } + ExtendedProtocolData::Bind { data, metadata } => { + if let Some(client_given_name) = metadata { + self.ensure_prepared_statement_is_on_server( + client_given_name, + &pool, + server, + &address, + ) + .await?; + } + self.buffer.put(&data[..]); + } + ExtendedProtocolData::Describe { data, metadata } => { + if let Some(client_given_name) = metadata { + self.ensure_prepared_statement_is_on_server( + client_given_name, + &pool, + server, + &address, + ) + .await?; + } + self.buffer.put(&data[..]); + } + ExtendedProtocolData::Execute { data } => { + self.buffer.put(&data[..]) + } + ExtendedProtocolData::Close { data, close } => { + if self.prepared_statements_enabled + && close.is_prepared_statement() + && !close.anonymous() + { + self.prepared_statements.remove(&close.name); + self.response_message_queue_buffer.put(close_complete()); + } else { + self.buffer.put(&data[..]); + } + } + } + } + + // Append the Flush byte so the server pushes its + // pending responses now. + self.buffer.put(&message[..]); + + // If the buffer contains only the Flush byte (no + // pending extended-protocol work), there is + // nothing for the server to flush — just emit any + // queued client responses and continue. + let only_flush = *self.buffer.first().unwrap() == b'H'; + + if !self.response_message_queue_buffer.is_empty() { + if let Err(err) = write_all_flush( + &mut self.write, + &self.response_message_queue_buffer, + ) + .await + { + server.mark_bad(err.to_string().as_str()); + return Err(err); + } + self.response_message_queue_buffer.clear(); + } + + if !only_flush { + self.send_and_receive_loop( + code, + None, + server, + &address, + &pool, + &self.stats.clone(), + ) + .await?; + } + + self.buffer.clear(); + + // Do NOT release the server: extended-query + // exchange is still open. A Sync from the client + // will eventually end it. + } + // Sync // Frontend (client) is asking for the query result now. 'S' => { From fb5b02e186d19c3209fc19d74e1a04eaa0174f8e Mon Sep 17 00:00:00 2001 From: Nathanael Aninweze Date: Wed, 15 Apr 2026 20:41:15 -0400 Subject: [PATCH 2/4] Read Flush responses without waiting for ReadyForQuery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit added a Flush arm in the client loop but reused send_and_receive_loop, which calls Server::recv() — that recv loop only terminates on ReadyForQuery ('Z'). Servers do NOT send ReadyForQuery in response to Flush, so the read hung forever. Add Server::recv_flush_response() that breaks on RowDescription, NoData, ErrorResponse, or BindComplete (the terminal describe / bind response markers). Wire it into the Flush arm via a direct send + recv_flush_response call path. Verified locally: postgres.js with prepare:false + params now completes in ~500ms through pgcat (previously hung). --- src/client.rs | 36 +++++++++++++++------ src/server.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 9 deletions(-) diff --git a/src/client.rs b/src/client.rs index c8539a3e..971e7d49 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1454,15 +1454,33 @@ where } if !only_flush { - self.send_and_receive_loop( - code, - None, - server, - &address, - &pool, - &self.stats.clone(), - ) - .await?; + // Send to server. + self.send_server_message(server, &self.buffer, &address, &pool) + .await?; + + // Read the Flush response — must NOT use the + // normal recv loop because the server does not + // send ReadyForQuery in response to Flush. + let response = match server + .recv_flush_response(Some(&mut self.server_parameters)) + .await + { + Ok(r) => r, + Err(err) => { + pool.ban(&address, BanReason::MessageReceiveFailed, Some(&self.stats)); + error_response_terminal( + &mut self.write, + &format!("error receiving Flush response: {:?}", err), + ) + .await?; + return Err(err); + } + }; + + if let Err(err) = write_all_flush(&mut self.write, &response).await { + server.mark_bad(err.to_string().as_str()); + return Err(err); + } } self.buffer.clear(); diff --git a/src/server.rs b/src/server.rs index 882450ea..54eaa48d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1114,6 +1114,92 @@ impl Server { Ok(bytes) } + /// Read server messages emitted in response to a Flush. + /// + /// Unlike `recv`, this loop never waits for ReadyForQuery (`Z`) — + /// the server does not send one in response to Flush. Instead we + /// read every message available, and stop once we hit a terminal + /// describe-response marker (RowDescription `T`, NoData `n`, + /// ParameterDescription `t`-followed-by-T-or-n, or ErrorResponse + /// `E`). On error we still keep buffering until ReadyForQuery + /// is NOT expected — we just return what we have. + pub async fn recv_flush_response( + &mut self, + mut client_server_parameters: Option<&mut ServerParameters>, + ) -> Result { + let mut saw_terminal = false; + loop { + let mut message = match read_message(&mut self.stream).await { + Ok(message) => message, + Err(err) => { + error!( + "Terminating server {:?} during Flush response: {:?}", + self.address, err + ); + self.bad = true; + return Err(err); + } + }; + + self.buffer.put(&message[..]); + let code = message.get_u8() as char; + let _len = message.get_i32(); + + match code { + // ParameterStatus — track session params. + 'S' => { + let key = message.read_string().unwrap(); + let value = message.read_string().unwrap(); + if let Some(client_server_parameters) = client_server_parameters.as_mut() { + client_server_parameters.set_param(key.clone(), value.clone(), false); + } + self.server_parameters.set_param(key, value, false); + } + + // ParseComplete — describe-only flush returns this alone. + '1' => { + self.registering_prepared_statement.pop_front(); + // If client only sent Parse+Flush, no further messages + // are coming. But normally client also sent Describe, + // so keep reading for ParameterDescription/RowDescription. + } + + // Terminal describe-response markers. + // RowDescription / NoData / ErrorResponse — once the + // server has emitted one of these the describe round-trip + // is complete. + 'T' | 'n' | 'E' => { + saw_terminal = true; + self.data_available = false; + break; + } + + // ParameterDescription — sent before RowDescription/NoData. + // Keep reading. + 't' => (), + + // BindComplete — possible if Flush follows Bind. + '2' => { + saw_terminal = true; + self.data_available = false; + break; + } + + _ => (), + } + + if saw_terminal { + break; + } + } + + let bytes = self.buffer.clone(); + self.stats().data_received(bytes.len()); + self.buffer.clear(); + self.last_activity = SystemTime::now(); + Ok(bytes) + } + // Determines if the server already has a prepared statement with the given name // Increments the prepared statement cache hit counter pub fn has_prepared_statement(&mut self, name: &str) -> bool { From 1e3a805d479a88bcf9c604a680ed394f74d2a6ef Mon Sep 17 00:00:00 2001 From: nathanaelaninweze Date: Wed, 20 May 2026 23:37:31 -0400 Subject: [PATCH 3/4] Flush response: drop double-copy parse and clone MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - recv_flush_response: skip body parse for non-S/non-1 codes (5–N byte protocol frames; framing byte is enough to dispatch). - recv_flush_response: replace self.buffer.clone() with std::mem::take to avoid one memcpy+alloc per Flush response. - Merge separate saw_terminal flag into single matches!() check on the framing byte. Net effect on cloudrest describe-heavy workload: removes one full-buffer clone and one redundant get_u8/get_i32/read_string pass per Flush. At intra-DC RTT (~0.3ms) and 10k+ qps the saved CPU is the dominant win. --- src/server.rs | 79 ++++++++++++++++++++++----------------------------- 1 file changed, 34 insertions(+), 45 deletions(-) diff --git a/src/server.rs b/src/server.rs index 54eaa48d..42992cae 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1127,9 +1127,8 @@ impl Server { &mut self, mut client_server_parameters: Option<&mut ServerParameters>, ) -> Result { - let mut saw_terminal = false; loop { - let mut message = match read_message(&mut self.stream).await { + let message = match read_message(&mut self.stream).await { Ok(message) => message, Err(err) => { error!( @@ -1141,61 +1140,51 @@ impl Server { } }; - self.buffer.put(&message[..]); - let code = message.get_u8() as char; - let _len = message.get_i32(); - - match code { - // ParameterStatus — track session params. - 'S' => { - let key = message.read_string().unwrap(); - let value = message.read_string().unwrap(); + // Inspect framing byte without consuming the message — avoid + // re-parsing length and avoid the double copy of body bytes. + let code = *message.first().unwrap_or(&0); + + // Only ParameterStatus needs body parsing (to track session + // params). Skip get_u8/get_i32/read_string for everything + // else — these messages are 5–N bytes and we only need to + // know the framing byte. + if code == b'S' { + // ParameterStatus body: code(1) + len(4) + key\0 + value\0. + // Parse into an owned BytesMut just for the body so we + // can use the existing BytesMutReader impl without + // disturbing the original `message` we still need to + // append to self.buffer below. + let mut body = BytesMut::from(&message[5..]); + if let (Ok(key), Ok(value)) = (body.read_string(), body.read_string()) { if let Some(client_server_parameters) = client_server_parameters.as_mut() { client_server_parameters.set_param(key.clone(), value.clone(), false); } self.server_parameters.set_param(key, value, false); } - - // ParseComplete — describe-only flush returns this alone. - '1' => { - self.registering_prepared_statement.pop_front(); - // If client only sent Parse+Flush, no further messages - // are coming. But normally client also sent Describe, - // so keep reading for ParameterDescription/RowDescription. - } - - // Terminal describe-response markers. - // RowDescription / NoData / ErrorResponse — once the - // server has emitted one of these the describe round-trip - // is complete. - 'T' | 'n' | 'E' => { - saw_terminal = true; - self.data_available = false; - break; - } - - // ParameterDescription — sent before RowDescription/NoData. - // Keep reading. - 't' => (), - - // BindComplete — possible if Flush follows Bind. - '2' => { - saw_terminal = true; - self.data_available = false; - break; - } - - _ => (), + } else if code == b'1' { + // ParseComplete — consume one pending registration. + self.registering_prepared_statement.pop_front(); } - if saw_terminal { + self.buffer.extend_from_slice(&message); + + // Terminal describe/bind-response markers — server done + // emitting in response to this Flush. + // T = RowDescription, n = NoData, E = ErrorResponse, + // 2 = BindComplete (Flush after Bind w/o Execute). + if matches!(code, b'T' | b'n' | b'E' | b'2') { + self.data_available = false; break; } } - let bytes = self.buffer.clone(); + // mem::take instead of clone — hands the BytesMut to caller + // without a memcpy+alloc. self.buffer is left empty with retained + // capacity is NOT a concern; the empty BytesMut here is fine + // because we reuse it next call (push triggers fresh alloc on + // first put, identical cost to the prior clear()). + let bytes = std::mem::take(&mut self.buffer); self.stats().data_received(bytes.len()); - self.buffer.clear(); self.last_activity = SystemTime::now(); Ok(bytes) } From 2cd3799828fc4317f07220cdb692ce61d71ef1ea Mon Sep 17 00:00:00 2001 From: Nathanael Aninweze Date: Tue, 26 May 2026 18:50:20 -0400 Subject: [PATCH 4/4] client: don't silently drop client-side prepared statement on server re-prepare failure When a checked-out backend rejected re-Parse of a previously-prepared statement (e.g. after schema-change cleanup, DEALLOCATE, or transient backend state), the old code removed the entry from the *client* prepared_statements HashMap and returned Ok(). This left tokio-postgres clients in an unrecoverable state: the client still believed the statement was valid (it never received a Close), so its very next Bind for that name missed in buffer_bind and the whole TCP conn dropped with 'Prepared statement sN doesn't exist'. For long-lived ETL clients (pgraft's tokio-postgres prepares many shapes via execute() and caches them per-conn by SQL), this killed every write inflight on that conn and orphaned any open COPY-IN, leaving AccessExclusiveLock on temp staging tables until session termination. New behavior: keep the client cache intact (it mirrors the client's own view), mark the misbehaving server bad so the pool replaces it, and propagate PreparedStatementError so the caller can fail this one operation and retry on a fresh backend. --- src/client.rs | 48 ++++++++++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/src/client.rs b/src/client.rs index 971e7d49..8bb52330 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1886,29 +1886,37 @@ where .register_parse_to_server_cache(true, hash, parse, pool, server, address) .await { - Ok(_) => (), - Err(err) => match err { - Error::PreparedStatementError => { - debug!("Removed {} from client cache", client_name); - self.prepared_statements.remove(&client_name); - } - - _ => { - return Err(err); - } - }, + Ok(_) => Ok(()), + Err(Error::PreparedStatementError) => { + // The backend rejected our Parse. This used to silently + // drop the statement from the *client* cache and return + // Ok, which left tokio-postgres clients believing the + // statement was still valid: the very next Bind would + // miss in `buffer_bind` and abort the whole TCP conn + // with "Prepared statement sN doesn't exist", killing + // pgraft-style ETL workloads that prepare-then-reuse + // across many transactions. + // + // Keep the client cache intact (it represents what the + // client believes), mark this server bad so the pool + // replaces it on next checkout, and surface the error + // so the caller can retry on a fresh backend. + warn!( + "Server {:?} rejected re-prepare of `{}` — marking bad so pool replaces it", + address, client_name + ); + server.mark_bad("prepared statement re-register failed"); + Err(Error::PreparedStatementError) + } + Err(err) => Err(err), } } - None => { - return Err(Error::ClientError(format!( - "prepared statement `{}` not found", - client_name - ))) - } - }; - - Ok(()) + None => Err(Error::ClientError(format!( + "prepared statement `{}` not found", + client_name + ))), + } } /// Register the parse to the server cache and send it to the server if requested (ie. requested by pgcat)