154 lines
6.5 KiB
Python
154 lines
6.5 KiB
Python
#!/usr/bin/python3
|
|
import time, toml, os
|
|
|
|
global starttime, apiRequests, apiFailedRequests, outsideApiHits, config, version, apiVersion, colors, realUptime
|
|
|
|
class colors:
|
|
HEADER = '\033[95m'
|
|
OKBLUE = '\033[94m'
|
|
OKCYAN = '\033[96m'
|
|
OKGREEN = '\033[92m'
|
|
WARNING = '\033[93m'
|
|
FAIL = '\033[91m'
|
|
ENDC = '\033[0m'
|
|
BOLD = '\033[1m'
|
|
UNDERLINE = '\033[4m'
|
|
ENDL = '\n'
|
|
|
|
def notImplemented(name):
|
|
return 501, f"not recognised/implemented: {name}", []
|
|
|
|
configfile = "config.toml"
|
|
version = "0.0.1"
|
|
apiVersion = "1"
|
|
randomly_generated_passcode = 0
|
|
video_cache = {}
|
|
general_cache = {"search": [], "continuations": {"channels": {}, "comments": {}}, "channels": {}, "playlists": {}, "storyboards": {}, "hashed_videoplayback": {}}
|
|
|
|
def getConfig(configfile):
|
|
|
|
# this function is responsible for an unwanted warning when using --help without config.toml
|
|
# for now it's not worth it to account for that edge case.
|
|
global randomly_generated_passcode
|
|
|
|
if not os.path.exists(configfile):
|
|
dummy_config = {'general': {'db_file_path': 'ythdd_db.sqlite', 'video_storage_directory_path': 'videos/', 'is_proxied': False, 'public_facing_url': 'http://127.0.0.1:5000/', 'debug': False, 'cache': True}, 'api': {'api_key': 'CHANGEME'}, 'proxy': {'user-agent': '', 'allow_proxying_videos': True, 'match_initcwndbps': True}, 'extractor': {'user-agent': '', 'cookies_path': ''}, 'admin': {'admins': ['admin']}, 'yt_dlp': {}, 'postprocessing': {'presets': [{'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''}, {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''}, {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'}, {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''}, {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'}, {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''}, {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'}, {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''}, {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'}]}}
|
|
# if a passcode has not been provided by the user (config file doesn't exist, and user didn't specify it using an argument)
|
|
print(f"{colors.WARNING}WARNING{colors.ENDC}: Using default, baked in config data. {colors.ENDL}"
|
|
f" Consider copying and editing the provided example file ({colors.OKCYAN}config.default.toml{colors.ENDC}).")
|
|
if randomly_generated_passcode == 0:
|
|
# generate a pseudorandom one and use it in the temporary config
|
|
randomly_generated_passcode = str(int(time.time() * 1337 % 899_999 + 100_000))
|
|
|
|
print(f"{colors.WARNING}WARNING{colors.ENDC}: Default config populated with one-time, insecure pseudorandom admin API key: {colors.OKCYAN}{randomly_generated_passcode}{colors.ENDC}.\n"
|
|
f" The admin API key is not the Flask debugger PIN. You need to provide a config file for persistence!{colors.ENDL}")
|
|
|
|
dummy_config['api']['api_key_admin'] = randomly_generated_passcode
|
|
return dummy_config
|
|
|
|
else:
|
|
return toml.load(configfile)
|
|
|
|
def setConfig(configfile):
|
|
global config
|
|
config = getConfig(configfile)
|
|
|
|
#setConfig(configfile)
|
|
config = {}
|
|
|
|
def getHeaders(caller="proxy"):
|
|
|
|
# NOTE: use ESR user-agent
|
|
# user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
|
|
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:143.0) Gecko/20100101 Firefox/143.0'
|
|
|
|
if config[caller]['user-agent']:
|
|
user_agent = config[caller]['user-agent']
|
|
|
|
headers = {
|
|
'User-Agent': user_agent,
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8',
|
|
'Accept-Language': 'en-US,en;q=0.5',
|
|
'DNT': '1',
|
|
'Sec-GPC': '1',
|
|
'Connection': 'keep-alive',
|
|
'Upgrade-Insecure-Requests': '1',
|
|
'Sec-Fetch-Dest': 'document',
|
|
'Sec-Fetch-Mode': 'navigate',
|
|
'Sec-Fetch-Site': 'none',
|
|
'Sec-Fetch-User': '?1',
|
|
'Priority': 'u=0, i',
|
|
'Pragma': 'no-cache',
|
|
'Cache-Control': 'no-cache',
|
|
}
|
|
|
|
return headers
|
|
|
|
def translateLinks(link: str, remove_params: bool = True):
|
|
|
|
link = link.replace("https://i.ytimg.com/", config['general']['public_facing_url'])
|
|
link = link.replace("https://yt3.ggpht.com/", config['general']['public_facing_url'] + "ggpht/")
|
|
link = link.replace("https://yt3.googleusercontent.com/", config['general']['public_facing_url'] + "guc/")
|
|
|
|
# try to remove tracking params
|
|
if remove_params and "?" in link:
|
|
link = link[:link.find("?")]
|
|
|
|
return link
|
|
|
|
def getUptime():
|
|
return int(time.time()) - starttime
|
|
|
|
def safeTraverse(obj: dict, path: list, default=None, quiet: bool = False):
|
|
"""
|
|
Traverse dynamic objects with fallback to default values
|
|
|
|
This function can take an Ellipsis as part of traversal path,
|
|
meaning that it will return the object from the list
|
|
that contains the next key. This has been introduced
|
|
so that no matter which object in a list holds the relevant
|
|
model, it will find it (meaning no assumptions are necessary).
|
|
Kepp in mind that only one ellipsis at a time is supported,
|
|
thus ["some_key", ..., ..., "some_other_key"] won't work.
|
|
|
|
:param obj: Traversed object
|
|
:type obj: dict
|
|
:param path: Path which shall be traversed
|
|
:type path: list
|
|
:param default: Default value returned on failure
|
|
:type default: any, None by default
|
|
:param quiet: Quiet flag
|
|
:type quiet: bool
|
|
"""
|
|
result = obj
|
|
try:
|
|
# for every item in path and its position
|
|
for pos, iterable_key in enumerate(path):
|
|
# if the key is not an ellipsis, traverse it
|
|
if iterable_key is not Ellipsis:
|
|
result = result[iterable_key]
|
|
# if it is an ellipsis, and there is another key beside it
|
|
elif pos < len(path) - 1:
|
|
# then iterate through all of the list contents
|
|
for list_content in result:
|
|
# in search of the next traversal key
|
|
if path[pos + 1] in list_content:
|
|
result = list_content
|
|
# show an error message if ellipsis is used incorrectly
|
|
else:
|
|
print("error(safeTraverse): Traversal path can't end with an Ellipsis!")
|
|
raise TypeError()
|
|
# handle exceptions
|
|
except (KeyError, TypeError, IndexError):
|
|
result = default
|
|
if not quiet: print(f"error reading: {' -> '.join([str(x) for x in path])} - returning: {default}")
|
|
finally:
|
|
return result
|
|
|
|
def getCommit() -> str | None:
|
|
try:
|
|
return Repo(search_parent_directories=True).head.object.hexsha
|
|
except Exception as e:
|
|
return None
|
|
|