From de7976f38053957f6703d8b5aab0b961606d3325 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Thu, 9 Apr 2026 17:56:24 -0500 Subject: [PATCH 1/3] fix(topology): prevent source pump deadlock during sink config reload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During config reload, when a sink in wait_for_sinks is being changed, remove_inputs previously sent Pause to the upstream fanout. This caused the source pump to block in wait_for_replacements (waiting for a Replace message that only arrives after connect_diff, which runs after shutdown_diff completes). Since shutdown_diff waits for the old sink to finish, this created a circular dependency that stalled all sources. The fix uses Remove instead of Pause for sinks in wait_for_sinks during reload, and correspondingly uses Add instead of Replace when reconnecting them in connect_diff. This allows the source pump to continue sending events to other sinks while the old sink drains, breaking the circular dependency. This is buffer-type agnostic — it fixes the deadlock for memory and disk buffered sinks alike, and only affects the reload path. Refs: vectordotdev/vector#24125 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../24125_fix_topology_reload_deadlock.fix.md | 6 + src/topology/running.rs | 98 +++++++++++----- src/topology/test/reload.rs | 105 ++++++++++++++++++ 3 files changed, 184 insertions(+), 25 deletions(-) create mode 100644 changelog.d/24125_fix_topology_reload_deadlock.fix.md diff --git a/changelog.d/24125_fix_topology_reload_deadlock.fix.md b/changelog.d/24125_fix_topology_reload_deadlock.fix.md new file mode 100644 index 0000000000000..6f2d148efdb8f --- /dev/null +++ b/changelog.d/24125_fix_topology_reload_deadlock.fix.md @@ -0,0 +1,6 @@ +Fixed a deadlock during config reload that caused all sources to stop consuming +new messages when a sink in wait_for_sinks was being changed. The source pump +would block in wait_for_replacements due to a Pause control message, creating a +circular dependency with shutdown_diff. + +authors: joshcoughlan diff --git a/src/topology/running.rs b/src/topology/running.rs index 6839379cd0f1a..2d49397083381 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -309,7 +309,7 @@ impl RunningTopology { } else { ConfigDiff::new(&self.config, &new_config, HashSet::new()) }; - let buffers = self.shutdown_diff(&diff, &new_config).await; + let (buffers, force_removed_sinks) = self.shutdown_diff(&diff, &new_config).await; // Gives windows some time to make available any port // released by shutdown components. @@ -336,7 +336,7 @@ impl RunningTopology { .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks) .await { - self.connect_diff(&diff, &mut new_pieces).await; + self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks).await; self.spawn_diff(&diff, new_pieces); self.config = new_config; @@ -362,7 +362,7 @@ impl RunningTopology { .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks) .await { - self.connect_diff(&diff, &mut new_pieces).await; + self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks).await; self.spawn_diff(&diff, new_pieces); info!("Old configuration restored successfully."); @@ -425,7 +425,7 @@ impl RunningTopology { &mut self, diff: &ConfigDiff, new_config: &Config, - ) -> HashMap { + ) -> (HashMap, HashSet) { // First, we shutdown any changed/removed sources. This ensures that we can allow downstream // components to terminate naturally by virtue of the flow of events stopping. if diff.sources.any_changed_or_removed() { @@ -479,7 +479,7 @@ impl RunningTopology { let previous = self.tasks.remove(key).unwrap(); drop(previous); // detach and forget - self.remove_inputs(key, diff, new_config).await; + self.remove_inputs(key, diff, new_config, false).await; self.remove_outputs(key); if let Some(registry) = self.utilization_registry.as_ref() { @@ -490,7 +490,7 @@ impl RunningTopology { for key in &diff.transforms.to_change { debug!(component_id = %key, "Changing transform."); - self.remove_inputs(key, diff, new_config).await; + self.remove_inputs(key, diff, new_config, false).await; self.remove_outputs(key); } @@ -610,15 +610,22 @@ impl RunningTopology { .collect::>(); for key in &removed_sinks { debug!(component_id = %key, "Removing sink."); - self.remove_inputs(key, diff, new_config).await; + self.remove_inputs(key, diff, new_config, false).await; if let Some(registry) = self.utilization_registry.as_ref() { registry.remove_component(key); } } - // After that, for any changed sinks, we temporarily detach their inputs (not remove) so - // they can naturally shutdown and allow us to recover their buffers if possible. + // After that, for any changed sinks, we disconnect their inputs so they can naturally + // shutdown and allow us to recover their buffers if possible. + // + // For sinks in wait_for_sinks, we use a full Remove (not Pause) from the upstream fanout. + // This prevents a circular dependency where the source pump blocks in + // wait_for_replacements (waiting for a Replace that only arrives after connect_diff, + // which runs after shutdown_diff, which is waiting for the old sink to finish). + // Using Remove allows the source pump to continue sending to other sinks while + // the old sink drains. let mut buffer_tx = HashMap::new(); let sinks_to_change = diff @@ -633,8 +640,18 @@ impl RunningTopology { })) .collect::>(); + // Track which sinks were force-removed from their upstream fanouts so that + // connect_diff knows to use Add (not Replace) when reconnecting them. + let mut force_removed_sinks = HashSet::new(); + for key in &sinks_to_change { debug!(component_id = %key, "Changing sink."); + + let force_remove = wait_for_sinks.contains(key); + if force_remove { + force_removed_sinks.insert((*key).clone()); + } + if reuse_buffers.contains(key) { self.detach_triggers .remove(key) @@ -654,7 +671,7 @@ impl RunningTopology { // at other stages. buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone()); } - self.remove_inputs(key, diff, new_config).await; + self.remove_inputs(key, diff, new_config, force_remove).await; } // Now that we've disconnected or temporarily detached the inputs to all changed/removed @@ -699,14 +716,19 @@ impl RunningTopology { } } - buffers + (buffers, force_removed_sinks) } /// Connects all changed/added components in the given configuration diff. + /// + /// `force_removed_sinks` contains the keys of sinks that were fully removed (not paused) + /// from their upstream fanouts during shutdown_diff. These sinks need to be re-added + /// via `ControlMessage::Add` rather than `ControlMessage::Replace`. pub(crate) async fn connect_diff( &mut self, diff: &ConfigDiff, new_pieces: &mut TopologyPieces, + force_removed_sinks: &HashSet, ) { debug!("Connecting changed/added component(s)."); @@ -826,13 +848,13 @@ impl RunningTopology { // with transforms. for key in diff.transforms.changed_and_added() { debug!(component_id = %key, "Connecting inputs for transform."); - self.setup_inputs(key, diff, new_pieces).await; + self.setup_inputs(key, diff, new_pieces, false).await; } // Now that all sources and transforms are fully configured, we can wire up sinks. for key in diff.sinks.changed_and_added() { debug!(component_id = %key, "Connecting inputs for sink."); - self.setup_inputs(key, diff, new_pieces).await; + self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key)).await; } let added_changed_tables: Vec<&ComponentKey> = diff .enrichment_tables @@ -841,7 +863,7 @@ impl RunningTopology { .collect(); for key in added_changed_tables.iter() { debug!(component_id = %key, "Connecting inputs for enrichment table sink."); - self.setup_inputs(key, diff, new_pieces).await; + self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key)).await; } // We do a final pass here to reconnect unchanged components. @@ -937,6 +959,7 @@ impl RunningTopology { key: &ComponentKey, diff: &ConfigDiff, new_pieces: &mut builder::TopologyPieces, + force_removed: bool, ) { let (tx, inputs) = new_pieces.inputs.remove(key).unwrap(); @@ -954,10 +977,21 @@ impl RunningTopology { for input in inputs { let output = self.outputs.get_mut(&input).expect("unknown output"); - if diff.contains(&input.component) || inputs_to_add.contains(&input) { - // If the input we're connecting to is changing, that means its outputs will have been - // recreated, so instead of replacing a paused sink, we have to add it to this new - // output for the first time, since there's nothing to actually replace at this point. + if force_removed + || diff.contains(&input.component) + || inputs_to_add.contains(&input) + { + // Cases where we need to add (not replace) the component input: + // + // Case 1: The component was force-removed from the fanout during shutdown_diff + // (rather than paused), so there is no paused entry to replace. + // + // Case 2: If the input we're connecting to is changing, that means its outputs + // will have been recreated, so instead of replacing a paused sink, we have to add + // it to this new output for the first time, since there's nothing to actually + // replace at this point. + // + // Case 3: This is a newly added connection. debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout."); _ = output.send(ControlMessage::Add(key.clone(), tx.clone())); @@ -983,7 +1017,13 @@ impl RunningTopology { self.outputs.retain(|id, _output| &id.component != key); } - async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) { + async fn remove_inputs( + &mut self, + key: &ComponentKey, + diff: &ConfigDiff, + new_config: &Config, + force_remove: bool, + ) { self.inputs.remove(key); self.detach_triggers.remove(key); @@ -996,20 +1036,26 @@ impl RunningTopology { for input in old_inputs { if let Some(output) = self.outputs.get_mut(input) { - if diff.contains(&input.component) + if force_remove + || diff.contains(&input.component) || diff.is_removed(key) || !new_inputs.contains(input) { - // 3 cases to remove the input: + // Cases to remove the input: + // + // Case 1: The caller has requested a full removal (force_remove). This is used + // for sinks whose old task must finish before the reload can proceed + // (wait_for_sinks). Using Pause here would block the source pump in + // wait_for_replacements, creating a circular dependency with shutdown_diff. // - // Case 1: If the input we're removing ourselves from is changing, that means its + // Case 2: If the input we're removing ourselves from is changing, that means its // outputs will be recreated, so instead of pausing the sink, we just delete it // outright to ensure things are clean. // - // Case 2: If this component itself is being removed, then pausing makes no sense + // Case 3: If this component itself is being removed, then pausing makes no sense // because it isn't coming back. // - // Case 3: This component is no longer connected to the input from new config. + // Case 4: This component is no longer connected to the input from new config. debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout."); _ = output.send(ControlMessage::Remove(key.clone())); @@ -1363,7 +1409,9 @@ impl RunningTopology { { return None; } - running_topology.connect_diff(&diff, &mut pieces).await; + running_topology + .connect_diff(&diff, &mut pieces, &HashSet::new()) + .await; running_topology.spawn_diff(&diff, pieces); let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) = diff --git a/src/topology/test/reload.rs b/src/topology/test/reload.rs index 907be19f6ec0f..1ec6afb1be9e6 100644 --- a/src/topology/test/reload.rs +++ b/src/topology/test/reload.rs @@ -320,6 +320,111 @@ async fn topology_reload_component() { } } +/// Regression test for https://github.com/vectordotdev/vector/issues/24125 +/// +/// When a sink with a conflicting resource (e.g., a bound port) is reloaded, +/// the old sink must be waited on before the new one can start. Previously, +/// `remove_inputs` sent `Pause` to the upstream fanout, which blocked the +/// source pump in `wait_for_replacements` — creating a circular dependency +/// with `shutdown_diff`. This test verifies that the reload completes +/// within a reasonable time instead of stalling. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn topology_reload_conflicting_sink_does_not_stall() { + test_util::trace_init(); + + let (_guard, address) = next_addr(); + + let mut old_config = Config::builder(); + old_config.add_source("in", internal_metrics_source()); + old_config.add_sink("out", &["in"], prom_exporter_sink(address, 1)); + + // Change only the flush period so the sink config differs but the + // resource (bound address) stays the same, creating a conflict. + let mut new_config = Config::builder(); + new_config.add_source("in", internal_metrics_source()); + new_config.add_sink("out", &["in"], prom_exporter_sink(address, 2)); + + let (mut topology, crash) = start_topology(old_config.build().unwrap(), false).await; + let mut crash_stream = UnboundedReceiverStream::new(crash); + + tokio::select! { + _ = wait_for_tcp(address) => {}, + _ = crash_stream.next() => panic!("topology crashed before reload"), + } + + // Let some events flow so the source pump is active. + sleep(Duration::from_secs(2)).await; + + let reload_result = tokio::time::timeout( + Duration::from_secs(10), + topology.reload_config_and_respawn(new_config.build().unwrap(), Default::default()), + ) + .await; + + assert!( + reload_result.is_ok(), + "Reload stalled: reloading a sink with conflicting resources should not block the source pump" + ); + reload_result.unwrap().unwrap(); + + // Verify the new sink is running. + tokio::select! { + _ = wait_for_tcp(address) => {}, + _ = crash_stream.next() => panic!("topology crashed after reload"), + } +} + +/// Similar regression test for the SIGHUP reload path where sinks end up in +/// `reuse_buffers` (buffer config unchanged, no `components_to_reload`). +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn topology_reload_reuse_buffer_does_not_stall() { + test_util::trace_init(); + + let (_guard_0, address_0) = next_addr(); + let (_guard_1, address_1) = next_addr(); + + let mut old_config = Config::builder(); + old_config.add_source("in", internal_metrics_source()); + old_config.add_sink("out", &["in"], prom_exporter_sink(address_0, 1)); + + // Change the address so the sink is in to_change, but don't change + // the buffer config so it lands in reuse_buffers. Also don't use + // extend_reload_set so the sink is NOT in components_to_reload + // (simulating a SIGHUP-style reload). + let mut new_config = Config::builder(); + new_config.add_source("in", internal_metrics_source()); + new_config.add_sink("out", &["in"], prom_exporter_sink(address_1, 1)); + + let (mut topology, crash) = start_topology(old_config.build().unwrap(), false).await; + let mut crash_stream = UnboundedReceiverStream::new(crash); + + tokio::select! { + _ = wait_for_tcp(address_0) => {}, + _ = crash_stream.next() => panic!("topology crashed before reload"), + } + + // Let some events flow so the source pump is active. + sleep(Duration::from_secs(2)).await; + + let reload_result = tokio::time::timeout( + Duration::from_secs(10), + topology.reload_config_and_respawn(new_config.build().unwrap(), Default::default()), + ) + .await; + + assert!( + reload_result.is_ok(), + "Reload stalled: reloading a sink with reused buffer should not block the source pump" + ); + reload_result.unwrap().unwrap(); + + // Verify the new sink is running on the new address. + tokio::select! { + _ = wait_for_tcp(address_1) => {}, + _ = crash_stream.next() => panic!("topology crashed after reload"), + } +} + async fn reload_sink_test( old_config: Config, new_config: Config, From fdb7a0dbf2a472db798357f4c3f78a740388f835 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Fri, 10 Apr 2026 13:45:16 -0500 Subject: [PATCH 2/3] fix: rustfmt formatting for long connect_diff calls Co-Authored-By: Claude Opus 4.6 (1M context) --- src/topology/running.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 2d49397083381..5bd423ff7527d 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -336,7 +336,8 @@ impl RunningTopology { .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks) .await { - self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks).await; + self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks) + .await; self.spawn_diff(&diff, new_pieces); self.config = new_config; @@ -362,7 +363,8 @@ impl RunningTopology { .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks) .await { - self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks).await; + self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks) + .await; self.spawn_diff(&diff, new_pieces); info!("Old configuration restored successfully."); @@ -671,7 +673,8 @@ impl RunningTopology { // at other stages. buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone()); } - self.remove_inputs(key, diff, new_config, force_remove).await; + self.remove_inputs(key, diff, new_config, force_remove) + .await; } // Now that we've disconnected or temporarily detached the inputs to all changed/removed @@ -854,7 +857,8 @@ impl RunningTopology { // Now that all sources and transforms are fully configured, we can wire up sinks. for key in diff.sinks.changed_and_added() { debug!(component_id = %key, "Connecting inputs for sink."); - self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key)).await; + self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key)) + .await; } let added_changed_tables: Vec<&ComponentKey> = diff .enrichment_tables @@ -863,7 +867,8 @@ impl RunningTopology { .collect(); for key in added_changed_tables.iter() { debug!(component_id = %key, "Connecting inputs for enrichment table sink."); - self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key)).await; + self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key)) + .await; } // We do a final pass here to reconnect unchanged components. @@ -977,10 +982,7 @@ impl RunningTopology { for input in inputs { let output = self.outputs.get_mut(&input).expect("unknown output"); - if force_removed - || diff.contains(&input.component) - || inputs_to_add.contains(&input) - { + if force_removed || diff.contains(&input.component) || inputs_to_add.contains(&input) { // Cases where we need to add (not replace) the component input: // // Case 1: The component was force-removed from the fanout during shutdown_diff From 2603193500d8096d6d2ed31739aa185f1fd281a3 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Fri, 10 Apr 2026 13:56:27 -0500 Subject: [PATCH 3/3] ci: retrigger checks