mirror of
https://github.com/Yonokid/PyTaiko.git
synced 2026-02-04 19:50:12 +01:00
thank you claude for the music class refactor
This commit is contained in:
391
libs/audio.py
391
libs/audio.py
@@ -202,199 +202,200 @@ class Sound:
|
|||||||
output[:, 1] *= right_vol
|
output[:, 1] *= right_vol
|
||||||
return output
|
return output
|
||||||
|
|
||||||
|
|
||||||
class Music:
|
class Music:
|
||||||
def __init__(self, file_path: Path, data: Optional[ndarray]=None, target_sample_rate: int=44100, sample_rate: int =44100, preview: Optional[float]=None, normalize: Optional[float]=None):
|
BUFFER_DURATION = 5.0 # seconds
|
||||||
|
def __init__(self, file_path: Path, data: Optional[ndarray] = None,
|
||||||
|
target_sample_rate: int = 44100, sample_rate: int = 44100,
|
||||||
|
preview: Optional[float] = None, normalize: Optional[float] = None):
|
||||||
|
# Core properties
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
self.data = data
|
self.data = data
|
||||||
self.target_sample_rate = target_sample_rate
|
self.target_sample_rate = target_sample_rate
|
||||||
self.sample_rate = sample_rate
|
self.sample_rate = sample_rate
|
||||||
self.channels = 0
|
self.normalize = normalize
|
||||||
self.position = 0 # In frames (original sample rate)
|
self.preview = preview
|
||||||
|
|
||||||
|
# Playback state
|
||||||
|
self.position = 0
|
||||||
self.is_playing = False
|
self.is_playing = False
|
||||||
self.is_paused = False
|
self.is_paused = False
|
||||||
self.volume = 0.75
|
self.volume = 0.75
|
||||||
self.pan = 0.5 # Center
|
self.pan = 0.5
|
||||||
|
|
||||||
|
# Audio properties (set during loading)
|
||||||
|
self.channels = 0
|
||||||
self.total_frames = 0
|
self.total_frames = 0
|
||||||
self.valid = False
|
self.valid = False
|
||||||
self.normalize = normalize
|
|
||||||
self.preview = preview # Preview start time in seconds
|
|
||||||
self.is_preview_mode = preview is not None
|
|
||||||
|
|
||||||
self.preview_start_frame = 0
|
# Buffering
|
||||||
self.preview_end_frame = 0
|
self.buffer_size = int(target_sample_rate * self.BUFFER_DURATION)
|
||||||
self.uses_file_streaming = False
|
|
||||||
|
|
||||||
self.file_buffer_size = int(target_sample_rate * 5) # 5 seconds buffer
|
|
||||||
self.buffer = None
|
self.buffer = None
|
||||||
self.buffer_position = 0
|
self.buffer_position = 0
|
||||||
|
|
||||||
# Thread-safe updates
|
# File streaming
|
||||||
self.lock = Lock()
|
|
||||||
self.sound_file = None
|
self.sound_file = None
|
||||||
|
|
||||||
if self.file_path.exists():
|
|
||||||
self.load_from_file()
|
|
||||||
else:
|
|
||||||
self.load_from_memory()
|
|
||||||
|
|
||||||
def load_from_memory(self) -> None:
|
|
||||||
"""Load music from in-memory numpy array"""
|
|
||||||
try:
|
|
||||||
if self.data is None:
|
|
||||||
raise Exception("No data provided for memory loading")
|
|
||||||
|
|
||||||
if self.data.dtype != float32:
|
|
||||||
self.data = self.data.astype(float32)
|
|
||||||
|
|
||||||
if self.sample_rate != self.target_sample_rate:
|
|
||||||
print(f"Resampling {self.file_path} from {self.sample_rate}Hz to {self.target_sample_rate}Hz")
|
|
||||||
self.data = resample(self.data, self.sample_rate, self.target_sample_rate)
|
|
||||||
|
|
||||||
if self.normalize is not None:
|
|
||||||
current_rms = get_average_volume_rms(self.data)
|
|
||||||
if current_rms > 0: # Avoid division by zero
|
|
||||||
target_rms = self.normalize
|
|
||||||
rms_scale_factor = target_rms / current_rms
|
|
||||||
self.data *= rms_scale_factor
|
|
||||||
|
|
||||||
if self.data.ndim == 1:
|
|
||||||
self.channels = 1
|
|
||||||
self.total_frames = len(self.data)
|
|
||||||
self.data = self.data.reshape(-1, 1)
|
|
||||||
else:
|
|
||||||
self.channels = self.data.shape[1]
|
|
||||||
self.total_frames = self.data.shape[0]
|
|
||||||
|
|
||||||
self.sample_width = 4 # float32
|
|
||||||
self.uses_file_streaming = False
|
self.uses_file_streaming = False
|
||||||
self._fill_buffer()
|
self.preview_start_frame = 0
|
||||||
self.valid = True
|
self.preview_end_frame = 0
|
||||||
print(f"Music loaded from memory: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames")
|
|
||||||
|
|
||||||
|
# Thread safety
|
||||||
|
self.lock = Lock()
|
||||||
|
|
||||||
|
# Load audio
|
||||||
|
self._load_audio()
|
||||||
|
|
||||||
|
def _load_audio(self):
|
||||||
|
"""Load audio from file or memory"""
|
||||||
|
try:
|
||||||
|
if self.file_path.exists():
|
||||||
|
self._load_from_file()
|
||||||
|
else:
|
||||||
|
self._load_from_memory()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading music from memory: {e}")
|
print(f"Error loading audio: {e}")
|
||||||
self.valid = False
|
self.valid = False
|
||||||
|
|
||||||
def load_from_file(self) -> None:
|
def _load_from_file(self):
|
||||||
"""Load music from file"""
|
"""Load and setup file streaming"""
|
||||||
try:
|
|
||||||
self.sound_file = sf.SoundFile(str(self.file_path))
|
self.sound_file = sf.SoundFile(str(self.file_path))
|
||||||
|
|
||||||
# Get file properties
|
|
||||||
self.channels = self.sound_file.channels
|
self.channels = self.sound_file.channels
|
||||||
self.sample_width = 2 if self.sound_file.subtype in ['PCM_16', 'VORBIS'] else 4
|
|
||||||
self.sample_rate = self.sound_file.samplerate
|
self.sample_rate = self.sound_file.samplerate
|
||||||
original_total_frames = self.sound_file.frames
|
original_frames = self.sound_file.frames
|
||||||
|
|
||||||
if self.is_preview_mode:
|
self.preview_start_frame, self.preview_end_frame = self._calculate_preview_bounds(original_frames)
|
||||||
self.preview_start_frame = int(self.preview * self.sample_rate)
|
|
||||||
available_frames = original_total_frames - self.preview_start_frame
|
|
||||||
self.preview_end_frame = min(self.preview_start_frame + available_frames, original_total_frames)
|
|
||||||
if self.preview_start_frame >= original_total_frames:
|
|
||||||
self.preview_start_frame = max(0, original_total_frames - available_frames)
|
|
||||||
self.preview_end_frame = original_total_frames
|
|
||||||
self.total_frames = self.preview_end_frame - self.preview_start_frame
|
self.total_frames = self.preview_end_frame - self.preview_start_frame
|
||||||
self.sound_file.seek(self.preview_start_frame)
|
|
||||||
|
if self.preview is not None:
|
||||||
self.uses_file_streaming = True
|
self.uses_file_streaming = True
|
||||||
self.data = None # Don't store full data in memory
|
self.sound_file.seek(self.preview_start_frame)
|
||||||
|
self.data = None
|
||||||
print(f"Preview mode: Streaming {self.total_frames} frames ({self.total_frames/self.sample_rate:.2f}s) starting at {self.preview:.2f}s")
|
print(f"Preview mode: Streaming {self.total_frames} frames starting at {self.preview:.2f}s")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self.data = self.sound_file.read()
|
self.data = self.sound_file.read()
|
||||||
self.total_frames = original_total_frames
|
self.data = self._process_audio_data(self.data)
|
||||||
self.uses_file_streaming = False
|
self.uses_file_streaming = False
|
||||||
|
|
||||||
self.load_from_memory()
|
|
||||||
return # Early return to avoid duplicate processing
|
|
||||||
|
|
||||||
self._fill_buffer()
|
self._fill_buffer()
|
||||||
self.valid = True
|
self.valid = True
|
||||||
|
print(f"Music loaded: {self.channels} channels, {self.sample_rate}Hz, {self.time_length:.2f}s")
|
||||||
|
|
||||||
if self.is_preview_mode:
|
def _load_from_memory(self):
|
||||||
print(f"Music preview streaming: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames ({self.get_time_length():.2f}s)")
|
"""Load from in-memory numpy array"""
|
||||||
else:
|
if self.data is None:
|
||||||
print(f"Music loaded: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames")
|
raise ValueError("No data provided for memory loading")
|
||||||
|
|
||||||
except Exception as e:
|
self.data = self._process_audio_data(self.data)
|
||||||
print(f"Error loading music file: {e}")
|
self.channels = 1 if self.data.ndim == 1 else self.data.shape[1]
|
||||||
if hasattr(self, 'sound_file') and self.sound_file:
|
self.total_frames = len(self.data) if self.data.ndim == 1 else self.data.shape[0]
|
||||||
self.sound_file.close()
|
self.uses_file_streaming = False
|
||||||
self.sound_file = None
|
|
||||||
self.valid = False
|
|
||||||
|
|
||||||
def _fill_buffer(self) -> bool:
|
self._fill_buffer()
|
||||||
"""Fill buffer from either memory or file stream"""
|
self.valid = True
|
||||||
try:
|
print(f"Music loaded from memory: {self.channels} channels, {self.total_frames} frames")
|
||||||
|
|
||||||
|
def _process_audio_data(self, data):
|
||||||
|
"""Apply resampling, normalization, and ensure correct format"""
|
||||||
|
if data.dtype != float32:
|
||||||
|
data = data.astype(float32)
|
||||||
|
|
||||||
|
if self.sample_rate != self.target_sample_rate:
|
||||||
|
print(f"Resampling from {self.sample_rate}Hz to {self.target_sample_rate}Hz")
|
||||||
|
data = resample(data, self.sample_rate, self.target_sample_rate)
|
||||||
|
|
||||||
|
if self.normalize is not None:
|
||||||
|
current_rms = get_average_volume_rms(data)
|
||||||
|
if current_rms > 0:
|
||||||
|
data *= self.normalize / current_rms
|
||||||
|
|
||||||
|
if data.ndim == 1:
|
||||||
|
data = data.reshape(-1, 1)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def _calculate_preview_bounds(self, total_frames):
|
||||||
|
"""Calculate start and end frames for preview mode"""
|
||||||
|
if self.preview is None:
|
||||||
|
return 0, total_frames
|
||||||
|
|
||||||
|
start_frame = int(self.preview * self.sample_rate)
|
||||||
|
start_frame = max(0, min(start_frame, total_frames - 1))
|
||||||
|
end_frame = total_frames
|
||||||
|
|
||||||
|
return start_frame, end_frame
|
||||||
|
|
||||||
|
def _fill_buffer(self):
|
||||||
|
"""Fill the audio buffer from appropriate source"""
|
||||||
if self.uses_file_streaming:
|
if self.uses_file_streaming:
|
||||||
return self._fill_buffer_from_file()
|
return self._fill_buffer_from_file()
|
||||||
else:
|
else:
|
||||||
return self._fill_buffer_from_memory()
|
return self._fill_buffer_from_memory()
|
||||||
except Exception as e:
|
|
||||||
print(f"Error filling buffer: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _fill_buffer_from_memory(self) -> bool:
|
def _fill_buffer_from_file(self):
|
||||||
"""Fill buffer from in-memory data"""
|
"""Stream buffer from file"""
|
||||||
if self.data is None:
|
|
||||||
return False
|
|
||||||
|
|
||||||
start_frame = self.position + self.buffer_position
|
|
||||||
end_frame = min(start_frame + self.file_buffer_size, self.total_frames)
|
|
||||||
|
|
||||||
if start_frame >= self.total_frames:
|
|
||||||
return False
|
|
||||||
|
|
||||||
data_chunk = self.data[start_frame:end_frame]
|
|
||||||
|
|
||||||
self.buffer = data_chunk
|
|
||||||
self.position += self.buffer_position
|
|
||||||
self.buffer_position = 0
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _fill_buffer_from_file(self) -> bool:
|
|
||||||
"""Fill buffer by streaming from file"""
|
|
||||||
if not self.sound_file:
|
if not self.sound_file:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
current_absolute_position = self.preview_start_frame + self.position
|
current_pos = self.preview_start_frame + self.position
|
||||||
if current_absolute_position >= self.preview_end_frame:
|
if current_pos >= self.preview_end_frame:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
frames_to_read = min(self.file_buffer_size, self.preview_end_frame - current_absolute_position)
|
frames_to_read = min(self.buffer_size, self.preview_end_frame - current_pos)
|
||||||
|
|
||||||
if frames_to_read <= 0:
|
if frames_to_read <= 0:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
self.sound_file.seek(current_absolute_position)
|
self.sound_file.seek(current_pos)
|
||||||
|
|
||||||
data_chunk = self.sound_file.read(frames_to_read)
|
data_chunk = self.sound_file.read(frames_to_read)
|
||||||
|
|
||||||
if len(data_chunk) == 0:
|
if len(data_chunk) == 0:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
original_frames = len(data_chunk) if data_chunk.ndim == 1 else data_chunk.shape[0]
|
original_frames = len(data_chunk) if data_chunk.ndim == 1 else data_chunk.shape[0]
|
||||||
if self.sample_rate != self.target_sample_rate:
|
|
||||||
data_chunk = resample(data_chunk, self.sample_rate, self.target_sample_rate)
|
|
||||||
|
|
||||||
if self.normalize is not None:
|
data_chunk = self._process_audio_data(data_chunk)
|
||||||
current_rms = get_average_volume_rms(data_chunk)
|
|
||||||
if current_rms > 0:
|
|
||||||
target_rms = self.normalize
|
|
||||||
rms_scale_factor = target_rms / current_rms
|
|
||||||
data_chunk *= rms_scale_factor
|
|
||||||
|
|
||||||
if data_chunk.ndim == 1 and self.channels > 1:
|
|
||||||
data_chunk = data_chunk.reshape(-1, self.channels)
|
|
||||||
elif data_chunk.ndim == 2 and self.channels == 1:
|
|
||||||
data_chunk = data_chunk.flatten().reshape(-1, 1)
|
|
||||||
|
|
||||||
self.buffer = data_chunk
|
self.buffer = data_chunk
|
||||||
self.position += original_frames
|
|
||||||
self.buffer_position = 0
|
self.buffer_position = 0
|
||||||
|
self.position += original_frames
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def update(self) -> None:
|
def _fill_buffer_from_memory(self):
|
||||||
|
"""Fill buffer from in-memory data"""
|
||||||
|
if self.data is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
start_frame = self.position + self.buffer_position
|
||||||
|
end_frame = min(start_frame + self.buffer_size, self.total_frames)
|
||||||
|
|
||||||
|
if start_frame >= self.total_frames:
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.buffer = self.data[start_frame:end_frame]
|
||||||
|
self.position += self.buffer_position
|
||||||
|
self.buffer_position = 0
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
@property
|
||||||
|
def time_length(self):
|
||||||
|
"""Total length in seconds"""
|
||||||
|
return self.total_frames / self.target_sample_rate
|
||||||
|
|
||||||
|
@property
|
||||||
|
def time_played(self):
|
||||||
|
"""Current position in seconds (relative to preview start)"""
|
||||||
|
return (self.position + self.buffer_position) / self.target_sample_rate
|
||||||
|
|
||||||
|
@property
|
||||||
|
def actual_time_played(self):
|
||||||
|
"""Actual position in original file (including preview offset)"""
|
||||||
|
base_time = self.time_played
|
||||||
|
if self.preview is not None:
|
||||||
|
return base_time + self.preview
|
||||||
|
return base_time
|
||||||
|
|
||||||
|
def update(self):
|
||||||
"""Update music stream buffers"""
|
"""Update music stream buffers"""
|
||||||
if not self.is_playing or self.is_paused:
|
if not self.is_playing or self.is_paused:
|
||||||
return
|
return
|
||||||
@@ -405,125 +406,121 @@ class Music:
|
|||||||
if self.buffer_position >= len(self.buffer):
|
if self.buffer_position >= len(self.buffer):
|
||||||
self.is_playing = self._fill_buffer()
|
self.is_playing = self._fill_buffer()
|
||||||
|
|
||||||
def play(self) -> None:
|
def play(self):
|
||||||
"""Start playing the music stream"""
|
"""Start playing"""
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if self.position >= self.total_frames:
|
if self.position >= self.total_frames:
|
||||||
self.position = 0
|
self._reset_position()
|
||||||
self.buffer_position = 0
|
|
||||||
if self.sound_file and self.uses_file_streaming:
|
|
||||||
self.sound_file.seek(self.preview_start_frame)
|
|
||||||
self._fill_buffer()
|
|
||||||
|
|
||||||
self.is_playing = True
|
self.is_playing = True
|
||||||
self.is_paused = False
|
self.is_paused = False
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self):
|
||||||
"""Stop playing the music stream"""
|
"""Stop playing"""
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.is_playing = False
|
self.is_playing = False
|
||||||
self.is_paused = False
|
self.is_paused = False
|
||||||
self.position = 0
|
self._reset_position()
|
||||||
self.buffer_position = 0
|
|
||||||
if self.sound_file and self.uses_file_streaming:
|
|
||||||
self.sound_file.seek(self.preview_start_frame)
|
|
||||||
self._fill_buffer()
|
|
||||||
|
|
||||||
def pause(self) -> None:
|
def pause(self):
|
||||||
"""Pause the music playback"""
|
"""Pause playback"""
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if self.is_playing:
|
if self.is_playing:
|
||||||
self.is_paused = True
|
self.is_paused = True
|
||||||
self.is_playing = False
|
self.is_playing = False
|
||||||
|
|
||||||
def resume(self) -> None:
|
def resume(self):
|
||||||
"""Resume the music playback"""
|
"""Resume playback"""
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if self.is_paused:
|
if self.is_paused:
|
||||||
self.is_playing = True
|
self.is_playing = True
|
||||||
self.is_paused = False
|
self.is_paused = False
|
||||||
|
|
||||||
def seek(self, position_seconds) -> None:
|
def seek(self, position_seconds):
|
||||||
"""Seek to a specific position in seconds (relative to preview start if in preview mode)"""
|
"""Seek to position in seconds"""
|
||||||
with self.lock:
|
with self.lock:
|
||||||
frame_position = int(position_seconds * self.target_sample_rate)
|
frame_position = int(position_seconds * self.target_sample_rate)
|
||||||
frame_position = max(0, min(frame_position, self.total_frames - 1))
|
frame_position = max(0, min(frame_position, self.total_frames - 1))
|
||||||
|
|
||||||
|
if self.sound_file:
|
||||||
|
actual_pos = frame_position
|
||||||
|
if self.preview is not None:
|
||||||
|
actual_pos += int(self.preview * self.sample_rate)
|
||||||
|
self.sound_file.seek(actual_pos)
|
||||||
|
|
||||||
self.position = frame_position
|
self.position = frame_position
|
||||||
self.buffer_position = 0
|
self.buffer_position = 0
|
||||||
|
|
||||||
self._fill_buffer()
|
self._fill_buffer()
|
||||||
|
|
||||||
def get_time_length(self) -> float:
|
def _reset_position(self):
|
||||||
"""Get the total length of the music in seconds (preview length if in preview mode)"""
|
"""Reset to beginning"""
|
||||||
return self.total_frames / self.target_sample_rate
|
self.position = 0
|
||||||
|
self.buffer_position = 0
|
||||||
|
if self.sound_file:
|
||||||
|
seek_pos = self.preview_start_frame if self.preview is not None else 0
|
||||||
|
self.sound_file.seek(seek_pos)
|
||||||
|
self._fill_buffer()
|
||||||
|
|
||||||
def get_time_played(self) -> float:
|
def get_frames(self, num_frames):
|
||||||
"""Get the current playback position in seconds (relative to preview start if in preview mode)"""
|
"""Get next audio frames with volume and panning applied"""
|
||||||
return (self.position + self.buffer_position) / self.target_sample_rate
|
|
||||||
|
|
||||||
def get_actual_time_played(self) -> float:
|
|
||||||
"""Get the actual playback position in the original file (including preview offset)"""
|
|
||||||
base_time = (self.position + self.buffer_position) / self.target_sample_rate
|
|
||||||
if self.is_preview_mode and self.preview is not None:
|
|
||||||
return base_time + self.preview
|
|
||||||
return base_time
|
|
||||||
|
|
||||||
def get_frames(self, num_frames) -> ndarray:
|
|
||||||
"""Get the next num_frames of music data, applying volume, pitch, and pan"""
|
|
||||||
if not self.is_playing:
|
if not self.is_playing:
|
||||||
# Return silence if not playing
|
return self._get_silence(num_frames)
|
||||||
if self.channels == 1:
|
|
||||||
return zeros(num_frames, dtype=float32)
|
|
||||||
else:
|
|
||||||
return zeros((num_frames, self.channels), dtype=float32)
|
|
||||||
|
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if self.buffer is None:
|
if self.buffer is None:
|
||||||
return zeros(num_frames, dtype=float32)
|
return self._get_silence(num_frames)
|
||||||
|
|
||||||
if self.buffer_position >= len(self.buffer):
|
if self.buffer_position >= len(self.buffer):
|
||||||
if not self._fill_buffer():
|
if not self._fill_buffer():
|
||||||
self.is_playing = False
|
self.is_playing = False
|
||||||
|
return self._get_silence(num_frames)
|
||||||
|
|
||||||
|
frames_available = len(self.buffer) - self.buffer_position
|
||||||
|
if self.channels > 1:
|
||||||
|
frames_available = self.buffer.shape[0] - self.buffer_position
|
||||||
|
|
||||||
|
frames_to_get = min(num_frames, frames_available)
|
||||||
|
|
||||||
|
output = self._get_silence(num_frames)
|
||||||
|
|
||||||
|
if self.channels == 1:
|
||||||
|
output[:frames_to_get] = self.buffer[
|
||||||
|
self.buffer_position:self.buffer_position + frames_to_get
|
||||||
|
].flatten()
|
||||||
|
else:
|
||||||
|
output[:frames_to_get] = self.buffer[
|
||||||
|
self.buffer_position:self.buffer_position + frames_to_get
|
||||||
|
]
|
||||||
|
|
||||||
|
self.buffer_position += frames_to_get
|
||||||
|
return self._apply_effects(output)
|
||||||
|
|
||||||
|
def _get_silence(self, num_frames):
|
||||||
|
"""Return silence buffer"""
|
||||||
if self.channels == 1:
|
if self.channels == 1:
|
||||||
return zeros(num_frames, dtype=float32)
|
return zeros(num_frames, dtype=float32)
|
||||||
else:
|
else:
|
||||||
return zeros((num_frames, self.channels), dtype=float32)
|
return zeros((num_frames, self.channels), dtype=float32)
|
||||||
|
|
||||||
frames_left_in_buffer = len(self.buffer) - self.buffer_position
|
def _apply_effects(self, output):
|
||||||
if self.channels > 1:
|
"""Apply volume and panning effects"""
|
||||||
frames_left_in_buffer = self.buffer.shape[0] - self.buffer_position
|
|
||||||
|
|
||||||
frames_to_get = min(num_frames, frames_left_in_buffer)
|
|
||||||
|
|
||||||
if self.channels == 1:
|
|
||||||
output = zeros(num_frames, dtype=float32)
|
|
||||||
output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get].flatten()
|
|
||||||
else:
|
|
||||||
output = zeros((num_frames, self.channels), dtype=float32)
|
|
||||||
output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get]
|
|
||||||
|
|
||||||
self.buffer_position += frames_to_get
|
|
||||||
|
|
||||||
output *= self.volume
|
output *= self.volume
|
||||||
|
|
||||||
if self.channels == 2 and self.pan != 0.5:
|
if self.channels == 2 and self.pan != 0.5:
|
||||||
# pan=0: full left, pan=0.5: center, pan=1: full right
|
|
||||||
left_vol = min(1.0, 2.0 * (1.0 - self.pan))
|
left_vol = min(1.0, 2.0 * (1.0 - self.pan))
|
||||||
right_vol = min(1.0, 2.0 * self.pan)
|
right_vol = min(1.0, 2.0 * self.pan)
|
||||||
|
|
||||||
output[:, 0] *= left_vol
|
output[:, 0] *= left_vol
|
||||||
output[:, 1] *= right_vol
|
output[:, 1] *= right_vol
|
||||||
|
|
||||||
return output
|
return output
|
||||||
|
|
||||||
def __del__(self) -> None:
|
def __del__(self):
|
||||||
"""Cleanup when the music object is deleted"""
|
"""Cleanup"""
|
||||||
if hasattr(self, 'sound_file') and self.sound_file:
|
if hasattr(self, 'sound_file') and self.sound_file:
|
||||||
try:
|
try:
|
||||||
self.sound_file.close()
|
self.sound_file.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
raise Exception("unable to close music stream")
|
pass
|
||||||
|
|
||||||
class AudioEngine:
|
class AudioEngine:
|
||||||
def __init__(self, type: str) -> None:
|
def __init__(self, type: str) -> None:
|
||||||
@@ -570,7 +567,7 @@ class AudioEngine:
|
|||||||
device_info = sd.query_devices(self.device_id)
|
device_info = sd.query_devices(self.device_id)
|
||||||
if isinstance(device_info, sd.DeviceList):
|
if isinstance(device_info, sd.DeviceList):
|
||||||
raise Exception("Invalid ASIO Device")
|
raise Exception("Invalid ASIO Device")
|
||||||
print(f"Using default ASIO device: {device_info['name']}")
|
print(f"Using default audio device: {device_info['name']}")
|
||||||
self.buffer_size = rounded(device_info['default_low_output_latency']*1000)
|
self.buffer_size = rounded(device_info['default_low_output_latency']*1000)
|
||||||
if 'buffer_size' in get_config()['audio']:
|
if 'buffer_size' in get_config()['audio']:
|
||||||
self.buffer_size = get_config()['audio']['buffer_size']
|
self.buffer_size = get_config()['audio']['buffer_size']
|
||||||
|
|||||||
@@ -40,8 +40,6 @@ class GameScreen:
|
|||||||
self.height = height
|
self.height = height
|
||||||
self.current_ms = 0
|
self.current_ms = 0
|
||||||
self.screen_init = False
|
self.screen_init = False
|
||||||
self.movie = None
|
|
||||||
self.song_music = None
|
|
||||||
self.end_ms = 0
|
self.end_ms = 0
|
||||||
self.start_delay = 1000
|
self.start_delay = 1000
|
||||||
self.song_started = False
|
self.song_started = False
|
||||||
@@ -75,8 +73,8 @@ class GameScreen:
|
|||||||
self.movie = None
|
self.movie = None
|
||||||
session_data.song_title = self.tja.metadata.title.get(global_data.config['general']['language'].lower(), self.tja.metadata.title['en'])
|
session_data.song_title = self.tja.metadata.title.get(global_data.config['general']['language'].lower(), self.tja.metadata.title['en'])
|
||||||
if self.tja.metadata.wave.exists() and self.tja.metadata.wave.is_file():
|
if self.tja.metadata.wave.exists() and self.tja.metadata.wave.is_file():
|
||||||
self.song_music = audio.load_music_stream(self.tja.metadata.wave)
|
self.song_music = audio.load_sound(self.tja.metadata.wave)
|
||||||
audio.normalize_music_stream(self.song_music, 0.1935)
|
audio.normalize_sound(self.song_music, 0.1935)
|
||||||
|
|
||||||
self.player_1 = Player(self, global_data.player_num, difficulty)
|
self.player_1 = Player(self, global_data.player_num, difficulty)
|
||||||
if self.tja is not None:
|
if self.tja is not None:
|
||||||
@@ -85,6 +83,8 @@ class GameScreen:
|
|||||||
def on_screen_start(self):
|
def on_screen_start(self):
|
||||||
if not self.screen_init:
|
if not self.screen_init:
|
||||||
self.screen_init = True
|
self.screen_init = True
|
||||||
|
self.movie = None
|
||||||
|
self.song_music = None
|
||||||
tex.load_screen_textures('game')
|
tex.load_screen_textures('game')
|
||||||
self.background = Background(self.width, self.height, global_data.player_num)
|
self.background = Background(self.width, self.height, global_data.player_num)
|
||||||
self.load_sounds()
|
self.load_sounds()
|
||||||
@@ -102,8 +102,7 @@ class GameScreen:
|
|||||||
self.screen_init = False
|
self.screen_init = False
|
||||||
tex.unload_textures()
|
tex.unload_textures()
|
||||||
if self.song_music is not None:
|
if self.song_music is not None:
|
||||||
audio.unload_music_stream(self.song_music)
|
audio.unload_sound(self.song_music)
|
||||||
del self.song_music
|
|
||||||
self.song_started = False
|
self.song_started = False
|
||||||
self.end_ms = 0
|
self.end_ms = 0
|
||||||
self.movie = None
|
self.movie = None
|
||||||
@@ -146,8 +145,8 @@ class GameScreen:
|
|||||||
if self.tja is not None:
|
if self.tja is not None:
|
||||||
if (self.current_ms >= self.tja.metadata.offset*1000 + self.start_delay - global_data.config["general"]["judge_offset"]) and not self.song_started:
|
if (self.current_ms >= self.tja.metadata.offset*1000 + self.start_delay - global_data.config["general"]["judge_offset"]) and not self.song_started:
|
||||||
if self.song_music is not None:
|
if self.song_music is not None:
|
||||||
if not audio.is_music_stream_playing(self.song_music):
|
if not audio.is_sound_playing(self.song_music):
|
||||||
audio.play_music_stream(self.song_music)
|
audio.play_sound(self.song_music)
|
||||||
print(f"Song started at {self.current_ms}")
|
print(f"Song started at {self.current_ms}")
|
||||||
if self.movie is not None:
|
if self.movie is not None:
|
||||||
self.movie.start(get_current_ms())
|
self.movie.start(get_current_ms())
|
||||||
@@ -159,10 +158,6 @@ class GameScreen:
|
|||||||
|
|
||||||
self.player_1.update(self)
|
self.player_1.update(self)
|
||||||
self.song_info.update(get_current_ms())
|
self.song_info.update(get_current_ms())
|
||||||
|
|
||||||
if self.song_music is not None:
|
|
||||||
audio.update_music_stream(self.song_music)
|
|
||||||
|
|
||||||
self.result_transition.update(get_current_ms())
|
self.result_transition.update(get_current_ms())
|
||||||
if self.result_transition.is_finished:
|
if self.result_transition.is_finished:
|
||||||
return self.on_screen_end('RESULT')
|
return self.on_screen_end('RESULT')
|
||||||
@@ -180,14 +175,14 @@ class GameScreen:
|
|||||||
|
|
||||||
if ray.is_key_pressed(ray.KeyboardKey.KEY_F1):
|
if ray.is_key_pressed(ray.KeyboardKey.KEY_F1):
|
||||||
if self.song_music is not None:
|
if self.song_music is not None:
|
||||||
audio.stop_music_stream(self.song_music)
|
audio.stop_sound(self.song_music)
|
||||||
self.init_tja(global_data.selected_song, session_data.selected_difficulty)
|
self.init_tja(global_data.selected_song, session_data.selected_difficulty)
|
||||||
audio.play_sound(self.sound_restart)
|
audio.play_sound(self.sound_restart)
|
||||||
self.song_started = False
|
self.song_started = False
|
||||||
|
|
||||||
if ray.is_key_pressed(ray.KeyboardKey.KEY_ESCAPE):
|
if ray.is_key_pressed(ray.KeyboardKey.KEY_ESCAPE):
|
||||||
if self.song_music is not None:
|
if self.song_music is not None:
|
||||||
audio.stop_music_stream(self.song_music)
|
audio.stop_sound(self.song_music)
|
||||||
return self.on_screen_end('SONG_SELECT')
|
return self.on_screen_end('SONG_SELECT')
|
||||||
|
|
||||||
def draw(self):
|
def draw(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user