Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 62 additions & 20 deletions metrics-exporter-prometheus/src/recorder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::io;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{PoisonError, RwLock};
Expand Down Expand Up @@ -113,54 +114,70 @@ impl Inner {
}
}

fn render(&self) -> String {
fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> {
let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics();

let mut output = String::new();
let mut intermediate = String::new();
let descriptions = self.descriptions.read().unwrap_or_else(PoisonError::into_inner);

for (name, mut by_labels) in counters.drain() {
let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
let unit = unit.filter(|_| self.enable_unit_suffix);
write_help_line(&mut output, name.as_str(), unit, self.counter_suffix, desc);
write_help_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, desc);
unit
});

write_type_line(&mut output, name.as_str(), unit, self.counter_suffix, "counter");
write_type_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, "counter");

// A chunk is emitted here, just in case there are a large number of sets below.
output.write_all(intermediate.as_bytes())?;
intermediate.clear();

for (labels, value) in by_labels.drain() {
write_metric_line::<&str, u64>(
&mut output,
&mut intermediate,
&name,
self.counter_suffix,
&labels,
None,
value,
unit,
);
// Each set gets its own write invocation.
output.write_all(intermediate.as_bytes())?;
intermediate.clear();
}
output.push('\n');
output.write_all(b"\n")?;
}

for (name, mut by_labels) in gauges.drain() {
let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
let unit = unit.filter(|_| self.enable_unit_suffix);
write_help_line(&mut output, name.as_str(), unit, None, desc);
write_help_line(&mut intermediate, name.as_str(), unit, None, desc);
unit
});

write_type_line(&mut output, name.as_str(), unit, None, "gauge");
write_type_line(&mut intermediate, name.as_str(), unit, None, "gauge");

// A chunk is emitted here, just in case there are a large number of sets below.
output.write_all(intermediate.as_bytes())?;
intermediate.clear();

for (labels, value) in by_labels.drain() {
write_metric_line::<&str, f64>(
&mut output,
&mut intermediate,
&name,
None,
&labels,
None,
value,
unit,
);
// Each set gets its own write invocation.
output.write_all(intermediate.as_bytes())?;
intermediate.clear();
}
output.push('\n');
output.write_all(b"\n")?;
}

for (name, mut by_labels) in distributions.drain() {
Expand All @@ -173,19 +190,24 @@ impl Inner {

let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
let unit = unit.filter(|_| self.enable_unit_suffix);
write_help_line(&mut output, name.as_str(), unit, None, desc);
write_help_line(&mut intermediate, name.as_str(), unit, None, desc);
unit
});

write_type_line(&mut output, name.as_str(), unit, None, distribution_type);
write_type_line(&mut intermediate, name.as_str(), unit, None, distribution_type);

// A chunk is emitted here, just in case there are a large number of sets below.
output.write_all(intermediate.as_bytes())?;
intermediate.clear();

for (labels, distribution) in by_labels.drain(..) {
let (sum, count) = match distribution {
Distribution::Summary(summary, quantiles, sum) => {
let snapshot = summary.snapshot(Instant::now());
for quantile in quantiles.iter() {
let value = snapshot.quantile(quantile.value()).unwrap_or(0.0);
write_metric_line(
&mut output,
&mut intermediate,
&name,
None,
&labels,
Expand All @@ -200,7 +222,7 @@ impl Inner {
Distribution::Histogram(histogram) => {
for (le, count) in histogram.buckets() {
write_metric_line(
&mut output,
&mut intermediate,
&name,
Some("bucket"),
&labels,
Expand All @@ -210,7 +232,7 @@ impl Inner {
);
}
write_metric_line(
&mut output,
&mut intermediate,
&name,
Some("bucket"),
&labels,
Expand All @@ -229,7 +251,7 @@ impl Inner {
};

write_metric_line::<&str, f64>(
&mut output,
&mut intermediate,
&name,
Some("sum"),
&labels,
Expand All @@ -238,20 +260,24 @@ impl Inner {
unit,
);
write_metric_line::<&str, u64>(
&mut output,
&mut intermediate,
&name,
Some("count"),
&labels,
None,
count,
unit,
);

// Each set gets its own write invocation.
output.write_all(intermediate.as_bytes())?;
intermediate.clear();
}

output.push('\n');
output.write_all(b"\n")?;
}

output
Ok(())
}

fn run_upkeep(&self) {
Expand Down Expand Up @@ -334,8 +360,24 @@ pub struct PrometheusHandle {
impl PrometheusHandle {
/// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
/// the Prometheus exposition format.
#[allow(clippy::missing_panics_doc)]
pub fn render(&self) -> String {
self.inner.render()
let mut buf = Vec::new();
// UNWRAP: writing to a Vec<u8> does not fail.
self.inner.render_to_write(&mut buf).unwrap();
// UNWRAP: Prometheus exposition format is always UTF-8.
String::from_utf8(buf).unwrap()
}

/// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
/// the Prometheus exposition format, incrementally. Use this function to emit metrics as a
/// stream without buffering the entire metrics export.
///
/// # Errors
///
/// Writing to the provided output fails.
pub fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> {
self.inner.render_to_write(output)
}

/// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
Expand Down