Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
- Breaking: `Done` now calls a callback instead of decrementing an `Arc<AtomicUsize>`.
- Added `Skippable::skipped` function to check if the inner source was skipped.
- Fixed `Player::skip_one` not decreasing the player's length immediately.

## Version [0.22.2] (2026-02-22)

Expand Down
5 changes: 4 additions & 1 deletion UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ The list below only contains required code changes. For a complete list of
changes and new features, see [CHANGELOG.md](CHANGELOG.md).

# rodio 0.22 to current github version
Nothing yet!
- `Done` now calls a callback instead of decrementing an `Arc<AtomicUsize>`.
- To retain old behavior replace the `Arc<AtomicUsize>` argument in `Done::new` with
`move |_| { number.fetch_sub(1, std::sync::atomic::Ordering::Relaxed) }`.
- `Done` has now two generics instead of one: `<I: Source, F: FnMut(&mut I)>`.

# rodio 0.21.1 to 0.22
- _Sink_ terms are replaced with _Player_ and _Stream_ terms replaced
Expand Down
104 changes: 65 additions & 39 deletions src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,47 +117,55 @@ impl Player {
let controls = self.controls.clone();

let start_played = AtomicBool::new(false);

let source = source
.speed(1.0)
// Must be placed before pausable but after speed & delay
.track_position()
.pausable(false)
.amplify(1.0)
.skippable()
.stoppable()
// If you change the duration update the docs for try_seek!
.periodic_access(Duration::from_millis(5), move |src| {
if controls.stopped.load(Ordering::SeqCst) {
src.stop();
*controls.position.lock().unwrap() = Duration::ZERO;
let sound_count_clone = self.sound_count.clone();

let source = Done::new(
source
.speed(1.0)
// Must be placed before pausable but after speed & delay
.track_position()
.pausable(false)
.amplify(1.0)
.skippable()
.stoppable(),
move |src| {
if !src.inner().skipped() {
sound_count_clone.fetch_sub(1, Ordering::Relaxed);
}
{
let mut to_clear = controls.to_clear.lock().unwrap();
if *to_clear > 0 {
src.inner_mut().skip();
*to_clear -= 1;
*controls.position.lock().unwrap() = Duration::ZERO;
} else {
*controls.position.lock().unwrap() =
src.inner().inner().inner().inner().get_pos();
}
}
let amp = src.inner_mut().inner_mut();
amp.set_factor(*controls.volume.lock().unwrap());
amp.inner_mut()
.set_paused(controls.pause.load(Ordering::SeqCst));
amp.inner_mut()
.inner_mut()
.inner_mut()
.set_factor(*controls.speed.lock().unwrap());
if let Some(seek) = controls.seek.lock().unwrap().take() {
seek.attempt(amp)
},
)
// If you change the duration update the docs for try_seek!
.periodic_access(Duration::from_millis(5), move |src| {
if controls.stopped.load(Ordering::SeqCst) {
src.inner_mut().stop();
*controls.position.lock().unwrap() = Duration::ZERO;
}
{
let mut to_clear = controls.to_clear.lock().unwrap();
if *to_clear > 0 {
src.inner_mut().inner_mut().skip();
*to_clear -= 1;
*controls.position.lock().unwrap() = Duration::ZERO;
} else {
*controls.position.lock().unwrap() =
src.inner().inner().inner().inner().inner().get_pos();
}
start_played.store(true, Ordering::SeqCst);
});
}
let amp = src.inner_mut().inner_mut().inner_mut();
amp.set_factor(*controls.volume.lock().unwrap());
amp.inner_mut()
.set_paused(controls.pause.load(Ordering::SeqCst));
amp.inner_mut()
.inner_mut()
.inner_mut()
.set_factor(*controls.speed.lock().unwrap());
if let Some(seek) = controls.seek.lock().unwrap().take() {
seek.attempt(amp)
}
start_played.store(true, Ordering::SeqCst);
});

self.sound_count.fetch_add(1, Ordering::Relaxed);
let source = Done::new(source, self.sound_count.clone());
*self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
}

Expand Down Expand Up @@ -279,7 +287,7 @@ impl Player {
pub fn clear(&self) {
let len = self.sound_count.load(Ordering::SeqCst) as u32;
*self.controls.to_clear.lock().unwrap() = len;
self.sleep_until_end();
self.sound_count.store(0, Ordering::Relaxed);
self.pause();
}

Expand All @@ -294,6 +302,7 @@ impl Player {
if len > *to_clear {
*to_clear += 1;
}
self.sound_count.fetch_sub(1, Ordering::SeqCst);
}

/// Stops the sink by emptying the queue.
Expand Down Expand Up @@ -361,6 +370,23 @@ mod tests {
use crate::math::nz;
use crate::{Player, Source};

#[test]
fn test_immediate_length_changes() {
let (player, mut source) = Player::new();

player.append(SamplesBuffer::new(nz!(1), nz!(1), vec![2.0, 3.0]));
player.append(SamplesBuffer::new(nz!(1), nz!(1), vec![1.0, 0.5]));
assert_eq!(player.len(), 2);
assert_eq!(source.next(), Some(2.0));

player.skip_one();
assert_eq!(player.len(), 1);
assert_eq!(source.next(), Some(1.0));

player.clear();
assert_eq!(player.len(), 0);
}

#[test]
fn test_pause_and_stop() {
let (player, mut source) = Player::new();
Expand Down
34 changes: 21 additions & 13 deletions src/source/done.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use super::SeekError;
use crate::common::{ChannelCount, SampleRate};
use crate::Source;

/// When the inner source is empty this decrements a `AtomicUsize`.
/// When the inner source is exhausted, this source calls a callback once
/// with the mutable reference to the inner source.
#[derive(Debug, Clone)]
pub struct Done<I> {
pub struct Done<I, F>
where
F: FnMut(&mut I),
{
input: I,
signal: Arc<AtomicUsize>,
callback: F,
signal_sent: bool,
}

impl<I> Done<I> {
/// When the inner source is empty the AtomicUsize passed in is decremented.
/// If it was zero it will overflow negatively.
impl<I, F> Done<I, F>
where
F: FnMut(&mut I),
{
/// When the inner source is exhausted, this source calls a callback once
/// with the mutable reference to the inner source.
#[inline]
pub fn new(input: I, signal: Arc<AtomicUsize>) -> Done<I> {
pub fn new(input: I, callback: F) -> Done<I, F> {
Done {
input,
signal,
callback,
signal_sent: false,
}
}
Expand All @@ -45,30 +50,33 @@ impl<I> Done<I> {
}
}

impl<I: Source> Iterator for Done<I>
impl<I, F> Iterator for Done<I, F>
where
I: Source,
F: FnMut(&mut I),
{
type Item = I::Item;

#[inline]
fn next(&mut self) -> Option<I::Item> {
let next = self.input.next();
if !self.signal_sent && next.is_none() {
self.signal.fetch_sub(1, Ordering::Relaxed);
self.signal_sent = true;
(self.callback)(&mut self.input);
}
next
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.input.size_hint()
}
}

impl<I> Source for Done<I>
impl<I, F> Source for Done<I, F>
where
I: Source,
F: FnMut(&mut I),
{
#[inline]
fn current_span_len(&self) -> Option<usize> {
Expand Down
6 changes: 6 additions & 0 deletions src/source/skippable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ impl<I> Skippable<I> {
self.do_skip = true;
}

/// Returns true if the inner source was skipped.
#[inline]
pub fn skipped(&self) -> bool {
self.do_skip
}

/// Returns a reference to the inner source.
#[inline]
pub fn inner(&self) -> &I {
Expand Down