From 0012868d4e0655e9e94a36d77f6a3d9e95192b4b Mon Sep 17 00:00:00 2001 From: Yonokid <37304577+Yonokid@users.noreply.github.com> Date: Mon, 21 Apr 2025 02:14:21 -0400 Subject: [PATCH] add song select, results, config, and asio support --- config.toml | 22 ++ global_funcs.py | 568 --------------------------- libs/animation.py | 133 +++++++ libs/audio.py | 743 ++++++++++++++++++++++++++++++++++++ libs/tja.py | 277 ++++++++++++++ libs/utils.py | 75 ++++ libs/video.py | 107 ++++++ main.py | 41 +- entry.py => scenes/entry.py | 7 +- game.py => scenes/game.py | 155 ++++---- scenes/result.py | 23 ++ scenes/song_select.py | 83 ++++ title.py => scenes/title.py | 54 +-- 13 files changed, 1612 insertions(+), 676 deletions(-) create mode 100644 config.toml delete mode 100644 global_funcs.py create mode 100644 libs/animation.py create mode 100644 libs/audio.py create mode 100644 libs/tja.py create mode 100644 libs/utils.py create mode 100644 libs/video.py rename entry.py => scenes/entry.py (82%) rename game.py => scenes/game.py (91%) create mode 100644 scenes/result.py create mode 100644 scenes/song_select.py rename title.py => scenes/title.py (89%) diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..53d0fa3 --- /dev/null +++ b/config.toml @@ -0,0 +1,22 @@ +[general] +fps_counter = true +judge_offset = 0 + +[paths] +tja_path = 'Songs' +video_path = 'Videos' + +[keybinds] +left_kat = 'E' +left_don = 'F' +right_don = 'J' +right_kat = 'I' + +[audio] +device_type = 'ASIO' +asio_buffer = 6 + +[video] +fullscreen = true +borderless = true +vsync = true diff --git a/global_funcs.py b/global_funcs.py deleted file mode 100644 index aae7546..0000000 --- a/global_funcs.py +++ /dev/null @@ -1,568 +0,0 @@ -import time -import os -import pyray as ray -import cv2 -import math -import zipfile -import tempfile - -from collections import deque - -#TJA Format creator is unknown. I did not create the format, but I did write the parser though. - -def load_image_from_zip(zip_path, filename): - with zipfile.ZipFile(zip_path, 'r') as zip_ref: - with zip_ref.open(filename) as image_file: - with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file: - temp_file.write(image_file.read()) - temp_file_path = temp_file.name - image = ray.load_image(temp_file_path) - os.remove(temp_file_path) - return image - -def load_texture_from_zip(zip_path, filename): - with zipfile.ZipFile(zip_path, 'r') as zip_ref: - with zip_ref.open(filename) as image_file: - with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file: - temp_file.write(image_file.read()) - temp_file_path = temp_file.name - texture = ray.load_texture(temp_file_path) - os.remove(temp_file_path) - return texture - -def rounded(num): - sign = 1 if (num >= 0) else -1 - num = abs(num) - result = int(num) - if (num - result >= 0.5): - result += 1 - return sign * result - -def get_current_ms(): - return rounded(time.time() * 1000) - -def stripComments(code): - result = '' - index = 0 - is_line = True - for line in code.splitlines(): - comment_index = line.find('//') - if comment_index == -1: - result += line - is_line = True - elif comment_index != 0 and not line[:comment_index].isspace(): - result += line[:comment_index] - is_line = True - else: - is_line = False - index += 1 - return result - -def get_pixels_per_frame(bpm, time_signature, distance): - beat_duration = 60 / bpm - total_time = time_signature * beat_duration - total_frames = 60 * total_time - return (distance / total_frames) - -def calculate_base_score(play_note_list): - total_notes = 0 - balloon_num = 0 - balloon_sec = 0 - balloon_count = 0 - drumroll_sec = 0 - for i in range(len(play_note_list)): - note = play_note_list[i] - if i < len(play_note_list)-1: - next_note = play_note_list[i+1] - else: - next_note = play_note_list[len(play_note_list)-1] - if note.get('note') in {'1','2','3','4'}: - total_notes += 1 - elif note.get('note') in {'5', '6'}: - drumroll_sec += (next_note.get('ms') - note.get('ms')) / 1000 - elif note.get('note') in {'7', '9'}: - balloon_num += 1 - balloon_count += next_note.get('balloon') - total_score = (1000000 - (balloon_count * 100) - (drumroll_sec * 1692.0079999994086)) / total_notes - return math.ceil(total_score / 10) * 10 - -class TJAParser: - def __init__(self, path): - #Defined on startup - self.folder_path = path - self.folder_name = self.folder_path.split('\\')[-1] - self.file_path = f'{self.folder_path}\\{self.folder_name}.tja' - - #Defined on file_to_data() - self.data = [] - - #Defined on get_metadata() - self.title = '' - self.title_ja = '' - self.subtitle = '' - self.subtitle_ja = '' - self.wave = f'{self.folder_path}\\' - self.offset = 0 - self.demo_start = 0 - self.course_data = dict() - - #Defined in metadata but can change throughout the chart - self.bpm = 120 - self.time_signature = 4/4 - - self.distance = 0 - self.scroll_modifier = 1 - self.current_ms = 0 - self.barline_display = True - self.gogo_time = False - - def file_to_data(self): - with open(self.file_path, 'rt', encoding='utf-8-sig') as tja_file: - for line in tja_file: - line = stripComments(line).strip() - if line != '': - self.data.append(str(line)) - return self.data - - def get_metadata(self): - self.file_to_data() - diff_index = 1 - highest_diff = -1 - for item in self.data: - if item[0] == '#': - continue - elif 'SUBTITLEJA' in item: self.subtitle_ja = str(item.split('SUBTITLEJA:')[1]) - elif 'TITLEJA' in item: self.title_ja = str(item.split('TITLEJA:')[1]) - elif 'SUBTITLE' in item: self.subtitle = str(item.split('SUBTITLE:')[1][2:]) - elif 'TITLE' in item: self.title = str(item.split('TITLE:')[1]) - elif 'BPM' in item: self.bpm = float(item.split(':')[1]) - elif 'WAVE' in item: self.wave += str(item.split(':')[1]) - elif 'OFFSET' in item: self.offset = float(item.split(':')[1]) - elif 'DEMOSTART' in item: self.demo_start = float(item.split(':')[1]) - elif 'COURSE' in item: - course = str(item.split(':')[1]).lower() - if course == 'dan' or course == '6': - self.course_data[6] = [] - if course == 'tower' or course == '5': - self.course_data[5] = [] - elif course == 'edit' or course == '4': - self.course_data[4] = [] - elif course == 'oni' or course == '3': - self.course_data[3] = [] - elif course == 'hard' or course == '2': - self.course_data[2] = [] - elif course == 'normal' or course == '1': - self.course_data[1] = [] - elif course == 'easy' or course == '0': - self.course_data[0] = [] - highest_diff = max(self.course_data) - diff_index -= 1 - elif 'LEVEL' in item: - item = int(item.split(':')[1]) - self.course_data[diff_index+highest_diff].append(item) - elif 'BALLOON' in item: - item = item.split(':')[1] - if item == '': - continue - self.course_data[diff_index+highest_diff].append([int(x) for x in item.split(',')]) - elif 'SCOREINIT' in item: - if item.split(':')[1] == '': - continue - item = item.split(':')[1] - self.course_data[diff_index+highest_diff].append([int(x) for x in item.split(',')]) - elif 'SCOREDIFF' in item: - if item.split(':')[1] == '': - continue - item = int(item.split(':')[1]) - self.course_data[diff_index+highest_diff].append(item) - return [self.title, self.title_ja, self.subtitle, self.subtitle_ja, - self.bpm, self.wave, self.offset, self.demo_start, self.course_data] - - def data_to_notes(self, diff): - self.file_to_data() - #Get notes start and end - note_start = -1 - note_end = -1 - diff_count = 0 - for i in range(len(self.data)): - if self.data[i] == '#START': - note_start = i+1 - elif self.data[i] == '#END': - note_end = i - diff_count += 1 - if diff_count == len(self.course_data) - diff: - break - - notes = [] - bar = [] - #Check for measures and separate when comma exists - for i in range(note_start, note_end): - item = self.data[i].strip(',') - bar.append(item) - if item != self.data[i]: - notes.append(bar) - bar = [] - return notes, self.course_data[diff][1] - - def get_se_note(self, play_note_list, ms_per_measure, note, note_ms): - #Someone please refactor this - se_notes = {'1': [0, 1, 2], - '2': [3, 4], - '3': 5, - '4': 6, - '5': 7, - '6': 14, - '7': 9, - '8': 10, - '9': 11} - if len(play_note_list) > 1: - prev_note = play_note_list[-2] - if prev_note['note'] in {'1', '2'}: - if note_ms - prev_note['ms'] <= (ms_per_measure/8) - 1: - prev_note['se_note'] = se_notes[prev_note['note']][1] - else: - prev_note['se_note'] = se_notes[prev_note['note']][0] - else: - prev_note['se_note'] = se_notes[prev_note['note']] - if len(play_note_list) > 3: - if play_note_list[-4]['note'] == play_note_list[-3]['note'] == play_note_list[-2]['note'] == '1': - if (play_note_list[-3]['ms'] - play_note_list[-4]['ms'] < (ms_per_measure/8)) and (play_note_list[-2]['ms'] - play_note_list[-3]['ms'] < (ms_per_measure/8)): - if len(play_note_list) > 5: - if (play_note_list[-4]['ms'] - play_note_list[-5]['ms'] >= (ms_per_measure/8)) and (play_note_list[-1]['ms'] - play_note_list[-2]['ms'] >= (ms_per_measure/8)): - play_note_list[-3]['se_note'] = se_notes[play_note_list[-3]['note']][2] - else: - play_note_list[-3]['se_note'] = se_notes[play_note_list[-3]['note']][2] - else: - play_note_list[-1]['se_note'] = se_notes[note] - if play_note_list[-1]['note'] in {'1', '2'}: - play_note_list[-1]['se_note'] = se_notes[note][0] - else: - play_note_list[-1]['se_note'] = se_notes[note] - - def notes_to_position(self, diff): - play_note_list = deque() - bar_list = deque() - draw_note_list = deque() - notes, balloon = self.data_to_notes(diff) - index = 0 - balloon_index = 0 - drumroll_head = dict() - drumroll_tail = dict() - for bar in notes: - #Length of the bar is determined by number of notes excluding commands - bar_length = sum(len(part) for part in bar if '#' not in part) - - for part in bar: - if '#JPOSSCROLL' in part: - continue - elif '#NMSCROLL' in part: - continue - elif '#MEASURE' in part: - divisor = part.find('/') - self.time_signature = float(part[9:divisor]) / float(part[divisor+1:]) - continue - elif '#SCROLL' in part: - self.scroll_modifier = float(part[7:]) - continue - elif '#BPMCHANGE' in part: - self.bpm = float(part[11:]) - continue - elif '#BARLINEOFF' in part: - self.barline_display = False - continue - elif '#BARLINEON' in part: - self.barline_display = True - continue - elif '#GOGOSTART' in part: - self.gogo_time = True - continue - elif '#GOGOEND' in part: - self.gogo_time = False - continue - elif '#LYRIC' in part: - continue - #Unrecognized commands will be skipped for now - elif '#' in part: - continue - - #https://gist.github.com/KatieFrogs/e000f406bbc70a12f3c34a07303eec8b#measure - ms_per_measure = 60000 * (self.time_signature*4) / self.bpm - - #Determines how quickly the notes need to move across the screen to reach the judgment circle in time - pixels_per_frame = get_pixels_per_frame(self.bpm * self.time_signature * self.scroll_modifier, self.time_signature*4, self.distance) - pixels_per_ms = pixels_per_frame / (1000 / 60) - - bar_ms = self.current_ms - load_ms = bar_ms - (self.distance / pixels_per_ms) - - if self.barline_display: - bar_list.append({'note': 'barline', 'ms': bar_ms, 'load_ms': load_ms, 'ppf': pixels_per_frame}) - - #Empty bar is still a bar, otherwise start increment - if len(part) == 0: - self.current_ms += ms_per_measure - increment = 0 - else: - increment = ms_per_measure / bar_length - - for note in part: - note_ms = self.current_ms - load_ms = note_ms - (self.distance / pixels_per_ms) - #Do not add blank notes otherwise lag - if note != '0': - play_note_list.append({'note': note, 'ms': note_ms, 'load_ms': load_ms, 'ppf': pixels_per_frame, 'index': index}) - self.get_se_note(play_note_list, ms_per_measure, note, note_ms) - index += 1 - if note in {'5', '6', '8'}: - play_note_list[-1]['color'] = 255 - if note == '8' and play_note_list[-2]['note'] in ('7', '9'): - if balloon_index >= len(balloon): - play_note_list[-1]['balloon'] = 0 - else: - play_note_list[-1]['balloon'] = int(balloon[balloon_index]) - balloon_index += 1 - self.current_ms += increment - - # https://stackoverflow.com/questions/72899/how-to-sort-a-list-of-dictionaries-by-a-value-of-the-dictionary-in-python - # Sorting by load_ms is necessary for drawing, as some notes appear on the - # screen slower regardless of when they reach the judge circle - # Bars can be sorted like this because they don't need hit detection - draw_note_list = deque(sorted(play_note_list, key=lambda d: d['load_ms'])) - bar_list = deque(sorted(bar_list, key=lambda d: d['load_ms'])) - return play_note_list, draw_note_list, bar_list - -class Animation: - def __init__(self, current_ms, duration, type): - self.type = type - self.start_ms = current_ms - self.attribute = 0 - self.duration = duration - self.params = dict() - self.is_finished = False - - def update(self, current_ms): - if self.type == 'fade': - self.fade(current_ms, - self.duration, - initial_opacity=self.params.get('initial_opacity', 1.0), - final_opacity=self.params.get('final_opacity', 0.0), - delay=self.params.get('delay', 0.0), - ease_in=self.params.get('ease_in', None), - ease_out=self.params.get('ease_out', None)) - if self.params.get('reverse', None) is not None and current_ms - self.start_ms >= self.duration + self.params.get('delay', 0.0): - self.fade(current_ms, - self.duration, - final_opacity=self.params.get('initial_opacity', 1.0), - initial_opacity=self.params.get('final_opacity', 0.0), - delay=self.params.get('delay', 0.0) + self.duration + self.params.get('reverse'), - ease_in=self.params.get('ease_in', None), - ease_out=self.params.get('ease_out', None)) - elif self.type == 'move': - self.move(current_ms, - self.duration, - self.params['total_distance'], - self.params['start_position'], - delay=self.params.get('delay', 0.0)) - elif self.type == 'texture_change': - self.texture_change(current_ms, - self.duration, - self.params['textures']) - elif self.type == 'text_stretch': - self.text_stretch(current_ms, - self.duration) - elif self.type == 'texture_resize': - self.texture_resize(current_ms, - self.duration, - initial_size=self.params.get('initial_size', 1.0), - final_size=self.params.get('final_size', 1.0), - delay=self.params.get('delay', 0.0)) - if self.params.get('reverse', None) is not None and current_ms - self.start_ms >= self.duration + self.params.get('delay', 0.0): - self.texture_resize(current_ms, - self.duration, - final_size=self.params.get('initial_size', 1.0), - initial_size=self.params.get('final_size', 1.0), - delay=self.params.get('delay', 0.0) + self.duration) - - def fade(self, current_ms, duration, initial_opacity, final_opacity, delay, ease_in, ease_out): - def ease_out_progress(progress, ease): - if ease == 'quadratic': - return progress * (2 - progress) - elif ease == 'cubic': - return 1 - pow(1 - progress, 3) - elif ease == 'exponential': - return 1 - pow(2, -10 * progress) - else: - return progress - def ease_in_progress(progress, ease): - if ease == 'quadratic': - return progress * progress - elif ease == 'cubic': - return progress * progress * progress - elif ease == 'exponential': - return pow(2, 10 * (progress - 1)) - else: - return progress - elapsed_time = current_ms - self.start_ms - if elapsed_time < delay: - self.attribute = initial_opacity - - elapsed_time -= delay - if elapsed_time >= duration: - self.attribute = final_opacity - self.is_finished = True - - if ease_in is not None: - progress = ease_in_progress(elapsed_time / duration, ease_in) - elif ease_out is not None: - progress = ease_out_progress(elapsed_time / duration, ease_out) - else: - progress = elapsed_time / duration - - current_opacity = initial_opacity + (final_opacity - initial_opacity) * progress - self.attribute = current_opacity - def move(self, current_ms, duration, total_distance, start_position, delay): - elapsed_time = current_ms - self.start_ms - if elapsed_time < delay: - self.attribute = start_position - - elapsed_time -= delay - if elapsed_time <= duration: - progress = elapsed_time / duration - self.attribute = start_position + (total_distance * progress) - else: - self.attribute = start_position + total_distance - self.is_finished = True - def texture_change(self, current_ms, duration, textures): - elapsed_time = current_ms - self.start_ms - if elapsed_time <= duration: - for start, end, index in textures: - if start < elapsed_time <= end: - self.attribute = index - else: - self.is_finished = True - def text_stretch(self, current_ms, duration): - elapsed_time = current_ms - self.start_ms - if elapsed_time <= duration: - self.attribute = 2 + 5 * (elapsed_time // 25) - elif elapsed_time <= duration + 116: - frame_time = (elapsed_time - duration) // 16.57 - self.attribute = 2 + 10 - (2 * (frame_time + 1)) - else: - self.attribute = 0 - self.is_finished = True - def texture_resize(self, current_ms, duration, initial_size, final_size, delay): - elapsed_time = current_ms - self.start_ms - if elapsed_time < delay: - self.attribute = initial_size - elapsed_time -= delay - if elapsed_time >= duration: - self.attribute = final_size - self.is_finished = True - elif elapsed_time < duration: - progress = elapsed_time / duration - self.attribute = initial_size + ((final_size - initial_size) * progress) - else: - self.attribute = final_size - self.is_finished = True - -class VideoPlayer: - def __init__(self, path, loop_start=None): - self.video_path = path - self.start_ms = None - self.loop_start = loop_start - - self.current_frame = None - self.last_frame = self.current_frame - self.frame_index = 0 - self.frames = [] - self.cap = cv2.VideoCapture(self.video_path) - self.fps = self.cap.get(cv2.CAP_PROP_FPS) - - self.is_finished = [False, False] - audio_path = path[:-4] + '.ogg' - self.audio = ray.load_music_stream(audio_path) - - def convert_frames_background(self, index): - if not self.cap.isOpened(): - raise ValueError("Error: Could not open video file.") - - total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) - if len(self.frames) == total_frames: - return 0 - self.cap.set(cv2.CAP_PROP_POS_FRAMES, index) - - success, frame = self.cap.read() - - timestamp = (index / self.fps * 1000) - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - new_frame = ray.Image(frame_rgb.tobytes(), frame_rgb.shape[1], frame_rgb.shape[0], 1, ray.PixelFormat.PIXELFORMAT_UNCOMPRESSED_R8G8B8) - - self.frames.append((timestamp, new_frame)) - print(len(self.frames), total_frames) - - def convert_frames(self): - if not self.cap.isOpened(): - raise ValueError("Error: Could not open video file.") - - frame_count = 0 - success, frame = self.cap.read() - - while success: - timestamp = (frame_count / self.fps * 1000) - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - new_frame = ray.Image(frame_rgb.tobytes(), frame_rgb.shape[1], frame_rgb.shape[0], 1, ray.PixelFormat.PIXELFORMAT_UNCOMPRESSED_R8G8B8) - - self.frames.append((timestamp, new_frame)) - - success, frame = self.cap.read() - frame_count += 1 - - self.cap.release() - print(f"Extracted {len(self.frames)} frames.") - self.start_ms = get_current_ms() - - def check_for_start(self): - if self.start_ms is None: - self.start_ms = get_current_ms() - ray.play_music_stream(self.audio) - if self.frames == []: - self.convert_frames() - - def audio_manager(self): - ray.update_music_stream(self.audio) - time_played = ray.get_music_time_played(self.audio) / ray.get_music_time_length(self.audio) - ending_lenience = 0.95 - if time_played > ending_lenience: - self.is_finished[1] = True - - def update(self): - self.check_for_start() - self.audio_manager() - - if self.frame_index == len(self.frames)-1: - self.is_finished[0] = True - return - - if self.start_ms is None: - return - - timestamp, frame = self.frames[self.frame_index][0], self.frames[self.frame_index][1] - elapsed_time = get_current_ms() - self.start_ms - if elapsed_time >= timestamp: - self.current_frame = ray.load_texture_from_image(frame) - if self.last_frame != self.current_frame and self.last_frame is not None: - ray.unload_texture(self.last_frame) - self.frame_index += 1 - self.last_frame = self.current_frame - - def draw(self): - if self.current_frame is not None: - ray.draw_texture(self.current_frame, 0, 0, ray.WHITE) - - def __del__(self): - if hasattr(self, 'current_frame') and self.current_frame: - ray.unload_texture(self.current_frame) - if hasattr(self, 'last_frame') and self.last_frame: - ray.unload_texture(self.last_frame) diff --git a/libs/animation.py b/libs/animation.py new file mode 100644 index 0000000..e203450 --- /dev/null +++ b/libs/animation.py @@ -0,0 +1,133 @@ +class Animation: + def __init__(self, current_ms, duration, type): + self.type = type + self.start_ms = current_ms + self.attribute = 0 + self.duration = duration + self.params = dict() + self.is_finished = False + + def update(self, current_ms): + if self.type == 'fade': + self.fade(current_ms, + self.duration, + initial_opacity=self.params.get('initial_opacity', 1.0), + final_opacity=self.params.get('final_opacity', 0.0), + delay=self.params.get('delay', 0.0), + ease_in=self.params.get('ease_in', None), + ease_out=self.params.get('ease_out', None)) + if self.params.get('reverse', None) is not None and current_ms - self.start_ms >= self.duration + self.params.get('delay', 0.0): + self.fade(current_ms, + self.duration, + final_opacity=self.params.get('initial_opacity', 1.0), + initial_opacity=self.params.get('final_opacity', 0.0), + delay=self.params.get('delay', 0.0) + self.duration + self.params.get('reverse'), + ease_in=self.params.get('ease_in', None), + ease_out=self.params.get('ease_out', None)) + elif self.type == 'move': + self.move(current_ms, + self.duration, + self.params['total_distance'], + self.params['start_position'], + delay=self.params.get('delay', 0.0)) + elif self.type == 'texture_change': + self.texture_change(current_ms, + self.duration, + self.params['textures']) + elif self.type == 'text_stretch': + self.text_stretch(current_ms, + self.duration) + elif self.type == 'texture_resize': + self.texture_resize(current_ms, + self.duration, + initial_size=self.params.get('initial_size', 1.0), + final_size=self.params.get('final_size', 1.0), + delay=self.params.get('delay', 0.0)) + if self.params.get('reverse', None) is not None and current_ms - self.start_ms >= self.duration + self.params.get('delay', 0.0): + self.texture_resize(current_ms, + self.duration, + final_size=self.params.get('initial_size', 1.0), + initial_size=self.params.get('final_size', 1.0), + delay=self.params.get('delay', 0.0) + self.duration) + + def fade(self, current_ms, duration, initial_opacity, final_opacity, delay, ease_in, ease_out): + def ease_out_progress(progress, ease): + if ease == 'quadratic': + return progress * (2 - progress) + elif ease == 'cubic': + return 1 - pow(1 - progress, 3) + elif ease == 'exponential': + return 1 - pow(2, -10 * progress) + else: + return progress + def ease_in_progress(progress, ease): + if ease == 'quadratic': + return progress * progress + elif ease == 'cubic': + return progress * progress * progress + elif ease == 'exponential': + return pow(2, 10 * (progress - 1)) + else: + return progress + elapsed_time = current_ms - self.start_ms + if elapsed_time < delay: + self.attribute = initial_opacity + + elapsed_time -= delay + if elapsed_time >= duration: + self.attribute = final_opacity + self.is_finished = True + + if ease_in is not None: + progress = ease_in_progress(elapsed_time / duration, ease_in) + elif ease_out is not None: + progress = ease_out_progress(elapsed_time / duration, ease_out) + else: + progress = elapsed_time / duration + + current_opacity = initial_opacity + (final_opacity - initial_opacity) * progress + self.attribute = current_opacity + def move(self, current_ms, duration, total_distance, start_position, delay): + elapsed_time = current_ms - self.start_ms + if elapsed_time < delay: + self.attribute = start_position + + elapsed_time -= delay + if elapsed_time <= duration: + progress = elapsed_time / duration + self.attribute = start_position + (total_distance * progress) + else: + self.attribute = start_position + total_distance + self.is_finished = True + def texture_change(self, current_ms, duration, textures): + elapsed_time = current_ms - self.start_ms + if elapsed_time <= duration: + for start, end, index in textures: + if start < elapsed_time <= end: + self.attribute = index + else: + self.is_finished = True + def text_stretch(self, current_ms, duration): + elapsed_time = current_ms - self.start_ms + if elapsed_time <= duration: + self.attribute = 2 + 5 * (elapsed_time // 25) + elif elapsed_time <= duration + 116: + frame_time = (elapsed_time - duration) // 16.57 + self.attribute = 2 + 10 - (2 * (frame_time + 1)) + else: + self.attribute = 0 + self.is_finished = True + def texture_resize(self, current_ms, duration, initial_size, final_size, delay): + elapsed_time = current_ms - self.start_ms + if elapsed_time < delay: + self.attribute = initial_size + elapsed_time -= delay + if elapsed_time >= duration: + self.attribute = final_size + self.is_finished = True + elif elapsed_time < duration: + progress = elapsed_time / duration + self.attribute = initial_size + ((final_size - initial_size) * progress) + else: + self.attribute = final_size + self.is_finished = True diff --git a/libs/audio.py b/libs/audio.py new file mode 100644 index 0000000..ddce10c --- /dev/null +++ b/libs/audio.py @@ -0,0 +1,743 @@ +import io +import os +import queue +import time +import wave +from threading import Lock, Thread + +import numpy as np +import pyray as ray + +os.environ["SD_ENABLE_ASIO"] = "1" +import sounddevice as sd +from pydub import AudioSegment +from scipy import signal + +from libs.utils import get_config + + +def resample(data, orig_sr, target_sr): + ratio = target_sr / orig_sr + + if ratio == 1.0: + return data + + if len(data.shape) == 1: + resampled_data = signal.resample_poly(data, target_sr, orig_sr) + else: + num_channels = data.shape[1] + resampled_channels = [] + + for ch in range(num_channels): + channel_data = data[:, ch] + resampled_channel = signal.resample_poly(channel_data, target_sr, orig_sr) + resampled_channels.append(resampled_channel) + resampled_data = np.column_stack(resampled_channels) + return resampled_data + +def get_np_array(sample_width, raw_data): + if sample_width == 1: + # 8-bit samples are unsigned + data = np.frombuffer(raw_data, dtype=np.uint8) + return (data.astype(np.float32) - 128) / 128.0 + elif sample_width == 2: + # 16-bit samples are signed + data = np.frombuffer(raw_data, dtype=np.int16) + return data.astype(np.float32) / 32768.0 + elif sample_width == 3: + # 24-bit samples handling + data = np.zeros(len(raw_data) // 3, dtype=np.int32) + for i in range(len(data)): + data[i] = int.from_bytes(raw_data[i*3:i*3+3], byteorder='little', signed=True) + return data.astype(np.float32) / (2**23) + elif sample_width == 4: + # 32-bit samples are signed + data = np.frombuffer(raw_data, dtype=np.int32) + return data.astype(np.float32) / (2**31) + else: + raise ValueError(f"Unsupported sample width: {sample_width}") + +class Sound: + def __init__(self, file_path, data=None, target_sample_rate=48000): + self.file_path = file_path + self.data = data + self.channels = 0 + self.sample_rate = target_sample_rate + self.position = 0 + self.is_playing = False + self.is_paused = False + self.volume = 1.0 + self.pan = 0.5 # 0.0 = left, 0.5 = center, 1.0 = right + + if file_path: + self.load() + + def load(self): + """Load and prepare the sound file data""" + if self.file_path.endswith('.ogg'): + audio = AudioSegment.from_ogg(self.file_path) + wav_io = io.BytesIO() + audio.export(wav_io, format="wav") + wav_io.seek(0) + file_path = wav_io + else: + file_path = self.file_path + with wave.open(file_path, 'rb') as wf: + # Get file properties + self.channels = wf.getnchannels() + sample_width = wf.getsampwidth() + original_sample_rate = wf.getframerate() + frames = wf.getnframes() + + # Read all frames from the file + raw_data = wf.readframes(frames) + + data = get_np_array(sample_width, raw_data) + + # Reshape for multi-channel audio + if self.channels > 1: + data = data.reshape(-1, self.channels) + + # Resample if needed + if original_sample_rate != self.sample_rate: + print(f"Resampling {self.file_path} from {original_sample_rate}Hz to {self.sample_rate}Hz") + data = resample(data, original_sample_rate, self.sample_rate) + + self.data = data + + def play(self): + self.position = 0 + self.is_playing = True + self.is_paused = False + + def stop(self): + self.is_playing = False + self.is_paused = False + self.position = 0 + + def pause(self): + if self.is_playing: + self.is_paused = True + self.is_playing = False + + def resume(self): + if self.is_paused: + self.is_playing = True + self.is_paused = False + + def get_frames(self, num_frames): + """Get the next num_frames of audio data, applying volume, pitch, and pan""" + if self.data is None: + return + if not self.is_playing: + # Return silence if not playing + if self.channels == 1: + return np.zeros(num_frames, dtype=np.float32) + else: + return np.zeros((num_frames, self.channels), dtype=np.float32) + + # Calculate how many frames we have left + frames_left = len(self.data) - self.position + if self.channels > 1: + frames_left = self.data.shape[0] - self.position + + if frames_left <= 0: + # We've reached the end of the sound + self.is_playing = False + if self.channels == 1: + return np.zeros(num_frames, dtype=np.float32) + else: + return np.zeros((num_frames, self.channels), dtype=np.float32) + + # Get the actual frames to return + frames_to_get = min(num_frames, frames_left) + + if self.channels == 1: + output = np.zeros(num_frames, dtype=np.float32) + output[:frames_to_get] = self.data[self.position:self.position+frames_to_get] + else: + output = np.zeros((num_frames, self.channels), dtype=np.float32) + output[:frames_to_get] = self.data[self.position:self.position+frames_to_get] + + self.position += frames_to_get + + output *= self.volume + + # Apply pan for stereo output + if self.channels == 2 and self.pan != 0.5: + # pan=0: full left, pan=0.5: center, pan=1: full right + left_vol = min(1.0, 2.0 * (1.0 - self.pan)) + right_vol = min(1.0, 2.0 * self.pan) + output[:, 0] *= left_vol + output[:, 1] *= right_vol + return output + +class Music: + def __init__(self, file_path, data=None, file_type=None, target_sample_rate=48000): + self.file_path = file_path + self.file_type = file_type + self.data = data + self.target_sample_rate = target_sample_rate + self.sample_rate = target_sample_rate + self.channels = 0 + self.position = 0 # In frames + self.is_playing = False + self.is_paused = False + self.volume = 1.0 + self.pan = 0.5 # Center + self.total_frames = 0 + self.valid = False + + self.wave_file = None + self.file_buffer_size = int(target_sample_rate * 5) # 5 seconds buffer + self.buffer = None + self.buffer_position = 0 + + # Thread-safe updates + self.lock = Lock() + + self.load_from_file() + + def load_from_file(self): + """Load music from file""" + if self.file_path.endswith('.ogg'): + audio = AudioSegment.from_ogg(self.file_path) + wav_io = io.BytesIO() + audio.export(wav_io, format="wav") + wav_io.seek(0) + file_path = wav_io + else: + file_path = self.file_path + try: + # Keep the file open for streaming + self.wave_file = wave.open(file_path, 'rb') + + # Get file properties + self.channels = self.wave_file.getnchannels() + self.sample_width = self.wave_file.getsampwidth() + self.sample_rate = self.wave_file.getframerate() + self.total_frames = self.wave_file.getnframes() + + # Initialize buffer with some initial data + self._fill_buffer() + + self.valid = True + print(f"Music loaded: {self.channels} channels, {self.sample_rate}Hz, {self.total_frames} frames") + except Exception as e: + print(f"Error loading music file: {e}") + if self.wave_file: + self.wave_file.close() + self.wave_file = None + self.valid = False + + def _fill_buffer(self): + """Fill the streaming buffer from file""" + if not self.wave_file: + return False + + # Read a chunk of frames from file + try: + frames_to_read = min(self.file_buffer_size, self.total_frames - self.position) + if frames_to_read <= 0: + return False + + raw_data = self.wave_file.readframes(frames_to_read) + + data = get_np_array(self.sample_width, raw_data) + + # Reshape for multi-channel audio + if self.channels > 1: + data = data.reshape(-1, self.channels) + + if self.sample_rate != self.target_sample_rate: + print(f"Resampling {self.file_path} from {self.sample_rate}Hz to {self.target_sample_rate}Hz") + data = resample(data, self.sample_rate, self.target_sample_rate) + + self.buffer = data + self.buffer_position = 0 + return True + except Exception as e: + print(f"Error filling buffer: {e}") + return False + + def update(self): + """Update music stream buffers""" + if not self.is_playing or self.is_paused: + return + + with self.lock: + # Check if we need to refill the buffer + if self.buffer is None: + raise Exception("buffer is None") + if self.wave_file and self.buffer_position >= len(self.buffer): + if not self._fill_buffer(): + self.is_playing = False + + def play(self): + """Start playing the music stream""" + with self.lock: + # Reset position if at the end + if self.wave_file and self.position >= self.total_frames: + self.wave_file.rewind() + self.position = 0 + self.buffer_position = 0 + self._fill_buffer() + + self.is_playing = True + self.is_paused = False + + def stop(self): + """Stop playing the music stream""" + with self.lock: + self.is_playing = False + self.is_paused = False + self.position = 0 + self.buffer_position = 0 + if self.wave_file: + self.wave_file.rewind() + self._fill_buffer() + + def pause(self): + """Pause the music playback""" + with self.lock: + if self.is_playing: + self.is_paused = True + self.is_playing = False + + def resume(self): + """Resume the music playback""" + with self.lock: + if self.is_paused: + self.is_playing = True + self.is_paused = False + + def seek(self, position_seconds): + """Seek to a specific position in seconds""" + with self.lock: + # Convert seconds to frames + frame_position = int(position_seconds * self.sample_rate) + + # Clamp position to valid range + frame_position = max(0, min(frame_position, self.total_frames - 1)) + + # Update file position if streaming from file + if self.wave_file: + self.wave_file.setpos(frame_position) + self._fill_buffer() + + self.position = frame_position + self.buffer_position = 0 + + def get_time_length(self): + """Get the total length of the music in seconds""" + return self.total_frames / self.sample_rate + + def get_time_played(self): + """Get the current playback position in seconds""" + return (self.position + self.buffer_position) / self.sample_rate + + def get_frames(self, num_frames): + """Get the next num_frames of music data, applying volume, pitch, and pan""" + if not self.is_playing: + # Return silence if not playing + if self.channels == 1: + return np.zeros(num_frames, dtype=np.float32) + else: + return np.zeros((num_frames, self.channels), dtype=np.float32) + + with self.lock: + if self.buffer is None: + raise Exception("buffer is None") + # Check if we need more data + if self.buffer_position >= len(self.buffer): + # If no more data available and streaming from file + if self.wave_file and not self._fill_buffer(): + self.is_playing = False + if self.channels == 1: + return np.zeros(num_frames, dtype=np.float32) + else: + return np.zeros((num_frames, self.channels), dtype=np.float32) + + # Calculate how many frames we have left in buffer + frames_left_in_buffer = len(self.buffer) - self.buffer_position + if self.channels > 1: + frames_left_in_buffer = self.buffer.shape[0] - self.buffer_position + + frames_to_get = min(num_frames, frames_left_in_buffer) + + if self.channels == 1: + output = np.zeros(num_frames, dtype=np.float32) + output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get] + else: + output = np.zeros((num_frames, self.channels), dtype=np.float32) + output[:frames_to_get] = self.buffer[self.buffer_position:self.buffer_position+frames_to_get] + + # Update buffer position + self.buffer_position += frames_to_get + self.position += frames_to_get + + # Apply volume + output *= self.volume + + # Apply pan for stereo output + if self.channels == 2 and self.pan != 0.5: + # pan=0: full left, pan=0.5: center, pan=1: full right + left_vol = min(1.0, 2.0 * (1.0 - self.pan)) + right_vol = min(1.0, 2.0 * self.pan) + + output[:, 0] *= left_vol + output[:, 1] *= right_vol + + return output + + def __del__(self): + """Cleanup when the music object is deleted""" + if self.wave_file: + try: + self.wave_file.close() + except Exception: + raise Exception("unable to close music stream") + +class ASIOEngine: + def __init__(self): + self.target_sample_rate = 48000 + self.buffer_size = get_config()["audio"]["asio_buffer"] + self.sounds = {} + self.music_streams = {} + self.stream = None + self.device_id = None + self.running = False + self.sound_queue = queue.Queue() + self.music_queue = queue.Queue() + self.master_volume = 1.0 + self.output_channels = 2 # Default to stereo + self.audio_device_ready = False + + # Threading for music stream updates + self.update_thread = None + self.update_thread_running = False + + def _initialize_asio(self): + """Set up ASIO device""" + # Find ASIO API and use its default device + hostapis = sd.query_hostapis() + asio_api_index = -1 + for i, api in enumerate(hostapis): + if isinstance(api, dict) and 'name' in api and api['name'] == 'ASIO': + asio_api_index = i + break + + if asio_api_index is not None: + asio_api = hostapis[asio_api_index] + if isinstance(asio_api, dict) and 'default_output_device' in asio_api: + default_asio_device = asio_api['default_output_device'] + else: + raise Exception("Warning: 'default_output_device' key not found in ASIO API info.") + if default_asio_device >= 0: + self.device_id = default_asio_device + device_info = sd.query_devices(self.device_id) + if isinstance(device_info, sd.DeviceList): + raise Exception("Invalid ASIO Device") + print(f"Using default ASIO device: {device_info['name']}") + # Set output channels based on device capabilities + self.output_channels = device_info['max_output_channels'] + if self.output_channels > 2: + # Limit to stereo for simplicity + self.output_channels = 2 + return True + else: + print("No default ASIO device found, using system default.") + else: + print("ASIO API not found, using system default device.") + + # If we get here, use default system device + self.device_id = None + device_info = sd.query_devices(sd.default.device[1]) + if isinstance(device_info, sd.DeviceList): + raise Exception("Invalid ASIO Device") + self.output_channels = min(2, device_info['max_output_channels']) + return True + + def _audio_callback(self, outdata, frames, time, status): + """Callback function for the sounddevice stream""" + if status: + print(f"Status: {status}") + + # Process any new sound play requests + while not self.sound_queue.empty(): + try: + sound_name = self.sound_queue.get_nowait() + if sound_name in self.sounds: + self.sounds[sound_name].play() + except queue.Empty: + break + + # Process any new music play requests + while not self.music_queue.empty(): + try: + music_name, action, *args = self.music_queue.get_nowait() + if music_name in self.music_streams: + music = self.music_streams[music_name] + if action == 'play': + music.play() + elif action == 'stop': + music.stop() + elif action == 'pause': + music.pause() + elif action == 'resume': + music.resume() + elif action == 'seek' and args: + music.seek(args[0]) + except queue.Empty: + break + + # Mix all playing sounds and music + output = np.zeros((frames, self.output_channels), dtype=np.float32) + + # Mix sounds + for sound_name, sound in self.sounds.items(): + if sound.is_playing: + sound_data = sound.get_frames(frames) + + # If mono sound but stereo output, duplicate to both channels + if sound.channels == 1 and self.output_channels > 1: + sound_data = np.column_stack([sound_data] * self.output_channels) + + # Ensure sound_data matches the output format + if sound.channels > self.output_channels: + # Down-mix if needed + if self.output_channels == 1: + sound_data = np.mean(sound_data, axis=1) + else: + # Keep only the first output_channels + sound_data = sound_data[:, :self.output_channels] + + # Add to the mix (simple additive mixing) + output += sound_data + + # Mix music streams + for music_name, music in self.music_streams.items(): + if music.is_playing: + music_data = music.get_frames(frames) + + # If mono music but stereo output, duplicate to both channels + if music.channels == 1 and self.output_channels > 1: + music_data = np.column_stack([music_data] * self.output_channels) + + # Ensure music_data matches the output format + if music.channels > self.output_channels: + # Down-mix if needed + if self.output_channels == 1: + music_data = np.mean(music_data, axis=1) + else: + # Keep only the first output_channels + music_data = music_data[:, :self.output_channels] + + # Add to the mix + output += music_data + + # Apply master volume + output *= self.master_volume + + # Apply simple limiter to prevent clipping + max_val = np.max(np.abs(output)) + if max_val > 1.0: + output = output / max_val + + outdata[:] = output + + def _start_update_thread(self): + """Start a thread to update music streams""" + self.update_thread_running = True + self.update_thread = Thread(target=self._update_music_thread) + self.update_thread.daemon = True + self.update_thread.start() + + def _update_music_thread(self): + """Thread function to update all music streams""" + while self.update_thread_running: + # Update all active music streams + for music_name, music in self.music_streams.items(): + if music.is_playing: + music.update() + + # Sleep to not consume too much CPU + time.sleep(0.1) + + def init_audio_device(self): + if self.audio_device_ready: + return True + + try: + # Try to use ASIO if available + self._initialize_asio() + + # Set up and start the stream + self.stream = sd.OutputStream( + samplerate=self.target_sample_rate, + channels=self.output_channels, + callback=self._audio_callback, + blocksize=self.buffer_size, + device=self.device_id + ) + self.stream.start() + self.running = True + self.audio_device_ready = True + + # Start update thread for music streams + self._start_update_thread() + + print(f"Audio device initialized with {self.output_channels} channels at {self.target_sample_rate}Hz") + return True + except Exception as e: + print(f"Error initializing audio device: {e}") + self.audio_device_ready = False + return False + + def close_audio_device(self): + self.update_thread_running = False + if self.update_thread: + self.update_thread.join(timeout=1.0) + + if self.stream: + self.stream.stop() + self.stream.close() + self.stream = None + + self.running = False + self.audio_device_ready = False + print("Audio device closed") + return + + def is_audio_device_ready(self) -> bool: + return self.audio_device_ready + + def set_master_volume(self, volume: float): + self.master_volume = max(0.0, min(1.0, volume)) + + def get_master_volume(self) -> float: + return self.master_volume + + def load_sound(self, fileName: str) -> str | None: + try: + sound = Sound(fileName, self.target_sample_rate) + sound_id = f"sound_{len(self.sounds)}" + self.sounds[sound_id] = sound + print(f"Loaded sound from {fileName} as {sound_id}") + return sound_id + except Exception as e: + print(f"Error loading sound: {e}") + return None + + def play_sound(self, sound): + if sound in self.sounds: + self.sound_queue.put(sound) + + def stop_sound(self, sound): + if sound in self.sounds: + self.sounds[sound].stop() + + def pause_sound(self, sound: str): + if sound in self.sounds: + self.sounds[sound].pause() + + def resume_sound(self, sound: str): + if sound in self.sounds: + self.sounds[sound].resume() + + def is_sound_playing(self, sound: str) -> bool: + if sound in self.sounds: + return self.sounds[sound].is_playing + return False + + def set_sound_volume(self, sound: str, volume: float): + if sound in self.sounds: + self.sounds[sound].volume = max(0.0, min(1.0, volume)) + + def set_sound_pan(self, sound: str, pan: float): + if sound in self.sounds: + self.sounds[sound].pan = max(0.0, min(1.0, pan)) + + def load_music_stream(self, fileName: str) -> str | None: + try: + music = Music(file_path=fileName, target_sample_rate=self.target_sample_rate) + music_id = f"music_{len(self.music_streams)}" + self.music_streams[music_id] = music + print(f"Loaded music stream from {fileName} as {music_id}") + return music_id + except Exception as e: + print(f"Error loading music stream: {e}") + return None + + def is_music_valid(self, music: str) -> bool: + if music in self.music_streams: + return self.music_streams[music].valid + return False + + def unload_music_stream(self, music: str): + if music in self.music_streams: + del self.music_streams[music] + + def play_music_stream(self, music: str): + if music in self.music_streams: + self.music_queue.put((music, 'play')) + + def is_music_stream_playing(self, music: str) -> bool: + if music in self.music_streams: + return self.music_streams[music].is_playing + return False + + def update_music_stream(self, music: str): + if music in self.music_streams: + self.music_streams[music].update() + + def stop_music_stream(self, music: str): + if music in self.music_streams: + self.music_queue.put((music, 'stop')) + + def pause_music_stream(self, music: str): + if music in self.music_streams: + self.music_queue.put((music, 'pause')) + + def resume_music_stream(self, music: str): + if music in self.music_streams: + self.music_queue.put((music, 'resume')) + + def seek_music_stream(self, music: str, position: float): + if music in self.music_streams: + self.music_queue.put((music, 'seek', position)) + + def set_music_volume(self, music: str, volume: float): + if music in self.music_streams: + self.music_streams[music].volume = max(0.0, min(1.0, volume)) + + def set_music_pan(self, music: str, pan: float): + if music in self.music_streams: + self.music_streams[music].pan = max(0.0, min(1.0, pan)) + + def get_music_time_length(self, music: str) -> float: + if music in self.music_streams: + return self.music_streams[music].get_time_length() + raise ValueError(f"Music stream {music} not initialized") + + def get_music_time_played(self, music: str) -> float: + if music in self.music_streams: + return self.music_streams[music].get_time_played() + raise ValueError(f"Music stream {music} not initialized") + +class AudioEngineWrapper: + def __init__(self, host_api): + self.host_api = host_api + if host_api == 'WASAPI': + self._module = ray + elif host_api == 'ASIO': + self._module = ASIOEngine() + else: + raise Exception("Invalid host API passed to wrapper") + def __getattr__(self, name): + try: + return getattr(self._module, name) + except AttributeError: + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}' and '{type(self._module).__name__}' has no attribute '{name}'") + +audio = AudioEngineWrapper(get_config()["audio"]["device_type"]) diff --git a/libs/tja.py b/libs/tja.py new file mode 100644 index 0000000..8b515e3 --- /dev/null +++ b/libs/tja.py @@ -0,0 +1,277 @@ +import math +from collections import deque + +from libs.utils import get_pixels_per_frame, strip_comments + + +def calculate_base_score(play_note_list: list[dict]) -> int: + total_notes = 0 + balloon_num = 0 + balloon_count = 0 + drumroll_sec = 0 + for i in range(len(play_note_list)): + note = play_note_list[i] + if i < len(play_note_list)-1: + next_note = play_note_list[i+1] + else: + next_note = play_note_list[len(play_note_list)-1] + if note.get('note') in {'1','2','3','4'}: + total_notes += 1 + elif note.get('note') in {'5', '6'}: + drumroll_sec += (next_note['ms'] - note['ms']) / 1000 + elif note.get('note') in {'7', '9'}: + balloon_num += 1 + balloon_count += next_note['balloon'] + total_score = (1000000 - (balloon_count * 100) - (drumroll_sec * 1692.0079999994086)) / total_notes + return math.ceil(total_score / 10) * 10 + +class TJAParser: + def __init__(self, path: str): + #Defined on startup + self.folder_path = path + self.folder_name = self.folder_path.split('\\')[-1] + self.file_path = f'{self.folder_path}\\{self.folder_name}.tja' + + #Defined on file_to_data() + self.data = [] + + #Defined on get_metadata() + self.title = '' + self.title_ja = '' + self.subtitle = '' + self.subtitle_ja = '' + self.wave = f'{self.folder_path}\\' + self.offset = 0 + self.demo_start = 0 + self.course_data = dict() + + #Defined in metadata but can change throughout the chart + self.bpm = 120 + self.time_signature = 4/4 + + self.distance = 0 + self.scroll_modifier = 1 + self.current_ms = 0 + self.barline_display = True + self.gogo_time = False + + def file_to_data(self): + with open(self.file_path, 'rt', encoding='utf-8-sig') as tja_file: + for line in tja_file: + line = strip_comments(line).strip() + if line != '': + self.data.append(str(line)) + return self.data + + def get_metadata(self): + self.file_to_data() + diff_index = 1 + highest_diff = -1 + for item in self.data: + if item[0] == '#': + continue + elif 'SUBTITLEJA' in item: + self.subtitle_ja = str(item.split('SUBTITLEJA:')[1]) + elif 'TITLEJA' in item: + self.title_ja = str(item.split('TITLEJA:')[1]) + elif 'SUBTITLE' in item: + self.subtitle = str(item.split('SUBTITLE:')[1][2:]) + elif 'TITLE' in item: + self.title = str(item.split('TITLE:')[1]) + elif 'BPM' in item: + self.bpm = float(item.split(':')[1]) + elif 'WAVE' in item: + self.wave += str(item.split(':')[1]) + elif 'OFFSET' in item: + self.offset = float(item.split(':')[1]) + elif 'DEMOSTART' in item: + self.demo_start = float(item.split(':')[1]) + elif 'COURSE' in item: + course = str(item.split(':')[1]).lower() + if course == 'dan' or course == '6': + self.course_data[6] = [] + if course == 'tower' or course == '5': + self.course_data[5] = [] + elif course == 'edit' or course == '4': + self.course_data[4] = [] + elif course == 'oni' or course == '3': + self.course_data[3] = [] + elif course == 'hard' or course == '2': + self.course_data[2] = [] + elif course == 'normal' or course == '1': + self.course_data[1] = [] + elif course == 'easy' or course == '0': + self.course_data[0] = [] + highest_diff = max(self.course_data) + diff_index -= 1 + elif 'LEVEL' in item: + item = int(item.split(':')[1]) + self.course_data[diff_index+highest_diff].append(item) + elif 'BALLOON' in item: + item = item.split(':')[1] + if item == '': + continue + self.course_data[diff_index+highest_diff].append([int(x) for x in item.split(',')]) + elif 'SCOREINIT' in item: + if item.split(':')[1] == '': + continue + item = item.split(':')[1] + self.course_data[diff_index+highest_diff].append([int(x) for x in item.split(',')]) + elif 'SCOREDIFF' in item: + if item.split(':')[1] == '': + continue + item = int(item.split(':')[1]) + self.course_data[diff_index+highest_diff].append(item) + return [self.title, self.title_ja, self.subtitle, self.subtitle_ja, + self.bpm, self.wave, self.offset, self.demo_start, self.course_data] + + def data_to_notes(self, diff): + self.file_to_data() + #Get notes start and end + note_start = -1 + note_end = -1 + diff_count = 0 + for i in range(len(self.data)): + if self.data[i] == '#START': + note_start = i+1 + elif self.data[i] == '#END': + note_end = i + diff_count += 1 + if diff_count == len(self.course_data) - diff: + break + + notes = [] + bar = [] + #Check for measures and separate when comma exists + for i in range(note_start, note_end): + item = self.data[i].strip(',') + bar.append(item) + if item != self.data[i]: + notes.append(bar) + bar = [] + return notes, self.course_data[diff][1] + + def get_se_note(self, play_note_list, ms_per_measure, note, note_ms): + #Someone please refactor this + se_notes = {'1': [0, 1, 2], + '2': [3, 4], + '3': 5, + '4': 6, + '5': 7, + '6': 14, + '7': 9, + '8': 10, + '9': 11} + if len(play_note_list) > 1: + prev_note = play_note_list[-2] + if prev_note['note'] in {'1', '2'}: + if note_ms - prev_note['ms'] <= (ms_per_measure/8) - 1: + prev_note['se_note'] = se_notes[prev_note['note']][1] + else: + prev_note['se_note'] = se_notes[prev_note['note']][0] + else: + prev_note['se_note'] = se_notes[prev_note['note']] + if len(play_note_list) > 3: + if play_note_list[-4]['note'] == play_note_list[-3]['note'] == play_note_list[-2]['note'] == '1': + if (play_note_list[-3]['ms'] - play_note_list[-4]['ms'] < (ms_per_measure/8)) and (play_note_list[-2]['ms'] - play_note_list[-3]['ms'] < (ms_per_measure/8)): + if len(play_note_list) > 5: + if (play_note_list[-4]['ms'] - play_note_list[-5]['ms'] >= (ms_per_measure/8)) and (play_note_list[-1]['ms'] - play_note_list[-2]['ms'] >= (ms_per_measure/8)): + play_note_list[-3]['se_note'] = se_notes[play_note_list[-3]['note']][2] + else: + play_note_list[-3]['se_note'] = se_notes[play_note_list[-3]['note']][2] + else: + play_note_list[-1]['se_note'] = se_notes[note] + if play_note_list[-1]['note'] in {'1', '2'}: + play_note_list[-1]['se_note'] = se_notes[note][0] + else: + play_note_list[-1]['se_note'] = se_notes[note] + + def notes_to_position(self, diff): + play_note_list = deque() + bar_list = deque() + draw_note_list = deque() + notes, balloon = self.data_to_notes(diff) + index = 0 + balloon_index = 0 + for bar in notes: + #Length of the bar is determined by number of notes excluding commands + bar_length = sum(len(part) for part in bar if '#' not in part) + + for part in bar: + if '#JPOSSCROLL' in part: + continue + elif '#NMSCROLL' in part: + continue + elif '#MEASURE' in part: + divisor = part.find('/') + self.time_signature = float(part[9:divisor]) / float(part[divisor+1:]) + continue + elif '#SCROLL' in part: + self.scroll_modifier = float(part[7:]) + continue + elif '#BPMCHANGE' in part: + self.bpm = float(part[11:]) + continue + elif '#BARLINEOFF' in part: + self.barline_display = False + continue + elif '#BARLINEON' in part: + self.barline_display = True + continue + elif '#GOGOSTART' in part: + self.gogo_time = True + continue + elif '#GOGOEND' in part: + self.gogo_time = False + continue + elif '#LYRIC' in part: + continue + #Unrecognized commands will be skipped for now + elif '#' in part: + continue + + #https://gist.github.com/KatieFrogs/e000f406bbc70a12f3c34a07303eec8b#measure + ms_per_measure = 60000 * (self.time_signature*4) / self.bpm + + #Determines how quickly the notes need to move across the screen to reach the judgment circle in time + pixels_per_frame = get_pixels_per_frame(self.bpm * self.time_signature * self.scroll_modifier, self.time_signature*4, self.distance) + pixels_per_ms = pixels_per_frame / (1000 / 60) + + bar_ms = self.current_ms + load_ms = bar_ms - (self.distance / pixels_per_ms) + + if self.barline_display: + bar_list.append({'note': 'barline', 'ms': bar_ms, 'load_ms': load_ms, 'ppf': pixels_per_frame}) + + #Empty bar is still a bar, otherwise start increment + if len(part) == 0: + self.current_ms += ms_per_measure + increment = 0 + else: + increment = ms_per_measure / bar_length + + for note in part: + note_ms = self.current_ms + load_ms = note_ms - (self.distance / pixels_per_ms) + #Do not add blank notes otherwise lag + if note != '0': + play_note_list.append({'note': note, 'ms': note_ms, 'load_ms': load_ms, 'ppf': pixels_per_frame, 'index': index}) + self.get_se_note(play_note_list, ms_per_measure, note, note_ms) + index += 1 + if note in {'5', '6', '8'}: + play_note_list[-1]['color'] = 255 + if note == '8' and play_note_list[-2]['note'] in ('7', '9'): + if balloon_index >= len(balloon): + play_note_list[-1]['balloon'] = 0 + else: + play_note_list[-1]['balloon'] = int(balloon[balloon_index]) + balloon_index += 1 + self.current_ms += increment + + # https://stackoverflow.com/questions/72899/how-to-sort-a-list-of-dictionaries-by-a-value-of-the-dictionary-in-python + # Sorting by load_ms is necessary for drawing, as some notes appear on the + # screen slower regardless of when they reach the judge circle + # Bars can be sorted like this because they don't need hit detection + draw_note_list = deque(sorted(play_note_list, key=lambda d: d['load_ms'])) + bar_list = deque(sorted(bar_list, key=lambda d: d['load_ms'])) + return play_note_list, draw_note_list, bar_list diff --git a/libs/utils.py b/libs/utils.py new file mode 100644 index 0000000..0890354 --- /dev/null +++ b/libs/utils.py @@ -0,0 +1,75 @@ +import os +import tempfile +import time +import zipfile +from dataclasses import dataclass +from typing import Any + +import pyray as ray +import tomllib + +#TJA Format creator is unknown. I did not create the format, but I did write the parser though. + +def load_image_from_zip(zip_path: str, filename: str) -> ray.Image: + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + with zip_ref.open(filename) as image_file: + with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file: + temp_file.write(image_file.read()) + temp_file_path = temp_file.name + image = ray.load_image(temp_file_path) + os.remove(temp_file_path) + return image + +def load_texture_from_zip(zip_path: str, filename: str) -> ray.Texture: + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + with zip_ref.open(filename) as image_file: + with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file: + temp_file.write(image_file.read()) + temp_file_path = temp_file.name + texture = ray.load_texture(temp_file_path) + os.remove(temp_file_path) + return texture + +def rounded(num: float) -> int: + sign = 1 if (num >= 0) else -1 + num = abs(num) + result = int(num) + if (num - result >= 0.5): + result += 1 + return sign * result + +def get_current_ms() -> int: + return rounded(time.time() * 1000) + +def strip_comments(code: str): + result = '' + index = 0 + for line in code.splitlines(): + comment_index = line.find('//') + if comment_index == -1: + result += line + elif comment_index != 0 and not line[:comment_index].isspace(): + result += line[:comment_index] + index += 1 + return result + +def get_pixels_per_frame(bpm: float, time_signature: float, distance: float): + beat_duration = 60 / bpm + total_time = time_signature * beat_duration + total_frames = 60 * total_time + return (distance / total_frames) + +def get_config() -> dict[str, Any]: + with open('config.toml', "rb") as f: + config_file = tomllib.load(f) + return config_file + +@dataclass +class GlobalData: + start_song: bool = False + selected_song: str = '' + selected_difficulty: int = -1 + result_good: int = -1 + result_ok: int = -1 + result_bad: int = -1 + result_score: int = -1 diff --git a/libs/video.py b/libs/video.py new file mode 100644 index 0000000..eb6b95f --- /dev/null +++ b/libs/video.py @@ -0,0 +1,107 @@ +import cv2 +import pyray as ray + +from libs.audio import audio +from libs.utils import get_current_ms + + +class VideoPlayer: + def __init__(self, path: str): + self.video_path = path + self.start_ms = None + + self.current_frame = None + self.last_frame = self.current_frame + self.frame_index = 0 + self.frames = [] + self.cap = cv2.VideoCapture(self.video_path) + self.fps = self.cap.get(cv2.CAP_PROP_FPS) + + self.is_finished = [False, False] + audio_path = path[:-4] + '.ogg' + self.audio = audio.load_music_stream(audio_path) + + def convert_frames_background(self, index: int): + if not self.cap.isOpened(): + raise ValueError("Error: Could not open video file.") + + total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) + if len(self.frames) == total_frames: + return 0 + self.cap.set(cv2.CAP_PROP_POS_FRAMES, index) + + success, frame = self.cap.read() + + timestamp = (index / self.fps * 1000) + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + new_frame = ray.Image(frame_rgb.tobytes(), frame_rgb.shape[1], frame_rgb.shape[0], 1, ray.PixelFormat.PIXELFORMAT_UNCOMPRESSED_R8G8B8) + + self.frames.append((timestamp, new_frame)) + + def convert_frames(self): + if not self.cap.isOpened(): + raise ValueError("Error: Could not open video file.") + + frame_count = 0 + success, frame = self.cap.read() + + while success: + timestamp = (frame_count / self.fps * 1000) + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + new_frame = ray.Image(frame_rgb.tobytes(), frame_rgb.shape[1], frame_rgb.shape[0], 1, ray.PixelFormat.PIXELFORMAT_UNCOMPRESSED_R8G8B8) + + self.frames.append((timestamp, new_frame)) + + success, frame = self.cap.read() + frame_count += 1 + + self.cap.release() + print(f"Extracted {len(self.frames)} frames.") + self.start_ms = get_current_ms() + + def check_for_start(self): + if self.frames == []: + self.convert_frames() + if not audio.is_music_stream_playing(self.audio): + audio.play_music_stream(self.audio) + + def audio_manager(self): + audio.update_music_stream(self.audio) + time_played = audio.get_music_time_played(self.audio) / audio.get_music_time_length(self.audio) + ending_lenience = 0.95 + if time_played > ending_lenience: + self.is_finished[1] = True + + def update(self): + self.check_for_start() + self.audio_manager() + + if self.frame_index == len(self.frames)-1: + self.is_finished[0] = True + return + + if self.start_ms is None: + return + + timestamp, frame = self.frames[self.frame_index][0], self.frames[self.frame_index][1] + elapsed_time = get_current_ms() - self.start_ms + if elapsed_time >= timestamp: + self.current_frame = ray.load_texture_from_image(frame) + if self.last_frame != self.current_frame and self.last_frame is not None: + ray.unload_texture(self.last_frame) + self.frame_index += 1 + self.last_frame = self.current_frame + + def draw(self): + if self.current_frame is not None: + ray.draw_texture(self.current_frame, 0, 0, ray.WHITE) + + def __del__(self): + if hasattr(self, 'current_frame') and self.current_frame: + ray.unload_texture(self.current_frame) + if hasattr(self, 'last_frame') and self.last_frame: + ray.unload_texture(self.last_frame) + if audio.is_music_stream_playing(self.audio): + audio.stop_music_stream(self.audio) diff --git a/main.py b/main.py index 686fe9e..37f99a8 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,13 @@ import pyray as ray -import sys -from entry import * -from game import * -from title import * +from libs.audio import audio +from libs.utils import GlobalData, get_config +from scenes.entry import EntryScreen +from scenes.game import GameScreen +from scenes.result import ResultScreen +from scenes.song_select import SongSelectScreen +from scenes.title import TitleScreen + class Screens: TITLE = "TITLE" @@ -16,37 +20,42 @@ def main(): screen_width = 1280 screen_height = 720 - ray.set_config_flags(ray.ConfigFlags.FLAG_VSYNC_HINT) + if get_config()["video"]["vsync"]: + ray.set_config_flags(ray.ConfigFlags.FLAG_VSYNC_HINT) ray.set_config_flags(ray.ConfigFlags.FLAG_MSAA_4X_HINT) ray.set_window_max_size(screen_width, screen_height) ray.set_window_min_size(screen_width, screen_height) ray.init_window(screen_width, screen_height, "PyTaiko") - #ray.toggle_borderless_windowed() + if get_config()["video"]["borderless"]: + ray.toggle_borderless_windowed() ray.clear_window_state(ray.ConfigFlags.FLAG_WINDOW_TOPMOST) - #ray.maximize_window() + if get_config()["video"]["fullscreen"]: + ray.maximize_window() current_screen = Screens.TITLE - frames_counter = 0 + _frames_counter = 0 - ray.init_audio_device() + audio.init_audio_device() title_screen = TitleScreen(screen_width, screen_height) entry_screen = EntryScreen(screen_width, screen_height) + song_select_screen = SongSelectScreen(screen_width, screen_height) game_screen = GameScreen(screen_width, screen_height) + result_screen = ResultScreen(screen_width, screen_height) screen_mapping = { Screens.ENTRY: entry_screen, Screens.TITLE: title_screen, - #Screens.SONG_SELECT: song_select_screen, + Screens.SONG_SELECT: song_select_screen, Screens.GAME: game_screen, - #Screens.RESULT: result_screen + Screens.RESULT: result_screen } target = ray.load_render_texture(screen_width, screen_height) ray.set_texture_filter(target.texture, ray.TextureFilter.TEXTURE_FILTER_TRILINEAR) #lmaooooooooooooo + #rl_set_blend_factors_separate(RL_SRC_ALPHA, RL_ONE_MINUS_SRC_ALPHA, RL_ONE, RL_ONE_MINUS_SRC_ALPHA, RL_FUNC_ADD, RL_FUNC_ADD) ray.rl_set_blend_factors_separate(0x302, 0x303, 1, 0x303, 0x8006, 0x8006) - start_song = False ray.set_exit_key(ray.KeyboardKey.KEY_A) while not ray.window_should_close(): @@ -60,9 +69,6 @@ def main(): elif ray.is_key_pressed(ray.KeyboardKey.KEY_F12): ray.toggle_borderless_windowed() - if screen == game_screen and not start_song: - game_screen.init_tja(sys.argv[1], sys.argv[2]) - start_song = True next_screen = screen.update() screen.draw() if screen == title_screen: @@ -73,7 +79,8 @@ def main(): if next_screen is not None: current_screen = next_screen - ray.draw_fps(20, 20) + if get_config()["general"]["fps_counter"]: + ray.draw_fps(20, 20) ray.end_blend_mode() ray.end_texture_mode() ray.begin_drawing() @@ -81,7 +88,7 @@ def main(): ray.draw_texture_pro(target.texture, ray.Rectangle(0, 0, target.texture.width, -target.texture.height), ray.Rectangle(0, 0, ray.get_render_width(), ray.get_render_height()), ray.Vector2(0,0), 0, ray.WHITE) ray.end_drawing() ray.close_window() - ray.close_audio_device() + audio.close_audio_device() if __name__ == "__main__": main() diff --git a/entry.py b/scenes/entry.py similarity index 82% rename from entry.py rename to scenes/entry.py index 2eefaa1..5c02a04 100644 --- a/entry.py +++ b/scenes/entry.py @@ -1,5 +1,7 @@ import pyray as ray -from global_funcs import load_texture_from_zip + +from libs.utils import load_texture_from_zip + class EntryScreen: def __init__(self, width, height): @@ -10,8 +12,7 @@ class EntryScreen: def update(self): if ray.is_key_pressed(ray.KeyboardKey.KEY_ENTER): - return "GAME" - return None + return "SONG_SELECT" def draw(self): ray.draw_texture(self.texture_footer, 0, self.height - 151, ray.WHITE) diff --git a/game.py b/scenes/game.py similarity index 91% rename from game.py rename to scenes/game.py index ba7d080..7245870 100644 --- a/game.py +++ b/scenes/game.py @@ -1,17 +1,28 @@ -import os -import pyray as ray -import random as rand +import bisect import math - -from global_funcs import * from collections import deque +import pyray as ray + +from libs.animation import Animation +from libs.audio import audio +from libs.tja import TJAParser, calculate_base_score +from libs.utils import ( + GlobalData, + get_config, + get_current_ms, + load_image_from_zip, + load_texture_from_zip, +) + + class GameScreen: def __init__(self, width, height): self.width = width self.height = height self.judge_x = 414 self.current_ms = 0 + self.song_is_started = False def load_textures(self): zip_file = 'Graphics\\lumendata\\enso_system\\common.zip' @@ -162,11 +173,11 @@ class GameScreen: self.texture_base_score_numbers.append(load_texture_from_zip(zip_file, filename)) def load_sounds(self): - self.sound_don = ray.load_sound('Sounds\\inst_00_don.wav') - self.sound_kat = ray.load_sound('Sounds\\inst_00_katsu.wav') - self.sound_balloon_pop = ray.load_sound('Sounds\\balloon_pop.wav') + self.sound_don = audio.load_sound('Sounds\\inst_00_don.wav') + self.sound_kat = audio.load_sound('Sounds\\inst_00_katsu.wav') + self.sound_balloon_pop = audio.load_sound('Sounds\\balloon_pop.wav') - def init_tja(self, song, difficulty): + def init_tja(self, song: str, difficulty: int): self.load_textures() self.load_sounds() @@ -184,25 +195,34 @@ class GameScreen: 'dai_drumroll_tail': self.texture_dai_drumroll_tail, 'balloon_tail': self.texture_balloon_tail} - self.tja = TJAParser(f'Songs\\{song}') + self.tja = TJAParser(song) self.tja.get_metadata() self.tja.distance = self.width - self.judge_x - self.player_1 = Player(self, 1, int(difficulty)) - self.song_music = ray.load_music_stream(self.tja.wave) - ray.play_music_stream(self.song_music) + self.player_1 = Player(self, 1, difficulty, get_config()["general"]["judge_offset"]) + self.song_music = audio.load_sound(self.tja.wave) self.start_ms = get_current_ms() - self.tja.offset*1000 + audio.play_sound(self.song_music) + + def update(self): - ray.update_music_stream(self.song_music) + if GlobalData.start_song and not self.song_is_started: + self.init_tja(GlobalData.selected_song, GlobalData.selected_difficulty) + self.song_is_started = True self.current_ms = get_current_ms() - self.start_ms self.player_1.update(self) + if len(self.player_1.play_note_list) == 0 and not audio.is_sound_playing(self.song_music): + GlobalData.result_good, GlobalData.result_ok, GlobalData.result_bad, GlobalData.result_score = self.player_1.get_result_score() + GlobalData.start_song = False + self.song_is_started = False + return 'RESULT' def draw(self): self.player_1.draw(self) class Player: - def __init__(self, game_screen, player_number, difficulty): + def __init__(self, game_screen, player_number: int, difficulty: int, judge_offset: int): self.timing_good = 25.0250015258789 self.timing_ok = 75.0750045776367 self.timing_bad = 108.441665649414 @@ -213,7 +233,7 @@ class Player: self.play_note_list, self.draw_note_list, self.draw_bar_list = game_screen.tja.notes_to_position(self.difficulty) self.base_score = calculate_base_score(self.play_note_list) - self.judge_offset = 0 + self.judge_offset = judge_offset #Note management self.current_notes = deque() @@ -248,6 +268,9 @@ class Player: self.score_list = [] self.base_score_list = [] + def get_result_score(self): + return self.good_count, self.ok_count, self.bad_count, self.score + def get_position(self, game_screen, ms, pixels_per_frame): return int(game_screen.width + pixels_per_frame * 60 / 1000 * (ms - game_screen.current_ms + self.judge_offset) - 64) @@ -278,9 +301,9 @@ class Player: def play_note_manager(self, game_screen): #Add note to current_notes list if it is ready to be shown on screen if len(self.play_note_list) > 0 and game_screen.current_ms + 1000 >= self.play_note_list[0]['load_ms']: - self.current_notes.append(self.play_note_list.popleft()) - if len(self.play_note_list) > 0 and self.play_note_list[0]['note'] == '8': + if self.play_note_list[0]['note'] == '8': self.current_notes.append(self.play_note_list.popleft()) + self.current_notes.append(self.play_note_list.popleft()) #if a note was not hit within the window, remove it if len(self.current_notes) == 0: @@ -307,28 +330,28 @@ class Player: def draw_note_manager(self, game_screen): if len(self.draw_note_list) > 0 and game_screen.current_ms + 1000 >= self.draw_note_list[0]['load_ms']: - if self.draw_note_list[0]['note'] in {'5','6','7','9'}: + if self.draw_note_list[0]['note'] in {'5', '6', '7'}: while self.draw_note_list[0]['note'] != '8': - self.current_notes_draw.append(self.draw_note_list.popleft()) - self.current_notes_draw.append(self.draw_note_list.popleft()) + bisect.insort_left(self.current_notes_draw, self.draw_note_list.popleft(), key=lambda x: x['index']) + bisect.insort_left(self.current_notes_draw, self.draw_note_list.popleft(), key=lambda x: x['index']) else: - self.current_notes_draw.append(self.draw_note_list.popleft()) + bisect.insort_left(self.current_notes_draw, self.draw_note_list.popleft(), key=lambda x: x['index']) #If a note is off screen, remove it if len(self.current_notes_draw) == 0: return if self.current_notes_draw[0]['note'] in {'5', '6', '8'} and 255 > self.current_notes_draw[0]['color'] > 0: - self.current_notes_draw[0]['color'] += 1 + self.current_notes_draw[0]['color'] += 1 - for i in range(len(self.current_notes_draw)-1, -1, -1): - note_type, note_ms, pixels_per_frame = self.current_notes_draw[i]['note'], self.current_notes_draw[i]['ms'], self.current_notes_draw[i]['ppf'] - position = self.get_position(game_screen, note_ms, pixels_per_frame) - if position < game_screen.judge_x + 650 and note_type not in {'5', '6', '7'}: - if note_type == '8' and self.current_notes_draw[i-1]['note'] in {'5', '6', '7'} and self.current_notes_draw[i-1]['ms'] < self.current_notes_draw[i]['ms']: - self.current_notes_draw.pop(i-1) - else: - self.current_notes_draw.pop(i) + note = self.current_notes_draw[0] + if note['note'] in {'5', '6', '7'}: + note = self.current_notes_draw[1] + position = self.get_position(game_screen, note['ms'], note['ppf']) + if position < game_screen.judge_x + 650: + if note['note'] == '8': + self.current_notes_draw.pop(0) + self.current_notes_draw.pop(0) def note_manager(self, game_screen): self.bar_manager(game_screen) @@ -349,14 +372,9 @@ class Player: self.draw_arc_list.append(NoteArc(note_type, game_screen.current_ms, self.player_number)) self.current_notes.popleft() - #Remove note from the screen if note in self.current_notes_draw: - i = self.current_notes_draw.index(note) - if note['note'] == '8' and self.current_notes_draw[i-1]['note'] == '7' and self.current_notes_draw[i-1]['ms'] < self.current_notes_draw[i]['ms']: - self.current_notes_draw.pop(i-1) - self.current_notes_draw.pop(i-1) - else: - self.current_notes_draw.pop(i) + index = self.current_notes_draw.index(note) + self.current_notes_draw.pop(index) def check_drumroll(self, game_screen, drum_type): note_type = game_screen.note_type_dict[str(int(drum_type)+self.drumroll_big)] @@ -383,7 +401,7 @@ class Player: if self.curr_balloon_count == current_note['balloon']: self.is_balloon = False self.current_notes_draw[0]['popped'] = True - ray.play_sound(game_screen.sound_balloon_pop) + audio.play_sound(game_screen.sound_balloon_pop) self.note_correct(game_screen, self.current_notes[0]) def check_note(self, game_screen, drum_type): @@ -396,12 +414,10 @@ class Player: self.curr_balloon_count = 0 current_note = self.current_notes[0] #Fix later - ''' i = 0 while current_note['note'] in {'5', '6', '7', '8'}: i += 1 current_note = self.current_notes[i] - ''' note_type = current_note['note'] note_ms = current_note['ms'] #If the wrong key was hit, stop checking @@ -478,30 +494,32 @@ class Player: self.score_list[0].update(game_screen.current_ms, self.score) def key_manager(self, game_screen): - if ray.is_key_pressed(ray.KeyboardKey.KEY_F) or ray.is_key_pressed(ray.KeyboardKey.KEY_D): + left_kat = ord(get_config()["keybinds"]["left_kat"]) + left_don = ord(get_config()["keybinds"]["left_don"]) + right_don = ord(get_config()["keybinds"]["right_don"]) + right_kat = ord(get_config()["keybinds"]["right_kat"]) + if ray.is_key_pressed(left_don): self.draw_effect_list.append(LaneHitEffect(game_screen.current_ms, 'DON')) self.draw_drum_hit_list.append(DrumHitEffect(game_screen.current_ms, 'DON', 'L')) - ray.play_sound(game_screen.sound_don) + audio.play_sound(game_screen.sound_don) self.check_note(game_screen, '1') - if ray.is_key_pressed(ray.KeyboardKey.KEY_J) or ray.is_key_pressed(ray.KeyboardKey.KEY_K): + if ray.is_key_pressed(right_don): self.draw_effect_list.append(LaneHitEffect(game_screen.current_ms, 'DON')) self.draw_drum_hit_list.append(DrumHitEffect(game_screen.current_ms, 'DON', 'R')) - ray.play_sound(game_screen.sound_don) + audio.play_sound(game_screen.sound_don) self.check_note(game_screen, '1') - if ray.is_key_pressed(ray.KeyboardKey.KEY_E) or ray.is_key_pressed(ray.KeyboardKey.KEY_R): + if ray.is_key_pressed(left_kat): self.draw_effect_list.append(LaneHitEffect(game_screen.current_ms, 'KAT')) self.draw_drum_hit_list.append(DrumHitEffect(game_screen.current_ms, 'KAT', 'L')) - ray.play_sound(game_screen.sound_kat) + audio.play_sound(game_screen.sound_kat) self.check_note(game_screen, '2') - if ray.is_key_pressed(ray.KeyboardKey.KEY_I) or ray.is_key_pressed(ray.KeyboardKey.KEY_U): + if ray.is_key_pressed(right_kat): self.draw_effect_list.append(LaneHitEffect(game_screen.current_ms, 'KAT')) self.draw_drum_hit_list.append(DrumHitEffect(game_screen.current_ms, 'KAT', 'R')) - ray.play_sound(game_screen.sound_kat) + audio.play_sound(game_screen.sound_kat) self.check_note(game_screen, '2') def update(self, game_screen): - #pls help turn this into priority queue instead of sorting every frame thanks - self.current_notes_draw = sorted(self.current_notes_draw, key=lambda d: d['ms']) self.note_manager(game_screen) self.combo_manager(game_screen) self.drumroll_counter_manager(game_screen) @@ -523,28 +541,27 @@ class Player: tail = self.current_notes_draw[index+1] i = 0 while tail['note'] != '8': - tail = self.current_notes_draw[index+i] i += 1 + tail = self.current_notes_draw[index+i] if big: drumroll_body = 'dai_drumroll_body' drumroll_tail = 'dai_drumroll_tail' - drumroll_length = 70 else: drumroll_body = 'drumroll_body' drumroll_tail = 'drumroll_tail' - drumroll_length = 47 - if tail['note'] == '8': - drumroll_end_position = self.get_position(game_screen, tail['load_ms'], tail['ppf']) - length = (drumroll_end_position - drumroll_start_position - 50) - self.draw_note(game_screen, drumroll_body, (drumroll_start_position+64), color, 8, drumroll_length=length) - self.draw_note(game_screen, drumroll_tail, drumroll_end_position, color, 10, drumroll_length=None) + drumroll_end_position = self.get_position(game_screen, tail['load_ms'], tail['ppf']) + length = (drumroll_end_position - drumroll_start_position - 50) + self.draw_note(game_screen, drumroll_body, (drumroll_start_position+64), color, 8, drumroll_length=length) + self.draw_note(game_screen, drumroll_tail, drumroll_end_position, color, 10, drumroll_length=None) def draw_balloon(self, game_screen, note, position, index): + if self.current_notes_draw[0].get('popped', None): + return end_time = self.current_notes_draw[index+1] i = 0 while end_time['note'] != '8': - end_time = self.current_notes_draw[index+i] i += 1 + end_time = self.current_notes_draw[index+i] end_time_position = self.get_position(game_screen, end_time['load_ms'], end_time['ppf']) if game_screen.current_ms >= end_time['ms']: position = end_time_position @@ -558,7 +575,7 @@ class Player: y = 184 ray.draw_texture(game_screen.texture_barline, position+note_padding-4, y+6, ray.WHITE) return - elif note not in game_screen.note_type_dict: + if note not in game_screen.note_type_dict: return eighth_in_ms = (60000 * 4 / game_screen.tja.bpm) / 8 @@ -576,14 +593,14 @@ class Player: else: offset = 0 balloon = False - if drumroll_length == None: + if drumroll_length is None: drumroll_length = game_screen.note_type_dict[note][0].width source_rect = ray.Rectangle(0,0,game_screen.note_type_dict[note][0].width,game_screen.note_type_dict[note][0].height) dest_rect = ray.Rectangle(position-offset, 192, drumroll_length,game_screen.note_type_dict['1'][0].height) ray.draw_texture_pro(game_screen.note_type_dict[note][current_eighth % 2], source_rect, dest_rect, ray.Vector2(0,0), 0, ray.Color(255, draw_color, draw_color, 255)) if balloon: ray.draw_texture(game_screen.note_type_dict['balloon_tail'][current_eighth % 2], position-offset+128, 192, ray.Color(255, draw_color, draw_color, 255)) - if se_note != None: + if se_note is not None: if drumroll_length == game_screen.note_type_dict[note][0].width: drumroll_length = game_screen.texture_se_moji[se_note].width offset = 0 @@ -593,8 +610,8 @@ class Player: dest_rect = ray.Rectangle(position-offset - (game_screen.texture_se_moji[se_note].width // 2) + 64, 323, drumroll_length,game_screen.texture_se_moji[se_note].height) ray.draw_texture_pro(game_screen.texture_se_moji[se_note], source_rect, dest_rect, ray.Vector2(0,0), 0, ray.WHITE) - def draw_notes(self, game_screen): - if len(self.current_notes_draw) <= 0 or len(self.current_bars) <= 0: + def draw_bars(self, game_screen): + if len(self.current_bars) <= 0: return for i in range(len(self.current_bars)-1, -1, -1): @@ -603,6 +620,10 @@ class Player: position = self.get_position(game_screen, load_ms, pixels_per_frame) self.draw_note(game_screen, 'barline', position, 255, None) + def draw_notes(self, game_screen): + if len(self.current_notes_draw) <= 0: + return + for i in range(len(self.current_notes_draw)-1, -1, -1): note = self.current_notes_draw[i] note_type, load_ms, pixels_per_frame = note['note'], note['load_ms'], note['ppf'] @@ -617,12 +638,14 @@ class Player: self.draw_balloon(game_screen, note, position, i) else: self.draw_note(game_screen, note_type, position, 255, note['se_note']) + #ray.draw_text(str(i), position+64, 192, 25, ray.GREEN) def draw(self, game_screen): ray.draw_texture(game_screen.texture_lane, 332, 184, ray.WHITE) self.draw_animation_list(game_screen, self.draw_effect_list) ray.draw_texture(game_screen.texture_judge_circle, 342, 184, ray.WHITE) self.draw_animation_list(game_screen, self.draw_judge_list) + self.draw_bars(game_screen) self.draw_notes(game_screen) ray.draw_texture(game_screen.texture_lane_cover, 0, 184, ray.WHITE) ray.draw_texture(game_screen.texture_drum, 211, 206, ray.WHITE) @@ -778,7 +801,7 @@ class NoteArc: self.y_i = center_y + radius * 0.5 * math.sin(theta_i) def draw(self, game_screen): - if self.note_type == None: + if self.note_type is None: return eighth_in_ms = (60000 * 4 / game_screen.tja.bpm) / 8 current_eighth = int(game_screen.current_ms // eighth_in_ms) diff --git a/scenes/result.py b/scenes/result.py new file mode 100644 index 0000000..f4be193 --- /dev/null +++ b/scenes/result.py @@ -0,0 +1,23 @@ +import pyray as ray + +from libs.audio import audio +from libs.utils import GlobalData + + +class ResultScreen: + def __init__(self, width, height): + self.width = width + self.height = height + self.sound_don = audio.load_sound('Sounds\\inst_00_don.wav') + + def update(self): + if ray.is_key_pressed(ray.KeyboardKey.KEY_ENTER): + audio.play_sound(self.sound_don) + return "SONG_SELECT" + + def draw(self): + ray.draw_text(f"{GlobalData.selected_song}", 100, 60, 20, ray.BLACK) + ray.draw_text(f"SCORE: {GlobalData.result_score}", 100, 80, 20, ray.BLACK) + ray.draw_text(f"GOOD: {GlobalData.result_good}", 100, 100, 20, ray.BLACK) + ray.draw_text(f"OK: {GlobalData.result_ok}", 100, 120, 20, ray.BLACK) + ray.draw_text(f"BAD: {GlobalData.result_bad}", 100, 140, 20, ray.BLACK) diff --git a/scenes/song_select.py b/scenes/song_select.py new file mode 100644 index 0000000..0f59a76 --- /dev/null +++ b/scenes/song_select.py @@ -0,0 +1,83 @@ +import os + +import pyray as ray + +from libs.audio import audio +from libs.utils import GlobalData, get_config + + +class SongSelectScreen: + def __init__(self, width, height): + self.width = width + self.height = height + self.is_song_select = True + self.is_difficulty_select = False + self.song_list = [] + self.selected_song = 0 + self.selected_difficulty = 0 + self.sound_don = audio.load_sound('Sounds\\inst_00_don.wav') + self.sound_kat = audio.load_sound('Sounds\\inst_00_katsu.wav') + for dirpath, dirnames, filenames in os.walk(f'{get_config()["paths"]["tja_path"]}'): + for filename in filenames: + if filename.endswith(".tja"): + self.song_list.append(dirpath) + + def update_song_select(self): + if ray.is_key_pressed(ray.KeyboardKey.KEY_ENTER): + audio.play_sound(self.sound_don) + self.is_song_select = False + self.is_difficulty_select = True + elif ray.is_key_pressed(ray.KeyboardKey.KEY_UP): + audio.play_sound(self.sound_kat) + self.selected_song -= 1 + elif ray.is_key_pressed(ray.KeyboardKey.KEY_DOWN): + audio.play_sound(self.sound_kat) + self.selected_song += 1 + + def update_difficulty_select(self): + if ray.is_key_pressed(ray.KeyboardKey.KEY_ENTER): + audio.play_sound(self.sound_don) + GlobalData.selected_song = self.song_list[self.selected_song] + GlobalData.selected_difficulty = self.selected_difficulty + GlobalData.start_song = True + self.is_song_select = True + self.is_difficulty_select = False + return "GAME" + elif ray.is_key_pressed(ray.KeyboardKey.KEY_BACKSPACE): + self.is_song_select = True + self.is_difficulty_select = False + elif ray.is_key_pressed(ray.KeyboardKey.KEY_UP): + audio.play_sound(self.sound_kat) + self.selected_difficulty -= 1 + elif ray.is_key_pressed(ray.KeyboardKey.KEY_DOWN): + audio.play_sound(self.sound_kat) + self.selected_difficulty += 1 + + def update(self): + if self.is_song_select: + self.update_song_select() + elif self.is_difficulty_select: + return self.update_difficulty_select() + + def draw_song_select(self): + for i in range(len(self.song_list)): + if i == self.selected_song: + color = ray.GREEN + else: + color = ray.BLACK + ray.draw_text(self.song_list[i], 20, (20*i), 20, color) + + def draw_difficulty_select(self): + difficulties = ["Easy", "Normal", "Hard", "Oni", "Ura"] + for i in range(len(difficulties)): + if i == self.selected_difficulty: + color = ray.GREEN + else: + color = ray.BLACK + ray.draw_text(difficulties[i], 20, (20*i), 20, color) + + def draw(self): + if self.is_song_select: + self.draw_song_select() + elif self.is_difficulty_select: + self.draw_difficulty_select() diff --git a/title.py b/scenes/title.py similarity index 89% rename from title.py rename to scenes/title.py index f5c4d48..0b4f465 100644 --- a/title.py +++ b/scenes/title.py @@ -1,24 +1,34 @@ -import pyray as ray import os import random -from global_funcs import Animation, VideoPlayer, get_current_ms, load_texture_from_zip + +import pyray as ray + +from libs.animation import Animation +from libs.audio import audio +from libs.utils import ( + get_config, + get_current_ms, + load_texture_from_zip, +) +from libs.video import VideoPlayer + class TitleScreen: - def __init__(self, width, height): + def __init__(self, width: int, height: int): self.width = width self.height = height self.op_video_list = [] - for root, folder, files in os.walk('Videos\\op_videos'): - for file in files: - if file.endswith('.mp4'): - self.op_video_list.append(VideoPlayer(root + '\\' + file)) - self.op_video = random.choice(self.op_video_list) self.attract_video_list = [] - for root, folder, files in os.walk('Videos\\attract_videos'): + for root, folder, files in os.walk(f'{get_config()["paths"]["video_path"]}\\op_videos'): for file in files: if file.endswith('.mp4'): - self.attract_video_list.append(VideoPlayer(root + '\\' + file)) - self.attract_video = random.choice(self.attract_video_list) + self.op_video_list.append(root + '\\' + file) + for root, folder, files in os.walk(f'{get_config()["paths"]["video_path"]}\\attract_videos'): + for file in files: + if file.endswith('.mp4'): + self.attract_video_list.append(root + '\\' + file) + self.attract_video = VideoPlayer(random.choice(self.attract_video_list)) + self.op_video = VideoPlayer(random.choice(self.op_video_list)) self.scene = 'Opening Video' self.load_textures() self.warning_board = WarningBoard(get_current_ms(), self) @@ -47,10 +57,10 @@ class TitleScreen: self.texture_warning_x_1 = load_texture_from_zip(zip_file, 'keikoku_img00014.png') self.texture_warning_x_2 = load_texture_from_zip(zip_file, 'keikoku_img00015.png') - self.sound_bachi_swipe = ray.load_sound('Sounds\\title\\SE_ATTRACT_2.ogg') - self.sound_bachi_hit = ray.load_sound('Sounds\\title\\SE_ATTRACT_3.ogg') - self.sound_warning_message = ray.load_sound('Sounds\\title\\VO_ATTRACT_3.ogg') - self.sound_warning_error = ray.load_sound('Sounds\\title\\SE_ATTRACT_1.ogg') + self.sound_bachi_swipe = audio.load_sound('Sounds\\title\\SE_ATTRACT_2.ogg') + self.sound_bachi_hit = audio.load_sound('Sounds\\title\\SE_ATTRACT_3.ogg') + self.sound_warning_message = audio.load_sound('Sounds\\title\\VO_ATTRACT_3.ogg') + self.sound_warning_error = audio.load_sound('Sounds\\title\\SE_ATTRACT_1.ogg') self.texture_black = load_texture_from_zip('Graphics\\lumendata\\attract\\movie.zip', 'movie_img00000.png') @@ -64,12 +74,12 @@ class TitleScreen: self.warning_board.update(get_current_ms(), self) if self.warning_board.is_finished: self.scene = 'Attract Video' - self.attract_video = random.choice(self.attract_video_list) + self.attract_video = VideoPlayer(random.choice(self.attract_video_list)) elif self.scene == 'Attract Video': self.attract_video.update() if all(self.attract_video.is_finished): self.scene = 'Opening Video' - self.op_video = random.choice(self.op_video_list) + self.op_video = VideoPlayer(random.choice(self.op_video_list)) def update(self): self.scene_manager() @@ -184,13 +194,13 @@ class WarningBoard: elapsed_time = current_ms - self.start_ms if self.character_index(1) != 8: self.fade_animation_2.params['delay'] = elapsed_time + 500 - if delay <= elapsed_time and not ray.is_sound_playing(title_screen.sound_bachi_swipe): - ray.play_sound(title_screen.sound_warning_message) - ray.play_sound(title_screen.sound_bachi_swipe) + if delay <= elapsed_time and not audio.is_sound_playing(title_screen.sound_bachi_swipe): + audio.play_sound(title_screen.sound_warning_message) + audio.play_sound(title_screen.sound_bachi_swipe) elif self.character_index(1) == 8: if not self.hit_played: self.hit_played = True - ray.play_sound(title_screen.sound_bachi_hit) + audio.play_sound(title_screen.sound_bachi_hit) self.resize_animation_3.start_ms = current_ms self.fade_animation_7.start_ms = current_ms self.resize_animation_3.update(current_ms) @@ -198,7 +208,7 @@ class WarningBoard: if self.error_time + 166.67 <= elapsed_time and not self.error_played: self.error_played = True - ray.play_sound(title_screen.sound_warning_error) + audio.play_sound(title_screen.sound_warning_error) if self.fade_animation_2.is_finished: self.is_finished = True