Compare commits
95 Commits
3212627d89...master
| SHA1 |
|---|
| 56af1f0735 |
| 72266aad0f |
| 900cc92229 |
| 2687cc2bdc |
| 4a9d59c9b4 |
| 4af581ab7c |
| d1f381220d |
| eebf434f3e |
| c979c97077 |
| 11c94c757e |
| 4421e68d9d |
| da2daab16a |
| 05b81e55da |
| 158dcc3b7f |
| 668e8c32aa |
| 760aaccfff |
| da54bd0818 |
| b0845d723a |
| c760104d70 |
| d6cb0fe692 |
| 81fba8c4d0 |
| 260039c307 |
| 2b24fc2906 |
| 002e3cba33 |
| 5944fd2458 |
| 719b545cb4 |
| 468795a7a2 |
| 7eb4452fec |
| e7ae42f289 |
| 5bb542826e |
| 365c54d214 |
| a2e2fedc90 |
| 4a311f185c |
| 45073fff67 |
| f73e6c11d4 |
| 34e00e2492 |
| f63c620541 |
| f2adc2e561 |
| e6d32091e1 |
| b98aa718b0 |
| 30850a7ce0 |
| 1c9174c888 |
| 6d0c70696b |
| 509e81aafa |
| caa9e0c2b1 |
| 873abbd413 |
| bedcaff7c0 |
| 5dcf8c81e0 |
| 60712f3b5d |
| c20d4c34aa |
| be697bb572 |
| 3e84cf3443 |
| 4a3937a923 |
| c3fae689e1 |
| 4cfb1db7d0 |
| 5a1e772909 |
| 7c4991cea7 |
| 5f88d6f096 |
| eaaa14c4d8 |
| ef177f7200 |
| d0d2298186 |
| 7086177a58 |
| dc8009db23 |
| e562d1ee14 |
| 96c1b5396e |
| 256d21bbcd |
| 2a9826eb03 |
| 37e932956d |
| 15d2de5228 |
| 3cf203ee27 |
| a814797363 |
| 89f8f2a786 |
| 5e655ddd2c |
| 3e7589aea6 |
| cd34b2e406 |
| 3a0a89f737 |
| 55a116e042 |
| 51b94842e3 |
| 3253447c72 |
| 32c563a45a |
| 3ea3558990 |
| 6cdae051b5 |
| 4e066e4b23 |
| 837567f8c8 |
| 99e914557a |
| 06679ee165 |
| dbc90d3f74 |
| d1b9f90e7e |
| 019e47edd9 |
| 8fa05486f1 |
| 71a24a4870 |
| 2837cdf612 |
| ee31114e51 |
| dd102cb6ae |
| 3a524d96a0 |
README.md (48 changed lines)
@@ -1,2 +1,50 @@
# ythdd

*Archive videos while you watch them.*

**ythdd** is a tool that can help you create a multimedia archive by downloading videos and their metadata while you watch them.
It does so by acting as an intermediary between you and YouTube, forwarding your requests while also saving them to a hard drive, hence the name.

## Goals

**Current goal of the project**: provide support for [Invidious](https://github.com/iv-org/invidious) API calls through the Invidious translation layer (ythdd_inv_tl.py).

## Running ythdd

- First, download the project files or clone the repository with git:
```
git clone https://gitea.7o7.cx/sherl/ythdd.git
```

- Navigate into the directory with the project files:
```
cd ythdd
```

- Create a virtual environment:
```
python -m venv .venv
```
The above will create a hidden directory called *.venv*.

- To activate the virtual environment use:
- on Linux:
```
source .venv/bin/activate
```
- on Windows (cmd):
```
.venv\Scripts\activate
```

- Install the required packages into the newly created virtual environment:
```
pip install -r requirements.txt
```

- Run the script:
- continuously without interrupts:
```
python ythdd.py
```
- with automatic reloading (code will be reloaded after a change to the codebase):
```
flask --app ythdd run --debug
```
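Once the server is running, a quick way to sanity-check it is the API greeting endpoint; a minimal sketch using `requests`, assuming the default development host and port:

```python
import requests

# Query the API greeting endpoint (registered at /api/ in ythdd.py).
# Assumes the default development server started with `python ythdd.py`.
resp = requests.get("http://127.0.0.1:5000/api/")
print(resp.json())  # e.g. {'status': 200, 'msg': 'ok (ythdd 0.0.1)', 'latest_api': 'v1'}
```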
config.default.toml

@@ -1,19 +1,27 @@
[general]
db_file_path = "/path/to/ythdd_db.sqlite" # Preferably stored on an SSD.
db_file_path = "ythdd_db.sqlite" # Path to the database file, preferably stored on an SSD.
video_storage_directory_path = "/path/to/videos/" # Path to the video vault.
is_proxied = false # Set to true if running behind a reverse proxy.
public_facing_url = "http://localhost:5000/" # Used for URL rewriting. Note the trailing slash /.
public_facing_url = "http://127.0.0.1:5000/" # Used for URL rewriting. Note the trailing slash /.
debug = false # Whether to print verbose, debug info on API endpoints.
cache = true # Whether to cache requests for 3 hours (temporary solution to long load times).

[api]
api_key = "" # Leave empty for public access to the non-sensitive backend.
api_key_admin = "CHANGEME" # An empty *admin* API key will autogenerate a random one every launch.
enable_debugger_halt = false # Whether to allow triggering pdb using the admin's API key.

[extractor]
user-agent = "" # Leave empty for default (Firefox ESR).
cookies_path = "" # Leave empty for none.
age_restricted_cookies_path = "" # Cookies to use when bypassing age-gated videos only. Leave empty to disable.
deno_path = "" # Required when using cookies.
preferred_extractor = "" # Leave empty for default (android_vr).

[proxy]
user-agent = "" # Leave empty for default (Firefox ESR).
allow_proxying_videos = false # Whether to allow video proxying through the instance (traffic-intensive).
match_initcwndbps = true # Experimental: matches proxying speed to the one suggested by Innertube (may help avoid being ratelimited/banned).

[admin]
# List of users with admin privileges.
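The options above can be inspected independently of ythdd; a small sketch reading the example file with the standard-library `tomllib` (Python 3.11+; how `getConfig` itself parses the file is not shown in this diff):

```python
import tomllib  # stdlib TOML reader, Python 3.11+

# Peek at the example config; the keys mirror the diff above.
with open("config.default.toml", "rb") as f:
    cfg = tomllib.load(f)
print(cfg["general"]["public_facing_url"])
print(cfg["proxy"]["match_initcwndbps"])
```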
requirements.txt

@@ -13,3 +13,5 @@ Flask-APScheduler>=1.13.1
requests>=2.32.3
yt_dlp
brotli>=1.1.0
dateparser>=1.2.2
bbpb>=1.4.2
views.py (129 changed lines)
@@ -1,8 +1,8 @@
#!/usr/bin/python3
from flask import render_template, Response
from flask import redirect, render_template, request, Response
from flask_sqlalchemy import SQLAlchemy
from markupsafe import escape
import requests, json
import hashlib, json, re, requests
import ythdd_globals

def homepage():

@@ -26,18 +26,141 @@ def thumbnailProxy(received_request):
            'error_msg': 'invalid request. pretend this is a thumbnail :D'
        }), mimetype='application/json', status=400)

    thumbnail = requests.get(prefix + "vi/" + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
    quality_urls = ['maxresdefault', 'sddefault', 'hqdefault', 'mqdefault', 'default', '1', '2', '3']
    video_id, requested_quality = received_request.split('/')

    thumbnail = requests.get(prefix + "vi/" + video_id + "/" + requested_quality, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
    thumbnail.raw.decode_content = True

    quality_id = 0
    if requested_quality == "maxres.jpg":
        # if the requested quality is maxres,
        # provide the best quality possible
        while thumbnail.status_code != 200:
            thumbnail = requests.get(prefix + "vi/" + video_id + "/" + quality_urls[quality_id] + ".jpg", headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
            thumbnail.raw.decode_content = True
            quality_id += 1

    response = Response(thumbnail.raw, mimetype=thumbnail.headers['content-type'], status=thumbnail.status_code)

    return response

def storyboardProxy(received_request):

    # will proxy storyboards
    prefix = "https://i.ytimg.com/"
    sqp = request.args.get("sqp")
    sigh = request.args.get("sigh")

    if sqp is None or sigh is None:
        return Response(json.dumps({"status": "error", "error": "Request lacks the sqp, sigh params (or both)."}), mimetype="application/json", status=400)

    sb = requests.get(prefix + "sb/" + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), params={"sqp": sqp, "sigh": sigh}, stream=True)
    sb.raw.decode_content = True
    response = Response(sb.raw, mimetype=sb.headers['content-type'], status=sb.status_code)
    return response

def ggphtProxy(received_request):

    prefix = "https://yt3.ggpht.com/"

    # fix for how materialious fetches avatars
    if received_request.startswith("guc/"):
        return gucProxy(received_request.removeprefix("guc/"))

    ggpht = requests.get(prefix + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
    ggpht.raw.decode_content = True
    response = Response(ggpht.raw, mimetype=ggpht.headers['content-type'], status=ggpht.status_code)

    return response

def gucProxy(received_request):

    prefix = "https://yt3.googleusercontent.com/"

    guc = requests.get(prefix + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
    guc.raw.decode_content = True
    response = Response(guc.raw, mimetype=guc.headers['content-type'], status=guc.status_code)

    return response

def imgProxy(received_request):

    # will proxy /img/no_thumbnail.jpg
    prefix = "https://i.ytimg.com/"

    thumbnail = requests.get(prefix + "img/" + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
    thumbnail.raw.decode_content = True
    response = Response(thumbnail.raw, mimetype=thumbnail.headers['content-type'], status=thumbnail.status_code)

    return response

def videoplaybackProxy():
    # inspired by Yotter's video proxy
    # https://github.com/ytorg/Yotter/blob/b43a72ab7bfa5a59916fa3259cbc39165717c6bb/app/routes.py#L527

    if not ythdd_globals.config['proxy']['allow_proxying_videos']:
        return Response(json.dumps({"error": "Administrator has disabled this endpoint"}), mimetype="application/json", status=403)

    headers = dict(request.headers)
    proxy_headers = ythdd_globals.getHeaders(caller='proxy')
    if "Range" in headers:
        proxy_headers["Range"] = headers["Range"]

    params = dict(request.args)
    # support md5 videoplayback url shortening
    if "md5" in params:
        if params["md5"] not in ythdd_globals.general_cache["hashed_videoplayback"]:
            return Response(json.dumps({"error": "Videoplayback request not cached or expired."}), mimetype="application/json", status=404)
        return redirect(ythdd_globals.general_cache["hashed_videoplayback"][params["md5"]]["original_url"])
    else:
        public_url = request.url
        public_url = ythdd_globals.config["general"]["public_facing_url"] + public_url[public_url.rfind("videoplayback"):]
        md5sum = hashlib.md5(public_url.encode("utf-8")).hexdigest()
        ythdd_globals.general_cache["hashed_videoplayback"][md5sum] = {"original_url": request.url}

    # reconstruct the url
    # first attempt: from the host param
    host = params.get('host')
    # failed? then try to get it from the rest of the params
    if host is None:
        # second attempt: reconstruct the url from mn and mvi
        # the host schema seems to be as follows:
        # rr{mvi[any]/fvip[any]?}---{mn[any]}.googlevideo.com
        # regarding mvi/fvip, it seems that any value smaller than 5 passes
        try:
            mvi = params.get('mvi').split(',')[-1]
            mn = params.get('mn').split(',')[-1]
            if int(mvi) > 5:
                mvi = 3 # invidious uses this as a fallback
            host = f"rr{mvi}---{mn}.googlevideo.com"
        except (AttributeError, ValueError):
            return Response(json.dumps({"error": "Couldn't extract crucial parameters for hostname reconstruction"}), mimetype="application/json", status=400)
    else:
        # don't echo the host "hint" back to the googlevideo server
        del params['host']
        # run a regex sanity check
        if re.fullmatch(r"[\w-]+\.googlevideo\.com", host) is None:
            # fallback behavior for unexpected hostnames
            return Response(json.dumps({"error": "Please either pass a valid host, or don't pass any"}), mimetype="application/json", status=400)

    try:
        # request the proxied data
        remote_response = requests.get(f"https://{host}/videoplayback", headers=proxy_headers, params=params, stream=True)
    except:
        return Response(json.dumps({"error": "Couldn't connect to googlevideo host"}), mimetype="application/json", status=500)

    # determine the chunk size
    chunk_size = 10 * 1024 * 1024 # by default it's 10 MB (as this is the most youtube is willing to send without ratelimiting)
    # or the one in initcwndbps (if the user enabled the config flag to match chunk_size with initcwndbps)
    if ythdd_globals.config['proxy']['match_initcwndbps']:
        try:
            chunk_size = int(params.get('initcwndbps')) // 1024
        except:
            pass
    # return a chunked response
    resp = Response(remote_response.iter_content(chunk_size=chunk_size), content_type=remote_response.headers['Content-Type'], status=remote_response.status_code, headers=remote_response.headers, direct_passthrough=True)
    resp.cache_control.public = True
    resp.cache_control.max_age = int(60_000)

    return resp
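The md5 URL shortening above works in two halves: on a normal request the proxy hashes the public-facing URL and caches the original, and a later request carrying only `?md5=<hash>` is redirected back to it. A minimal sketch of the hashing half (the URL is a placeholder):

```python
import hashlib

# Hypothetical public-facing videoplayback URL, hashed the same way
# videoplaybackProxy does before storing it in general_cache["hashed_videoplayback"].
public_url = "http://127.0.0.1:5000/videoplayback?id=abc&itag=22"
md5sum = hashlib.md5(public_url.encode("utf-8")).hexdigest()
print(f"http://127.0.0.1:5000/videoplayback?md5={md5sum}")  # shortened form
```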
ythdd.py (9 changed lines)
@@ -53,6 +53,9 @@ def setup():
        sanity_string += f" If you're running a reverse proxy, set {colors.OKCYAN}is_proxied{colors.ENDC} to true to silence this message.\n"
        print(sanity_string)

    # Should work around disconnects: https://stackoverflow.com/a/61739721
    app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {"pool_pre_ping": True}

    app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{config['general']['db_file_path']}"
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.add_url_rule('/', view_func=views.index)

@@ -61,7 +64,11 @@ def setup():
    app.add_url_rule('/api/', view_func=ythdd_api.api_greeting)
    app.add_url_rule('/api/<path:received_request>', view_func=ythdd_api.api_global_catchall)
    app.add_url_rule('/vi/<path:received_request>', view_func=views.thumbnailProxy)
    app.add_url_rule('/sb/<path:received_request>', view_func=views.storyboardProxy)
    app.add_url_rule('/ggpht/<path:received_request>', view_func=views.ggphtProxy)
    app.add_url_rule('/guc/<path:received_request>', view_func=views.gucProxy)
    app.add_url_rule('/img/<path:received_request>', view_func=views.imgProxy)
    app.add_url_rule('/videoplayback', view_func=views.videoplaybackProxy)
    db = ythdd_db.initDB(app, config)

    with app.app_context():

@@ -129,7 +136,7 @@ def main(args):
        app_port = port

    setup()
    app.run(host=host, port=int(port))
    app.run(host=host, port=int(port), threaded=True)

if __name__ == "__main__":
    #app.run(host="127.0.0.1", port=5000)
ythdd_api.py (20 changed lines)
@@ -1,9 +1,11 @@
#!/usr/bin/python3
from flask import Response, request
from markupsafe import escape
from ythdd_globals import colors as c
import requests, time, json
import ythdd_globals
import ythdd_api_v1, ythdd_inv_tl
import traceback

def api_greeting():
    string = {'status': 200, 'msg': f"ok (ythdd {ythdd_globals.version})", 'latest_api': f"v{ythdd_globals.apiVersion}"}

@@ -12,8 +14,8 @@ def api_greeting():

def api_global_catchall(received_request):
    ythdd_globals.apiRequests += 1
    if request.environ['REMOTE_ADDR'] != "127.0.0.1" or (ythdd_globals.isProxied and request.environ['X-Forwarded-For'] != "127.0.0.1"):
        ythdd_globals.outsideApiHits += 1
    if request.environ['REMOTE_ADDR'] != "127.0.0.1" or (ythdd_globals.isProxied and request.environ['HTTP_X_FORWARDED_FOR'] != "127.0.0.1"):
        ythdd_globals.outsideApiHits += 1

    request_list = received_request.split('/')
    api_version = request_list[0]

@@ -25,10 +27,15 @@ def api_global_catchall(received_request):
        #return api_greeting()
        resp = api_greeting()
        try:
            status, received, data = ythdd_api_v1.lookup(request_list)
            status, received, data = ythdd_api_v1.lookup(request_list, request)
        except Exception as e:
            ythdd_globals.apiFailedRequests += 1
            stripped_filename = __file__[max(__file__.rfind("/"), __file__.rfind("\\")) + 1:]
            print(f"\n{c.FAIL}Error! /api/{received_request} -> {stripped_filename}:L{e.__traceback__.tb_lineno} -> {type(e).__name__}{c.ENDC}:\n"
                  + f"{traceback.format_exc()}")
            status, received, data = 500, f"internal server error: call ended in failure: {e}", []
            if ythdd_globals.config["general"]["debug"]:
                status, received, data = 500, f"internal server error: call ended in failure: {e} ({stripped_filename}:L{e.__traceback__.tb_lineno})", []
        resp = Response(json.dumps({'status': status, 'msg': received, 'data': data}), mimetype='application/json', status=status)
    elif request_list[0] == 'invidious':
        # drop 'invidious' from the list

@@ -43,12 +50,17 @@ def api_global_catchall(received_request):
        # if a path has been supplied try to get appropriate data
        try:
            # lookup and construct a response
            resp = ythdd_inv_tl.lookup(request_list)
            resp = ythdd_inv_tl.lookup(request_list, request)
            #print(resp) # for debugging purposes
        # unless an error occurs
        except Exception as e:
            ythdd_globals.apiFailedRequests += 1
            stripped_filename = __file__[max(__file__.rfind("/"), __file__.rfind("\\")) + 1:]
            print(f"\n{c.FAIL}Error! /api/{received_request} -> {stripped_filename}:L{e.__traceback__.tb_lineno} -> {type(e).__name__}{c.ENDC}:\n"
                  + f"{traceback.format_exc()}")
            status, received, data = 500, f"internal server error: invidious translation call ended in failure: {e}", []
            if ythdd_globals.config["general"]["debug"]:
                status, received, data = 500, f"internal server error: invidious translation call ended in failure: {e} ({stripped_filename}:L{e.__traceback__.tb_lineno})", []
            resp = Response(json.dumps({'status': status, 'msg': received, 'data': data}), mimetype='application/json', status=status)
    else:
        ythdd_globals.apiFailedRequests += 1
ythdd_api_v1.py

@@ -8,6 +8,21 @@ import ythdd_globals, ythdd_extractor
#from flask_sqlalchemy import SQLAlchemy
#import ythdd_api_v1_stats, ythdd_api_v1_user, ythdd_api_v1_info, ythdd_api_v1_query, ythdd_api_v1_meta, ythdd_api_v1_admin

def requireAuthentication(admin: bool = True):
    def functionWrapper(func):
        def wrapper(*args, **kwargs):
            token = kwargs["r"].args.get('token')
            if token == ythdd_globals.config['api']['api_key' + admin * '_admin']:
                try:
                    status, received, data = func(*args, **kwargs)
                    return status, received, data
                except:
                    raise AssertionError(f"Function \"{func.__name__}\" does not return status, code, and data as it should!")
            else:
                return 401, "error", {'error_msg': "Unauthorized"}
        return wrapper
    return functionWrapper

def incrementBadRequests():
    ythdd_globals.apiFailedRequests += 1

@@ -29,7 +44,7 @@ def stats():
        "outside_api_requests": ythdd_globals.outsideApiHits,
        "local_api_requests": ythdd_globals.apiRequests - ythdd_globals.outsideApiHits
    }
    return 200, "OK", data_to_send
    return 200, "ok", data_to_send

def videoIdSanityCheck(videoId: str):
    if len(videoId) != 11:

@@ -91,7 +106,7 @@ def hot(data):
    # try to get the data
    try:
        started = time.time()
        extracted_dict = ythdd_extractor.extract(url_lookup[data[1]] + videoId, getcomments=getcomments, maxcomments=comment_count)
        extracted_dict = ythdd_extractor.extract(url_lookup[data[1]] + videoId, getcomments=getcomments, maxcomments=comment_count, manifest_fix=True)
        extracted_dict["took"] = time.time() - started
        return 200, "OK", extracted_dict
    except Exception as e:

@@ -129,7 +144,14 @@ def hot(data):
        incrementBadRequests()
        return notImplemented([data[1]]) # workaround before notImplemented is reworked

def lookup(data):
@requireAuthentication(admin=True)
def debugger_halt(r):
    if not ythdd_globals.config["api"]["enable_debugger_halt"]:
        return 403, "Administrator has disabled access for this endpoint.", []
    breakpoint()
    return 200, "Pdb triggered and ended successfully.", []

def lookup(data, request):
    match data[0]:
        case 'stats':
            return stats()

@@ -149,6 +171,8 @@ def lookup(data):
        case 'admin':
            # REQUIRE CREDENTIALS!
            return stub_hello()
        case 'halt':
            return debugger_halt(r=request)
        case _:
            incrementBadRequests()
            return notImplemented(data)
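Putting the decorator and the new `halt` case together, the endpoint could be exercised as below. This is a sketch: the `/api/v1/halt` path is an assumption based on the v1 dispatch above, and it assumes `enable_debugger_halt = true` plus the default `CHANGEME` admin key from config.default.toml:

```python
import requests

# The token is checked by requireAuthentication against api_key_admin.
r = requests.get("http://127.0.0.1:5000/api/v1/halt", params={"token": "CHANGEME"})
print(r.json())  # 403 if disabled, 401 on a bad token, 200 once pdb exits
```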
ythdd_extractor.py

@@ -1,5 +1,8 @@
#!/usr/bin/python3
import brotli, yt_dlp, requests, json, time
from http.cookiejar import MozillaCookieJar
from ythdd_globals import safeTraverse
import ythdd_proto
import ythdd_globals

ytdl_opts = {

@@ -12,7 +15,16 @@ ytdl_opts = {
        "default": "%(id)s.%(ext)s",
        "chapter": "%(id)s.%(ext)s_%(section_number)03d_%(section_title)s.%(ext)s"
    },
    "simulate": True
    "extractor_args": {
        "youtube": {
            # "formats": ["dashy"]
        }
    },
    "simulate": True,
    "js_runtimes": {
        "deno": {}
    },
    'remote_components': ['ejs:github']
}

stage1_headers = {

@@ -60,7 +72,7 @@ stage1_body = {

stage2_headers = {
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",

@@ -69,13 +81,13 @@ stage2_headers = {

stage3_headers = {
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Content-Type": "application/json",
    "X-Youtube-Client-Name": "1",
    "X-Youtube-Client-Version": "2.20241126.01.00",
    "X-Youtube-Client-Version": "2.20251103.01.00",
    "Origin": "https://www.youtube.com",
    "Accept-Encoding": "gzip, deflate, br",
    "Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"

@@ -87,7 +99,7 @@ stage3_body = {
    "client":
    {
        "clientName": "WEB",
        "clientVersion": "2.20241126.01.00",
        "clientVersion": "2.20251103.01.00",
        "hl": "en",
        "timeZone": "UTC",
        "utcOffsetMinutes": 0

@@ -98,23 +110,73 @@ stage3_body = {
    "racyCheckOk": True
}

def extract(url: str, getcomments=False, maxcomments=""):
web_context_dict = {
    'context': {
        'client': {
            'hl': 'en',
            'gl': 'US',
            'deviceMake': '',
            'deviceModel': '',
            'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0,gzip(gfe)',
            'clientName': 'WEB',
            'clientVersion': '2.20251103.01.00',
            'osName': 'Windows',
            'osVersion': '10.0',
            'screenPixelDensity': 2,
            'platform': 'DESKTOP',
            'screenDensityFloat': 2,
            'userInterfaceTheme': 'USER_INTERFACE_THEME_LIGHT',
            'browserName': 'Firefox',
            'browserVersion': '142.0',
            'acceptHeader': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'utcOffsetMinutes': 0,
        }
    }
}

def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False, use_cookies=None):
    # TODO: check user-agent and cookiefile

    ytdl_context = ytdl_opts.copy()

    if ythdd_globals.config['extractor']['user-agent']:
        yt_dlp.utils.std_headers['User-Agent'] = ythdd_globals.config['extractor']['user-agent']

    if ythdd_globals.config['extractor']['cookies_path']:
        ytdl_opts['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']

    if len(url) == 11:
        url = "https://www.youtube.com/watch?v=" + url
    if getcomments:
        ytdl_opts['getcomments'] = True
        ytdl_context['getcomments'] = True
    if maxcomments:
        ytdl_opts['extractor_args'] = {'youtube': {'max_comments': [maxcomments, "all", "all", "all"]}}
    with yt_dlp.YoutubeDL(ytdl_opts) as ytdl:
        result = ytdl.extract_info(url, download=False)
        ytdl_context['extractor_args']['youtube']['max_comments'] = [maxcomments, "all", "all", "all"]
    if manifest_fix:
        # https://github.com/yt-dlp/yt-dlp/issues/11952#issuecomment-2565802294
        ytdl_context['extractor_args']['youtube']['player_client'] = ['default', 'web_safari']
    elif ythdd_globals.config['extractor']['preferred_extractor']:
        ytdl_context['extractor_args']['youtube']['player_client'] = [ythdd_globals.config['extractor']['preferred_extractor']]
    else:
        ytdl_context['extractor_args']['youtube']['player_client'] = ['android_vr']

    if use_cookies is not None:
        # can be either "global", "agegated" or None
        deno_path = ythdd_globals.config['extractor']['deno_path']
        match use_cookies:
            case "global":
                ytdl_context['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']
                ytdl_context['extractor_args']['youtube']['player_client'] = ['tv']
                if not deno_path:
                    print("FATAL ERROR: deno path is required for playback using cookies!")
                ytdl_context['js_runtimes']['deno']['path'] = deno_path if deno_path else ""
            case "agegated":
                ytdl_context['cookiefile'] = ythdd_globals.config['extractor']['age_restricted_cookies_path']
                ytdl_context['extractor_args']['youtube']['player_client'] = ['tv']
                if not deno_path:
                    print("FATAL ERROR: deno path is required for playback of age-restricted content!")
                ytdl_context['js_runtimes']['deno']['path'] = deno_path if deno_path else ""
            case None | _:
                pass

    with yt_dlp.YoutubeDL(ytdl_context) as ytdl:
        result = ytdl.sanitize_info(ytdl.extract_info(url, download=False))
    return result
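For orientation, a sketch of how the reworked `extract` might be invoked (the video ID and comment count are placeholders):

```python
# Fetch metadata plus up to 20 top-level comments, using the
# manifest_fix player_client selection from the diff above.
info = extract("dQw4w9WgXcQ", getcomments=True, maxcomments="20", manifest_fix=True)
print(info.get("title"), len(info.get("comments") or []))
```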
def WEBrelated(url: str):

@@ -137,7 +199,7 @@ def WEBrelated(url: str):

    return extracted_json["contents"]['twoColumnWatchNextResults']["secondaryResults"]

def WEBextractSinglePage(uri: str):
def WEBextractSinglePage(uri: str, use_cookies=None):
    # WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME

    start_time = time.time()

@@ -145,11 +207,25 @@ def WEBextractSinglePage(uri: str):
    if len(uri) != 11:
        raise ValueError("WEBextractSinglePage expects a single, 11-character long argument")

    response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'))
    cookies = None
    if use_cookies is not None:
        match use_cookies:
            case "global":
                ythdd_globals.print_debug("wdata: using global cookies")
                cookies = MozillaCookieJar(ythdd_globals.config["extractor"]["cookies_path"])
                cookies.load()
            case "agegated":
                ythdd_globals.print_debug("wdata: using agegated cookies")
                cookies = MozillaCookieJar(ythdd_globals.config["extractor"]["age_restricted_cookies_path"])
                cookies.load()
            case None | _:
                pass

    response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'), cookies=cookies)
    extracted_string = str(response.content.decode('utf8', 'unicode_escape'))
    start = extracted_string.find('{"responseContext":{"serviceTrackingParams":')
    start = extracted_string.find('{"responseContext":')
    end = extracted_string.find(';var ', start)
    start2 = extracted_string.find('{"responseContext":{"serviceTrackingParams":', start + 1)
    start2 = extracted_string.find('{"responseContext":', start + 1)
    end2 = extracted_string.find(';</script>', start2)
    extracted_json1 = json.loads(extracted_string[start:end])
    extracted_json2 = json.loads(extracted_string[start2:end2])

@@ -158,6 +234,19 @@ def WEBextractSinglePage(uri: str):

    return {'ec1': extracted_json1, 'ec2': extracted_json2, 'took': end_time - start_time}

def paramsFromUrl(url: str) -> dict:
    # Returns a dictionary of params from a given URL.
    split_list = url.split("&")
    params = {}

    for num, string in enumerate(split_list):
        if num == 0:
            string = string[string.find("?") + 1:]
        key, value = string.split("=")
        params[key] = value

    return params
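A quick check of `paramsFromUrl` on a made-up URL:

```python
# Query parameters become a flat dict of strings; everything before the
# first "?" is stripped from the first segment.
print(paramsFromUrl("https://example.com/videoplayback?id=abc&itag=22&mvi=3"))
# -> {'id': 'abc', 'itag': '22', 'mvi': '3'}
```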
def IOSextract(uri: str):

    start = time.time()

@@ -180,3 +269,344 @@ def IOSextract(uri: str):

    #return {'stage1': stage1, 'stage2': stage2, 'stage3': stage3, 'took': end - start}
    return {'stage1': stage1, 'stage3': stage3, 'took': end - start}

def makeWebContext(secondaryContextDict: dict):
    # Uses web_context_dict to create a context, returns a dict.
    # Essentially, expands the web_context_dict with a secondary one.

    current_web_context_dict = web_context_dict.copy()

    for key in secondaryContextDict:
        current_web_context_dict[key] = secondaryContextDict[key]

    return current_web_context_dict

def getChannelAvatar(response_json: dict):
    # Returns a dictionary: {url: <proxied url to remote server>, width: ..., height: ...}
    # containing the best resolution in terms of pixel count.
    # A great majority of the code has been influenced by https://github.com/iv-org/invidious/blob/master/src/invidious/channels/about.cr.

    avatars = safeTraverse(response_json, ['metadata', 'channelMetadataRenderer', 'avatar', 'thumbnails'], default=None)

    if avatars is None:
        # fallback to lower resolution avatars
        avatars = safeTraverse(response_json, ['header',
                                               'pageHeaderRenderer',
                                               'content',
                                               'pageHeaderViewModel',
                                               'image',
                                               'decoratedAvatarViewModel',
                                               'avatar',
                                               'avatarViewModel',
                                               'image',
                                               'sources'], default=None)

    # if avatars is None: # TODO: if avatars is still None, use a local avatar

    best_avatar = avatars[-1] # usually, the best avatar is stored last
    for avatar in avatars:
        if avatar['width'] * avatar['height'] > best_avatar['width'] * best_avatar['height']:
            best_avatar = avatar

    # or use regex substitution and set the size to something like 512x512
    # e.g.: =s128 -> =s512

    best_avatar['url'] = ythdd_globals.translateLinks(best_avatar['url'])

    return best_avatar

def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
    # Generates channel avatars at default sizes.

    # avatar urls for channels in search results start with //yt3.ggpht.com/
    if url.startswith("//"):
        url = "https:" + url

    avatars = []
    if not url.startswith("https://yt3.ggpht.com/") and not url.startswith("https://yt3.googleusercontent.com/"):
        return []

    url = ythdd_globals.translateLinks(url)
    url_size_start = url.rfind("=s") + 2
    url_size_end = url.find("-", url_size_start)

    default_sizes = [32, 48, 76, 100, 176, 512]

    for size in default_sizes:
        avatars.append(
            {
                "url": url[:url_size_start] + str(size) + url[url_size_end:],
                "width": size,
                "height": size
            }
        )

    return avatars

def isVerified(response_json: dict) -> bool:
    # Returns True if any user badge has been found (verified/artist).

    if not isinstance(response_json, dict):
        return False

    match safeTraverse(list(response_json.keys()), [0], default=""):
        case "metadataBadgeRenderer": # channels in search results
            verified = safeTraverse(response_json, ["metadataBadgeRenderer", "tooltip"], default="") in ("Verified", "Official Artist Channel") # perhaps look for badge styles?
            return verified

    return False

def isPremium(response_json: dict) -> bool:
    # Returns True if content is paid (member-only).

    if not isinstance(response_json, dict):
        return False

    match safeTraverse(list(response_json.keys()), [0], default=""):
        case "metadataBadgeRenderer": # channels in search results
            paid = safeTraverse(response_json, ["metadataBadgeRenderer", "style"], default="") == "BADGE_STYLE_TYPE_MEMBERS_ONLY"
            return paid

    return False

def browseChannel(ucid: str, params: str = None, ctoken: str = None):
    # Returns the response from innertube's browse endpoint for channels (as a dict).

    if len(ucid) != 24:
        raise ValueError(f"Something is wrong with the UCID {ucid}. Expected a 24-character long channel ID, not {len(ucid)}.")

    additional_context = {'browseId': ucid}
    if params is not None:
        additional_context['params'] = params
    if ctoken is not None:
        additional_context['continuation'] = ctoken

    context = makeWebContext(additional_context)

    response = requests.post(
        'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
        headers = ythdd_globals.getHeaders(),
        json = context,
    )

    response_json = json.loads(response.text)

    return response_json

def WEBextractSearchResults(search_query: str, page: int) -> list:
    # Posts a search request to the innertube API
    # and processes only the relevant part (the actual results)

    if search_query is None:
        return []

    additional_context = {"query": search_query}
    if page is not None:
        params = ythdd_proto.produceSearchParams(page)
        additional_context["params"] = params

    web_context = makeWebContext(additional_context)
    response = requests.post('https://www.youtube.com/youtubei/v1/search',
                             params={"prettyPrint": False},
                             headers=stage2_headers,
                             data=json.dumps(web_context)
                             )

    results = []
    try:
        results = json.loads(response.text)
    except:
        pass
    results = safeTraverse(results, ["contents", "twoColumnSearchResultsRenderer", "primaryContents", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents"], default=[])

    return results

def WEBgetSearchSuggestions(query: str, previous_query: str = '') -> dict:
    # Takes in a search query and returns relevant suggestions.
    # Can optionally take the previous query, but that's rather novel and
    # not supported across players nor the invidious API itself.

    suggestions = []

    if not isinstance(query, str):
        print("WEBgetSearchSuggestions: query is not a string (as it should be)")
        return {}
    if not isinstance(previous_query, str):
        previous_query = ''

    if ythdd_globals.config["general"]["cache"]:
        # look for cached suggestions
        for cached_search in ythdd_globals.general_cache["search"]:
            if cached_search["q"] == query.lower() and cached_search["pq"] == previous_query.lower():
                # found it? skip ahead
                suggestions = cached_search["resp"]
                break

    # request wasn't cached? query the API
    if suggestions == []:

        params = {
            'ds': 'yt',
            'hl': 'en', # host language
            'gl': 'us', # geolocation
            'client': 'youtube',
            'gs_ri': 'youtube',
            'q': query, # query
            'pq': previous_query # previous query
        }

        response = requests.get(
            'https://suggestqueries-clients6.youtube.com/complete/search',
            params=params,
            headers=stage2_headers
        )

        # can break anytime but hopefully the tiny speed gain will make up for it
        results = response.text[23 + len(query):]
        results = results[:results.rfind("{") - 1]
        results = json.loads(results)

        for result in results:
            suggestions.append(result[0])

        # cache the response
        if ythdd_globals.config["general"]["cache"]:
            ythdd_globals.general_cache["search"].append(
                {
                    "q": query.lower(),
                    "pq": previous_query.lower(),
                    "resp": suggestions
                }
            )

    return {
        "query": query,
        "suggestions": suggestions
    }

def WEBgetVideoComments(ctoken: str) -> tuple:

    # ctoken needs to be passed explicitly.
    # no guessing or retrieving it from globals.
    if ctoken is None:
        return [], ""

    # build a web context containing the relevant ctoken
    web_context = makeWebContext({"continuation": ctoken})
    response = requests.post('https://www.youtube.com/youtubei/v1/next',
                             params={"prettyPrint": False},
                             headers=stage2_headers,
                             data=json.dumps(web_context)
                             )

    results = []
    try:
        results = json.loads(response.text)
    except:
        pass

    comments = safeTraverse(results, ["frameworkUpdates", "entityBatchUpdate", "mutations"], default=[])
    comment_continuations = []
    comment_continuations_re = safeTraverse(results, ["onResponseReceivedEndpoints"], default=[])
    for received_endpoint in comment_continuations_re:

        # this is horrible...

        acia = safeTraverse(received_endpoint, ["appendContinuationItemsAction", "continuationItems"], default=[])
        rcic = safeTraverse(received_endpoint, ["reloadContinuationItemsCommand", "continuationItems"], default=[])

        for entry in acia:
            if "commentThreadRenderer" in entry or "continuationItemRenderer" in entry:
                comment_continuations = acia
                break

        for entry in rcic:
            if "commentThreadRenderer" in entry or "continuationItemRenderer" in entry:
                comment_continuations = rcic
                break

        if comment_continuations != []:
            break

    if comment_continuations == []:
        print("error: received an unknown comment structure, unable to parse continuations (replies)")
        # breakpoint()
        # return [], ""

    # extract the new continuation
    new_continuation = ""
    if "continuationItemRenderer" in safeTraverse(comment_continuations, [-1], default=[]):
        # first, look for a ctoken inside of the response for the next page of comments
        new_continuation = safeTraverse(comment_continuations, [-1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default=None)
        # or search elsewhere in case this is a reply thread
        if new_continuation is None:
            new_continuation = safeTraverse(comment_continuations, [-1, "continuationItemRenderer", "button", "buttonRenderer", "command", "continuationCommand", "token"], default="")

    # perform a basic mutation check before parsing
    # will ignore replies liked by the video uploader ("hearts")
    actual_comments = [x for x in comments if "properties" in safeTraverse(x, ["payload", "commentEntityPayload"], default=[], quiet=True)]
    actual_comment_continuations = [x for x in comment_continuations if "replies" in safeTraverse(x, ["commentThreadRenderer"], default=[], quiet=True)]

    # link reply data (reply count and ctoken) for comments with replies
    for reply_renderer in actual_comment_continuations:

        mutual_key = safeTraverse(reply_renderer, ["commentThreadRenderer", "commentViewModel", "commentViewModel", "commentKey"], default="unknown-key")
        reply_ctoken = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
        reply_count = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "viewReplies", "buttonRenderer", "text", "runs", 0, "text"], default="0 replies").split(" ")[0]

        # suspected a/b test. can be worked around with on-demand ctoken creation.
        # workaround for yt not showing replies when sorting for "top" comments
        try:
            int(reply_count) # can be just "Replies"
        except:
            reply_count = "0"

        for comment in actual_comments:
            found_key = safeTraverse(comment, ["entityKey"], default="unknown-key")
            # try to link a relevant ctoken if a comment has a response
            if found_key == mutual_key:
                if ythdd_globals.config["general"]["debug"]: print(f"found reply for {found_key}")
                comment["replies"] = {
                    "replyCount": int(reply_count),
                    "continuation": reply_ctoken
                }

    return actual_comments, new_continuation

def WEBextractPlaylist(plid: str = "", ctoken: str = ""):

    # if a ctoken has been provided, use it
    if ctoken:
        # the playlist id can be omitted if a ctoken is provided
        additional_context = {'continuation': ctoken}
    else:
        # try to create a ctoken which will allow for accessing the full playlist, including delisted/deleted videos
        additional_context = {'continuation': ythdd_proto.producePlaylistContinuation(plid, offset=0)}

    context = makeWebContext(additional_context)

    response = requests.post(
        'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
        headers = ythdd_globals.getHeaders(),
        json = context
    )

    resp_json = json.loads(response.text)

    # if this is a first-time fetch (no ctoken passed), extract metadata
    metadata = None
    if not ctoken:
        metadata = {
            "microformat": safeTraverse(resp_json, ["microformat"]),
            "sidebar": safeTraverse(resp_json, ["sidebar"])
        }

    # extract the continuation
    new_continuation = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems", -1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"])

    # "best-effort" extraction of the playlist's videos
    # "best-effort" because None's (unsuccessful video extraction = None) are passed as they are
    videos = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"]) # includes the continuation as the last element of the list, which will be ignored

    return metadata, new_continuation, videos
@@ -22,6 +22,8 @@ configfile = "config.toml"
|
||||
version = "0.0.1"
|
||||
apiVersion = "1"
|
||||
randomly_generated_passcode = 0
|
||||
video_cache = {}
|
||||
general_cache = {"search": [], "continuations": {"channels": {}, "comments": {}}, "channels": {}, "playlists": {}, "storyboards": {}, "hashed_videoplayback": {}}
|
||||
|
||||
def getConfig(configfile):
|
||||
|
||||
@@ -30,7 +32,7 @@ def getConfig(configfile):
|
||||
global randomly_generated_passcode
|
||||
|
||||
if not os.path.exists(configfile):
|
||||
dummy_config = {'general': {'db_file_path': 'ythdd_db.sqlite', 'video_storage_directory_path': 'videos/', 'is_proxied': False, 'public_facing_url': 'http://localhost:5000/'}, 'api': {'api_key': 'CHANGEME'}, 'extractor': {'user-agent': '', 'cookies_path': ''}, 'admin': {'admins': ['admin']}, 'yt_dlp': {}, 'postprocessing': {'presets': [{'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''}, {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''}, {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'}, {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''}, {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'}, {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''}, {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'}, {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''}, {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'}]}}
|
||||
dummy_config = {'general': {'db_file_path': 'ythdd_db.sqlite', 'video_storage_directory_path': 'videos/', 'is_proxied': False, 'public_facing_url': 'http://127.0.0.1:5000/', 'debug': False, 'cache': True}, 'api': {'api_key': 'CHANGEME', 'enable_debugger_halt': False}, 'proxy': {'user-agent': '', 'allow_proxying_videos': True, 'match_initcwndbps': True}, 'extractor': {'user-agent': '', 'cookies_path': ''}, 'admin': {'admins': ['admin']}, 'yt_dlp': {}, 'postprocessing': {'presets': [{'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''}, {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''}, {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'}, {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''}, {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'}, {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''}, {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'}, {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''}, {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'}]}}
|
||||
# if a passcode has not been provided by the user (config file doesn't exist, and user didn't specify it using an argument)
|
||||
print(f"{colors.WARNING}WARNING{colors.ENDC}: Using default, baked in config data. {colors.ENDL}"
|
||||
f" Consider copying and editing the provided example file ({colors.OKCYAN}config.default.toml{colors.ENDC}).")
|
||||
@@ -58,7 +60,7 @@ def getHeaders(caller="proxy"):
|
||||
|
||||
# NOTE: use ESR user-agent
|
||||
# user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
|
||||
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0'
|
||||
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0'
|
||||
|
||||
if config[caller]['user-agent']:
|
||||
user_agent = config[caller]['user-agent']
|
||||
@@ -82,12 +84,74 @@ def getHeaders(caller="proxy"):
|
||||
|
||||
return headers
|
||||
|
||||
def translateLinks(link):
|
||||
def translateLinks(link: str, remove_params: bool = True):
|
||||
|
||||
link = link.replace("https://i.ytimg.com/", config['general']['public_facing_url'])
|
||||
link = link.replace("https://yt3.ggpht.com/", config['general']['public_facing_url'] + "ggpht/")
|
||||
link = link.replace("https://yt3.googleusercontent.com/", config['general']['public_facing_url'] + "guc/")
|
||||
|
||||
# try to remove tracking params
|
||||
if remove_params and "?" in link:
|
||||
link = link[:link.find("?")]
|
||||
|
||||
return link
|
||||
|
||||
def getUptime():
|
||||
return int(time.time()) - starttime
|
||||
|
||||
def safeTraverse(obj: dict, path: list, default=None, quiet: bool = False):
|
||||
"""
|
||||
Traverse dynamic objects with fallback to default values
|
||||
|
||||
This function can take an Ellipsis as part of traversal path,
|
||||
meaning that it will return the object from the list
|
||||
that contains the next key. This has been introduced
|
||||
so that no matter which object in a list holds the relevant
|
||||
model, it will find it (meaning no assumptions are necessary).
|
||||
Kepp in mind that only one ellipsis at a time is supported,
|
||||
thus ["some_key", ..., ..., "some_other_key"] won't work.
|
||||
|
||||
:param obj: Traversed object
|
||||
:type obj: dict
|
||||
:param path: Path which shall be traversed
|
||||
:type path: list
|
||||
:param default: Default value returned on failure
|
||||
:type default: any, None by default
|
||||
:param quiet: Quiet flag
|
||||
:type quiet: bool
|
||||
"""
|
||||
result = obj
|
||||
try:
|
||||
# for every item in path and its position
|
||||
for pos, iterable_key in enumerate(path):
|
||||
# if the key is not an ellipsis, traverse it
|
||||
if iterable_key is not Ellipsis:
|
||||
result = result[iterable_key]
|
||||
# if it is an ellipsis, and there is another key beside it
|
||||
elif pos < len(path) - 1:
|
||||
# then iterate through all of the list contents
|
||||
for list_content in result:
|
||||
# in search of the next traversal key
|
||||
if path[pos + 1] in list_content:
|
||||
result = list_content
|
||||
# show an error message if ellipsis is used incorrectly
|
||||
else:
|
||||
print("error(safeTraverse): Traversal path can't end with an Ellipsis!")
|
||||
raise TypeError()
|
||||
# handle exceptions
|
||||
except (KeyError, TypeError, IndexError):
|
||||
result = default
|
||||
if not quiet: print(f"error reading: {' -> '.join([str(x) for x in path])} - returning: {default}")
|
||||
finally:
|
||||
return result
|
||||
|
||||
def getCommit() -> str | None:
|
||||
try:
|
||||
return Repo(search_parent_directories=True).head.object.hexsha
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
def print_debug(text: str) -> None:
|
||||
# Will print a string only if debugging is enabled.
|
||||
if config["general"]["debug"]:
|
||||
print(text)
|
||||
|
||||
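To make the Ellipsis behavior of `safeTraverse` concrete, a small usage sketch on made-up data:

```python
# The object holding "title" sits at an unknown index inside the list;
# the Ellipsis scans for whichever element contains the next key.
data = {"items": [{"ad": {}}, {"title": "hello"}]}
print(safeTraverse(data, ["items", ..., "title"]))  # -> 'hello'
print(safeTraverse(data, ["items", 5, "title"], default="n/a", quiet=True))  # -> 'n/a'
```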
ythdd_inv_tl.py (992 changed lines)
File diff suppressed because it is too large.
ythdd_proto.py (new file, 96 lines)
@@ -0,0 +1,96 @@
from ythdd_globals import safeTraverse
import base64
import blackboxprotobuf as bbpb
import json
import urllib.parse
import ythdd_globals

def bbpbToB64(msg_and_typedef: tuple, urlsafe: bool = False, padding: bool = False) -> str:
    encoded_protobuf = bbpb.encode_message(*msg_and_typedef)
    if urlsafe:
        b64_protobuf = base64.urlsafe_b64encode(encoded_protobuf)
    else:
        b64_protobuf = base64.b64encode(encoded_protobuf)
    if padding:
        url_encoded_b64 = urllib.parse.quote(b64_protobuf.decode())
    else:
        url_encoded_b64 = b64_protobuf.decode().rstrip('=')
    return url_encoded_b64

def fdictToBbpb(msg: dict) -> tuple:
    # Requires Python 3.7+ or CPython 3.6+,
    # as these versions preserve dictionary insertion order.
    # Structural matching (match, case) requires Python 3.10+.
    clean_msg = {}
    clean_type = {}
    for key in msg:
        num, type = key.split(":")

        match type:
            case "message":
                # if the type is an embedded message
                internal_msg, internal_type = fdictToBbpb(msg[key])
                # msg can just be appended as usual
                clean_msg[num] = internal_msg
                # type contains more fields than normally
                clean_type[num] = {
                    'field_order': list(internal_msg.keys()),
                    'message_typedef': internal_type,
                    'type': type
                }

            case "base64" | "base64u" | "base64p" | "base64up":
                # if the type is a base64-embedded message
                internal_msg, internal_type = fdictToBbpb(msg[key])
                match type.removeprefix("base64"):
                    case "":
                        b64_encoded_msg = bbpbToB64((internal_msg, internal_type))
                    case "u":
                        b64_encoded_msg = bbpbToB64((internal_msg, internal_type), urlsafe=True)
                    case "p":
                        b64_encoded_msg = bbpbToB64((internal_msg, internal_type), padding=True)
                    case "up":
                        b64_encoded_msg = bbpbToB64((internal_msg, internal_type), urlsafe=True, padding=True)
                clean_msg[num] = b64_encoded_msg
                clean_type[num] = {'type': 'string'}

            case "int" | "string":
                clean_msg[num] = msg[key]
                clean_type[num] = {'type': type}

            case _:
                raise KeyError(f'error(fdictToBbpb): invalid key "{type}"')

    return (clean_msg, clean_type)

def producePlaylistContinuation(plid: str, offset: int = 0) -> str:
    msge = {
        '80226972:message': {
            '2:string': f'VL{plid}',
            '3:base64': {
                '1:int': int(offset / 100),
                '15:string': f'PT:{bbpbToB64(fdictToBbpb({"1:int": offset}))}',
                '104:message': {
                    '1:int': 0
                }
            },
            '35:string': plid
        }
    }

    bbpb_dicts = fdictToBbpb(msge)
    b64_ctoken = bbpbToB64(bbpb_dicts, urlsafe=True, padding=True)

    return b64_ctoken

def produceSearchParams(page: int = 1) -> str:
    msge = {
        "9:int": 20 * (page - 1), # pagination
        "30:int": 1 # no self-harm censorship
    }

    bbpb_dicts = fdictToBbpb(msge)
    b64_params = bbpbToB64(bbpb_dicts, urlsafe=True, padding=True)

    return b64_params
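As a sanity check, a produced token can be decoded back with blackboxprotobuf; a sketch (the exact typedef returned will vary):

```python
import base64, urllib.parse
import blackboxprotobuf as bbpb

token = produceSearchParams(page=2)
# undo the URL quoting and base64 applied by bbpbToB64
raw = base64.urlsafe_b64decode(urllib.parse.unquote(token))
message, typedef = bbpb.decode_message(raw)
print(message)  # expect field 9 == 20 (pagination offset) and field 30 == 1
```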
ythdd_struct_builder.py (new file, 93 lines)
@@ -0,0 +1,93 @@
|
||||
from ythdd_globals import safeTraverse
|
||||
import ythdd_globals
|
||||
|
||||
def genThumbs(videoId: str):
|
||||
|
||||
result = []
|
||||
thumbnails = [
|
||||
{'height': 720, 'width': 1280, 'quality': "maxres", 'url': "maxres"}, # will always attempt to return the best quality available
|
||||
{'height': 720, 'width': 1280, 'quality': "maxresdefault", 'url': "maxresdefault"},
|
||||
{'height': 480, 'width': 640, 'quality': "sddefault", 'url': "sddefault"},
|
||||
{'height': 360, 'width': 480, 'quality': "high", 'url': "hqdefault"},
|
||||
{'height': 180, 'width': 320, 'quality': "medium", 'url': "mqdefault"},
|
||||
{'height': 90, 'width': 120, 'quality': "default", 'url': "default"},
|
||||
{'height': 90, 'width': 120, 'quality': "start", 'url': "1"},
|
||||
{'height': 90, 'width': 120, 'quality': "middle", 'url': "2"},
|
||||
{'height': 90, 'width': 120, 'quality': "end", 'url': "3"},
|
||||
]
|
||||
|
||||
for x in thumbnails:
|
||||
width = x['width']
|
||||
height = x['height']
|
||||
quality = x['quality']
|
||||
url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/' + x['url'] + '.jpg'
|
||||
result.append({'quality': quality, 'url': url, 'width': width, 'height': height})
|
||||
|
||||
return result
|
||||
|
||||
def genStoryboards(video_id: str) -> list:
|
||||
# generates storyboards inside of /api/v1/videos/:video_id
|
||||
storyboards = []
|
||||
cached_storyboards = safeTraverse(ythdd_globals.general_cache["storyboards"], [video_id], default=[])
|
||||
for sb in cached_storyboards["formats"]:
|
||||
built_storyboard = {
|
||||
"url": f"/api/v1/storyboards/{video_id}?width={sb['width']}&height={sb['height']}",
|
||||
"templateUrl": cached_storyboards['template_url'].replace("$L", str(sb['index'])).replace("$N", sb['name']) + f"&sigh={sb['sigh']}",
|
||||
"width": sb['width'],
|
||||
"height": sb['height'],
|
||||
"count": sb['thumb_count'],
|
||||
"interval": sb['interval'],
|
||||
"storyboardWidth": sb['columns'],
|
||||
"storyboardHeight": sb['rows'],
|
||||
"storyboardCount": sb['images_count']
|
||||
}
|
||||
storyboards.append(built_storyboard)
|
||||
|
||||
return storyboards
|
||||
|
||||
def msToWebvttTimestamp(time: int):
|
||||
|
||||
ms = time % 1000
|
||||
time //= 1000
|
||||
|
||||
hours = time // (60 * 60)
|
||||
time -= hours * 60 * 60
|
||||
minutes = time // 60
|
||||
time -= minutes * 60
|
||||
seconds = time
|
||||
timestamp = f"{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(seconds).zfill(2)}.{str(ms).zfill(3)}"
|
||||
|
||||
return timestamp
|
||||
|
||||
def genWebvttStoryboard(video_id: str, width: int = None, height: int = None):
    # generates WebVTT storyboards for /api/v1/storyboards/:video_id
    webvtt = "WEBVTT\n\n"
    cached_storyboards = safeTraverse(ythdd_globals.general_cache["storyboards"], [video_id])
    if cached_storyboards is None:
        return ""
    found_storyboard = {}

    for sb in cached_storyboards["formats"]:
        if width is not None and width == sb['width']:
            found_storyboard = sb
        if height is not None and height == sb['height']:
            found_storyboard = sb

    # fallback choice; could be changed
    if not found_storyboard:
        found_storyboard = cached_storyboards["formats"][0]

    start = 0
    sb = found_storyboard  # use the selected format, not the leaked loop variable
    thumbs_per_sb = sb['columns'] * sb['rows']
    for x in range(sb["thumb_count"]):
        xx = x % sb['columns']
        yy = (x // sb['columns']) % sb['rows']  # the row advances once per full sweep of columns
        xywh = f"#xywh={xx * sb['width']},{yy * sb['height']},{sb['width']},{sb['height']}"
        webvtt += f"{msToWebvttTimestamp(start)} --> {msToWebvttTimestamp(start + sb['interval'])}\n"
        webvtt += cached_storyboards['template_url'].replace("$L", str(sb['index'])).replace("$N", sb['name']).replace("$M", str(x // thumbs_per_sb)) + f"&sigh={sb['sigh']}{xywh}\n"
        webvtt += "\n"
        start += sb['interval']

    return webvtt
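The generated file interleaves timing lines and sprite URLs with `#xywh` fragments pointing into the sprite sheet; illustrative cues (the URL shape is an assumption, not taken from this diff):

```
WEBVTT

00:00:00.000 --> 00:00:02.000
https://i.ytimg.com/sb/<videoId>/storyboard3_L2/M0.jpg?sigh=hypothetical#xywh=0,0,160,90

00:00:02.000 --> 00:00:04.000
https://i.ytimg.com/sb/<videoId>/storyboard3_L2/M0.jpg?sigh=hypothetical#xywh=160,0,160,90
```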
ythdd_struct_parser.py (Normal file, 618 lines)
@@ -0,0 +1,618 @@
from html import escape
from invidious_formats import FORMATS
from ythdd_globals import safeTraverse

import json
import dateparser

import ythdd_globals
import ythdd_extractor
import ythdd_struct_builder


DEFAULT_AVATAR = "https://yt3.ggpht.com/a/default-user=s176-c-k-c0x00ffffff-no-rj"
def doesContainNumber(string: str, numeric_system: int = 10) -> bool:
    try:
        int(string, numeric_system)
        return True
    except ValueError:
        return False
def parseLengthFromTimeBadge(time_str: str) -> int:
    # Returns 0 if unsuccessful
    length = 0
    time_lookup_list = [1, 60, 3_600, 86_400]
    time_list = time_str.split(":")
    # skip non-numeric badges such as ['LIVE'] for livestreams or ['Upcoming'] for scheduled videos
    if False not in map(doesContainNumber, time_list):
        for z in range(len(time_list)):
            length += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
    return length
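Two worked examples of the badge parsing (assuming the module's functions are importable):

```python
# "1:02:03" splits into ['1', '02', '03'];
# walking right-to-left: 3*1 + 2*60 + 1*3600 = 3723 seconds.
assert parseLengthFromTimeBadge("1:02:03") == 3723
assert parseLengthFromTimeBadge("LIVE") == 0  # non-numeric badge falls through to 0
```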
def parseViewsFromViewText(viewcounttext: str) -> int:
    # Returns 0 if unsuccessful
    views = 0
    magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
    if viewcounttext:
        if viewcounttext.lower() == "no":
            viewcounttext = "0"
        # keep only ASCII digits and the decimal point, e.g. "1.2M views" -> 1.2
        views = float("0" + "".join([z for z in viewcounttext if z in "0123456789."]))
        viewcounttext = viewcounttext.split(" ")[0]
        for x in magnitude.keys():
            if x == viewcounttext[-1].upper():
                views *= magnitude[x]
    return int(views)
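Worked examples (assuming the module's functions are importable):

```python
# "1.2M views":  digits/dot give 1.2, first token "1.2M" ends in 'M' -> 1_200_000
# "1,234 views": digits give 1234 (the comma is dropped), no magnitude suffix
assert parseViewsFromViewText("1.2M views") == 1_200_000
assert parseViewsFromViewText("1,234 views") == 1234
```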
def parseRenderers(entry: dict, context: dict = {}) -> dict:
    if not isinstance(entry, dict):
        raise ValueError("parsed entry is not of type dict")

    match safeTraverse(list(entry.keys()), [0], default=""):
case "videoRenderer": # represents a video
|
||||
# as of october 2025 slowly phased out in favor of lockupViewModel(?)
|
||||
|
||||
published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now")
|
||||
published_date = published_date.removeprefix("Streamed ")
|
||||
description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["videoRenderer", "descriptionSnippet", "runs"], default=[]))
|
||||
collaborative = False
|
||||
|
||||
if "author_name" in context:
|
||||
author_name = context["author_name"]
|
||||
else:
|
||||
author_name = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"], default="Unknown author")
|
||||
|
||||
if "author_ucid" in context:
|
||||
author_ucid = context["author_ucid"]
|
||||
else:
|
||||
author_ucid = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
||||
if author_ucid == "UNKNOWNCHANNELID":
|
||||
# this is a first indicator that a video is a collaborative (has multiple authors)
|
||||
# if that's the case, let's take the first author's ucid as the ucid
|
||||
collaborative = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "header", "dialogHeaderViewModel", "headline", "content"]) == "Collaborators"
|
||||
|
||||
if "verified" in context:
|
||||
verified = context["verified"]
|
||||
else:
|
||||
verified = ythdd_extractor.isVerified(safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]))
|
||||
|
||||
if "avatar" in context:
|
||||
avatar_url = context["avatar"]
|
||||
else:
|
||||
avatar_url = safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
|
||||
|
||||
views_or_viewers_model = safeTraverse(entry, ["videoRenderer", "viewCountText"], default={})
|
||||
if "simpleText" in views_or_viewers_model:
|
||||
# means this is a video with X views
|
||||
view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["simpleText"])
|
||||
view_count_text = entry["videoRenderer"]["viewCountText"]["simpleText"]
|
||||
elif "runs" in views_or_viewers_model:
|
||||
# means this is a livestream with X concurrent viewers
|
||||
view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching")
|
||||
view_count_text = entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching"
|
||||
else:
|
||||
# unknown model, assume no views
|
||||
view_count = 0
|
||||
view_count_text = "Unknown amount of views"
|
||||
|
||||
if collaborative:
|
||||
livm = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
|
||||
if "author_name" not in context:
|
||||
# override the default "name1 and others" or "name1 and name2" text
|
||||
# with full author info
|
||||
all_authors = []
|
||||
for collaborative_author in livm:
|
||||
collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"])
|
||||
if collaborative_author_name is not None:
|
||||
all_authors.append(collaborative_author_name)
|
||||
if all_authors != []: # check if custom extraction succeeded
|
||||
author_name = ", ".join(all_authors)
|
||||
if author_ucid == "UNKNOWNCHANNELID":
|
||||
# retrieve main author's ucid
|
||||
author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
||||
if safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]) is None:
|
||||
# check if the main author is verified
|
||||
verified = False
|
||||
if safeTraverse(livm, [0, "listItemViewModel", "title", "attachmentRuns", 0, "element", "type", "imageType", "image", "sources", 0, "clientResource", "imageName"]) in ("AUDIO_BADGE", "CHECK_CIRCLE_FILLED"):
|
||||
verified = True
|
||||
if avatar_url == DEFAULT_AVATAR:
|
||||
# retrieve the main channel's avatar
|
||||
avatar_url = safeTraverse(livm, [0, "listItemViewModel", "leadingAccessory", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
|
||||
|
||||
ythdd_globals.print_debug("videoRenderer fired")
|
||||
return {
|
||||
"type": "video",
|
||||
"title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]),
|
||||
"videoId": safeTraverse(entry, ["videoRenderer", "videoId"]),
|
||||
"author": author_name,
|
||||
"authorId": author_ucid,
|
||||
"authorUrl": "/channel/" + author_ucid,
|
||||
"authorVerified": verified, # TODO
|
||||
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
|
||||
"videoThumbnails": ythdd_struct_builder.genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")),
|
||||
"description": description,
|
||||
"descriptionHtml": description_html,
|
||||
"viewCount": view_count,
|
||||
"viewCountText": view_count_text,
|
||||
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
|
||||
"publishedText": published_date,
|
||||
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")),
|
||||
"liveNow": False,
|
||||
"premium": ythdd_extractor.isPremium(safeTraverse(entry, ["videoRenderer", "badges", 0])), # will fail if it's not the only badge
|
||||
"isUpcoming": False,
|
||||
"isNew": False,
|
||||
"is4k": False,
|
||||
"is8k": False,
|
||||
"isVr180": False,
|
||||
"isVr360": False,
|
||||
"is3d": False,
|
||||
"hasCaptions": False
|
||||
}
|
||||
|
||||
# modify the premiere timestamp afterwards here?
|
||||
|
||||
case "lockupViewModel": # represents playlists/mixes (and videos since october 2025)
|
||||
# related videos lvms are handled in ythdd_inv_tl.videos()
|
||||
|
||||
lvm = entry["lockupViewModel"]
|
||||
playlist_type = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="")
|
||||
|
||||
if playlist_type == "MIX":
|
||||
# mixes aren't currently supported
|
||||
return
|
||||
|
||||
if not playlist_type:
|
||||
# struct represents a video
|
||||
ythdd_globals.print_debug("lockupViewModel fired (not a playlist). this is an a/b test; any following errors stem from it.")
|
||||
|
||||
lmvm = safeTraverse(lvm, ['metadata', 'lockupMetadataViewModel'], default={})
|
||||
video_id = safeTraverse(lvm, ['contentId'])
|
||||
|
||||
author_name = safeTraverse(context, ["author_name"], default="Unknown author")
|
||||
author_ucid = safeTraverse(context, ["author_ucid"], default="UNKNOWNCHANNELID")
|
||||
verified = safeTraverse(context, ["verified"], default=False) # TODO: check if this can be retrieved here
|
||||
avatar_url = safeTraverse(context, ["avatar"], default=DEFAULT_AVATAR)
|
||||
|
||||
title = safeTraverse(lmvm, ["title", "content"], default="No title")
|
||||
video_metadata = safeTraverse(lmvm, ["metadata", "contentMetadataViewModel", "metadataRows", 0, "metadataParts"], default=[])
|
||||
view_count_text = safeTraverse(video_metadata, [0, "text", "content"], default="0 views")
|
||||
published_date = safeTraverse(video_metadata, [1, "text", "content"], default="now")
|
||||
length_text = safeTraverse(lvm, ["contentImage", "thumbnailViewModel", "overlays", ..., "thumbnailBottomOverlayViewModel", "badges", -1, "thumbnailBadgeViewModel", "text"], default="0:0")
|
||||
view_count = parseViewsFromViewText(view_count_text)
|
||||
length = parseLengthFromTimeBadge(length_text)
|
||||
|
||||
resp = {
|
||||
"type": "video",
|
||||
"title": title,
|
||||
"videoId": video_id,
|
||||
"author": author_name,
|
||||
"authorId": author_ucid,
|
||||
"authorUrl": "/channel/" + author_ucid,
|
||||
"authorVerified": verified, # TODO
|
||||
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
|
||||
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
||||
"description": "", # can't be retrieved from lockupViewModel
|
||||
"descriptionHtml": "",
|
||||
"viewCount": view_count,
|
||||
"viewCountText": view_count_text,
|
||||
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
|
||||
"publishedText": published_date,
|
||||
"lengthSeconds": length,
|
||||
"liveNow": False, # can't be live if it's in creator's video feed
|
||||
"premium": False, # todo: check this
|
||||
"isUpcoming": False,
|
||||
"isNew": False,
|
||||
"is4k": False,
|
||||
"is8k": False,
|
||||
"isVr180": False,
|
||||
"isVr360": False,
|
||||
"is3d": False,
|
||||
"hasCaptions": False
|
||||
}
|
||||
return resp
|
||||
|
||||
# struct represents a playlist
|
||||
meta = safeTraverse(lvm, ["metadata"], default=[])
|
||||
lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[])
|
||||
thumbnail = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", -1, "url"], default="no-url?")
|
||||
thumbnail = ythdd_globals.translateLinks(thumbnail[:thumbnail.rfind("?")])
|
||||
verified = safeTraverse(context, ["verified"], default=False)
|
||||
|
||||
playlist_id = safeTraverse(lvm, ["contentId"], default="UNKNOWNPLAYLISTID")
|
||||
length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos")
|
||||
length = parseViewsFromViewText(length.split(" ")[0])
|
||||
|
||||
# Turns out for some responses we do have some data, while not on others.
|
||||
# Data from context should be prioritized, thus even if something is found with safeTraverse,
|
||||
# the parser will ignore it in favour of the context.
|
||||
ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
||||
author = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author")
|
||||
ucid = safeTraverse(context, ["author_ucid"], default=ucid)
|
||||
author = safeTraverse(context, ["author_name"], default=author)
|
||||
|
||||
ythdd_globals.print_debug("lockupViewModel fired (playlist)")
|
||||
return {
|
||||
"type": "playlist",
|
||||
"title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"),
|
||||
"playlistId": playlist_id,
|
||||
"playlistThumbnail": thumbnail,
|
||||
"author": author,
|
||||
"authorId": ucid,
|
||||
"authorUrl": "/channel/" + ucid,
|
||||
"authorVerified": verified,
|
||||
"videoCount": length,
|
||||
"videos": [] # provided for historical reasons i guess
|
||||
}
|
||||
|
||||
case "shelfRenderer": # "people also watched"
|
||||
return
|
||||
|
||||
case "gridShelfViewModel": # shorts?
|
||||
return
|
||||
|
||||
case "shortsLockupViewModel": # shorts on channel pages
|
||||
|
||||
video_id = safeTraverse(entry, ["shortsLockupViewModel", "onTap", "innertubeCommand", "reelWatchEndpoint", "videoId"], default="UnknownVideoId")
|
||||
title = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "primaryText", "content"], default="ythdd: couldn't find title")
|
||||
views_text = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "secondaryText", "content"], default="No views")
|
||||
|
||||
published_date = "No data about published time" # the view model doesn't provide data about the date a short is published
|
||||
|
||||
if video_id == "UnknownVideoId": # failsafe
|
||||
video_id = safeTraverse(entry, ["shortsLockupViewModel", "entityId"], default="-UnknownVideoId")
|
||||
video_id = video_id[video_id.rfind("-") + 1:]
|
||||
|
||||
if "author_name" in context:
|
||||
author_name = context["author_name"]
|
||||
else:
|
||||
author_name = "Unknown author"
|
||||
|
||||
if "author_ucid" in context:
|
||||
author_ucid = context["author_ucid"]
|
||||
else:
|
||||
author_ucid = "UNKNOWNCHANNELID"
|
||||
|
||||
if "verified" in context:
|
||||
verified = context["verified"]
|
||||
else:
|
||||
verified = False
|
||||
|
||||
if "avatar" in context:
|
||||
avatar_url = context["avatar"]
|
||||
else:
|
||||
avatar_url = "unknown"
|
||||
|
||||
ythdd_globals.print_debug("shortsLockupViewModel fired")
|
||||
return {
|
||||
"type": "video",
|
||||
"title": title,
|
||||
"videoId": video_id,
|
||||
"author": author_name,
|
||||
"authorId": author_ucid,
|
||||
"authorUrl": "/channel/" + author_ucid,
|
||||
"authorVerified": False,
|
||||
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
||||
"description": "",
|
||||
"descriptionHtml": "",
|
||||
"viewCount": parseViewsFromViewText(views_text),
|
||||
"viewCountText": views_text,
|
||||
"published": int(0),
|
||||
"publishedText": published_date,
|
||||
"lengthSeconds": int(60), # invidious locks this to 60s no matter what the actual duration is
|
||||
"liveNow": False,
|
||||
"premium": False,
|
||||
"isUpcoming": False,
|
||||
"premiereTimestamp": 0,
|
||||
"isNew": False,
|
||||
"is4k": False,
|
||||
"is8k": False,
|
||||
"isVr180": False,
|
||||
"isVr360": False,
|
||||
"is3d": False,
|
||||
"hasCaptions": False
|
||||
}
|
||||
|
||||
case "gridVideoRenderer": # videos on channel pages
|
||||
|
||||
# doesn't work on Yattee
|
||||
# thumbnails = safeTraverse(entry, ["gridVideoRenderer", "thumbnail", "thumbnails"], default=[])
|
||||
# for thumbnail in thumbnails:
|
||||
# thumbnail["url"] = ythdd_globals.translateLinks(thumbnail["url"])
|
||||
|
||||
video_id = safeTraverse(entry, ["gridVideoRenderer", "videoId"], default="UnknownVideoId")
|
||||
thumbnails = ythdd_struct_builder.genThumbs(video_id)
|
||||
|
||||
published_date = safeTraverse(entry, ["gridVideoRenderer", "publishedTimeText", "simpleText"], default="now")
|
||||
published_date = published_date.removeprefix("Streamed ")
|
||||
|
||||
ythdd_globals.print_debug("gridVideoRenderer fired")
|
||||
return {
|
||||
"type": "video",
|
||||
"title": safeTraverse(entry, ["gridVideoRenderer", "title", "simpleText"], default="unknown video title"),
|
||||
"videoId": video_id,
|
||||
"author": context["author_name"],
|
||||
"authorId": context["author_ucid"],
|
||||
"authorUrl": "/channel/" + context["author_ucid"],
|
||||
"authorVerified": False, # TODO: handle badge related tasks here using context
|
||||
"videoThumbnails": thumbnails,
|
||||
"description": "", # won't work without using an RSS feed (?)
|
||||
"descriptionHtml": "", # -||-
|
||||
"viewCount": parseViewsFromViewText(safeTraverse(entry, ["gridVideoRenderer", "viewCountText", "simpleText"], default="0 views")),
|
||||
"viewCountText": safeTraverse(entry, ["gridVideoRenderer", "shortViewCountText", "simpleText"], default="0 views"),
|
||||
"published": int(dateparser.parse(published_date).timestamp()),
|
||||
"publishedText": published_date,
|
||||
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["gridVideoRenderer", "thumbnailOverlays", 0, "thumbnailOverlayTimeStatusRenderer", "text", "simpleText"], default="0:0")),
|
||||
"liveNow": True if published_date == "now" else False,
|
||||
"premium": False,
|
||||
"isUpcoming": False,
|
||||
"isNew": False,
|
||||
"is4k": False,
|
||||
"is8k": False,
|
||||
"isVr180": False,
|
||||
"isVr360": False,
|
||||
"is3d": False,
|
||||
"hasCaptions": False
|
||||
}
|
||||
|
||||
case "channelRenderer": # channels in search results
|
||||
|
||||
avatars = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["channelRenderer", "thumbnail", "thumbnails", 0, "url"], default=DEFAULT_AVATAR))
|
||||
description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["channelRenderer", "descriptionSnippet", "runs"], default=[]))
|
||||
isVerified = ythdd_extractor.isVerified(safeTraverse(entry, ["channelRenderer", "ownerBadges", 0], default=[]))
|
||||
|
||||
ythdd_globals.print_debug("channelRenderer fired")
|
||||
return {
|
||||
"type": "channel",
|
||||
"author": safeTraverse(entry, ["channelRenderer", "title", "simpleText"], default="Unknown channel"),
|
||||
"authorId": safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"),
|
||||
"authorUrl": "/channel/" + safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"),
|
||||
"authorVerified": isVerified,
|
||||
"authorThumbnails": avatars,
|
||||
"autoGenerated": False,
|
||||
"subCount": parseViewsFromViewText(safeTraverse(entry, ["channelRenderer", "videoCountText", "simpleText"], default="0 subscribers")),
|
||||
"videoCount": 0,
|
||||
"channelHandle": safeTraverse(entry, ["channelRenderer", "navigationEndpoint", "browseEndpoint", "canonicalBaseUrl"], default="/@ythdd_unknown_handle")[1:],
|
||||
"description": description,
|
||||
"descriptionHtml": description_html
|
||||
}
|
||||
|
||||
case "playlistVideoRenderer":
|
||||
# used by all content inside of playlists which have at least one non-shorts video/livestream
|
||||
|
||||
video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId")
|
||||
title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title")
|
||||
author_ucid = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"])
|
||||
author_name = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "text"], default="Unknown author")
|
||||
video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1
|
||||
length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0"))
|
||||
published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01")
|
||||
published_date = published_date.removeprefix("Streamed ").removeprefix(" watching")
|
||||
|
||||
# handle livestreams
|
||||
if not published_date:
|
||||
published_date = "now"
|
||||
|
||||
if author_ucid is None:
|
||||
# likely a collaborative video, let's try
|
||||
# to fetch the uploader's ucid with that in mind
|
||||
livm = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
|
||||
# name extraction logic the same as in videoRenderer
|
||||
all_authors = []
|
||||
for collaborative_author in livm:
|
||||
collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"])
|
||||
if collaborative_author_name is not None:
|
||||
all_authors.append(collaborative_author_name)
|
||||
if all_authors != []:
|
||||
author_name = ", ".join(all_authors)
|
||||
author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
||||
|
||||
# surprisingly, innertube responds with the avatar of the user that added the video to the playlist
|
||||
# we can extract that information, e.g. for yattee to display
|
||||
avatar_url = safeTraverse(entry, ["playlistVideoRenderer", "thumbnailOverlays", ..., "thumbnailOverlayAvatarStackViewModel", "avatarStack", "avatarStackViewModel", "avatars", 0, "avatarViewModel", "image", "sources", 0, "url"])
|
||||
avatars = None if avatar_url is None else ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url)
|
||||
|
||||
ythdd_globals.print_debug("playlistVideoRenderer fired")
|
||||
return {
|
||||
"type": "video",
|
||||
"title": title,
|
||||
"videoId": video_id,
|
||||
"author": author_name,
|
||||
"authorId": author_ucid,
|
||||
"authorUrl": "/channel/" + author_ucid,
|
||||
"authorThumbnails": avatars,
|
||||
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
||||
"index": video_index,
|
||||
"lengthSeconds": length,
|
||||
"liveNow": False, # todo: check this?
|
||||
# these do not need to be returned, but some clients try to read it
|
||||
# so we return an approximation here:
|
||||
"published": int(dateparser.parse(published_date).timestamp()),
|
||||
"publishedText": published_date
|
||||
}
|
||||
|
||||
        case _:
            print("received an entry of unknown type (thus it can't be parsed):")
            print(entry)
            print("")
            # breakpoint()
            return
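To see the dispatch end to end, a sketch with a hypothetical stripped-down entry. Real renderer objects carry many more fields, and this assumes a configured ythdd environment whose helpers tolerate missing badges:

```python
# Hypothetical minimal videoRenderer entry (illustrative values only):
entry = {
    "videoRenderer": {
        "videoId": "dQw4w9WgXcQ",
        "title": {"runs": [{"text": "Example video"}]},
        "ownerText": {"runs": [{"text": "Example channel"}]},
        "viewCountText": {"simpleText": "1,234 views"},
        "publishedTimeText": {"simpleText": "3 years ago"},
        "lengthText": {"simpleText": "3:32"},
    }
}
parsed = parseRenderers(entry, context={"author_ucid": "UCxxxxxxxxxxxxxxxxxxxxxx"})
print(parsed["viewCount"], parsed["lengthSeconds"])  # -> 1234 212
```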
def customCommentRendererParser(comment: dict, context: dict = {}) -> dict:
    cep = safeTraverse(comment, ["payload", "commentEntityPayload"], default={})
    content = safeTraverse(cep, ["properties", "content", "content"], default="")
    content_html = escape(content).replace("\r\n", "<br>").replace("\n", "<br>")
    author = safeTraverse(cep, ["author"], default={})
    verified = safeTraverse(author, ["isVerified"], default=False) or safeTraverse(author, ["isArtist"], default=False)
    ucid = safeTraverse(author, ["channelId"], default="UNKNOWNCHANNELID")
    published_date = safeTraverse(cep, ["properties", "publishedTime"], default="now")
    edited = False

    if published_date.endswith(" (edited)"):
        edited = True
        published_date_unix = int(dateparser.parse(published_date.removesuffix(" (edited)")).timestamp())
    else:
        published_date_unix = int(dateparser.parse(published_date).timestamp())

    inv_comment = {
        "authorId": ucid,
        "authorUrl": "/channel/" + ucid,
        "author": safeTraverse(author, ["displayName"], default="@ythdd-unknown-user"),
        "verified": verified,
        "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(author, ["avatarThumbnailUrl"], default=DEFAULT_AVATAR)),  # proxy them!
        "authorIsChannelOwner": safeTraverse(author, ["isCreator"], default=False),  # ???
        "isSponsor": False,  # not sure how to retrieve this
        "likeCount": parseViewsFromViewText("0" + safeTraverse(cep, ["toolbar", "likeCountNotliked"], default="0") + " likes"),
        "isPinned": False,
        "commentId": safeTraverse(cep, ["properties", "commentId"], default="UNKNOWNCOMMENTID"),
        "content": content,
        "contentHtml": content_html,
        "isEdited": edited,
        "published": published_date_unix,
        "publishedText": published_date if published_date != "now" else "unknown amount of time ago"
    }

    if "replies" in comment:
        inv_comment["replies"] = comment["replies"]

    return inv_comment
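A sketch with a hypothetical stripped-down payload (real commentEntityPayload objects carry more fields; the ythdd modules must be importable and configured):

```python
# Illustrative values only; note how the " (edited)" suffix is detected and
# how the like count text "1.2K" expands to 1200 via parseViewsFromViewText.
comment = {"payload": {"commentEntityPayload": {
    "properties": {"content": {"content": "nice video"},
                   "commentId": "UgzHypotheticalId",
                   "publishedTime": "2 years ago (edited)"},
    "author": {"channelId": "UCxxxxxxxxxxxxxxxxxxxxxx",
               "displayName": "@someone", "isVerified": False},
    "toolbar": {"likeCountNotliked": "1.2K"},
}}}
parsed = customCommentRendererParser(comment)
print(parsed["isEdited"], parsed["likeCount"])  # -> True 1200
```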
def parseDescriptionSnippet(snippet: list):
    text = ""
    text_html = ""
    for entry in snippet:
        text += entry["text"]
        # escape each run before wrapping it in tags; escaping the finished
        # string afterwards would also escape our own <b></b> markup
        if "bold" in entry:  # is checking entry["bold"] == True necessary?
            text_html += "<b>" + escape(entry["text"]) + "</b>"
        else:
            text_html += escape(entry["text"])
    text_html = text_html.replace("\r\n", "<br>").replace("\n", "<br>")

    return text, text_html
def runsToText(runs: list, default: str = "") -> str:
    # "default" will be returned when text extraction fails.
    extracted_text = ""

    for field in runs:
        extracted_text += safeTraverse(field, ["text"], default="")

    if extracted_text:
        return extracted_text

    return default
def extractTextFromSimpleOrRuns(obj: dict, default: str = "") -> str:
    # Extracts text from both the "runs" and "simpleText" models,
    # falling back to default.
    text = default
    if not isinstance(obj, dict):
        return default
    if "runs" in obj:
        text = runsToText(obj["runs"])
    elif "simpleText" in obj:
        text = obj["simpleText"]
    else:
        print(f"error(extractTextFromSimpleOrRuns): text extraction failed for {obj}")
    return text
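Both Innertube text shapes collapse to the same plain string (assuming the functions are importable):

```python
print(extractTextFromSimpleOrRuns({"simpleText": "1:02:03"}))                       # -> 1:02:03
print(extractTextFromSimpleOrRuns({"runs": [{"text": "1:0"}, {"text": "2:03"}]}))   # -> 1:02:03
```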
def findNearestResolution(width: int, height: int) -> int:
    # Finds the nearest standard resolution (one of 144p, 240p, ...).
    # So far only used for Yattee, as it has trouble playing anything
    # without one of the standard resolutions. Playback on other
    # clients is unaffected.

    # failsafe behaviour
    try:
        width = int(width)
        height = int(height)
        res = min(width, height)
    except (TypeError, ValueError):
        return 360
    if res <= 0:  # guard against division by zero below
        return 360

    standard_resolutions = [144, 240, 360, 720, 1080, 2160, 4320]
    if res in standard_resolutions:
        return res

    # calculate the relative distance to each of the standard resolutions
    res_normalized = [abs(1 - (x / res)) for x in standard_resolutions]
    # pick the one where the distance is the smallest
    target_index = res_normalized.index(min(res_normalized))
    target_res = standard_resolutions[target_index]

    return target_res
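A worked example of the distance computation:

```python
# For a 2560x1440 input: res = 1440; the relative distances
# |1 - x/1440| for x in [144, 240, 360, 720, 1080, 2160, 4320] are
# [0.9, 0.833, 0.75, 0.5, 0.25, 0.5, 2.0]; the minimum is at 1080.
assert findNearestResolution(2560, 1440) == 1080
```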
def parseFormatStreams(wdata_fstream: dict, ydata_stream: dict) -> dict:
    try:
        stream_url = ydata_stream["url"]
    except (KeyError, TypeError):
        ythdd_globals.print_debug("could not extract format stream URL from yt-dlp response:")
        ythdd_globals.print_debug(f"wdata: {wdata_fstream}")
        ythdd_globals.print_debug(f"ydata: {ydata_stream}")
        stream_url = None  # keep the key present instead of raising NameError below

    fstream = {
        "url": stream_url,
        "itag": str(wdata_fstream["itag"]),
        "type": wdata_fstream["mimeType"],
        "quality": wdata_fstream["quality"],
        "bitrate": str(wdata_fstream["bitrate"]),
        "fps": wdata_fstream["fps"],
        "size": f"{wdata_fstream['width']}x{wdata_fstream['height']}",
        "resolution": f"{findNearestResolution(wdata_fstream['width'], wdata_fstream['height'])}p",  # possibly not really needed here
        "qualityLabel": wdata_fstream["qualityLabel"],
        "container": safeTraverse(FORMATS.get(wdata_fstream["itag"]), ["ext"], default="mp4"),  # invidious_formats
        "encoding": safeTraverse(FORMATS.get(wdata_fstream["itag"]), ["vcodec"], default="mp4")  # invidious_formats
    }

    return fstream
def parseAdaptiveStreams(wdata_astream: dict, ydata_stream: dict) -> dict:
    try:
        stream_url = ydata_stream["url"]
    except (KeyError, TypeError):
        ythdd_globals.print_debug("could not extract adaptive stream URL from yt-dlp response:")
        ythdd_globals.print_debug(f"wdata: {wdata_astream}")  # was wdata_fstream, a NameError
        ythdd_globals.print_debug(f"ydata: {ydata_stream}")
        stream_url = None  # keep the key present instead of raising NameError below

    astream_common = {
        "init": f"{wdata_astream['initRange']['start']}-{wdata_astream['initRange']['end']}",
        "index": f"{wdata_astream['indexRange']['start']}-{wdata_astream['indexRange']['end']}",
        "bitrate": str(wdata_astream["bitrate"]),
        "url": stream_url,
        "itag": str(wdata_astream["itag"]),
        "type": wdata_astream["mimeType"],
        "clen": wdata_astream["contentLength"],
        "lmt": wdata_astream["lastModified"],
        "projectionType": wdata_astream["projectionType"],
        "container": safeTraverse(FORMATS.get(wdata_astream["itag"]), ["ext"], default="mp4"),  # invidious_formats
        "encoding": safeTraverse(FORMATS.get(wdata_astream["itag"]), ["vcodec"], default="mp4")  # invidious_formats
    }

    isVideo = "audioQuality" not in wdata_astream

    astream = astream_common
    if isVideo:
        # video-specific metadata
        astream["fps"] = wdata_astream["fps"]
        astream["size"] = f"{wdata_astream['width']}x{wdata_astream['height']}"
        astream["resolution"] = f"{findNearestResolution(wdata_astream['width'], wdata_astream['height'])}p"
        astream["qualityLabel"] = wdata_astream["qualityLabel"]
        astream["colorInfo"] = safeTraverse(wdata_astream, ["colorInfo"])
    else:
        # audio-specific metadata
        astream["encoding"] = safeTraverse(FORMATS.get(wdata_astream["itag"]), ["acodec"], default="mp4")
        astream["audioQuality"] = wdata_astream["audioQuality"]  # trailing comma removed: it turned this value into a tuple
        astream["audioSampleRate"] = int(wdata_astream["audioSampleRate"])  # same fix here
        astream["audioChannels"] = wdata_astream["audioChannels"]

    return astream