From 3756839f430e4bcdd934b3488b74fba6c0dc2689 Mon Sep 17 00:00:00 2001 From: Yonokid <37304577+Yonokid@users.noreply.github.com> Date: Wed, 27 Aug 2025 14:20:14 -0400 Subject: [PATCH] thank you claude for the music class refactor --- libs/audio.py | 423 ++++++++++++++++++++++++------------------------- scenes/game.py | 23 ++- 2 files changed, 219 insertions(+), 227 deletions(-) diff --git a/libs/audio.py b/libs/audio.py index 9a12237..1b7e7d4 100644 --- a/libs/audio.py +++ b/libs/audio.py @@ -202,199 +202,200 @@ class Sound: output[:, 1] *= right_vol return output + class Music: - def __init__(self, file_path: Path, data: Optional[ndarray]=None, target_sample_rate: int=44100, sample_rate: int =44100, preview: Optional[float]=None, normalize: Optional[float]=None): + BUFFER_DURATION = 5.0 # seconds + def __init__(self, file_path: Path, data: Optional[ndarray] = None, + target_sample_rate: int = 44100, sample_rate: int = 44100, + preview: Optional[float] = None, normalize: Optional[float] = None): + # Core properties self.file_path = file_path self.data = data self.target_sample_rate = target_sample_rate self.sample_rate = sample_rate - self.channels = 0 - self.position = 0 # In frames (original sample rate) + self.normalize = normalize + self.preview = preview + + # Playback state + self.position = 0 self.is_playing = False self.is_paused = False self.volume = 0.75 - self.pan = 0.5 # Center + self.pan = 0.5 + + # Audio properties (set during loading) + self.channels = 0 self.total_frames = 0 self.valid = False - self.normalize = normalize - self.preview = preview # Preview start time in seconds - self.is_preview_mode = preview is not None - self.preview_start_frame = 0 - self.preview_end_frame = 0 - self.uses_file_streaming = False - - self.file_buffer_size = int(target_sample_rate * 5) # 5 seconds buffer + # Buffering + self.buffer_size = int(target_sample_rate * self.BUFFER_DURATION) self.buffer = None self.buffer_position = 0 - # Thread-safe updates - self.lock = Lock() + # File streaming self.sound_file = None + self.uses_file_streaming = False + self.preview_start_frame = 0 + self.preview_end_frame = 0 - if self.file_path.exists(): - self.load_from_file() + # Thread safety + self.lock = Lock() + + # Load audio + self._load_audio() + + def _load_audio(self): + """Load audio from file or memory""" + try: + if self.file_path.exists(): + self._load_from_file() + else: + self._load_from_memory() + except Exception as e: + print(f"Error loading audio: {e}") + self.valid = False + + def _load_from_file(self): + """Load and setup file streaming""" + self.sound_file = sf.SoundFile(str(self.file_path)) + self.channels = self.sound_file.channels + self.sample_rate = self.sound_file.samplerate + original_frames = self.sound_file.frames + + self.preview_start_frame, self.preview_end_frame = self._calculate_preview_bounds(original_frames) + self.total_frames = self.preview_end_frame - self.preview_start_frame + + if self.preview is not None: + self.uses_file_streaming = True + self.sound_file.seek(self.preview_start_frame) + self.data = None + print(f"Preview mode: Streaming {self.total_frames} frames starting at {self.preview:.2f}s") else: - self.load_from_memory() - - def load_from_memory(self) -> None: - """Load music from in-memory numpy array""" - try: - if self.data is None: - raise Exception("No data provided for memory loading") - - if self.data.dtype != float32: - self.data = self.data.astype(float32) - - if self.sample_rate != self.target_sample_rate: - print(f"Resampling {self.file_path} from {self.sample_rate}Hz to {self.target_sample_rate}Hz") - self.data = resample(self.data, self.sample_rate, self.target_sample_rate) - - if self.normalize is not None: - current_rms = get_average_volume_rms(self.data) - if current_rms > 0: # Avoid division by zero - target_rms = self.normalize - rms_scale_factor = target_rms / current_rms - self.data *= rms_scale_factor - - if self.data.ndim == 1: - self.channels = 1 - self.total_frames = len(self.data) - self.data = self.data.reshape(-1, 1) - else: - self.channels = self.data.shape[1] - self.total_frames = self.data.shape[0] - - self.sample_width = 4 # float32 + self.data = self.sound_file.read() + self.data = self._process_audio_data(self.data) self.uses_file_streaming = False - self._fill_buffer() - self.valid = True - print(f"Music loaded from memory: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames") - except Exception as e: - print(f"Error loading music from memory: {e}") - self.valid = False + self._fill_buffer() + self.valid = True + print(f"Music loaded: {self.channels} channels, {self.sample_rate}Hz, {self.time_length:.2f}s") - def load_from_file(self) -> None: - """Load music from file""" - try: - self.sound_file = sf.SoundFile(str(self.file_path)) - - # Get file properties - self.channels = self.sound_file.channels - self.sample_width = 2 if self.sound_file.subtype in ['PCM_16', 'VORBIS'] else 4 - self.sample_rate = self.sound_file.samplerate - original_total_frames = self.sound_file.frames - - if self.is_preview_mode: - self.preview_start_frame = int(self.preview * self.sample_rate) - available_frames = original_total_frames - self.preview_start_frame - self.preview_end_frame = min(self.preview_start_frame + available_frames, original_total_frames) - if self.preview_start_frame >= original_total_frames: - self.preview_start_frame = max(0, original_total_frames - available_frames) - self.preview_end_frame = original_total_frames - self.total_frames = self.preview_end_frame - self.preview_start_frame - self.sound_file.seek(self.preview_start_frame) - self.uses_file_streaming = True - self.data = None # Don't store full data in memory - - print(f"Preview mode: Streaming {self.total_frames} frames ({self.total_frames/self.sample_rate:.2f}s) starting at {self.preview:.2f}s") - - else: - self.data = self.sound_file.read() - self.total_frames = original_total_frames - self.uses_file_streaming = False - - self.load_from_memory() - return # Early return to avoid duplicate processing - - self._fill_buffer() - self.valid = True - - if self.is_preview_mode: - print(f"Music preview streaming: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames ({self.get_time_length():.2f}s)") - else: - print(f"Music loaded: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames") - - except Exception as e: - print(f"Error loading music file: {e}") - if hasattr(self, 'sound_file') and self.sound_file: - self.sound_file.close() - self.sound_file = None - self.valid = False - - def _fill_buffer(self) -> bool: - """Fill buffer from either memory or file stream""" - try: - if self.uses_file_streaming: - return self._fill_buffer_from_file() - else: - return self._fill_buffer_from_memory() - except Exception as e: - print(f"Error filling buffer: {e}") - return False - - def _fill_buffer_from_memory(self) -> bool: - """Fill buffer from in-memory data""" + def _load_from_memory(self): + """Load from in-memory numpy array""" if self.data is None: - return False + raise ValueError("No data provided for memory loading") - start_frame = self.position + self.buffer_position - end_frame = min(start_frame + self.file_buffer_size, self.total_frames) + self.data = self._process_audio_data(self.data) + self.channels = 1 if self.data.ndim == 1 else self.data.shape[1] + self.total_frames = len(self.data) if self.data.ndim == 1 else self.data.shape[0] + self.uses_file_streaming = False - if start_frame >= self.total_frames: - return False + self._fill_buffer() + self.valid = True + print(f"Music loaded from memory: {self.channels} channels, {self.total_frames} frames") - data_chunk = self.data[start_frame:end_frame] + def _process_audio_data(self, data): + """Apply resampling, normalization, and ensure correct format""" + if data.dtype != float32: + data = data.astype(float32) - self.buffer = data_chunk - self.position += self.buffer_position - self.buffer_position = 0 - return True + if self.sample_rate != self.target_sample_rate: + print(f"Resampling from {self.sample_rate}Hz to {self.target_sample_rate}Hz") + data = resample(data, self.sample_rate, self.target_sample_rate) - def _fill_buffer_from_file(self) -> bool: - """Fill buffer by streaming from file""" + if self.normalize is not None: + current_rms = get_average_volume_rms(data) + if current_rms > 0: + data *= self.normalize / current_rms + + if data.ndim == 1: + data = data.reshape(-1, 1) + + return data + + def _calculate_preview_bounds(self, total_frames): + """Calculate start and end frames for preview mode""" + if self.preview is None: + return 0, total_frames + + start_frame = int(self.preview * self.sample_rate) + start_frame = max(0, min(start_frame, total_frames - 1)) + end_frame = total_frames + + return start_frame, end_frame + + def _fill_buffer(self): + """Fill the audio buffer from appropriate source""" + if self.uses_file_streaming: + return self._fill_buffer_from_file() + else: + return self._fill_buffer_from_memory() + + def _fill_buffer_from_file(self): + """Stream buffer from file""" if not self.sound_file: return False - current_absolute_position = self.preview_start_frame + self.position - if current_absolute_position >= self.preview_end_frame: + current_pos = self.preview_start_frame + self.position + if current_pos >= self.preview_end_frame: return False - frames_to_read = min(self.file_buffer_size, self.preview_end_frame - current_absolute_position) - + frames_to_read = min(self.buffer_size, self.preview_end_frame - current_pos) if frames_to_read <= 0: return False - self.sound_file.seek(current_absolute_position) - + self.sound_file.seek(current_pos) data_chunk = self.sound_file.read(frames_to_read) if len(data_chunk) == 0: return False original_frames = len(data_chunk) if data_chunk.ndim == 1 else data_chunk.shape[0] - if self.sample_rate != self.target_sample_rate: - data_chunk = resample(data_chunk, self.sample_rate, self.target_sample_rate) - if self.normalize is not None: - current_rms = get_average_volume_rms(data_chunk) - if current_rms > 0: - target_rms = self.normalize - rms_scale_factor = target_rms / current_rms - data_chunk *= rms_scale_factor - - if data_chunk.ndim == 1 and self.channels > 1: - data_chunk = data_chunk.reshape(-1, self.channels) - elif data_chunk.ndim == 2 and self.channels == 1: - data_chunk = data_chunk.flatten().reshape(-1, 1) + data_chunk = self._process_audio_data(data_chunk) self.buffer = data_chunk - self.position += original_frames self.buffer_position = 0 + self.position += original_frames + return True - def update(self) -> None: + def _fill_buffer_from_memory(self): + """Fill buffer from in-memory data""" + if self.data is None: + return False + + start_frame = self.position + self.buffer_position + end_frame = min(start_frame + self.buffer_size, self.total_frames) + + if start_frame >= self.total_frames: + return False + + self.buffer = self.data[start_frame:end_frame] + self.position += self.buffer_position + self.buffer_position = 0 + + return True + + @property + def time_length(self): + """Total length in seconds""" + return self.total_frames / self.target_sample_rate + + @property + def time_played(self): + """Current position in seconds (relative to preview start)""" + return (self.position + self.buffer_position) / self.target_sample_rate + + @property + def actual_time_played(self): + """Actual position in original file (including preview offset)""" + base_time = self.time_played + if self.preview is not None: + return base_time + self.preview + return base_time + + def update(self): """Update music stream buffers""" if not self.is_playing or self.is_paused: return @@ -405,125 +406,121 @@ class Music: if self.buffer_position >= len(self.buffer): self.is_playing = self._fill_buffer() - def play(self) -> None: - """Start playing the music stream""" + def play(self): + """Start playing""" with self.lock: if self.position >= self.total_frames: - self.position = 0 - self.buffer_position = 0 - if self.sound_file and self.uses_file_streaming: - self.sound_file.seek(self.preview_start_frame) - self._fill_buffer() + self._reset_position() self.is_playing = True self.is_paused = False - def stop(self) -> None: - """Stop playing the music stream""" + def stop(self): + """Stop playing""" with self.lock: self.is_playing = False self.is_paused = False - self.position = 0 - self.buffer_position = 0 - if self.sound_file and self.uses_file_streaming: - self.sound_file.seek(self.preview_start_frame) - self._fill_buffer() + self._reset_position() - def pause(self) -> None: - """Pause the music playback""" + def pause(self): + """Pause playback""" with self.lock: if self.is_playing: self.is_paused = True self.is_playing = False - def resume(self) -> None: - """Resume the music playback""" + def resume(self): + """Resume playback""" with self.lock: if self.is_paused: self.is_playing = True self.is_paused = False - def seek(self, position_seconds) -> None: - """Seek to a specific position in seconds (relative to preview start if in preview mode)""" + def seek(self, position_seconds): + """Seek to position in seconds""" with self.lock: frame_position = int(position_seconds * self.target_sample_rate) frame_position = max(0, min(frame_position, self.total_frames - 1)) + if self.sound_file: + actual_pos = frame_position + if self.preview is not None: + actual_pos += int(self.preview * self.sample_rate) + self.sound_file.seek(actual_pos) + self.position = frame_position self.buffer_position = 0 - self._fill_buffer() - def get_time_length(self) -> float: - """Get the total length of the music in seconds (preview length if in preview mode)""" - return self.total_frames / self.target_sample_rate + def _reset_position(self): + """Reset to beginning""" + self.position = 0 + self.buffer_position = 0 + if self.sound_file: + seek_pos = self.preview_start_frame if self.preview is not None else 0 + self.sound_file.seek(seek_pos) + self._fill_buffer() - def get_time_played(self) -> float: - """Get the current playback position in seconds (relative to preview start if in preview mode)""" - return (self.position + self.buffer_position) / self.target_sample_rate - - def get_actual_time_played(self) -> float: - """Get the actual playback position in the original file (including preview offset)""" - base_time = (self.position + self.buffer_position) / self.target_sample_rate - if self.is_preview_mode and self.preview is not None: - return base_time + self.preview - return base_time - - def get_frames(self, num_frames) -> ndarray: - """Get the next num_frames of music data, applying volume, pitch, and pan""" + def get_frames(self, num_frames): + """Get next audio frames with volume and panning applied""" if not self.is_playing: - # Return silence if not playing - if self.channels == 1: - return zeros(num_frames, dtype=float32) - else: - return zeros((num_frames, self.channels), dtype=float32) + return self._get_silence(num_frames) with self.lock: if self.buffer is None: - return zeros(num_frames, dtype=float32) + return self._get_silence(num_frames) if self.buffer_position >= len(self.buffer): if not self._fill_buffer(): self.is_playing = False - if self.channels == 1: - return zeros(num_frames, dtype=float32) - else: - return zeros((num_frames, self.channels), dtype=float32) + return self._get_silence(num_frames) - frames_left_in_buffer = len(self.buffer) - self.buffer_position + frames_available = len(self.buffer) - self.buffer_position if self.channels > 1: - frames_left_in_buffer = self.buffer.shape[0] - self.buffer_position + frames_available = self.buffer.shape[0] - self.buffer_position - frames_to_get = min(num_frames, frames_left_in_buffer) + frames_to_get = min(num_frames, frames_available) + + output = self._get_silence(num_frames) if self.channels == 1: - output = zeros(num_frames, dtype=float32) - output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get].flatten() + output[:frames_to_get] = self.buffer[ + self.buffer_position:self.buffer_position + frames_to_get + ].flatten() else: - output = zeros((num_frames, self.channels), dtype=float32) - output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get] + output[:frames_to_get] = self.buffer[ + self.buffer_position:self.buffer_position + frames_to_get + ] self.buffer_position += frames_to_get + return self._apply_effects(output) - output *= self.volume + def _get_silence(self, num_frames): + """Return silence buffer""" + if self.channels == 1: + return zeros(num_frames, dtype=float32) + else: + return zeros((num_frames, self.channels), dtype=float32) - if self.channels == 2 and self.pan != 0.5: - # pan=0: full left, pan=0.5: center, pan=1: full right - left_vol = min(1.0, 2.0 * (1.0 - self.pan)) - right_vol = min(1.0, 2.0 * self.pan) + def _apply_effects(self, output): + """Apply volume and panning effects""" + output *= self.volume - output[:, 0] *= left_vol - output[:, 1] *= right_vol + if self.channels == 2 and self.pan != 0.5: + left_vol = min(1.0, 2.0 * (1.0 - self.pan)) + right_vol = min(1.0, 2.0 * self.pan) + output[:, 0] *= left_vol + output[:, 1] *= right_vol - return output + return output - def __del__(self) -> None: - """Cleanup when the music object is deleted""" + def __del__(self): + """Cleanup""" if hasattr(self, 'sound_file') and self.sound_file: try: self.sound_file.close() except Exception: - raise Exception("unable to close music stream") + pass class AudioEngine: def __init__(self, type: str) -> None: @@ -570,7 +567,7 @@ class AudioEngine: device_info = sd.query_devices(self.device_id) if isinstance(device_info, sd.DeviceList): raise Exception("Invalid ASIO Device") - print(f"Using default ASIO device: {device_info['name']}") + print(f"Using default audio device: {device_info['name']}") self.buffer_size = rounded(device_info['default_low_output_latency']*1000) if 'buffer_size' in get_config()['audio']: self.buffer_size = get_config()['audio']['buffer_size'] diff --git a/scenes/game.py b/scenes/game.py index 6785fa9..1f36dcd 100644 --- a/scenes/game.py +++ b/scenes/game.py @@ -40,8 +40,6 @@ class GameScreen: self.height = height self.current_ms = 0 self.screen_init = False - self.movie = None - self.song_music = None self.end_ms = 0 self.start_delay = 1000 self.song_started = False @@ -75,8 +73,8 @@ class GameScreen: self.movie = None session_data.song_title = self.tja.metadata.title.get(global_data.config['general']['language'].lower(), self.tja.metadata.title['en']) if self.tja.metadata.wave.exists() and self.tja.metadata.wave.is_file(): - self.song_music = audio.load_music_stream(self.tja.metadata.wave) - audio.normalize_music_stream(self.song_music, 0.1935) + self.song_music = audio.load_sound(self.tja.metadata.wave) + audio.normalize_sound(self.song_music, 0.1935) self.player_1 = Player(self, global_data.player_num, difficulty) if self.tja is not None: @@ -85,6 +83,8 @@ class GameScreen: def on_screen_start(self): if not self.screen_init: self.screen_init = True + self.movie = None + self.song_music = None tex.load_screen_textures('game') self.background = Background(self.width, self.height, global_data.player_num) self.load_sounds() @@ -102,8 +102,7 @@ class GameScreen: self.screen_init = False tex.unload_textures() if self.song_music is not None: - audio.unload_music_stream(self.song_music) - del self.song_music + audio.unload_sound(self.song_music) self.song_started = False self.end_ms = 0 self.movie = None @@ -146,8 +145,8 @@ class GameScreen: if self.tja is not None: if (self.current_ms >= self.tja.metadata.offset*1000 + self.start_delay - global_data.config["general"]["judge_offset"]) and not self.song_started: if self.song_music is not None: - if not audio.is_music_stream_playing(self.song_music): - audio.play_music_stream(self.song_music) + if not audio.is_sound_playing(self.song_music): + audio.play_sound(self.song_music) print(f"Song started at {self.current_ms}") if self.movie is not None: self.movie.start(get_current_ms()) @@ -159,10 +158,6 @@ class GameScreen: self.player_1.update(self) self.song_info.update(get_current_ms()) - - if self.song_music is not None: - audio.update_music_stream(self.song_music) - self.result_transition.update(get_current_ms()) if self.result_transition.is_finished: return self.on_screen_end('RESULT') @@ -180,14 +175,14 @@ class GameScreen: if ray.is_key_pressed(ray.KeyboardKey.KEY_F1): if self.song_music is not None: - audio.stop_music_stream(self.song_music) + audio.stop_sound(self.song_music) self.init_tja(global_data.selected_song, session_data.selected_difficulty) audio.play_sound(self.sound_restart) self.song_started = False if ray.is_key_pressed(ray.KeyboardKey.KEY_ESCAPE): if self.song_music is not None: - audio.stop_music_stream(self.song_music) + audio.stop_sound(self.song_music) return self.on_screen_end('SONG_SELECT') def draw(self):