Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog.d/24125_fix_topology_reload_deadlock.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Fixed a deadlock during config reload that caused all sources to stop consuming
new messages when a sink in `wait_for_sinks` was being changed. The source pump
would block in `wait_for_replacements` due to a `Pause` control message, creating
a circular dependency with `shutdown_diff`.

authors: joshcoughlan
100 changes: 75 additions & 25 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl RunningTopology {
} else {
ConfigDiff::new(&self.config, &new_config, HashSet::new())
};
let buffers = self.shutdown_diff(&diff, &new_config).await;
let (buffers, force_removed_sinks) = self.shutdown_diff(&diff, &new_config).await;

// Gives windows some time to make available any port
// released by shutdown components.
Expand All @@ -336,7 +336,8 @@ impl RunningTopology {
.run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
.await
{
self.connect_diff(&diff, &mut new_pieces).await;
self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks)
.await;
self.spawn_diff(&diff, new_pieces);
self.config = new_config;

Expand All @@ -362,7 +363,8 @@ impl RunningTopology {
.run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
.await
{
self.connect_diff(&diff, &mut new_pieces).await;
self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks)
.await;
self.spawn_diff(&diff, new_pieces);

info!("Old configuration restored successfully.");
Expand Down Expand Up @@ -425,7 +427,7 @@ impl RunningTopology {
&mut self,
diff: &ConfigDiff,
new_config: &Config,
) -> HashMap<ComponentKey, BuiltBuffer> {
) -> (HashMap<ComponentKey, BuiltBuffer>, HashSet<ComponentKey>) {
// First, we shutdown any changed/removed sources. This ensures that we can allow downstream
// components to terminate naturally by virtue of the flow of events stopping.
if diff.sources.any_changed_or_removed() {
Expand Down Expand Up @@ -479,7 +481,7 @@ impl RunningTopology {
let previous = self.tasks.remove(key).unwrap();
drop(previous); // detach and forget

self.remove_inputs(key, diff, new_config).await;
self.remove_inputs(key, diff, new_config, false).await;
self.remove_outputs(key);

if let Some(registry) = self.utilization_registry.as_ref() {
Expand All @@ -490,7 +492,7 @@ impl RunningTopology {
for key in &diff.transforms.to_change {
debug!(component_id = %key, "Changing transform.");

self.remove_inputs(key, diff, new_config).await;
self.remove_inputs(key, diff, new_config, false).await;
self.remove_outputs(key);
}

Expand Down Expand Up @@ -610,15 +612,22 @@ impl RunningTopology {
.collect::<Vec<_>>();
for key in &removed_sinks {
debug!(component_id = %key, "Removing sink.");
self.remove_inputs(key, diff, new_config).await;
self.remove_inputs(key, diff, new_config, false).await;

if let Some(registry) = self.utilization_registry.as_ref() {
registry.remove_component(key);
}
}

// After that, for any changed sinks, we temporarily detach their inputs (not remove) so
// they can naturally shutdown and allow us to recover their buffers if possible.
// After that, for any changed sinks, we disconnect their inputs so they can naturally
// shutdown and allow us to recover their buffers if possible.
//
// For sinks in wait_for_sinks, we use a full Remove (not Pause) from the upstream fanout.
// This prevents a circular dependency where the source pump blocks in
// wait_for_replacements (waiting for a Replace that only arrives after connect_diff,
// which runs after shutdown_diff, which is waiting for the old sink to finish).
// Using Remove allows the source pump to continue sending to other sinks while
// the old sink drains.
let mut buffer_tx = HashMap::new();

let sinks_to_change = diff
Expand All @@ -633,8 +642,18 @@ impl RunningTopology {
}))
.collect::<Vec<_>>();

// Track which sinks were force-removed from their upstream fanouts so that
// connect_diff knows to use Add (not Replace) when reconnecting them.
let mut force_removed_sinks = HashSet::new();

for key in &sinks_to_change {
debug!(component_id = %key, "Changing sink.");

let force_remove = wait_for_sinks.contains(key);
if force_remove {
force_removed_sinks.insert((*key).clone());
}

if reuse_buffers.contains(key) {
self.detach_triggers
.remove(key)
Expand All @@ -654,7 +673,8 @@ impl RunningTopology {
// at other stages.
buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
}
self.remove_inputs(key, diff, new_config).await;
self.remove_inputs(key, diff, new_config, force_remove)
.await;
}

// Now that we've disconnected or temporarily detached the inputs to all changed/removed
Expand Down Expand Up @@ -699,14 +719,19 @@ impl RunningTopology {
}
}

buffers
(buffers, force_removed_sinks)
}

/// Connects all changed/added components in the given configuration diff.
///
/// `force_removed_sinks` contains the keys of sinks that were fully removed (not paused)
/// from their upstream fanouts during shutdown_diff. These sinks need to be re-added
/// via `ControlMessage::Add` rather than `ControlMessage::Replace`.
pub(crate) async fn connect_diff(
&mut self,
diff: &ConfigDiff,
new_pieces: &mut TopologyPieces,
force_removed_sinks: &HashSet<ComponentKey>,
) {
debug!("Connecting changed/added component(s).");

Expand Down Expand Up @@ -826,13 +851,14 @@ impl RunningTopology {
// with transforms.
for key in diff.transforms.changed_and_added() {
debug!(component_id = %key, "Connecting inputs for transform.");
self.setup_inputs(key, diff, new_pieces).await;
self.setup_inputs(key, diff, new_pieces, false).await;
}

// Now that all sources and transforms are fully configured, we can wire up sinks.
for key in diff.sinks.changed_and_added() {
debug!(component_id = %key, "Connecting inputs for sink.");
self.setup_inputs(key, diff, new_pieces).await;
self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key))
.await;
}
let added_changed_tables: Vec<&ComponentKey> = diff
.enrichment_tables
Expand All @@ -841,7 +867,8 @@ impl RunningTopology {
.collect();
for key in added_changed_tables.iter() {
debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
self.setup_inputs(key, diff, new_pieces).await;
self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key))
.await;
}

// We do a final pass here to reconnect unchanged components.
Expand Down Expand Up @@ -937,6 +964,7 @@ impl RunningTopology {
key: &ComponentKey,
diff: &ConfigDiff,
new_pieces: &mut builder::TopologyPieces,
force_removed: bool,
) {
let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();

Expand All @@ -954,10 +982,18 @@ impl RunningTopology {
for input in inputs {
let output = self.outputs.get_mut(&input).expect("unknown output");

if diff.contains(&input.component) || inputs_to_add.contains(&input) {
// If the input we're connecting to is changing, that means its outputs will have been
// recreated, so instead of replacing a paused sink, we have to add it to this new
// output for the first time, since there's nothing to actually replace at this point.
if force_removed || diff.contains(&input.component) || inputs_to_add.contains(&input) {
// Cases where we need to add (not replace) the component input:
//
// Case 1: The component was force-removed from the fanout during shutdown_diff
// (rather than paused), so there is no paused entry to replace.
//
// Case 2: If the input we're connecting to is changing, that means its outputs
// will have been recreated, so instead of replacing a paused sink, we have to add
// it to this new output for the first time, since there's nothing to actually
// replace at this point.
//
// Case 3: This is a newly added connection.
debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");

_ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
Expand All @@ -983,7 +1019,13 @@ impl RunningTopology {
self.outputs.retain(|id, _output| &id.component != key);
}

async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
async fn remove_inputs(
&mut self,
key: &ComponentKey,
diff: &ConfigDiff,
new_config: &Config,
force_remove: bool,
) {
self.inputs.remove(key);
self.detach_triggers.remove(key);

Expand All @@ -996,20 +1038,26 @@ impl RunningTopology {

for input in old_inputs {
if let Some(output) = self.outputs.get_mut(input) {
if diff.contains(&input.component)
if force_remove
|| diff.contains(&input.component)
|| diff.is_removed(key)
|| !new_inputs.contains(input)
{
// 3 cases to remove the input:
// Cases to remove the input:
//
// Case 1: The caller has requested a full removal (force_remove). This is used
// for sinks whose old task must finish before the reload can proceed
// (wait_for_sinks). Using Pause here would block the source pump in
// wait_for_replacements, creating a circular dependency with shutdown_diff.
//
// Case 1: If the input we're removing ourselves from is changing, that means its
// Case 2: If the input we're removing ourselves from is changing, that means its
// outputs will be recreated, so instead of pausing the sink, we just delete it
// outright to ensure things are clean.
//
// Case 2: If this component itself is being removed, then pausing makes no sense
// Case 3: If this component itself is being removed, then pausing makes no sense
// because it isn't coming back.
//
// Case 3: This component is no longer connected to the input from new config.
// Case 4: This component is no longer connected to the input from new config.
debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");

_ = output.send(ControlMessage::Remove(key.clone()));
Expand Down Expand Up @@ -1363,7 +1411,9 @@ impl RunningTopology {
{
return None;
}
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology
.connect_diff(&diff, &mut pieces, &HashSet::new())
.await;
running_topology.spawn_diff(&diff, pieces);

let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
Expand Down
105 changes: 105 additions & 0 deletions src/topology/test/reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,111 @@ async fn topology_reload_component() {
}
}

/// Regression coverage for https://github.com/vectordotdev/vector/issues/24125
///
/// Reloading a sink whose resource conflicts with the running one (e.g. the
/// same bound port) forces the topology to wait for the old sink task to
/// finish before the replacement can start. The old behavior sent `Pause` to
/// the upstream fanout from `remove_inputs`, which wedged the source pump
/// inside `wait_for_replacements` and deadlocked against `shutdown_diff`.
/// Here we assert that the reload finishes within a bounded amount of time
/// rather than hanging forever.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn topology_reload_conflicting_sink_does_not_stall() {
    test_util::trace_init();

    let (_guard, address) = next_addr();

    // Both configs bind the same address; only the flush period differs, so
    // the sink lands in `to_change` while its resource conflicts with the
    // still-running instance.
    let mut old_config = Config::builder();
    old_config.add_source("in", internal_metrics_source());
    old_config.add_sink("out", &["in"], prom_exporter_sink(address, 1));

    let mut new_config = Config::builder();
    new_config.add_source("in", internal_metrics_source());
    new_config.add_sink("out", &["in"], prom_exporter_sink(address, 2));

    let (mut topology, crash) = start_topology(old_config.build().unwrap(), false).await;
    let mut crash_stream = UnboundedReceiverStream::new(crash);

    tokio::select! {
        _ = wait_for_tcp(address) => {},
        _ = crash_stream.next() => panic!("topology crashed before reload"),
    }

    // Give the source pump time to start moving events.
    sleep(Duration::from_secs(2)).await;

    let reload = topology.reload_config_and_respawn(new_config.build().unwrap(), Default::default());
    let reload_result = tokio::time::timeout(Duration::from_secs(10), reload).await;

    assert!(
        reload_result.is_ok(),
        "Reload stalled: reloading a sink with conflicting resources should not block the source pump"
    );
    reload_result.unwrap().unwrap();

    // The replacement sink should now be serving on the same address.
    tokio::select! {
        _ = wait_for_tcp(address) => {},
        _ = crash_stream.next() => panic!("topology crashed after reload"),
    }
}

/// Companion regression test covering the SIGHUP-style reload path, where a
/// changed sink lands in `reuse_buffers` (its buffer config is unchanged and
/// it is not listed in `components_to_reload`).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn topology_reload_reuse_buffer_does_not_stall() {
    test_util::trace_init();

    let (_guard_0, address_0) = next_addr();
    let (_guard_1, address_1) = next_addr();

    let mut old_config = Config::builder();
    old_config.add_source("in", internal_metrics_source());
    old_config.add_sink("out", &["in"], prom_exporter_sink(address_0, 1));

    // Only the address changes, so the sink ends up in `to_change` with an
    // unchanged buffer config (hence `reuse_buffers`). We deliberately skip
    // extend_reload_set so the sink stays out of `components_to_reload`,
    // mimicking a SIGHUP-triggered reload.
    let mut new_config = Config::builder();
    new_config.add_source("in", internal_metrics_source());
    new_config.add_sink("out", &["in"], prom_exporter_sink(address_1, 1));

    let (mut topology, crash) = start_topology(old_config.build().unwrap(), false).await;
    let mut crash_stream = UnboundedReceiverStream::new(crash);

    tokio::select! {
        _ = wait_for_tcp(address_0) => {},
        _ = crash_stream.next() => panic!("topology crashed before reload"),
    }

    // Give the source pump time to start moving events.
    sleep(Duration::from_secs(2)).await;

    let reload = topology.reload_config_and_respawn(new_config.build().unwrap(), Default::default());
    let reload_result = tokio::time::timeout(Duration::from_secs(10), reload).await;

    assert!(
        reload_result.is_ok(),
        "Reload stalled: reloading a sink with reused buffer should not block the source pump"
    );
    reload_result.unwrap().unwrap();

    // The replacement sink should now be serving on the new address.
    tokio::select! {
        _ = wait_for_tcp(address_1) => {},
        _ = crash_stream.next() => panic!("topology crashed after reload"),
    }
}

async fn reload_sink_test(
old_config: Config,
new_config: Config,
Expand Down
Loading