add song select, results, config, and asio support

commit 0012868d4e (parent 8985be1145)
Author: Yonokid
Date: 2025-04-21 02:14:21 -04:00
13 changed files with 1612 additions and 676 deletions

libs/animation.py (new file, 133 lines)

@@ -0,0 +1,133 @@
class Animation:
def __init__(self, current_ms, duration, type):
self.type = type
self.start_ms = current_ms
self.attribute = 0
self.duration = duration
self.params = dict()
self.is_finished = False
def update(self, current_ms):
if self.type == 'fade':
self.fade(current_ms,
self.duration,
initial_opacity=self.params.get('initial_opacity', 1.0),
final_opacity=self.params.get('final_opacity', 0.0),
delay=self.params.get('delay', 0.0),
ease_in=self.params.get('ease_in', None),
ease_out=self.params.get('ease_out', None))
if self.params.get('reverse', None) is not None and current_ms - self.start_ms >= self.duration + self.params.get('delay', 0.0):
self.fade(current_ms,
self.duration,
final_opacity=self.params.get('initial_opacity', 1.0),
initial_opacity=self.params.get('final_opacity', 0.0),
delay=self.params.get('delay', 0.0) + self.duration + self.params.get('reverse'),
ease_in=self.params.get('ease_in', None),
ease_out=self.params.get('ease_out', None))
elif self.type == 'move':
self.move(current_ms,
self.duration,
self.params['total_distance'],
self.params['start_position'],
delay=self.params.get('delay', 0.0))
elif self.type == 'texture_change':
self.texture_change(current_ms,
self.duration,
self.params['textures'])
elif self.type == 'text_stretch':
self.text_stretch(current_ms,
self.duration)
elif self.type == 'texture_resize':
self.texture_resize(current_ms,
self.duration,
initial_size=self.params.get('initial_size', 1.0),
final_size=self.params.get('final_size', 1.0),
delay=self.params.get('delay', 0.0))
if self.params.get('reverse', None) is not None and current_ms - self.start_ms >= self.duration + self.params.get('delay', 0.0):
self.texture_resize(current_ms,
self.duration,
final_size=self.params.get('initial_size', 1.0),
initial_size=self.params.get('final_size', 1.0),
delay=self.params.get('delay', 0.0) + self.duration)
def fade(self, current_ms, duration, initial_opacity, final_opacity, delay, ease_in, ease_out):
def ease_out_progress(progress, ease):
if ease == 'quadratic':
return progress * (2 - progress)
elif ease == 'cubic':
return 1 - pow(1 - progress, 3)
elif ease == 'exponential':
return 1 - pow(2, -10 * progress)
else:
return progress
def ease_in_progress(progress, ease):
if ease == 'quadratic':
return progress * progress
elif ease == 'cubic':
return progress * progress * progress
elif ease == 'exponential':
return pow(2, 10 * (progress - 1))
else:
return progress
elapsed_time = current_ms - self.start_ms
if elapsed_time < delay:
    # Hold the initial opacity until the delay has passed
    self.attribute = initial_opacity
    return
elapsed_time -= delay
if elapsed_time >= duration:
    # Clamp at the target; without these early returns the interpolation
    # below would run with out-of-range progress and overshoot
    self.attribute = final_opacity
    self.is_finished = True
    return
if ease_in is not None:
    progress = ease_in_progress(elapsed_time / duration, ease_in)
elif ease_out is not None:
    progress = ease_out_progress(elapsed_time / duration, ease_out)
else:
    progress = elapsed_time / duration
self.attribute = initial_opacity + (final_opacity - initial_opacity) * progress
def move(self, current_ms, duration, total_distance, start_position, delay):
elapsed_time = current_ms - self.start_ms
if elapsed_time < delay:
    # Hold the start position until the delay has passed; without this
    # return, negative progress would move the object backwards
    self.attribute = start_position
    return
elapsed_time -= delay
if elapsed_time <= duration:
    progress = elapsed_time / duration
    self.attribute = start_position + (total_distance * progress)
else:
    self.attribute = start_position + total_distance
    self.is_finished = True
def texture_change(self, current_ms, duration, textures):
elapsed_time = current_ms - self.start_ms
if elapsed_time <= duration:
for start, end, index in textures:
if start < elapsed_time <= end:
self.attribute = index
else:
self.is_finished = True
def text_stretch(self, current_ms, duration):
elapsed_time = current_ms - self.start_ms
if elapsed_time <= duration:
self.attribute = 2 + 5 * (elapsed_time // 25)
elif elapsed_time <= duration + 116:
frame_time = (elapsed_time - duration) // 16.57
self.attribute = 2 + 10 - (2 * (frame_time + 1))
else:
self.attribute = 0
self.is_finished = True
def texture_resize(self, current_ms, duration, initial_size, final_size, delay):
elapsed_time = current_ms - self.start_ms
if elapsed_time < delay:
    self.attribute = initial_size
    return
elapsed_time -= delay
if elapsed_time >= duration:
    # The original if/elif/else chain had an unreachable final branch;
    # two cases cover everything
    self.attribute = final_size
    self.is_finished = True
else:
    progress = elapsed_time / duration
    self.attribute = initial_size + ((final_size - initial_size) * progress)
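
Usage sketch (not part of the diff): how Animation appears intended to be driven: build it with the current clock in milliseconds, fill params, then call update() once per frame and read attribute. The clock helper and the loop are illustrative assumptions.

    from libs.animation import Animation
    from libs.utils import get_current_ms

    fade_in = Animation(get_current_ms(), 500, 'fade')
    fade_in.params = {'initial_opacity': 0.0, 'final_opacity': 1.0,
                      'delay': 100.0, 'ease_out': 'quadratic'}
    while not fade_in.is_finished:            # stand-in for the frame loop
        fade_in.update(get_current_ms())
        opacity = fade_in.attribute           # feed into the draw call's alpha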

libs/audio.py (new file, 743 lines)

@@ -0,0 +1,743 @@
import io
import os
import queue
import time
import wave
from threading import Lock, Thread
import numpy as np
import pyray as ray
os.environ["SD_ENABLE_ASIO"] = "1"  # must be set before sounddevice is imported
import sounddevice as sd
from pydub import AudioSegment
from scipy import signal
from libs.utils import get_config
def resample(data, orig_sr, target_sr):
ratio = target_sr / orig_sr
if ratio == 1.0:
return data
if len(data.shape) == 1:
resampled_data = signal.resample_poly(data, target_sr, orig_sr)
else:
num_channels = data.shape[1]
resampled_channels = []
for ch in range(num_channels):
channel_data = data[:, ch]
resampled_channel = signal.resample_poly(channel_data, target_sr, orig_sr)
resampled_channels.append(resampled_channel)
resampled_data = np.column_stack(resampled_channels)
return resampled_data
def get_np_array(sample_width, raw_data):
if sample_width == 1:
# 8-bit samples are unsigned
data = np.frombuffer(raw_data, dtype=np.uint8)
return (data.astype(np.float32) - 128) / 128.0
elif sample_width == 2:
# 16-bit samples are signed
data = np.frombuffer(raw_data, dtype=np.int16)
return data.astype(np.float32) / 32768.0
elif sample_width == 3:
# 24-bit samples handling
data = np.zeros(len(raw_data) // 3, dtype=np.int32)
for i in range(len(data)):
data[i] = int.from_bytes(raw_data[i*3:i*3+3], byteorder='little', signed=True)
return data.astype(np.float32) / (2**23)
elif sample_width == 4:
# 32-bit samples are signed
data = np.frombuffer(raw_data, dtype=np.int32)
return data.astype(np.float32) / (2**31)
else:
raise ValueError(f"Unsupported sample width: {sample_width}")
class Sound:
def __init__(self, file_path, data=None, target_sample_rate=48000):
self.file_path = file_path
self.data = data
self.channels = 0
self.sample_rate = target_sample_rate
self.position = 0
self.is_playing = False
self.is_paused = False
self.volume = 1.0
self.pan = 0.5 # 0.0 = left, 0.5 = center, 1.0 = right
if file_path:
self.load()
def load(self):
"""Load and prepare the sound file data"""
if self.file_path.endswith('.ogg'):
audio = AudioSegment.from_ogg(self.file_path)
wav_io = io.BytesIO()
audio.export(wav_io, format="wav")
wav_io.seek(0)
file_path = wav_io
else:
file_path = self.file_path
with wave.open(file_path, 'rb') as wf:
# Get file properties
self.channels = wf.getnchannels()
sample_width = wf.getsampwidth()
original_sample_rate = wf.getframerate()
frames = wf.getnframes()
# Read all frames from the file
raw_data = wf.readframes(frames)
data = get_np_array(sample_width, raw_data)
# Reshape for multi-channel audio
if self.channels > 1:
data = data.reshape(-1, self.channels)
# Resample if needed
if original_sample_rate != self.sample_rate:
print(f"Resampling {self.file_path} from {original_sample_rate}Hz to {self.sample_rate}Hz")
data = resample(data, original_sample_rate, self.sample_rate)
self.data = data
def play(self):
self.position = 0
self.is_playing = True
self.is_paused = False
def stop(self):
self.is_playing = False
self.is_paused = False
self.position = 0
def pause(self):
if self.is_playing:
self.is_paused = True
self.is_playing = False
def resume(self):
if self.is_paused:
self.is_playing = True
self.is_paused = False
def get_frames(self, num_frames):
"""Get the next num_frames of audio data, applying volume, pitch, and pan"""
if self.data is None:
return
if not self.is_playing:
# Return silence if not playing
if self.channels == 1:
return np.zeros(num_frames, dtype=np.float32)
else:
return np.zeros((num_frames, self.channels), dtype=np.float32)
# Calculate how many frames we have left
frames_left = len(self.data) - self.position
if self.channels > 1:
frames_left = self.data.shape[0] - self.position
if frames_left <= 0:
# We've reached the end of the sound
self.is_playing = False
if self.channels == 1:
return np.zeros(num_frames, dtype=np.float32)
else:
return np.zeros((num_frames, self.channels), dtype=np.float32)
# Get the actual frames to return
frames_to_get = min(num_frames, frames_left)
if self.channels == 1:
output = np.zeros(num_frames, dtype=np.float32)
output[:frames_to_get] = self.data[self.position:self.position+frames_to_get]
else:
output = np.zeros((num_frames, self.channels), dtype=np.float32)
output[:frames_to_get] = self.data[self.position:self.position+frames_to_get]
self.position += frames_to_get
output *= self.volume
# Apply pan for stereo output
if self.channels == 2 and self.pan != 0.5:
# pan=0: full left, pan=0.5: center, pan=1: full right
left_vol = min(1.0, 2.0 * (1.0 - self.pan))
right_vol = min(1.0, 2.0 * self.pan)
output[:, 0] *= left_vol
output[:, 1] *= right_vol
return output
class Music:
def __init__(self, file_path, data=None, file_type=None, target_sample_rate=48000):
self.file_path = file_path
self.file_type = file_type
self.data = data
self.target_sample_rate = target_sample_rate
self.sample_rate = target_sample_rate
self.channels = 0
self.position = 0 # In frames
self.is_playing = False
self.is_paused = False
self.volume = 1.0
self.pan = 0.5 # Center
self.total_frames = 0
self.valid = False
self.wave_file = None
self.file_buffer_size = int(target_sample_rate * 5) # 5 seconds buffer
self.buffer = None
self.buffer_position = 0
# Thread-safe updates
self.lock = Lock()
self.load_from_file()
def load_from_file(self):
"""Load music from file"""
if self.file_path.endswith('.ogg'):
audio = AudioSegment.from_ogg(self.file_path)
wav_io = io.BytesIO()
audio.export(wav_io, format="wav")
wav_io.seek(0)
file_path = wav_io
else:
file_path = self.file_path
try:
# Keep the file open for streaming
self.wave_file = wave.open(file_path, 'rb')
# Get file properties
self.channels = self.wave_file.getnchannels()
self.sample_width = self.wave_file.getsampwidth()
self.sample_rate = self.wave_file.getframerate()
self.total_frames = self.wave_file.getnframes()
# Initialize buffer with some initial data
self._fill_buffer()
self.valid = True
print(f"Music loaded: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames")
except Exception as e:
print(f"Error loading music file: {e}")
if self.wave_file:
self.wave_file.close()
self.wave_file = None
self.valid = False
def _fill_buffer(self):
"""Fill the streaming buffer from file"""
if not self.wave_file:
return False
# Read a chunk of frames from file
try:
frames_to_read = min(self.file_buffer_size, self.total_frames - self.position)
if frames_to_read <= 0:
return False
raw_data = self.wave_file.readframes(frames_to_read)
data = get_np_array(self.sample_width, raw_data)
# Reshape for multi-channel audio
if self.channels > 1:
data = data.reshape(-1, self.channels)
if self.sample_rate != self.target_sample_rate:
print(f"Resampling {self.file_path} from {self.sample_rate}Hz to {self.target_sample_rate}Hz")
data = resample(data, self.sample_rate, self.target_sample_rate)
self.buffer = data
self.buffer_position = 0
return True
except Exception as e:
print(f"Error filling buffer: {e}")
return False
def update(self):
"""Update music stream buffers"""
if not self.is_playing or self.is_paused:
return
with self.lock:
# Check if we need to refill the buffer
if self.buffer is None:
raise Exception("buffer is None")
if self.wave_file and self.buffer_position >= len(self.buffer):
if not self._fill_buffer():
self.is_playing = False
def play(self):
"""Start playing the music stream"""
with self.lock:
# Reset position if at the end
if self.wave_file and self.position >= self.total_frames:
self.wave_file.rewind()
self.position = 0
self.buffer_position = 0
self._fill_buffer()
self.is_playing = True
self.is_paused = False
def stop(self):
"""Stop playing the music stream"""
with self.lock:
self.is_playing = False
self.is_paused = False
self.position = 0
self.buffer_position = 0
if self.wave_file:
self.wave_file.rewind()
self._fill_buffer()
def pause(self):
"""Pause the music playback"""
with self.lock:
if self.is_playing:
self.is_paused = True
self.is_playing = False
def resume(self):
"""Resume the music playback"""
with self.lock:
if self.is_paused:
self.is_playing = True
self.is_paused = False
def seek(self, position_seconds):
"""Seek to a specific position in seconds"""
with self.lock:
# Convert seconds to frames
frame_position = int(position_seconds * self.sample_rate)
# Clamp position to valid range
frame_position = max(0, min(frame_position, self.total_frames - 1))
# Update file position if streaming from file
if self.wave_file:
self.wave_file.setpos(frame_position)
self._fill_buffer()
self.position = frame_position
self.buffer_position = 0
def get_time_length(self):
"""Get the total length of the music in seconds"""
return self.total_frames / self.sample_rate
def get_time_played(self):
    """Get the current playback position in seconds"""
    # position is already advanced alongside buffer_position in get_frames(),
    # so adding buffer_position again would double-count the current buffer
    return self.position / self.sample_rate
def get_frames(self, num_frames):
"""Get the next num_frames of music data, applying volume, pitch, and pan"""
if not self.is_playing:
# Return silence if not playing
if self.channels == 1:
return np.zeros(num_frames, dtype=np.float32)
else:
return np.zeros((num_frames, self.channels), dtype=np.float32)
with self.lock:
if self.buffer is None:
raise Exception("buffer is None")
# Check if we need more data
if self.buffer_position >= len(self.buffer):
# If no more data available and streaming from file
if self.wave_file and not self._fill_buffer():
self.is_playing = False
if self.channels == 1:
return np.zeros(num_frames, dtype=np.float32)
else:
return np.zeros((num_frames, self.channels), dtype=np.float32)
# Calculate how many frames we have left in buffer
frames_left_in_buffer = len(self.buffer) - self.buffer_position
if self.channels > 1:
frames_left_in_buffer = self.buffer.shape[0] - self.buffer_position
frames_to_get = min(num_frames, frames_left_in_buffer)
if self.channels == 1:
output = np.zeros(num_frames, dtype=np.float32)
output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get]
else:
output = np.zeros((num_frames, self.channels), dtype=np.float32)
output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get]
# Update buffer position
self.buffer_position += frames_to_get
self.position += frames_to_get
# Apply volume
output *= self.volume
# Apply pan for stereo output
if self.channels == 2 and self.pan != 0.5:
# pan=0: full left, pan=0.5: center, pan=1: full right
left_vol = min(1.0, 2.0 * (1.0 - self.pan))
right_vol = min(1.0, 2.0 * self.pan)
output[:, 0] *= left_vol
output[:, 1] *= right_vol
return output
def __del__(self):
    """Cleanup when the music object is deleted"""
    if self.wave_file:
        try:
            self.wave_file.close()
        except Exception:
            # Never raise from __del__; the interpreter swallows it anyway
            # and it can mask errors during shutdown
            pass
class ASIOEngine:
def __init__(self):
self.target_sample_rate = 48000
self.buffer_size = get_config()["audio"]["asio_buffer"]
self.sounds = {}
self.music_streams = {}
self.stream = None
self.device_id = None
self.running = False
self.sound_queue = queue.Queue()
self.music_queue = queue.Queue()
self.master_volume = 1.0
self.output_channels = 2 # Default to stereo
self.audio_device_ready = False
# Threading for music stream updates
self.update_thread = None
self.update_thread_running = False
def _initialize_asio(self):
"""Set up ASIO device"""
# Find ASIO API and use its default device
hostapis = sd.query_hostapis()
# Use None as the "not found" sentinel; the original -1 meant the
# `is not None` check below always passed and the fallback never ran
asio_api_index = None
for i, api in enumerate(hostapis):
    if isinstance(api, dict) and 'name' in api and api['name'] == 'ASIO':
        asio_api_index = i
        break
if asio_api_index is not None:
asio_api = hostapis[asio_api_index]
if isinstance(asio_api, dict) and 'default_output_device' in asio_api:
default_asio_device = asio_api['default_output_device']
else:
raise Exception("Warning: 'default_output_device' key not found in ASIO API info.")
if default_asio_device >= 0:
self.device_id = default_asio_device
device_info = sd.query_devices(self.device_id)
if isinstance(device_info, sd.DeviceList):
raise Exception("Invalid ASIO Device")
print(f"Using default ASIO device: {device_info['name']}")
# Set output channels based on device capabilities
self.output_channels = device_info['max_output_channels']
if self.output_channels > 2:
# Limit to stereo for simplicity
self.output_channels = 2
return True
else:
print("No default ASIO device found, using system default.")
else:
print("ASIO API not found, using system default device.")
# If we get here, use default system device
self.device_id = None
device_info = sd.query_devices(sd.default.device[1])
if isinstance(device_info, sd.DeviceList):
raise Exception("Invalid ASIO Device")
self.output_channels = min(2, device_info['max_output_channels'])
return True
def _audio_callback(self, outdata, frames, time, status):
"""Callback function for the sounddevice stream"""
if status:
print(f"Status: {status}")
# Process any new sound play requests
while not self.sound_queue.empty():
try:
sound_name = self.sound_queue.get_nowait()
if sound_name in self.sounds:
self.sounds[sound_name].play()
except queue.Empty:
break
# Process any new music play requests
while not self.music_queue.empty():
try:
music_name, action, *args = self.music_queue.get_nowait()
if music_name in self.music_streams:
music = self.music_streams[music_name]
if action == 'play':
music.play()
elif action == 'stop':
music.stop()
elif action == 'pause':
music.pause()
elif action == 'resume':
music.resume()
elif action == 'seek' and args:
music.seek(args[0])
except queue.Empty:
break
# Mix all playing sounds and music
output = np.zeros((frames, self.output_channels), dtype=np.float32)
# Mix sounds
for sound_name, sound in self.sounds.items():
if sound.is_playing:
sound_data = sound.get_frames(frames)
# If mono sound but stereo output, duplicate to both channels
if sound.channels == 1 and self.output_channels > 1:
sound_data = np.column_stack([sound_data] * self.output_channels)
# Ensure sound_data matches the output format
if sound.channels > self.output_channels:
# Down-mix if needed
if self.output_channels == 1:
sound_data = np.mean(sound_data, axis=1)
else:
# Keep only the first output_channels
sound_data = sound_data[:, :self.output_channels]
# Add to the mix (simple additive mixing)
output += sound_data
# Mix music streams
for music_name, music in self.music_streams.items():
if music.is_playing:
music_data = music.get_frames(frames)
# If mono music but stereo output, duplicate to both channels
if music.channels == 1 and self.output_channels > 1:
music_data = np.column_stack([music_data] * self.output_channels)
# Ensure music_data matches the output format
if music.channels > self.output_channels:
# Down-mix if needed
if self.output_channels == 1:
music_data = np.mean(music_data, axis=1)
else:
# Keep only the first output_channels
music_data = music_data[:, :self.output_channels]
# Add to the mix
output += music_data
# Apply master volume
output *= self.master_volume
# Apply simple limiter to prevent clipping
max_val = np.max(np.abs(output))
if max_val > 1.0:
output = output / max_val
outdata[:] = output
def _start_update_thread(self):
"""Start a thread to update music streams"""
self.update_thread_running = True
self.update_thread = Thread(target=self._update_music_thread)
self.update_thread.daemon = True
self.update_thread.start()
def _update_music_thread(self):
"""Thread function to update all music streams"""
while self.update_thread_running:
# Update all active music streams
for music_name, music in self.music_streams.items():
if music.is_playing:
music.update()
# Sleep to not consume too much CPU
time.sleep(0.1)
def init_audio_device(self):
if self.audio_device_ready:
return True
try:
# Try to use ASIO if available
self._initialize_asio()
# Set up and start the stream
self.stream = sd.OutputStream(
samplerate=self.target_sample_rate,
channels=self.output_channels,
callback=self._audio_callback,
blocksize=self.buffer_size,
device=self.device_id
)
self.stream.start()
self.running = True
self.audio_device_ready = True
# Start update thread for music streams
self._start_update_thread()
print(f"Audio device initialized with {self.output_channels} channels at {self.target_sample_rate}Hz")
return True
except Exception as e:
print(f"Error initializing audio device: {e}")
self.audio_device_ready = False
return False
def close_audio_device(self):
self.update_thread_running = False
if self.update_thread:
self.update_thread.join(timeout=1.0)
if self.stream:
self.stream.stop()
self.stream.close()
self.stream = None
self.running = False
self.audio_device_ready = False
print("Audio device closed")
return
def is_audio_device_ready(self) -> bool:
return self.audio_device_ready
def set_master_volume(self, volume: float):
self.master_volume = max(0.0, min(1.0, volume))
def get_master_volume(self) -> float:
return self.master_volume
def load_sound(self, fileName: str) -> str | None:
try:
# target_sample_rate must be passed by keyword: the second positional
# parameter of Sound is `data`, not the sample rate
sound = Sound(fileName, target_sample_rate=self.target_sample_rate)
sound_id = f"sound_{len(self.sounds)}"
self.sounds[sound_id] = sound
print(f"Loaded sound from {fileName} as {sound_id}")
return sound_id
except Exception as e:
print(f"Error loading sound: {e}")
return None
def play_sound(self, sound):
if sound in self.sounds:
self.sound_queue.put(sound)
def stop_sound(self, sound):
if sound in self.sounds:
self.sounds[sound].stop()
def pause_sound(self, sound: str):
if sound in self.sounds:
self.sounds[sound].pause()
def resume_sound(self, sound: str):
if sound in self.sounds:
self.sounds[sound].resume()
def is_sound_playing(self, sound: str) -> bool:
if sound in self.sounds:
return self.sounds[sound].is_playing
return False
def set_sound_volume(self, sound: str, volume: float):
if sound in self.sounds:
self.sounds[sound].volume = max(0.0, min(1.0, volume))
def set_sound_pan(self, sound: str, pan: float):
if sound in self.sounds:
self.sounds[sound].pan = max(0.0, min(1.0, pan))
def load_music_stream(self, fileName: str) -> str | None:
try:
music = Music(file_path=fileName, target_sample_rate=self.target_sample_rate)
music_id = f"music_{len(self.music_streams)}"
self.music_streams[music_id] = music
print(f"Loaded music stream from {fileName} as {music_id}")
return music_id
except Exception as e:
print(f"Error loading music stream: {e}")
return None
def is_music_valid(self, music: str) -> bool:
if music in self.music_streams:
return self.music_streams[music].valid
return False
def unload_music_stream(self, music: str):
if music in self.music_streams:
del self.music_streams[music]
def play_music_stream(self, music: str):
if music in self.music_streams:
self.music_queue.put((music, 'play'))
def is_music_stream_playing(self, music: str) -> bool:
if music in self.music_streams:
return self.music_streams[music].is_playing
return False
def update_music_stream(self, music: str):
if music in self.music_streams:
self.music_streams[music].update()
def stop_music_stream(self, music: str):
if music in self.music_streams:
self.music_queue.put((music, 'stop'))
def pause_music_stream(self, music: str):
if music in self.music_streams:
self.music_queue.put((music, 'pause'))
def resume_music_stream(self, music: str):
if music in self.music_streams:
self.music_queue.put((music, 'resume'))
def seek_music_stream(self, music: str, position: float):
if music in self.music_streams:
self.music_queue.put((music, 'seek', position))
def set_music_volume(self, music: str, volume: float):
if music in self.music_streams:
self.music_streams[music].volume = max(0.0, min(1.0, volume))
def set_music_pan(self, music: str, pan: float):
if music in self.music_streams:
self.music_streams[music].pan = max(0.0, min(1.0, pan))
def get_music_time_length(self, music: str) -> float:
if music in self.music_streams:
return self.music_streams[music].get_time_length()
raise ValueError(f"Music stream {music} not initialized")
def get_music_time_played(self, music: str) -> float:
if music in self.music_streams:
return self.music_streams[music].get_time_played()
raise ValueError(f"Music stream {music} not initialized")
class AudioEngineWrapper:
def __init__(self, host_api):
self.host_api = host_api
if host_api == 'WASAPI':
self._module = ray
elif host_api == 'ASIO':
self._module = ASIOEngine()
else:
raise Exception("Invalid host API passed to wrapper")
def __getattr__(self, name):
try:
return getattr(self._module, name)
except AttributeError:
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}' and '{type(self._module).__name__}' has no attribute '{name}'")
audio = AudioEngineWrapper(get_config()["audio"]["device_type"])
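
Usage sketch (not part of the diff): the module-level audio singleton proxies either pyray or ASIOEngine, and both expose the same snake_case call names used below. File paths are placeholders.

    from libs.audio import audio  # built from [audio].device_type in config.toml

    audio.init_audio_device()
    audio.set_master_volume(0.8)
    don = audio.load_sound('sounds/don.wav')            # placeholder path
    audio.play_sound(don)
    song = audio.load_music_stream('songs/demo.ogg')    # placeholder path
    audio.play_music_stream(song)
    while audio.is_music_stream_playing(song):
        audio.update_music_stream(song)  # pyray needs this per frame; ASIOEngine also refills in its own thread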

libs/tja.py (new file, 277 lines)

@@ -0,0 +1,277 @@
import math
from collections import deque
from libs.utils import get_pixels_per_frame, strip_comments
def calculate_base_score(play_note_list: list[dict]) -> int:
total_notes = 0
balloon_num = 0
balloon_count = 0
drumroll_sec = 0
for i in range(len(play_note_list)):
note = play_note_list[i]
if i < len(play_note_list)-1:
next_note = play_note_list[i+1]
else:
next_note = play_note_list[len(play_note_list)-1]
if note.get('note') in {'1','2','3','4'}:
total_notes += 1
elif note.get('note') in {'5', '6'}:
drumroll_sec += (next_note['ms'] - note['ms']) / 1000
elif note.get('note') in {'7', '9'}:
balloon_num += 1
balloon_count += next_note['balloon']
total_score = (1000000 - (balloon_count * 100) - (drumroll_sec * 1692.0079999994086)) / total_notes
return math.ceil(total_score / 10) * 10
class TJAParser:
def __init__(self, path: str):
#Defined on startup
self.folder_path = path
self.folder_name = self.folder_path.split('\\')[-1]
self.file_path = f'{self.folder_path}\\{self.folder_name}.tja'
#Defined on file_to_data()
self.data = []
#Defined on get_metadata()
self.title = ''
self.title_ja = ''
self.subtitle = ''
self.subtitle_ja = ''
self.wave = f'{self.folder_path}\\'
self.offset = 0
self.demo_start = 0
self.course_data = dict()
#Defined in metadata but can change throughout the chart
self.bpm = 120
self.time_signature = 4/4
self.distance = 0
self.scroll_modifier = 1
self.current_ms = 0
self.barline_display = True
self.gogo_time = False
def file_to_data(self):
with open(self.file_path, 'rt', encoding='utf-8-sig') as tja_file:
for line in tja_file:
line = strip_comments(line).strip()
if line != '':
self.data.append(str(line))
return self.data
def get_metadata(self):
self.file_to_data()
diff_index = 1
highest_diff = -1
for item in self.data:
if item[0] == '#':
continue
elif 'SUBTITLEJA' in item:
self.subtitle_ja = str(item.split('SUBTITLEJA:')[1])
elif 'TITLEJA' in item:
self.title_ja = str(item.split('TITLEJA:')[1])
elif 'SUBTITLE' in item:
self.subtitle = str(item.split('SUBTITLE:')[1][2:])
elif 'TITLE' in item:
self.title = str(item.split('TITLE:')[1])
elif 'BPM' in item:
self.bpm = float(item.split(':')[1])
elif 'WAVE' in item:
self.wave += str(item.split(':')[1])
elif 'OFFSET' in item:
self.offset = float(item.split(':')[1])
elif 'DEMOSTART' in item:
self.demo_start = float(item.split(':')[1])
elif 'COURSE' in item:
course = str(item.split(':')[1]).lower()
if course == 'dan' or course == '6':
self.course_data[6] = []
if course == 'tower' or course == '5':
self.course_data[5] = []
elif course == 'edit' or course == '4':
self.course_data[4] = []
elif course == 'oni' or course == '3':
self.course_data[3] = []
elif course == 'hard' or course == '2':
self.course_data[2] = []
elif course == 'normal' or course == '1':
self.course_data[1] = []
elif course == 'easy' or course == '0':
self.course_data[0] = []
highest_diff = max(self.course_data)
diff_index -= 1
elif 'LEVEL' in item:
item = int(item.split(':')[1])
self.course_data[diff_index+highest_diff].append(item)
elif 'BALLOON' in item:
item = item.split(':')[1]
if item == '':
continue
self.course_data[diff_index+highest_diff].append([int(x) for x in item.split(',')])
elif 'SCOREINIT' in item:
if item.split(':')[1] == '':
continue
item = item.split(':')[1]
self.course_data[diff_index+highest_diff].append([int(x) for x in item.split(',')])
elif 'SCOREDIFF' in item:
if item.split(':')[1] == '':
continue
item = int(item.split(':')[1])
self.course_data[diff_index+highest_diff].append(item)
return [self.title, self.title_ja, self.subtitle, self.subtitle_ja,
self.bpm, self.wave, self.offset, self.demo_start, self.course_data]
def data_to_notes(self, diff):
self.file_to_data()
#Get notes start and end
note_start = -1
note_end = -1
diff_count = 0
for i in range(len(self.data)):
if self.data[i] == '#START':
note_start = i+1
elif self.data[i] == '#END':
note_end = i
diff_count += 1
if diff_count == len(self.course_data) - diff:
break
notes = []
bar = []
#Check for measures and separate when comma exists
for i in range(note_start, note_end):
item = self.data[i].strip(',')
bar.append(item)
if item != self.data[i]:
notes.append(bar)
bar = []
return notes, self.course_data[diff][1]
def get_se_note(self, play_note_list, ms_per_measure, note, note_ms):
#Someone please refactor this
se_notes = {'1': [0, 1, 2],
'2': [3, 4],
'3': 5,
'4': 6,
'5': 7,
'6': 14,
'7': 9,
'8': 10,
'9': 11}
if len(play_note_list) > 1:
prev_note = play_note_list[-2]
if prev_note['note'] in {'1', '2'}:
if note_ms - prev_note['ms'] <= (ms_per_measure/8) - 1:
prev_note['se_note'] = se_notes[prev_note['note']][1]
else:
prev_note['se_note'] = se_notes[prev_note['note']][0]
else:
prev_note['se_note'] = se_notes[prev_note['note']]
if len(play_note_list) > 3:
if play_note_list[-4]['note'] == play_note_list[-3]['note'] == play_note_list[-2]['note'] == '1':
if (play_note_list[-3]['ms'] - play_note_list[-4]['ms'] < (ms_per_measure/8)) and (play_note_list[-2]['ms'] - play_note_list[-3]['ms'] < (ms_per_measure/8)):
if len(play_note_list) > 5:
if (play_note_list[-4]['ms'] - play_note_list[-5]['ms'] >= (ms_per_measure/8)) and (play_note_list[-1]['ms'] - play_note_list[-2]['ms'] >= (ms_per_measure/8)):
play_note_list[-3]['se_note'] = se_notes[play_note_list[-3]['note']][2]
else:
play_note_list[-3]['se_note'] = se_notes[play_note_list[-3]['note']][2]
else:
play_note_list[-1]['se_note'] = se_notes[note]
if play_note_list[-1]['note'] in {'1', '2'}:
play_note_list[-1]['se_note'] = se_notes[note][0]
else:
play_note_list[-1]['se_note'] = se_notes[note]
def notes_to_position(self, diff):
play_note_list = deque()
bar_list = deque()
draw_note_list = deque()
notes, balloon = self.data_to_notes(diff)
index = 0
balloon_index = 0
for bar in notes:
#Length of the bar is determined by number of notes excluding commands
bar_length = sum(len(part) for part in bar if '#' not in part)
for part in bar:
if '#JPOSSCROLL' in part:
continue
elif '#NMSCROLL' in part:
continue
elif '#MEASURE' in part:
divisor = part.find('/')
self.time_signature = float(part[9:divisor]) / float(part[divisor+1:])
continue
elif '#SCROLL' in part:
self.scroll_modifier = float(part[7:])
continue
elif '#BPMCHANGE' in part:
self.bpm = float(part[11:])
continue
elif '#BARLINEOFF' in part:
self.barline_display = False
continue
elif '#BARLINEON' in part:
self.barline_display = True
continue
elif '#GOGOSTART' in part:
self.gogo_time = True
continue
elif '#GOGOEND' in part:
self.gogo_time = False
continue
elif '#LYRIC' in part:
continue
#Unrecognized commands will be skipped for now
elif '#' in part:
continue
#https://gist.github.com/KatieFrogs/e000f406bbc70a12f3c34a07303eec8b#measure
ms_per_measure = 60000 * (self.time_signature*4) / self.bpm
#Determines how quickly the notes need to move across the screen to reach the judgment circle in time
pixels_per_frame = get_pixels_per_frame(self.bpm * self.time_signature * self.scroll_modifier, self.time_signature*4, self.distance)
pixels_per_ms = pixels_per_frame / (1000 / 60)
bar_ms = self.current_ms
load_ms = bar_ms - (self.distance / pixels_per_ms)
if self.barline_display:
bar_list.append({'note': 'barline', 'ms': bar_ms, 'load_ms': load_ms, 'ppf': pixels_per_frame})
#Empty bar is still a bar, otherwise start increment
if len(part) == 0:
self.current_ms += ms_per_measure
increment = 0
else:
increment = ms_per_measure / bar_length
for note in part:
note_ms = self.current_ms
load_ms = note_ms - (self.distance / pixels_per_ms)
#Do not add blank notes, otherwise they cause lag
if note != '0':
play_note_list.append({'note': note, 'ms': note_ms, 'load_ms': load_ms, 'ppf': pixels_per_frame, 'index': index})
self.get_se_note(play_note_list, ms_per_measure, note, note_ms)
index += 1
if note in {'5', '6', '8'}:
play_note_list[-1]['color'] = 255
if note == '8' and play_note_list[-2]['note'] in ('7', '9'):
if balloon_index >= len(balloon):
play_note_list[-1]['balloon'] = 0
else:
play_note_list[-1]['balloon'] = int(balloon[balloon_index])
balloon_index += 1
self.current_ms += increment
# https://stackoverflow.com/questions/72899/how-to-sort-a-list-of-dictionaries-by-a-value-of-the-dictionary-in-python
# Sorting by load_ms is necessary for drawing, as some notes appear on the
# screen slower regardless of when they reach the judge circle
# Bars can be sorted like this because they don't need hit detection
draw_note_list = deque(sorted(play_note_list, key=lambda d: d['load_ms']))
bar_list = deque(sorted(bar_list, key=lambda d: d['load_ms']))
return play_note_list, draw_note_list, bar_list
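
Usage sketch (not part of the diff): parse a song folder and get the three note queues. distance must be set to the spawn-to-judge-circle length in pixels before notes_to_position() is called, since it defaults to 0 and would divide by zero; the folder path and distance value are placeholders.

    from libs.tja import TJAParser

    parser = TJAParser(r'songs\MySong')       # folder containing MySong.tja (placeholder)
    parser.get_metadata()
    parser.distance = 866                     # assumed pixel distance to the judge circle
    plays, draws, bars = parser.notes_to_position(3)   # 3 = oni course
    print(parser.title, parser.bpm, len(plays))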

libs/utils.py (new file, 75 lines)

@@ -0,0 +1,75 @@
import os
import tempfile
import time
import zipfile
from dataclasses import dataclass
from typing import Any
import pyray as ray
import tomllib
#The TJA format's creator is unknown. I did not create the format, but I did write this parser.
def load_image_from_zip(zip_path: str, filename: str) -> ray.Image:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
with zip_ref.open(filename) as image_file:
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
temp_file.write(image_file.read())
temp_file_path = temp_file.name
image = ray.load_image(temp_file_path)
os.remove(temp_file_path)
return image
def load_texture_from_zip(zip_path: str, filename: str) -> ray.Texture:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
with zip_ref.open(filename) as image_file:
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
temp_file.write(image_file.read())
temp_file_path = temp_file.name
texture = ray.load_texture(temp_file_path)
os.remove(temp_file_path)
return texture
def rounded(num: float) -> int:
sign = 1 if (num >= 0) else -1
num = abs(num)
result = int(num)
if (num - result >= 0.5):
result += 1
return sign * result
def get_current_ms() -> int:
return rounded(time.time() * 1000)
def strip_comments(code: str):
    #Strips '//' comments. Callers feed one line at a time, so the lack of
    #newlines between joined lines does not matter here.
    result = ''
    for line in code.splitlines():
        comment_index = line.find('//')
        if comment_index == -1:
            result += line
        elif comment_index != 0 and not line[:comment_index].isspace():
            result += line[:comment_index]
    return result
def get_pixels_per_frame(bpm: float, time_signature: float, distance: float):
beat_duration = 60 / bpm
total_time = time_signature * beat_duration
total_frames = 60 * total_time
return (distance / total_frames)
def get_config() -> dict[str, Any]:
with open('config.toml', "rb") as f:
config_file = tomllib.load(f)
return config_file
@dataclass
class GlobalData:
start_song: bool = False
selected_song: str = ''
selected_difficulty: int = -1
result_good: int = -1
result_ok: int = -1
result_bad: int = -1
result_score: int = -1
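
get_config() parses config.toml from the working directory. The keys used elsewhere in this commit imply an [audio] table with device_type and asio_buffer; a sketch of reading them, with the key names taken from libs/audio.py:

    from libs.utils import get_config

    cfg = get_config()                        # parses ./config.toml
    device = cfg["audio"]["device_type"]      # "WASAPI" or "ASIO"
    blocksize = cfg["audio"]["asio_buffer"]   # OutputStream blocksize in frames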

libs/video.py (new file, 107 lines)

@@ -0,0 +1,107 @@
import cv2
import pyray as ray
from libs.audio import audio
from libs.utils import get_current_ms
class VideoPlayer:
def __init__(self, path: str):
self.video_path = path
self.start_ms = None
self.current_frame = None
self.last_frame = self.current_frame
self.frame_index = 0
self.frames = []
self.cap = cv2.VideoCapture(self.video_path)
self.fps = self.cap.get(cv2.CAP_PROP_FPS)
self.is_finished = [False, False]
audio_path = path[:-4] + '.ogg'
self.audio = audio.load_music_stream(audio_path)
def convert_frames_background(self, index: int):
if not self.cap.isOpened():
raise ValueError("Error: Could not open video file.")
total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
if len(self.frames) == total_frames:
    return
self.cap.set(cv2.CAP_PROP_POS_FRAMES, index)
success, frame = self.cap.read()
if not success:
    # Seek or decode failed; skip this frame instead of crashing in cvtColor
    return
timestamp = (index / self.fps * 1000)
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
new_frame = ray.Image(frame_rgb.tobytes(), frame_rgb.shape[1], frame_rgb.shape[0], 1, ray.PixelFormat.PIXELFORMAT_UNCOMPRESSED_R8G8B8)
self.frames.append((timestamp, new_frame))
def convert_frames(self):
if not self.cap.isOpened():
raise ValueError("Error: Could not open video file.")
frame_count = 0
success, frame = self.cap.read()
while success:
timestamp = (frame_count / self.fps * 1000)
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
new_frame = ray.Image(frame_rgb.tobytes(), frame_rgb.shape[1], frame_rgb.shape[0], 1, ray.PixelFormat.PIXELFORMAT_UNCOMPRESSED_R8G8B8)
self.frames.append((timestamp, new_frame))
success, frame = self.cap.read()
frame_count += 1
self.cap.release()
print(f"Extracted {len(self.frames)} frames.")
self.start_ms = get_current_ms()
def check_for_start(self):
if self.frames == []:
self.convert_frames()
if not audio.is_music_stream_playing(self.audio):
audio.play_music_stream(self.audio)
def audio_manager(self):
audio.update_music_stream(self.audio)
time_played = audio.get_music_time_played(self.audio) / audio.get_music_time_length(self.audio)
ending_lenience = 0.95
if time_played > ending_lenience:
self.is_finished[1] = True
def update(self):
self.check_for_start()
self.audio_manager()
if self.frame_index == len(self.frames)-1:
self.is_finished[0] = True
return
if self.start_ms is None:
return
timestamp, frame = self.frames[self.frame_index]
elapsed_time = get_current_ms() - self.start_ms
if elapsed_time >= timestamp:
self.current_frame = ray.load_texture_from_image(frame)
if self.last_frame != self.current_frame and self.last_frame is not None:
ray.unload_texture(self.last_frame)
self.frame_index += 1
self.last_frame = self.current_frame
def draw(self):
if self.current_frame is not None:
ray.draw_texture(self.current_frame, 0, 0, ray.WHITE)
def __del__(self):
if hasattr(self, 'current_frame') and self.current_frame:
ray.unload_texture(self.current_frame)
if hasattr(self, 'last_frame') and self.last_frame:
ray.unload_texture(self.last_frame)
if audio.is_music_stream_playing(self.audio):
audio.stop_music_stream(self.audio)
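
Usage sketch (not part of the diff): VideoPlayer decodes every frame on the first update() call and plays a sibling .ogg for audio (path[:-4] + '.ogg'), so intro.mp4 needs intro.ogg next to it. The path and loop are placeholders.

    from libs.video import VideoPlayer

    player = VideoPlayer('movies/intro.mp4')  # expects movies/intro.ogg too (placeholder)
    while not all(player.is_finished):        # [frames exhausted, audio near its end]
        player.update()
        player.draw()   # between pyray's begin_drawing()/end_drawing()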