diff --git a/src/client.rs b/src/client.rs index c72e9d2a..8bb52330 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1347,6 +1347,149 @@ where .push_back(ExtendedProtocolData::create_new_close(message, close)); } + // Flush + // Frontend asks the server to push pending responses + // without ending the extended-query sequence. The + // server stays in extended-query mode (no + // ReadyForQuery) and we keep the server checked out. + // Used by drivers like postgres.js between + // Parse/Describe and Bind/Execute (`describeFirst`). + 'H' => { + debug!("Flushing buffered extended-protocol messages to server"); + + // Drain the buffered extended-protocol messages + // into self.buffer the same way Sync does, but + // without sending Sync — append the Flush byte + // instead so the server pushes responses but + // stays in extended-query state. + while let Some(protocol_data) = + self.extended_protocol_data_buffer.pop_front() + { + match protocol_data { + ExtendedProtocolData::Parse { data, metadata } => { + let (parse, hash) = match metadata { + Some(metadata) => metadata, + None => { + let first_char_in_name = *data.get(5).unwrap_or(&0); + if first_char_in_name != 0 { + server.mark_dirty(); + } + self.buffer.put(&data[..]); + continue; + } + }; + + if server.has_prepared_statement(&parse.name) { + self.response_message_queue_buffer.put(parse_complete()); + } else { + self.register_parse_to_server_cache( + false, &hash, &parse, &pool, server, &address, + ) + .await?; + self.buffer.put(&data[..]); + } + } + ExtendedProtocolData::Bind { data, metadata } => { + if let Some(client_given_name) = metadata { + self.ensure_prepared_statement_is_on_server( + client_given_name, + &pool, + server, + &address, + ) + .await?; + } + self.buffer.put(&data[..]); + } + ExtendedProtocolData::Describe { data, metadata } => { + if let Some(client_given_name) = metadata { + self.ensure_prepared_statement_is_on_server( + client_given_name, + &pool, + server, + &address, + ) + .await?; + } + self.buffer.put(&data[..]); + } + ExtendedProtocolData::Execute { data } => { + self.buffer.put(&data[..]) + } + ExtendedProtocolData::Close { data, close } => { + if self.prepared_statements_enabled + && close.is_prepared_statement() + && !close.anonymous() + { + self.prepared_statements.remove(&close.name); + self.response_message_queue_buffer.put(close_complete()); + } else { + self.buffer.put(&data[..]); + } + } + } + } + + // Append the Flush byte so the server pushes its + // pending responses now. + self.buffer.put(&message[..]); + + // If the buffer contains only the Flush byte (no + // pending extended-protocol work), there is + // nothing for the server to flush — just emit any + // queued client responses and continue. + let only_flush = *self.buffer.first().unwrap() == b'H'; + + if !self.response_message_queue_buffer.is_empty() { + if let Err(err) = write_all_flush( + &mut self.write, + &self.response_message_queue_buffer, + ) + .await + { + server.mark_bad(err.to_string().as_str()); + return Err(err); + } + self.response_message_queue_buffer.clear(); + } + + if !only_flush { + // Send to server. + self.send_server_message(server, &self.buffer, &address, &pool) + .await?; + + // Read the Flush response — must NOT use the + // normal recv loop because the server does not + // send ReadyForQuery in response to Flush. + let response = match server + .recv_flush_response(Some(&mut self.server_parameters)) + .await + { + Ok(r) => r, + Err(err) => { + pool.ban(&address, BanReason::MessageReceiveFailed, Some(&self.stats)); + error_response_terminal( + &mut self.write, + &format!("error receiving Flush response: {:?}", err), + ) + .await?; + return Err(err); + } + }; + + if let Err(err) = write_all_flush(&mut self.write, &response).await { + server.mark_bad(err.to_string().as_str()); + return Err(err); + } + } + + self.buffer.clear(); + + // Do NOT release the server: extended-query + // exchange is still open. A Sync from the client + // will eventually end it. + } + // Sync // Frontend (client) is asking for the query result now. 'S' => { @@ -1743,29 +1886,37 @@ where .register_parse_to_server_cache(true, hash, parse, pool, server, address) .await { - Ok(_) => (), - Err(err) => match err { - Error::PreparedStatementError => { - debug!("Removed {} from client cache", client_name); - self.prepared_statements.remove(&client_name); - } - - _ => { - return Err(err); - } - }, + Ok(_) => Ok(()), + Err(Error::PreparedStatementError) => { + // The backend rejected our Parse. This used to silently + // drop the statement from the *client* cache and return + // Ok, which left tokio-postgres clients believing the + // statement was still valid: the very next Bind would + // miss in `buffer_bind` and abort the whole TCP conn + // with "Prepared statement sN doesn't exist", killing + // pgraft-style ETL workloads that prepare-then-reuse + // across many transactions. + // + // Keep the client cache intact (it represents what the + // client believes), mark this server bad so the pool + // replaces it on next checkout, and surface the error + // so the caller can retry on a fresh backend. + warn!( + "Server {:?} rejected re-prepare of `{}` — marking bad so pool replaces it", + address, client_name + ); + server.mark_bad("prepared statement re-register failed"); + Err(Error::PreparedStatementError) + } + Err(err) => Err(err), } } - None => { - return Err(Error::ClientError(format!( - "prepared statement `{}` not found", - client_name - ))) - } - }; - - Ok(()) + None => Err(Error::ClientError(format!( + "prepared statement `{}` not found", + client_name + ))), + } } /// Register the parse to the server cache and send it to the server if requested (ie. requested by pgcat) diff --git a/src/server.rs b/src/server.rs index 882450ea..42992cae 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1114,6 +1114,81 @@ impl Server { Ok(bytes) } + /// Read server messages emitted in response to a Flush. + /// + /// Unlike `recv`, this loop never waits for ReadyForQuery (`Z`) — + /// the server does not send one in response to Flush. Instead we + /// read every message available, and stop once we hit a terminal + /// describe-response marker (RowDescription `T`, NoData `n`, + /// ParameterDescription `t`-followed-by-T-or-n, or ErrorResponse + /// `E`). On error we still keep buffering until ReadyForQuery + /// is NOT expected — we just return what we have. + pub async fn recv_flush_response( + &mut self, + mut client_server_parameters: Option<&mut ServerParameters>, + ) -> Result { + loop { + let message = match read_message(&mut self.stream).await { + Ok(message) => message, + Err(err) => { + error!( + "Terminating server {:?} during Flush response: {:?}", + self.address, err + ); + self.bad = true; + return Err(err); + } + }; + + // Inspect framing byte without consuming the message — avoid + // re-parsing length and avoid the double copy of body bytes. + let code = *message.first().unwrap_or(&0); + + // Only ParameterStatus needs body parsing (to track session + // params). Skip get_u8/get_i32/read_string for everything + // else — these messages are 5–N bytes and we only need to + // know the framing byte. + if code == b'S' { + // ParameterStatus body: code(1) + len(4) + key\0 + value\0. + // Parse into an owned BytesMut just for the body so we + // can use the existing BytesMutReader impl without + // disturbing the original `message` we still need to + // append to self.buffer below. + let mut body = BytesMut::from(&message[5..]); + if let (Ok(key), Ok(value)) = (body.read_string(), body.read_string()) { + if let Some(client_server_parameters) = client_server_parameters.as_mut() { + client_server_parameters.set_param(key.clone(), value.clone(), false); + } + self.server_parameters.set_param(key, value, false); + } + } else if code == b'1' { + // ParseComplete — consume one pending registration. + self.registering_prepared_statement.pop_front(); + } + + self.buffer.extend_from_slice(&message); + + // Terminal describe/bind-response markers — server done + // emitting in response to this Flush. + // T = RowDescription, n = NoData, E = ErrorResponse, + // 2 = BindComplete (Flush after Bind w/o Execute). + if matches!(code, b'T' | b'n' | b'E' | b'2') { + self.data_available = false; + break; + } + } + + // mem::take instead of clone — hands the BytesMut to caller + // without a memcpy+alloc. self.buffer is left empty with retained + // capacity is NOT a concern; the empty BytesMut here is fine + // because we reuse it next call (push triggers fresh alloc on + // first put, identical cost to the prior clear()). + let bytes = std::mem::take(&mut self.buffer); + self.stats().data_received(bytes.len()); + self.last_activity = SystemTime::now(); + Ok(bytes) + } + // Determines if the server already has a prepared statement with the given name // Increments the prepared statement cache hit counter pub fn has_prepared_statement(&mut self, name: &str) -> bool {