Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

370 changes: 370 additions & 0 deletions docs/CLIENT_CONNECTION.md

Large diffs are not rendered by default.

221 changes: 221 additions & 0 deletions integration/rust/tests/integration/cancel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
use std::time::Duration;

use bytes::{BufMut, BytesMut};
use rust::setup::{admin_tokio, connection_sqlx_direct};
use sqlx::PgPool;
use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinHandle, time::timeout};
use tokio_postgres::{CancelToken, Error as PgError, NoTls, SimpleQueryMessage};

/// Reports whether backend `pid` currently shows an active `pg_sleep` query in
/// `pg_stat_activity`.
///
/// The lookup is executed on `direct`, a pool connected straight to PostgreSQL,
/// so the answer does not travel through pgdog.
///
/// # Panics
///
/// Panics if the lookup query fails.
async fn is_sleeping(direct: &PgPool, pid: i32) -> bool {
    // Filtering by pid means the row count can only be 0 or 1.
    const ACTIVE_SLEEP_SQL: &str = "SELECT COUNT(*) \
         FROM pg_stat_activity \
         WHERE pid = $1 \
         AND state = 'active' \
         AND query LIKE '%pg_sleep%'";

    let matching_rows = sqlx::query_scalar::<_, i64>(ACTIVE_SLEEP_SQL)
        .bind(pid)
        .fetch_one(direct)
        .await
        .unwrap();

    matching_rows == 1
}

/// Opens a client connection through pgdog (127.0.0.1:6432), starts a transaction,
/// records the backend pid reported by `pg_backend_pid()`, and then runs
/// `SELECT pg_sleep(60)` on a background task.
///
/// `application_name` is placed in the connection string, which lets the caller
/// look the connection up in `SHOW CLIENTS` when it needs to.
///
/// Returns `(backend_pid, cancel_token, query_handle)`. Ownership of the token and
/// the handle passes to the caller, who should drive both to completion so the
/// test finishes cleanly.
///
/// # Panics
///
/// Panics if connecting, `BEGIN`, or the pid lookup fails.
async fn start_sleeping_connection(
    application_name: &str,
) -> (
    i32,
    CancelToken,
    JoinHandle<Result<Vec<SimpleQueryMessage>, PgError>>,
) {
    let conn_params = format!(
        "host=127.0.0.1 user=pgdog dbname=pgdog password=pgdog port=6432 application_name={application_name}"
    );
    let (client, connection) = tokio_postgres::connect(&conn_params, NoTls)
        .await
        .unwrap();

    // Drive the connection on its own task and log whatever error it ends with.
    tokio::spawn(async move {
        let outcome = connection.await;
        if let Err(e) = outcome {
            eprintln!("pgdog connection error: {}", e);
        }
    });

    // Open a transaction first: it keeps this client on a single backend, so the
    // pid captured below is the same backend that later executes pg_sleep. With
    // transaction-mode pooling and no BEGIN, the two statements could be served
    // by different backends.
    client.simple_query("BEGIN").await.unwrap();

    let backend_pid: i32 = client
        .query_one("SELECT pg_backend_pid()", &[])
        .await
        .unwrap()
        .get(0);

    // Take the token before `client` is moved into the query task.
    let cancel_token = client.cancel_token();

    let query_task = tokio::spawn(async move {
        let outcome = client.simple_query("SELECT pg_sleep(60)").await;
        outcome
    });

    (backend_pid, cancel_token, query_task)
}

/// Asserts that a query task produced by `start_sleeping_connection` ended because
/// it was cancelled, i.e. it resolved to an error carrying SQLSTATE 57014
/// (canceling statement due to user request).
///
/// `label` prefixes every failure message so the caller can tell which connection
/// misbehaved.
///
/// # Panics
///
/// Panics when:
/// - the task does not resolve within 5 seconds,
/// - the task itself panicked,
/// - the query succeeded instead of failing,
/// - the error is not a PostgreSQL (database) error,
/// - the SQLSTATE is anything other than `57014`.
async fn assert_cancelled(
    handle: JoinHandle<Result<Vec<SimpleQueryMessage>, PgError>>,
    label: &str,
) {
    // Failure messages are built lazily inside the panic paths, so nothing is
    // formatted or allocated when the checks pass.
    let result = timeout(Duration::from_secs(5), handle)
        .await
        .unwrap_or_else(|e| {
            panic!("{label}: cancelled query did not unblock within 5 seconds: {e:?}")
        })
        .unwrap_or_else(|e| panic!("{label}: task panicked: {e:?}"));

    let err = match result {
        Ok(messages) => {
            panic!("{label}: query should have been cancelled, but it succeeded: {messages:?}")
        }
        Err(err) => err,
    };

    // Include the underlying error so a network-level failure is diagnosable.
    let db_err = err.as_db_error().unwrap_or_else(|| {
        panic!("{label}: expected a PostgreSQL error, not a network error: {err:?}")
    });

    let sqlstate = db_err.code().code();
    assert_eq!(
        sqlstate, "57014",
        "{label}: expected SQLSTATE 57014, got {sqlstate}"
    );
}

