Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rustc-hash = "1.1.0"
thiserror = "2"
tracing = { version = "0.1", optional = true }
uuid = { version = "1.0", features = ["v4"] }
crossbeam-queue = "0.3.12"

[dependencies.tracing-subscriber]
version = "0.3"
Expand Down
79 changes: 57 additions & 22 deletions profiling/src/profiling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ use chrono::Utc;
use core::mem::forget;
use core::{ptr, str};
use crossbeam_channel::{Receiver, Sender, TrySendError};
use crossbeam_queue::ArrayQueue;
use libdd_profiling::api::{
Function, Label as ApiLabel, Location, Period, Sample, UpscalingInfo, ValueType as ApiValueType,
};
use libdd_profiling::exporter::Tag;
use libdd_profiling::internal::Profile as InternalProfile;
use log::{debug, info, trace, warn};
use rustc_hash::FxHashMap;
use std::borrow::Cow;
use std::collections::HashMap;
use std::hash::Hash;
use std::num::NonZeroI64;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, Ordering};
Expand All @@ -49,6 +50,9 @@ use crate::io::{

const UPLOAD_PERIOD: Duration = Duration::from_secs(67);

/// Capacity of the bounded channel that carries `ProfilerMessage`s to the
/// time collector. Multiplied by 16 when built with the `php_zts` cfg
/// (presumably to leave room for several producing threads -- confirm).
const MESSAGE_CHANNEL_CAPACITY: usize = 100 * if cfg!(php_zts) { 16 } else { 1 };
/// Capacity of the `ArrayQueue` that holds pending `SampleMessage`s; also the
/// upper bound on how many samples `process_pending_samples` pops in a single
/// pass. Multiplied by 16 when built with the `php_zts` cfg.
const SAMPLE_QUEUE_CAPACITY: usize = 256 * if cfg!(php_zts) { 16 } else { 1 };

pub const NO_TIMESTAMP: i64 = 0;

// Guide: upload period / upload timeout should give about the order of
Expand Down Expand Up @@ -191,7 +195,6 @@ pub struct LocalRootSpanResourceMessage {
#[derive(Debug)]
pub enum ProfilerMessage {
Cancel,
Sample(SampleMessage),
LocalRootSpanResource(LocalRootSpanResourceMessage),

/// Used to put the helper thread into a barrier for caller so it can fork.
Expand Down Expand Up @@ -221,6 +224,7 @@ pub struct Profiler {
fork_barrier: Arc<Barrier>,
interrupt_manager: Arc<InterruptManager>,
message_sender: Sender<ProfilerMessage>,
sample_queue: Arc<ArrayQueue<SampleMessage>>,
upload_sender: Sender<UploadMessage>,
time_collector_handle: JoinHandle<()>,
uploader_handle: JoinHandle<()>,
Expand All @@ -237,14 +241,35 @@ struct TimeCollector {
fork_barrier: Arc<Barrier>,
interrupt_manager: Arc<InterruptManager>,
message_receiver: Receiver<ProfilerMessage>,
sample_queue: Arc<ArrayQueue<SampleMessage>>,
upload_sender: Sender<UploadMessage>,
upload_period: Duration,
}

impl TimeCollector {
/// Drains queued samples into `profiles`, one bounded batch per call.
///
/// Producers may keep pushing while this runs, so a single pass pops at most
/// `SAMPLE_QUEUE_CAPACITY` samples and stops early as soon as the queue
/// reports empty. Bounding the batch keeps the collector responsive to its
/// other messages under sustained producer load. Each popped sample is
/// forwarded to `Self::handle_sample_message` together with
/// `last_wall_export`.
fn process_pending_samples(
    &self,
    profiles: &mut FxHashMap<ProfileIndex, InternalProfile>,
    last_wall_export: &WallTime,
) {
    // Number of pops still allowed in this pass.
    let mut remaining = SAMPLE_QUEUE_CAPACITY;
    while remaining > 0 {
        match self.sample_queue.pop() {
            Some(sample) => Self::handle_sample_message(sample, profiles, last_wall_export),
            // Queue is empty: nothing more to do in this pass.
            None => return,
        }
        remaining -= 1;
    }
}

fn handle_timeout(
&self,
profiles: &mut HashMap<ProfileIndex, InternalProfile>,
profiles: &mut FxHashMap<ProfileIndex, InternalProfile>,
last_export: &WallTime,
) -> WallTime {
let wall_export = WallTime::now();
Expand Down Expand Up @@ -473,7 +498,7 @@ impl TimeCollector {

fn handle_resource_message(
message: LocalRootSpanResourceMessage,
profiles: &mut HashMap<ProfileIndex, InternalProfile>,
profiles: &mut FxHashMap<ProfileIndex, InternalProfile>,
) {
trace!(
"Received Endpoint Profiling message for span id {}.",
Expand All @@ -498,7 +523,7 @@ impl TimeCollector {

fn handle_sample_message(
message: SampleMessage,
profiles: &mut HashMap<ProfileIndex, InternalProfile>,
profiles: &mut FxHashMap<ProfileIndex, InternalProfile>,
started_at: &WallTime,
) {
if message.key.sample_types.is_empty() {
Expand Down Expand Up @@ -556,7 +581,8 @@ impl TimeCollector {

pub fn run(self) {
let mut last_wall_export = WallTime::now();
let mut profiles: HashMap<ProfileIndex, InternalProfile> = HashMap::with_capacity(1);
let mut profiles: FxHashMap<ProfileIndex, InternalProfile> =
FxHashMap::with_capacity_and_hasher(1, Default::default());

debug!(
"Started with an upload period of {} seconds and approximate wall-time period of {} milliseconds.",
Expand Down Expand Up @@ -586,11 +612,10 @@ impl TimeCollector {
recv(self.message_receiver) -> result => {
match result {
Ok(message) => match message {
ProfilerMessage::Sample(sample) =>
Self::handle_sample_message(sample, &mut profiles, &last_wall_export),
ProfilerMessage::LocalRootSpanResource(message) =>
Self::handle_resource_message(message, &mut profiles),
ProfilerMessage::Cancel => {
self.process_pending_samples(&mut profiles, &last_wall_export);
// flush what we have before exiting
last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export);
running = false;
Expand Down Expand Up @@ -629,11 +654,15 @@ impl TimeCollector {

recv(upload_tick) -> message => {
if message.is_ok() {
self.process_pending_samples(&mut profiles, &last_wall_export);
last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export);
}
},

}

// Process any queued samples after handling the wake reason.
self.process_pending_samples(&mut profiles, &last_wall_export);
}
}
}
Expand Down Expand Up @@ -674,12 +703,15 @@ impl Profiler {
pub fn new(system_settings: &SystemSettings) -> Self {
let fork_barrier = Arc::new(Barrier::new(3));
let interrupt_manager = Arc::new(InterruptManager::new());
let (message_sender, message_receiver) = crossbeam_channel::bounded(100);
let (message_sender, message_receiver) =
crossbeam_channel::bounded(MESSAGE_CHANNEL_CAPACITY);
let sample_queue = Arc::new(ArrayQueue::new(SAMPLE_QUEUE_CAPACITY));
let (upload_sender, upload_receiver) = crossbeam_channel::bounded(UPLOAD_CHANNEL_CAPACITY);
let time_collector = TimeCollector {
fork_barrier: fork_barrier.clone(),
interrupt_manager: interrupt_manager.clone(),
message_receiver,
sample_queue: sample_queue.clone(),
upload_sender: upload_sender.clone(),
upload_period: UPLOAD_PERIOD,
};
Expand All @@ -697,6 +729,7 @@ impl Profiler {
fork_barrier,
interrupt_manager,
message_sender,
sample_queue,
upload_sender,
time_collector_handle: thread_utils::spawn(DDPROF_TIME, move || {
time_collector.run();
Expand Down Expand Up @@ -762,15 +795,6 @@ impl Profiler {
self.fork_barrier.wait();
}

/// Offers a sample to the collector over the message channel without
/// blocking.
///
/// The sample is wrapped in `ProfilerMessage::Sample` and handed to
/// `try_send`; if the send fails, the resulting `TrySendError` is boxed and
/// returned to the caller.
pub fn send_sample(
    &self,
    message: SampleMessage,
) -> Result<(), Box<TrySendError<ProfilerMessage>>> {
    let wrapped = ProfilerMessage::Sample(message);
    match self.message_sender.try_send(wrapped) {
        Ok(()) => Ok(()),
        Err(send_error) => Err(Box::new(send_error)),
    }
}

pub fn send_local_root_span_resource(
&self,
message: LocalRootSpanResourceMessage,
Expand Down Expand Up @@ -1508,11 +1532,9 @@ impl Profiler {
samples: SampleValues,
labels: Vec<Label>,
timestamp: i64,
) -> Result<(), Box<TrySendError<ProfilerMessage>>> {
) -> Result<(), EnqueueError> {
let message = self.prepare_sample_message(frames, samples, labels, timestamp);
self.message_sender
.try_send(ProfilerMessage::Sample(message))
.map_err(Box::new)
self.enqueue_sample(message)
}

fn prepare_sample_message(
Expand Down Expand Up @@ -1542,6 +1564,19 @@ impl Profiler {
},
}
}

/// Pushes `message` onto the sample queue.
///
/// Returns `EnqueueError` when the push is rejected; the rejected message is
/// discarded rather than handed back to the caller.
fn enqueue_sample(&self, message: SampleMessage) -> Result<(), EnqueueError> {
    match self.sample_queue.push(message) {
        Ok(()) => Ok(()),
        // The queue returns the rejected message; it is dropped here.
        Err(_rejected) => Err(EnqueueError),
    }
}
}

/// Error returned when a sample could not be placed on the sample queue
/// because it was full. The sample is dropped, not retried.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EnqueueError;

impl std::fmt::Display for EnqueueError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `Formatter::pad` is what `str`'s own `Display` impl uses, so width
        // and fill flags are honoured exactly as before, without relying on
        // a `fmt` trait being imported at the call site.
        f.pad("sample queue full; sample dropped")
    }
}

// Lets callers box the error, use `?` into `Box<dyn Error>`, or attach it as
// a `source`, as they could with the channel error this type replaces.
impl std::error::Error for EnqueueError {}

pub struct JoinError {
Expand Down
Loading