"""Software audio mixer.

Provides fully-loaded one-shot sounds (`Sound`), file-streamed music
(`Music`) and an `AudioEngine` that mixes both inside a sounddevice output
callback.  A module-level `audio` engine instance is created on import.
"""
import os
import queue
import time
from pathlib import Path
from threading import Lock, Thread
from typing import Optional

import soundfile as sf
from numpy import abs as np_abs
from numpy import (
    arange,
    column_stack,
    float32,
    frombuffer,
    int16,
    int32,
    interp,
    mean,
    ndarray,
    sqrt,
    uint8,
    zeros,
)
from numpy import max as np_max

# Deliberately set *before* sounddevice is imported; presumably this is what
# makes the ASIO host API available -- confirm against the sounddevice docs.
os.environ["SD_ENABLE_ASIO"] = "1"
import sounddevice as sd

from libs.utils import get_config, rounded


def resample(data, orig_sr, target_sr):
    """Resample audio from ``orig_sr`` to ``target_sr`` by linear interpolation.

    Args:
        data: 1-D (mono) array or 2-D array indexed ``[frame, channel]``.
        orig_sr: Sample rate of ``data``.
        target_sr: Desired sample rate.

    Returns:
        ``data`` itself when the rates are equal; otherwise a new array with
        the same number of dimensions as the input.
    """
    if target_sr / orig_sr == 1.0:
        return data

    if len(data.shape) == 1:
        return _resample_channel(data, orig_sr, target_sr)

    # Each channel is interpolated independently, then re-assembled.
    return column_stack([
        _resample_channel(data[:, ch], orig_sr, target_sr)
        for ch in range(data.shape[1])
    ])


def _resample_channel(channel_data, orig_sr, target_sr):
    """Linearly interpolate one 1-D channel onto the target sample grid."""
    orig_length = len(channel_data)
    new_length = int(orig_length * target_sr / orig_sr)

    # Sample timestamps (in seconds) on the original and the target grid.
    orig_time = arange(orig_length) / orig_sr
    new_time = arange(new_length) / target_sr

    return interp(new_time, orig_time, channel_data)


def get_np_array(sample_width, raw_data):
    """Decode raw little-endian PCM bytes into float32 samples.

    Args:
        sample_width: Bytes per sample (1, 2, 3 or 4).
        raw_data: Bytes-like object holding the PCM samples.

    Returns:
        1-D float32 array: the integer sample divided by its full-scale
        magnitude (8-bit data is treated as unsigned and re-centred first).

    Raises:
        ValueError: If ``sample_width`` is not 1, 2, 3 or 4.
    """
    if sample_width == 1:
        # 8-bit samples are unsigned
        data = frombuffer(raw_data, dtype=uint8)
        return (data.astype(float32) - 128) / 128.0
    elif sample_width == 2:
        # 16-bit samples are signed
        data = frombuffer(raw_data, dtype=int16)
        return data.astype(float32) / 32768.0
    elif sample_width == 3:
        # 24-bit samples have no native dtype: assemble them from bytes.
        num_samples = len(raw_data) // 3  # trailing partial sample is dropped
        if num_samples == 0:
            return zeros(0, dtype=float32)
        triplets = frombuffer(raw_data, dtype=uint8, count=num_samples * 3)
        triplets = triplets.reshape(-1, 3).astype(int32)
        data = triplets[:, 0] | (triplets[:, 1] << 8) | (triplets[:, 2] << 16)
        # Sign-extend: values with bit 23 set represent negatives.
        data -= (data & 0x800000) << 1
        return data.astype(float32) / (2**23)
    elif sample_width == 4:
        # 32-bit samples are signed
        data = frombuffer(raw_data, dtype=int32)
        return data.astype(float32) / (2**31)
    else:
        raise ValueError(f"Unsupported sample width: {sample_width}")


def get_average_volume_rms(data):
    """Calculate average volume using RMS method"""
    return sqrt(mean(data ** 2))


