Skip to content

Commit 01efa4c

Browse files
authored
Merge pull request #365 from 9uapaw/fix-await-requests-metrics
Fix active wait requests metric
2 parents 75c51d9 + 382c187 commit 01efa4c

File tree

1 file changed

+29
-23
lines changed

1 file changed

+29
-23
lines changed

src/conn/pool/mod.rs

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ impl Exchange {
107107
#[derive(Default, Debug)]
108108
struct Waitlist {
109109
queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
110+
metrics: Arc<Metrics>,
110111
}
111112

112113
impl Waitlist {
@@ -125,19 +126,36 @@ impl Waitlist {
125126
// waker in the queue.
126127
let occupied = self.remove(queue_id);
127128
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
129+
130+
self.metrics
131+
.active_wait_requests
132+
.fetch_add(1, atomic::Ordering::Relaxed);
133+
128134
!occupied
129135
}
130136

131137
fn pop(&mut self) -> Option<Waker> {
132138
match self.queue.pop() {
133-
Some((qw, _)) => Some(qw.waker),
139+
Some((qw, _)) => {
140+
self.metrics
141+
.active_wait_requests
142+
.fetch_sub(1, atomic::Ordering::Relaxed);
143+
Some(qw.waker)
144+
}
134145
None => None,
135146
}
136147
}
137148

138149
/// Returns `true` if removed.
139150
fn remove(&mut self, id: QueueId) -> bool {
140-
self.queue.remove(&id).is_some()
151+
let is_removed = self.queue.remove(&id).is_some();
152+
if is_removed {
153+
self.metrics
154+
.active_wait_requests
155+
.fetch_sub(1, atomic::Ordering::Relaxed);
156+
}
157+
158+
is_removed
141159
}
142160

143161
fn peek_id(&mut self) -> Option<QueueId> {
@@ -146,7 +164,6 @@ impl Waitlist {
146164
}
147165

148166
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));
149-
150167
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
151168
pub(crate) struct QueueId(Reverse<u64>);
152169

@@ -221,16 +238,20 @@ impl Pool {
221238
{
222239
let opts = Opts::try_from(opts).unwrap();
223240
let pool_opts = opts.pool_opts().clone();
241+
let metrics = Arc::new(Metrics::default());
224242
let (tx, rx) = mpsc::unbounded_channel();
225243
Pool {
226244
opts,
227245
inner: Arc::new(Inner {
228246
close: false.into(),
229247
closed: false.into(),
230-
metrics: Arc::new(Metrics::default()),
248+
metrics: metrics.clone(),
231249
exchange: Mutex::new(Exchange {
232250
available: VecDeque::with_capacity(pool_opts.constraints().max()),
233-
waiting: Waitlist::default(),
251+
waiting: Waitlist {
252+
queue: KeyedPriorityQueue::default(),
253+
metrics,
254+
},
234255
exist: 0,
235256
recycler: Some((rx, pool_opts)),
236257
}),
@@ -341,12 +362,7 @@ impl Pool {
341362

342363
// If we are not, just queue
343364
if !highest {
344-
if exchange.waiting.push(cx.waker().clone(), queue_id) {
345-
self.inner
346-
.metrics
347-
.active_wait_requests
348-
.fetch_add(1, atomic::Ordering::Relaxed);
349-
}
365+
exchange.waiting.push(cx.waker().clone(), queue_id);
350366
return Poll::Pending;
351367
}
352368

@@ -423,23 +439,13 @@ impl Pool {
423439
}
424440

425441
// Polled, but no conn available? Back into the queue.
426-
if exchange.waiting.push(cx.waker().clone(), queue_id) {
427-
self.inner
428-
.metrics
429-
.active_wait_requests
430-
.fetch_add(1, atomic::Ordering::Relaxed);
431-
}
442+
exchange.waiting.push(cx.waker().clone(), queue_id);
432443
Poll::Pending
433444
}
434445

435446
fn unqueue(&self, queue_id: QueueId) {
436447
let mut exchange = self.inner.exchange.lock().unwrap();
437-
if exchange.waiting.remove(queue_id) {
438-
self.inner
439-
.metrics
440-
.active_wait_requests
441-
.fetch_sub(1, atomic::Ordering::Relaxed);
442-
}
448+
exchange.waiting.remove(queue_id);
443449
}
444450
}
445451

0 commit comments

Comments
 (0)