Compare commits
68 Commits
15d2de5228...master
| Author | SHA1 | Date |
|---|---|---|
| | 56af1f0735 | |
| | 72266aad0f | |
| | 900cc92229 | |
| | 2687cc2bdc | |
| | 4a9d59c9b4 | |
| | 4af581ab7c | |
| | d1f381220d | |
| | eebf434f3e | |
| | c979c97077 | |
| | 11c94c757e | |
| | 4421e68d9d | |
| | da2daab16a | |
| | 05b81e55da | |
| | 158dcc3b7f | |
| | 668e8c32aa | |
| | 760aaccfff | |
| | da54bd0818 | |
| | b0845d723a | |
| | c760104d70 | |
| | d6cb0fe692 | |
| | 81fba8c4d0 | |
| | 260039c307 | |
| | 2b24fc2906 | |
| | 002e3cba33 | |
| | 5944fd2458 | |
| | 719b545cb4 | |
| | 468795a7a2 | |
| | 7eb4452fec | |
| | e7ae42f289 | |
| | 5bb542826e | |
| | 365c54d214 | |
| | a2e2fedc90 | |
| | 4a311f185c | |
| | 45073fff67 | |
| | f73e6c11d4 | |
| | 34e00e2492 | |
| | f63c620541 | |
| | f2adc2e561 | |
| | e6d32091e1 | |
| | b98aa718b0 | |
| | 30850a7ce0 | |
| | 1c9174c888 | |
| | 6d0c70696b | |
| | 509e81aafa | |
| | caa9e0c2b1 | |
| | 873abbd413 | |
| | bedcaff7c0 | |
| | 5dcf8c81e0 | |
| | 60712f3b5d | |
| | c20d4c34aa | |
| | be697bb572 | |
| | 3e84cf3443 | |
| | 4a3937a923 | |
| | c3fae689e1 | |
| | 4cfb1db7d0 | |
| | 5a1e772909 | |
| | 7c4991cea7 | |
| | 5f88d6f096 | |
| | eaaa14c4d8 | |
| | ef177f7200 | |
| | d0d2298186 | |
| | 7086177a58 | |
| | dc8009db23 | |
| | e562d1ee14 | |
| | 96c1b5396e | |
| | 256d21bbcd | |
| | 2a9826eb03 | |
| | 37e932956d | |
```diff
@@ -1,5 +1,5 @@
 [general]
-db_file_path = "/path/to/ythdd_db.sqlite" # Preferably stored on an SSD.
+db_file_path = "ythdd_db.sqlite" # Path to the database file, preferably stored on an SSD.
 video_storage_directory_path = "/path/to/videos/" # Path to video vault.
 is_proxied = false # Set to true if running behind reverse proxy.
 public_facing_url = "http://127.0.0.1:5000/" # Used for URL rewriting. Note the trailing slash /.
@@ -9,13 +9,19 @@ cache = true # Whether to cache requests f
 [api]
 api_key = "" # Leave empty API key for public access to non-sensitive backend
 api_key_admin = "CHANGEME" # Empty *admin* API key will autogenerate a random one every launch.
+enable_debugger_halt = false # Whether to allow triggering pdb using the admin's API key.
 
 [extractor]
 user-agent = "" # Leave empty for default (Firefox ESR).
 cookies_path = "" # Leave empty for none.
+age_restricted_cookies_path = "" # Cookies to use when bypassing age-gated videos only. Leave empty to disable.
+deno_path = "" # Required when using cookies.
+preferred_extractor = "" # Leave empty for default (android_vr).
 
 [proxy]
 user-agent = "" # Leave empty for default (Firefox ESR).
+allow_proxying_videos = false # Whether to allow video proxying through the instance (traffic-intensive).
+match_initcwndbps = true # Experimental: matches proxying speed to the one suggested by Innertube (may help avoid being ratelimited/banned).
 
 [admin]
 # List of users with admin privileges.
```
```diff
@@ -14,3 +14,4 @@ requests>=2.32.3
 yt_dlp
 brotli>=1.1.0
 dateparser>=1.2.2
+bbpb>=1.4.2
```
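The new `bbpb` dependency is blackbox protobuf, used by the new `ythdd_proto.py` below to build Innertube continuation tokens without full `.proto` definitions. A minimal round-trip sketch (the message shape here is illustrative, not taken from this PR):

```python
import base64
import blackboxprotobuf as bbpb

message = {"2": "VLPLxxxx", "35": "PLxxxx"}
typedef = {"2": {"type": "string"}, "35": {"type": "string"}}

raw = bbpb.encode_message(message, typedef)          # raw protobuf bytes
token = base64.urlsafe_b64encode(raw).decode()       # ctoken-style string

decoded, guessed_typedef = bbpb.decode_message(raw)  # typedef is inferred
assert decoded["2"] == "VLPLxxxx"
```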
views.py (119 changed lines)

```diff
@@ -1,8 +1,8 @@
 #!/usr/bin/python3
-from flask import render_template, Response
+from flask import redirect, render_template, request, Response
 from flask_sqlalchemy import SQLAlchemy
 from markupsafe import escape
-import requests, json
+import hashlib, json, re, requests
 import ythdd_globals
 
 def homepage():
@@ -26,16 +26,48 @@ def thumbnailProxy(received_request):
             'error_msg': 'invalid request. pretend this is a thumbnail :D'
         }), mimetype='application/json', status=400)
 
-    thumbnail = requests.get(prefix + "vi/" + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
+    quality_urls = ['maxresdefault', 'sddefault', 'hqdefault', 'mqdefault', 'default', '1', '2', '3']
+    video_id, requested_quality = received_request.split('/')
+
+    thumbnail = requests.get(prefix + "vi/" + video_id + "/" + requested_quality, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
     thumbnail.raw.decode_content = True
 
+    quality_id = 0
+    if requested_quality == "maxres.jpg":
+        # if requested quality is maxres,
+        # provide the best quality possible
+        while thumbnail.status_code != 200:
+            thumbnail = requests.get(prefix + "vi/" + video_id + "/" + quality_urls[quality_id] + ".jpg", headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
+            thumbnail.raw.decode_content = True
+            quality_id += 1
+
     response = Response(thumbnail.raw, mimetype=thumbnail.headers['content-type'], status=thumbnail.status_code)
 
     return response
 
+def storyboardProxy(received_request):
+
+    # will proxy storyboards
+    prefix = "https://i.ytimg.com/"
+    sqp = request.args.get("sqp")
+    sigh = request.args.get("sigh")
+
+    if sqp is None or sigh is None:
+        return Response(json.dumps({"status": "error", "error": "Request lacks the sqp, sigh params (or both)."}), mimetype="application/json", status=400)
+
+    sb = requests.get(prefix + "sb/" + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), params={"sqp": sqp, "sigh": sigh}, stream=True)
+    sb.raw.decode_content = True
+    response = Response(sb.raw, mimetype=sb.headers['content-type'], status=sb.status_code)
+    return response
+
 def ggphtProxy(received_request):
 
     prefix = "https://yt3.ggpht.com/"
 
+    # fix for how materialious fetches avatars
+    if received_request.startswith("guc/"):
+        return gucProxy(received_request.removeprefix("guc/"))
+
     ggpht = requests.get(prefix + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
     ggpht.raw.decode_content = True
     response = Response(ggpht.raw, mimetype=ggpht.headers['content-type'], status=ggpht.status_code)
@@ -51,3 +83,84 @@ def gucProxy(received_request):
     response = Response(guc.raw, mimetype=guc.headers['content-type'], status=guc.status_code)
 
     return response
+
+def imgProxy(received_request):
+
+    # will proxy /img/no_thumbnail.jpg
+    prefix = "https://i.ytimg.com/"
+
+    thumbnail = requests.get(prefix + "img/" + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
+    thumbnail.raw.decode_content = True
+    response = Response(thumbnail.raw, mimetype=thumbnail.headers['content-type'], status=thumbnail.status_code)
+
+    return response
+
+def videoplaybackProxy():
+    # inspired by Yotter's video proxy
+    # https://github.com/ytorg/Yotter/blob/b43a72ab7bfa5a59916fa3259cbc39165717c6bb/app/routes.py#L527
+
+    if not ythdd_globals.config['proxy']['allow_proxying_videos']:
+        return Response(json.dumps({"error": "Administrator has disabled this endpoint"}), mimetype="application/json", status=403)
+
+    headers = dict(request.headers)
+    proxy_headers = ythdd_globals.getHeaders(caller='proxy')
+    if "Range" in headers:
+        proxy_headers["Range"] = headers["Range"]
+
+    params = dict(request.args)
+    # support md5 videoplayback url shortening
+    if "md5" in params:
+        if params["md5"] not in ythdd_globals.general_cache["hashed_videoplayback"]:
+            return Response(json.dumps({"error": "Videoplayback request not cached or expired."}), mimetype="application/json", status=404)
+        return redirect(ythdd_globals.general_cache["hashed_videoplayback"][params["md5"]]["original_url"])
+    else:
+        public_url = request.url
+        public_url = ythdd_globals.config["general"]["public_facing_url"] + public_url[public_url.rfind("videoplayback"):]
+        md5sum = hashlib.md5(public_url.encode("utf-8")).hexdigest()
+        ythdd_globals.general_cache["hashed_videoplayback"][md5sum] = {"original_url": request.url}
+
+    # reconstruct the url
+    # first attempt: from host param
+    host = params.get('host')
+    # failed? then try to get it from the rest of the params
+    if host is None:
+        # second attempt: reconstruct url from mn and mvi?
+        # the host schema seems to be as follows:
+        # rr{mvi[any]/fvip[any]?}---{mn[any]}.googlevideo.com
+        # regarding mvi/fvip, it seems that any value smaller than 5 passes
+        try:
+            mvi = params.get('mvi').split(',')[-1]
+            mn = params.get('mn').split(',')[-1]
+            if int(mvi) > 5:
+                mvi = 3 # invidious uses this as fallback
+            host = f"rr{mvi}---{mn}.googlevideo.com"
+        except (AttributeError, ValueError):
+            return Response(json.dumps({"error": "Couldn't extract crucial parameters for hostname reconstruction"}), mimetype="application/json", status=400)
+    else:
+        # don't echo host "hint" back to the googlevideo server
+        del params['host']
+
+    # run a regex sanity check
+    if re.fullmatch(r"[\w-]+\.googlevideo\.com", host) is None:
+        # fallback behavior for unexpected hostnames
+        return Response(json.dumps({"error": "Please either pass a valid host, or don't pass any"}), mimetype="application/json", status=400)
+
+    try:
+        # request the proxied data
+        remote_response = requests.get(f"https://{host}/videoplayback", headers=proxy_headers, params=params, stream=True)
+    except:
+        return Response(json.dumps({"error": "Couldn't connect to googlevideo host"}), mimetype="application/json", status=500)
+
+    # determine the chunk size
+    chunk_size = 10 * 1024 # by default it's 10 KB (as this is the most youtube is willing to send without ratelimiting)
+    # or the one in initcwndbps (if user enabled the config flag to match chunk_size with initcwndbps)
+    if ythdd_globals.config['proxy']['match_initcwndbps']:
+        try:
+            chunk_size = int(params.get('initcwndbps')) // 1024
+        except:
+            pass
+
+    # return a chunked response
+    resp = Response(remote_response.iter_content(chunk_size=chunk_size), content_type=remote_response.headers['Content-Type'], status=remote_response.status_code, headers=remote_response.headers, direct_passthrough=True)
+    resp.cache_control.public = True
+    resp.cache_control.max_age = int(60_000)
+
+    return resp
```
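How the md5 shortening in `videoplaybackProxy` round-trips: a full request URL is first rewritten to its public-facing form, hashed, and cached under the hash; a later `/videoplayback?md5=<hash>` call simply redirects to the cached `original_url`. The hashing step, reproduced standalone (URLs are placeholders):

```python
import hashlib

public_facing_url = "http://127.0.0.1:5000/"  # config["general"]["public_facing_url"]
request_url = "http://127.0.0.1:5000/videoplayback?expire=123&mvi=3&mn=abc"

# normalize to the public-facing form before hashing, as videoplaybackProxy does
public_url = public_facing_url + request_url[request_url.rfind("videoplayback"):]
md5sum = hashlib.md5(public_url.encode("utf-8")).hexdigest()

# general_cache["hashed_videoplayback"] then maps the hash back to the original
cache = {md5sum: {"original_url": request_url}}
```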
ythdd.py (5 changed lines)

```diff
@@ -64,8 +64,11 @@ def setup():
     app.add_url_rule('/api/', view_func=ythdd_api.api_greeting)
     app.add_url_rule('/api/<path:received_request>', view_func=ythdd_api.api_global_catchall)
     app.add_url_rule('/vi/<path:received_request>', view_func=views.thumbnailProxy)
+    app.add_url_rule('/sb/<path:received_request>', view_func=views.storyboardProxy)
     app.add_url_rule('/ggpht/<path:received_request>', view_func=views.ggphtProxy)
     app.add_url_rule('/guc/<path:received_request>', view_func=views.gucProxy)
+    app.add_url_rule('/img/<path:received_request>', view_func=views.imgProxy)
+    app.add_url_rule('/videoplayback', view_func=views.videoplaybackProxy)
     db = ythdd_db.initDB(app, config)
 
     with app.app_context():
@@ -133,7 +136,7 @@ def main(args):
         app_port = port
 
     setup()
-    app.run(host=host, port=int(port))
+    app.run(host=host, port=int(port), threaded=True)
 
 if __name__ == "__main__":
     #app.run(host="127.0.0.1", port=5000)
```
```diff
@@ -14,7 +14,7 @@ def api_greeting():
 
 def api_global_catchall(received_request):
     ythdd_globals.apiRequests += 1
-    if request.environ['REMOTE_ADDR'] != "127.0.0.1" or (ythdd_globals.isProxied and request.environ['X-Forwarded-For'] != "127.0.0.1"):
+    if request.environ['REMOTE_ADDR'] != "127.0.0.1" or (ythdd_globals.isProxied and request.environ['HTTP_X_FORWARDED_FOR'] != "127.0.0.1"):
        ythdd_globals.outsideApiHits += 1
 
     request_list = received_request.split('/')
```
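The forwarded-address check now reads `HTTP_X_FORWARDED_FOR`: per PEP 3333, WSGI servers expose request headers in `environ` upper-cased, with dashes replaced by underscores and an `HTTP_` prefix, so the old `X-Forwarded-For` key could never match (and raised `KeyError` when proxied). A small illustration:

```python
from flask import Flask, request

app = Flask(__name__)

@app.route("/where")
def where():
    # equivalent lookups; .get avoids KeyError when the header is absent
    via_headers = request.headers.get("X-Forwarded-For")
    via_environ = request.environ.get("HTTP_X_FORWARDED_FOR")
    return {"headers": via_headers, "environ": via_environ}
```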
```diff
@@ -8,11 +8,11 @@ import ythdd_globals, ythdd_extractor
 #from flask_sqlalchemy import SQLAlchemy
 #import ythdd_api_v1_stats, ythdd_api_v1_user, ythdd_api_v1_info, ythdd_api_v1_query, ythdd_api_v1_meta, ythdd_api_v1_admin
 
-def requireAuthentication(func):
-    @wraps(func)
-    def wrapper(*args, **kwargs):
-        token = kwargs["r"].args.get('token')
-        if token == lewy_globals.config['api']['api_key']:
-            try:
-                status, received, data = func(*args, **kwargs)
-                return status, received, data
+def requireAuthentication(admin: bool = True):
+    def functionWrapper(func):
+        def wrapper(*args, **kwargs):
+            token = kwargs["r"].args.get('token')
+            if token == ythdd_globals.config['api']['api_key' + admin * '_admin']:
+                try:
+                    status, received, data = func(*args, **kwargs)
+                    return status, received, data
@@ -21,6 +21,7 @@ def requireAuthentication(func):
             else:
                 return 401, "error", {'error_msg': "Unauthorized"}
         return wrapper
+    return functionWrapper
 
 def incrementBadRequests():
     ythdd_globals.apiFailedRequests += 1
@@ -143,6 +144,13 @@ def hot(data):
         incrementBadRequests()
         return notImplemented([data[1]]) # workaround before notImplemented is reworked
 
+@requireAuthentication(admin=True)
+def debugger_halt(r):
+    if not ythdd_globals.config["api"]["enable_debugger_halt"]:
+        return 403, "Administrator has disabled access for this endpoint.", []
+    breakpoint()
+    return 200, "Pdb triggered and ended successfully.", []
+
 def lookup(data, request):
     match data[0]:
         case 'stats':
@@ -163,6 +171,8 @@ def lookup(data, request):
         case 'admin':
             # REQUIRE CREDENTIALS!
             return stub_hello()
+        case 'halt':
+            return debugger_halt(r=request)
         case _:
             incrementBadRequests()
             return notImplemented(data)
```
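The authentication wrapper is now a decorator factory: `requireAuthentication(admin=True)` returns the actual decorator. The config key is picked with `'api_key' + admin * '_admin'`, which works because `bool` is an `int` subclass, so multiplying a string by `False` yields an empty string. A minimal sketch of the same trick:

```python
# bool is an int subclass, so string * bool keeps or empties the suffix
for admin in (True, False):
    key = 'api_key' + admin * '_admin'
    print(key)  # api_key_admin, then api_key
```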
```diff
@@ -1,6 +1,8 @@
 #!/usr/bin/python3
 import brotli, yt_dlp, requests, json, time
+from http.cookiejar import MozillaCookieJar
 from ythdd_globals import safeTraverse
+import ythdd_proto
 import ythdd_globals
 
 ytdl_opts = {
@@ -18,7 +20,11 @@ ytdl_opts = {
             # "formats": ["dashy"]
         }
     },
-    "simulate": True
+    "simulate": True,
+    "js_runtimes": {
+        "deno": {}
+    },
+    'remote_components': ['ejs:github']
 }
 
 stage1_headers = {
@@ -66,7 +72,7 @@ stage1_body = {
 
 stage2_headers = {
     "Connection": "keep-alive",
-    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
+    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0",
     "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
     "Sec-Fetch-Mode": "navigate",
@@ -75,13 +81,13 @@ stage2_headers = {
 
 stage3_headers = {
     "Connection": "keep-alive",
-    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
+    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0",
     "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
     "Accept-Language": "en-us,en;q=0.5",
     "Sec-Fetch-Mode": "navigate",
     "Content-Type": "application/json",
     "X-Youtube-Client-Name": "1",
-    "X-Youtube-Client-Version": "2.20250829.01.00",
+    "X-Youtube-Client-Version": "2.20251103.01.00",
     "Origin": "https://www.youtube.com",
     "Accept-Encoding": "gzip, deflate, br",
     "Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
@@ -93,7 +99,7 @@ stage3_body = {
     "client":
     {
         "clientName": "WEB",
-        "clientVersion": "2.20250829.01.00",
+        "clientVersion": "2.20251103.01.00",
         "hl": "en",
         "timeZone": "UTC",
         "utcOffsetMinutes": 0
@@ -111,9 +117,9 @@ web_context_dict = {
         'gl': 'US',
         'deviceMake': '',
         'deviceModel': '',
-        'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0,gzip(gfe)',
+        'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0,gzip(gfe)',
         'clientName': 'WEB',
-        'clientVersion': '2.20250829.01.00',
+        'clientVersion': '2.20251103.01.00',
         'osName': 'Windows',
         'osVersion': '10.0',
         'screenPixelDensity': 2,
@@ -128,25 +134,48 @@ web_context_dict = {
     }
 }
 
-def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False):
+def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False, use_cookies=None):
     # TODO: check user-agent and cookiefile
 
+    ytdl_context = ytdl_opts.copy()
+
     if ythdd_globals.config['extractor']['user-agent']:
         yt_dlp.utils.std_headers['User-Agent'] = ythdd_globals.config['extractor']['user-agent']
 
-    if ythdd_globals.config['extractor']['cookies_path']:
-        ytdl_opts['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']
-
     if len(url) == 11:
         url = "https://www.youtube.com/watch?v=" + url
     if getcomments:
-        ytdl_opts['getcomments'] = True
+        ytdl_context['getcomments'] = True
     if maxcomments:
-        ytdl_opts['extractor_args']['youtube']['max_comments'] = [maxcomments, "all", "all", "all"]
+        ytdl_context['extractor_args']['youtube']['max_comments'] = [maxcomments, "all", "all", "all"]
     if manifest_fix:
         # https://github.com/yt-dlp/yt-dlp/issues/11952#issuecomment-2565802294
-        ytdl_opts['extractor_args']['youtube']['player_client'] = ['default', 'web_safari']
-    with yt_dlp.YoutubeDL(ytdl_opts) as ytdl:
+        ytdl_context['extractor_args']['youtube']['player_client'] = ['default', 'web_safari']
+    elif ythdd_globals.config['extractor']['preferred_extractor']:
+        ytdl_context['extractor_args']['youtube']['player_client'] = [ythdd_globals.config['extractor']['preferred_extractor']]
+    else:
+        ytdl_context['extractor_args']['youtube']['player_client'] = ['android_vr']
+
+    if use_cookies is not None:
+        # can be either "global", "agegated" or None
+        deno_path = ythdd_globals.config['extractor']['deno_path']
+        match use_cookies:
+            case "global":
+                ytdl_context['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']
+                ytdl_context['extractor_args']['youtube']['player_client'] = ['tv']
+                if not deno_path:
+                    print("FATAL ERROR: deno path is required for playback using cookies!")
+                ytdl_context['js_runtimes']['deno']['path'] = deno_path if deno_path else ""
+            case "agegated":
+                ytdl_context['cookiefile'] = ythdd_globals.config['extractor']['age_restricted_cookies_path']
+                ytdl_context['extractor_args']['youtube']['player_client'] = ['tv']
+                if not deno_path:
+                    print("FATAL ERROR: deno path is required for playback of age-restricted content!")
+                ytdl_context['js_runtimes']['deno']['path'] = deno_path if deno_path else ""
+            case None | _:
+                pass
+
+    with yt_dlp.YoutubeDL(ytdl_context) as ytdl:
         result = ytdl.sanitize_info(ytdl.extract_info(url, download=False))
     return result
 
```
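One caveat worth noting (an observation, not something this diff addresses): `ytdl_opts.copy()` is a shallow copy, so writes to nested keys such as `extractor_args` still mutate the shared module-level dict and can leak between calls. A sketch of the difference:

```python
import copy

ytdl_opts = {"extractor_args": {"youtube": {"player_client": ["android_vr"]}}}

ctx = ytdl_opts.copy()  # shallow: nested dicts are shared
ctx["extractor_args"]["youtube"]["player_client"] = ["tv"]
print(ytdl_opts["extractor_args"]["youtube"]["player_client"])  # ['tv'] leaked

ctx = copy.deepcopy(ytdl_opts)  # deep: nested writes would stay local
```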
```diff
@@ -170,7 +199,7 @@ def WEBrelated(url: str):
 
     return extracted_json["contents"]['twoColumnWatchNextResults']["secondaryResults"]
 
-def WEBextractSinglePage(uri: str):
+def WEBextractSinglePage(uri: str, use_cookies=None):
     # WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME
 
     start_time = time.time()
@@ -178,11 +207,25 @@ def WEBextractSinglePage(uri: str):
     if len(uri) != 11:
         raise ValueError("WEBextractSinglePage expects a single, 11-character long argument")
 
-    response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'))
+    cookies = None
+    if use_cookies is not None:
+        match use_cookies:
+            case "global":
+                ythdd_globals.print_debug("wdata: using global cookies")
+                cookies = MozillaCookieJar(ythdd_globals.config["extractor"]["cookies_path"])
+                cookies.load()
+            case "agegated":
+                ythdd_globals.print_debug("wdata: using agegated cookies")
+                cookies = MozillaCookieJar(ythdd_globals.config["extractor"]["age_restricted_cookies_path"])
+                cookies.load()
+            case None | _:
+                pass
+
+    response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'), cookies=cookies)
     extracted_string = str(response.content.decode('utf8', 'unicode_escape'))
-    start = extracted_string.find('{"responseContext":{"serviceTrackingParams":')
+    start = extracted_string.find('{"responseContext":')
     end = extracted_string.find(';var ', start)
-    start2 = extracted_string.find('{"responseContext":{"serviceTrackingParams":', start + 1)
+    start2 = extracted_string.find('{"responseContext":', start + 1)
     end2 = extracted_string.find(';</script>', start2)
     extracted_json1 = json.loads(extracted_string[start:end])
     extracted_json2 = json.loads(extracted_string[start2:end2])
```
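`MozillaCookieJar` reads the same Netscape `cookies.txt` format that yt-dlp's `cookiefile` option consumes, and a loaded jar can be passed straight to `requests`. A minimal sketch (paths are placeholders):

```python
from http.cookiejar import MozillaCookieJar
import requests

cookies = MozillaCookieJar("/path/to/cookies.txt")  # Netscape format
cookies.load()  # raises FileNotFoundError/LoadError on a bad file

response = requests.get("https://www.youtube.com/watch?v=dQw4w9WgXcQ",
                        cookies=cookies)
```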
```diff
@@ -231,7 +274,7 @@ def makeWebContext(secondaryContextDict: dict):
     # Uses web_context_dict to create a context, returns a dict.
     # Essentially, expands the web_context_dict with a secondary one.
 
-    current_web_context_dict = web_context_dict
+    current_web_context_dict = web_context_dict.copy()
 
     for key in secondaryContextDict:
         current_web_context_dict[key] = secondaryContextDict[key]
@@ -275,13 +318,17 @@ def getChannelAvatar(response_json: dict):
 def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
     # Generates channel avatars at default sizes.
 
+    # avatar urls for channels in search results start with //yt3.ggpht.com/
+    if url.startswith("//"):
+        url = "https:" + url
+
     avatars = []
-    if not url.startswith("https://yt3.ggpht.com/"):
+    if not url.startswith("https://yt3.ggpht.com/") and not url.startswith("https://yt3.googleusercontent.com/"):
         return []
 
     url = ythdd_globals.translateLinks(url)
     url_size_start = url.rfind("=s") + 2
-    url_size_end = url.find("-", url_size_start) - 1
+    url_size_end = url.find("-", url_size_start)
 
     default_sizes = [32, 48, 76, 100, 176, 512]
 
@@ -296,20 +343,45 @@ def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
 
     return avatars
 
-def isVerified(response_json: dict):
+def isVerified(response_json: dict) -> bool:
     # Returns True if any user badge has been found (verified/artist).
-    badges = safeTraverse(response_json, [], default=False)
 
-    if badges: return True
+    if not isinstance(response_json, dict):
+        return False
+
+    match safeTraverse(list(response_json.keys()), [0], default=""):
+        case "metadataBadgeRenderer": # channels in search results
+            verified = safeTraverse(response_json, ["metadataBadgeRenderer", "tooltip"], default="") in ("Verified", "Official Artist Channel") # perhaps look for badge styles?
+            return verified
+
+    return False
+
+def isPremium(response_json: dict) -> bool:
+    # Returns True if content is paid (member-only).
+
+    if not isinstance(response_json, dict):
+        return False
+
+    match safeTraverse(list(response_json.keys()), [0], default=""):
+        case "metadataBadgeRenderer": # channels in search results
+            paid = safeTraverse(response_json, ["metadataBadgeRenderer", "style"], default="") in ("BADGE_STYLE_TYPE_MEMBERS_ONLY",)
+            return paid
+
     return False
 
-def browseAbout(ucid: str):
+def browseChannel(ucid: str, params: str = None, ctoken: str = None):
     # Returns the response from innertubes browse endpoint for channels (as a dict).
 
     if len(ucid) != 24:
         raise ValueError(f"Something is wrong with the UCID {ucid}. Expected a 24-character long channel ID, not {len(ucid)}.")
 
-    context = makeWebContext({'browseId': ucid})
+    additional_context = {'browseId': ucid}
+    if params is not None:
+        additional_context['params'] = params
+    if ctoken is not None:
+        additional_context['continuation'] = ctoken
+
+    context = makeWebContext(additional_context)
 
     response = requests.post(
         'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
@@ -321,14 +393,19 @@ def browseAbout(ucid: str):
 
     return response_json
 
-def WEBextractSearchResults(search_query: str) -> list:
+def WEBextractSearchResults(search_query: str, page: int) -> list:
     # Posts a search request to innertube API
     # and processes only the relevant part (the actual results)
 
     if search_query is None:
         return []
 
-    web_context = makeWebContext({"query": search_query})
+    additional_context = {"query": search_query}
+    if page is not None:
+        params = ythdd_proto.produceSearchParams(page)
+        additional_context["params"] = params
+
+    web_context = makeWebContext(additional_context)
     response = requests.post('https://www.youtube.com/youtubei/v1/search',
                              params={"prettyPrint": False},
                              headers=stage2_headers,
@@ -343,3 +420,193 @@ def WEBextractSearchResults(search_query: str) -> list:
     results = safeTraverse(results, ["contents", "twoColumnSearchResultsRenderer", "primaryContents", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents"], default=[])
 
     return results
+
+def WEBgetSearchSuggestions(query: str, previous_query: str = '') -> list:
+    # Takes in a search query and returns relevant suggestions.
+    # Can optionally take the previous query but that's rather novel and
+    # not supported across players nor invidious API itself.
+
+    suggestions = []
+
+    if not isinstance(query, str):
+        print("WEBgetSearchSuggestions: query is not a string (as it should be)")
+        return {}
+    if not isinstance(previous_query, str):
+        previous_query = ''
+
+    if ythdd_globals.config["general"]["cache"]:
+        # look for cached suggestions
+        for cached_search in ythdd_globals.general_cache["search"]:
+            if cached_search["q"] == query.lower() and cached_search["pq"] == previous_query.lower():
+                # found it? skip ahead
+                suggestions = cached_search["resp"]
+                break
+
+    # request wasn't cached? query the API
+    if suggestions == []:
+
+        params = {
+            'ds': 'yt',
+            'hl': 'en', # host language
+            'gl': 'us', # geolocation
+            'client': 'youtube',
+            'gs_ri': 'youtube',
+            'q': query, # query
+            'pq': previous_query # previous query
+        }
+
+        response = requests.get(
+            'https://suggestqueries-clients6.youtube.com/complete/search',
+            params=params,
+            headers=stage2_headers
+        )
+
+        # can break anytime but hopefully the tiny speed gain will make up for it
+        results = response.text[23 + len(query):]
+        results = results[:results.rfind("{") - 1]
+        results = json.loads(results)
+
+        for result in results:
+            suggestions.append(result[0])
+
+        # cache response
+        if ythdd_globals.config["general"]["cache"]:
+            ythdd_globals.general_cache["search"].append(
+                {
+                    "q": query.lower(),
+                    "pq": previous_query.lower(),
+                    "resp": suggestions
+                }
+            )
+
+    return {
+        "query": query,
+        "suggestions": suggestions
+    }
+
+def WEBgetVideoComments(ctoken: str) -> tuple:
+
+    # ctoken needs to be passed explicitly.
+    # no guessing or retrieving it from globals.
+    if ctoken is None:
+        return [], ""
+
+    # build web context containing the relevant ctoken
+    web_context = makeWebContext({"continuation": ctoken})
+    response = requests.post('https://www.youtube.com/youtubei/v1/next',
+                             params={"prettyPrint": False},
+                             headers=stage2_headers,
+                             data=json.dumps(web_context)
+                             )
+
+    results = []
+    try:
+        results = json.loads(response.text)
+    except:
+        pass
+
+    comments = safeTraverse(results, ["frameworkUpdates", "entityBatchUpdate", "mutations"], default=[])
+    comment_continuations = []
+    comment_continuations_re = safeTraverse(results, ["onResponseReceivedEndpoints"], default=[])
+    for received_endpoint in comment_continuations_re:
+
+        # this is horrible...
+
+        acia = safeTraverse(received_endpoint, ["appendContinuationItemsAction", "continuationItems"], default=[])
+        rcic = safeTraverse(received_endpoint, ["reloadContinuationItemsCommand", "continuationItems"], default=[])
+
+        for entry in acia:
+            if "commentThreadRenderer" in entry or "continuationItemRenderer" in entry:
+                comment_continuations = acia
+                break
+
+        for entry in rcic:
+            if "commentThreadRenderer" in entry or "continuationItemRenderer" in entry:
+                comment_continuations = rcic
+                break
+
+        if comment_continuations != []:
+            break
+
+    if comment_continuations == []:
+        print("error: received an unknown comment structure, unable to parse continuations (replies)")
+        # breakpoint()
+        # return [], ""
+
+    # extract new continuation
+    new_continuation = ""
+    if "continuationItemRenderer" in safeTraverse(comment_continuations, [-1], default=[]):
+        # first, look for ctoken inside of response for next page of comments
+        new_continuation = safeTraverse(comment_continuations, [-1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default=None)
+        # or search elsewhere in case this is a reply thread
+        if new_continuation is None:
+            new_continuation = safeTraverse(comment_continuations, [-1, "continuationItemRenderer", "button", "buttonRenderer", "command", "continuationCommand", "token"], default="")
+
+    # perform a basic mutation check before parsing
+    # will ignore replies liked by video uploader ("hearts")
+    actual_comments = [x for x in comments if "properties" in safeTraverse(x, ["payload", "commentEntityPayload"], default=[], quiet=True)]
+    actual_comment_continuations = [x for x in comment_continuations if "replies" in safeTraverse(x, ["commentThreadRenderer"], default=[], quiet=True)]
+
+    # link reply data (reply count and ctoken) for comments with replies
+    for reply_renderer in actual_comment_continuations:
+
+        mutual_key = safeTraverse(reply_renderer, ["commentThreadRenderer", "commentViewModel", "commentViewModel", "commentKey"], default="unknown-key")
+        reply_ctoken = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
+        reply_count = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "viewReplies", "buttonRenderer", "text", "runs", 0, "text"], default="0 replies").split(" ")[0]
+
+        # suspected a/b test. can be worked around with on-demand ctoken creation.
+        # workaround for yt not showing replies when sorting for "top" comments
+        try:
+            int(reply_count) # can be just "Replies"
+        except:
+            reply_count = "0"
+
+        for comment in actual_comments:
+            found_key = safeTraverse(comment, ["entityKey"], default="unknown-key")
+            # try to link a relevant ctoken if a comment has response
+            if found_key == mutual_key:
+                if ythdd_globals.config["general"]["debug"]: print(f"found reply for {found_key}")
+                comment["replies"] = {
+                    "replyCount": int(reply_count),
+                    "continuation": reply_ctoken
+                }
+
+    return actual_comments, new_continuation
+
+def WEBextractPlaylist(plid: str = "", ctoken: str = ""):
+
+    # if ctoken has been provided, use it
+    if ctoken:
+        # playlist id can be omitted if ctoken is provided
+        additional_context = {'continuation': ctoken}
+    else:
+        # try to create ctoken which will allow for accessing the full playlist, including delisted/deleted videos
+        additional_context = {'continuation': ythdd_proto.producePlaylistContinuation(plid, offset=0)}
+
+    context = makeWebContext(additional_context)
+
+    response = requests.post(
+        'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
+        headers = ythdd_globals.getHeaders(),
+        json = context
+    )
+
+    resp_json = json.loads(response.text)
+
+    # if this is a first-time fetch (no ctoken passed), extract metadata
+    metadata = None
+    if not ctoken:
+        metadata = {
+            "microformat": safeTraverse(resp_json, ["microformat"]),
+            "sidebar": safeTraverse(resp_json, ["sidebar"])
+        }
+
+    # extract continuation
+    new_continuation = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems", -1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"])
+
+    # "best-effort" playlist's videos extraction
+    # "best-effort" because None's (unsuccessful video extraction = None) are passed as they are
+    videos = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"]) # includes continuation as last element of list, which will be ignored
+
+    return metadata, new_continuation, videos
```
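In `WEBgetSearchSuggestions`, the fixed `23 + len(query)` offset appears to skip a JSONP wrapper of the form `window.google.ac.h(["<query>",[...],{...}])`, and the `rfind("{")` slice drops the trailing metadata object. A less offset-dependent variant (a sketch, not what the PR does) would strip the callback generically:

```python
import json

def parse_suggestions(jsonp_text: str) -> list:
    # strip callback( ... ) regardless of the callback name or query length
    payload = jsonp_text[jsonp_text.find("(") + 1:jsonp_text.rfind(")")]
    query, results, _meta = json.loads(payload)
    return [entry[0] for entry in results]
```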
@@ -23,6 +23,7 @@ version = "0.0.1"
|
|||||||
apiVersion = "1"
|
apiVersion = "1"
|
||||||
randomly_generated_passcode = 0
|
randomly_generated_passcode = 0
|
||||||
video_cache = {}
|
video_cache = {}
|
||||||
|
general_cache = {"search": [], "continuations": {"channels": {}, "comments": {}}, "channels": {}, "playlists": {}, "storyboards": {}, "hashed_videoplayback": {}}
|
||||||
|
|
||||||
def getConfig(configfile):
|
def getConfig(configfile):
|
||||||
|
|
||||||
@@ -31,7 +32,7 @@ def getConfig(configfile):
|
|||||||
global randomly_generated_passcode
|
global randomly_generated_passcode
|
||||||
|
|
||||||
if not os.path.exists(configfile):
|
if not os.path.exists(configfile):
|
||||||
dummy_config = {'general': {'db_file_path': 'ythdd_db.sqlite', 'video_storage_directory_path': 'videos/', 'is_proxied': False, 'public_facing_url': 'http://127.0.0.1:5000/', 'debug': False, 'cache': True}, 'api': {'api_key': 'CHANGEME'}, 'proxy': {'user-agent': ''}, 'extractor': {'user-agent': '', 'cookies_path': ''}, 'admin': {'admins': ['admin']}, 'yt_dlp': {}, 'postprocessing': {'presets': [{'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''}, {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''}, {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'}, {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''}, {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'}, {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''}, {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'}, {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''}, {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'}]}}
|
dummy_config = {'general': {'db_file_path': 'ythdd_db.sqlite', 'video_storage_directory_path': 'videos/', 'is_proxied': False, 'public_facing_url': 'http://127.0.0.1:5000/', 'debug': False, 'cache': True}, 'api': {'api_key': 'CHANGEME', 'enable_debugger_halt': False}, 'proxy': {'user-agent': '', 'allow_proxying_videos': True, 'match_initcwndbps': True}, 'extractor': {'user-agent': '', 'cookies_path': ''}, 'admin': {'admins': ['admin']}, 'yt_dlp': {}, 'postprocessing': {'presets': [{'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''}, {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''}, {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'}, {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''}, {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'}, {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''}, {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'}, {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''}, {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'}]}}
|
||||||
# if a passcode has not been provided by the user (config file doesn't exist, and user didn't specify it using an argument)
|
# if a passcode has not been provided by the user (config file doesn't exist, and user didn't specify it using an argument)
|
||||||
print(f"{colors.WARNING}WARNING{colors.ENDC}: Using default, baked in config data. {colors.ENDL}"
|
print(f"{colors.WARNING}WARNING{colors.ENDC}: Using default, baked in config data. {colors.ENDL}"
|
||||||
f" Consider copying and editing the provided example file ({colors.OKCYAN}config.default.toml{colors.ENDC}).")
|
f" Consider copying and editing the provided example file ({colors.OKCYAN}config.default.toml{colors.ENDC}).")
|
||||||
@@ -59,7 +60,7 @@ def getHeaders(caller="proxy"):
|
|||||||
|
|
||||||
# NOTE: use ESR user-agent
|
# NOTE: use ESR user-agent
|
||||||
# user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
|
# user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
|
||||||
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0'
|
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0'
|
||||||
|
|
||||||
if config[caller]['user-agent']:
|
if config[caller]['user-agent']:
|
||||||
user_agent = config[caller]['user-agent']
|
user_agent = config[caller]['user-agent']
|
||||||
@@ -83,26 +84,64 @@ def getHeaders(caller="proxy"):
|
|||||||
|
|
||||||
return headers
|
return headers
|
||||||
|
|
||||||
def translateLinks(link):
|
def translateLinks(link: str, remove_params: bool = True):
|
||||||
|
|
||||||
link = link.replace("https://i.ytimg.com/", config['general']['public_facing_url'])
|
link = link.replace("https://i.ytimg.com/", config['general']['public_facing_url'])
|
||||||
link = link.replace("https://yt3.ggpht.com/", config['general']['public_facing_url'] + "ggpht/")
|
link = link.replace("https://yt3.ggpht.com/", config['general']['public_facing_url'] + "ggpht/")
|
||||||
link = link.replace("https://yt3.googleusercontent.com/", config['general']['public_facing_url'] + "guc/")
|
link = link.replace("https://yt3.googleusercontent.com/", config['general']['public_facing_url'] + "guc/")
|
||||||
|
|
||||||
|
# try to remove tracking params
|
||||||
|
if remove_params and "?" in link:
|
||||||
|
link = link[:link.find("?")]
|
||||||
|
|
||||||
return link
|
return link
|
||||||
|
|
||||||
def getUptime():
|
def getUptime():
|
||||||
return int(time.time()) - starttime
|
return int(time.time()) - starttime
|
||||||
|
|
||||||
def safeTraverse(obj: dict, path: list, default=None):
|
def safeTraverse(obj: dict, path: list, default=None, quiet: bool = False):
|
||||||
|
"""
|
||||||
|
Traverse dynamic objects with fallback to default values
|
||||||
|
|
||||||
|
This function can take an Ellipsis as part of traversal path,
|
||||||
|
meaning that it will return the object from the list
|
||||||
|
that contains the next key. This has been introduced
|
||||||
|
so that no matter which object in a list holds the relevant
|
||||||
|
model, it will find it (meaning no assumptions are necessary).
|
||||||
|
Kepp in mind that only one ellipsis at a time is supported,
|
||||||
|
thus ["some_key", ..., ..., "some_other_key"] won't work.
|
||||||
|
|
||||||
|
:param obj: Traversed object
|
||||||
|
:type obj: dict
|
||||||
|
:param path: Path which shall be traversed
|
||||||
|
:type path: list
|
||||||
|
:param default: Default value returned on failure
|
||||||
|
:type default: any, None by default
|
||||||
|
:param quiet: Quiet flag
|
||||||
|
:type quiet: bool
|
||||||
|
"""
|
||||||
result = obj
|
result = obj
|
||||||
try:
|
try:
|
||||||
for x in path:
|
# for every item in path and its position
|
||||||
#print(f"traversing {result} with respect to {x}")
|
for pos, iterable_key in enumerate(path):
|
||||||
result = result[x]
|
# if the key is not an ellipsis, traverse it
|
||||||
except (KeyError, TypeError):
|
if iterable_key is not Ellipsis:
|
||||||
|
result = result[iterable_key]
|
||||||
|
# if it is an ellipsis, and there is another key beside it
|
||||||
|
elif pos < len(path) - 1:
|
||||||
|
# then iterate through all of the list contents
|
||||||
|
for list_content in result:
|
||||||
|
# in search of the next traversal key
|
||||||
|
if path[pos + 1] in list_content:
|
||||||
|
result = list_content
|
||||||
|
# show an error message if ellipsis is used incorrectly
|
||||||
|
else:
|
||||||
|
print("error(safeTraverse): Traversal path can't end with an Ellipsis!")
|
||||||
|
raise TypeError()
|
||||||
|
# handle exceptions
|
||||||
|
except (KeyError, TypeError, IndexError):
|
||||||
result = default
|
result = default
|
||||||
print(f"error reading: {' -> '.join(path)} - returning: {default}")
|
if not quiet: print(f"error reading: {' -> '.join([str(x) for x in path])} - returning: {default}")
|
||||||
finally:
|
finally:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@@ -112,3 +151,7 @@ def getCommit() -> str | None:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def print_debug(text: str) -> None:
|
||||||
|
# Will print a string only if debugging is enabled.
|
||||||
|
if config["general"]["debug"]:
|
||||||
|
print(text)
|
||||||
|
|||||||
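The Ellipsis support in `safeTraverse` is handy for Innertube responses, where the renderer you want can sit at any index of a list. A usage sketch (the data shape is illustrative):

```python
from ythdd_globals import safeTraverse

response = {"contents": [
    {"adSlotRenderer": {}},
    {"videoRenderer": {"videoId": "dQw4w9WgXcQ"}},
]}

# the Ellipsis picks whichever list element contains the next key
video_id = safeTraverse(response, ["contents", ..., "videoRenderer", "videoId"],
                        default=None)
print(video_id)  # "dQw4w9WgXcQ" no matter where in the list the renderer sits
```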
ythdd_inv_tl.py (1174 changed lines): diff suppressed because it is too large.
ythdd_proto.py (new file, 96 lines)

```python
from ythdd_globals import safeTraverse
import base64
import blackboxprotobuf as bbpb
import json
import urllib.parse
import ythdd_globals

def bbpbToB64(msg_and_typedef: tuple, urlsafe: bool = False, padding: bool = False) -> str:
    encoded_protobuf = bbpb.encode_message(*msg_and_typedef)
    if urlsafe:
        b64_protobuf = base64.urlsafe_b64encode(encoded_protobuf)
    else:
        b64_protobuf = base64.b64encode(encoded_protobuf)
    if padding:
        url_encoded_b64 = urllib.parse.quote(b64_protobuf.decode())
    else:
        url_encoded_b64 = b64_protobuf.decode().rstrip('=')
    return url_encoded_b64

def fdictToBbpb(msg: dict) -> tuple:
    # Requires Python 3.7+ or CPython 3.6+,
    # as these versions preserve dictionary insertion order.
    # Structural matching (match, case) requires Python 3.10+.
    clean_msg = {}
    clean_type = {}
    for key in msg:
        num, type = key.split(":")

        match type:
            case "message":
                # if the type is an embedded message
                internal_msg, internal_type = fdictToBbpb(msg[key])
                # msg can just be appended as usual
                clean_msg[num] = internal_msg
                # type contains more fields than normally
                clean_type[num] = {
                    'field_order': list(internal_msg.keys()),
                    'message_typedef': internal_type,
                    'type': type
                }

            case "base64" | "base64u" | "base64p" | "base64up":
                # if the type is a base64-embedded message
                internal_msg, internal_type = fdictToBbpb(msg[key])
                match type.removeprefix("base64"):
                    case "":
                        b64_encoded_msg = bbpbToB64((internal_msg, internal_type))
                    case "u":
                        b64_encoded_msg = bbpbToB64((internal_msg, internal_type), urlsafe=True)
                    case "p":
                        b64_encoded_msg = bbpbToB64((internal_msg, internal_type), padding=True)
                    case "up":
                        b64_encoded_msg = bbpbToB64((internal_msg, internal_type), urlsafe=True, padding=True)
                clean_msg[num] = b64_encoded_msg
                clean_type[num] = {'type': 'string'}

            case "int" | "string":
                clean_msg[num] = msg[key]
                clean_type[num] = {'type': type}

            case _:
                raise KeyError(f'error(fmsgToBBPBTuple): invalid key "{type}"')

    return (clean_msg, clean_type)

def producePlaylistContinuation(plid: str, offset: int = 0) -> str:
    msge = {
        '80226972:message': {
            '2:string': f'VL{plid}',
            '3:base64': {
                '1:int': int(offset / 100),
                '15:string': f'PT:{bbpbToB64(fdictToBbpb({"1:int": offset}))}',
                '104:message': {
                    '1:int': 0
                }
            },
            '35:string': plid
        }
    }

    bbpb_dicts = fdictToBbpb(msge)
    b64_ctoken = bbpbToB64(bbpb_dicts, urlsafe=True, padding=True)

    return b64_ctoken

def produceSearchParams(page: int = 1) -> str:
    msge = {
        "9:int": 20 * (page - 1), # pagination
        "30:int": 1 # no self-harm censorship
    }

    bbpb_dicts = fdictToBbpb(msge)
    b64_params = bbpbToB64(bbpb_dicts, urlsafe=True, padding=True)

    return b64_params
```
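A round-trip sanity check for the token builders: decode the produced search params back with bbpb. Field numbers follow `produceSearchParams` above; the decode step is an illustration, not part of the module:

```python
import base64
import urllib.parse
import blackboxprotobuf as bbpb
import ythdd_proto

params = ythdd_proto.produceSearchParams(page=3)
raw = base64.urlsafe_b64decode(urllib.parse.unquote(params))
message, typedef = bbpb.decode_message(raw)
print(message)  # expected: {'9': 40, '30': 1}, i.e. offset 20 * (3 - 1)
```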
93
ythdd_struct_builder.py
Normal file
93
ythdd_struct_builder.py
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
from ythdd_globals import safeTraverse
|
||||||
|
import ythdd_globals
|
||||||
|
|
||||||
|
def genThumbs(videoId: str):
|
||||||
|
|
||||||
|
result = []
|
||||||
|
thumbnails = [
|
||||||
|
{'height': 720, 'width': 1280, 'quality': "maxres", 'url': "maxres"}, # will always attempt to return the best quality available
|
||||||
|
{'height': 720, 'width': 1280, 'quality': "maxresdefault", 'url': "maxresdefault"},
|
||||||
|
{'height': 480, 'width': 640, 'quality': "sddefault", 'url': "sddefault"},
|
||||||
|
{'height': 360, 'width': 480, 'quality': "high", 'url': "hqdefault"},
|
||||||
|
{'height': 180, 'width': 320, 'quality': "medium", 'url': "mqdefault"},
|
||||||
|
{'height': 90, 'width': 120, 'quality': "default", 'url': "default"},
|
||||||
|
{'height': 90, 'width': 120, 'quality': "start", 'url': "1"},
|
||||||
|
{'height': 90, 'width': 120, 'quality': "middle", 'url': "2"},
|
||||||
|
{'height': 90, 'width': 120, 'quality': "end", 'url': "3"},
|
||||||
|
]
|
||||||
|
|
||||||
|
for x in thumbnails:
|
||||||
|
width = x['width']
|
||||||
|
height = x['height']
|
||||||
|
quality = x['quality']
|
||||||
|
url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/' + x['url'] + '.jpg'
|
||||||
|
result.append({'quality': quality, 'url': url, 'width': width, 'height': height})
|
||||||
|
|
||||||
|
return result


def genStoryboards(video_id: str) -> list:
    # generates storyboards inside of /api/v1/videos/:video_id
    storyboards = []
    cached_storyboards = safeTraverse(ythdd_globals.general_cache["storyboards"], [video_id])
    if cached_storyboards is None:
        # no cached storyboard data for this video; an empty default would
        # crash on the ["formats"] lookup below, so bail out early instead
        return storyboards
    for sb in cached_storyboards["formats"]:
        built_storyboard = {
            "url": f"/api/v1/storyboards/{video_id}?width={sb['width']}&height={sb['height']}",
            "templateUrl": cached_storyboards['template_url'].replace("$L", str(sb['index'])).replace("$N", sb['name']) + f"&sigh={sb['sigh']}",
            "width": sb['width'],
            "height": sb['height'],
            "count": sb['thumb_count'],
            "interval": sb['interval'],
            "storyboardWidth": sb['columns'],
            "storyboardHeight": sb['rows'],
            "storyboardCount": sb['images_count']
        }
        storyboards.append(built_storyboard)

    return storyboards
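
# Example shape of one returned entry (all values hypothetical):
#   {"url": "/api/v1/storyboards/dQw4w9WgXcQ?width=160&height=90",
#    "templateUrl": "https://i.ytimg.com/sb/...&sigh=...",
#    "width": 160, "height": 90, "count": 100, "interval": 2000,
#    "storyboardWidth": 5, "storyboardHeight": 5, "storyboardCount": 4}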


def msToWebvttTimestamp(time: int):

    ms = time % 1000
    time //= 1000

    hours = time // (60 * 60)
    time -= hours * 60 * 60
    minutes = time // 60
    time -= minutes * 60
    seconds = time
    timestamp = f"{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(seconds).zfill(2)}.{str(ms).zfill(3)}"

    return timestamp
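
# Example: msToWebvttTimestamp(3_723_456) == "01:02:03.456"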


def genWebvttStoryboard(video_id: str, width: int = None, height: int = None):
    # generates WebVTT storyboards for /api/v1/storyboards/:video_id
    webvtt = "WEBVTT\n\n"
    cached_storyboards = safeTraverse(ythdd_globals.general_cache["storyboards"], [video_id])
    if cached_storyboards is None:
        return ""
    found_storyboard = {}

    for sb in cached_storyboards["formats"]:
        if width is not None and width == sb['width']:
            found_storyboard = sb
        if height is not None and height == sb['height']:
            found_storyboard = sb

    # could be changed
    if not found_storyboard:
        found_storyboard = cached_storyboards["formats"][0]

    start = 0
    thumbs_per_sb = found_storyboard['columns'] * found_storyboard['rows']
    for x in range(found_storyboard["thumb_count"]):
        # position of thumbnail x inside its sheet: thumbnails are laid out
        # row by row, so the column is x modulo the column count and the row
        # is x divided by the column count (wrapping around per sheet)
        xx = x % found_storyboard['columns']
        yy = (x // found_storyboard['columns']) % found_storyboard['rows']
        xywh = f"#xywh={xx * found_storyboard['width']},{yy * found_storyboard['height']},{found_storyboard['width']},{found_storyboard['height']}"
        webvtt += f"{msToWebvttTimestamp(start)} --> {msToWebvttTimestamp(start + found_storyboard['interval'])}\n"
        webvtt += cached_storyboards['template_url'].replace("$L", str(found_storyboard['index'])).replace("$N", found_storyboard['name']).replace("$M", str(x // thumbs_per_sb)) + f"&sigh={found_storyboard['sigh']}{xywh}\n"
        webvtt += "\n"
        start += found_storyboard['interval']

    return webvtt
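
# Example of one generated cue (URL shortened, values hypothetical):
#   00:00:00.000 --> 00:00:02.000
#   https://i.ytimg.com/sb/.../M0.jpg?...&sigh=...#xywh=0,0,160,90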

618	ythdd_struct_parser.py	Normal file
@@ -0,0 +1,618 @@
from html import escape
from invidious_formats import FORMATS
from ythdd_globals import safeTraverse
import json
import dateparser
import ythdd_globals
import ythdd_extractor
import ythdd_struct_builder


DEFAULT_AVATAR = "https://yt3.ggpht.com/a/default-user=s176-c-k-c0x00ffffff-no-rj"

def doesContainNumber(string: str, numeric_system: int = 10) -> bool:
    try:
        int(string, numeric_system)
        return True
    except ValueError:
        return False


def parseLengthFromTimeBadge(time_str: str) -> int:
    # Returns 0 if unsuccessful
    length = 0
    time_lookup_list = [1, 60, 3_600, 86_400]
    time_list = time_str.split(":")
    # badges like ['LIVE'] (livestreams) or ['Upcoming'] (scheduled videos)
    # contain no numbers and simply yield 0
    if all(map(doesContainNumber, time_list)):
        for z in range(len(time_list)):
            length += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
    return length
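
# Examples:
#   parseLengthFromTimeBadge("1:02:03") == 3723
#   parseLengthFromTimeBadge("LIVE") == 0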


def parseViewsFromViewText(viewcounttext: str) -> int:
    # Returns 0 if unsuccessful
    views = 0
    magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
    if viewcounttext:
        if viewcounttext.lower() == "no":
            viewcounttext = "0"
        # keep only digits and the decimal point ("1.2M views" -> 1.2)
        views = float("0" + "".join([z for z in viewcounttext if z.isdigit() or z == "."]))
        viewcounttext = viewcounttext.split(" ")[0]
        for x in magnitude.keys():
            if x == viewcounttext[-1].upper():
                views *= magnitude[x]
    return int(views)
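
# Examples:
#   parseViewsFromViewText("1.2M views") == 1_200_000
#   parseViewsFromViewText("No views") == 0
#   parseViewsFromViewText("0" + "1.2K" + " likes") == 1200  # the likeCount trick used further below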


def parseRenderers(entry: dict, context: dict = {}) -> dict:

    if not isinstance(entry, dict):
        raise ValueError("parsed entry is not of type dict")

    match safeTraverse(list(entry.keys()), [0], default=""):

        case "videoRenderer":  # represents a video
            # as of October 2025 slowly phased out in favor of lockupViewModel(?)

            published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now")
            published_date = published_date.removeprefix("Streamed ")
            description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["videoRenderer", "descriptionSnippet", "runs"], default=[]))
            collaborative = False

            if "author_name" in context:
                author_name = context["author_name"]
            else:
                author_name = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"], default="Unknown author")

            if "author_ucid" in context:
                author_ucid = context["author_ucid"]
            else:
                author_ucid = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
            if author_ucid == "UNKNOWNCHANNELID":
                # a missing browseId is the first indicator that a video is collaborative (has multiple authors);
                # if that's the case, let's take the first author's ucid as the ucid
                collaborative = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "header", "dialogHeaderViewModel", "headline", "content"]) == "Collaborators"

            if "verified" in context:
                verified = context["verified"]
            else:
                verified = ythdd_extractor.isVerified(safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]))

            if "avatar" in context:
                avatar_url = context["avatar"]
            else:
                avatar_url = safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)

            views_or_viewers_model = safeTraverse(entry, ["videoRenderer", "viewCountText"], default={})
            if "simpleText" in views_or_viewers_model:
                # means this is a video with X views
                view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["simpleText"])
                view_count_text = entry["videoRenderer"]["viewCountText"]["simpleText"]
            elif "runs" in views_or_viewers_model:
                # means this is a livestream with X concurrent viewers
                view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching")
                view_count_text = entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching"
            else:
                # unknown model, assume no views
                view_count = 0
                view_count_text = "Unknown amount of views"

            if collaborative:
                livm = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
                if "author_name" not in context:
                    # override the default "name1 and others" or "name1 and name2" text
                    # with full author info
                    all_authors = []
                    for collaborative_author in livm:
                        collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"])
                        if collaborative_author_name is not None:
                            all_authors.append(collaborative_author_name)
                    if all_authors != []:  # check if custom extraction succeeded
                        author_name = ", ".join(all_authors)
                if author_ucid == "UNKNOWNCHANNELID":
                    # retrieve main author's ucid
                    author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
                if safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]) is None:
                    # check if the main author is verified
                    verified = False
                    if safeTraverse(livm, [0, "listItemViewModel", "title", "attachmentRuns", 0, "element", "type", "imageType", "image", "sources", 0, "clientResource", "imageName"]) in ("AUDIO_BADGE", "CHECK_CIRCLE_FILLED"):
                        verified = True
                if avatar_url == DEFAULT_AVATAR:
                    # retrieve the main channel's avatar
                    avatar_url = safeTraverse(livm, [0, "listItemViewModel", "leadingAccessory", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)

            ythdd_globals.print_debug("videoRenderer fired")
            return {
                "type": "video",
                "title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]),
                "videoId": safeTraverse(entry, ["videoRenderer", "videoId"]),
                "author": author_name,
                "authorId": author_ucid,
                "authorUrl": "/channel/" + author_ucid,
                "authorVerified": verified,  # TODO
                "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
                "videoThumbnails": ythdd_struct_builder.genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")),
                "description": description,
                "descriptionHtml": description_html,
                "viewCount": view_count,
                "viewCountText": view_count_text,
                "published": int(dateparser.parse(published_date).timestamp()),  # sadly the best we can do; invidious does this too
                "publishedText": published_date,
                "lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")),
                "liveNow": False,
                "premium": ythdd_extractor.isPremium(safeTraverse(entry, ["videoRenderer", "badges", 0])),  # will fail if it's not the only badge
                "isUpcoming": False,
                "isNew": False,
                "is4k": False,
                "is8k": False,
                "isVr180": False,
                "isVr360": False,
                "is3d": False,
                "hasCaptions": False
            }

            # modify the premiere timestamp afterwards here?
case "lockupViewModel": # represents playlists/mixes (and videos since october 2025)
|
||||||
|
# related videos lvms are handled in ythdd_inv_tl.videos()
|
||||||
|
|
||||||
|
lvm = entry["lockupViewModel"]
|
||||||
|
playlist_type = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="")
|
||||||
|
|
||||||
|
if playlist_type == "MIX":
|
||||||
|
# mixes aren't currently supported
|
||||||
|
return
|
||||||
|
|
||||||
|
if not playlist_type:
|
||||||
|
# struct represents a video
|
||||||
|
ythdd_globals.print_debug("lockupViewModel fired (not a playlist). this is an a/b test; any following errors stem from it.")
|
||||||
|
|
||||||
|
lmvm = safeTraverse(lvm, ['metadata', 'lockupMetadataViewModel'], default={})
|
||||||
|
video_id = safeTraverse(lvm, ['contentId'])
|
||||||
|
|
||||||
|
author_name = safeTraverse(context, ["author_name"], default="Unknown author")
|
||||||
|
author_ucid = safeTraverse(context, ["author_ucid"], default="UNKNOWNCHANNELID")
|
||||||
|
verified = safeTraverse(context, ["verified"], default=False) # TODO: check if this can be retrieved here
|
||||||
|
avatar_url = safeTraverse(context, ["avatar"], default=DEFAULT_AVATAR)
|
||||||
|
|
||||||
|
title = safeTraverse(lmvm, ["title", "content"], default="No title")
|
||||||
|
video_metadata = safeTraverse(lmvm, ["metadata", "contentMetadataViewModel", "metadataRows", 0, "metadataParts"], default=[])
|
||||||
|
view_count_text = safeTraverse(video_metadata, [0, "text", "content"], default="0 views")
|
||||||
|
published_date = safeTraverse(video_metadata, [1, "text", "content"], default="now")
|
||||||
|
length_text = safeTraverse(lvm, ["contentImage", "thumbnailViewModel", "overlays", ..., "thumbnailBottomOverlayViewModel", "badges", -1, "thumbnailBadgeViewModel", "text"], default="0:0")
|
||||||
|
view_count = parseViewsFromViewText(view_count_text)
|
||||||
|
length = parseLengthFromTimeBadge(length_text)
|
||||||
|
|
||||||
|
resp = {
|
||||||
|
"type": "video",
|
||||||
|
"title": title,
|
||||||
|
"videoId": video_id,
|
||||||
|
"author": author_name,
|
||||||
|
"authorId": author_ucid,
|
||||||
|
"authorUrl": "/channel/" + author_ucid,
|
||||||
|
"authorVerified": verified, # TODO
|
||||||
|
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
|
||||||
|
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
||||||
|
"description": "", # can't be retrieved from lockupViewModel
|
||||||
|
"descriptionHtml": "",
|
||||||
|
"viewCount": view_count,
|
||||||
|
"viewCountText": view_count_text,
|
||||||
|
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
|
||||||
|
"publishedText": published_date,
|
||||||
|
"lengthSeconds": length,
|
||||||
|
"liveNow": False, # can't be live if it's in creator's video feed
|
||||||
|
"premium": False, # todo: check this
|
||||||
|
"isUpcoming": False,
|
||||||
|
"isNew": False,
|
||||||
|
"is4k": False,
|
||||||
|
"is8k": False,
|
||||||
|
"isVr180": False,
|
||||||
|
"isVr360": False,
|
||||||
|
"is3d": False,
|
||||||
|
"hasCaptions": False
|
||||||
|
}
|
||||||
|
return resp
|
||||||
|
|
||||||
|
# struct represents a playlist
|
||||||
|
meta = safeTraverse(lvm, ["metadata"], default=[])
|
||||||
|
lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[])
|
||||||
|
thumbnail = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", -1, "url"], default="no-url?")
|
||||||
|
thumbnail = ythdd_globals.translateLinks(thumbnail[:thumbnail.rfind("?")])
|
||||||
|
verified = safeTraverse(context, ["verified"], default=False)
|
||||||
|
|
||||||
|
playlist_id = safeTraverse(lvm, ["contentId"], default="UNKNOWNPLAYLISTID")
|
||||||
|
length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos")
|
||||||
|
length = parseViewsFromViewText(length.split(" ")[0])
|
||||||
|
|
||||||
|
# Turns out for some responses we do have some data, while not on others.
|
||||||
|
# Data from context should be prioritized, thus even if something is found with safeTraverse,
|
||||||
|
# the parser will ignore it in favour of the context.
|
||||||
|
ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
||||||
|
author = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author")
|
||||||
|
ucid = safeTraverse(context, ["author_ucid"], default=ucid)
|
||||||
|
author = safeTraverse(context, ["author_name"], default=author)
|
||||||
|
|
||||||
|
ythdd_globals.print_debug("lockupViewModel fired (playlist)")
|
||||||
|
return {
|
||||||
|
"type": "playlist",
|
||||||
|
"title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"),
|
||||||
|
"playlistId": playlist_id,
|
||||||
|
"playlistThumbnail": thumbnail,
|
||||||
|
"author": author,
|
||||||
|
"authorId": ucid,
|
||||||
|
"authorUrl": "/channel/" + ucid,
|
||||||
|
"authorVerified": verified,
|
||||||
|
"videoCount": length,
|
||||||
|
"videos": [] # provided for historical reasons i guess
|
||||||
|
}
|
||||||
|
|
||||||
|
case "shelfRenderer": # "people also watched"
|
||||||
|
return
|
||||||
|
|
||||||
|
case "gridShelfViewModel": # shorts?
|
||||||
|
return
|
||||||
|
|
||||||
|
case "shortsLockupViewModel": # shorts on channel pages
|
||||||
|
|
||||||
|
video_id = safeTraverse(entry, ["shortsLockupViewModel", "onTap", "innertubeCommand", "reelWatchEndpoint", "videoId"], default="UnknownVideoId")
|
||||||
|
title = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "primaryText", "content"], default="ythdd: couldn't find title")
|
||||||
|
views_text = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "secondaryText", "content"], default="No views")
|
||||||
|
|
||||||
|
published_date = "No data about published time" # the view model doesn't provide data about the date a short is published
|
||||||
|
|
||||||
|
if video_id == "UnknownVideoId": # failsafe
|
||||||
|
video_id = safeTraverse(entry, ["shortsLockupViewModel", "entityId"], default="-UnknownVideoId")
|
||||||
|
video_id = video_id[video_id.rfind("-") + 1:]
|
||||||
|
|
||||||
|
if "author_name" in context:
|
||||||
|
author_name = context["author_name"]
|
||||||
|
else:
|
||||||
|
author_name = "Unknown author"
|
||||||
|
|
||||||
|
if "author_ucid" in context:
|
||||||
|
author_ucid = context["author_ucid"]
|
||||||
|
else:
|
||||||
|
author_ucid = "UNKNOWNCHANNELID"
|
||||||
|
|
||||||
|
if "verified" in context:
|
||||||
|
verified = context["verified"]
|
||||||
|
else:
|
||||||
|
verified = False
|
||||||
|
|
||||||
|
if "avatar" in context:
|
||||||
|
avatar_url = context["avatar"]
|
||||||
|
else:
|
||||||
|
avatar_url = "unknown"
|
||||||
|
|
||||||
|
ythdd_globals.print_debug("shortsLockupViewModel fired")
|
||||||
|
return {
|
||||||
|
"type": "video",
|
||||||
|
"title": title,
|
||||||
|
"videoId": video_id,
|
||||||
|
"author": author_name,
|
||||||
|
"authorId": author_ucid,
|
||||||
|
"authorUrl": "/channel/" + author_ucid,
|
||||||
|
"authorVerified": False,
|
||||||
|
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
||||||
|
"description": "",
|
||||||
|
"descriptionHtml": "",
|
||||||
|
"viewCount": parseViewsFromViewText(views_text),
|
||||||
|
"viewCountText": views_text,
|
||||||
|
"published": int(0),
|
||||||
|
"publishedText": published_date,
|
||||||
|
"lengthSeconds": int(60), # invidious locks this to 60s no matter what the actual duration is
|
||||||
|
"liveNow": False,
|
||||||
|
"premium": False,
|
||||||
|
"isUpcoming": False,
|
||||||
|
"premiereTimestamp": 0,
|
||||||
|
"isNew": False,
|
||||||
|
"is4k": False,
|
||||||
|
"is8k": False,
|
||||||
|
"isVr180": False,
|
||||||
|
"isVr360": False,
|
||||||
|
"is3d": False,
|
||||||
|
"hasCaptions": False
|
||||||
|
}
|
||||||
|
|
||||||
|
case "gridVideoRenderer": # videos on channel pages
|
||||||
|
|
||||||
|
# doesn't work on Yattee
|
||||||
|
# thumbnails = safeTraverse(entry, ["gridVideoRenderer", "thumbnail", "thumbnails"], default=[])
|
||||||
|
# for thumbnail in thumbnails:
|
||||||
|
# thumbnail["url"] = ythdd_globals.translateLinks(thumbnail["url"])
|
||||||
|
|
||||||
|
video_id = safeTraverse(entry, ["gridVideoRenderer", "videoId"], default="UnknownVideoId")
|
||||||
|
thumbnails = ythdd_struct_builder.genThumbs(video_id)
|
||||||
|
|
||||||
|
published_date = safeTraverse(entry, ["gridVideoRenderer", "publishedTimeText", "simpleText"], default="now")
|
||||||
|
published_date = published_date.removeprefix("Streamed ")
|
||||||
|
|
||||||
|
ythdd_globals.print_debug("gridVideoRenderer fired")
|
||||||
|
return {
|
||||||
|
"type": "video",
|
||||||
|
"title": safeTraverse(entry, ["gridVideoRenderer", "title", "simpleText"], default="unknown video title"),
|
||||||
|
"videoId": video_id,
|
||||||
|
"author": context["author_name"],
|
||||||
|
"authorId": context["author_ucid"],
|
||||||
|
"authorUrl": "/channel/" + context["author_ucid"],
|
||||||
|
"authorVerified": False, # TODO: handle badge related tasks here using context
|
||||||
|
"videoThumbnails": thumbnails,
|
||||||
|
"description": "", # won't work without using an RSS feed (?)
|
||||||
|
"descriptionHtml": "", # -||-
|
||||||
|
"viewCount": parseViewsFromViewText(safeTraverse(entry, ["gridVideoRenderer", "viewCountText", "simpleText"], default="0 views")),
|
||||||
|
"viewCountText": safeTraverse(entry, ["gridVideoRenderer", "shortViewCountText", "simpleText"], default="0 views"),
|
||||||
|
"published": int(dateparser.parse(published_date).timestamp()),
|
||||||
|
"publishedText": published_date,
|
||||||
|
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["gridVideoRenderer", "thumbnailOverlays", 0, "thumbnailOverlayTimeStatusRenderer", "text", "simpleText"], default="0:0")),
|
||||||
|
"liveNow": True if published_date == "now" else False,
|
||||||
|
"premium": False,
|
||||||
|
"isUpcoming": False,
|
||||||
|
"isNew": False,
|
||||||
|
"is4k": False,
|
||||||
|
"is8k": False,
|
||||||
|
"isVr180": False,
|
||||||
|
"isVr360": False,
|
||||||
|
"is3d": False,
|
||||||
|
"hasCaptions": False
|
||||||
|
}
|
||||||
|
|
||||||
|
case "channelRenderer": # channels in search results
|
||||||
|
|
||||||
|
avatars = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["channelRenderer", "thumbnail", "thumbnails", 0, "url"], default=DEFAULT_AVATAR))
|
||||||
|
description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["channelRenderer", "descriptionSnippet", "runs"], default=[]))
|
||||||
|
isVerified = ythdd_extractor.isVerified(safeTraverse(entry, ["channelRenderer", "ownerBadges", 0], default=[]))
|
||||||
|
|
||||||
|
ythdd_globals.print_debug("channelRenderer fired")
|
||||||
|
return {
|
||||||
|
"type": "channel",
|
||||||
|
"author": safeTraverse(entry, ["channelRenderer", "title", "simpleText"], default="Unknown channel"),
|
||||||
|
"authorId": safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"),
|
||||||
|
"authorUrl": "/channel/" + safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"),
|
||||||
|
"authorVerified": isVerified,
|
||||||
|
"authorThumbnails": avatars,
|
||||||
|
"autoGenerated": False,
|
||||||
|
"subCount": parseViewsFromViewText(safeTraverse(entry, ["channelRenderer", "videoCountText", "simpleText"], default="0 subscribers")),
|
||||||
|
"videoCount": 0,
|
||||||
|
"channelHandle": safeTraverse(entry, ["channelRenderer", "navigationEndpoint", "browseEndpoint", "canonicalBaseUrl"], default="/@ythdd_unknown_handle")[1:],
|
||||||
|
"description": description,
|
||||||
|
"descriptionHtml": description_html
|
||||||
|
}
|
||||||
|
|
||||||
|
case "playlistVideoRenderer":
|
||||||
|
# used by all content inside of playlists which have at least one non-shorts video/livestream
|
||||||
|
|
||||||
|
video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId")
|
||||||
|
title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title")
|
||||||
|
author_ucid = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"])
|
||||||
|
author_name = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "text"], default="Unknown author")
|
||||||
|
video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1
|
||||||
|
length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0"))
|
||||||
|
published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01")
|
||||||
|
published_date = published_date.removeprefix("Streamed ").removeprefix(" watching")
|
||||||
|
|
||||||
|
# handle livestreams
|
||||||
|
if not published_date:
|
||||||
|
published_date = "now"
|
||||||
|
|
||||||
|
if author_ucid is None:
|
||||||
|
# likely a collaborative video, let's try
|
||||||
|
# to fetch the uploader's ucid with that in mind
|
||||||
|
livm = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
|
||||||
|
# name extraction logic the same as in videoRenderer
|
||||||
|
all_authors = []
|
||||||
|
for collaborative_author in livm:
|
||||||
|
collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"])
|
||||||
|
if collaborative_author_name is not None:
|
||||||
|
all_authors.append(collaborative_author_name)
|
||||||
|
if all_authors != []:
|
||||||
|
author_name = ", ".join(all_authors)
|
||||||
|
author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
||||||
|
|
||||||
|
# surprisingly, innertube responds with the avatar of the user that added the video to the playlist
|
||||||
|
# we can extract that information, e.g. for yattee to display
|
||||||
|
avatar_url = safeTraverse(entry, ["playlistVideoRenderer", "thumbnailOverlays", ..., "thumbnailOverlayAvatarStackViewModel", "avatarStack", "avatarStackViewModel", "avatars", 0, "avatarViewModel", "image", "sources", 0, "url"])
|
||||||
|
avatars = None if avatar_url is None else ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url)
|
||||||
|
|
||||||
|
ythdd_globals.print_debug("playlistVideoRenderer fired")
|
||||||
|
return {
|
||||||
|
"type": "video",
|
||||||
|
"title": title,
|
||||||
|
"videoId": video_id,
|
||||||
|
"author": author_name,
|
||||||
|
"authorId": author_ucid,
|
||||||
|
"authorUrl": "/channel/" + author_ucid,
|
||||||
|
"authorThumbnails": avatars,
|
||||||
|
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
||||||
|
"index": video_index,
|
||||||
|
"lengthSeconds": length,
|
||||||
|
"liveNow": False, # todo: check this?
|
||||||
|
# these do not need to be returned, but some clients try to read it
|
||||||
|
# so we return an approximation here:
|
||||||
|
"published": int(dateparser.parse(published_date).timestamp()),
|
||||||
|
"publishedText": published_date
|
||||||
|
}
|
||||||
|
|
||||||
|

        case _:
            print("received an entry of unknown type (thus can't be parsed):")
            print(entry)
            print("")
            # breakpoint()
            return
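
# Usage sketch (illustrative only): parseRenderers maps one innertube renderer
# to an Invidious-style dict; context carries channel-level data the renderer
# itself lacks (the keys below follow the convention used above).
#
#   parsed = parseRenderers(entry, context={"author_name": "Some Channel",
#                                           "author_ucid": "UCxxxxxxxxxxxxxxxxxxxxxx"})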


def customCommentRendererParser(comment: dict, context: dict = {}) -> dict:

    cep = safeTraverse(comment, ["payload", "commentEntityPayload"], default={})
    content = safeTraverse(cep, ["properties", "content", "content"], default="")
    content_html = escape(content).replace("\r\n", "<br>").replace("\n", "<br>")
    author = safeTraverse(cep, ["author"], default={})
    verified = safeTraverse(author, ["isVerified"], default=False) or safeTraverse(author, ["isArtist"], default=False)
    ucid = safeTraverse(author, ["channelId"], default="UNKNOWNCHANNELID")
    published_date = safeTraverse(cep, ["properties", "publishedTime"], default="now")
    edited = False

    if published_date.endswith(" (edited)"):
        edited = True
        published_date_unix = int(dateparser.parse(published_date.removesuffix(" (edited)")).timestamp())
    else:
        published_date_unix = int(dateparser.parse(published_date).timestamp())

    inv_comment = {
        "authorId": ucid,
        "authorUrl": "/channel/" + ucid,
        "author": safeTraverse(author, ["displayName"], default="@ythdd-unknown-user"),
        "verified": verified,
        "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(author, ["avatarThumbnailUrl"], default=DEFAULT_AVATAR)),  # proxy them!
        "authorIsChannelOwner": safeTraverse(author, ["isCreator"], default=False),  # ???
        "isSponsor": False,  # not sure how to retrieve this
        "likeCount": parseViewsFromViewText("0" + safeTraverse(cep, ["toolbar", "likeCountNotliked"], default="0") + " likes"),
        "isPinned": False,
        "commentId": safeTraverse(cep, ["properties", "commentId"], default="UNKNOWNCOMMENTID"),
        "content": content,
        "contentHtml": content_html,
        "isEdited": edited,
        "published": published_date_unix,
        "publishedText": published_date if published_date != "now" else "unknown amount of time ago"
    }

    if "replies" in comment:
        inv_comment["replies"] = comment["replies"]

    return inv_comment


def parseDescriptionSnippet(snippet: list):

    text = ""
    text_html = ""
    for entry in snippet:
        text += entry["text"]
        # escape each run before wrapping it in tags, otherwise the <b> tags
        # themselves would get escaped as well
        part = escape(entry["text"]).replace("\r\n", "<br>").replace("\n", "<br>")
        if entry.get("bold"):
            text_html += "<b>" + part + "</b>"
        else:
            text_html += part

    return text, text_html
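
# Example (hypothetical runs):
#   parseDescriptionSnippet([{"text": "a <tag>", "bold": True}, {"text": " b"}])
#   == ("a <tag> b", "<b>a &lt;tag&gt;</b> b")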


def runsToText(runs: list, default: str = "") -> str:
    # "default" will be returned when text extraction fails.
    extracted_text = ""

    for field in runs:
        extracted_text += safeTraverse(field, ["text"], default="")

    if extracted_text:
        return extracted_text

    return default


def extractTextFromSimpleOrRuns(obj: dict, default: str = "") -> str:
    # Extracts the text both from "runs" and "simpleText",
    # with a failsafe to default.
    text = default
    if not isinstance(obj, dict):
        return default
    if "runs" in obj:
        text = runsToText(obj["runs"])
    elif "simpleText" in obj:
        text = obj["simpleText"]
    else:
        print(f"error(extractTextFromSimpleOrRuns): text extraction failed for {obj}")
    return text
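
# Examples:
#   extractTextFromSimpleOrRuns({"simpleText": "1,234 views"}) == "1,234 views"
#   extractTextFromSimpleOrRuns({"runs": [{"text": "1,234"}, {"text": " views"}]}) == "1,234 views"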


def findNearestResolution(width: int, height: int) -> int:
    # Finds the nearest standard resolution (one of 144p, 240p, ...)
    # So far only used for Yattee, as it has trouble playing anything
    # without one of the standard resolutions. Playback on other
    # clients is unaffected.

    # failsafe behaviour
    try:
        width = int(width)
        height = int(height)
        res = min(width, height)
    except (TypeError, ValueError):
        return 360

    if res <= 0:
        # avoid dividing by zero below
        return 360

    standard_resolutions = [144, 240, 360, 720, 1080, 2160, 4320]
    if res in standard_resolutions:
        return res

    # calculate relative distance to each of the standard resolutions
    res_normalized = [abs(1 - (x / res)) for x in standard_resolutions]
    # pick the one where the distance is the smallest
    target_index = res_normalized.index(min(res_normalized))
    target_res = standard_resolutions[target_index]

    return target_res
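
# Examples:
#   findNearestResolution(1920, 1080) == 1080
#   findNearestResolution(854, 480) == 360   # 480 isn't in the list; 360 is relatively closest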


def parseFormatStreams(wdata_fstream: dict, ydata_stream: dict) -> dict:

    try:
        stream_url = ydata_stream["url"]
    except (KeyError, TypeError):
        ythdd_globals.print_debug("could not extract format stream URL from yt-dlp response:")
        ythdd_globals.print_debug(f"wdata: {wdata_fstream}")
        ythdd_globals.print_debug(f"ydata: {ydata_stream}")
        stream_url = ""  # fall back to an empty URL so the struct below can still be built

    fstream = {
        "url": stream_url,
        "itag": str(wdata_fstream["itag"]),
        "type": wdata_fstream["mimeType"],
        "quality": wdata_fstream["quality"],
        "bitrate": str(wdata_fstream["bitrate"]),
        "fps": wdata_fstream["fps"],
        "size": f"{wdata_fstream['width']}x{wdata_fstream['height']}",
        "resolution": f"{findNearestResolution(wdata_fstream['width'], wdata_fstream['height'])}p",  # possibly not really needed here
        "qualityLabel": wdata_fstream["qualityLabel"],
        "container": safeTraverse(FORMATS.get(wdata_fstream["itag"]), ["ext"], default="mp4"),  # invidious_formats
        "encoding": safeTraverse(FORMATS.get(wdata_fstream["itag"]), ["vcodec"], default="mp4")  # invidious_formats
    }

    return fstream


def parseAdaptiveStreams(wdata_astream: dict, ydata_stream: dict) -> dict:

    try:
        stream_url = ydata_stream["url"]
    except (KeyError, TypeError):
        ythdd_globals.print_debug("could not extract adaptive stream URL from yt-dlp response:")
        ythdd_globals.print_debug(f"wdata: {wdata_astream}")
        ythdd_globals.print_debug(f"ydata: {ydata_stream}")
        stream_url = ""  # fall back to an empty URL so the struct below can still be built

    astream_common = {
        "init": f"{wdata_astream['initRange']['start']}-{wdata_astream['initRange']['end']}",
        "index": f"{wdata_astream['indexRange']['start']}-{wdata_astream['indexRange']['end']}",
        "bitrate": str(wdata_astream["bitrate"]),
        "url": stream_url,
        "itag": str(wdata_astream["itag"]),
        "type": wdata_astream["mimeType"],
        "clen": wdata_astream["contentLength"],
        "lmt": wdata_astream["lastModified"],
        "projectionType": wdata_astream["projectionType"],
        "container": safeTraverse(FORMATS.get(wdata_astream["itag"]), ["ext"], default="mp4"),  # invidious_formats
        "encoding": safeTraverse(FORMATS.get(wdata_astream["itag"]), ["vcodec"], default="mp4")  # invidious_formats
    }

    isVideo = True
    if "audioQuality" in wdata_astream:
        isVideo = False

    if isVideo:
        astream = astream_common
        # video-specific metadata
        astream["fps"] = wdata_astream["fps"]
        astream["size"] = f"{wdata_astream['width']}x{wdata_astream['height']}"
        astream["resolution"] = f"{findNearestResolution(wdata_astream['width'], wdata_astream['height'])}p"
        astream["qualityLabel"] = wdata_astream["qualityLabel"]
        astream["colorInfo"] = safeTraverse(wdata_astream, ["colorInfo"])
    else:
        astream = astream_common
        # audio-specific metadata
        astream["encoding"] = safeTraverse(FORMATS.get(wdata_astream["itag"]), ["acodec"], default="mp4")
        astream["audioQuality"] = wdata_astream["audioQuality"]
        astream["audioSampleRate"] = int(wdata_astream["audioSampleRate"])
        astream["audioChannels"] = wdata_astream["audioChannels"]

    return astream