/// Checks that cancellation is targeted: with two independent connections each
/// running a long query, one cancel request stops exactly one of them.
///
/// Scenario:
/// 1. Two clients connect through pgdog and each starts `SELECT pg_sleep(60)`.
/// 2. `pg_stat_activity` confirms both queries are active on their backends.
/// 3. Cancelling connection 1 stops backend 1 only; backend 2 keeps running.
/// 4. Cancelling connection 2 stops backend 2.
#[tokio::test]
async fn test_cancel_query() {
    // Time allowed for the sleeps to show up on the backends, and for
    // pg_stat_activity to reflect a cancel after the client has seen the error.
    let startup_wait = Duration::from_millis(300);
    let post_cancel_wait = Duration::from_millis(100);

    let pg = connection_sqlx_direct().await;

    let (pid1, first_token, first_query) = start_sleeping_connection("cancel_test").await;
    let (pid2, second_token, second_query) = start_sleeping_connection("cancel_test").await;

    tokio::time::sleep(startup_wait).await;

    let first_running = is_sleeping(&pg, pid1).await;
    assert!(
        first_running,
        "connection 1 (backend {pid1}) should be active before any cancel"
    );
    let second_running = is_sleeping(&pg, pid2).await;
    assert!(
        second_running,
        "connection 2 (backend {pid2}) should be active before any cancel"
    );

    // ── Cancel connection 1 ────────────────────────────────────────────────
    first_token.cancel_query(NoTls).await.unwrap();

    // Block until the client side observes the cancellation error.
    assert_cancelled(first_query, "connection 1").await;

    // Backend 1 must no longer be running pg_sleep, while backend 2 must be
    // untouched by the first cancel.
    tokio::time::sleep(post_cancel_wait).await;
    let first_running = is_sleeping(&pg, pid1).await;
    assert!(
        !first_running,
        "backend {pid1} should be idle after cancelling connection 1"
    );
    let second_running = is_sleeping(&pg, pid2).await;
    assert!(
        second_running,
        "backend {pid2} should still be active after cancelling connection 1 only"
    );

    // ── Cancel connection 2 ────────────────────────────────────────────────
    second_token.cancel_query(NoTls).await.unwrap();

    assert_cancelled(second_query, "connection 2").await;

    tokio::time::sleep(post_cancel_wait).await;
    let second_running = is_sleeping(&pg, pid2).await;
    assert!(
        !second_running,
        "backend {pid2} should be idle after cancelling connection 2"
    );
}

