mirror of
https://github.com/Yonokid/PyTaiko.git
synced 2026-02-04 03:30:13 +01:00
the fire alarm went off so I need to commit
This commit is contained in:
196
libs/audio.py
196
libs/audio.py
@@ -1,10 +1,11 @@
|
||||
import io
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
import wave
|
||||
from pathlib import Path
|
||||
from threading import Lock, Thread
|
||||
from typing import Optional
|
||||
|
||||
import soundfile as sf
|
||||
from numpy import abs as np_abs
|
||||
from numpy import (
|
||||
arange,
|
||||
@@ -15,6 +16,8 @@ from numpy import (
|
||||
int32,
|
||||
interp,
|
||||
mean,
|
||||
ndarray,
|
||||
sqrt,
|
||||
uint8,
|
||||
zeros,
|
||||
)
|
||||
@@ -22,7 +25,6 @@ from numpy import max as np_max
|
||||
|
||||
os.environ["SD_ENABLE_ASIO"] = "1"
|
||||
import sounddevice as sd
|
||||
from pydub import AudioSegment
|
||||
|
||||
from libs.utils import get_config, rounded
|
||||
|
||||
@@ -83,8 +85,13 @@ def get_np_array(sample_width, raw_data):
|
||||
else:
|
||||
raise ValueError(f"Unsupported sample width: {sample_width}")
|
||||
|
||||
def get_average_volume_rms(data):
|
||||
"""Calculate average volume using RMS method"""
|
||||
rms = sqrt(mean(data ** 2))
|
||||
return rms
|
||||
|
||||
class Sound:
|
||||
def __init__(self, file_path, data=None, target_sample_rate=44100):
|
||||
def __init__(self, file_path: Path, data=None, target_sample_rate=44100):
|
||||
self.file_path = file_path
|
||||
self.data = data
|
||||
self.channels = 0
|
||||
@@ -94,42 +101,33 @@ class Sound:
|
||||
self.is_paused = False
|
||||
self.volume = 1.0
|
||||
self.pan = 0.5 # 0.0 = left, 0.5 = center, 1.0 = right
|
||||
self.normalize: Optional[float] = None
|
||||
|
||||
if file_path:
|
||||
self.load()
|
||||
|
||||
def load(self):
|
||||
"""Load and prepare the sound file data"""
|
||||
if self.file_path.endswith('.ogg'):
|
||||
audio = AudioSegment.from_ogg(self.file_path)
|
||||
wav_io = io.BytesIO()
|
||||
audio.export(wav_io, format="wav")
|
||||
wav_io.seek(0)
|
||||
file_path = wav_io
|
||||
data, original_sample_rate = sf.read(str(self.file_path))
|
||||
|
||||
if data.ndim == 1:
|
||||
self.channels = 1
|
||||
data = data.reshape(-1, 1)
|
||||
else:
|
||||
file_path = self.file_path
|
||||
with wave.open(file_path, 'rb') as wf:
|
||||
# Get file properties
|
||||
self.channels = wf.getnchannels()
|
||||
sample_width = wf.getsampwidth()
|
||||
original_sample_rate = wf.getframerate()
|
||||
frames = wf.getnframes()
|
||||
self.channels = data.shape[1]
|
||||
|
||||
# Read all frames from the file
|
||||
raw_data = wf.readframes(frames)
|
||||
if original_sample_rate != self.sample_rate:
|
||||
print(f"Resampling {self.file_path} from {original_sample_rate}Hz to {self.sample_rate}Hz")
|
||||
data = resample(data, original_sample_rate, self.sample_rate)
|
||||
|
||||
data = get_np_array(sample_width, raw_data)
|
||||
if self.normalize is not None:
|
||||
current_rms = get_average_volume_rms(data)
|
||||
if current_rms > 0: # Avoid division by zero
|
||||
target_rms = self.normalize
|
||||
rms_scale_factor = target_rms / current_rms
|
||||
data *= rms_scale_factor
|
||||
|
||||
# Reshape for multi-channel audio
|
||||
if self.channels > 1:
|
||||
data = data.reshape(-1, self.channels)
|
||||
|
||||
# Resample if needed
|
||||
if original_sample_rate != self.sample_rate:
|
||||
print(f"Resampling {self.file_path} from {original_sample_rate}Hz to {self.sample_rate}Hz")
|
||||
data = resample(data, original_sample_rate, self.sample_rate)
|
||||
|
||||
self.data = data
|
||||
self.data = data
|
||||
|
||||
def play(self):
|
||||
self.position = 0
|
||||
@@ -151,6 +149,12 @@ class Sound:
|
||||
self.is_playing = True
|
||||
self.is_paused = False
|
||||
|
||||
def normalize_vol(self, rms: float):
|
||||
self.normalize = rms
|
||||
if self.data is not None:
|
||||
self.data = None
|
||||
self.load()
|
||||
|
||||
def get_frames(self, num_frames):
|
||||
"""Get the next num_frames of audio data, applying volume, pitch, and pan"""
|
||||
if self.data is None:
|
||||
@@ -180,7 +184,7 @@ class Sound:
|
||||
|
||||
if self.channels == 1:
|
||||
output = zeros(num_frames, dtype=float32)
|
||||
output[:frames_to_get] = self.data[self.position:self.position+frames_to_get]
|
||||
output[:frames_to_get] = self.data[self.position:self.position+frames_to_get].flatten()
|
||||
else:
|
||||
output = zeros((num_frames, self.channels), dtype=float32)
|
||||
output[:frames_to_get] = self.data[self.position:self.position+frames_to_get]
|
||||
@@ -199,7 +203,7 @@ class Sound:
|
||||
return output
|
||||
|
||||
class Music:
|
||||
def __init__(self, file_path, data=None, file_type=None, target_sample_rate=44100):
|
||||
def __init__(self, file_path: Path, data=None, file_type=None, target_sample_rate=44100):
|
||||
self.file_path = file_path
|
||||
self.file_type = file_type
|
||||
self.data = data
|
||||
@@ -209,12 +213,12 @@ class Music:
|
||||
self.position = 0 # In frames
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.volume = 1.0
|
||||
self.volume = 0.75
|
||||
self.pan = 0.5 # Center
|
||||
self.total_frames = 0
|
||||
self.valid = False
|
||||
self.normalize = None
|
||||
|
||||
self.wave_file = None
|
||||
self.file_buffer_size = int(target_sample_rate * 5) # 5 seconds buffer
|
||||
self.buffer = None
|
||||
self.buffer_position = 0
|
||||
@@ -226,39 +230,31 @@ class Music:
|
||||
|
||||
def load_from_file(self):
|
||||
"""Load music from file"""
|
||||
if self.file_path.endswith('.ogg'):
|
||||
audio = AudioSegment.from_ogg(self.file_path)
|
||||
wav_io = io.BytesIO()
|
||||
audio.export(wav_io, format="wav")
|
||||
wav_io.seek(0)
|
||||
file_path = wav_io
|
||||
else:
|
||||
file_path = self.file_path
|
||||
try:
|
||||
# Keep the file open for streaming
|
||||
self.wave_file = wave.open(file_path, 'rb')
|
||||
# soundfile handles OGG, WAV, FLAC, etc. natively
|
||||
self.sound_file = sf.SoundFile(str(self.file_path))
|
||||
|
||||
# Get file properties
|
||||
self.channels = self.wave_file.getnchannels()
|
||||
self.sample_width = self.wave_file.getsampwidth()
|
||||
self.sample_rate = self.wave_file.getframerate()
|
||||
self.total_frames = self.wave_file.getnframes()
|
||||
self.channels = self.sound_file.channels
|
||||
self.sample_width = 2 if self.sound_file.subtype in ['PCM_16', 'VORBIS'] else 4 # Most common
|
||||
self.sample_rate = self.sound_file.samplerate
|
||||
self.total_frames = len(self.sound_file)
|
||||
|
||||
# Initialize buffer with some initial data
|
||||
self._fill_buffer()
|
||||
|
||||
self.valid = True
|
||||
print(f"Music loaded: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error loading music file: {e}")
|
||||
if self.wave_file:
|
||||
self.wave_file.close()
|
||||
self.wave_file = None
|
||||
if hasattr(self, 'sound_file') and self.sound_file:
|
||||
self.sound_file.close()
|
||||
self.sound_file = None
|
||||
self.valid = False
|
||||
|
||||
def _fill_buffer(self):
|
||||
"""Fill the streaming buffer from file"""
|
||||
if not self.wave_file:
|
||||
if not self.sound_file:
|
||||
return False
|
||||
|
||||
# Read a chunk of frames from file
|
||||
@@ -267,18 +263,31 @@ class Music:
|
||||
if frames_to_read <= 0:
|
||||
return False
|
||||
|
||||
raw_data = self.wave_file.readframes(frames_to_read)
|
||||
# Read data directly as numpy array (float64 by default)
|
||||
data = self.sound_file.read(frames_to_read)
|
||||
|
||||
data = get_np_array(self.sample_width, raw_data)
|
||||
# Convert to float32 if needed (soundfile returns float64 by default)
|
||||
if data.dtype != float32:
|
||||
data = data.astype(float32)
|
||||
|
||||
# Reshape for multi-channel audio
|
||||
if self.channels > 1:
|
||||
data = data.reshape(-1, self.channels)
|
||||
# Ensure proper shape for mono audio
|
||||
if self.channels == 1 and data.ndim == 1:
|
||||
data = data.reshape(-1, 1)
|
||||
elif self.channels == 1 and data.ndim == 2:
|
||||
data = data[:, 0].reshape(-1, 1) # Take first channel if stereo file but expecting mono
|
||||
|
||||
# Resample if needed
|
||||
if self.sample_rate != self.target_sample_rate:
|
||||
print(f"Resampling {self.file_path} from {self.sample_rate}Hz to {self.target_sample_rate}Hz")
|
||||
data = resample(data, self.sample_rate, self.target_sample_rate)
|
||||
|
||||
if self.normalize is not None:
|
||||
current_rms = get_average_volume_rms(data)
|
||||
if current_rms > 0: # Avoid division by zero
|
||||
target_rms = self.normalize
|
||||
rms_scale_factor = target_rms / current_rms
|
||||
data *= rms_scale_factor
|
||||
|
||||
self.buffer = data
|
||||
self.buffer_position = 0
|
||||
return True
|
||||
@@ -295,7 +304,7 @@ class Music:
|
||||
# Check if we need to refill the buffer
|
||||
if self.buffer is None:
|
||||
raise Exception("buffer is None")
|
||||
if self.wave_file and self.buffer_position >= len(self.buffer):
|
||||
if self.sound_file and self.buffer_position >= len(self.buffer):
|
||||
if not self._fill_buffer():
|
||||
self.is_playing = False
|
||||
|
||||
@@ -303,8 +312,8 @@ class Music:
|
||||
"""Start playing the music stream"""
|
||||
with self.lock:
|
||||
# Reset position if at the end
|
||||
if self.wave_file and self.position >= self.total_frames:
|
||||
self.wave_file.rewind()
|
||||
if self.sound_file and self.position >= self.total_frames:
|
||||
self.sound_file.seek(0) # Reset to beginning
|
||||
self.position = 0
|
||||
self.buffer_position = 0
|
||||
self._fill_buffer()
|
||||
@@ -319,8 +328,8 @@ class Music:
|
||||
self.is_paused = False
|
||||
self.position = 0
|
||||
self.buffer_position = 0
|
||||
if self.wave_file:
|
||||
self.wave_file.rewind()
|
||||
if self.sound_file:
|
||||
self.sound_file.seek(0) # Reset to beginning
|
||||
self._fill_buffer()
|
||||
|
||||
def pause(self):
|
||||
@@ -347,8 +356,8 @@ class Music:
|
||||
frame_position = max(0, min(frame_position, self.total_frames - 1))
|
||||
|
||||
# Update file position if streaming from file
|
||||
if self.wave_file:
|
||||
self.wave_file.setpos(frame_position)
|
||||
if self.sound_file:
|
||||
self.sound_file.seek(frame_position)
|
||||
self._fill_buffer()
|
||||
|
||||
self.position = frame_position
|
||||
@@ -377,7 +386,7 @@ class Music:
|
||||
# Check if we need more data
|
||||
if self.buffer_position >= len(self.buffer):
|
||||
# If no more data available and streaming from file
|
||||
if self.wave_file and not self._fill_buffer():
|
||||
if self.sound_file and not self._fill_buffer():
|
||||
self.is_playing = False
|
||||
if self.channels == 1:
|
||||
return zeros(num_frames, dtype=float32)
|
||||
@@ -393,7 +402,7 @@ class Music:
|
||||
|
||||
if self.channels == 1:
|
||||
output = zeros(num_frames, dtype=float32)
|
||||
output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get]
|
||||
output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get].flatten()
|
||||
else:
|
||||
output = zeros((num_frames, self.channels), dtype=float32)
|
||||
output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get]
|
||||
@@ -418,9 +427,9 @@ class Music:
|
||||
|
||||
def __del__(self):
|
||||
"""Cleanup when the music object is deleted"""
|
||||
if self.wave_file:
|
||||
if hasattr(self, 'sound_file') and self.sound_file:
|
||||
try:
|
||||
self.wave_file.close()
|
||||
self.sound_file.close()
|
||||
except Exception:
|
||||
raise Exception("unable to close music stream")
|
||||
|
||||
@@ -428,12 +437,12 @@ class AudioEngine:
|
||||
def __init__(self, type: str):
|
||||
self.target_sample_rate = 44100
|
||||
self.buffer_size = 10
|
||||
self.sounds = {}
|
||||
self.sounds: dict[str, Sound] = {}
|
||||
self.music_streams = {}
|
||||
self.stream = None
|
||||
self.device_id = None
|
||||
self.running = False
|
||||
self.sound_queue = queue.Queue()
|
||||
self.sound_queue: queue.Queue[str] = queue.Queue()
|
||||
self.music_queue = queue.Queue()
|
||||
self.master_volume = 1.0
|
||||
self.output_channels = 2 # Default to stereo
|
||||
@@ -532,20 +541,21 @@ class AudioEngine:
|
||||
sound_data = sound.get_frames(frames)
|
||||
|
||||
# If mono sound but stereo output, duplicate to both channels
|
||||
if sound.channels == 1 and self.output_channels > 1:
|
||||
sound_data = column_stack([sound_data] * self.output_channels)
|
||||
if isinstance(sound_data, ndarray):
|
||||
if sound.channels == 1 and self.output_channels > 1:
|
||||
sound_data = column_stack([sound_data] * self.output_channels)
|
||||
|
||||
# Ensure sound_data matches the output format
|
||||
if sound.channels > self.output_channels:
|
||||
# Down-mix if needed
|
||||
if self.output_channels == 1:
|
||||
sound_data = mean(sound_data, axis=1)
|
||||
else:
|
||||
# Keep only the first output_channels
|
||||
sound_data = sound_data[:, :self.output_channels]
|
||||
# Ensure sound_data matches the output format
|
||||
if sound.channels > self.output_channels:
|
||||
# Down-mix if needed
|
||||
if self.output_channels == 1:
|
||||
sound_data = mean(sound_data, axis=1)
|
||||
else:
|
||||
# Keep only the first output_channels
|
||||
sound_data = sound_data[:, :self.output_channels]
|
||||
|
||||
# Add to the mix (simple additive mixing)
|
||||
output += sound_data
|
||||
# Add to the mix (simple additive mixing)
|
||||
output += sound_data
|
||||
|
||||
# Mix music streams
|
||||
for music_name, music in self.music_streams.items():
|
||||
@@ -655,7 +665,7 @@ class AudioEngine:
|
||||
def get_master_volume(self) -> float:
|
||||
return self.master_volume
|
||||
|
||||
def load_sound(self, fileName: str) -> str:
|
||||
def load_sound(self, fileName: Path) -> str:
|
||||
sound = Sound(fileName, target_sample_rate=self.target_sample_rate)
|
||||
sound_id = f"sound_{len(self.sounds)}"
|
||||
self.sounds[sound_id] = sound
|
||||
@@ -678,6 +688,17 @@ class AudioEngine:
|
||||
if sound in self.sounds:
|
||||
self.sounds[sound].resume()
|
||||
|
||||
def unload_sound(self, sound: str):
|
||||
if sound in self.sounds:
|
||||
del self.sounds[sound]
|
||||
|
||||
def normalize_sound(self, sound: str, rms: float):
|
||||
if sound in self.sounds:
|
||||
self.sounds[sound].normalize_vol(rms)
|
||||
|
||||
def is_sound_valid(self, sound: str) -> bool:
|
||||
return sound in self.music_streams
|
||||
|
||||
def is_sound_playing(self, sound: str) -> bool:
|
||||
if sound in self.sounds:
|
||||
return self.sounds[sound].is_playing
|
||||
@@ -691,7 +712,7 @@ class AudioEngine:
|
||||
if sound in self.sounds:
|
||||
self.sounds[sound].pan = max(0.0, min(1.0, pan))
|
||||
|
||||
def load_music_stream(self, fileName: str) -> str:
|
||||
def load_music_stream(self, fileName: Path) -> str:
|
||||
music = Music(file_path=fileName, target_sample_rate=self.target_sample_rate)
|
||||
music_id = f"music_{len(self.music_streams)}"
|
||||
self.music_streams[music_id] = music
|
||||
@@ -744,6 +765,10 @@ class AudioEngine:
|
||||
if music in self.music_streams:
|
||||
self.music_streams[music].pan = max(0.0, min(1.0, pan))
|
||||
|
||||
def normalize_music_stream(self, music: str, rms: float):
|
||||
if music in self.music_streams:
|
||||
self.music_streams[music].normalize = rms
|
||||
|
||||
def get_music_time_length(self, music: str) -> float:
|
||||
if music in self.music_streams:
|
||||
return self.music_streams[music].get_time_length()
|
||||
@@ -755,4 +780,3 @@ class AudioEngine:
|
||||
raise ValueError(f"Music stream {music} not initialized")
|
||||
|
||||
audio = AudioEngine(get_config()["audio"]["device_type"])
|
||||
audio.set_master_volume(0.75)
|
||||
|
||||
Reference in New Issue
Block a user