def _silence(num_frames, channels):
    """Return a zeroed float32 block: 1-D for mono, ``(frames, channels)`` otherwise."""
    if channels == 1:
        return zeros(num_frames, dtype=float32)
    return zeros((num_frames, channels), dtype=float32)


def _apply_pan(output, pan):
    """Scale the two columns of a stereo block in place for ``pan``.

    pan=0: full left, pan=0.5: center, pan=1: full right.
    """
    output[:, 0] *= min(1.0, 2.0 * (1.0 - pan))
    output[:, 1] *= min(1.0, 2.0 * pan)


class Sound:
    """A sound effect held entirely in memory and played from the start."""

    def __init__(self, file_path: Path, data=None, target_sample_rate=44100):
        """Create the sound and, when ``file_path`` is truthy, load it.

        Args:
            file_path: Audio file to read with soundfile.
            data: Optional pre-decoded sample array (replaced by ``load``).
            target_sample_rate: Rate the samples are resampled to on load.
        """
        self.file_path = file_path
        self.data = data
        self.channels = 0
        self.sample_rate = target_sample_rate
        self.position = 0  # Next frame to play
        self.is_playing = False
        self.is_paused = False
        self.volume = 1.0
        self.pan = 0.5  # 0.0 = left, 0.5 = center, 1.0 = right
        self.normalize: Optional[float] = None  # Target RMS, or None

        if file_path:
            self.load()

    def load(self):
        """Load and prepare the sound file data"""
        data, original_sample_rate = sf.read(str(self.file_path))

        # Always store as [frame, channel], even for mono files.
        if data.ndim == 1:
            self.channels = 1
            data = data.reshape(-1, 1)
        else:
            self.channels = data.shape[1]

        if original_sample_rate != self.sample_rate:
            print(f"Resampling {self.file_path} from {original_sample_rate}Hz to {self.sample_rate}Hz")
            data = resample(data, original_sample_rate, self.sample_rate)

        if self.normalize is not None:
            current_rms = get_average_volume_rms(data)
            if current_rms > 0:  # Avoid division by zero
                data *= self.normalize / current_rms

        self.data = data

    def play(self):
        """Start playback from the first frame."""
        self.position = 0
        self.is_playing = True
        self.is_paused = False

    def stop(self):
        """Stop playback and rewind."""
        self.is_playing = False
        self.is_paused = False
        self.position = 0

    def pause(self):
        """Pause playback, keeping the current position."""
        if self.is_playing:
            self.is_paused = True
            self.is_playing = False

    def resume(self):
        """Resume playback after ``pause``."""
        if self.is_paused:
            self.is_playing = True
            self.is_paused = False

    def normalize_vol(self, rms: float):
        """Set the target RMS and reload the file so it takes effect."""
        self.normalize = rms
        if self.data is not None:
            self.data = None
            self.load()

    def get_frames(self, num_frames):
        """Get the next num_frames of audio data, applying volume and pan.

        Returns:
            ``None`` when no data is loaded; otherwise a float32 block (1-D
            for mono, ``(num_frames, channels)`` otherwise), zero-padded past
            the end of the sound.
        """
        if self.data is None:
            return

        if not self.is_playing:
            return _silence(num_frames, self.channels)

        frames_left = self.data.shape[0] - self.position
        if frames_left <= 0:
            # We've reached the end of the sound
            self.is_playing = False
            return _silence(num_frames, self.channels)

        frames_to_get = min(num_frames, frames_left)
        chunk = self.data[self.position:self.position + frames_to_get]
        output = _silence(num_frames, self.channels)
        output[:frames_to_get] = chunk.flatten() if self.channels == 1 else chunk

        self.position += frames_to_get
        output *= self.volume

        if self.channels == 2 and self.pan != 0.5:
            _apply_pan(output, self.pan)

        return output


