Skip to content

Commit 2ee534b

Browse files
author
Nathan Moos
committed
feat(prometheus): add render_to_write function
In order to enable callers to emit metrics in chunks, or to a small buffer to keep memory consumption flat, this patch introduces the `render_to_write` function, which can be used to stream metrics in chunks, avoiding buffering the entire set of metrics into a single string.
1 parent c664a01 commit 2ee534b

File tree

1 file changed

+62
-20
lines changed

1 file changed

+62
-20
lines changed

metrics-exporter-prometheus/src/recorder.rs

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::HashMap;
22
use std::sync::atomic::Ordering;
33
use std::sync::Arc;
44
use std::sync::{PoisonError, RwLock};
5+
use std::io;
56

67
use indexmap::IndexMap;
78
use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
@@ -113,54 +114,70 @@ impl Inner {
113114
}
114115
}
115116

116-
fn render(&self) -> String {
117+
fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> {
117118
let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics();
118119

119-
let mut output = String::new();
120+
let mut intermediate = String::new();
120121
let descriptions = self.descriptions.read().unwrap_or_else(PoisonError::into_inner);
121122

122123
for (name, mut by_labels) in counters.drain() {
123124
let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
124125
let unit = unit.filter(|_| self.enable_unit_suffix);
125-
write_help_line(&mut output, name.as_str(), unit, self.counter_suffix, desc);
126+
write_help_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, desc);
126127
unit
127128
});
128129

129-
write_type_line(&mut output, name.as_str(), unit, self.counter_suffix, "counter");
130+
write_type_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, "counter");
131+
132+
// A chunk is emitted here, just in case there are a large number of sets below.
133+
output.write_all(intermediate.as_bytes())?;
134+
intermediate.clear();
135+
130136
for (labels, value) in by_labels.drain() {
131137
write_metric_line::<&str, u64>(
132-
&mut output,
138+
&mut intermediate,
133139
&name,
134140
self.counter_suffix,
135141
&labels,
136142
None,
137143
value,
138144
unit,
139145
);
146+
// Each set gets its own write invocation.
147+
output.write_all(intermediate.as_bytes())?;
148+
intermediate.clear();
140149
}
141-
output.push('\n');
150+
output.write_all(b"\n")?;
142151
}
143152

144153
for (name, mut by_labels) in gauges.drain() {
145154
let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
146155
let unit = unit.filter(|_| self.enable_unit_suffix);
147-
write_help_line(&mut output, name.as_str(), unit, None, desc);
156+
write_help_line(&mut intermediate, name.as_str(), unit, None, desc);
148157
unit
149158
});
150159

151-
write_type_line(&mut output, name.as_str(), unit, None, "gauge");
160+
write_type_line(&mut intermediate, name.as_str(), unit, None, "gauge");
161+
162+
// A chunk is emitted here, just in case there are a large number of sets below.
163+
output.write_all(intermediate.as_bytes())?;
164+
intermediate.clear();
165+
152166
for (labels, value) in by_labels.drain() {
153167
write_metric_line::<&str, f64>(
154-
&mut output,
168+
&mut intermediate,
155169
&name,
156170
None,
157171
&labels,
158172
None,
159173
value,
160174
unit,
161175
);
176+
// Each set gets its own write invocation.
177+
output.write_all(intermediate.as_bytes())?;
178+
intermediate.clear();
162179
}
163-
output.push('\n');
180+
output.write_all(b"\n")?;
164181
}
165182

166183
for (name, mut by_labels) in distributions.drain() {
@@ -173,19 +190,24 @@ impl Inner {
173190

174191
let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
175192
let unit = unit.filter(|_| self.enable_unit_suffix);
176-
write_help_line(&mut output, name.as_str(), unit, None, desc);
193+
write_help_line(&mut intermediate, name.as_str(), unit, None, desc);
177194
unit
178195
});
179196

180-
write_type_line(&mut output, name.as_str(), unit, None, distribution_type);
197+
write_type_line(&mut intermediate, name.as_str(), unit, None, distribution_type);
198+
199+
// A chunk is emitted here, just in case there are a large number of sets below.
200+
output.write_all(intermediate.as_bytes())?;
201+
intermediate.clear();
202+
181203
for (labels, distribution) in by_labels.drain(..) {
182204
let (sum, count) = match distribution {
183205
Distribution::Summary(summary, quantiles, sum) => {
184206
let snapshot = summary.snapshot(Instant::now());
185207
for quantile in quantiles.iter() {
186208
let value = snapshot.quantile(quantile.value()).unwrap_or(0.0);
187209
write_metric_line(
188-
&mut output,
210+
&mut intermediate,
189211
&name,
190212
None,
191213
&labels,
@@ -200,7 +222,7 @@ impl Inner {
200222
Distribution::Histogram(histogram) => {
201223
for (le, count) in histogram.buckets() {
202224
write_metric_line(
203-
&mut output,
225+
&mut intermediate,
204226
&name,
205227
Some("bucket"),
206228
&labels,
@@ -210,7 +232,7 @@ impl Inner {
210232
);
211233
}
212234
write_metric_line(
213-
&mut output,
235+
&mut intermediate,
214236
&name,
215237
Some("bucket"),
216238
&labels,
@@ -229,7 +251,7 @@ impl Inner {
229251
};
230252

231253
write_metric_line::<&str, f64>(
232-
&mut output,
254+
&mut intermediate,
233255
&name,
234256
Some("sum"),
235257
&labels,
@@ -238,20 +260,24 @@ impl Inner {
238260
unit,
239261
);
240262
write_metric_line::<&str, u64>(
241-
&mut output,
263+
&mut intermediate,
242264
&name,
243265
Some("count"),
244266
&labels,
245267
None,
246268
count,
247269
unit,
248270
);
271+
272+
// Each set gets its own write invocation.
273+
output.write_all(intermediate.as_bytes())?;
274+
intermediate.clear();
249275
}
250276

251-
output.push('\n');
277+
output.write_all(b"\n")?;
252278
}
253279

254-
output
280+
Ok(())
255281
}
256282

257283
fn run_upkeep(&self) {
@@ -334,8 +360,24 @@ pub struct PrometheusHandle {
334360
impl PrometheusHandle {
335361
/// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
336362
/// the Prometheus exposition format.
363+
#[allow(clippy::missing_panics_doc)]
337364
pub fn render(&self) -> String {
338-
self.inner.render()
365+
let mut buf = Vec::new();
366+
// UNWRAP: writing to a Vec<u8> does not fail.
367+
self.inner.render_to_write(&mut buf).unwrap();
368+
// UNWRAP: Prometheus exposition format is always UTF-8.
369+
String::from_utf8(buf).unwrap()
370+
}
371+
372+
/// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
373+
/// the Prometheus exposition format, incrementally. Use this function to emit metrics as a
374+
/// stream without buffering the entire metrics export.
375+
///
376+
/// # Errors
377+
///
378+
/// Writing to the provided output fails.
379+
pub fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> {
380+
self.inner.render_to_write(output)
339381
}
340382

341383
/// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to

0 commit comments

Comments
 (0)