Files
ythdd/ythdd_globals.py
sherl caa9e0c2b1 feat: list traversal improvements with safeTraverse()
translateLinks() also now strips any params (if present) by default
2025-09-25 06:34:13 +02:00

154 lines
6.3 KiB
Python

#!/usr/bin/python3
import time, toml, os
global starttime, apiRequests, apiFailedRequests, outsideApiHits, config, version, apiVersion, colors, realUptime
class colors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
ENDL = '\n'
def notImplemented(name):
return 501, f"not recognised/implemented: {name}", []
configfile = "config.toml"
version = "0.0.1"
apiVersion = "1"
randomly_generated_passcode = 0
video_cache = {}
general_cache = {"search": [], "continuations": {"channels": {}, "comments": {}}, "channels": {}}
def getConfig(configfile):
# this function is responsible for an unwanted warning when using --help without config.toml
# for now it's not worth it to account for that edge case.
global randomly_generated_passcode
if not os.path.exists(configfile):
dummy_config = {'general': {'db_file_path': 'ythdd_db.sqlite', 'video_storage_directory_path': 'videos/', 'is_proxied': False, 'public_facing_url': 'http://127.0.0.1:5000/', 'debug': False, 'cache': True}, 'api': {'api_key': 'CHANGEME'}, 'proxy': {'user-agent': ''}, 'extractor': {'user-agent': '', 'cookies_path': ''}, 'admin': {'admins': ['admin']}, 'yt_dlp': {}, 'postprocessing': {'presets': [{'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''}, {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''}, {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'}, {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''}, {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'}, {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''}, {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'}, {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''}, {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'}]}}
# if a passcode has not been provided by the user (config file doesn't exist, and user didn't specify it using an argument)
print(f"{colors.WARNING}WARNING{colors.ENDC}: Using default, baked in config data. {colors.ENDL}"
f" Consider copying and editing the provided example file ({colors.OKCYAN}config.default.toml{colors.ENDC}).")
if randomly_generated_passcode == 0:
# generate a pseudorandom one and use it in the temporary config
randomly_generated_passcode = str(int(time.time() * 1337 % 899_999 + 100_000))
print(f"{colors.WARNING}WARNING{colors.ENDC}: Default config populated with one-time, insecure pseudorandom admin API key: {colors.OKCYAN}{randomly_generated_passcode}{colors.ENDC}.\n"
f" The admin API key is not the Flask debugger PIN. You need to provide a config file for persistence!{colors.ENDL}")
dummy_config['api']['api_key_admin'] = randomly_generated_passcode
return dummy_config
else:
return toml.load(configfile)
def setConfig(configfile):
global config
config = getConfig(configfile)
#setConfig(configfile)
config = {}
def getHeaders(caller="proxy"):
# NOTE: use ESR user-agent
# user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:143.0) Gecko/20100101 Firefox/143.0'
if config[caller]['user-agent']:
user_agent = config[caller]['user-agent']
headers = {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'DNT': '1',
'Sec-GPC': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Priority': 'u=0, i',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
return headers
def translateLinks(link: str, remove_params: bool = True):
link = link.replace("https://i.ytimg.com/", config['general']['public_facing_url'])
link = link.replace("https://yt3.ggpht.com/", config['general']['public_facing_url'] + "ggpht/")
link = link.replace("https://yt3.googleusercontent.com/", config['general']['public_facing_url'] + "guc/")
# try to remove tracking params
if remove_params and "?" in link:
link = link[:link.find("?")]
return link
def getUptime():
return int(time.time()) - starttime
def safeTraverse(obj: dict, path: list, default=None, quiet: bool = False):
"""
Traverse dynamic objects with fallback to default values
This function can take an Ellipsis as part of traversal path,
meaning that it will return the object from the list
that contains the next key. This has been introduced
so that no matter which object in a list holds the relevant
model, it will find it (meaning no assumptions are necessary).
Kepp in mind that only one ellipsis at a time is supported,
thus ["some_key", ..., ..., "some_other_key"] won't work.
:param obj: Traversed object
:type obj: dict
:param path: Path which shall be traversed
:type path: list
:param default: Default value returned on failure
:type default: any, None by default
:param quiet: Quiet flag
:type quiet: bool
"""
result = obj
try:
# for every item in path and its position
for pos, iterable_key in enumerate(path):
# if the key is not an ellipsis, traverse it
if iterable_key is not Ellipsis:
result = result[iterable_key]
# if it is an ellipsis, and there is another key beside it
elif pos < len(path) - 1:
# then iterate through all of the list contents
for list_content in result:
# in search of the next traversal key
if path[pos + 1] in list_content:
result = list_content
# show an error message if ellipsis is used incorrectly
else:
print("error(safeTraverse): Traversal path can't end with an Ellipsis!")
raise TypeError()
# handle exceptions
except (KeyError, TypeError, IndexError):
result = default
if not quiet: print(f"error reading: {' -> '.join([str(x) for x in path])} - returning: {default}")
finally:
return result
def getCommit() -> str | None:
try:
return Repo(search_parent_directories=True).head.object.hexsha
except Exception as e:
return None