/// Verify that a cancel request carrying the right pid but a wrong secret is silently
/// rejected: the running query is unaffected and the client does not receive a
/// cancellation error.
///
/// pgdog's `verify_cancel` gate is expected to reject the request before it reaches
/// the pool, so the backend continues executing as if nothing happened.
///
/// The test checks both sides:
/// - backend side: `pg_stat_activity` still shows the `pg_sleep` query as active;
/// - client side: the query task has not resolved after the bogus cancel.
#[tokio::test]
async fn test_cancel_query_wrong_secret() {
    // CancelRequest wire layout: four big-endian i32 values —
    // total length (including the length field), request code, pid, secret.
    const CANCEL_REQUEST_LEN: i32 = 16;
    const CANCEL_REQUEST_CODE: i32 = 80877102;
    // Deliberately bogus secret; assumed not to coincide with the real one.
    const WRONG_SECRET: i32 = 0;

    let direct = connection_sqlx_direct().await;
    let app_name = "cancel_test_wrong_secret";
    let (backend_pid, real_cancel_token, query_handle) = start_sleeping_connection(app_name).await;

    // Give the query time to reach the backend.
    tokio::time::sleep(Duration::from_millis(300)).await;

    assert!(
        is_sleeping(&direct, backend_pid).await,
        "query should be running before wrong-secret cancel"
    );

    // Look up the pgdog client pid from the admin interface.
    // SHOW CLIENTS exposes the pid (the 'id' column) that pgdog assigned during login —
    // the same value that was sent in the K message and that verify_cancel checks against.
    let admin = admin_tokio().await;
    let messages = admin.simple_query("SHOW CLIENTS").await.unwrap();
    let pgdog_pid: i32 = messages
        .iter()
        .filter_map(|m| match m {
            SimpleQueryMessage::Row(row) => Some(row),
            _ => None,
        })
        .find(|row| row.get("application_name") == Some(app_name))
        .expect("connection should appear in SHOW CLIENTS")
        .get("id")
        .expect("id column should be present")
        .parse()
        .expect("id should be a valid i32");

    // Send a CancelRequest with the real pgdog client pid but a wrong secret.
    // pgdog will find the client in comms by pid, then reject it because
    // the secret doesn't match — verify_cancel returns false.
    let mut raw = TcpStream::connect("127.0.0.1:6432").await.unwrap();
    let mut buf = BytesMut::new();
    buf.put_i32(CANCEL_REQUEST_LEN);
    buf.put_i32(CANCEL_REQUEST_CODE);
    buf.put_i32(pgdog_pid); // correct pid
    buf.put_i32(WRONG_SECRET);
    raw.write_all(&buf).await.unwrap();
    // pgdog closes the connection silently after processing; no response is sent.
    drop(raw);

    // Give pgdog enough time to receive and process the bogus cancel.
    tokio::time::sleep(Duration::from_millis(300)).await;

    // Backend side: the query must still be running — the secret mismatch was
    // caught by verify_cancel.
    assert!(
        is_sleeping(&direct, backend_pid).await,
        "query should still be running after wrong-secret cancel — verify_cancel must have rejected it"
    );

    // Client side: the query task must still be pending, i.e. the client has not
    // been handed a cancellation (or any other) result.
    assert!(
        !query_handle.is_finished(),
        "client query should still be pending after wrong-secret cancel"
    );

    // Clean up: cancel for real.
    real_cancel_token.cancel_query(NoTls).await.unwrap();
    assert_cancelled(query_handle, "wrong-secret test cleanup").await;
}
1 change: 1 addition & 0 deletions integration/rust/tests/integration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod auth;
pub mod auto_id;
pub mod avg;
pub mod ban;
pub mod cancel;
pub mod client_ids;
pub mod connection_recovery;
pub mod cross_shard_disabled;
Expand Down
13 changes: 2 additions & 11 deletions pgdog-postgres-types/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,10 @@ impl ToDataRowColumn for i64 {
}
}

