fix double free bug, add logging, update to python 3.14

This commit is contained in:
Anthony Samms
2025-10-27 18:41:28 -04:00
parent 104ec726b0
commit 3b0a6bed97
29 changed files with 739 additions and 640 deletions

View File

@@ -1,6 +1,6 @@
import cffi
import ctypes
import platform
import logging
from pathlib import Path
from libs.utils import get_config
@@ -43,6 +43,7 @@ ffi.cdef("""
// Device management
void list_host_apis(void);
const char* get_host_api_name(PaHostApiIndex hostApi);
void init_audio_device(PaHostApiIndex host_api, double sample_rate, unsigned long buffer_size);
void close_audio_device(void);
bool is_audio_device_ready(void);
@@ -102,6 +103,8 @@ ffi.cdef("""
void free(void *ptr);
""")
logger = logging.getLogger(__name__)
try:
if platform.system() == "Windows":
lib = ffi.dlopen("libaudio.dll")
@@ -110,23 +113,9 @@ try:
else: # Assume Linux/Unix
lib = ffi.dlopen("./libaudio.so")
except OSError as e:
print(f"Failed to load shared library: {e}")
logger.error(f"Failed to load shared library: {e}")
raise
def get_short_path_name(long_path: str) -> str:
"""Convert long path to Windows short path (8.3 format)"""
if platform.system() != 'Windows':
return long_path
# Get short path name
buffer = ctypes.create_unicode_buffer(512)
get_short_path = ctypes.windll.kernel32.GetShortPathNameW
ret = get_short_path(long_path, buffer, 512)
if ret:
return buffer.value
return long_path
class AudioEngine:
"""Initialize an audio engine for playing sounds and music."""
def __init__(self, device_type: int, sample_rate: float, buffer_size: int, volume_presets: dict[str, float]):
@@ -150,6 +139,11 @@ class AudioEngine:
"""Prints a list of available host APIs to the console"""
lib.list_host_apis() # type: ignore
def get_host_api_name(self, api_id: int) -> str:
"""Returns the name of the host API with the given ID"""
result = lib.get_host_api_name(api_id) # type: ignore
return ffi.string(result).decode('utf-8')
def init_audio_device(self) -> bool:
"""Initialize the audio device"""
try:
@@ -160,10 +154,10 @@ class AudioEngine:
file_path_str = str(self.sounds_path / 'ka.wav').encode('utf-8')
self.kat = lib.load_sound(file_path_str) # type: ignore
if self.audio_device_ready:
print("Audio device initialized successfully")
logger.info("Audio device initialized successfully")
return self.audio_device_ready
except Exception as e:
print(f"Failed to initialize audio device: {e}")
logger.error(f"Failed to initialize audio device: {e}")
return False
def close_audio_device(self) -> None:
@@ -179,9 +173,9 @@ class AudioEngine:
lib.unload_sound(self.kat) # type: ignore
lib.close_audio_device() # type: ignore
self.audio_device_ready = False
print("Audio device closed")
logger.info("Audio device closed")
except Exception as e:
print(f"Error closing audio device: {e}")
logger.error(f"Error closing audio device: {e}")
def is_audio_device_ready(self) -> bool:
"""Check if audio device is ready"""
@@ -210,10 +204,10 @@ class AudioEngine:
self.sounds[name] = sound
return name
else:
print(f"Failed to load sound: {file_path}")
logger.error(f"Failed to load sound: {file_path}")
return ""
except Exception as e:
print(f"Error loading sound {file_path}: {e}")
logger.error(f"Error loading sound {file_path}: {e}")
return ""
def unload_sound(self, name: str) -> None:
@@ -222,11 +216,14 @@ class AudioEngine:
lib.unload_sound(self.sounds[name]) # type: ignore
del self.sounds[name]
else:
print(f"Sound {name} not found")
logger.warning(f"Sound {name} not found")
def load_screen_sounds(self, screen_name: str) -> None:
"""Load sounds for a given screen"""
path = self.sounds_path / screen_name
if not path.exists():
logger.warning(f"Sounds for {screen_name} not found")
return
for sound in path.iterdir():
if sound.is_dir():
for file in sound.iterdir():
@@ -263,7 +260,7 @@ class AudioEngine:
lib.set_sound_volume(sound, self.volume_presets[volume_preset]) # type: ignore
lib.play_sound(sound) # type: ignore
else:
print(f"Sound {name} not found")
logger.warning(f"Sound {name} not found")
def stop_sound(self, name: str) -> None:
"""Stop a sound"""
@@ -275,7 +272,7 @@ class AudioEngine:
sound = self.sounds[name]
lib.stop_sound(sound) # type: ignore
else:
print(f"Sound {name} not found")
logger.warning(f"Sound {name} not found")
def is_sound_playing(self, name: str) -> bool:
"""Check if a sound is playing"""
@@ -287,7 +284,7 @@ class AudioEngine:
sound = self.sounds[name]
return lib.is_sound_playing(sound) # type: ignore
else:
print(f"Sound {name} not found")
logger.warning(f"Sound {name} not found")
return False
def set_sound_volume(self, name: str, volume: float) -> None:
@@ -300,7 +297,7 @@ class AudioEngine:
sound = self.sounds[name]
lib.set_sound_volume(sound, volume) # type: ignore
else:
print(f"Sound {name} not found")
logger.warning(f"Sound {name} not found")
def set_sound_pan(self, name: str, pan: float) -> None:
"""Set the pan of a specific sound"""
@@ -312,7 +309,7 @@ class AudioEngine:
sound = self.sounds[name]
lib.set_sound_pan(sound, pan) # type: ignore
else:
print(f"Sound {name} not found")
logger.warning(f"Sound {name} not found")
# Music management
def load_music_stream(self, file_path: Path, name: str) -> str:
@@ -326,10 +323,10 @@ class AudioEngine:
if lib.is_music_valid(music): # type: ignore
self.music_streams[name] = music
print(f"Loaded music stream from {file_path} as {name}")
logger.info(f"Loaded music stream from {file_path} as {name}")
return name
else:
print(f"Failed to load music: {file_path}")
logger.error(f"Failed to load music: {file_path}")
return ""
def play_music_stream(self, name: str, volume_preset: str) -> None:
@@ -341,7 +338,7 @@ class AudioEngine:
lib.set_music_volume(music, self.volume_presets[volume_preset]) # type: ignore
lib.play_music_stream(music) # type: ignore
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
def update_music_stream(self, name: str) -> None:
"""Update a music stream"""
@@ -349,7 +346,7 @@ class AudioEngine:
music = self.music_streams[name]
lib.update_music_stream(music) # type: ignore
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
def get_music_time_length(self, name: str) -> float:
"""Get the time length of a music stream"""
@@ -357,7 +354,7 @@ class AudioEngine:
music = self.music_streams[name]
return lib.get_music_time_length(music) # type: ignore
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
return 0.0
def get_music_time_played(self, name: str) -> float:
@@ -366,7 +363,7 @@ class AudioEngine:
music = self.music_streams[name]
return lib.get_music_time_played(music) # type: ignore
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
return 0.0
def set_music_volume(self, name: str, volume: float) -> None:
@@ -375,7 +372,7 @@ class AudioEngine:
music = self.music_streams[name]
lib.set_music_volume(music, volume) # type: ignore
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
def is_music_stream_playing(self, name: str) -> bool:
"""Check if a music stream is playing"""
@@ -383,7 +380,7 @@ class AudioEngine:
music = self.music_streams[name]
return lib.is_music_stream_playing(music) # type: ignore
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
return False
def stop_music_stream(self, name: str) -> None:
@@ -392,7 +389,7 @@ class AudioEngine:
music = self.music_streams[name]
lib.stop_music_stream(music) # type: ignore
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
def unload_music_stream(self, name: str) -> None:
"""Unload a music stream"""
@@ -401,7 +398,7 @@ class AudioEngine:
lib.unload_music_stream(music) # type: ignore
del self.music_streams[name]
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
def unload_all_music(self) -> None:
"""Unload all music streams"""
@@ -414,7 +411,7 @@ class AudioEngine:
music = self.music_streams[name]
lib.seek_music_stream(music, position) # type: ignore
else:
print(f"Music stream {name} not found")
logger.warning(f"Music stream {name} not found")
# Create the global audio instance
audio = AudioEngine(get_config()["audio"]["device_type"], get_config()["audio"]["sample_rate"], get_config()["audio"]["buffer_size"], get_config()["volume"])

View File

@@ -116,6 +116,7 @@ typedef struct AudioData {
} AudioData;
void list_host_apis(void);
const char* get_host_api_name(PaHostApiIndex hostApi);
void init_audio_device(PaHostApiIndex host_api, double sample_rate, unsigned long buffer_size);
void close_audio_device(void);
bool is_audio_device_ready(void);
@@ -289,6 +290,16 @@ void list_host_apis(void)
}
}
const char* get_host_api_name(PaHostApiIndex hostApi)
{
const PaHostApiInfo *hostApiInfo = Pa_GetHostApiInfo(hostApi);
if (!hostApiInfo) {
return NULL;
}
return hostApiInfo->name;
}
PaDeviceIndex get_best_output_device_for_host_api(PaHostApiIndex hostApi)
{
const PaHostApiInfo *hostApiInfo = Pa_GetHostApiInfo(hostApi);

View File

@@ -77,6 +77,10 @@ void set_log_level(int level);
* Print available host APIs to the console
*/
void list_host_apis(void);
/**
* Get the name of a host API by index
*/
const char* get_host_api_name(PaHostApiIndex hostApi);
/**
* Initialize the audio device and system
* Must be called before using any other audio functions

View File

@@ -1,3 +1,4 @@
import logging
import random
import libs.bg_collabs
@@ -11,6 +12,8 @@ from libs.bg_objects.footer import Footer
from libs.bg_objects.renda import RendaController
from libs.texture import TextureWrapper
logger = logging.getLogger(__name__)
class Background:
"""The background class for the game."""
COLLABS = {
@@ -24,6 +27,7 @@ class Background:
"IMAS_SIDEM": (libs.bg_collabs.imas_sidem.Background, 'background/collab/imas_sidem', 3),
"DAN": (libs.bg_collabs.dan.Background, 'background/collab/dan', 1)
}
def __init__(self, player_num: int, bpm: float, scene_preset: str = ''):
"""
Initialize the background class.
@@ -87,7 +91,8 @@ class Background:
self.chibi = collab_bg.chibi
self.is_clear = False
self.is_rainbow = False
self.last_milestone = 0
self.last_milestone = 1
logger.info(f"Background initialized for player {player_num}, bpm={bpm}, scene_preset={scene_preset}")
def add_chibi(self, bad: bool, player_num: int):
"""
@@ -122,14 +127,17 @@ class Background:
current_milestone = min(self.max_dancers - 1, int(gauge_1p.gauge_length / (clear_threshold / self.max_dancers)))
else:
current_milestone = self.max_dancers
if current_milestone > self.last_milestone and current_milestone < self.max_dancers:
if current_milestone > self.last_milestone and current_milestone <= self.max_dancers:
self.dancer.add_dancer()
self.last_milestone = current_milestone
logger.info(f"Dancer milestone reached: {current_milestone}/{self.max_dancers}")
if self.bg_fever is not None:
if not self.is_clear and gauge_1p.is_clear:
self.bg_fever.start()
logger.info("Fever started")
if not self.is_rainbow and gauge_1p.is_rainbow and self.fever is not None:
self.fever.start()
logger.info("Rainbow fever started")
self.is_clear = gauge_1p.is_clear
self.is_rainbow = gauge_1p.is_rainbow
self.don_bg.update(current_time_ms, self.is_clear)
@@ -182,3 +190,4 @@ class Background:
Unload the background.
"""
self.tex_wrapper.unload_textures()
logger.info("Background textures unloaded")

View File

@@ -1,6 +1,9 @@
import logging
from libs.animation import Animation
from libs.utils import global_tex
logger = logging.getLogger(__name__)
class Chara2D:
def __init__(self, index: int, bpm: float, path: str = 'chara'):
"""
@@ -33,6 +36,7 @@ class Chara2D:
textures = [[duration*i, duration*(i+1), index] for i, index in enumerate(keyframes)]
self.anims[name] = Animation.create_texture_change(total_duration, textures=textures)
self.anims[name].start()
logger.info(f"Chara2D initialized: index={index}, bpm={bpm}, path={path}")
def set_animation(self, name: str):
"""
@@ -62,6 +66,7 @@ class Chara2D:
self.past_anim = 'gogo'
self.current_anim = name
self.anims[name].start()
logger.debug(f"Animation set: {name}")
def update(self, current_time_ms: float, bpm: float, is_clear: bool, is_rainbow: bool):
"""
Update the character's animation state and appearance.
@@ -75,10 +80,12 @@ class Chara2D:
if is_rainbow and not self.is_rainbow:
self.is_rainbow = True
self.set_animation('soul_in')
logger.info("Rainbow state entered, soul_in animation triggered")
if is_clear and not self.is_clear:
self.is_clear = True
self.set_animation('clear_in')
self.past_anim = 'clear'
logger.info("Clear state entered, clear_in animation triggered")
if bpm != self.bpm:
self.bpm = bpm
for name in self.tex.textures[self.name]:
@@ -90,6 +97,7 @@ class Chara2D:
textures = [[duration*i, duration*(i+1), index] for i, index in enumerate(keyframes)]
self.anims[name] = Animation.create_texture_change(total_duration, textures=textures)
self.anims[name].start()
logger.info(f"BPM changed, animations updated: bpm={bpm}")
self.anims[self.current_anim] = self.anims[self.current_anim]
self.anims[self.current_anim].update(current_time_ms)
if self.anims[self.current_anim].is_finished:

View File

@@ -1,3 +1,4 @@
import logging
from pathlib import Path
import random
from typing import Optional, Union
@@ -12,6 +13,8 @@ import pyray as ray
BOX_CENTER = 444
logger = logging.getLogger(__name__)
class SongBox:
"""A box for the song select screen."""
OUTLINE_MAP = {
@@ -132,6 +135,8 @@ class SongBox:
if self.move.is_finished:
self.position = self.target_position
self.move = None
if not (-56 <= self.position <= 1280):
self.reset()
def update(self, is_diff_select):
self.is_diff_select = is_diff_select
@@ -723,12 +728,14 @@ class FileNavigator:
self.box_open = False
self.genre_bg = None
self.song_count = 0
logger.info("FileNavigator initialized")
def initialize(self, root_dirs: list[Path]):
self.root_dirs = [Path(p) if not isinstance(p, Path) else p for p in root_dirs]
self._generate_all_objects()
self._create_virtual_root()
self.load_current_directory()
logger.info(f"FileNavigator initialized with root_dirs: {self.root_dirs}")
def _create_virtual_root(self):
"""Create a virtual root directory containing all root directories"""
@@ -762,12 +769,12 @@ class FileNavigator:
def _generate_all_objects(self):
"""Generate all Directory and SongFile objects in advance"""
print("Generating all Directory and SongFile objects...")
logging.info("Generating all Directory and SongFile objects...")
# Generate objects for each root directory
for root_path in self.root_dirs:
if not root_path.exists():
print(f"Root directory does not exist: {root_path}")
logging.warning(f"Root directory does not exist: {root_path}")
continue
self._generate_objects_recursive(root_path)
@@ -778,7 +785,7 @@ class FileNavigator:
if str(song_obj) in self.all_song_files:
self.all_song_files[str(song_obj)].box.is_favorite = True
print(f"Object generation complete. "
logging.info(f"Object generation complete. "
f"Directories: {len(self.all_directories)}, "
f"Songs: {len(self.all_song_files)}")
@@ -905,7 +912,7 @@ class FileNavigator:
global_data.song_progress = self.song_count / global_data.total_songs
self.all_song_files[song_key] = song_obj
except Exception as e:
print(f"Error creating SongFile for {tja_path}: {e}")
logger.error(f"Error creating SongFile for {tja_path}: {e}")
continue
def is_at_root(self) -> bool:
@@ -1191,7 +1198,7 @@ class FileNavigator:
elif line.startswith("#COLLECTION"):
collection = line.split(":", 1)[1].strip()
except Exception as e:
print(f"Error parsing box.def in {path}: {e}")
logger.error(f"Error parsing box.def in {path}: {e}")
return name, texture_index, collection
@@ -1237,7 +1244,7 @@ class FileNavigator:
if file_updated:
with open(path / 'song_list.txt', 'w', encoding='utf-8-sig') as song_list:
for line in updated_lines:
print("updated", line)
logger.info(f"updated: {line}")
song_list.write(line + '\n')
return tja_files
@@ -1323,7 +1330,7 @@ class FileNavigator:
with open(recents_path, 'w', encoding='utf-8-sig') as song_list:
song_list.writelines(recent_entries)
print("Added recent: ", song.hash, song.tja.metadata.title['en'], song.tja.metadata.subtitle['en'])
logger.info(f"Added Recent: {song.hash} {song.tja.metadata.title['en']} {song.tja.metadata.subtitle['en']}")
def add_favorite(self) -> bool:
"""Add the current song to the favorites list"""
@@ -1351,11 +1358,11 @@ class FileNavigator:
with open(favorites_path, 'w', encoding='utf-8-sig') as song_list:
for line in lines:
song_list.write(line + '\n')
print("Removed favorite:", song.hash, song.tja.metadata.title['en'], song.tja.metadata.subtitle['en'])
logger.info(f"Removed Favorite: {song.hash} {song.tja.metadata.title['en']} {song.tja.metadata.subtitle['en']}")
else:
with open(favorites_path, 'a', encoding='utf-8-sig') as song_list:
song_list.write(f'{song.hash}|{song.tja.metadata.title["en"]}|{song.tja.metadata.subtitle["en"]}\n')
print("Added favorite: ", song.hash, song.tja.metadata.title['en'], song.tja.metadata.subtitle['en'])
logger.info(f"Added Favorite: {song.hash} {song.tja.metadata.title['en']} {song.tja.metadata.subtitle['en']}")
return True
navigator = FileNavigator()

41
libs/screen.py Normal file
View File

@@ -0,0 +1,41 @@
import logging
from typing import Any
from libs.audio import audio
from libs.texture import tex
logger = logging.getLogger(__name__)
class Screen:
def __init__(self, name: str):
self.screen_init = False
self.screen_name = name
def _do_screen_start(self):
if not self.screen_init:
self.screen_init = True
self.on_screen_start()
logger.info(f"{self.__class__.__name__} initialized")
def on_screen_start(self) -> Any:
tex.load_screen_textures(self.screen_name)
logger.info(f"Loaded textures for screen: {self.screen_name}")
audio.load_screen_sounds(self.screen_name)
logger.info(f"Loaded sounds for screen: {self.screen_name}")
def on_screen_end(self, next_screen: str):
self.screen_init = False
logger.info(f"{self.__class__.__name__} ended, transitioning to {next_screen} screen")
audio.unload_all_sounds()
audio.unload_all_music()
logger.info(f"Unloaded sounds for screen: {next_screen}")
tex.unload_textures()
logger.info(f"Unloaded textures for screen: {next_screen}")
return next_screen
def update(self) -> Any:
ret_val = self._do_screen_start()
if ret_val:
return ret_val
def draw(self):
pass

View File

@@ -1,5 +1,6 @@
import configparser
import csv
import logging
import json
import sqlite3
import sys
@@ -9,6 +10,7 @@ from pathlib import Path
from libs.tja import NoteList, TJAParser, test_encodings
from libs.utils import get_config, global_data
logger = logging.getLogger(__name__)
def diff_hashes_object_hook(obj):
if "diff_hashes" in obj:
@@ -137,7 +139,7 @@ def build_song_hashes(output_dir=Path("cache")):
all_notes.bars.extend(branch.bars)
all_notes.bars.extend(diff_notes.bars)
except Exception as e:
print(f"Failed to parse TJA {tja_path}: {e}")
logger.error(f"Failed to parse TJA {tja_path}: {e}")
continue
if all_notes == NoteList():
@@ -190,7 +192,7 @@ def build_song_hashes(output_dir=Path("cache")):
""", (diff_hashes[i], en_name, jp_name, i, imported_scores[i], clear, bads))
if cursor.rowcount > 0:
action = "Added" if not existing_record else "Updated"
print(f"{action} entry for {en_name} ({i}) - Score: {imported_scores[i]}")
logger.info(f"{action} entry for {en_name} ({i}) - Score: {imported_scores[i]}")
conn.commit()
conn.close()
@@ -213,18 +215,18 @@ def build_song_hashes(output_dir=Path("cache")):
WHERE (en_name = ? AND jp_name = ?) AND diff = ?
""", (diff_hash, en_name, jp_name, diff))
if cursor.rowcount > 0:
print(f"Updated {cursor.rowcount} entries for {en_name} ({diff})")
logger.info(f"Updated {cursor.rowcount} entries for {en_name} ({diff})")
conn.commit()
conn.close()
print(f"Database update completed. Processed {len(db_updates)} difficulty hash updates.")
logger.info(f"Database update completed. Processed {len(db_updates)} difficulty hash updates.")
except sqlite3.Error as e:
print(f"Database error: {e}")
logger.error(f"Database error: {e}")
except Exception as e:
print(f"Error updating database: {e}")
logger.error(f"Error updating database: {e}")
elif db_updates:
print(f"Warning: scores.db not found, skipping {len(db_updates)} database updates")
logger.warning(f"Warning: scores.db not found, skipping {len(db_updates)} database updates")
# Save both files
with open(output_path, "w", encoding="utf-8") as f:
@@ -345,11 +347,11 @@ def get_japanese_songs_for_version(csv_file_path, version_column):
if len(matches) == 1:
path = matches[0][1]
elif len(matches) > 1:
print(
logger.info(
f"Multiple matches found for '{title.split('')[0]} ({title.split('')[1] if len(title.split('')) > 1 else ''})':"
)
for i, (key, path_val) in enumerate(matches, 1):
print(f"{i}. {key}: {path_val}")
logger.info(f"{i}. {key}: {path_val}")
choice = int(input("Choose number: ")) - 1
path = matches[choice][1]
else:
@@ -362,7 +364,7 @@ def get_japanese_songs_for_version(csv_file_path, version_column):
text_files[genre].append(
f"{hash}|{tja_parse.metadata.title['en'].strip()}|{tja_parse.metadata.subtitle['en'].strip()}"
)
print(f"Added {title}: {path}")
logger.info(f"Added {title}: {path}")
for genre in text_files:
if not Path(version_column).exists():
Path(version_column).mkdir()

View File

@@ -1,6 +1,7 @@
import copy
import json
import os
import logging
import tempfile
import zipfile
from pathlib import Path
@@ -13,6 +14,8 @@ from libs.animation import BaseAnimation, parse_animations
SCREEN_WIDTH = 1280
SCREEN_HEIGHT = 720
logger = logging.getLogger(__name__)
class Texture:
"""Texture class for managing textures and animations."""
def __init__(self, name: str, texture: Union[ray.Texture, list[ray.Texture]], init_vals: dict[str, int]):
@@ -42,14 +45,28 @@ class TextureWrapper:
def unload_textures(self):
"""Unload all textures and animations."""
ids = {} # Map ID to texture name
for zip in self.textures:
for file in self.textures[zip]:
tex_object = self.textures[zip][file]
if isinstance(tex_object.texture, list):
for texture in tex_object.texture:
ray.unload_texture(texture)
for i, texture in enumerate(tex_object.texture):
if texture.id in ids:
logger.warning(f"Duplicate texture ID {texture.id}: {ids[texture.id]} and {zip}/{file}[{i}]")
else:
ids[texture.id] = f"{zip}/{file}[{i}]"
ray.unload_texture(texture)
else:
ray.unload_texture(tex_object.texture)
if tex_object.texture.id in ids:
logger.warning(f"Duplicate texture ID {tex_object.texture.id}: {ids[tex_object.texture.id]} and {zip}/{file}")
else:
ids[tex_object.texture.id] = f"{zip}/{file}"
ray.unload_texture(tex_object.texture)
self.textures.clear()
self.animations.clear()
logger.info("All textures unloaded")
def get_animation(self, index: int, is_copy: bool = False):
"""Get an animation by ID and returns a reference.
@@ -93,52 +110,57 @@ class TextureWrapper:
if (screen_path / 'animation.json').exists():
with open(screen_path / 'animation.json') as json_file:
self.animations = parse_animations(json.loads(json_file.read()))
logger.info(f"Animations loaded for screen: {screen_name}")
def load_zip(self, screen_name: str, subset: str):
"""Load textures from a zip file."""
zip = (self.graphics_path / screen_name / subset).with_suffix('.zip')
if screen_name in self.textures and subset in self.textures[screen_name]:
return
with zipfile.ZipFile(zip, 'r') as zip_ref:
if 'texture.json' not in zip_ref.namelist():
raise Exception(f"texture.json file missing from {zip}")
try:
with zipfile.ZipFile(zip, 'r') as zip_ref:
if 'texture.json' not in zip_ref.namelist():
raise Exception(f"texture.json file missing from {zip}")
with zip_ref.open('texture.json') as json_file:
tex_mapping_data = json.loads(json_file.read().decode('utf-8'))
self.textures[zip.stem] = dict()
with zip_ref.open('texture.json') as json_file:
tex_mapping_data = json.loads(json_file.read().decode('utf-8'))
self.textures[zip.stem] = dict()
for tex_name in tex_mapping_data:
if f"{tex_name}/" in zip_ref.namelist():
tex_mapping = tex_mapping_data[tex_name]
for tex_name in tex_mapping_data:
if f"{tex_name}/" in zip_ref.namelist():
tex_mapping = tex_mapping_data[tex_name]
with tempfile.TemporaryDirectory() as temp_dir:
zip_ref.extractall(temp_dir, members=[name for name in zip_ref.namelist()
if name.startswith(tex_name)])
with tempfile.TemporaryDirectory() as temp_dir:
zip_ref.extractall(temp_dir, members=[name for name in zip_ref.namelist()
if name.startswith(tex_name)])
extracted_path = Path(temp_dir) / tex_name
if extracted_path.is_dir():
frames = [ray.load_texture(str(frame)) for frame in sorted(extracted_path.iterdir(),
key=lambda x: int(x.stem)) if frame.is_file()]
else:
frames = [ray.load_texture(str(extracted_path))]
self.textures[zip.stem][tex_name] = Texture(tex_name, frames, tex_mapping)
self._read_tex_obj_data(tex_mapping, self.textures[zip.stem][tex_name])
elif f"{tex_name}.png" in zip_ref.namelist():
tex_mapping = tex_mapping_data[tex_name]
png_filename = f"{tex_name}.png"
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
temp_file.write(zip_ref.read(png_filename))
temp_path = temp_file.name
try:
tex = ray.load_texture(temp_path)
self.textures[zip.stem][tex_name] = Texture(tex_name, tex, tex_mapping)
extracted_path = Path(temp_dir) / tex_name
if extracted_path.is_dir():
frames = [ray.load_texture(str(frame)) for frame in sorted(extracted_path.iterdir(),
key=lambda x: int(x.stem)) if frame.is_file()]
else:
frames = [ray.load_texture(str(extracted_path))]
self.textures[zip.stem][tex_name] = Texture(tex_name, frames, tex_mapping)
self._read_tex_obj_data(tex_mapping, self.textures[zip.stem][tex_name])
finally:
os.unlink(temp_path)
else:
raise Exception(f"Texture {tex_name} was not found in {zip}")
elif f"{tex_name}.png" in zip_ref.namelist():
tex_mapping = tex_mapping_data[tex_name]
png_filename = f"{tex_name}.png"
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
temp_file.write(zip_ref.read(png_filename))
temp_path = temp_file.name
try:
tex = ray.load_texture(temp_path)
self.textures[zip.stem][tex_name] = Texture(tex_name, tex, tex_mapping)
self._read_tex_obj_data(tex_mapping, self.textures[zip.stem][tex_name])
finally:
os.unlink(temp_path)
else:
raise Exception(f"Texture {tex_name} was not found in {zip}")
logger.info(f"Textures loaded from zip: {zip}")
except Exception as e:
logger.error(f"Failed to load textures from zip {zip}: {e}")
def load_screen_textures(self, screen_name: str) -> None:
"""Load textures for a screen."""
@@ -146,10 +168,12 @@ class TextureWrapper:
if (screen_path / 'animation.json').exists():
with open(screen_path / 'animation.json') as json_file:
self.animations = parse_animations(json.loads(json_file.read()))
logger.info(f"Animations loaded for screen: {screen_name}")
for zip in screen_path.iterdir():
if zip.is_dir() or zip.suffix != ".zip":
continue
self.load_zip(screen_name, zip.name)
logger.info(f"Screen textures loaded for: {screen_name}")
def control(self, tex_object: Texture, index: int = 0):
'''debug function'''
@@ -158,16 +182,16 @@ class TextureWrapper:
distance = 10
if ray.is_key_pressed(ray.KeyboardKey.KEY_LEFT):
tex_object.x[index] -= distance
print(f"{tex_object.name}: {tex_object.x[index]}, {tex_object.y[index]}")
logger.info(f"{tex_object.name}: {tex_object.x[index]}, {tex_object.y[index]}")
if ray.is_key_pressed(ray.KeyboardKey.KEY_RIGHT):
tex_object.x[index] += distance
print(f"{tex_object.name}: {tex_object.x[index]}, {tex_object.y[index]}")
logger.info(f"{tex_object.name}: {tex_object.x[index]}, {tex_object.y[index]}")
if ray.is_key_pressed(ray.KeyboardKey.KEY_UP):
tex_object.y[index] -= distance
print(f"{tex_object.name}: {tex_object.x[index]}, {tex_object.y[index]}")
logger.info(f"{tex_object.name}: {tex_object.x[index]}, {tex_object.y[index]}")
if ray.is_key_pressed(ray.KeyboardKey.KEY_DOWN):
tex_object.y[index] += distance
print(f"{tex_object.name}: {tex_object.x[index]}, {tex_object.y[index]}")
logger.info(f"{tex_object.name}: {tex_object.x[index]}, {tex_object.y[index]}")
def draw_texture(self, subset: str, texture: str, color: ray.Color=ray.WHITE, frame: int = 0, scale: float = 1.0, center: bool = False,
mirror: str = '', x: float = 0, y: float = 0, x2: float = 0, y2: float = 0,

View File

@@ -1,6 +1,7 @@
import bisect
import hashlib
import math
import logging
import random
from collections import deque
from dataclasses import dataclass, field, fields
@@ -285,6 +286,7 @@ def test_encodings(file_path):
continue
return final_encoding
logger = logging.getLogger(__name__)
class TJAParser:
"""Parse a TJA file and extract metadata and data.
@@ -318,6 +320,7 @@ class TJAParser:
self.metadata = TJAMetadata()
self.ex_data = TJAEXData()
logger.debug(f"Parsing TJA file: {self.file_path}")
self.get_metadata()
self.distance = distance

View File

@@ -2,6 +2,7 @@ import ctypes
import hashlib
import math
import sys
import logging
import time
import json
from libs.global_data import global_data
@@ -19,6 +20,7 @@ from raylib import (
from libs.texture import TextureWrapper
logger = logging.getLogger(__name__)
def force_dedicated_gpu():
"""Force Windows to use dedicated GPU for this application"""
@@ -29,13 +31,13 @@ def force_dedicated_gpu():
if nvapi:
ctypes.windll.kernel32.SetEnvironmentVariableW("SHIM_MCCOMPAT", "0x800000001")
except Exception as e:
print(e)
logger.error(e)
try:
# AMD PowerXpress
ctypes.windll.kernel32.SetEnvironmentVariableW("AMD_VULKAN_ICD", "DISABLE")
except Exception as e:
print(e)
logger.error(e)
def rounded(num: float) -> int:
"""Round a number to the nearest integer"""

View File

@@ -1,4 +1,5 @@
from pathlib import Path
import logging
import pyray as ray
from moviepy import VideoFileClip
@@ -6,6 +7,7 @@ from moviepy import VideoFileClip
from libs.audio import audio
from libs.utils import get_current_ms
logger = logging.getLogger(__name__)
class VideoPlayer:
def __init__(self, path: Path):
@@ -61,7 +63,7 @@ class VideoPlayer:
return texture
except Exception as e:
print(f"Error loading frame at index {index}: {e}")
logger.error(f"Error loading frame at index {index}: {e}")
return None
def _manage_buffer(self):