#!/usr/bin/python3
"""Shared module state, configuration loading and small helper functions."""
import os
import secrets
import time

import toml

# NOTE: a `global` statement at module scope has no effect; it is kept as a
# list of the module-level names this file refers to. Several of them (e.g.
# starttime, apiRequests, realUptime) are not assigned anywhere in this file -
# presumably they are set on this module by the application; verify there.
global starttime, apiRequests, apiFailedRequests, outsideApiHits, config, version, apiVersion, colors, realUptime


class colors:
    """ANSI escape sequences used to decorate console output."""

    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    ENDL = '\n'


def notImplemented(name):
    """
    Build the response tuple for an unknown or unimplemented name.

    :param name: Name that was not recognised
    :return: Tuple of (501, message, empty list)
    """
    return 501, f"not recognised/implemented: {name}", []


configfile = "config.toml"
version = "0.0.1"
apiVersion = "1"
# 0 means "no passcode generated yet"; getConfig() replaces it with a string
randomly_generated_passcode = 0
video_cache = {}
general_cache = {
    "search": [],
    "continuations": {"channels": {}, "comments": {}},
    "channels": {},
    "playlists": {},
    "storyboards": {},
    "hashed_videoplayback": {},
}


def _defaultConfig():
    """Return a fresh copy of the baked-in fallback configuration."""
    return {
        'general': {
            'db_file_path': 'ythdd_db.sqlite',
            'video_storage_directory_path': 'videos/',
            'is_proxied': False,
            'public_facing_url': 'http://127.0.0.1:5000/',
            'debug': False,
            'cache': True,
        },
        'api': {'api_key': 'CHANGEME'},
        'proxy': {
            'user-agent': '',
            'allow_proxying_videos': True,
            'match_initcwndbps': True,
        },
        'extractor': {'user-agent': '', 'cookies_path': ''},
        'admin': {'admins': ['admin']},
        'yt_dlp': {},
        'postprocessing': {
            'presets': [
                {'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''},
                {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''},
                {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'},
                {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''},
                {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'},
                {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''},
                {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'},
                {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''},
                {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'},
            ],
        },
    }


def getConfig(configfile):
    """
    Load the configuration from a TOML file, or fall back to baked-in defaults.

    When the file does not exist, a warning is printed and the default config
    is returned with 'api_key_admin' set to the module-level passcode. If no
    passcode has been set yet (it is still 0), a one-time six-digit one is
    generated, stored in the module-level variable and printed.

    :param configfile: Path to the TOML config file
    :return: Parsed (or default) configuration
    :rtype: dict
    """
    # this function is responsible for an unwanted warning when using --help without config.toml
    # for now it's not worth it to account for that edge case.
    global randomly_generated_passcode

    if os.path.exists(configfile):
        return toml.load(configfile)

    dummy_config = _defaultConfig()
    print(f"{colors.WARNING}WARNING{colors.ENDC}: Using default, baked in config data. {colors.ENDL}"
          f" Consider copying and editing the provided example file ({colors.OKCYAN}config.default.toml{colors.ENDC}).")

    # if a passcode has not been provided by the user (config file doesn't
    # exist, and user didn't specify it using an argument)
    if randomly_generated_passcode == 0:
        # generate a one-time passcode from the OS CSPRNG rather than from the
        # clock, so it cannot be derived from the (guessable) start time
        randomly_generated_passcode = str(secrets.randbelow(900_000) + 100_000)
        print(f"{colors.WARNING}WARNING{colors.ENDC}: Default config populated with one-time, insecure pseudorandom admin API key: {colors.OKCYAN}{randomly_generated_passcode}{colors.ENDC}.\n"
              f" The admin API key is not the Flask debugger PIN. You need to provide a config file for persistence!{colors.ENDL}")

    dummy_config['api']['api_key_admin'] = randomly_generated_passcode
    return dummy_config


def setConfig(configfile):
    """
    Load the configuration and store it in the module-level ``config``.

    :param configfile: Path to the TOML config file
    """
    global config
    config = getConfig(configfile)


# the config stays empty until setConfig() is called; it is not loaded at import time
config = {}


def getHeaders(caller="proxy"):
    """
    Build browser-like HTTP request headers.

    :param caller: Config section whose 'user-agent' entry may override the default
    :type caller: str
    :return: Header name to value mapping
    :rtype: dict
    :raises KeyError: If ``config`` has no such section or no 'user-agent' entry
    """
    # NOTE: use ESR user-agent
    # user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
    default_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:143.0) Gecko/20100101 Firefox/143.0'

    # an empty (falsy) configured value means "use the default"
    user_agent = config[caller]['user-agent'] or default_user_agent

    return {
        'User-Agent': user_agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'DNT': '1',
        'Sec-GPC': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Priority': 'u=0, i',
        'Pragma': 'no-cache',
        'Cache-Control': 'no-cache',
    }


def translateLinks(link: str, remove_params: bool = True):
    """
    Rewrite known image host prefixes to the configured public-facing URL.

    :param link: URL to rewrite
    :type link: str
    :param remove_params: When True, drop everything from the first "?" onwards
    :type remove_params: bool
    :return: Rewritten URL
    :rtype: str
    """
    public_url = config['general']['public_facing_url']

    link = link.replace("https://i.ytimg.com/", public_url)
    link = link.replace("https://yt3.ggpht.com/", public_url + "ggpht/")
    link = link.replace("https://yt3.googleusercontent.com/", public_url + "guc/")

    # try to remove tracking params
    if remove_params:
        link = link.partition("?")[0]

    return link


def getUptime():
    """
    Return the whole seconds elapsed since ``starttime``.

    NOTE(review): ``starttime`` is not assigned in this file; it has to be set
    on this module before this function is called, otherwise a NameError is raised.
    """
    return int(time.time()) - starttime


def safeTraverse(obj: dict, path: list, default=None, quiet: bool = False):
    """
    Traverse dynamic objects with fallback to default values

    This function can take an Ellipsis as part of traversal path,
    meaning that it will return the object from the list
    that contains the next key. This has been introduced
    so that no matter which object in a list holds the relevant
    model, it will find it (meaning no assumptions are necessary).
    If several list elements contain the next key, the last one is used.
    Keep in mind that only one ellipsis at a time is supported,
    thus ["some_key", ..., ..., "some_other_key"] won't work.

    Only KeyError, TypeError and IndexError are treated as traversal
    failures; any other exception propagates to the caller.

    :param obj: Traversed object
    :type obj: dict
    :param path: Path which shall be traversed
    :type path: list
    :param default: Default value returned on failure
    :type default: any, None by default
    :param quiet: Quiet flag
    :type quiet: bool
    :return: The value found at the end of the path, or ``default`` on failure
    """
    result = obj
    last_pos = len(path) - 1

    try:
        # for every item in path and its position
        for pos, iterable_key in enumerate(path):
            # if the key is not an ellipsis, traverse it
            if iterable_key is not Ellipsis:
                result = result[iterable_key]
            # if it is an ellipsis, and there is another key beside it
            elif pos < last_pos:
                next_key = path[pos + 1]
                # then iterate through all of the list contents
                # in search of the next traversal key (no early exit:
                # the last matching element wins)
                for list_content in result:
                    if next_key in list_content:
                        result = list_content
            # show an error message if ellipsis is used incorrectly
            else:
                print("error(safeTraverse): Traversal path can't end with an Ellipsis!")
                raise TypeError()
    # handle exceptions
    except (KeyError, TypeError, IndexError):
        if not quiet:
            print(f"error reading: {' -> '.join(str(x) for x in path)} - returning: {default}")
        return default

    # returning here, not from a `finally` clause, lets unexpected exceptions
    # propagate rather than be silently replaced by a partial result
    return result


def getCommit() -> str | None:
    """
    Return the hex SHA of the current HEAD commit, or None on any failure.

    NOTE(review): ``Repo`` is not imported anywhere in this file. Unless it is
    injected into this module elsewhere, the lookup raises NameError, which is
    swallowed below, so this function returns None - confirm the intended import.
    """
    try:
        return Repo(search_parent_directories=True).head.object.hexsha
    except Exception:
        # best-effort: any failure means the commit is unknown
        return None