Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pgdog/src/backend/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl Inner {
/// Place connection back into the pool
/// or give it to a waiting client.
#[inline]
pub(super) fn put(&mut self, mut conn: Box<Server>, now: Instant) -> Result<(), Error> {
pub(super) fn put(&mut self, mut conn: Box<Server>) -> Result<(), Error> {
// Try to give it to a client that's been waiting, if any.
let id = *conn.id();
while let Some(waiter) = self.waiting.pop_front() {
Expand All @@ -263,8 +263,8 @@ impl Inner {
server: id,
client: waiter.request.id,
})?;
self.stats.counts.server_assignment_count += 1;
self.stats.counts.wait_time += now.duration_since(waiter.request.created_at);
self.stats
.record_checkout(waiter.request.created_at, waiter.request.read);
return Ok(());
}
}
Expand Down Expand Up @@ -380,7 +380,7 @@ impl Inner {
// Finally, if the server is ok,
// place the connection back into the idle list.
if server.can_check_in() {
self.put(server, now)?;
self.put(server)?;
result.replenish = false;
} else {
self.out_of_sync += 1;
Expand Down Expand Up @@ -854,7 +854,7 @@ mod test {
});

let server = Box::new(Server::default());
inner.put(server, Instant::now()).unwrap();
inner.put(server).unwrap();

assert_eq!(inner.idle(), 0); // Connection given to waiter, not idle
assert_eq!(inner.checked_out(), 1); // Connection now checked out to waiter
Expand All @@ -869,7 +869,7 @@ mod test {
let mut inner = Inner::default();
let server = Box::new(Server::default());

inner.put(server, Instant::now()).unwrap();
inner.put(server).unwrap();

assert_eq!(inner.idle(), 1); // Connection added to idle pool
assert_eq!(inner.checked_out(), 0);
Expand Down Expand Up @@ -1046,7 +1046,7 @@ mod test {
assert_eq!(inner.waiting.len(), 3);

let server = Box::new(Server::default());
inner.put(server, Instant::now()).unwrap();
inner.put(server).unwrap();

// All waiters should be removed from queue since we tried each one
assert_eq!(inner.waiting.len(), 0);
Expand Down Expand Up @@ -1083,7 +1083,7 @@ mod test {
assert_eq!(inner.waiting.len(), 2);

let server = Box::new(Server::default());
inner.put(server, Instant::now()).unwrap();
inner.put(server).unwrap();

// All waiters should be removed since they were all dropped
assert_eq!(inner.waiting.len(), 0);
Expand Down
3 changes: 1 addition & 2 deletions pgdog/src/backend/pool/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,10 @@ impl Monitor {
/// Replenish pool with one new connection.
async fn replenish(&self, reason: ConnectReason) -> Result<bool, Error> {
if let Ok(conn) = Self::create_connection(&self.pool, reason).await {
let now = Instant::now();
let server = Box::new(conn);
let mut guard = self.pool.lock();
if guard.online {
guard.put(server, now)?;
guard.put(server)?
}
Ok(true)
} else {
Expand Down
20 changes: 7 additions & 13 deletions pgdog/src/backend/pool/pool_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,21 @@ impl Pool {

// Fast path, idle connection probably available.
let (server, granted_at, paused) = {
// Ask for time before we acquire the lock
// and only if we actually waited for a connection.
let granted_at = request.created_at;
let elapsed = granted_at.saturating_duration_since(request.created_at);
let mut guard = self.lock();

if !guard.online {
return Err(Error::Offline);
}

let conn = guard.take(request)?;
// Capture the grant time after the lock and after take() so that
// lock contention and any in-lock work are included in wait_time.
let granted_at = Instant::now();

if conn.is_some() {
guard.stats.counts.wait_time += elapsed;
guard.stats.counts.server_assignment_count += 1;
if request.read {
guard.stats.counts.reads += 1;
} else {
guard.stats.counts.writes += 1;
}
guard
.stats
.record_checkout(request.created_at, request.read);
}

(conn, granted_at, guard.paused)
Expand Down Expand Up @@ -296,7 +291,6 @@ impl Pool {
pub(crate) fn move_conns_to(&self, destination: &Pool) -> Result<(), Error> {
// Ensure no deadlock.
assert!(self.inner.id != destination.id());
let now = Instant::now();

{
let mut from_guard = self.lock();
Expand All @@ -305,7 +299,7 @@ impl Pool {
from_guard.online = false;
let (idle, taken) = from_guard.move_conns_to(destination);
for server in idle {
to_guard.put(server, now)?;
to_guard.put(server)?;
}
to_guard.set_taken(taken);
}
Expand Down
1 change: 1 addition & 0 deletions pgdog/src/backend/pool/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl State {
maxwait: guard
.waiting
.iter()
// The first waiter is the oldest, so their metric is basically the max wait time
.next()
.map(|req| now.duration_since(req.request.created_at))
.unwrap_or(Duration::ZERO),
Expand Down
14 changes: 14 additions & 0 deletions pgdog/src/backend/pool/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use pgdog_stats::memory::MemoryStats as StatsMemoryStats;
use pgdog_stats::pool::Counts as StatsCounts;
use pgdog_stats::pool::Stats as StatsStats;
use pgdog_stats::MessageBufferStats;
use tokio::time::Instant;

/// Pool statistics.
///
Expand Down Expand Up @@ -106,6 +107,19 @@ impl Stats {
pub fn calc_averages(&mut self, time: Duration) {
self.inner.calc_averages(time);
}

/// Record a successful connection checkout.
/// Centralises the four counts that must always move together:
/// wait time, assignment counter, and the read/write routing counter.
pub fn record_checkout(&mut self, started_at: Instant, read: bool) {
self.counts.wait_time += started_at.elapsed();
self.counts.server_assignment_count += 1;
if read {
self.counts.reads += 1;
} else {
self.counts.writes += 1;
}
}
}

/// Statistics calculated for the network buffer used
Expand Down
Loading