diff --git a/src/arduino/app_bricks/sound_generator/README.md b/src/arduino/app_bricks/sound_generator/README.md new file mode 100644 index 00000000..084bee93 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/README.md @@ -0,0 +1,50 @@ +# Sound Generator Brick + +Sound Generator is a lightweight and expressive audio generation brick that lets you create, manipulate, and play sounds programmatically. +You can write musical notes, generate tones, and compose melodies — all while shaping the sound through custom waveforms and effects. + +Features: +* *Generate tones and melodies from notes or frequencies. +* Choose your waveform — sine, square, triangle, sawtooth. +* Add sound effects such as chorus, overdrive, delay, vibrato, or distortion. +* Compose procedural music directly from code. +* Real-time playback over speaker + +## Code example and usage + +```python +from arduino.app_bricks.sound_generator import SoundGenerator, SoundEffect +from arduino.app_utils import App + +player = SoundGenerator(sound_effects=[SoundEffect.adsr()]) + +fur_elise = [ + ("E5", 1/4), ("D#5", 1/4), ("E5", 1/4), ("D#5", 1/4), ("E5", 1/4), + ("B4", 1/4), ("D5", 1/4), ("C5", 1/4), ("A4", 1/2), + + ("C4", 1/4), ("E4", 1/4), ("A4", 1/4), ("B4", 1/2), + ("E4", 1/4), ("G#4", 1/4), ("B4", 1/4), ("C5", 1/2), + + ("E4", 1/4), ("E5", 1/4), ("D#5", 1/4), ("E5", 1/4), ("D#5", 1/4), ("E5", 1/4), + ("B4", 1/4), ("D5", 1/4), ("C5", 1/4), ("A4", 1/2), + + ("C4", 1/4), ("E4", 1/4), ("A4", 1/4), ("B4", 1/2), + ("E4", 1/4), ("C5", 1/4), ("B4", 1/4), ("A4", 1.0), +] +for note, duration in fur_elise: + player.play(note, duration) + +App.run() +``` + +waveform can be customized to change effect. For example, for a retro-gaming sound, you can configure "square" wave form. 
+ +```python +player = SoundGenerator(wave_form="square") +``` + +instead, to have a more "rock" like sound, you can add effects like: + +```python +player = SoundGenerator(sound_effects=[SoundEffect.adsr(), SoundEffect.overdrive(drive=180.0), SoundEffect.chorus(depth_ms=15, rate_hz=0.2, mix=0.4)]) +``` diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py new file mode 100644 index 00000000..6167b8cc --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -0,0 +1,639 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +from arduino.app_utils import brick +from arduino.app_peripherals.speaker import Speaker +import threading +from typing import Iterable +import numpy as np +import time +from pathlib import Path +from collections import OrderedDict + +from .generator import WaveSamplesBuilder +from .effects import * +from .loaders import ABCNotationLoader + + +class LRUDict(OrderedDict): + """A dictionary-like object with a fixed size that evicts the least recently used items.""" + + def __init__(self, maxsize=128, *args, **kwargs): + self.maxsize = maxsize + super().__init__(*args, **kwargs) + + def __getitem__(self, key): + value = super().__getitem__(key) + self.move_to_end(key) + return value + + def __setitem__(self, key, value): + if key in self: + self.move_to_end(key) + + super().__setitem__(key, value) + + if len(self) > self.maxsize: + # Evict the least recently used item (the first item) + self.popitem(last=False) + + +@brick +class SoundGeneratorStreamer: + SAMPLE_RATE = 16000 + A4_FREQUENCY = 440.0 + + # Semitone mapping for the 12 notes (0 = C, 11 = B). + # This is used to determine the relative position within an octave. 
+ SEMITONE_MAP = { + "C": 0, + "C#": 1, + "DB": 1, + "D": 2, + "D#": 3, + "EB": 3, + "E": 4, + "F": 5, + "F#": 6, + "GB": 6, + "G": 7, + "G#": 8, + "AB": 8, + "A": 9, + "A#": 10, + "BB": 10, + "B": 11, + } + + NOTE_DURATTION = { + "W": 1.0, # Whole + "H": 0.5, # Half + "Q": 0.25, # Quarter + "E": 0.125, # Eighth + "S": 0.0625, # Sixteenth + "T": 0.03125, # Thirty-second + "X": 0.015625, # Sixty-fourth + } + + # The reference point in the overall semitone count from C0. A4 is (4 * 12) + 9 semitones from C0. + A4_SEMITONE_INDEX = (4 * 12) + 9 + + def __init__( + self, + bpm: int = 120, + time_signature: tuple = (4, 4), + octaves: int = 8, + wave_form: str = "sine", + master_volume: float = 1.0, + sound_effects: list = None, + ): + """Initialize the SoundGeneratorStreamer. Generates sound blocks for streaming, without internal playback. + Args: + bpm (int): The tempo in beats per minute for note duration calculations. + time_signature (tuple): The time signature as (numerator, denominator). + octaves (int): Number of octaves to generate notes for (starting from octave + 0 up to octaves-1). + wave_form (str): The type of wave form to generate. Supported values + are "sine" (default), "square", "triangle" and "sawtooth". + master_volume (float): The master volume level (0.0 to 1.0). + sound_effects (list, optional): List of sound effect instances to apply to the audio + signal (e.g., [SoundEffect.adsr()]). See SoundEffect class for available effects. 
+ """ + + self._cfg_lock = threading.Lock() + self._init_wave_generator(wave_form) + + self._bpm = bpm + self.time_signature = time_signature + self._master_volume = master_volume + self._sound_effects = sound_effects + + self._notes = {} + for octave in range(octaves): + notes = self._fill_node_frequencies(octave) + self._notes.update(notes) + + self._wav_cache = LRUDict(maxsize=10) + + def start(self): + pass + + def stop(self): + pass + + def _init_wave_generator(self, wave_form: str): + with self._cfg_lock: + self._wave_gen = WaveSamplesBuilder(sample_rate=self.SAMPLE_RATE, wave_form=wave_form) + + def set_wave_form(self, wave_form: str): + """ + Set the wave form type for sound generation. + Args: + wave_form (str): The type of wave form to generate. Supported values + are "sine", "square", "triangle" and "sawtooth". + """ + self._init_wave_generator(wave_form) + + def set_master_volume(self, volume: float): + """ + Set the master volume level. + Args: + volume (float): Volume level (0.0 to 1.0). + """ + self._master_volume = max(0.0, min(1.0, volume)) + + def set_effects(self, effects: list): + """ + Set the list of sound effects to apply to the audio signal. + Args: + effects (list): List of sound effect instances (e.g., [SoundEffect.adsr()]). + """ + with self._cfg_lock: + self._sound_effects = effects + + def _fill_node_frequencies(self, octave: int) -> dict: + """ + Given a sequence of notes with their names and octaves, fill in their frequencies. + + """ + notes = {} + + notes[f"REST"] = 0.0 # Rest note + + # Generate frequencies for all notes in the given octave + for note_name in self.SEMITONE_MAP: + frequency = self._note_to_frequency(note_name, octave) + notes[f"{note_name}{octave}"] = frequency + + return notes + + def _note_to_frequency(self, note_name: str, octave: int) -> float: + """ + Calculates the frequency (in Hz) of a musical note based on its name and octave. 
+ + It uses the standard 12-tone equal temperament formula: f = f0 * 2^(n/12), + where f0 is the reference frequency (A4=440Hz) and n is the number of + semitones from the reference note. + + Args: + note_name: The name of the note (e.g., 'A', 'C#', 'Bb', case-insensitive). + octave: The octave number (e.g., 4 for A4, 5 for C5). + + Returns: + The frequency in Hertz (float). + """ + # 1. Normalize the note name for lookup + normalized_note = note_name.strip().upper() + if len(normalized_note) > 1 and normalized_note[1] == "#": + # Ensure sharps are treated correctly (e.g., 'C#' is fine) + pass + elif len(normalized_note) > 1 and normalized_note[1].lower() == "b": + # Replace 'B' (flat) with 'B' for consistent dictionary key + normalized_note = normalized_note[0] + "B" + + # 2. Look up the semitone count within the octave + if normalized_note not in self.SEMITONE_MAP: + raise ValueError(f"Invalid note name: {note_name}. Please use notes like 'A', 'C#', 'Eb', etc.") + + semitones_in_octave = self.SEMITONE_MAP[normalized_note] + + # 3. Calculate the absolute semitone index (from C0) + # Total semitones = (octave number * 12) + semitones_from_C_in_octave + target_semitone_index = (octave * 12) + semitones_in_octave + + # 4. Calculate 'n', the number of semitones from the reference pitch (A4) + # A4 is the reference, so n is the distance from A4. + semitones_from_a4 = target_semitone_index - self.A4_SEMITONE_INDEX + + # 5. Calculate the frequency + # f = 440 * 2^(n/12) + frequency_hz = self.A4_FREQUENCY * (2.0 ** (semitones_from_a4 / 12.0)) + + return frequency_hz + + def _note_duration(self, symbol: str | float | int) -> float: + """ + Decode a note duration symbol into its corresponding fractional value. + Args: + symbol (str | float | int): Note duration symbol (e.g., 'W', 'H', 'Q', etc.) or a float/int value. + Returns: + float: Corresponding fractional duration value or the float itself if provided. 
+ """ + + if isinstance(symbol, float) or isinstance(symbol, int): + return self._compute_time_duration(symbol) + + duration = self.NOTE_DURATTION.get(symbol.upper(), None) + if duration is not None: + return self._compute_time_duration(duration) + + return self._compute_time_duration(1 / 4) # Default to quarter note + + def _compute_time_duration(self, note_fraction: float) -> float: + """ + Compute the time duration in seconds for a given note fraction and time signature. + Args: + note_fraction (float): The fraction of the note (e.g., 1.0 for whole, 0.5 for half). + time_signature (tuple): The time signature as (numerator, denominator). + Returns: + float: Duration in seconds. + """ + + numerator, denominator = self.time_signature + + # For compound time signatures (6/8, 9/8, 12/8), the beat is the dotted quarter note (3/8) + if denominator == 8 and numerator % 3 == 0: + beat_value = 3 / 8 + else: + beat_value = 1 / denominator # es. 1/4 in 4/4 + + # Calculate the duration of a single beat in seconds + beat_duration = 60.0 / self._bpm + + # Compute the total duration + return beat_duration * (note_fraction / beat_value) + + def _apply_sound_effects(self, signal: np.ndarray, frequency: float) -> np.ndarray: + """ + Apply the configured sound effects to the audio signal. + Args: + signal (np.ndarray): Input audio signal. + frequency (float): Frequency of the note being played. + Returns: + np.ndarray: Processed audio signal with sound effects applied. 
+ """ + with self._cfg_lock: + if self._sound_effects is None: + return signal + + processed_signal = signal + for effect in self._sound_effects: + if hasattr(effect, "apply_with_tone"): + processed_signal = effect.apply_with_tone(processed_signal, frequency) + else: + processed_signal = effect.apply(processed_signal) + + return processed_signal + + def _get_note(self, note: str) -> float | None: + if note is None: + return None + return self._notes.get(note.strip().upper()) + + def _to_bytes(self, signal: np.ndarray) -> bytes: + # Format: "FLOAT_LE" -> (ALSA: "PCM_FORMAT_FLOAT_LE", np.float32), + return signal.astype(np.float32).tobytes() + + def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None) -> tuple[bytes, float]: + """ + Play multiple sequences of musical notes simultaneously (poliphony). + It is possible to play multi track music by providing a list of sequences, + where each sequence is a list of tuples (note, duration). + Duration is in notes fractions (e.g., 1/4 for quarter note). + Args: + notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration). + as_tone (bool): If True, play as tones, considering duration in seconds + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + Returns: + tuple[bytes, float]: The audio block of the mixed sequences (float32) and its duration in seconds. 
+ """ + if volume is None: + volume = self._master_volume + + # Multi track mixing + sequences_data = [] + base_frequency = None + max_duration = 0.0 + for sequence in notes: + sequence_waves = [] + sequence_duration = 0.0 + for note, duration in sequence: + sequence_duration += duration + frequency = self._get_note(note) + if frequency >= 0.0: + if base_frequency is None: + base_frequency = frequency + if not as_tone: + duration = self._note_duration(duration) + data = self._wave_gen.generate_block(float(frequency), duration, volume) + sequence_waves.append(data) + else: + continue + + if len(sequence_waves) > 0: + single_track_data = np.concatenate(sequence_waves) + sequences_data.append(single_track_data) + if sequence_duration > max_duration: + max_duration = sequence_duration + + if len(sequences_data) == 0: + return + + # Mix sequences - align lengths + max_length = max(len(seq) for seq in sequences_data) + # Pad shorter sequences with zeros + for i in range(len(sequences_data)): + seq = sequences_data[i] + if len(seq) < max_length: + padding = np.zeros(max_length - len(seq), dtype=np.float32) + sequences_data[i] = np.concatenate((seq, padding)) + + # Sum all sequences + mixed = np.sum(sequences_data, axis=0, dtype=np.float32) + mixed /= np.max(np.abs(mixed)) # Normalize to prevent clipping + blk = mixed.astype(np.float32) + blk = self._apply_sound_effects(blk, base_frequency) + return (self._to_bytes(blk), max_duration) + + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None) -> bytes: + """ + Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. + Args: + notes (list[str]): List of musical notes to play (e.g., ['A4', 'C#5', 'E5']). + note_duration (float | str): Duration of the chord as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. 
+ Returns: + bytes: The audio block of the mixed sequences (float32). + """ + duration = self._note_duration(note_duration) + if len(notes) == 1: + self.play(notes[0], duration, volume) + return + + waves = [] + base_frequency = None + for note in notes: + frequency = self._get_note(note) + if frequency: + if base_frequency is None: + base_frequency = frequency + if volume is None: + volume = self._master_volume + data = self._wave_gen.generate_block(float(frequency), duration, volume) + waves.append(data) + else: + continue + if len(waves) == 0: + return + chord = np.sum(waves, axis=0, dtype=np.float32) + chord /= np.max(np.abs(chord)) # Normalize to prevent clipping + blk = chord.astype(np.float32) + blk = self._apply_sound_effects(blk, base_frequency) + return self._to_bytes(blk) + + def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None) -> bytes: + """ + Play a musical note for a specified duration and volume. + Args: + note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). + note_duration (float | str): Duration of the note as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + Returns: + bytes: The audio block of the played note (float32). + """ + duration = self._note_duration(note_duration) + frequency = self._get_note(note) + if frequency is not None and frequency >= 0.0: + if volume is None: + volume = self._master_volume + data = self._wave_gen.generate_block(float(frequency), duration, volume) + data = self._apply_sound_effects(data, frequency) + return self._to_bytes(data) + + def play_tone(self, note: str, duration: float = 0.25, volume: float = None) -> bytes: + """ + Play a musical note for a specified duration and volume. + Args: + note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). + duration (float): Duration of the note as a float in seconds. + volume (float, optional): Volume level (0.0 to 1.0). 
If None, uses master volume. + Returns: + bytes: The audio block of the played note (float32). + """ + frequency = self._get_note(note) + if frequency is not None and frequency >= 0.0 and duration > 0.0: + if volume is None: + volume = self._master_volume + data = self._wave_gen.generate_block(float(frequency), duration, volume) + data = self._apply_sound_effects(data, frequency) + return self._to_bytes(data) + + def play_abc(self, abc_string: str, volume: float = None) -> Iterable[tuple[bytes, float]]: + """ + Play a sequence of musical notes defined in ABC notation. + Args: + abc_string (str): ABC notation string defining the sequence of notes. + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + Returns: + Iterable[tuple[bytes, float]]: An iterable yielding the audio blocks of the played notes (float32) and its duration. + """ + if not abc_string or abc_string.strip() == "": + return + if volume is None: + volume = self._master_volume + metadata, notes = ABCNotationLoader.parse_abc_notation(abc_string) + for note, duration in notes: + frequency = self._get_note(note) + if frequency is not None and frequency >= 0.0: + data = self._wave_gen.generate_block(float(frequency), duration, volume) + data = self._apply_sound_effects(data, frequency) + yield (self._to_bytes(data), duration) + + def play_wav(self, wav_file: str) -> tuple[bytes, float]: + """ + Play a WAV audio data block. + Args: + wav_file (str): The WAV audio file path. + Returns: + tuple[bytes, float]: The audio block of the WAV file (float32) and its duration in seconds. 
+ """ + import wave + + file_path = Path(wav_file) + if not file_path.exists() or not file_path.is_file(): + raise FileNotFoundError(f"WAV file not found: {wav_file}") + + if wav_file in self._wav_cache: + return self._wav_cache[wav_file] + + with wave.open(wav_file, "rb") as wav: + # Read all frames (raw PCM data) + duration = wav.getnframes() / wav.getframerate() + wav_data = wav.readframes(wav.getnframes()) + if len(self._wav_cache) < 250 * 1024: # 250 KB cache limit + self._wav_cache[wav_file] = (wav_data, duration) + return (wav_data, duration) + + return (None, None) + + +@brick +class SoundGenerator(SoundGeneratorStreamer): + def __init__( + self, + output_device: Speaker = None, + bpm: int = 120, + time_signature: tuple = (4, 4), + octaves: int = 8, + wave_form: str = "sine", + master_volume: float = 1.0, + sound_effects: list = None, + ): + """Initialize the SoundGenerator. + Args: + output_device (Speaker, optional): The output device to play sound through. + wave_form (str): The type of wave form to generate. Supported values + are "sine" (default), "square", "triangle" and "sawtooth". + bpm (int): The tempo in beats per minute for note duration calculations. + master_volume (float): The master volume level (0.0 to 1.0). + octaves (int): Number of octaves to generate notes for (starting from octave + 0 up to octaves-1). + sound_effects (list, optional): List of sound effect instances to apply to the audio + signal (e.g., [SoundEffect.adsr()]). See SoundEffect class for available effects. + time_signature (tuple): The time signature as (numerator, denominator). 
+ """ + + super().__init__( + bpm=bpm, + time_signature=time_signature, + octaves=octaves, + wave_form=wave_form, + master_volume=master_volume, + sound_effects=sound_effects, + ) + + self._started = threading.Event() + if output_device is None: + self.external_speaker = False + self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE") + else: + self.external_speaker = True + self._output_device = output_device + + def start(self): + if self._started.is_set(): + return + if not self.external_speaker: + self._output_device.start(notify_if_started=False) + self._started.set() + + def stop(self): + if not self.external_speaker: + self._output_device.stop() + self._started.clear() + + def set_master_volume(self, volume: float): + """ + Set the master volume level. + Args: + volume (float): Volume level (0.0 to 1.0). + """ + super().set_master_volume(volume) + + def set_effects(self, effects: list): + """ + Set the list of sound effects to apply to the audio signal. + Args: + effects (list): List of sound effect instances (e.g., [SoundEffect.adsr()]). + """ + super().set_effects(effects) + + def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None, block: bool = False): + """ + Play multiple sequences of musical notes simultaneously (poliphony). + It is possible to play multi track music by providing a list of sequences, + where each sequence is a list of tuples (note, duration). + Duration is in notes fractions (e.g., 1/4 for quarter note). + Args: + notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration). + as_tone (bool): If True, play as tones, considering duration in seconds + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + block (bool): If True, block until the entire sequence has been played. 
+ """ + blk, duration = super().play_polyphonic(notes, as_tone, volume) + self._output_device.play(blk, block_on_queue=False) + if block and duration > 0.0: + time.sleep(duration) + + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None, block: bool = False): + """ + Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. + Args: + notes (list[str]): List of musical notes to play (e.g., ['A4', 'C#5', 'E5']). + note_duration (float | str): Duration of the chord as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + block (bool): If True, block until the entire chord has been played. + """ + blk = super().play_chord(notes, note_duration, volume) + self._output_device.play(blk, block_on_queue=False) + if block: + duration = self._note_duration(note_duration) + if duration > 0.0: + time.sleep(duration) + + def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None, block: bool = False): + """ + Play a musical note for a specified duration and volume. + Args: + note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). + note_duration (float | str): Duration of the note as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + block (bool): If True, block until the entire note has been played. + """ + data = super().play(note, note_duration, volume) + self._output_device.play(data, block_on_queue=False) + if block: + duration = self._note_duration(note_duration) + if duration > 0.0: + time.sleep(duration) + + def play_tone(self, note: str, duration: float = 0.25, volume: float = None, block: bool = False): + """ + Play a musical note for a specified duration and volume. + Args: + note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). 
+ duration (float): Duration of the note as a float in seconds. + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + block (bool): If True, block until the entire note has been played. + """ + data = super().play_tone(note, duration, volume) + self._output_device.play(data, block_on_queue=False) + if block and duration > 0.0: + time.sleep(duration) + + def play_abc(self, abc_string: str, volume: float = None, block: bool = False): + """ + Play a sequence of musical notes defined in ABC notation. + Args: + abc_string (str): ABC notation string defining the sequence of notes. + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + block (bool): If True, block until the entire sequence has been played. + """ + if not abc_string or abc_string.strip() == "": + return + player = super().play_abc(abc_string, volume) + overall_duration = 0.0 + for data, duration in player: + self._output_device.play(data, block_on_queue=True) + overall_duration += duration + if block: + time.sleep(overall_duration) + + def play_wav(self, wav_file: str, block: bool = False): + """ + Play a WAV audio data block. + Args: + wav_file (str): The WAV audio file path. + block (bool): If True, block until the entire WAV file has been played. + """ + to_play, duration = super().play_wav(wav_file) + self._output_device.play(to_play, block_on_queue=False) + if block and duration > 0.0: + time.sleep(duration) + + def clear_playback_queue(self): + """ + Clear the playback queue of the output device. + """ + self._output_device.clear_playback_queue() diff --git a/src/arduino/app_bricks/sound_generator/brick_config.yaml b/src/arduino/app_bricks/sound_generator/brick_config.yaml new file mode 100644 index 00000000..ccaf9380 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/brick_config.yaml @@ -0,0 +1,7 @@ +id: arduino:sound_generator +name: Sound Generator +description: Generate sounds like notes, tones, or melodies using waveforms. 
+category: audio +required_devices: + - speaker + \ No newline at end of file diff --git a/src/arduino/app_bricks/sound_generator/effects.py b/src/arduino/app_bricks/sound_generator/effects.py new file mode 100644 index 00000000..622b095a --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/effects.py @@ -0,0 +1,241 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +import numpy as np + + +class SoundEffect: + @staticmethod + def overdrive(drive: float = 100.0) -> np.ndarray: + """ + Apply overdrive effect to the audio signal. + Args: + signal (np.ndarray): Input audio signal. + drive (float): Overdrive intensity factor. + Returns: + np.ndarray: Processed audio signal with overdrive effect. + """ + + class SoundEffectOverdrive: + def __init__(self, drive: float = 1.0): + pass + + def apply(self, signal: np.ndarray) -> np.ndarray: + signal = signal * drive + # soft clipping + return (2 / 3) * np.tanh(signal) + + return SoundEffectOverdrive(drive) + + @staticmethod + def chorus(depth_ms=10, rate_hz: float = 0.25, mix: float = 0.5) -> np.ndarray: + """ + Apply chorus effect to the audio signal. + Args: + signal (np.ndarray): Input audio signal. + depth_ms (float): Depth of the chorus effect in milliseconds. + rate_hz (float): Rate of the LFO in Hz. + mix (float): Mix ratio between dry and wet signals (0.0 to 1.0). + Returns: + np.ndarray: Processed audio signal with chorus effect. 
+ """ + + class SoundEffectChorus: + def __init__(self, depth_ms: int = 10, rate_hz: float = 0.25, mix: float = 0.5): + self.fs = 16000 # sample rate + self.depth_ms = depth_ms + self.rate_hz = rate_hz + self.mix = mix + pass + + def apply(self, signal: np.ndarray) -> np.ndarray: + n = len(signal) + depth = (self.depth_ms / 1000.0) * self.fs # in samples + t = np.arange(n) + + lfo = (np.sin(2 * np.pi * self.rate_hz * t / self.fs) + 1) / 2 # [0..1] + delay = (lfo * depth).astype(int) + + out = np.zeros_like(signal) + for i in range(n): + d = delay[i] + if i - d >= 0: + out[i] = signal[i - d] + + # mix dry/wet + return ((1 - self.mix) * signal + self.mix * out).astype(np.float32) + + return SoundEffectChorus(depth_ms, rate_hz, mix) + + @staticmethod + def adsr(attack: float = 0.015, decay: float = 0.2, sustain: float = 0.5, release: float = 0.35): + """ + Apply ADSR (attack/decay/sustain/release) envelope to the audio signal. + Args: + attack (float): Attack time in seconds. + decay (float): Decay time in seconds. + sustain (float): Sustain level (0.0 to 1.0). + release (float): Release time in seconds. + """ + + class SoundEffectADSR: + def __init__(self, attack: float = 0.015, decay: float = 0.2, sustain: float = 0.5, release: float = 0.35): + """ + Initialize ADSR effect. + Args: + attack (float): Attack time in seconds. + decay (float): Decay time in seconds. + sustain (float): Sustain level (0.0 to 1.0). + release (float): Release time in seconds. + """ + self.attack = attack + self.decay = decay + self.sustain = sustain + self.release = release + + def apply(self, signal: np.ndarray) -> np.ndarray: + """ + Apply ADSR filter on signal. 
+ Args: + signal: np.ndarray float32 (audio) + """ + n = len(signal) + env = np.zeros(n) + + a = int(n * self.attack) + d = int(n * self.decay) + r = int(n * self.release) + + s = max(0, n - (a + d + r)) + + env[:a] = np.linspace(0, 1, a, endpoint=False) # Attack + env[a : a + d] = np.linspace(1, self.sustain, d, endpoint=False) # Decay + env[a + d : a + d + s] = self.sustain # Sustain + env[a + d + s :] = np.linspace(self.sustain, 0, n - (a + d + s), endpoint=False) # Release + + return (signal * env).astype(np.float32) + + return SoundEffectADSR(attack, decay, sustain, release) + + @staticmethod + def tremolo(depth: float = 0.5, rate: float = 5.0): + class SoundEffectTremolo: + def __init__(self, depth: float = 0.5, rate: float = 5.0): + """ + Tremolo effect block-local. + Args: + depth (float): modulation depth (0=no effect, 1=full) + rate (float): rate in cycles per block + """ + self.depth = np.clip(depth, 0.0, 1.0) + self.rate = rate # cicli di tremolo per blocco + + def apply(self, signal: np.ndarray) -> np.ndarray: + """ + Apply tremolo to a block of audio. 
+ Args: + signal (np.ndarray): input block + """ + n = len(signal) + t = np.linspace(0, 1, n, endpoint=False) # normalizzato al blocco + lfo = (1 - self.depth) + self.depth * np.sin(2 * np.pi * self.rate * t) + return (signal * lfo).astype(np.float32) + + return SoundEffectTremolo(depth, rate) + + @staticmethod + def vibrato(depth: float = 0.02, rate: float = 0.5): + class SoundEffectVibrato: + def __init__(self, depth: float = 0.02, rate: float = 2.0): + """ + Vibrato effect + Args: + depth (float): max deviation (0=no effect, 0.5=max) + rate (float): number of cycles per block + """ + self.depth = np.clip(depth, 0.0, 0.5) + self.rate = rate + + def apply(self, signal: np.ndarray) -> np.ndarray: + n = len(signal) + t = np.linspace(0, 1, n, endpoint=False) + lfo = self.depth * n * np.sin(2 * np.pi * self.rate * t) + indices = np.arange(n) + lfo + indices = np.clip(indices, 0, n - 1.001) + i0 = np.floor(indices).astype(int) + i1 = np.ceil(indices).astype(int) + frac = indices - i0 + output = (1 - frac) * signal[i0] + frac * signal[i1] + return output.astype(np.float32) + + return SoundEffectVibrato(depth=depth, rate=rate) + + @staticmethod + def bitcrusher(bits: int = 4, reduction: int = 6): + class SoundEffectBitcrusher: + def __init__(self, bits: int = 4, reduction: int = 4): + """ + Bitcrusher effect. + Args: + bits (int): Bit depth for quantization (1-16). + reduction (int): Redeuction factor for downsampling (>=1). 
+ """ + self.bit_depth = np.clip(bits, 1, 16) + self.reduction = max(1, reduction) + + def apply(self, signal: np.ndarray) -> np.ndarray: + # Downsampling + reduced = signal[:: self.reduction] + expanded = np.repeat(reduced, self.reduction) + expanded = expanded[: len(signal)] # taglia se serve + + # Quantization + levels = 2**self.bit_depth + crushed = np.round(expanded * (levels / 2)) / (levels / 2) + crushed = np.clip(crushed, -1.0, 1.0) + return crushed.astype(np.float32) + + return SoundEffectBitcrusher(bits, reduction) + + @staticmethod + def octaver(oct_up: bool = True, oct_down: bool = False): + class SoundEffectOctaver: + def __init__(self, oct_up: bool = True, oct_down: bool = True): + """ + Octaver effect. + Args: + oct_up (bool): Add one octave above the original signal. + oct_down (bool): Add one octave below the original signal. + """ + self.oct_up = oct_up + self.oct_down = oct_down + + def apply(self, signal: np.ndarray) -> np.ndarray: + """ + Apply the octaver effect to a mono audio signal. 
+ signal: numpy array with float values in range [-1, 1] + """ + output = signal.astype(np.float32) + n = len(signal) + + # Upper octave + if self.oct_up: + up = np.zeros(n, dtype=np.float32) + up[: n // 2] = signal[::2] + output += up + + # Lower octave + if self.oct_down: + down = np.zeros(n, dtype=np.float32) + down[::2] = signal[: n // 2] + output += down + + # Normalize to prevent clipping + max_val = np.max(np.abs(output)) + if max_val > 1.0: + output /= max_val + + return output + + return SoundEffectOctaver(oct_up, oct_down) diff --git a/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py new file mode 100644 index 00000000..58971085 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py @@ -0,0 +1,55 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +# EXAMPLE_NAME: Play a sequence of notes (Fur Elise) +from arduino.app_bricks.sound_generator import SoundGenerator, SoundEffect +from arduino.app_utils import App + +player = SoundGenerator(sound_effects=[SoundEffect.adsr()]) + +fur_elise = [ + ("E5", 1 / 8), + ("D#5", 1 / 8), + ("E5", 1 / 8), + ("D#5", 1 / 8), + ("E5", 1 / 8), + ("B4", 1 / 8), + ("D5", 1 / 8), + ("C5", 1 / 8), + ("A4", 1 / 4), + ("C4", 1 / 8), + ("E4", 1 / 8), + ("A4", 1 / 8), + ("B4", 1 / 8), + ("E4", 1 / 8), + ("G#4", 1 / 8), + ("B4", 1 / 8), + ("C5", 1 / 8), + ("E4", 1 / 8), + ("E5", 1 / 8), + ("D#5", 1 / 8), + ("E5", 1 / 8), + ("D#5", 1 / 8), + ("E5", 1 / 8), + ("B4", 1 / 8), + ("D5", 1 / 8), + ("C5", 1 / 8), + ("A4", 1 / 4), + ("C4", 1 / 8), + ("E4", 1 / 8), + ("A4", 1 / 8), + ("B4", 1 / 4), + ("E4", 1 / 8), + ("C5", 1 / 8), + ("B4", 1 / 8), + ("A4", 1), +] + + +def user_lp(): + for note, duration in fur_elise: + player.play(note, duration) + + +App.run(user_loop=user_lp) diff --git a/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py 
b/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py new file mode 100644 index 00000000..74718a95 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +# EXAMPLE_NAME: Stream a sequence of notes over websocket via WebUI +import time +from arduino.app_utils import * +from arduino.app_bricks.web_ui import WebUI +from arduino.app_bricks.sound_generator import SoundGeneratorStreamer, SoundEffect + +ui = WebUI() + +player = SoundGeneratorStreamer(master_volume=1.0, wave_form="square", bpm=120, sound_effects=[SoundEffect.adsr()]) + +tune_sequence = [ + ("E5", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("C5", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("G5", 0.25), + ("REST", 0.25), + ("G4", 0.25), + ("REST", 0.25), + ("C5", 0.25), + ("REST", 0.125), + ("G4", 0.25), + ("REST", 0.125), + ("E4", 0.25), + ("REST", 0.125), + ("A4", 0.25), + ("B4", 0.25), + ("Bb4", 0.125), + ("A4", 0.25), + ("G4", 0.125), + ("E5", 0.125), + ("G5", 0.125), + ("A5", 0.25), + ("F5", 0.125), + ("G5", 0.125), + ("REST", 0.125), + ("E5", 0.25), + ("C5", 0.125), + ("D5", 0.125), + ("B4", 0.25), +] + + +def user_lp(): + while True: + overall_time = 0 + for note, duration in tune_sequence: + frame = player.play_tone(note, duration) + entry = { + "raw_data": frame, + } + ui.send_message("audio_frame", entry) + overall_time += duration + + time.sleep(overall_time) # wait for the whole sequence to finish before restarting + + +App.run(user_loop=user_lp) diff --git a/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py b/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py new file mode 100644 index 00000000..b455130d --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py @@ -0,0 +1,31 @@ +# SPDX-FileCopyrightText: Copyright 
(C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +# EXAMPLE_NAME: Play music in ABC notation +from arduino.app_bricks.sound_generator import SoundGenerator, SoundEffect +from arduino.app_utils import App + +player = SoundGenerator(sound_effects=[SoundEffect.adsr()]) + + +def play_melody(): + abc_music = """ + X:1 + T:Twinkle, Twinkle Little Star - #11 + T:Alphabet Song + C:Traditional Kid's Song + M:4/4 + L:1/4 + K:D + |"D"D D A A|"G"B B "D"A2 + |"G"G G "D"F F|"A"E/2E/2E/2E/2 "D"D2 + |A A "G"G G|"D"F F "A"E2 + |"D"A A "G"G G|"D"F F "A"E2 + |"D"D D A A|"G"B B "D"A2 + |"G"G G "D"F F|"A"E E "D"D2| + """ + player.play_abc(abc_music, wait_completion=True) + + +App.run(user_loop=play_melody) diff --git a/src/arduino/app_bricks/sound_generator/examples/4_effects.py b/src/arduino/app_bricks/sound_generator/examples/4_effects.py new file mode 100644 index 00000000..16962fc0 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/examples/4_effects.py @@ -0,0 +1,67 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +# EXAMPLE_NAME: Play a sequence using effects +from arduino.app_bricks.sound_generator import SoundGenerator, SoundEffect +from arduino.app_utils import App + +player = SoundGenerator() + +tune_sequence = [ + ("A4", 0.25), + ("C5", 0.25), + ("E5", 0.25), + ("C5", 0.25), + ("A4", 0.25), + ("C5", 0.25), + ("E5", 0.25), + ("REST", 0.25), + ("G4", 0.25), + ("B4", 0.25), + ("D5", 0.25), + ("B4", 0.25), + ("G4", 0.25), + ("B4", 0.25), + ("D5", 0.25), + ("REST", 0.25), + ("A4", 0.25), + ("A4", 0.25), + ("C5", 0.25), + ("E5", 0.25), + ("F5", 0.5), + ("E5", 0.25), + ("REST", 0.25), + ("D5", 0.25), + ("C5", 0.25), + ("B4", 0.25), + ("A4", 0.25), + ("G4", 0.5), + ("B4", 0.5), + ("REST", 1), +] + +# Play as a retro-game sound +player.set_wave_form("square") +player.set_effects([SoundEffect.adsr()]) # For a more synththetic sound, add SoundEffect.bitcrusher() effect +for note, duration in tune_sequence: + 
player.play_tone(note, duration) + +# Play with distortion +player.set_wave_form("sine") +player.set_effects([SoundEffect.adsr(), SoundEffect.chorus(), SoundEffect.overdrive(drive=200.0)]) +for note, duration in tune_sequence: + player.play_tone(note, duration) + +# Vibrato effect +player.set_effects([SoundEffect.adsr(), SoundEffect.vibrato()]) +for note, duration in tune_sequence: + player.play_tone(note, duration) + +# Tremolo effect +player.set_wave_form("triangle") +player.set_effects([SoundEffect.adsr(), SoundEffect.tremolo(), SoundEffect.chorus()]) +for note, duration in tune_sequence: + player.play_tone(note, duration) + +App.run() diff --git a/src/arduino/app_bricks/sound_generator/generator.py b/src/arduino/app_bricks/sound_generator/generator.py new file mode 100644 index 00000000..13596294 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/generator.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +import numpy as np + + +class WaveSamplesBuilder: + """Generate wave audio blocks. + + This class produces wave blocks as NumPy buffers. + + Attributes: + sample_rate (int): Audio sample rate in Hz. + """ + + def __init__(self, wave_form: str = "sine", sample_rate: int = 16000): + """Create a new WaveGenerator. + + Args: + wave_form (str): The type of wave form to generate. Supported values + are "sine", "square", "triangle", "white_noise" and "sawtooth". + sample_rate (int): The playback sample rate (Hz) used to compute + phase increments and buffer sizes. + """ + self.wave_form = wave_form.lower() + self.sample_rate = int(sample_rate) + + def generate_block(self, freq: float, block_dur: float, master_volume: float = 1.0): + """Generate a block of float32 audio samples. + + Returned buffer is a NumPy view (float32) into an internal preallocated array and is valid + until the next call to this method. + + Args: + freq (float): Target frequency in Hz for this block. 
+ block_dur (float): Duration of the requested block in seconds. + master_volume (float, optional): Global gain multiplier. Defaults + to 1.0. + + Returns: + numpy.ndarray: A 1-D float32 NumPy array containing the generated + audio samples for the requested block. + """ + N = max(1, int(self.sample_rate * block_dur)) + + # compute wave form based on selected type + t = np.arange(N, dtype=np.float32) / self.sample_rate + + match self.wave_form: + case "square": + samples = 0.5 * (1 + np.sign(np.sin(2.0 * np.pi * freq * t))) + case "triangle": + samples = 2.0 * np.abs(2.0 * (freq * t % 1) - 1.0) - 1.0 + case "sawtooth": + samples = 2.0 * (freq * t % 1.0) - 1.0 + case "white_noise": + samples = np.random.uniform(-1.0, 1.0, size=N).astype(np.float32) + case _: # "sine" e default + samples = np.sin(2.0 * np.pi * freq * t) + + samples = samples.astype(np.float32) + + # apply gain + mg = float(master_volume) + if mg != 1.0: + np.multiply(samples, mg, out=samples) + + return samples diff --git a/src/arduino/app_bricks/sound_generator/loaders.py b/src/arduino/app_bricks/sound_generator/loaders.py new file mode 100644 index 00000000..6b454dac --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/loaders.py @@ -0,0 +1,194 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +from arduino.app_utils.logger import Logger +from typing import List, Tuple +import re + +logger = Logger(__name__) + + +class ABCNotationLoader: + NOT_HANDLED_RESERVED_LINES = r"^[A-Za-z]:" + + @staticmethod + def _parse_abc_duration(duration_str: str, default_duration_in_seconds: float) -> float: + """ + Parse ABC duration notation (e.g., '2', '/2', '3/2'). + The returned duration is in absolute seconds. + - default_duration_in_seconds: The absolute duration (in seconds) of the + note specified by the L: field, calculated using the Q: field (BPM). + Args: + duration_str (str): Duration string from ABC notation. 
+ default_duration_in_seconds (float): Default duration in seconds for a single unit. + Returns: + float: Calculated duration in seconds. + """ + if not duration_str: + return default_duration_in_seconds + + # Handle fractions (e.g., C/2, C/4) + if "/" in duration_str: + parts = duration_str.split("/") + # Handles C/ (division by 2) and C4/2 (multiplication by 4, division by 2) + numerator = int(parts[0]) if parts[0] else 1 + denominator = int(parts[1]) if len(parts) > 1 and parts[1] else 2 + + return default_duration_in_seconds * numerator / denominator + + try: + multiplier = int(duration_str) + return default_duration_in_seconds * multiplier + except ValueError: + return default_duration_in_seconds + + @staticmethod + def parse_abc_notation(abc_string: str, default_octave: int = 4) -> Tuple[dict, List[Tuple[str, float]]]: + """ + Parse ABC notation and convert to an array of (note, duration_in_seconds) tuples. + Args: + abc_string (str): ABC notation string. + default_octave (int): Default octave for uppercase notes (C4). + + Returns: + Tuple[dict, List[Tuple[str, float]]]: Metadata dictionary and list of (note, duration) tuples. 
+ """ + + metadata = {} + + lines = abc_string.split("\n") + music_lines = [] + + # --- Parse Header Fields --- + for line in lines: + line = line.strip() + if re.match(ABCNotationLoader.NOT_HANDLED_RESERVED_LINES, line): + if line.startswith("X:"): + metadata["reference"] = line[2:].strip() + elif line.startswith("T:"): + metadata["title"] = line[2:].strip() + elif line.startswith("K:"): + metadata["key"] = line[2:].strip() + elif line.startswith("L:"): + metadata["default_length"] = line[2:].strip() + elif line.startswith("Q:"): + metadata["tempo"] = line[2:].strip() + elif line.startswith("M:"): + metadata["meter"] = line[2:].strip() + elif line.startswith("C:"): + metadata["composer"] = line[2:].strip() + elif line.startswith("R:"): + metadata["rhythm"] = line[2:].strip() + elif line.startswith("%%transpose"): + # Handle transpose directive if needed + matched = re.match(r"%%transpose\s+(-?\d+)", line) + if matched: + # only octave transposition is supported + octaves = int(matched.group(1)) / 12 + if octaves + default_octave < 0: + octaves = 0 + metadata["transpose"] = int(octaves) + elif not line.startswith("%") and line: + music_lines.append(line) + + # Standard ABC default for L: is 1/8 if not specified. + default_unit_fraction = 1 / 8 + + if "default_length" in metadata and metadata["default_length"]: + match_L = re.match(r"(\d+)/(\d+)", metadata["default_length"]) + if match_L: + num, denom = int(match_L.group(1)), int(match_L.group(2)) + default_unit_fraction = num / denom + + bpm = 120 # Default BPM if Q: is not specified + beat_unit_fraction = 0.25 # Default beat unit (1/4 or quarter note) + + if "tempo" in metadata and metadata["tempo"]: + # Q: field is typically 'note_fraction=BPM', e.g. 
'1/4=120' + match_Q = re.match(r"(\d+/\d+)=(\d+)", metadata["tempo"].replace(" ", "")) + + if match_Q: + note_str, bpm_str = match_Q.groups() + bpm = int(bpm_str) + + q_num, q_denom = map(int, note_str.split("/")) + beat_unit_fraction = q_num / q_denom + else: + try: + bpm = int(metadata["tempo"].replace(" ", "")) + except ValueError: + pass # Keep default BPM + + # Duration in seconds of the note specified as the beat unit (Q: note) + duration_of_beat_unit = 60.0 / bpm + + # Calculate the ratio between the default L: unit and the Q: beat unit. + # This handles cases where L: and Q: define different note values (e.g., L:1/16, Q:1/4=120) + ratio_to_beat_unit = default_unit_fraction / beat_unit_fraction + + # The absolute duration in seconds of the note defined by L: + default_duration_in_seconds = ratio_to_beat_unit * duration_of_beat_unit + + # Informational output + if "title" in metadata: + logger.info(f"Playing: {metadata['title']}") + logger.info(f"BPM: {bpm}, Beat Unit Fraction: {beat_unit_fraction:.3f}, Default L: {default_unit_fraction:.3f}") + logger.info(f"Duration of 1 beat: {duration_of_beat_unit:.3f}s. Default L: Duration: {default_duration_in_seconds:.3f}s") + if "transpose" in metadata: + logger.info(f"Transposing by {metadata['transpose']} octaves. Target default octave: {default_octave + metadata['transpose']}") + + # --- 5. 
Parse Music Lines --- + music_string = " ".join(music_lines) + result = [] + + # Tokenize notes, rests, and bar lines + music_string = re.sub(r'"[^"]*"', "", music_string) # Remove chord annotations + tokens = re.findall(r"[A-Ga-g][',]*[#b]?[0-9]*/?[0-9]*|z[0-9]*/?[0-9]*|\|", music_string) + + for token in tokens: + if token == "|": + continue + + # Parse Rest + if token.startswith("z"): + # Use the duration in seconds as the base unit + duration = ABCNotationLoader._parse_abc_duration(token[1:], default_duration_in_seconds) + result.append(("REST", duration)) + continue + + # Parse Note + note_char = token[0] + rest = token[1:] + + octave = default_octave + if "transpose" in metadata: + octave += metadata["transpose"] + if note_char.islower(): + octave = octave + 1 + note_char = note_char.upper() + + # Handle octave markers (',) - adjust octave accordingly - increase/decrease octave + octave_markers = re.findall(r"[',]", rest) + for marker in octave_markers: + if marker == "'": + octave += 1 + elif marker == ",": + octave -= 1 + + rest = re.sub(r"[',]", "", rest) + + # Handle accidentals (# sharp, b flat) + accidental = "" + if rest and rest[0] in ["#", "b"]: + accidental = rest[0].upper() + rest = rest[1:] + + duration = ABCNotationLoader._parse_abc_duration(rest, default_duration_in_seconds) + + # Build note name (e.g., C#4) + note_name = f"{note_char}{accidental}{octave}" + result.append((note_name, duration)) + + metadata["actual_bpm"] = bpm + return metadata, result diff --git a/src/arduino/app_peripherals/speaker/__init__.py b/src/arduino/app_peripherals/speaker/__init__.py index 881145e1..7ddc5e75 100644 --- a/src/arduino/app_peripherals/speaker/__init__.py +++ b/src/arduino/app_peripherals/speaker/__init__.py @@ -298,10 +298,15 @@ def _clear_queue(self): break logger.debug("Playback queue cleared.") - def start(self): + def start(self, notify_if_started: bool = True): """Start the spaker stream by opening the PCM device.""" if 
self._is_reproducing.is_set(): - raise RuntimeError("Spaker is already reproducing audio, cannot start again.") + if notify_if_started: + raise RuntimeError("Spaker is already reproducing audio, cannot start again.") + else: + logger.debug("Spaker is already reproducing audio, start() call ignored.") + return + self._clear_queue() self._open_pcm() self._is_reproducing.set() @@ -313,7 +318,7 @@ def start(self): def stop(self): """Close the PCM device if open.""" if not self._is_reproducing.is_set(): - logger.warning("Spaker is not recording, nothing to stop.") + logger.debug("Spaker is not recording, nothing to stop.") return # Stop the playback thread @@ -411,3 +416,15 @@ def play(self, data: bytes | np.ndarray, block_on_queue: bool = False): except queue.Full: # logger.warning("Playback queue is full, dropping oldest data.") self._playing_queue.get_nowait() + + def is_reproducing(self) -> bool: + """Check if the speaker is currently reproducing audio. + + Returns: + bool: True if reproducing, False otherwise. 
+ """ + return self._is_reproducing.is_set() + + def clear_playback_queue(self): + """Clear the playback queue.""" + self._clear_queue() diff --git a/tests/arduino/app_bricks/sound_generator/test_abc.py b/tests/arduino/app_bricks/sound_generator/test_abc.py new file mode 100644 index 00000000..07f4830f --- /dev/null +++ b/tests/arduino/app_bricks/sound_generator/test_abc.py @@ -0,0 +1,99 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +from arduino.app_bricks.sound_generator import ABCNotationLoader + + +def test_abc_loader(): + full_abc = """ + X:1 + T:Main Theme + M:4/4 + L:1/8 + Q:1/4=130 + K:Cm + "Cm"E2 E2 E2 "Ab"C>G | "Cm"E2 "Ab"C>G "Cm"E4 | + "Cm"B2 B2 B2 "Ab"c>G | "Fm"^D#2 "Ab"C>G "Cm"E4 | + """ + + reference_notes = [ + ("E4", 60 / 130), + ("E4", 60 / 130), + ("E4", 60 / 130), + ("C4", (60 / 130) / 2), + ("G4", (60 / 130) / 2), + ("E4", 60 / 130), + ("C4", (60 / 130) / 2), + ("G4", (60 / 130) / 2), + ("E4", (60 / 130) * 2), + ("B4", 60 / 130), + ("B4", 60 / 130), + ("B4", 60 / 130), + ("C5", (60 / 130) / 2), + ("G4", (60 / 130) / 2), + ("D#4", 60 / 130), + ("C4", (60 / 130) / 2), + ("G4", (60 / 130) / 2), + ("E4", (60 / 130) * 2), + ] + + metadata, loaded = ABCNotationLoader.parse_abc_notation(full_abc) + assert metadata["title"] == "Main Theme" + assert "transpose" not in metadata + assert metadata["tempo"] == "1/4=130" + + i_ref = 0 + for note, duration in loaded: + print(f"Note: {note}, Duration: {duration}") + assert note == reference_notes[i_ref][0] + assert abs(duration - reference_notes[i_ref][1]) < 0.01 + i_ref += 1 + + +def test_abc_loader_with_transpose(): + full_abc = """ + X:1 + T:Main Theme + M:4/4 + L:1/8 + Q:1/4=130 + K:Cm + %%transpose -12 + "Cm"E2 E2 E2 "Ab"C>G | "Cm"E2 "Ab"C>G "Cm"E4 | + "Cm"B2 B2 B2 "Ab"c>G | "Fm"^D#2 "Ab"C>G "Cm"E4 | + """ + + reference_notes = [ + ("E3", 60 / 130), + ("E3", 60 / 130), + ("E3", 60 / 130), + ("C3", (60 / 130) / 2), + ("G3", (60 / 130) / 2), + ("E3", 60 
/ 130), + ("C3", (60 / 130) / 2), + ("G3", (60 / 130) / 2), + ("E3", (60 / 130) * 2), + ("B3", 60 / 130), + ("B3", 60 / 130), + ("B3", 60 / 130), + ("C4", (60 / 130) / 2), + ("G3", (60 / 130) / 2), + ("D#3", 60 / 130), + ("C3", (60 / 130) / 2), + ("G3", (60 / 130) / 2), + ("E3", (60 / 130) * 2), + ] + + metadata, loaded = ABCNotationLoader.parse_abc_notation(full_abc) + assert metadata["title"] == "Main Theme" + assert "transpose" in metadata + assert metadata["transpose"] == -1 + assert metadata["tempo"] == "1/4=130" + + i_ref = 0 + for note, duration in loaded: + print(f"Note: {note}, Duration: {duration}") + assert note == reference_notes[i_ref][0] + assert abs(duration - reference_notes[i_ref][1]) < 0.01 + i_ref += 1 diff --git a/tests/arduino/app_bricks/sound_generator/test_effects.py b/tests/arduino/app_bricks/sound_generator/test_effects.py new file mode 100644 index 00000000..03ad7070 --- /dev/null +++ b/tests/arduino/app_bricks/sound_generator/test_effects.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +from arduino.app_bricks.sound_generator.effects import SoundEffect +from arduino.app_bricks.sound_generator import SoundGeneratorStreamer, WaveSamplesBuilder + + +def test_adsr_effect(): + generator = WaveSamplesBuilder(sample_rate=16000, wave_form="square") + adsr = SoundEffect.adsr() + blk = generator.generate_block(440.0, 1 / 8, 1.0) # Generate a block to initialize + assert adsr is not None + + # Apply ADSR effect + processed = adsr.apply(blk) + + assert processed is not None + assert len(processed) == len(blk) + + +def test_available_notes(): + note_sequence = [ + ("E5", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("C5", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("G5", 0.25), + ("REST", 0.25), + ("G4", 0.25), + ("REST", 0.25), + ("C5", 0.25), + ("REST", 0.125), + ("G4", 0.25), + ("REST", 0.125), + ("E4", 0.25), + ("REST", 0.125), + ("A4", 
0.25), + ("B4", 0.25), + ("Bb4", 0.125), + ("A4", 0.25), + ("G4", 0.125), + ("E5", 0.125), + ("G5", 0.125), + ("A5", 0.25), + ("F5", 0.125), + ("G5", 0.125), + ("REST", 0.125), + ("E5", 0.25), + ("C5", 0.125), + ("D5", 0.125), + ("B4", 0.25), + ] + + generator = SoundGeneratorStreamer() + for note, duration in note_sequence: + print(f"Testing note: {note}") + frequency = generator._get_note(note) + if "REST" != note: + assert frequency is not None and frequency > 0.0 + else: + assert frequency is not None and frequency == 0.0