diff --git a/Cargo.lock b/Cargo.lock index 7ba737bef2f..69f58542b73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1244,6 +1244,7 @@ dependencies = [ "criterion", "criterion-perf-events", "crossbeam-channel", + "crossbeam-queue", "datadog-php-profiling", "env_logger 0.11.6", "lazy_static", diff --git a/profiling/Cargo.toml b/profiling/Cargo.toml index 222392889f8..3791c3d06c3 100644 --- a/profiling/Cargo.toml +++ b/profiling/Cargo.toml @@ -38,6 +38,7 @@ rustc-hash = "1.1.0" thiserror = "2" tracing = { version = "0.1", optional = true } uuid = { version = "1.0", features = ["v4"] } +crossbeam-queue = "0.3.12" [dependencies.tracing-subscriber] version = "0.3" diff --git a/profiling/src/profiling/mod.rs b/profiling/src/profiling/mod.rs index cc9cc845e60..47e4741ee04 100644 --- a/profiling/src/profiling/mod.rs +++ b/profiling/src/profiling/mod.rs @@ -24,14 +24,15 @@ use chrono::Utc; use core::mem::forget; use core::{ptr, str}; use crossbeam_channel::{Receiver, Sender, TrySendError}; +use crossbeam_queue::ArrayQueue; use libdd_profiling::api::{ Function, Label as ApiLabel, Location, Period, Sample, UpscalingInfo, ValueType as ApiValueType, }; use libdd_profiling::exporter::Tag; use libdd_profiling::internal::Profile as InternalProfile; use log::{debug, info, trace, warn}; +use rustc_hash::FxHashMap; use std::borrow::Cow; -use std::collections::HashMap; use std::hash::Hash; use std::num::NonZeroI64; use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, Ordering}; @@ -49,6 +50,9 @@ use crate::io::{ const UPLOAD_PERIOD: Duration = Duration::from_secs(67); +const MESSAGE_CHANNEL_CAPACITY: usize = 100 * if cfg!(php_zts) { 16 } else { 1 }; +const SAMPLE_QUEUE_CAPACITY: usize = 256 * if cfg!(php_zts) { 16 } else { 1 }; + pub const NO_TIMESTAMP: i64 = 0; // Guide: upload period / upload timeout should give about the order of @@ -191,7 +195,6 @@ pub struct LocalRootSpanResourceMessage { #[derive(Debug)] pub enum ProfilerMessage { Cancel, - Sample(SampleMessage), LocalRootSpanResource(LocalRootSpanResourceMessage), /// Used to put the helper thread into a barrier for caller so it can fork. @@ -221,6 +224,7 @@ pub struct Profiler { fork_barrier: Arc, interrupt_manager: Arc, message_sender: Sender, + sample_queue: Arc>, upload_sender: Sender, time_collector_handle: JoinHandle<()>, uploader_handle: JoinHandle<()>, @@ -237,14 +241,35 @@ struct TimeCollector { fork_barrier: Arc, interrupt_manager: Arc, message_receiver: Receiver, + sample_queue: Arc>, upload_sender: Sender, upload_period: Duration, } impl TimeCollector { + /// Processes pending samples. + /// + /// Since the samples are in a queue, it's possible new samples will be + /// added as this runs. This is somewhat mitigated by only draining up to + /// the configured queue capacity in one pass, so the collector remains + /// responsive to other messages even under sustained producer load. + fn process_pending_samples( + &self, + profiles: &mut FxHashMap, + last_wall_export: &WallTime, + ) { + for _ in 0..SAMPLE_QUEUE_CAPACITY { + if let Some(sample) = self.sample_queue.pop() { + Self::handle_sample_message(sample, profiles, last_wall_export); + } else { + break; + } + } + } + fn handle_timeout( &self, - profiles: &mut HashMap, + profiles: &mut FxHashMap, last_export: &WallTime, ) -> WallTime { let wall_export = WallTime::now(); @@ -473,7 +498,7 @@ impl TimeCollector { fn handle_resource_message( message: LocalRootSpanResourceMessage, - profiles: &mut HashMap, + profiles: &mut FxHashMap, ) { trace!( "Received Endpoint Profiling message for span id {}.", @@ -498,7 +523,7 @@ impl TimeCollector { fn handle_sample_message( message: SampleMessage, - profiles: &mut HashMap, + profiles: &mut FxHashMap, started_at: &WallTime, ) { if message.key.sample_types.is_empty() { @@ -556,7 +581,8 @@ impl TimeCollector { pub fn run(self) { let mut last_wall_export = WallTime::now(); - let mut profiles: HashMap = HashMap::with_capacity(1); + let mut profiles: FxHashMap = + FxHashMap::with_capacity_and_hasher(1, Default::default()); debug!( "Started with an upload period of {} seconds and approximate wall-time period of {} milliseconds.", @@ -586,11 +612,10 @@ impl TimeCollector { recv(self.message_receiver) -> result => { match result { Ok(message) => match message { - ProfilerMessage::Sample(sample) => - Self::handle_sample_message(sample, &mut profiles, &last_wall_export), ProfilerMessage::LocalRootSpanResource(message) => Self::handle_resource_message(message, &mut profiles), ProfilerMessage::Cancel => { + self.process_pending_samples(&mut profiles, &last_wall_export); // flush what we have before exiting last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export); running = false; @@ -629,11 +654,15 @@ impl TimeCollector { recv(upload_tick) -> message => { if message.is_ok() { + self.process_pending_samples(&mut profiles, &last_wall_export); last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export); } }, } + + // Process any queued samples after handling the wake reason. + self.process_pending_samples(&mut profiles, &last_wall_export); } } } @@ -674,12 +703,15 @@ impl Profiler { pub fn new(system_settings: &SystemSettings) -> Self { let fork_barrier = Arc::new(Barrier::new(3)); let interrupt_manager = Arc::new(InterruptManager::new()); - let (message_sender, message_receiver) = crossbeam_channel::bounded(100); + let (message_sender, message_receiver) = + crossbeam_channel::bounded(MESSAGE_CHANNEL_CAPACITY); + let sample_queue = Arc::new(ArrayQueue::new(SAMPLE_QUEUE_CAPACITY)); let (upload_sender, upload_receiver) = crossbeam_channel::bounded(UPLOAD_CHANNEL_CAPACITY); let time_collector = TimeCollector { fork_barrier: fork_barrier.clone(), interrupt_manager: interrupt_manager.clone(), message_receiver, + sample_queue: sample_queue.clone(), upload_sender: upload_sender.clone(), upload_period: UPLOAD_PERIOD, }; @@ -697,6 +729,7 @@ impl Profiler { fork_barrier, interrupt_manager, message_sender, + sample_queue, upload_sender, time_collector_handle: thread_utils::spawn(DDPROF_TIME, move || { time_collector.run(); @@ -762,15 +795,6 @@ impl Profiler { self.fork_barrier.wait(); } - pub fn send_sample( - &self, - message: SampleMessage, - ) -> Result<(), Box>> { - self.message_sender - .try_send(ProfilerMessage::Sample(message)) - .map_err(Box::new) - } - pub fn send_local_root_span_resource( &self, message: LocalRootSpanResourceMessage, @@ -1508,11 +1532,9 @@ impl Profiler { samples: SampleValues, labels: Vec