From b691105e46073c40c1529ab7dae374574a3496bf Mon Sep 17 00:00:00 2001 From: Nathan Moos Date: Fri, 14 Nov 2025 15:12:44 -0800 Subject: [PATCH] feat(prometheus): add render_to_write function In order to enable callers to emit metrics in chunks, or to a small buffer to keep memory consumption flat, this patch introduces the `render_to_write` function, which can be used to stream metrics in chunks, avoiding buffering the entire set of metrics into a single string. --- metrics-exporter-prometheus/src/recorder.rs | 82 ++++++++++++++++----- 1 file changed, 62 insertions(+), 20 deletions(-) diff --git a/metrics-exporter-prometheus/src/recorder.rs b/metrics-exporter-prometheus/src/recorder.rs index c3f64ffa..1ff09038 100644 --- a/metrics-exporter-prometheus/src/recorder.rs +++ b/metrics-exporter-prometheus/src/recorder.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::io; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::{PoisonError, RwLock}; @@ -113,23 +114,28 @@ impl Inner { } } - fn render(&self) -> String { + fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> { let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics(); - let mut output = String::new(); + let mut intermediate = String::new(); let descriptions = self.descriptions.read().unwrap_or_else(PoisonError::into_inner); for (name, mut by_labels) in counters.drain() { let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| { let unit = unit.filter(|_| self.enable_unit_suffix); - write_help_line(&mut output, name.as_str(), unit, self.counter_suffix, desc); + write_help_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, desc); unit }); - write_type_line(&mut output, name.as_str(), unit, self.counter_suffix, "counter"); + write_type_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, "counter"); + + // A chunk is emitted here, just in case there are a large number of sets below. + output.write_all(intermediate.as_bytes())?; + intermediate.clear(); + for (labels, value) in by_labels.drain() { write_metric_line::<&str, u64>( - &mut output, + &mut intermediate, &name, self.counter_suffix, &labels, @@ -137,21 +143,29 @@ impl Inner { value, unit, ); + // Each set gets its own write invocation. + output.write_all(intermediate.as_bytes())?; + intermediate.clear(); } - output.push('\n'); + output.write_all(b"\n")?; } for (name, mut by_labels) in gauges.drain() { let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| { let unit = unit.filter(|_| self.enable_unit_suffix); - write_help_line(&mut output, name.as_str(), unit, None, desc); + write_help_line(&mut intermediate, name.as_str(), unit, None, desc); unit }); - write_type_line(&mut output, name.as_str(), unit, None, "gauge"); + write_type_line(&mut intermediate, name.as_str(), unit, None, "gauge"); + + // A chunk is emitted here, just in case there are a large number of sets below. + output.write_all(intermediate.as_bytes())?; + intermediate.clear(); + for (labels, value) in by_labels.drain() { write_metric_line::<&str, f64>( - &mut output, + &mut intermediate, &name, None, &labels, @@ -159,8 +173,11 @@ impl Inner { value, unit, ); + // Each set gets its own write invocation. + output.write_all(intermediate.as_bytes())?; + intermediate.clear(); } - output.push('\n'); + output.write_all(b"\n")?; } for (name, mut by_labels) in distributions.drain() { @@ -173,11 +190,16 @@ impl Inner { let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| { let unit = unit.filter(|_| self.enable_unit_suffix); - write_help_line(&mut output, name.as_str(), unit, None, desc); + write_help_line(&mut intermediate, name.as_str(), unit, None, desc); unit }); - write_type_line(&mut output, name.as_str(), unit, None, distribution_type); + write_type_line(&mut intermediate, name.as_str(), unit, None, distribution_type); + + // A chunk is emitted here, just in case there are a large number of sets below. + output.write_all(intermediate.as_bytes())?; + intermediate.clear(); + for (labels, distribution) in by_labels.drain(..) { let (sum, count) = match distribution { Distribution::Summary(summary, quantiles, sum) => { @@ -185,7 +207,7 @@ impl Inner { for quantile in quantiles.iter() { let value = snapshot.quantile(quantile.value()).unwrap_or(0.0); write_metric_line( - &mut output, + &mut intermediate, &name, None, &labels, @@ -200,7 +222,7 @@ impl Inner { Distribution::Histogram(histogram) => { for (le, count) in histogram.buckets() { write_metric_line( - &mut output, + &mut intermediate, &name, Some("bucket"), &labels, @@ -210,7 +232,7 @@ impl Inner { ); } write_metric_line( - &mut output, + &mut intermediate, &name, Some("bucket"), &labels, @@ -229,7 +251,7 @@ impl Inner { }; write_metric_line::<&str, f64>( - &mut output, + &mut intermediate, &name, Some("sum"), &labels, @@ -238,7 +260,7 @@ impl Inner { unit, ); write_metric_line::<&str, u64>( - &mut output, + &mut intermediate, &name, Some("count"), &labels, @@ -246,12 +268,16 @@ impl Inner { count, unit, ); + + // Each set gets its own write invocation. + output.write_all(intermediate.as_bytes())?; + intermediate.clear(); } - output.push('\n'); + output.write_all(b"\n")?; } - output + Ok(()) } fn run_upkeep(&self) { @@ -334,8 +360,24 @@ pub struct PrometheusHandle { impl PrometheusHandle { /// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to /// the Prometheus exposition format. + #[allow(clippy::missing_panics_doc)] pub fn render(&self) -> String { - self.inner.render() + let mut buf = Vec::new(); + // UNWRAP: writing to a Vec does not fail. + self.inner.render_to_write(&mut buf).unwrap(); + // UNWRAP: Prometheus exposition format is always UTF-8. + String::from_utf8(buf).unwrap() + } + + /// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to + /// the Prometheus exposition format, incrementally. Use this function to emit metrics as a + /// stream without buffering the entire metrics export. + /// + /// # Errors + /// + /// Writing to the provided output fails. + pub fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> { + self.inner.render_to_write(output) } /// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to