impl ToDataRowColumn for Option<i64> {
impl<T: ToDataRowColumn> ToDataRowColumn for Option<T> {
fn to_data_row_column(&self) -> Data {
match self {
Some(value) => ToDataRowColumn::to_data_row_column(value),
None => Data::null(),
}
}
}

impl ToDataRowColumn for Option<String> {
fn to_data_row_column(&self) -> Data {
match self {
Some(value) => ToDataRowColumn::to_data_row_column(value),
Some(value) => value.to_data_row_column(),
None => Data::null(),
}
}
Expand Down
3 changes: 2 additions & 1 deletion pgdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "std"]
tracing-throttle = "0.4"
parking_lot = "0.12"
thiserror = "2"
derive_more = { version = "2", features = ["display", "error"] }
derive_more = { version = "2", features = ["display", "error", "from", "into"] }
bytes = "1"
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
Expand Down Expand Up @@ -77,6 +77,7 @@ azure_identity = "0.34.0"
azure_core = "0.34.0"
crc32c = "0.6.8"
bit-vec = "0.8"
smallvec = "1"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots-no-provider"] }
hex = "0.4"
x509-parser = "0.18"
Expand Down
7 changes: 2 additions & 5 deletions pgdog/src/admin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use tracing::debug;
use crate::frontend::ClientRequest;
use crate::net::messages::command_complete::CommandComplete;
use crate::net::messages::{ErrorResponse, FromBytes, Protocol, Query, ReadyForQuery};
use crate::net::ProtocolMessage;
use crate::net::ToBytes;
use crate::net::{BackendKeyData, ProtocolMessage};

use super::parser::Parser;
use super::prelude::Message;
Expand Down Expand Up @@ -63,10 +63,7 @@ impl AdminServer {
self.messages.extend(messages);
self.messages.push_back(ReadyForQuery::idle().message()?);

self.messages = std::mem::take(&mut self.messages)
.into_iter()
.map(|m| m.backend(BackendKeyData::default()))
.collect();
self.messages = std::mem::take(&mut self.messages).into_iter().collect();

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/admin/show_client_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Command for ShowClientMemory {
let user = client.paramters.get_default("user", "postgres");
let database = client.paramters.get_default("database", user);

row.add(client.id.pid as i64)
row.add(client.key.pid())
.add(database)
.add(user)
.add(client.addr.ip().to_string().as_str())
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/admin/show_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Command for ShowClients {
let row = self
.filter
.clone()
.add("id", client.id.pid as i64)
.add("id", client.key.pid())
.add("user", user)
.add("database", client.paramters.get_default("database", user))
.add("addr", client.addr.ip().to_string())
Expand Down
4 changes: 2 additions & 2 deletions pgdog/src/admin/show_server_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Command for ShowServerMemory {
let mut messages = vec![rd.message()?];

let stats = stats();
for (_, server) in stats {
for server in stats {
let mut row = DataRow::new();
let memory = &server.stats.memory;

Expand All @@ -45,7 +45,7 @@ impl Command for ShowServerMemory {
.add(server.addr.user.as_str())
.add(server.addr.host.as_str())
.add(server.addr.port as i64)
.add(server.stats.id.pid as i64)
.add(server.stats.id)
.add(memory.buffer.reallocs as i64)
.add(memory.buffer.reclaims as i64)
.add(memory.buffer.bytes_used as i64)
Expand Down
9 changes: 3 additions & 6 deletions pgdog/src/admin/show_servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Command for ShowServers {
let now = Instant::now();
let now_time = SystemTime::now();

for (_, server) in stats {
for server in stats {
let age = now.duration_since(server.stats.created_at);
let request_age = now.duration_since(server.stats.last_used);
let request_time = now_time - request_age;
Expand All @@ -98,11 +98,8 @@ impl Command for ShowServers {
format_time(server.stats.created_at_time.into()),
)
.add("request_time", format_time(request_time.into()))
.add("remote_pid", server.stats.id.pid as i64)
.add(
"client_id",
server.stats.client_id.map(|client| client.pid as i64),
)
.add("remote_pid", server.stats.id)
.add("client_id", server.stats.client_id)
.add("transactions", server.stats.total.transactions)
.add("queries", server.stats.total.queries)
.add("rollbacks", server.stats.total.rollbacks)
Expand Down
4 changes: 2 additions & 2 deletions pgdog/src/backend/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::frontend::PreparedStatements;
use crate::{
backend::pool::PoolConfig,
config::{config, load, set, ConfigAndUsers, ManualQuery, Role, User as ConfigUser},
net::{messages::BackendKeyData, tls},
net::{messages::BackendPid, tls},
};

use super::{
Expand Down Expand Up @@ -399,7 +399,7 @@ impl Databases {
}

/// Cancel a query running on one of the databases proxied by the pooler.
pub async fn cancel(&self, id: &BackendKeyData) -> Result<(), Error> {
pub async fn cancel(&self, id: BackendPid) -> Result<(), Error> {
for cluster in self.databases.values() {
cluster.cancel(id).await?;
}
Expand Down
4 changes: 2 additions & 2 deletions pgdog/src/backend/pool/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
ShardedTable, User,
},
frontend::{ClientRequest, RegexParser},
net::{messages::BackendKeyData, Query},
net::{messages::BackendPid, Query},
};

use super::{Address, Config, Error, Guard, MirrorStats, Request, Shard, ShardConfig};
Expand Down Expand Up @@ -388,7 +388,7 @@ impl Cluster {
}

/// Cancel a query executed by one of the shards.
pub async fn cancel(&self, id: &BackendKeyData) -> Result<(), super::super::Error> {
pub async fn cancel(&self, id: BackendPid) -> Result<(), super::super::Error> {
for shard in &self.shards {
shard.cancel(id).await?;
}
Expand Down
Loading
Loading