diff --git a/PyTaiko.py b/PyTaiko.py index 3f42132..2b5f0e2 100644 --- a/PyTaiko.py +++ b/PyTaiko.py @@ -1,3 +1,4 @@ +from scenes.game2 import GameScreen2 import logging import os from pathlib import Path @@ -272,7 +273,7 @@ def main(): song_select_screen = SongSelectScreen('song_select') song_select_screen_2p = TwoPlayerSongSelectScreen('song_select') load_screen = LoadScreen('loading') - game_screen = GameScreen('game') + game_screen = GameScreen2('game') game_screen_2p = TwoPlayerGameScreen('game') game_screen_practice = PracticeGameScreen('game') practice_select_screen = PracticeSongSelectScreen('song_select') diff --git a/libs/tja2.py b/libs/tja2.py new file mode 100644 index 0000000..ce448fd --- /dev/null +++ b/libs/tja2.py @@ -0,0 +1,265 @@ +import bisect +from enum import IntEnum +import hashlib +from dataclasses import dataclass, field, fields + +from libs.tja import NoteList, TJAParser, TimelineObject, get_ms_per_measure + +class NoteType(IntEnum): + NONE = 0 + DON = 1 + KAT = 2 + DON_L = 3 + KAT_L = 4 + ROLL_HEAD = 5 + ROLL_HEAD_L = 6 + BALLOON_HEAD = 7 + TAIL = 8 + KUSUDAMA = 9 + +class ScrollType(IntEnum): + NMSCROLL = 0 + BMSCROLL = 1 + HBSCROLL = 2 + +@dataclass() +class Note: + type: int = field(init=False) + hit_ms: float = field(init=False) + display: bool = field(init=False) + index: int = field(init=False) + moji: int = field(init=False) + bpm: float = field(init=False) + scroll_x: float = field(init=False) + scroll_y: float = field(init=False) + + def __lt__(self, other): + return self.hit_ms < other.hit_ms + + def __le__(self, other): + return self.hit_ms <= other.hit_ms + + def __gt__(self, other): + return self.hit_ms > other.hit_ms + + def __ge__(self, other): + return self.hit_ms >= other.hit_ms + + def __eq__(self, other): + return self.hit_ms == other.hit_ms + + def _get_hash_data(self) -> bytes: + hash_fields = ['type', 'hit_ms'] + field_values = [] + + for field_name in sorted(hash_fields): + value = getattr(self, field_name, None) + field_values.append((field_name, value)) + + field_values.append(('__class__', self.__class__.__name__)) + hash_string = str(field_values) + return hash_string.encode('utf-8') + + def get_hash(self, algorithm='sha256') -> str: + """Generate hash of the note""" + hash_obj = hashlib.new(algorithm) + hash_obj.update(self._get_hash_data()) + return hash_obj.hexdigest() + + def __hash__(self) -> int: + """Make instances hashable for use in sets/dicts""" + return int(self.get_hash('md5')[:8], 16) # Use first 8 chars of MD5 as int + + def __repr__(self): + return str(self.__dict__) + +@dataclass +class Drumroll(Note): + """A drumroll note in a TJA file. + + Attributes: + _source_note (Note): The source note. + color (int): The color of the drumroll. (0-255 where 255 is red) + """ + _source_note: Note + color: int = field(init=False) + + def __repr__(self): + return str(self.__dict__) + + def __eq__(self, other): + return self.hit_ms == other.hit_ms + + def __post_init__(self): + for field_name in [f.name for f in fields(Note)]: + if hasattr(self._source_note, field_name): + setattr(self, field_name, getattr(self._source_note, field_name)) + +@dataclass +class Balloon(Note): + """A balloon note in a TJA file. + + Attributes: + _source_note (Note): The source note. + count (int): The number of hits it takes to pop. + popped (bool): Whether the balloon has been popped. + is_kusudama (bool): Whether the balloon is a kusudama. + """ + _source_note: Note + count: int = field(init=False) + popped: bool = False + is_kusudama: bool = False + + def __repr__(self): + return str(self.__dict__) + + def __eq__(self, other): + return self.hit_ms == other.hit_ms + + def __post_init__(self): + for field_name in [f.name for f in fields(Note)]: + if hasattr(self._source_note, field_name): + setattr(self, field_name, getattr(self._source_note, field_name)) + + def _get_hash_data(self) -> bytes: + """Override to include source note and balloon-specific data""" + hash_fields = ['type', 'hit_ms', 'load_ms', 'count'] + field_values = [] + + for field_name in sorted(hash_fields): + value = getattr(self, field_name, None) + field_values.append((field_name, value)) + + field_values.append(('__class__', self.__class__.__name__)) + hash_string = str(field_values) + return hash_string.encode('utf-8') + +class TJAParser2(TJAParser): + def notes_to_position(self, diff: int): + """Parse a TJA's notes into a NoteList.""" + master_notes = NoteList() + notes = self.data_to_notes(diff) + balloon = self.metadata.course_data[diff].balloon.copy() + count = 0 + index = 0 + time_signature = 4/4 + bpm = self.metadata.bpm + scroll_x_modifier = 1 + scroll_y_modifier = 0 + barline_display = True + curr_note_list = master_notes.play_notes + curr_draw_list = master_notes.draw_notes + curr_bar_list = master_notes.bars + curr_timeline = master_notes.timeline + init_bpm = TimelineObject() + init_bpm.hit_ms = self.current_ms + init_bpm.bpm = bpm + curr_timeline.append(init_bpm) + prev_note = None + scroll_type = ScrollType.NMSCROLL + + bpmchange_last_bpm = bpm + + for bar in notes: + bar_length = sum(len(part) for part in bar if '#' not in part) + barline_added = False + + for part in bar: + if '#MEASURE' in part: + divisor = part.find('/') + time_signature = float(part[9:divisor]) / float(part[divisor+1:]) + continue + elif '#SCROLL' in part: + if scroll_type != ScrollType.BMSCROLL: + scroll_value = part[7:] + if 'i' in scroll_value: + normalized = scroll_value.replace('.i', 'j').replace('i', 'j') + normalized = normalized.replace(',', '') + c = complex(normalized) + scroll_x_modifier = c.real + scroll_y_modifier = c.imag + else: + scroll_x_modifier = float(scroll_value) + scroll_y_modifier = 0.0 + continue + elif '#BPMCHANGE' in part: + parsed_bpm = float(part[11:]) + if scroll_type == ScrollType.BMSCROLL or scroll_type == ScrollType.HBSCROLL: + # Do not modify bpm, it needs to be changed live by bpmchange + bpmchange = parsed_bpm / bpmchange_last_bpm + bpmchange_last_bpm = parsed_bpm + + bpmchange_timeline = TimelineObject() + bpmchange_timeline.hit_ms = self.current_ms + bpmchange_timeline.bpmchange = bpmchange + bisect.insort(curr_timeline, bpmchange_timeline, key=lambda x: x.hit_ms) + else: + timeline_obj = TimelineObject() + timeline_obj.hit_ms = self.current_ms + timeline_obj.bpm = parsed_bpm + bpm = parsed_bpm + bisect.insort(curr_timeline, timeline_obj, key=lambda x: x.hit_ms) + continue + elif len(part) > 0 and not part[0].isdigit(): + continue + + ms_per_measure = get_ms_per_measure(bpm, time_signature) + bar_line = Note() + + bar_line.hit_ms = self.current_ms + bar_line.type = 0 + bar_line.display = barline_display + bar_line.bpm = bpm + bar_line.scroll_x = scroll_x_modifier + bar_line.scroll_y = scroll_y_modifier + + if barline_added: + bar_line.display = False + + curr_bar_list.append(bar_line) + barline_added = True + + if len(part) == 0: + self.current_ms += ms_per_measure + increment = 0 + else: + increment = ms_per_measure / bar_length + + for item in part: + if item == '0' or (not item.isdigit()): + self.current_ms += increment + continue + + note = Note() + note.hit_ms = self.current_ms + note.display = True + note.type = int(item) + note.index = index + note.bpm = bpm + note.scroll_x = scroll_x_modifier + note.scroll_y = scroll_y_modifier + + if item in {'5', '6'}: + note = Drumroll(note) + note.color = 255 + elif item in {'7', '9'}: + count += 1 + if balloon is None: + raise Exception("Balloon note found, but no count was specified") + if item == '9': + note = Balloon(note, is_kusudama=True) + else: + note = Balloon(note) + note.count = 1 if not balloon else balloon.pop(0) + elif item == '8': + if prev_note is None: + raise ValueError("No previous note found") + + self.current_ms += increment + curr_note_list.append(note) + curr_draw_list.append(note) + self.get_moji(curr_note_list, ms_per_measure) + index += 1 + prev_note = note + + return master_notes, [master_notes], [master_notes], [master_notes] diff --git a/scenes/game.py b/scenes/game.py index 18275d8..83579cf 100644 --- a/scenes/game.py +++ b/scenes/game.py @@ -1123,6 +1123,7 @@ class Player: self.evaluate_branch(ms_from_start) # Get the next note from any of the three lists for BPM and gogo time updates + ''' next_note = None candidates = [] if self.don_notes: @@ -1144,6 +1145,7 @@ class Player: self.is_gogo_time = False self.gogo_time = None self.chara.set_animation('gogo_stop') + ''' if self.gauge is None: self.chara.update(current_time, self.bpm, False, False) else: diff --git a/scenes/game2.py b/scenes/game2.py new file mode 100644 index 0000000..a95b015 --- /dev/null +++ b/scenes/game2.py @@ -0,0 +1,234 @@ +from collections import deque +from libs.tja2 import TJAParser2 +import bisect +from enum import IntEnum +import math +import logging +from pathlib import Path +from typing import Optional +from itertools import chain + +import pyray as ray + +from libs.audio import audio +from libs.background import Background +from libs.texture import tex +from libs.tja import ( + Balloon, + Drumroll, + Note, + NoteType, + calculate_base_score, +) +from libs.utils import ( + get_current_ms, + global_data, +) +from libs.video import VideoPlayer +from scenes.game import GameScreen, Player + +logger = logging.getLogger(__name__) + +class DrumType(IntEnum): + DON = 1 + KAT = 2 + +class Side(IntEnum): + LEFT = 1 + RIGHT = 2 + +class Judgments(IntEnum): + GOOD = 0 + OK = 1 + BAD = 2 + +class GameScreen2(GameScreen): + def init_tja(self, song: Path): + """Initialize the TJA file""" + self.tja = TJAParser2(song, start_delay=self.start_delay, distance=tex.screen_width - GameScreen.JUDGE_X) + if self.tja.metadata.bgmovie != Path() and self.tja.metadata.bgmovie.exists(): + self.movie = VideoPlayer(self.tja.metadata.bgmovie) + self.movie.set_volume(0.0) + else: + self.movie = None + global_data.session_data[global_data.player_num].song_title = self.tja.metadata.title.get(global_data.config['general']['language'].lower(), self.tja.metadata.title['en']) + if self.tja.metadata.wave.exists() and self.tja.metadata.wave.is_file() and self.song_music is None: + self.song_music = audio.load_music_stream(self.tja.metadata.wave, 'song') + + self.player_1 = Player2(self.tja, global_data.player_num, global_data.session_data[global_data.player_num].selected_difficulty, False, global_data.modifiers[global_data.player_num]) + self.start_ms = get_current_ms() - self.tja.metadata.offset*1000 + +class Player2(Player): + def reset_chart(self): + notes, self.branch_m, self.branch_e, self.branch_n = self.tja.notes_to_position(self.difficulty) + self.play_notes, self.draw_note_list, self.draw_bar_list = deque(notes.play_notes), deque(notes.draw_notes), deque(notes.bars) + + self.don_notes = deque([note for note in self.play_notes if note.type in {NoteType.DON, NoteType.DON_L}]) + self.kat_notes = deque([note for note in self.play_notes if note.type in {NoteType.KAT, NoteType.KAT_L}]) + self.other_notes = deque([note for note in self.play_notes if note.type not in {NoteType.DON, NoteType.DON_L, NoteType.KAT, NoteType.KAT_L}]) + self.total_notes = len([note for note in self.play_notes if 0 < note.type < 5]) + total_notes = notes + if self.branch_m: + for section in self.branch_m: + self.total_notes += len([note for note in section.play_notes if 0 < note.type < 5]) + total_notes += section + self.base_score = calculate_base_score(total_notes) + + #Note management + self.timeline = notes.timeline + self.timeline_index = 0 # Range: [0, len(timeline)] + self.current_bars: list[Note] = [] + self.current_notes_draw: list[Note | Drumroll | Balloon] = [] + self.is_drumroll = False + self.curr_drumroll_count = 0 + self.is_balloon = False + self.curr_balloon_count = 0 + self.is_branch = False + self.curr_branch_reqs = [] + self.branch_condition_count = 0 + self.branch_condition = '' + self.balloon_index = 0 + self.bpm = 120 + if self.timeline and hasattr(self.timeline[self.timeline_index], 'bpm'): + self.bpm = self.timeline[self.timeline_index].bpm + + self.end_time = 0 + if self.play_notes: + self.end_time = self.play_notes[-1].hit_ms + + def get_position_x(self, note, current_ms): + judge_line_x = 414 + return judge_line_x + ((note.hit_ms - current_ms) / 1000.0) * 866 * note.scroll_x + + + def get_position_y(self): + return 0 + + def bar_manager(self, current_ms: float): + """Manages the bars and removes if necessary + Also sets branch conditions""" + #Add bar to current_bars list if it is ready to be shown on screen + if self.draw_bar_list and current_ms + 1000 > self.draw_bar_list[0].hit_ms: + self.current_bars.append(self.draw_bar_list.popleft()) + + + if self.current_bars and self.current_bars[0].hit_ms < current_ms + 1000: + self.current_bars.pop(0) + + def draw_note_manager(self, current_ms: float): + """Manages the draw_notes and removes if necessary""" + if self.draw_note_list and current_ms >= self.draw_note_list[0].hit_ms - 10000: + current_note = self.draw_note_list.popleft() + if 5 <= current_note.type <= 7: + bisect.insort_left(self.current_notes_draw, current_note, key=lambda x: x.index) + try: + tail_note = next((note for note in self.draw_note_list if note.type == NoteType.TAIL)) + bisect.insort_left(self.current_notes_draw, tail_note, key=lambda x: x.index) + self.draw_note_list.remove(tail_note) + except Exception as e: + raise(e) + else: + bisect.insort_left(self.current_notes_draw, current_note, key=lambda x: x.index) + + if not self.current_notes_draw: + return + + if isinstance(self.current_notes_draw[0], Drumroll): + self.current_notes_draw[0].color = min(255, self.current_notes_draw[0].color + 1) + + note = self.current_notes_draw[0] + if note.type in {NoteType.ROLL_HEAD, NoteType.ROLL_HEAD_L, NoteType.BALLOON_HEAD, NoteType.KUSUDAMA} and len(self.current_notes_draw) > 1: + note = self.current_notes_draw[1] + if current_ms > note.hit_ms + 200: + if note.type == NoteType.TAIL: + self.current_notes_draw.pop(0) + self.current_notes_draw.pop(0) + + def draw_drumroll(self, current_ms: float, head: Drumroll, current_eighth: int): + """Draws a drumroll in the player's lane""" + start_position = self.get_position_x(head, current_ms) + start_position += self.judge_x + tail = next((note for note in self.current_notes_draw[1:] if note.type == NoteType.TAIL and note.index > head.index), self.current_notes_draw[1]) + is_big = int(head.type == NoteType.ROLL_HEAD_L) + end_position = self.get_position_x(tail, current_ms) + end_position += self.judge_x + length = end_position - start_position + color = ray.Color(255, head.color, head.color, 255) + y = tex.skin_config["notes"].y + self.get_position_y() + moji_y = tex.skin_config["moji"].y + moji_x = -(tex.textures["notes"]["moji"].width//2) + (tex.textures["notes"]["1"].width//2) + if head.display: + if length > 0: + tex.draw_texture('notes', "8", frame=is_big, x=start_position+(tex.textures["notes"]["5"].width//2), y=y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y, x2=length+tex.skin_config["drumroll_width_offset"].width, color=color) + if is_big: + tex.draw_texture('notes', "drumroll_big_tail", x=end_position+tex.textures["notes"]["5"].width//2, y=y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y, color=color) + else: + tex.draw_texture('notes', "drumroll_tail", x=end_position+tex.textures["notes"]["5"].width//2, y=y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y, color=color) + tex.draw_texture('notes', str(head.type), frame=current_eighth % 2, x=start_position, y=y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y, color=color) + + tex.draw_texture('notes', 'moji_drumroll_mid', x=start_position + tex.textures["notes"]["1"].width//2, y=moji_y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y, x2=length) + tex.draw_texture('notes', 'moji', frame=head.moji, x=start_position + moji_x, y=moji_y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y) + tex.draw_texture('notes', 'moji', frame=tail.moji, x=end_position + moji_x, y=moji_y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y) + + def draw_balloon(self, current_ms: float, head: Balloon, current_eighth: int): + """Draws a balloon in the player's lane""" + offset = tex.skin_config["balloon_offset"].x + start_position = self.get_position_x(head, current_ms) + start_position += self.judge_x + tail = next((note for note in self.current_notes_draw[1:] if note.type == NoteType.TAIL and note.index > head.index), self.current_notes_draw[1]) + end_position = self.get_position_x(tail, current_ms) + end_position += self.judge_x + pause_position = tex.skin_config["balloon_pause_position"].x + self.judge_x + y = tex.skin_config["notes"].y + self.get_position_y() + if current_ms >= tail.hit_ms: + position = end_position + elif current_ms >= head.hit_ms: + position = pause_position + else: + position = start_position + if head.display: + tex.draw_texture('notes', str(head.type), frame=current_eighth % 2, x=position-offset, y=y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y) + tex.draw_texture('notes', '10', frame=current_eighth % 2, x=position-offset+tex.textures["notes"]["10"].width, y=y+(self.is_2p*tex.skin_config["2p_offset"].y)+self.judge_y) + + def draw_bars(self, current_ms: float): + """Draw bars in the player's lane""" + if not self.current_bars: + return + + for bar in reversed(self.current_bars): + if not bar.display: + continue + x_position = self.get_position_x(bar, current_ms) + y_position = self.get_position_y() + if y_position != 0: + angle = math.degrees(math.atan2(bar.pixels_per_frame_y, bar.pixels_per_frame_x)) + else: + angle = 0 + tex.draw_texture('notes', str(bar.type), x=x_position+tex.skin_config["moji_drumroll"].x, y=y_position+tex.skin_config["moji_drumroll"].y+(self.is_2p*tex.skin_config["2p_offset"].y), rotation=angle) + + + def draw_notes(self, current_ms: float, start_ms: float): + """Draw notes in the player's lane""" + if not self.current_notes_draw: + return + + for note in reversed(self.current_notes_draw): + if self.balloon_anim is not None and note == self.current_notes_draw[0]: + continue + if note.type == NoteType.TAIL: + continue + + current_eighth = 0 + x_position = self.get_position_x(note, current_ms) + y_position = self.get_position_y() + if isinstance(note, Drumroll): + pass + #self.draw_drumroll(current_ms, note, current_eighth) + elif isinstance(note, Balloon) and not note.is_kusudama: + pass + #self.draw_balloon(current_ms, note, current_eighth) + #tex.draw_texture('notes', 'moji', frame=note.moji, x=x_position, y=tex.skin_config["moji"].y + y_position+(self.is_2p*tex.skin_config["2p_offset"].y)) + else: + if note.display: + tex.draw_texture('notes', str(note.type), frame=current_eighth % 2, x=x_position - (tex.textures["notes"]["1"].width//2), y=y_position+tex.skin_config["notes"].y+(self.is_2p*tex.skin_config["2p_offset"].y), center=True) + tex.draw_texture('notes', 'moji', frame=note.moji, x=x_position - (tex.textures["notes"]["moji"].width//2), y=tex.skin_config["moji"].y + y_position+(self.is_2p*tex.skin_config["2p_offset"].y))