From 3b2475d9000fbb2039a16ffa37292f88b628eee8 Mon Sep 17 00:00:00 2001 From: Raphael Poss Date: Mon, 4 May 2026 00:50:13 +0200 Subject: [PATCH 1/3] fix(pulseaudio): release server-side stream on Stream::drop The PulseAudio backend spawns a driver thread that calls `pulseaudio::PlaybackStream::play_all`, which awaits source EOF before draining and exiting. EOF for a `PlaybackSource` is signalled by `poll_read` returning `0`, but the data-callback wrapper used by `new_playback` always reports the full buffer length to satisfy cpal's no-short-write contract. The driver thread therefore parks in `source_eof` for the lifetime of the process. Because that thread holds a `PlaybackStream` clone (an `Arc`), the inner Arc count never drops to zero, `InnerPlaybackStream::drop` never fires, and the `DeletePlaybackStream` command it would otherwise queue is never sent. Each `Stream` created and dropped by the user thus leaks one server-side stream and one OS thread; long-running clients accumulate `pactl list short sink-inputs` entries until PulseAudio runs out of channels. Queue a `DeletePlaybackStream` from `Stream::drop` so the reactor removes the stream state, drops the source's `eof_tx` channel, and wakes the driver thread with a cancellation error. `now_or_never` mirrors `InnerPlaybackStream::drop` and avoids blocking in `Drop`. Fixes #1188. --- CHANGELOG.md | 5 +++++ src/host/pulseaudio/stream.rs | 19 +++++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 241aff2cc..f0ee7741b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -144,6 +144,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **JACK**: Fix input capture timestamp using callback execution time instead of cycle start. - **JACK**: Poisoned error callback mutex no longer silently drops subsequent error notifications. - **PulseAudio**: Poisoned locks now exit the thread gracefully instead of panicking. +- **PulseAudio**: Fix server-side stream leak when a `Stream` is dropped: the + internal `play_all` driver thread parks in `source_eof` and never returns + for callback-based sources, holding the `PlaybackStream` `Arc` indefinitely. + `Stream::drop` now queues a `DeletePlaybackStream` so the reactor can wake + the driver thread and release the stream (#1188). - **JACK**: Port registration failure now fails stream creation instead of silently failing. - **JACK**: `activate_async()` failure now returns an error instead of panicking. - **JACK**: Sample rate is now validated against the live JACK server at stream creation time. diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs index 16181ce11..311cbd8d9 100644 --- a/src/host/pulseaudio/stream.rs +++ b/src/host/pulseaudio/stream.rs @@ -7,6 +7,7 @@ use std::{ }; use futures::executor::block_on; +use futures::FutureExt as _; use pulseaudio::{protocol, AsPlaybackSource}; use crate::{ @@ -56,8 +57,22 @@ pub struct Stream(StreamInner); impl Drop for Stream { fn drop(&mut self) { match &mut self.0 { - StreamInner::Playback(_, _, handle) | StreamInner::Record(_, _, handle) => { - handle.cancel() + StreamInner::Playback(stream, _, handle) => { + handle.cancel(); + // Tear down the play_all driver thread spawned in + // new_playback. It is parked in source_eof(), which only + // resolves when the source returns 0 from poll_read — but + // the data-callback wrapper installed there always reports + // a full buffer, so without an external nudge the thread + // never exits and keeps the PlaybackStream Arc alive, + // leaking the server-side stream. Queueing a delete causes + // the reactor to drop the source's eof_tx, waking play_all + // with cancellation. now_or_never matches the existing + // pattern in pulseaudio::PlaybackStream's own Drop. + let _ = stream.clone().delete().now_or_never(); + } + StreamInner::Record(_, _, handle) => { + handle.cancel(); } } } From 9a2c03ccba36c720920684cfba80bf93806098b8 Mon Sep 17 00:00:00 2001 From: Raphael Poss Date: Wed, 6 May 2026 22:53:34 +0200 Subject: [PATCH 2/3] fix(pulseaudio): join worker threads on Stream::drop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous fix queued `DeletePlaybackStream` from `Stream::drop` but returned immediately. Because the delete is processed asynchronously on the reactor and the play_all driver thread only exits *after* the reactor wakes it, a process that exits shortly after dropping the Stream can race the worker threads — the delete may never reach the server, leaving exactly the leak this PR set out to fix. Store the spawned threads' JoinHandles on `Stream` and join them in `Drop` after signalling cancellation. Skip handles whose thread id matches the current thread, since the user's error_callback runs on those workers and may itself drop the Stream; joining ourselves would deadlock. Mirrors the pattern roderickvd is introducing for PipeWire in #1187. --- CHANGELOG.md | 4 +++- src/host/pulseaudio/stream.rs | 41 +++++++++++++++++++++++++---------- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0ee7741b..8177ffdfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -148,7 +148,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 internal `play_all` driver thread parks in `source_eof` and never returns for callback-based sources, holding the `PlaybackStream` `Arc` indefinitely. `Stream::drop` now queues a `DeletePlaybackStream` so the reactor can wake - the driver thread and release the stream (#1188). + the driver thread and release the stream, then joins the worker threads + so the delete actually reaches the server before the process exits + (#1188). - **JACK**: Port registration failure now fails stream creation instead of silently failing. - **JACK**: `activate_async()` failure now returns an error instead of panicking. - **JACK**: Sample rate is now validated against the live JACK server at stream creation time. diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs index 311cbd8d9..6b1845db9 100644 --- a/src/host/pulseaudio/stream.rs +++ b/src/host/pulseaudio/stream.rs @@ -52,11 +52,14 @@ enum StreamInner { Record(pulseaudio::RecordStream, Instant, LatencyHandle), } -pub struct Stream(StreamInner); +pub struct Stream { + inner: StreamInner, + workers: Vec>, +} impl Drop for Stream { fn drop(&mut self) { - match &mut self.0 { + match &mut self.inner { StreamInner::Playback(stream, _, handle) => { handle.cancel(); // Tear down the play_all driver thread spawned in @@ -75,12 +78,20 @@ impl Drop for Stream { handle.cancel(); } } + for handle in self.workers.drain(..) { + // Prevent self-join: a worker thread may surface an error + // through the user's error_callback, and that callback may + // drop the Stream — in which case we'd be joining ourselves. + if handle.thread().id() != std::thread::current().id() { + let _ = handle.join(); + } + } } } impl StreamTrait for Stream { fn play(&self) -> Result<(), Error> { - match &self.0 { + match &self.inner { StreamInner::Playback(stream, _, handle) => { block_on(stream.uncork()).map_err(Error::from)?; handle.notify(); @@ -95,12 +106,12 @@ impl StreamTrait for Stream { } fn pause(&self) -> Result<(), Error> { - let res = match &self.0 { + let res = match &self.inner { StreamInner::Playback(stream, _, _) => block_on(stream.cork()), StreamInner::Record(stream, _, _) => block_on(stream.cork()), }; res.map_err(Error::from)?; - match &self.0 { + match &self.inner { StreamInner::Playback(_, _, handle) | StreamInner::Record(_, _, handle) => { handle.notify() } @@ -109,7 +120,7 @@ impl StreamTrait for Stream { } fn now(&self) -> StreamInstant { - let start = match &self.0 { + let start = match &self.inner { StreamInner::Playback(_, start, _) | StreamInner::Record(_, start, _) => *start, }; let elapsed = start.elapsed(); @@ -117,7 +128,7 @@ impl StreamTrait for Stream { } fn buffer_size(&self) -> Result { - let (spec, bytes) = match &self.0 { + let (spec, bytes) = match &self.inner { StreamInner::Playback(s, _, _) => ( s.sample_spec(), s.buffer_attr().minimum_request_length as usize, @@ -234,7 +245,7 @@ impl Stream { // when the stream is stopped by the user. let stream_clone = stream.clone(); let error_callback_clone = error_callback.clone(); - std::thread::spawn(move || { + let driver_handle = std::thread::spawn(move || { if let Err(e) = block_on(stream_clone.play_all()) { emit_error(&error_callback_clone, Error::from(e)); } @@ -246,7 +257,7 @@ impl Stream { let stream_clone = stream.clone(); let latency_clone = current_latency_micros.clone(); let poll_clone = last_poll_micros.clone(); - std::thread::spawn(move || loop { + let latency_handle = std::thread::spawn(move || loop { if cancel_thread.load(atomic::Ordering::Relaxed) { break; } @@ -280,7 +291,10 @@ impl Stream { *guard = false; }); - Ok(Self(StreamInner::Playback(stream, start, handle))) + Ok(Self { + inner: StreamInner::Playback(stream, start, handle), + workers: vec![driver_handle, latency_handle], + }) } pub fn new_record( @@ -367,7 +381,7 @@ impl Stream { let stream_clone = stream.clone(); let latency_clone = current_latency_micros.clone(); let poll_clone = last_poll_micros.clone(); - std::thread::spawn(move || loop { + let latency_handle = std::thread::spawn(move || loop { if cancel_thread.load(atomic::Ordering::Relaxed) { break; } @@ -401,7 +415,10 @@ impl Stream { *guard = false; }); - Ok(Self(StreamInner::Record(stream, start, handle))) + Ok(Self { + inner: StreamInner::Record(stream, start, handle), + workers: vec![latency_handle], + }) } } From 1149632fab2779e8f987ebab3c73a5eacb91f1ad Mon Sep 17 00:00:00 2001 From: Raphael Poss Date: Thu, 7 May 2026 16:27:53 +0200 Subject: [PATCH 3/3] fix(pulseaudio): suppress disconnect error during Stream::drop teardown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The play_all driver thread surfaces ClientError::Disconnected when the reactor drops the source's eof_tx — which happens not only on real server disconnect but also as a *direct consequence* of Stream::drop queueing DeletePlaybackStream and the server processing it. The error callback then fires "PulseAudio client disconnected" on every clean teardown, despite the connection being healthy. Check the cancel flag in the play_all spawn before emit_error, mirroring the latency thread's existing pre-poll cancel check. After a clean Stream::drop the cancel flag is already set when play_all unblocks, so the spurious notification is silenced; real disconnects (reactor exits, socket closed) leave the flag clear and still surface through the user's error_callback. --- CHANGELOG.md | 6 ++++++ src/host/pulseaudio/stream.rs | 11 ++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8177ffdfc..bf3b79727 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -151,6 +151,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 the driver thread and release the stream, then joins the worker threads so the delete actually reaches the server before the process exits (#1188). +- **PulseAudio**: Suppress spurious `StreamInvalidated` ("PulseAudio client + disconnected") error callbacks during cleanup. When `Stream::drop` queues + the delete, the reactor drops the source's `eof_tx` and the `play_all` + driver thread surfaces `ClientError::Disconnected` — even though the + server is fine. The driver-thread spawn now checks the cancel flag before + emitting the error, mirroring the latency thread's existing behaviour. - **JACK**: Port registration failure now fails stream creation instead of silently failing. - **JACK**: `activate_async()` failure now returns an error instead of panicking. - **JACK**: Sample rate is now validated against the live JACK server at stream creation time. diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs index 6b1845db9..db5e7297e 100644 --- a/src/host/pulseaudio/stream.rs +++ b/src/host/pulseaudio/stream.rs @@ -245,9 +245,18 @@ impl Stream { // when the stream is stopped by the user. let stream_clone = stream.clone(); let error_callback_clone = error_callback.clone(); + let cancel_driver = handle.cancel.clone(); let driver_handle = std::thread::spawn(move || { if let Err(e) = block_on(stream_clone.play_all()) { - emit_error(&error_callback_clone, Error::from(e)); + // Stream::drop sets the cancel flag and then queues + // DeletePlaybackStream, which makes the reactor drop the + // source's eof_tx and surfaces here as ClientError::Disconnected. + // That is teardown noise, not a real error — suppress + // emit_error in the cancel case. The latency thread already + // does the equivalent check before each timing_info poll. + if !cancel_driver.load(atomic::Ordering::Relaxed) { + emit_error(&error_callback_clone, Error::from(e)); + } } });