a lot of code responsible for parsing data into invidious-compatible structures has been moved to the ythdd_struct_parser file
116 lines
4.8 KiB
Python
116 lines
4.8 KiB
Python
#!/usr/bin/python3
|
|
import time, toml, os
|
|
|
|
# NOTE: a `global` statement at module scope has no effect on name binding;
# this line only lists the names this module treats as shared state.
# `starttime`, `apiRequests`, `apiFailedRequests`, `outsideApiHits` and
# `realUptime` are not assigned anywhere in this file - presumably another
# module sets them on this module at startup (TODO confirm).
global starttime, apiRequests, apiFailedRequests, outsideApiHits, config, version, apiVersion, colors, realUptime
|
|
|
|
class colors:
    """Escape sequences used to decorate text printed to a terminal.

    Every attribute is a plain string meant to be interpolated into
    messages; ``ENDC`` resets whatever attribute was switched on before it.
    """

    # foreground colours
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'

    # reset all attributes
    ENDC = '\033[0m'

    # text styles
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # plain line break (not an escape sequence)
    ENDL = '\n'
|
|
|
|
def notImplemented(name):
    """Build the response triple for an unknown or unimplemented name.

    Args:
        name: the identifier that was not recognised; it is interpolated
            into the message.

    Returns:
        tuple: ``(501, message, [])``.
    """
    status_code = 501
    message = f"not recognised/implemented: {name}"
    payload = []
    return status_code, message, payload
