diff --git a/CHANGELOG.md b/CHANGELOG.md index 241aff2cc..bf3b79727 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -144,6 +144,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **JACK**: Fix input capture timestamp using callback execution time instead of cycle start. - **JACK**: Poisoned error callback mutex no longer silently drops subsequent error notifications. - **PulseAudio**: Poisoned locks now exit the thread gracefully instead of panicking. +- **PulseAudio**: Fix server-side stream leak when a `Stream` is dropped: the + internal `play_all` driver thread parks in `source_eof` and never returns + for callback-based sources, holding the `PlaybackStream` `Arc` indefinitely. + `Stream::drop` now queues a `DeletePlaybackStream` so the reactor can wake + the driver thread and release the stream, then joins the worker threads + so the delete actually reaches the server before the process exits + (#1188). +- **PulseAudio**: Suppress spurious `StreamInvalidated` ("PulseAudio client + disconnected") error callbacks during cleanup. When `Stream::drop` queues + the delete, the reactor drops the source's `eof_tx` and the `play_all` + driver thread surfaces `ClientError::Disconnected` — even though the + server is fine. The driver-thread spawn now checks the cancel flag before + emitting the error, mirroring the latency thread's existing behaviour. - **JACK**: Port registration failure now fails stream creation instead of silently failing. - **JACK**: `activate_async()` failure now returns an error instead of panicking. - **JACK**: Sample rate is now validated against the live JACK server at stream creation time. diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs index 16181ce11..db5e7297e 100644 --- a/src/host/pulseaudio/stream.rs +++ b/src/host/pulseaudio/stream.rs @@ -7,6 +7,7 @@ use std::{ }; use futures::executor::block_on; +use futures::FutureExt as _; use pulseaudio::{protocol, AsPlaybackSource}; use crate::{ @@ -51,13 +52,38 @@ enum StreamInner { Record(pulseaudio::RecordStream, Instant, LatencyHandle), } -pub struct Stream(StreamInner); +pub struct Stream { + inner: StreamInner, + workers: Vec>, +} impl Drop for Stream { fn drop(&mut self) { - match &mut self.0 { - StreamInner::Playback(_, _, handle) | StreamInner::Record(_, _, handle) => { - handle.cancel() + match &mut self.inner { + StreamInner::Playback(stream, _, handle) => { + handle.cancel(); + // Tear down the play_all driver thread spawned in + // new_playback. It is parked in source_eof(), which only + // resolves when the source returns 0 from poll_read — but + // the data-callback wrapper installed there always reports + // a full buffer, so without an external nudge the thread + // never exits and keeps the PlaybackStream Arc alive, + // leaking the server-side stream. Queueing a delete causes + // the reactor to drop the source's eof_tx, waking play_all + // with cancellation. now_or_never matches the existing + // pattern in pulseaudio::PlaybackStream's own Drop. + let _ = stream.clone().delete().now_or_never(); + } + StreamInner::Record(_, _, handle) => { + handle.cancel(); + } + } + for handle in self.workers.drain(..) { + // Prevent self-join: a worker thread may surface an error + // through the user's error_callback, and that callback may + // drop the Stream — in which case we'd be joining ourselves. + if handle.thread().id() != std::thread::current().id() { + let _ = handle.join(); } } } @@ -65,7 +91,7 @@ impl Drop for Stream { impl StreamTrait for Stream { fn play(&self) -> Result<(), Error> { - match &self.0 { + match &self.inner { StreamInner::Playback(stream, _, handle) => { block_on(stream.uncork()).map_err(Error::from)?; handle.notify(); @@ -80,12 +106,12 @@ impl StreamTrait for Stream { } fn pause(&self) -> Result<(), Error> { - let res = match &self.0 { + let res = match &self.inner { StreamInner::Playback(stream, _, _) => block_on(stream.cork()), StreamInner::Record(stream, _, _) => block_on(stream.cork()), }; res.map_err(Error::from)?; - match &self.0 { + match &self.inner { StreamInner::Playback(_, _, handle) | StreamInner::Record(_, _, handle) => { handle.notify() } @@ -94,7 +120,7 @@ impl StreamTrait for Stream { } fn now(&self) -> StreamInstant { - let start = match &self.0 { + let start = match &self.inner { StreamInner::Playback(_, start, _) | StreamInner::Record(_, start, _) => *start, }; let elapsed = start.elapsed(); @@ -102,7 +128,7 @@ impl StreamTrait for Stream { } fn buffer_size(&self) -> Result { - let (spec, bytes) = match &self.0 { + let (spec, bytes) = match &self.inner { StreamInner::Playback(s, _, _) => ( s.sample_spec(), s.buffer_attr().minimum_request_length as usize, @@ -219,9 +245,18 @@ impl Stream { // when the stream is stopped by the user. let stream_clone = stream.clone(); let error_callback_clone = error_callback.clone(); - std::thread::spawn(move || { + let cancel_driver = handle.cancel.clone(); + let driver_handle = std::thread::spawn(move || { if let Err(e) = block_on(stream_clone.play_all()) { - emit_error(&error_callback_clone, Error::from(e)); + // Stream::drop sets the cancel flag and then queues + // DeletePlaybackStream, which makes the reactor drop the + // source's eof_tx and surfaces here as ClientError::Disconnected. + // That is teardown noise, not a real error — suppress + // emit_error in the cancel case. The latency thread already + // does the equivalent check before each timing_info poll. + if !cancel_driver.load(atomic::Ordering::Relaxed) { + emit_error(&error_callback_clone, Error::from(e)); + } } }); @@ -231,7 +266,7 @@ impl Stream { let stream_clone = stream.clone(); let latency_clone = current_latency_micros.clone(); let poll_clone = last_poll_micros.clone(); - std::thread::spawn(move || loop { + let latency_handle = std::thread::spawn(move || loop { if cancel_thread.load(atomic::Ordering::Relaxed) { break; } @@ -265,7 +300,10 @@ impl Stream { *guard = false; }); - Ok(Self(StreamInner::Playback(stream, start, handle))) + Ok(Self { + inner: StreamInner::Playback(stream, start, handle), + workers: vec![driver_handle, latency_handle], + }) } pub fn new_record( @@ -352,7 +390,7 @@ impl Stream { let stream_clone = stream.clone(); let latency_clone = current_latency_micros.clone(); let poll_clone = last_poll_micros.clone(); - std::thread::spawn(move || loop { + let latency_handle = std::thread::spawn(move || loop { if cancel_thread.load(atomic::Ordering::Relaxed) { break; } @@ -386,7 +424,10 @@ impl Stream { *guard = false; }); - Ok(Self(StreamInner::Record(stream, start, handle))) + Ok(Self { + inner: StreamInner::Record(stream, start, handle), + workers: vec![latency_handle], + }) } }