Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/spawned_task_component_tags.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Internal telemetry (metrics and logs) emitted from work that Vector runs on spawned `tokio` tasks now correctly inherits the owning component's tags (`component_id`, `component_kind`, `component_type`). Previously, several components spawned background tasks without propagating the tracing span, so some internal events emitted from those tasks were missing their component tags. Affected emissions include the `datadog_logs` sink's `component_discarded_events_total` (events too large to encode), the `gcp_pubsub` source's `component_errors_total`/`component_discarded_events_total` from its per-stream tasks, and the `splunk_hec` sinks' acknowledgement-handling `component_errors_total`.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed

authors: gwenaskell
17 changes: 10 additions & 7 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::{
time::sleep,
};

use tracing::{debug, error, info, trace};
use tracing::{Instrument, debug, error, info, trace};

use crate::{
file_watcher::{FileWatcher, RawLineResult},
Expand Down Expand Up @@ -148,12 +148,15 @@ where
let mut stats = TimingStats::default();

// Spawn the checkpoint writer task
let checkpoint_task_handle = tokio::spawn(checkpoint_writer(
checkpointer,
self.glob_minimum_cooldown,
shutdown_checkpointer,
self.emitter.clone(),
));
let checkpoint_task_handle = tokio::spawn(
checkpoint_writer(
checkpointer,
self.glob_minimum_cooldown,
shutdown_checkpointer,
self.emitter.clone(),
)
.in_current_span(),
);

// Alright friends, how does this work?
//
Expand Down
14 changes: 9 additions & 5 deletions lib/vector-buffers/src/variants/disk_v2/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use futures::StreamExt;
use rkyv::{Archive, Serialize, with::Atomic};
use snafu::{ResultExt, Snafu};
use tokio::{fs, io::AsyncWriteExt, sync::Notify};
use tracing::Instrument;
use vector_common::finalizer::OrderedFinalizer;

use super::{
Expand Down Expand Up @@ -700,12 +701,15 @@ where
#[must_use]
pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
let (finalizer, mut stream) = OrderedFinalizer::new(None);
tokio::spawn(async move {
while let Some((_status, amount)) = stream.next().await {
self.increment_pending_acks(amount);
self.notify_writer_waiters();
tokio::spawn(
async move {
while let Some((_status, amount)) = stream.next().await {
self.increment_pending_acks(amount);
self.notify_writer_waiters();
}
}
});
.in_current_span(),
);
finalizer
}
}
Expand Down
4 changes: 4 additions & 0 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ impl<'a> SendGroup<'a> {

fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
if let Some(send) = self.sends.remove(id) {
// Deliberately not instrumented with the current span: this drains a send to a sink
// that has just been detached from the topology, so it is unrelated to the upstream
// component that owns this fanout. Attaching the current span would mis-tag this
// task's logs with the upstream component's identity rather than the detached sink's.
tokio::spawn(async move {
if let Err(e) = send.await {
warn!(
Expand Down
5 changes: 5 additions & 0 deletions lib/vector-stream/src/concurrent_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ where
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(item)) => {
let fut = (this.f)(item);
// `ConcurrentMap` does not instrument the spawned future itself: the
// mapping closure runs on a detached task, so the current span at poll
// time is not necessarily meaningful for the work being performed. It is
// the caller's responsibility to propagate any span (e.g. the owning
// component's span for internal metric/log tagging) into `fut`.
let handle = tokio::spawn(fut);
this.in_flight.push_back(handle);
}
Expand Down
25 changes: 14 additions & 11 deletions lib/vector-tap/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,20 @@ impl TapController {
fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx {
let (shutdown_tx, shutdown_rx) = oneshot::channel();

tokio::spawn(async move {
_ = shutdown_rx.await;
if control_tx
.send(fanout::ControlMessage::Remove(sink_id.clone()))
.is_err()
{
debug!(message = "Couldn't disconnect sink.", ?sink_id);
} else {
debug!(message = "Disconnected sink.", ?sink_id);
tokio::spawn(
async move {
_ = shutdown_rx.await;
if control_tx
.send(fanout::ControlMessage::Remove(sink_id.clone()))
.is_err()
{
debug!(message = "Couldn't disconnect sink.", ?sink_id);
} else {
debug!(message = "Disconnected sink.", ?sink_id);
}
}
});
.in_current_span(),
);

shutdown_tx
}
Expand Down Expand Up @@ -370,7 +373,7 @@ async fn tap_handler(
while let Some(events) = tap_buffer_rx.next().await {
tap_transformer.try_send(events);
}
});
}.in_current_span());

// Attempt to connect the sink.
//
Expand Down
36 changes: 20 additions & 16 deletions src/api/grpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio_stream::{
wrappers::{IntervalStream, ReceiverStream},
};
use tonic::{Request, Response, Status};
use tracing::Instrument;
use vector_lib::tap::{
controller::{TapController, TapPatterns, TapPayload},
topology::WatchRx,
Expand Down Expand Up @@ -676,27 +677,30 @@ impl observability::Service for ObservabilityService {

let watch_rx = self.watch_rx.clone();

tokio::spawn(async move {
let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
let mut tap_rx = ReceiverStream::new(tap_rx);
let mut interval = time::interval(time::Duration::from_millis(interval_ms));
let mut reservoir = Reservoir::new(limit);

loop {
select! {
Some(tap_payload) = tokio_stream::StreamExt::next(&mut tap_rx) => {
if reservoir.handle_payload(tap_payload, &event_tx).await.is_err() {
break;
tokio::spawn(
async move {
let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
let mut tap_rx = ReceiverStream::new(tap_rx);
let mut interval = time::interval(time::Duration::from_millis(interval_ms));
let mut reservoir = Reservoir::new(limit);

loop {
select! {
Some(tap_payload) = tokio_stream::StreamExt::next(&mut tap_rx) => {
if reservoir.handle_payload(tap_payload, &event_tx).await.is_err() {
break;
}
}
}
_ = interval.tick() => {
if event_tx.is_closed() || reservoir.flush(&event_tx).await.is_err() {
break;
_ = interval.tick() => {
if event_tx.is_closed() || reservoir.flush(&event_tx).await.is_err() {
break;
}
}
}
}
}
});
.in_current_span(),
);

let stream = FuturesStreamExt::flat_map(ReceiverStream::new(event_rx), |events| {
stream::iter(events.into_iter().map(Ok))
Expand Down
80 changes: 42 additions & 38 deletions src/api/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use axum::{
use tokio::sync::oneshot;
use tonic::transport::Server as TonicServer;
use tonic_health::server::{HealthReporter, health_reporter};
use tracing::Instrument;
use vector_lib::tap::topology::WatchRx;

use super::grpc::ObservabilityService;
Expand Down Expand Up @@ -80,45 +81,48 @@ impl GrpcServer {
let router_serving = Arc::clone(&serving);

// Spawn the server with the already-bound listener
tokio::spawn(async move {
// Build reflection service for tools like grpcurl
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
crate::proto::observability::FILE_DESCRIPTOR_SET,
)
.register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
.build()
.expect("Failed to build reflection service");

// Build the tonic router (gRPC services) and merge with the HTTP router
// so both protocols share the same port. `accept_http1(true)` lets plain
// HTTP/1.1 requests reach the merged axum routes.
let router = TonicServer::builder()
.accept_http1(true)
.add_service(health_service)
.add_service(ObservabilityServer::new(service))
.add_service(reflection_service)
.into_router()
.merge(http_router(router_serving));

let result = hyper::Server::from_tcp(std_listener)
.expect("Failed to build HTTP server from TCP listener")
.serve(router.into_make_service())
.with_graceful_shutdown(async {
rx.await.ok();
info!("GRPC API server shutting down.");
})
.await;

if let Err(e) = result {
error!(
message = "GRPC server encountered an error.",
error = %e,
error_source = ?e.source(),
bind_addr = %actual_addr,
);
tokio::spawn(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be helpful to have a spawn_in_current_task wrapper for this common operation? It's a relatively trivial amount of code duplication, but it would simplify searching for spawns that are missing spans and potentially even block calling tokio::spawn directly without proper annotations.

async move {
// Build reflection service for tools like grpcurl
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
crate::proto::observability::FILE_DESCRIPTOR_SET,
)
.register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
.build()
.expect("Failed to build reflection service");

// Build the tonic router (gRPC services) and merge with the HTTP router
// so both protocols share the same port. `accept_http1(true)` lets plain
// HTTP/1.1 requests reach the merged axum routes.
let router = TonicServer::builder()
.accept_http1(true)
.add_service(health_service)
.add_service(ObservabilityServer::new(service))
.add_service(reflection_service)
.into_router()
.merge(http_router(router_serving));

let result = hyper::Server::from_tcp(std_listener)
.expect("Failed to build HTTP server from TCP listener")
.serve(router.into_make_service())
.with_graceful_shutdown(async {
rx.await.ok();
info!("GRPC API server shutting down.");
})
.await;

if let Err(e) = result {
error!(
message = "GRPC server encountered an error.",
error = %e,
error_source = ?e.source(),
bind_addr = %actual_addr,
);
}
}
});
.in_current_span(),
);

info!("GRPC API server started on {}.", actual_addr);

Expand Down
3 changes: 2 additions & 1 deletion src/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use hyper::header::AUTHORIZATION;
use smpl_jwt::Jwt;
use snafu::{ResultExt, Snafu};
use tokio::sync::watch;
use tracing::Instrument;
use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};

use crate::{
Expand Down Expand Up @@ -194,7 +195,7 @@ impl GcpAuthenticator {

pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
let (sender, receiver) = watch::channel(());
tokio::spawn(self.clone().token_regenerator(sender));
tokio::spawn(self.clone().token_regenerator(sender).in_current_span());
receiver
}

Expand Down
3 changes: 2 additions & 1 deletion src/secrets/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, process::Command, time};
use tokio_util::codec;
use tracing::Instrument;
use vector_lib::configurable::{component::GenerateConfig, configurable_component};
use vrl::value::Value;

Expand Down Expand Up @@ -179,7 +180,7 @@ async fn query_backend(
.ok_or("unable to acquire stdout")?;

let query = serde_json::to_vec(&query)?;
tokio::spawn(async move { stdin.write_all(&query).await });
tokio::spawn(async move { stdin.write_all(&query).await }.in_current_span());

let timeout = time::sleep(time::Duration::from_secs(timeout));
tokio::pin!(timeout);
Expand Down
47 changes: 26 additions & 21 deletions src/sinks/blackhole/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use vector_lib::{
},
};

use tracing::Instrument;

use crate::{
event::{EventArray, EventContainer, EventStatus, Finalizable},
sinks::{blackhole::config::BlackholeConfig, util::StreamSink},
Expand Down Expand Up @@ -57,29 +59,32 @@ impl StreamSink<EventArray> for BlackholeSink {

if self.config.print_interval_secs.as_secs() > 0 {
let interval_dur = self.config.print_interval_secs;
tokio::spawn(async move {
let mut print_interval = interval(interval_dur);
loop {
select! {
_ = print_interval.tick() => {
info!(
events = total_events.load(Ordering::Relaxed),
raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed),
internal_log_rate_limit = false,
"Collected events."
);
},
_ = tripwire.changed() => break,
tokio::spawn(
async move {
let mut print_interval = interval(interval_dur);
loop {
select! {
_ = print_interval.tick() => {
info!(
events = total_events.load(Ordering::Relaxed),
raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed),
internal_log_rate_limit = false,
"Collected events."
);
},
_ = tripwire.changed() => break,
}
}
}

info!(
events = total_events.load(Ordering::Relaxed),
raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed),
internal_log_rate_limit = false,
"Collected events."
);
});
info!(
events = total_events.load(Ordering::Relaxed),
raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed),
internal_log_rate_limit = false,
"Collected events."
);
}
.in_current_span(),
);
}

while let Some(mut events) = input.next().await {
Expand Down
Loading
Loading