|
|
|
|
configfile = "config.toml"
|
|
version = "0.0.1"
|
|
apiVersion = "1"
|
|
randomly_generated_passcode = 0
|
|
video_cache = {}
|
|
general_cache = {"search": [], "continuations": {"channels": {}}, "channels": {}}
|
|
|
|
def getConfig(configfile):
    """Load the configuration from *configfile*, or fall back to built-in defaults.

    When *configfile* exists it is parsed with ``toml.load`` and returned
    unchanged. Otherwise a baked-in default configuration is returned and a
    warning is printed. In that fallback case an admin API key is stored under
    ``['api']['api_key_admin']``: the module-level
    ``randomly_generated_passcode`` is used if it has already been set,
    otherwise a fresh six-digit key is generated, remembered in that global
    and announced with a second warning.

    Args:
        configfile: path of the TOML configuration file.

    Returns:
        dict: the parsed or default configuration.
    """
    # this function is responsible for an unwanted warning when using --help without config.toml
    # for now it's not worth it to account for that edge case.
    global randomly_generated_passcode

    if os.path.exists(configfile):
        return toml.load(configfile)

    dummy_config = {
        'general': {
            'db_file_path': 'ythdd_db.sqlite',
            'video_storage_directory_path': 'videos/',
            'is_proxied': False,
            'public_facing_url': 'http://127.0.0.1:5000/',
            'debug': False,
            'cache': True,
        },
        'api': {'api_key': 'CHANGEME'},
        'proxy': {'user-agent': ''},
        'extractor': {'user-agent': '', 'cookies_path': ''},
        'admin': {'admins': ['admin']},
        'yt_dlp': {},
        'postprocessing': {
            'presets': [
                {'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''},
                {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''},
                {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'},
                {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''},
                {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'},
                {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''},
                {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'},
                {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''},
                {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'},
            ],
        },
    }

    print(f"{colors.WARNING}WARNING{colors.ENDC}: Using default, baked in config data. {colors.ENDL}"
          f" Consider copying and editing the provided example file ({colors.OKCYAN}config.default.toml{colors.ENDC}).")

    # if a passcode has not been provided by the user (config file doesn't exist,
    # and user didn't specify it using an argument)
    if randomly_generated_passcode == 0:
        # Imported here so the block does not depend on the file-level imports.
        import secrets

        # Draw the six-digit key from the OS CSPRNG rather than deriving it
        # from the clock, so it cannot be reconstructed from the start time.
        # The key is still short, hence the "insecure" wording below.
        randomly_generated_passcode = str(secrets.randbelow(900_000) + 100_000)

        print(f"{colors.WARNING}WARNING{colors.ENDC}: Default config populated with one-time, insecure pseudorandom admin API key: {colors.OKCYAN}{randomly_generated_passcode}{colors.ENDC}.\n"
              f" The admin API key is not the Flask debugger PIN. You need to provide a config file for persistence!{colors.ENDL}")

    dummy_config['api']['api_key_admin'] = randomly_generated_passcode
    return dummy_config
|
|
|
|
def setConfig(configfile):
    """Replace the module-level ``config`` with the result of ``getConfig``.

    Args:
        configfile: path handed straight to ``getConfig``.
    """
    global config
    loaded = getConfig(configfile)
    config = loaded
|
|
|
|
#setConfig(configfile)
|
|
# Module-level configuration mapping. It starts out empty at import time and
# is replaced wholesale by setConfig(); getHeaders() and translateLinks()
# read it at call time.
config = {}
|
|
|
|
def getHeaders(caller="proxy"):
    """Build a browser-like set of HTTP request headers.

    Args:
        caller: name of the section in the module-level ``config`` whose
            ``user-agent`` entry, when present and non-empty, replaces the
            built-in user agent. Defaults to ``"proxy"``.

    Returns:
        dict: header name -> header value (a new dict on every call).
    """
    # NOTE: use ESR user-agent
    # user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0'

    # A missing section or key (e.g. while `config` is still empty, or a
    # config file that omits it) falls back to the built-in user agent
    # instead of raising KeyError.
    section = config.get(caller)
    override = section.get('user-agent') if isinstance(section, dict) else None
    if override:
        user_agent = override

    headers = {
        'User-Agent': user_agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'DNT': '1',
        'Sec-GPC': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Priority': 'u=0, i',
        'Pragma': 'no-cache',
        'Cache-Control': 'no-cache',
    }

    return headers
|
|
|
|
def translateLinks(link):
    """Rewrite known upstream image hosts in *link* to this instance's URL.

    Each upstream prefix is replaced by ``config['general']['public_facing_url']``
    followed by a per-host path segment (empty for ``i.ytimg.com``).

    Args:
        link: the URL (or text containing URLs) to rewrite.

    Returns:
        str: *link* with every occurrence of the known prefixes replaced.
    """
    # (upstream prefix, path appended to the public-facing URL), applied in order
    rewrites = (
        ("https://i.ytimg.com/", ""),
        ("https://yt3.ggpht.com/", "ggpht/"),
        ("https://yt3.googleusercontent.com/", "guc/"),
    )

    for upstream_prefix, local_path in rewrites:
        link = link.replace(upstream_prefix, config['general']['public_facing_url'] + local_path)

    return link
|
|
|
|
def getUptime():
    """Return the whole seconds elapsed since the module-level ``starttime``.

    Returns:
        The current epoch time truncated to an integer, minus ``starttime``.
    """
    now = int(time.time())
    return now - starttime
|
|
|
|
def safeTraverse(obj: dict, path: list, default=None):
    """Follow *path* through nested containers, returning *default* on failure.

    Each element of *path* is used in turn as a subscript on the value
    reached so far, so dictionary keys and list indices can be mixed.

    Args:
        obj: the container to start from.
        path: subscripts to apply, outermost first. An empty path returns
            *obj* itself.
        default: value returned when any step cannot be resolved.

    Returns:
        The value found at the end of *path*, or *default* when a key is
        missing, an index is out of range, or an intermediate value is not
        subscriptable. A failed lookup is also reported on stdout.
    """
    result = obj
    try:
        for x in path:
            result = result[x]
    except (LookupError, TypeError):
        # LookupError covers both KeyError (mappings) and IndexError
        # (sequences); TypeError covers non-subscriptable intermediates.
        # Path elements may be ints, so stringify them before joining.
        print(f"error reading: {' -> '.join(map(str, path))} - returning: {default}")
        return default
    return result
|
|
|
|
def getCommit() -> str | None:
|
|
try:
|
|
return Repo(search_parent_directories=True).head.object.hexsha
|
|
except Exception as e:
|
|
return None
|
|
|