class Music:
    """Music streamed from disk through a refillable in-memory buffer."""

    def __init__(self, file_path: Path, data=None, file_type=None, target_sample_rate=44100):
        """Open ``file_path`` and pre-fill the first buffer.

        Failures while opening are caught and reported via ``self.valid``.
        """
        self.file_path = file_path
        self.file_type = file_type
        self.data = data
        self.target_sample_rate = target_sample_rate
        self.sample_rate = target_sample_rate  # Replaced by the file's rate on load
        self.channels = 0
        # Frames consumed so far.
        # NOTE(review): `seek` stores file-rate frames here while `get_frames`
        # advances it by output frames; these differ when resampling is active.
        self.position = 0
        self.is_playing = False
        self.is_paused = False
        self.volume = 0.75
        self.pan = 0.5  # Center
        self.total_frames = 0
        self.valid = False
        self.normalize = None

        self.file_buffer_size = int(target_sample_rate * 5)  # 5 seconds buffer
        self.buffer = None
        self.buffer_position = 0  # Next frame to play within `buffer`

        # Defined up front so the attribute exists even if opening fails.
        self.sound_file = None

        # Guards buffer/position state shared by the audio callback and the
        # engine's update thread.
        self.lock = Lock()

        self.load_from_file()

    def load_from_file(self):
        """Load music from file"""
        try:
            # soundfile handles OGG, WAV, FLAC, etc. natively
            self.sound_file = sf.SoundFile(str(self.file_path))

            # Get file properties
            self.channels = self.sound_file.channels
            self.sample_width = 2 if self.sound_file.subtype in ['PCM_16', 'VORBIS'] else 4  # Most common
            self.sample_rate = self.sound_file.samplerate
            self.total_frames = len(self.sound_file)

            # Initialize buffer with some initial data
            self._fill_buffer()
            self.valid = True
            print(f"Music loaded: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames")
        except Exception as e:
            print(f"Error loading music file: {e}")
            if self.sound_file:
                self.sound_file.close()
            self.sound_file = None
            self.valid = False

    def _fill_buffer(self):
        """Fill the streaming buffer from file.

        Returns:
            True when a new buffer was installed, False when there is no open
            file, nothing left to read, or reading failed.
        """
        if not self.sound_file:
            return False

        try:
            frames_to_read = min(self.file_buffer_size, self.total_frames - self.position)
            if frames_to_read <= 0:
                return False

            # soundfile returns float64 by default; the mixer works in float32.
            data = self.sound_file.read(frames_to_read)
            if data.dtype != float32:
                data = data.astype(float32)

            # Ensure proper [frame, channel] shape for mono audio
            if self.channels == 1 and data.ndim == 1:
                data = data.reshape(-1, 1)
            elif self.channels == 1 and data.ndim == 2:
                data = data[:, 0].reshape(-1, 1)  # Take first channel if stereo file but expecting mono

            if self.sample_rate != self.target_sample_rate:
                print(f"Resampling {self.file_path} from {self.sample_rate}Hz to {self.target_sample_rate}Hz")
                data = resample(data, self.sample_rate, self.target_sample_rate)

            if self.normalize is not None:
                current_rms = get_average_volume_rms(data)
                if current_rms > 0:  # Avoid division by zero
                    data *= self.normalize / current_rms

            self.buffer = data
            self.buffer_position = 0
            return True
        except Exception as e:
            print(f"Error filling buffer: {e}")
            return False

    def update(self):
        """Refill the buffer if it has been fully consumed.

        Raises:
            Exception: If no buffer was ever loaded.
        """
        if not self.is_playing or self.is_paused:
            return

        with self.lock:
            if self.buffer is None:
                raise Exception("buffer is None")
            if self.sound_file and self.buffer_position >= len(self.buffer):
                if not self._fill_buffer():
                    self.is_playing = False

    def play(self):
        """Start playing the music stream"""
        with self.lock:
            # Restart from the beginning if the previous run reached the end
            if self.sound_file and self.position >= self.total_frames:
                self.sound_file.seek(0)
                self.position = 0
                self.buffer_position = 0
                self._fill_buffer()

            self.is_playing = True
            self.is_paused = False

    def stop(self):
        """Stop playing the music stream"""
        with self.lock:
            self.is_playing = False
            self.is_paused = False
            self.position = 0
            self.buffer_position = 0
            if self.sound_file:
                self.sound_file.seek(0)  # Reset to beginning
                self._fill_buffer()

    def pause(self):
        """Pause the music playback"""
        with self.lock:
            if self.is_playing:
                self.is_paused = True
                self.is_playing = False

    def resume(self):
        """Resume the music playback"""
        with self.lock:
            if self.is_paused:
                self.is_playing = True
                self.is_paused = False

    def seek(self, position_seconds):
        """Seek to a specific position in seconds"""
        with self.lock:
            frame_position = int(position_seconds * self.sample_rate)
            # Clamp position to valid range
            frame_position = max(0, min(frame_position, self.total_frames - 1))

            # The position must be updated *before* refilling: _fill_buffer
            # sizes its read from self.position, so a stale value near the end
            # of the file would leave the old buffer in place.
            self.position = frame_position
            self.buffer_position = 0
            if self.sound_file:
                self.sound_file.seek(frame_position)
                self._fill_buffer()

    def get_time_length(self):
        """Get the total length of the music in seconds"""
        return self.total_frames / self.sample_rate

    def get_time_played(self):
        """Get the current playback position in seconds"""
        # `position` already includes every frame handed out from the current
        # buffer, so `buffer_position` must not be added on top of it.
        return self.position / self.sample_rate

    def get_frames(self, num_frames):
        """Get the next num_frames of music data, applying volume and pan.

        The buffer is refilled in place when it runs out mid-block so that
        consecutive buffers play back to back.

        Returns:
            float32 block (1-D for mono, ``(num_frames, channels)`` otherwise),
            zero-padded once the stream is exhausted.

        Raises:
            Exception: If no buffer was ever loaded.
        """
        if not self.is_playing:
            return _silence(num_frames, self.channels)

        with self.lock:
            if self.buffer is None:
                raise Exception("buffer is None")

            output = _silence(num_frames, self.channels)
            filled = 0
            while filled < num_frames:
                if self.buffer_position >= len(self.buffer):
                    refilled = bool(self.sound_file) and self._fill_buffer()
                    # An empty refill would otherwise spin forever.
                    if not refilled or len(self.buffer) == 0:
                        self.is_playing = False
                        break

                take = min(num_frames - filled, len(self.buffer) - self.buffer_position)
                chunk = self.buffer[self.buffer_position:self.buffer_position + take]
                output[filled:filled + take] = chunk.flatten() if self.channels == 1 else chunk

                self.buffer_position += take
                self.position += take
                filled += take

            output *= self.volume

            if self.channels == 2 and self.pan != 0.5:
                _apply_pan(output, self.pan)

            return output

    def __del__(self):
        """Cleanup when the music object is deleted"""
        sound_file = getattr(self, 'sound_file', None)
        if sound_file:
            try:
                sound_file.close()
            except Exception as e:
                # Raising from a finalizer is only reported, never propagated.
                print(f"Error closing music stream: {e}")


