From fa9c9764b27115b6a09d49b434b25d0d301f7655 Mon Sep 17 00:00:00 2001 From: Levi Morrison Date: Fri, 30 Jan 2026 15:38:00 -0700 Subject: [PATCH 1/5] perf(profiling): separate queue and waking thread On every sample we send data to another thread and attempt to wake that thread. The problem is the syscall here is almost as expensive as collecting the sample if the other thread is asleep, which is often the case. This branch avoids that. Instead it writes to a queue and samples are handled when that background thread wakes up for other reasons. Notably one of those reasons is every 10ms when wall-time is enabled (default) so _probably_ this should work pretty well without filling the queues. --- Cargo.lock | 1 + profiling/Cargo.toml | 1 + profiling/src/profiling/mod.rs | 58 ++++++++++++++++++++++++---------- 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7ba737bef2f..69f58542b73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1244,6 +1244,7 @@ dependencies = [ "criterion", "criterion-perf-events", "crossbeam-channel", + "crossbeam-queue", "datadog-php-profiling", "env_logger 0.11.6", "lazy_static", diff --git a/profiling/Cargo.toml b/profiling/Cargo.toml index 222392889f8..3791c3d06c3 100644 --- a/profiling/Cargo.toml +++ b/profiling/Cargo.toml @@ -38,6 +38,7 @@ rustc-hash = "1.1.0" thiserror = "2" tracing = { version = "0.1", optional = true } uuid = { version = "1.0", features = ["v4"] } +crossbeam-queue = "0.3.12" [dependencies.tracing-subscriber] version = "0.3" diff --git a/profiling/src/profiling/mod.rs b/profiling/src/profiling/mod.rs index cc9cc845e60..b6d80174aa0 100644 --- a/profiling/src/profiling/mod.rs +++ b/profiling/src/profiling/mod.rs @@ -24,6 +24,7 @@ use chrono::Utc; use core::mem::forget; use core::{ptr, str}; use crossbeam_channel::{Receiver, Sender, TrySendError}; +use crossbeam_queue::ArrayQueue; use libdd_profiling::api::{ Function, Label as ApiLabel, Location, Period, Sample, UpscalingInfo, ValueType as ApiValueType, }; @@ -49,6 +50,9 @@ use crate::io::{ const UPLOAD_PERIOD: Duration = Duration::from_secs(67); +const MESSAGE_CHANNEL_CAPACITY: usize = 100 * if cfg!(php_zts) { 16 } else { 1 }; +const SAMPLE_QUEUE_CAPACITY: usize = 256 * if cfg!(php_zts) { 16 } else { 1 }; + pub const NO_TIMESTAMP: i64 = 0; // Guide: upload period / upload timeout should give about the order of @@ -191,7 +195,6 @@ pub struct LocalRootSpanResourceMessage { #[derive(Debug)] pub enum ProfilerMessage { Cancel, - Sample(SampleMessage), LocalRootSpanResource(LocalRootSpanResourceMessage), /// Used to put the helper thread into a barrier for caller so it can fork. @@ -221,6 +224,7 @@ pub struct Profiler { fork_barrier: Arc, interrupt_manager: Arc, message_sender: Sender, + sample_queue: Arc>, upload_sender: Sender, time_collector_handle: JoinHandle<()>, uploader_handle: JoinHandle<()>, @@ -237,11 +241,22 @@ struct TimeCollector { fork_barrier: Arc, interrupt_manager: Arc, message_receiver: Receiver, + sample_queue: Arc>, upload_sender: Sender, upload_period: Duration, } impl TimeCollector { + fn drain_sample_queue( + &self, + profiles: &mut HashMap, + last_wall_export: &WallTime, + ) { + while let Some(sample) = self.sample_queue.pop() { + Self::handle_sample_message(sample, profiles, last_wall_export); + } + } + fn handle_timeout( &self, profiles: &mut HashMap, @@ -586,11 +601,10 @@ impl TimeCollector { recv(self.message_receiver) -> result => { match result { Ok(message) => match message { - ProfilerMessage::Sample(sample) => - Self::handle_sample_message(sample, &mut profiles, &last_wall_export), ProfilerMessage::LocalRootSpanResource(message) => Self::handle_resource_message(message, &mut profiles), ProfilerMessage::Cancel => { + self.drain_sample_queue(&mut profiles, &last_wall_export); // flush what we have before exiting last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export); running = false; @@ -629,11 +643,15 @@ impl TimeCollector { recv(upload_tick) -> message => { if message.is_ok() { + self.drain_sample_queue(&mut profiles, &last_wall_export); last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export); } }, } + + // Process any queued samples after handling the wake reason. + self.drain_sample_queue(&mut profiles, &last_wall_export); } } } @@ -674,12 +692,15 @@ impl Profiler { pub fn new(system_settings: &SystemSettings) -> Self { let fork_barrier = Arc::new(Barrier::new(3)); let interrupt_manager = Arc::new(InterruptManager::new()); - let (message_sender, message_receiver) = crossbeam_channel::bounded(100); + let (message_sender, message_receiver) = + crossbeam_channel::bounded(MESSAGE_CHANNEL_CAPACITY); + let sample_queue = Arc::new(ArrayQueue::new(SAMPLE_QUEUE_CAPACITY)); let (upload_sender, upload_receiver) = crossbeam_channel::bounded(UPLOAD_CHANNEL_CAPACITY); let time_collector = TimeCollector { fork_barrier: fork_barrier.clone(), interrupt_manager: interrupt_manager.clone(), message_receiver, + sample_queue: sample_queue.clone(), upload_sender: upload_sender.clone(), upload_period: UPLOAD_PERIOD, }; @@ -697,6 +718,7 @@ impl Profiler { fork_barrier, interrupt_manager, message_sender, + sample_queue, upload_sender, time_collector_handle: thread_utils::spawn(DDPROF_TIME, move || { time_collector.run(); @@ -762,15 +784,6 @@ impl Profiler { self.fork_barrier.wait(); } - pub fn send_sample( - &self, - message: SampleMessage, - ) -> Result<(), Box>> { - self.message_sender - .try_send(ProfilerMessage::Sample(message)) - .map_err(Box::new) - } - pub fn send_local_root_span_resource( &self, message: LocalRootSpanResourceMessage, @@ -1508,11 +1521,9 @@ impl Profiler { samples: SampleValues, labels: Vec