class AudioEngine:
    """Mixes registered sounds and music streams into one output stream."""

    def __init__(self, type: str):
        """Create an engine; no device is opened until ``init_audio_device``.

        Args:
            type: Host API name, compared against the ``name`` of each entry
                returned by ``sd.query_hostapis()``.
        """
        self.target_sample_rate = 44100
        self.buffer_size = 10
        self.sounds: dict[str, Sound] = {}
        self.music_streams = {}
        self.stream = None
        self.device_id = None
        self.running = False
        # Requests from other threads, drained inside the audio callback.
        self.sound_queue: queue.Queue[str] = queue.Queue()
        self.music_queue = queue.Queue()
        self.master_volume = 1.0
        self.output_channels = 2  # Default to stereo
        self.audio_device_ready = False

        # Monotonic id counters: ids derived from len() would collide with a
        # live entry after something has been unloaded.
        self._next_sound_index = 0
        self._next_music_index = 0

        # Threading for music stream updates
        self.update_thread = None
        self.update_thread_running = False
        self.type = type

    def _initialize_asio(self):
        """Select the output device for the configured host API.

        Falls back to the system default output device when the host API is
        not present or has no default output device.

        Returns:
            True.

        Raises:
            Exception: If the host API entry or the device info has an
                unexpected shape.
        """
        hostapis = sd.query_hostapis()
        asio_api_index = -1
        for i, api in enumerate(hostapis):
            if isinstance(api, dict) and 'name' in api and api['name'] == self.type:
                asio_api_index = i
                break

        # Without the index check, -1 would silently pick the *last* host API.
        if isinstance(hostapis, tuple) and asio_api_index >= 0:
            asio_api = hostapis[asio_api_index]
            if isinstance(asio_api, dict) and 'default_output_device' in asio_api:
                default_asio_device = asio_api['default_output_device']
            else:
                raise Exception("Warning: 'default_output_device' key not found in ASIO API info.")

            if default_asio_device >= 0:
                self.device_id = default_asio_device
                device_info = sd.query_devices(self.device_id)
                if isinstance(device_info, sd.DeviceList):
                    raise Exception("Invalid ASIO Device")
                print(f"Using default ASIO device: {device_info['name']}")

                audio_config = get_config()['audio']

                # NOTE(review): this is a latency in milliseconds but is later
                # passed as `blocksize` -- confirm the unit is intended.
                self.buffer_size = rounded(device_info['default_low_output_latency']*1000)
                if 'buffer_size' in audio_config:
                    self.buffer_size = audio_config['buffer_size']

                self.target_sample_rate = device_info['default_samplerate']
                if 'sample_rate' in audio_config:
                    self.target_sample_rate = audio_config['sample_rate']
                    if self.target_sample_rate == -1:  # -1 selects the device default
                        self.target_sample_rate = device_info['default_samplerate']

                # Limit to stereo for simplicity
                self.output_channels = min(2, device_info['max_output_channels'])
                return True
        else:
            print("ASIO API not found, using system default device.")

        # If we get here, use default system device
        self.device_id = None
        device_info = sd.query_devices(sd.default.device[1])
        if isinstance(device_info, sd.DeviceList):
            raise Exception("Invalid ASIO Device")
        self.output_channels = min(2, device_info['max_output_channels'])
        return True

    def _to_output_layout(self, block):
        """Return ``block`` shaped ``(frames, self.output_channels)``.

        Mono is duplicated to every output channel, wider sources are averaged
        (mono output) or truncated to the first output channels.
        """
        if block.ndim == 1:
            block = block.reshape(-1, 1)
        source_channels = block.shape[1]

        if source_channels == self.output_channels:
            return block
        if source_channels == 1:
            return column_stack([block[:, 0]] * self.output_channels)
        if self.output_channels == 1:
            # Keep the channel axis so the block adds onto the 2-D mix buffer.
            return mean(block, axis=1).reshape(-1, 1)
        return block[:, :self.output_channels]

    def _audio_callback(self, outdata, frames, time, status):
        """Callback function for the sounddevice stream.

        Applies queued play/stop/seek requests, mixes every playing sound and
        music stream, applies the master volume and a peak limiter, and writes
        the result into ``outdata``.  (``time`` shadows the module here; it is
        the positional argument supplied by sounddevice and is unused.)
        """
        if status:
            print(f"Status: {status}")

        # Process any new sound play requests
        while not self.sound_queue.empty():
            try:
                sound_name = self.sound_queue.get_nowait()
            except queue.Empty:
                break
            if sound_name in self.sounds:
                self.sounds[sound_name].play()

        # Process any new music play requests
        while not self.music_queue.empty():
            try:
                music_name, action, *args = self.music_queue.get_nowait()
            except queue.Empty:
                break
            if music_name in self.music_streams:
                music = self.music_streams[music_name]
                if action == 'play':
                    music.play()
                elif action == 'stop':
                    music.stop()
                elif action == 'pause':
                    music.pause()
                elif action == 'resume':
                    music.resume()
                elif action == 'seek' and args:
                    music.seek(args[0])

        output = zeros((frames, self.output_channels), dtype=float32)

        # Iterate snapshots: other threads may load/unload while we mix.
        for sound in list(self.sounds.values()):
            if sound.is_playing:
                sound_data = sound.get_frames(frames)
                # get_frames returns None when the sound has no data loaded.
                if isinstance(sound_data, ndarray):
                    output += self._to_output_layout(sound_data)

        for music in list(self.music_streams.values()):
            if music.is_playing:
                output += self._to_output_layout(music.get_frames(frames))

        output *= self.master_volume

        # Simple limiter: rescale the whole block if its peak would clip.
        max_val = np_max(np_abs(output))
        if max_val > 1.0:
            output = output / max_val

        outdata[:] = output

    def _start_update_thread(self):
        """Start a thread to update music streams"""
        self.update_thread_running = True
        self.update_thread = Thread(target=self._update_music_thread)
        self.update_thread.daemon = True
        self.update_thread.start()

    def _update_music_thread(self):
        """Thread function to update all music streams"""
        while self.update_thread_running:
            # Snapshot so load/unload on another thread cannot break iteration.
            for music in list(self.music_streams.values()):
                if music.is_playing:
                    music.update()

            # Sleep to not consume too much CPU
            time.sleep(0.1)

    def init_audio_device(self):
        """Open and start the output stream and the music update thread.

        Returns:
            True when the device is ready (or already was), False on failure.
        """
        if self.audio_device_ready:
            return True

        try:
            # Try to use ASIO if available
            self._initialize_asio()

            # Set up and start the stream
            extra_settings = None
            buffer_size = self.buffer_size
            self.stream = sd.OutputStream(
                samplerate=self.target_sample_rate,
                channels=self.output_channels,
                callback=self._audio_callback,
                blocksize=buffer_size,
                device=self.device_id,
                latency='low',
                extra_settings=extra_settings
            )
            self.stream.start()
            self.running = True
            self.audio_device_ready = True

            print(self.stream.samplerate, self.stream.blocksize, self.stream.latency*1000)

            # Start update thread for music streams
            self._start_update_thread()

            print(f"Audio device initialized with {self.output_channels} channels at {self.target_sample_rate}Hz")
            return True
        except Exception as e:
            print(f"Error initializing audio device: {e}")
            self.audio_device_ready = False
            return False

    def close_audio_device(self):
        """Stop the update thread and close the output stream."""
        self.update_thread_running = False
        if self.update_thread:
            self.update_thread.join(timeout=1.0)

        if self.stream:
            self.stream.stop()
            self.stream.close()
            self.stream = None

        self.running = False
        self.audio_device_ready = False
        print("Audio device closed")
        return

    def is_audio_device_ready(self) -> bool:
        """Return whether the output stream has been started."""
        return self.audio_device_ready

    def set_master_volume(self, volume: float):
        """Set the master volume, clamped to [0.0, 1.0]."""
        self.master_volume = max(0.0, min(1.0, volume))

    def get_master_volume(self) -> float:
        """Return the master volume."""
        return self.master_volume

    def load_sound(self, fileName: Path) -> str:
        """Load a sound file and return the id it is registered under."""
        sound = Sound(fileName, target_sample_rate=self.target_sample_rate)
        sound_id = f"sound_{self._next_sound_index}"
        self._next_sound_index += 1
        self.sounds[sound_id] = sound
        print(f"Loaded sound from {fileName} as {sound_id}")
        return sound_id

    def play_sound(self, sound):
        """Queue a sound to start playing on the next audio callback."""
        if sound in self.sounds:
            self.sound_queue.put(sound)

    def stop_sound(self, sound):
        """Stop a sound and rewind it."""
        if sound in self.sounds:
            self.sounds[sound].stop()

    def pause_sound(self, sound: str):
        """Pause a sound."""
        if sound in self.sounds:
            self.sounds[sound].pause()

    def resume_sound(self, sound: str):
        """Resume a paused sound."""
        if sound in self.sounds:
            self.sounds[sound].resume()

    def unload_sound(self, sound: str):
        """Remove a sound from the engine."""
        if sound in self.sounds:
            del self.sounds[sound]

    def normalize_sound(self, sound: str, rms: float):
        """Reload a sound normalized to the given RMS level."""
        if sound in self.sounds:
            self.sounds[sound].normalize_vol(rms)

    def is_sound_valid(self, sound: str) -> bool:
        """Return whether ``sound`` is a currently loaded sound id."""
        return sound in self.sounds

    def is_sound_playing(self, sound: str) -> bool:
        """Return whether the sound is currently playing."""
        if sound in self.sounds:
            return self.sounds[sound].is_playing
        return False

    def set_sound_volume(self, sound: str, volume: float):
        """Set a sound's volume, clamped to [0.0, 1.0]."""
        if sound in self.sounds:
            self.sounds[sound].volume = max(0.0, min(1.0, volume))

    def set_sound_pan(self, sound: str, pan: float):
        """Set a sound's pan, clamped to [0.0, 1.0]."""
        if sound in self.sounds:
            self.sounds[sound].pan = max(0.0, min(1.0, pan))

    def load_music_stream(self, fileName: Path) -> str:
        """Open a music file for streaming and return its id."""
        music = Music(file_path=fileName, target_sample_rate=self.target_sample_rate)
        music_id = f"music_{self._next_music_index}"
        self._next_music_index += 1
        self.music_streams[music_id] = music
        print(f"Loaded music stream from {fileName} as {music_id}")
        return music_id

    def is_music_valid(self, music: str) -> bool:
        """Return whether the music stream exists and loaded successfully."""
        if music in self.music_streams:
            return self.music_streams[music].valid
        return False

    def unload_music_stream(self, music: str):
        """Remove a music stream from the engine."""
        if music in self.music_streams:
            del self.music_streams[music]

    def play_music_stream(self, music: str):
        """Queue a music stream to start playing."""
        if music in self.music_streams:
            self.music_queue.put((music, 'play'))

    def is_music_stream_playing(self, music: str) -> bool:
        """Return whether the music stream is currently playing."""
        if music in self.music_streams:
            return self.music_streams[music].is_playing
        return False

    def update_music_stream(self, music: str):
        """Refill the music stream's buffer if it has run out."""
        if music in self.music_streams:
            self.music_streams[music].update()

    def stop_music_stream(self, music: str):
        """Queue a stop request for the music stream."""
        if music in self.music_streams:
            self.music_queue.put((music, 'stop'))

    def pause_music_stream(self, music: str):
        """Queue a pause request for the music stream."""
        if music in self.music_streams:
            self.music_queue.put((music, 'pause'))

    def resume_music_stream(self, music: str):
        """Queue a resume request for the music stream."""
        if music in self.music_streams:
            self.music_queue.put((music, 'resume'))

    def seek_music_stream(self, music: str, position: float):
        """Queue a seek (in seconds) for the music stream."""
        if music in self.music_streams:
            self.music_queue.put((music, 'seek', position))

    def set_music_volume(self, music: str, volume: float):
        """Set a music stream's volume, clamped to [0.0, 1.0]."""
        if music in self.music_streams:
            self.music_streams[music].volume = max(0.0, min(1.0, volume))

    def set_music_pan(self, music: str, pan: float):
        """Set a music stream's pan, clamped to [0.0, 1.0]."""
        if music in self.music_streams:
            self.music_streams[music].pan = max(0.0, min(1.0, pan))

    def normalize_music_stream(self, music: str, rms: float):
        """Set the target RMS applied to buffers read from now on."""
        if music in self.music_streams:
            self.music_streams[music].normalize = rms

    def get_music_time_length(self, music: str) -> float:
        """Return the music length in seconds.

        Raises:
            ValueError: If ``music`` is not a loaded stream id.
        """
        if music in self.music_streams:
            return self.music_streams[music].get_time_length()
        raise ValueError(f"Music stream {music} not initialized")

    def get_music_time_played(self, music: str) -> float:
        """Return the playback position in seconds.

        Raises:
            ValueError: If ``music`` is not a loaded stream id.
        """
        if music in self.music_streams:
            return self.music_streams[music].get_time_played()
        raise ValueError(f"Music stream {music} not initialized")


audio = AudioEngine(get_config()["audio"]["device_type"])