Compare commits

...

35 Commits

Author SHA1 Message Date
56af1f0735 fix: adjust initialData extraction for new a/b change 2025-12-29 23:32:06 +01:00
72266aad0f chore: remove unnecessary string from age-restricted videos' description 2025-11-22 21:42:47 +01:00
900cc92229 fix: play formats other than 16x9 on yattee
reports a factually wrong, but close-enough, resolution. to be removed
once this is fixed on yattee's end.
2025-11-21 22:38:43 +01:00
2687cc2bdc hotfix: fix typo 2025-11-21 10:15:37 +01:00
4a9d59c9b4 fix: filter out DRC and VB audio from wdata 2025-11-21 09:59:22 +01:00
4af581ab7c fix: use acodec for audio streams instead of vcodec
should fix some issues with more strict DASH players
2025-11-21 09:44:30 +01:00
d1f381220d fix: use exclusively tv player for age-restricted videos
also filter out DRC audio
2025-11-20 17:54:41 +01:00
eebf434f3e feat: support age-restricted videos when cookies are provided 2025-11-20 13:02:38 +01:00
c979c97077 feat: allow debugging with pdb
allows the user to debug the webapp when the admin API key is passed as a param.
also - an anniversary, 100th commit!
2025-11-13 11:28:40 +01:00
11c94c757e fix: don't show playlists in related videos feed
items in related feed are now checked by contentType to determine if
they are videos or not
2025-11-07 18:45:11 +01:00
4421e68d9d fix: format stream itag is now a string (as it should)
fixes playback on clipious
2025-11-07 18:43:07 +01:00
da2daab16a feat: preliminary DASH support (for freetube/yattee, no manifest)
rewrites struct returned by innertube with video url retrieved by yt-dlp
works on freetube with proxying disabled; enabling proxying causes
horrible performance. yattee seems unaffected.
2025-11-06 21:53:32 +01:00
05b81e55da chore: bump version strings 2025-11-02 04:07:41 +01:00
158dcc3b7f fix: comment reply count extraction
suspected a/b test: sorting comments with the "top" option returns a new
ctoken which yields the text "Replies" for comments without replies.
to get rid of this behavior it'd be best to create known-good ctokens
for every request.
2025-10-19 23:32:12 +02:00
668e8c32aa feat: support for lockupViewModel inside of channels' video feed
this is rare and currently a/b tested
2025-10-18 14:39:55 +02:00
760aaccfff feat: add debug printing
allows printing messages to stdout only if the instance has debugging
enabled
2025-10-18 14:23:08 +02:00
da54bd0818 chore: bump version strings 2025-10-15 01:17:55 +02:00
b0845d723a feat: storyboard generation (json, webvtt) and proxy
adds support for video storyboard extraction, generation and proxying
2025-10-15 00:03:45 +02:00
c760104d70 fix: support md5 url rewriting for proxied instances
depends on public_facing_url instead of perceived request url (which
can, and will be different for proxied instances)
2025-10-12 19:10:28 +02:00
d6cb0fe692 feat: support for md5 videoplayback url shortening
md5(url) can be supplied as md5 param to shorten videoplayback url
2025-10-12 18:52:38 +02:00
81fba8c4d0 fix: handle forwarded requests
proxied requests are now counted correctly
2025-10-05 23:42:23 +02:00
260039c307 hotfix: make the chunk size an integer in videoplayback proxy 2025-10-05 20:03:43 +02:00
2b24fc2906 feat: videoplayback proxying
adds support for proxying videos through the instance
the support is configurable, and disabled by default
2025-10-05 19:59:23 +02:00
002e3cba33 hotfix: glaring mistake while parsing text runs 2025-10-05 18:15:56 +02:00
5944fd2458 feat: support for "trending" playlists (without authors)
also introduces a fix for text run parsing (by forcing obj to be a dict)
2025-10-05 17:22:15 +02:00
719b545cb4 fix: livestream data extraction
prevents the "'NoneType' object is not iterable" error
2025-10-05 11:12:44 +02:00
468795a7a2 feat: search pagination
adds support for getting past the first page of search results
2025-10-03 01:16:56 +02:00
7eb4452fec feat: make maxres provide the best quality thumbnail available 2025-10-02 12:42:12 +02:00
e7ae42f289 fix: simulate pagination for clients that rely on it
some clients (like clipious) use pagination on playlists and rely on
the videos field to tell whether the playlist has more videos.
here all videos are returned for non-paginated queries, with an empty
videos field returned for every paginated query whose page is not
equal to "1"
2025-10-02 02:27:51 +02:00
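The pagination simulation described in the commit message above can be sketched as a small helper. This is a hypothetical illustration, not the project's actual code: `simulate_pagination` and its signature are made up for clarity, with `page` modeling the raw query parameter (a string, or absent).

```python
# Hypothetical sketch of the simulated-pagination behavior:
# non-paginated queries get every video; any paginated query whose
# page param is not "1" gets an empty list, signaling "no more videos".
def simulate_pagination(videos, page=None):
    if page is None or page == "1":
        return videos
    return []
```

A client like clipious would then stop requesting further pages as soon as it receives the empty list.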
5bb542826e feat: rewrite channel api endpoints in reversed order 2025-10-02 01:23:37 +02:00
365c54d214 fix: actually fall back to the android_vr extractor 2025-10-01 09:10:15 +02:00
a2e2fedc90 fix: handle empty channel tabs gracefully
prevents 500s when a channel tab is empty
2025-09-28 10:12:23 +02:00
4a311f185c fix: prefer android_vr extractor by default
this is possibly one of the last known extractors unaffected by
signature extraction and decryption shenanigans, which cripple load
times on lower-end hardware
2025-09-28 07:47:17 +02:00
45073fff67 feat: extract avatar of the channel which added a video to a
collaborative playlist
surprisingly, the innertube response contains the user's avatar, but
doesn't contain the ucid, name, or handle
2025-09-28 06:59:53 +02:00
f73e6c11d4 fix: playlist description and collaborative playlist rendering
falls back to the more "universal" microformat for description
extraction and provides limited support for collaborative videos
(no channel ucids or collaborator names)
2025-09-28 06:32:39 +02:00
11 changed files with 716 additions and 323 deletions


@@ -1,5 +1,5 @@
[general]
db_file_path = "/path/to/ythdd_db.sqlite" # Preferably stored on an SSD.
db_file_path = "ythdd_db.sqlite" # Path to the database file, preferably stored on an SSD.
video_storage_directory_path = "/path/to/videos/" # Path to video vault.
is_proxied = false # Set to true if running behind reverse proxy.
public_facing_url = "http://127.0.0.1:5000/" # Used for URL rewriting. Note the trailing backslash /.
@@ -9,13 +9,19 @@ cache = true # Whether to cache requests f
[api]
api_key = "" # Leave the API key empty for public access to the non-sensitive backend.
api_key_admin = "CHANGEME" # Empty *admin* API key will autogenerate a random one every launch.
enable_debugger_halt = false # Whether to allow to trigger pdb using admin's API key.
[extractor]
user-agent = "" # Leave empty for default (Firefox ESR).
cookies_path = "" # Leave empty for none.
age_restricted_cookies_path = "" # Cookies to use when bypassing age-gated videos only. Leave empty to disable.
deno_path = "" # Required when using cookies.
preferred_extractor = "" # Leave empty for default (android_vr).
[proxy]
user-agent = "" # Leave empty for default (Firefox ESR).
allow_proxying_videos = false # Whether to allow video proxying through the instance (traffic-intensive).
match_initcwndbps = true # Experimental: matches proxying speed to the one suggested by Innertube (may help avoid being ratelimited/banned).
[admin]
# List of users with admin privileges.

views.py

@@ -1,8 +1,8 @@
#!/usr/bin/python3
from flask import render_template, Response
from flask import redirect, render_template, request, Response
from flask_sqlalchemy import SQLAlchemy
from markupsafe import escape
import requests, json
import hashlib, json, re, requests
import ythdd_globals
def homepage():
@@ -26,12 +26,40 @@ def thumbnailProxy(received_request):
'error_msg': 'invalid request. pretend this is a thumbnail :D'
}), mimetype='application/json', status=400)
thumbnail = requests.get(prefix + "vi/" + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
quality_urls = ['maxresdefault', 'sddefault', 'hqdefault', 'mqdefault', 'default', '1', '2', '3']
video_id, requested_quality = received_request.split('/')
thumbnail = requests.get(prefix + "vi/" + video_id + "/" + requested_quality, headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
thumbnail.raw.decode_content = True
quality_id = 0
if requested_quality == "maxres.jpg":
# if requested quality is maxres,
# provide the best quality possible
while thumbnail.status_code != 200:
thumbnail = requests.get(prefix + "vi/" + video_id + "/" + quality_urls[quality_id] + ".jpg", headers=ythdd_globals.getHeaders(caller='proxy'), stream=True)
thumbnail.raw.decode_content = True
quality_id += 1
response = Response(thumbnail.raw, mimetype=thumbnail.headers['content-type'], status=thumbnail.status_code)
return response
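The maxres branch above walks a fixed quality ladder until a request succeeds. The sketch below isolates that selection logic; `best_available_quality` and the `available` set are illustrative stand-ins for the real HTTP check (`status_code == 200`) against i.ytimg.com.

```python
# Quality ladder copied from the proxy above, best first.
QUALITY_LADDER = ['maxresdefault', 'sddefault', 'hqdefault',
                  'mqdefault', 'default', '1', '2', '3']

def best_available_quality(available):
    # `available` stands in for "the upstream returned HTTP 200".
    for name in QUALITY_LADDER:
        if name in available:
            return name + ".jpg"
    return None  # nothing on the ladder exists for this video
```

This mirrors what the `while thumbnail.status_code != 200` loop does, one upstream request per ladder step.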
def storyboardProxy(received_request):
# will proxy storyboards
prefix = "https://i.ytimg.com/"
sqp = request.args.get("sqp")
sigh = request.args.get("sigh")
if sqp is None or sigh is None:
return Response(json.dumps({"status": "error", "error": "Request lacks the sqp param, the sigh param, or both."}), mimetype="application/json", status=400)
sb = requests.get(prefix + "sb/" + received_request, headers=ythdd_globals.getHeaders(caller='proxy'), params={"sqp": sqp, "sigh": sigh}, stream=True)
sb.raw.decode_content = True
response = Response(sb.raw, mimetype=sb.headers['content-type'], status=sb.status_code)
return response
def ggphtProxy(received_request):
prefix = "https://yt3.ggpht.com/"
@@ -66,3 +94,73 @@ def imgProxy(received_request):
response = Response(thumbnail.raw, mimetype=thumbnail.headers['content-type'], status=thumbnail.status_code)
return response
def videoplaybackProxy():
# inspired by Yotter's video proxy
# https://github.com/ytorg/Yotter/blob/b43a72ab7bfa5a59916fa3259cbc39165717c6bb/app/routes.py#L527
if not ythdd_globals.config['proxy']['allow_proxying_videos']:
return Response(json.dumps({"error": "Administrator has disabled this endpoint"}), mimetype="application/json", status=403)
headers = dict(request.headers)
proxy_headers = ythdd_globals.getHeaders(caller='proxy')
if "Range" in headers:
proxy_headers["Range"] = headers["Range"]
params = dict(request.args)
# support md5 videoplayback url shortening
if "md5" in params:
if params["md5"] not in ythdd_globals.general_cache["hashed_videoplayback"]:
return Response(json.dumps({"error": "Videoplayback request not cached or expired."}), mimetype="application/json", status=404)
return redirect(ythdd_globals.general_cache["hashed_videoplayback"][params["md5"]]["original_url"])
else:
public_url = request.url
public_url = ythdd_globals.config["general"]["public_facing_url"] + public_url[public_url.rfind("videoplayback"):]
md5sum = hashlib.md5(public_url.encode("utf-8")).hexdigest()
ythdd_globals.general_cache["hashed_videoplayback"][md5sum] = {"original_url": request.url}
# reconstruct the url
# first attempt: from host param
host = params.get('host')
# failed? then try to get it from the rest of the params
if host is None:
# second attempt: reconstruct url from mn and mvi?
# the host schema seems to be as follows:
# rr{mvi[any]/fvip[any]?}---{mn[any]}.googlevideo.com
# regarding mvi/fvip, it seems that any value smaller than 5 passes
try:
mvi = params.get('mvi').split(',')[-1]
mn = params.get('mn').split(',')[-1]
if int(mvi) > 5:
mvi = 3 # invidious uses this as fallback
host = f"rr{mvi}---{mn}.googlevideo.com"
except (AttributeError, ValueError):
return Response(json.dumps({"error": "Couldn't extract crucial parameters for hostname reconstruction"}), mimetype="application/json", status=400)
else:
# don't echo host "hint" back to the googlevideo server
del params['host']
# run a regex sanity check
if re.fullmatch(r"[\w-]+\.googlevideo\.com", host) is None:
# fallback behavior for unexpected hostnames
return Response(json.dumps({"error": "Please either pass a valid host, or don't pass any"}), mimetype="application/json", status=400)
try:
# request the proxied data
remote_response = requests.get(f"https://{host}/videoplayback", headers=proxy_headers, params=params, stream=True)
except:
return Response(json.dumps({"error": "Couldn't connect to googlevideo host"}), mimetype="application/json", status=500)
# determine the chunk size
chunk_size = 10 * 1024 # default to 10 KB per chunk; kept small to avoid YouTube ratelimiting
# or the one in initcwndbps (if user enabled the config flag to match chunk_size with initcwndbps)
if ythdd_globals.config['proxy']['match_initcwndbps']:
try:
chunk_size = int(params.get('initcwndbps')) // 1024
except:
pass
# return a chunked response
resp = Response(remote_response.iter_content(chunk_size=chunk_size), content_type=remote_response.headers['Content-Type'], status=remote_response.status_code, headers=remote_response.headers, direct_passthrough=True)
resp.cache_control.public = True
resp.cache_control.max_age = int(60_000)
return resp
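Two pieces of `videoplaybackProxy` are worth isolating: the md5 URL-shortening (hash the public-facing URL, then accept that digest as an `md5` param instead of the full googlevideo query string) and the hostname sanity check. The snippet below is a self-contained sketch; the sample URL and `is_valid_host` helper are illustrative, while the regex is the one used in the diff.

```python
import hashlib
import re

# md5 shortening: the digest of the public-facing videoplayback URL
# becomes the cache key (sample URL is made up for illustration).
public_url = "http://127.0.0.1:5000/videoplayback?expire=0&mn=example"
md5sum = hashlib.md5(public_url.encode("utf-8")).hexdigest()  # 32 hex chars

# Hostname sanity check from the proxy above: only hosts shaped like
# rr{mvi}---{mn}.googlevideo.com (word chars and dashes) pass fullmatch.
HOST_RE = re.compile(r"[\w-]+\.googlevideo\.com")

def is_valid_host(host: str) -> bool:
    return HOST_RE.fullmatch(host) is not None
```

Because `fullmatch` anchors both ends and `[\w-]` cannot match a dot, suffix tricks like `x.googlevideo.com.evil.example` are rejected along with unrelated hosts.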


@@ -64,9 +64,11 @@ def setup():
app.add_url_rule('/api/', view_func=ythdd_api.api_greeting)
app.add_url_rule('/api/<path:received_request>', view_func=ythdd_api.api_global_catchall)
app.add_url_rule('/vi/<path:received_request>', view_func=views.thumbnailProxy)
app.add_url_rule('/sb/<path:received_request>', view_func=views.storyboardProxy)
app.add_url_rule('/ggpht/<path:received_request>', view_func=views.ggphtProxy)
app.add_url_rule('/guc/<path:received_request>', view_func=views.gucProxy)
app.add_url_rule('/img/<path:received_request>', view_func=views.imgProxy)
app.add_url_rule('/videoplayback', view_func=views.videoplaybackProxy)
db = ythdd_db.initDB(app, config)
with app.app_context():
@@ -134,7 +136,7 @@ def main(args):
app_port = port
setup()
app.run(host=host, port=int(port))
app.run(host=host, port=int(port), threaded=True)
if __name__ == "__main__":
#app.run(host="127.0.0.1", port=5000)


@@ -14,7 +14,7 @@ def api_greeting():
def api_global_catchall(received_request):
ythdd_globals.apiRequests += 1
if request.environ['REMOTE_ADDR'] != "127.0.0.1" or (ythdd_globals.isProxied and request.environ['X-Forwarded-For'] != "127.0.0.1"):
if request.environ['REMOTE_ADDR'] != "127.0.0.1" or (ythdd_globals.isProxied and request.environ['HTTP_X_FORWARDED_FOR'] != "127.0.0.1"):
ythdd_globals.outsideApiHits += 1
request_list = received_request.split('/')


@@ -8,11 +8,11 @@ import ythdd_globals, ythdd_extractor
#from flask_sqlalchemy import SQLAlchemy
#import ythdd_api_v1_stats, ythdd_api_v1_user, ythdd_api_v1_info, ythdd_api_v1_query, ythdd_api_v1_meta, ythdd_api_v1_admin
def requireAuthentication(func):
@wraps(func)
def requireAuthentication(admin: bool = True):
def functionWrapper(func):
def wrapper(*args, **kwargs):
token = kwargs["r"].args.get('token')
if token == lewy_globals.config['api']['api_key']:
if token == ythdd_globals.config['api']['api_key' + admin * '_admin']:
try:
status, received, data = func(*args, **kwargs)
return status, received, data
@@ -21,6 +21,7 @@ def requireAuthentication(func):
else:
return 401, "error", {'error_msg': "Unauthorized"}
return wrapper
return functionWrapper
def incrementBadRequests():
ythdd_globals.apiFailedRequests += 1
@@ -143,6 +144,13 @@ def hot(data):
incrementBadRequests()
return notImplemented([data[1]]) # workaround before notImplemented is reworked
@requireAuthentication(admin=True)
def debugger_halt(r):
if not ythdd_globals.config["api"]["enable_debugger_halt"]:
return 403, "Administrator has disabled access for this endpoint.", []
breakpoint()
return 200, "Pdb triggered and ended successfully.", []
def lookup(data, request):
match data[0]:
case 'stats':
@@ -163,6 +171,8 @@ def lookup(data, request):
case 'admin':
# REQUIRE CREDENTIALS!
return stub_hello()
case 'halt':
return debugger_halt(r=request)
case _:
incrementBadRequests()
return notImplemented(data)


@@ -1,5 +1,6 @@
#!/usr/bin/python3
import brotli, yt_dlp, requests, json, time
from http.cookiejar import MozillaCookieJar
from ythdd_globals import safeTraverse
import ythdd_proto
import ythdd_globals
@@ -19,7 +20,11 @@ ytdl_opts = {
# "formats": ["dashy"]
}
},
"simulate": True
"simulate": True,
"js_runtimes": {
"deno": {}
},
'remote_components': ['ejs:github']
}
stage1_headers = {
@@ -67,7 +72,7 @@ stage1_body = {
stage2_headers = {
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:143.0) Gecko/20100101 Firefox/143.0",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-us,en;q=0.5",
"Sec-Fetch-Mode": "navigate",
@@ -76,13 +81,13 @@ stage2_headers = {
stage3_headers = {
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:143.0) Gecko/20100101 Firefox/143.0",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-us,en;q=0.5",
"Sec-Fetch-Mode": "navigate",
"Content-Type": "application/json",
"X-Youtube-Client-Name": "1",
"X-Youtube-Client-Version": "2.20250923.01.00",
"X-Youtube-Client-Version": "2.20251103.01.00",
"Origin": "https://www.youtube.com",
"Accept-Encoding": "gzip, deflate, br",
"Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
@@ -94,7 +99,7 @@ stage3_body = {
"client":
{
"clientName": "WEB",
"clientVersion": "2.20250923.01.00",
"clientVersion": "2.20251103.01.00",
"hl": "en",
"timeZone": "UTC",
"utcOffsetMinutes": 0
@@ -112,9 +117,9 @@ web_context_dict = {
'gl': 'US',
'deviceMake': '',
'deviceModel': '',
'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:143.0) Gecko/20100101 Firefox/143.0,gzip(gfe)',
'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0,gzip(gfe)',
'clientName': 'WEB',
'clientVersion': '2.20250923.01.00',
'clientVersion': '2.20251103.01.00',
'osName': 'Windows',
'osVersion': '10.0',
'screenPixelDensity': 2,
@@ -129,25 +134,48 @@ web_context_dict = {
}
}
def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False):
def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False, use_cookies=None):
# TODO: check user-agent and cookiefile
ytdl_context = ytdl_opts.copy()
if ythdd_globals.config['extractor']['user-agent']:
yt_dlp.utils.std_headers['User-Agent'] = ythdd_globals.config['extractor']['user-agent']
if ythdd_globals.config['extractor']['cookies_path']:
ytdl_opts['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']
if len(url) == 11:
url = "https://www.youtube.com/watch?v=" + url
if getcomments:
ytdl_opts['getcomments'] = True
ytdl_context['getcomments'] = True
if maxcomments:
ytdl_opts['extractor_args']['youtube']['max_comments'] = [maxcomments, "all", "all", "all"]
ytdl_context['extractor_args']['youtube']['max_comments'] = [maxcomments, "all", "all", "all"]
if manifest_fix:
# https://github.com/yt-dlp/yt-dlp/issues/11952#issuecomment-2565802294
ytdl_opts['extractor_args']['youtube']['player_client'] = ['default', 'web_safari']
with yt_dlp.YoutubeDL(ytdl_opts) as ytdl:
ytdl_context['extractor_args']['youtube']['player_client'] = ['default', 'web_safari']
elif ythdd_globals.config['extractor']['preferred_extractor']:
ytdl_context['extractor_args']['youtube']['player_client'] = [ythdd_globals.config['extractor']['preferred_extractor']]
else:
ytdl_context['extractor_args']['youtube']['player_client'] = ['android_vr']
if use_cookies is not None:
# can be either "global", "agegated" or None
deno_path = ythdd_globals.config['extractor']['deno_path']
match use_cookies:
case "global":
ytdl_context['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']
ytdl_context['extractor_args']['youtube']['player_client'] = ['tv']
if not deno_path:
print("FATAL ERROR: deno path is required for playback using cookies!")
ytdl_context['js_runtimes']['deno']['path'] = deno_path if deno_path else ""
case "agegated":
ytdl_context['cookiefile'] = ythdd_globals.config['extractor']['age_restricted_cookies_path']
ytdl_context['extractor_args']['youtube']['player_client'] = ['tv']
if not deno_path:
print("FATAL ERROR: deno path is required for playback of age-restricted content!")
ytdl_context['js_runtimes']['deno']['path'] = deno_path if deno_path else ""
case None | _:
pass
with yt_dlp.YoutubeDL(ytdl_context) as ytdl:
result = ytdl.sanitize_info(ytdl.extract_info(url, download=False))
return result
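One caveat worth flagging in the change above: `ytdl_opts.copy()` is a shallow copy, so nested dicts such as `extractor_args` remain shared with the module-level template, and mutating `ytdl_context['extractor_args']['youtube'][...]` would still leak into `ytdl_opts` across calls. The sketch below demonstrates the pitfall with an illustrative stand-in dict; `copy.deepcopy` is one way to avoid it.

```python
import copy

# Stand-in for the module-level ytdl_opts template.
template = {"extractor_args": {"youtube": {"player_client": ["android_vr"]}}}

# dict.copy() duplicates only the top level; nested dicts are shared.
shallow = template.copy()
shallow["extractor_args"]["youtube"]["player_client"] = ["tv"]
leaked = template["extractor_args"]["youtube"]["player_client"]  # now ['tv']

# deepcopy duplicates the whole structure, so later mutation is isolated.
deep = copy.deepcopy(template)
deep["extractor_args"]["youtube"]["player_client"] = ["web"]
```

After the shallow mutation, the template itself has changed; after the deep copy, further edits stay local to `deep`.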
@@ -171,7 +199,7 @@ def WEBrelated(url: str):
return extracted_json["contents"]['twoColumnWatchNextResults']["secondaryResults"]
def WEBextractSinglePage(uri: str):
def WEBextractSinglePage(uri: str, use_cookies=None):
# WARNING! HIGHLY EXPERIMENTAL, BOUND TO BREAK AT ANY TIME
start_time = time.time()
@@ -179,11 +207,25 @@ def WEBextractSinglePage(uri: str):
if len(uri) != 11:
raise ValueError("WEBextractSinglePage expects a single, 11-character long argument")
response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'))
cookies = None
if use_cookies is not None:
match use_cookies:
case "global":
ythdd_globals.print_debug("wdata: using global cookies")
cookies = MozillaCookieJar(ythdd_globals.config["extractor"]["cookies_path"])
cookies.load()
case "agegated":
ythdd_globals.print_debug("wdata: using agegated cookies")
cookies = MozillaCookieJar(ythdd_globals.config["extractor"]["age_restricted_cookies_path"])
cookies.load()
case None | _:
pass
response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'), cookies=cookies)
extracted_string = str(response.content.decode('utf8', 'unicode_escape'))
start = extracted_string.find('{"responseContext":{"serviceTrackingParams":')
start = extracted_string.find('{"responseContext":')
end = extracted_string.find(';var ', start)
start2 = extracted_string.find('{"responseContext":{"serviceTrackingParams":', start + 1)
start2 = extracted_string.find('{"responseContext":', start + 1)
end2 = extracted_string.find(';</script>', start2)
extracted_json1 = json.loads(extracted_string[start:end])
extracted_json2 = json.loads(extracted_string[start2:end2])
@@ -351,14 +393,19 @@ def browseChannel(ucid: str, params: str = None, ctoken: str = None):
return response_json
def WEBextractSearchResults(search_query: str) -> list:
def WEBextractSearchResults(search_query: str, page: int) -> list:
# Posts a search request to innertube API
# and processes only the relevant part (the actual results)
if search_query is None:
return []
web_context = makeWebContext({"query": search_query})
additional_context = {"query": search_query}
if page is not None:
params = ythdd_proto.produceSearchParams(page)
additional_context["params"] = params
web_context = makeWebContext(additional_context)
response = requests.post('https://www.youtube.com/youtubei/v1/search',
params={"prettyPrint": False},
headers=stage2_headers,
@@ -507,6 +554,13 @@ def WEBgetVideoComments(ctoken: str) -> tuple:
reply_ctoken = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
reply_count = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "viewReplies", "buttonRenderer", "text", "runs", 0, "text"], default="0 replies").split(" ")[0]
# suspected a/b test. can be worked around with on-demand ctoken creation.
# workaround for yt not showing reply counts when sorting by "top" comments
try:
int(reply_count) # can be just "Replies"
except:
reply_count = "0"
for comment in actual_comments:
found_key = safeTraverse(comment, ["entityKey"], default="unknown-key")
# try to link a relevant ctoken if a comment has response
@@ -543,7 +597,7 @@ def WEBextractPlaylist(plid: str = "", ctoken: str = ""):
metadata = None
if not ctoken:
metadata = {
"header": safeTraverse(resp_json, ["header"]),
"microformat": safeTraverse(resp_json, ["microformat"]),
"sidebar": safeTraverse(resp_json, ["sidebar"])
}


@@ -23,7 +23,7 @@ version = "0.0.1"
apiVersion = "1"
randomly_generated_passcode = 0
video_cache = {}
general_cache = {"search": [], "continuations": {"channels": {}, "comments": {}}, "channels": {}, "playlists": {}}
general_cache = {"search": [], "continuations": {"channels": {}, "comments": {}}, "channels": {}, "playlists": {}, "storyboards": {}, "hashed_videoplayback": {}}
def getConfig(configfile):
@@ -32,7 +32,7 @@ def getConfig(configfile):
global randomly_generated_passcode
if not os.path.exists(configfile):
dummy_config = {'general': {'db_file_path': 'ythdd_db.sqlite', 'video_storage_directory_path': 'videos/', 'is_proxied': False, 'public_facing_url': 'http://127.0.0.1:5000/', 'debug': False, 'cache': True}, 'api': {'api_key': 'CHANGEME'}, 'proxy': {'user-agent': ''}, 'extractor': {'user-agent': '', 'cookies_path': ''}, 'admin': {'admins': ['admin']}, 'yt_dlp': {}, 'postprocessing': {'presets': [{'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''}, {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''}, {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'}, {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''}, {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'}, {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''}, {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'}, {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''}, {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'}]}}
dummy_config = {'general': {'db_file_path': 'ythdd_db.sqlite', 'video_storage_directory_path': 'videos/', 'is_proxied': False, 'public_facing_url': 'http://127.0.0.1:5000/', 'debug': False, 'cache': True}, 'api': {'api_key': 'CHANGEME', 'enable_debugger_halt': False}, 'proxy': {'user-agent': '', 'allow_proxying_videos': True, 'match_initcwndbps': True}, 'extractor': {'user-agent': '', 'cookies_path': ''}, 'admin': {'admins': ['admin']}, 'yt_dlp': {}, 'postprocessing': {'presets': [{'name': 'recommended: [N][<=720p] best V+A', 'format': 'bv[height<=720]+ba', 'reencode': ''}, {'name': '[N][1080p] best V+A', 'format': 'bv[height=1080]+ba', 'reencode': ''}, {'name': '[R][1080p] webm', 'format': 'bv[height=1080]+ba', 'reencode': 'webm'}, {'name': '[N][720p] best V+A', 'format': 'bv[height=720]+ba', 'reencode': ''}, {'name': '[R][720p] webm', 'format': 'bv[height=720]+ba', 'reencode': 'webm'}, {'name': '[N][480p] best V+A', 'format': 'bv[height=480]+ba', 'reencode': ''}, {'name': '[480p] VP9 webm/reencode', 'format': 'bv*[height=480][ext=webm]+ba/bv[height=480]+ba', 'reencode': 'webm'}, {'name': '[N][1080p] best video only', 'format': 'bv[height=1080]', 'reencode': ''}, {'name': '[N][opus] best audio only', 'format': 'ba', 'reencode': 'opus'}]}}
# if a passcode has not been provided by the user (config file doesn't exist, and user didn't specify it using an argument)
print(f"{colors.WARNING}WARNING{colors.ENDC}: Using default, baked in config data. {colors.ENDL}"
f" Consider copying and editing the provided example file ({colors.OKCYAN}config.default.toml{colors.ENDC}).")
@@ -60,7 +60,7 @@ def getHeaders(caller="proxy"):
# NOTE: use ESR user-agent
# user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:143.0) Gecko/20100101 Firefox/143.0'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0'
if config[caller]['user-agent']:
user_agent = config[caller]['user-agent']
@@ -151,3 +151,7 @@ def getCommit() -> str | None:
except Exception as e:
return None
def print_debug(text: str) -> None:
# Will print a string only if debugging is enabled.
if config["general"]["debug"]:
print(text)


@@ -3,7 +3,7 @@
# -----
# Translates requests sent through Invidious API at /api/invidious/
# to use internal extractors.
from flask import Response, request, redirect
from flask import Response, request, redirect, url_for
from markupsafe import escape
from time import strftime, gmtime, time
from ythdd_globals import safeTraverse
@@ -12,9 +12,11 @@ import json, datetime
import dateparser
import html
import invidious_formats
import math
import ythdd_globals
import ythdd_api_v1
import ythdd_extractor
import ythdd_struct_builder
import ythdd_struct_parser
# TODOs:
@@ -23,25 +25,27 @@ import ythdd_struct_parser
# [✓] /api/v1/stats (stats())
# [✓] /streams/dQw4w9WgXcQ (does nothing)
# [✓] /vi/:videoIdXXXX/maxresdefault.jpg
# [✓] /api/v1/search?q=... (videos and playlists)
# [✓] /api/v1/search?q=... (videos and playlists), pagination
# [✓] /api/v1/search/suggestions?q=...&pq=...
# [✓] /api/v1/channel/:ucid
# [✓] /api/v1/channel/:ucid/videos, shorts, playlists, streams
# [✓] /api/v1/comments/:videoid?continuation=...
# [✓] /api/v1/videos/:videoIdXXXX
# [✓] /api/v1/playlists/:plid
# [✓] /api/v1/channel/{videos, shorts, playlists, streams, latest?}/:ucid (rewrite)
# [✓] /api/v1/:videoIdXXXX/maxres.jpg redirects to best quality thumbnail
# [✓] /api/v1/storyboards/:videoIdXXXX
# ----------
# PLANNED:
# [X] /api/v1/channel/{videos, shorts, playlists, streams, latest?}/:ucid (rewrite)
# [X] /api/v1/:videoIdXXXX/maxres.jpg redirects to best quality thumbnail
# [X] /api/v1/storyboards/:videoIdXXXX
# [X] /api/v1/videos/:videoIdXXXX does not depend on yt-dlp and offloads stream retrieval elsewhere (making initial response fast)
# [X] /api/v1/manifest/:videoIdXXXX (above is prerequisite)
# [X] rewrite the awful lookup logic
# [X] /api/v1/search?q=... complex filtering options (https://gitea.invidious.io/iv-org/invidious/src/branch/master/src/invidious/search/filters.cr)
# ----------
# IDEAS:
# [*] /api/v1/popular returns last requested videos by the IP (serving as multi-device history?)
# [*] /api/v1/trending returns recently archived videos
# [*] produce continuations instead of extracting them
# ----------
# NOT PLANNED/MAYBE IN THE FUTURE:
# [ ] /api/v1/auth/subscriptions (stub? db?)
@@ -131,215 +135,6 @@ def getError(wdata: dict):
return error
def rebuildFormats(data):
result = [{} for x in data]
formatStreams = []
best_bitrate_video = 0
best_bitrate_audio = -1
for x in range(len(data)):
try:
result[x]['audioChannels'] = data[x]['audioChannels']
isVideo = 0
except:
isVideo = 1
if not "initRange" in data[x]: # for livestreams?
continue
result[x]['init'] = str(data[x]['initRange']['start']) + "-" + str(data[x]['initRange']['end'])
result[x]['index'] = str(data[x]['indexRange']['start']) + "-" + str(data[x]['indexRange']['end'])
result[x]['bitrate'] = str(data[x]['averageBitrate'])
result[x]['url'] = data[x]['url']
result[x]['itag'] = str(data[x]['itag'])
result[x]['type'] = data[x]['mimeType']
result[x]['clen'] = data[x]['contentLength']
result[x]['lmt'] = data[x]['lastModified']
result[x]['projectionType'] = data[x]['projectionType']
try:
result[x]['colorInfo'] = data[x]['colorInfo']
except:
pass
if "audio" == data[x]['mimeType'][:5]:
isAudio = 1
else:
isAudio = 0
if isVideo:
result[x]['fps'] = str(data[x]['fps'])
else:
result[x]['audioQuality'] = data[x]['audioQuality']
result[x]['audioSampleRate'] = data[x]['audioSampleRate']
if data[x]['itag'] in invidious_formats.FORMATS.keys():
result[x]['container'] = invidious_formats.FORMATS[data[x]['itag']]['ext']
try:
result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['vcodec']
except:
result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['acodec']
if isVideo:
try:
result[x]['resolution'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p"
result[x]['qualityLabel'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p" + str(result[x]['fps']) * (data[x]['fps'] > 30) # NOT IMPLEMENTED, that's just a placeholder
result[x]['size'] = str(invidious_formats.FORMATS[data[x]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[x]['itag']]['height'])
except:
pass
# we assume here that a stream with the highest bitrate must be a video stream- that may not be the case
if data[x]['averageBitrate'] > data[best_bitrate_video]['averageBitrate'] and isVideo:
best_bitrate_video = x
if data[x]['averageBitrate'] > data[best_bitrate_audio]['averageBitrate'] and isAudio:
best_bitrate_audio = x
# makes FreeTube work, unfortunately it's a video-only stream
formatStreams = [
{
"url": data[best_bitrate_video]['url'],
"itag": str(data[best_bitrate_video]['itag']),
"type": data[best_bitrate_video]['mimeType'],
"quality": data[best_bitrate_video]['quality'],
"bitrate": str(data[best_bitrate_video]['averageBitrate']),
"fps": data[best_bitrate_video]['fps'],
"size": "", # workaround for clipious, which requires ANYTHING to be passed, or else it will throw an error and won't load the video
"resolution": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
"qualityLabel": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
"container": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['ext'],
"encoding": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['vcodec']
},
# {
# "audioChannels": data[best_bitrate_audio]['audioChannels'],
# "init": result[best_bitrate_audio]['init'],
# "index": result[best_bitrate_audio]['index'],
# "bitrate": str(data[best_bitrate_audio]['averageBitrate']),
# "url": data[best_bitrate_audio]['url'],
# "itag": str(data[best_bitrate_audio]['itag']),
# "type": data[best_bitrate_audio]['mimeType'],
# "clen": result[best_bitrate_audio]['clen'],
# "lmt": result[best_bitrate_audio]['lmt'],
# "projectionType": result[best_bitrate_audio]['projectionType'],
# "audioQuality": result[best_bitrate_audio]['audioQuality'],
# "audioSampleRate": result[best_bitrate_audio]['audioSampleRate'],
# "qualityLabel": "audio"
# }
]
# not all itags have width and/or height
try:
formatStreams[0]["size"] = str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height'])
except:
pass
return result, formatStreams
def rebuildFormatsFromYtdlpApi(ydata: dict):
# Rebuild invidious-compatible formats from yt-dlp's output (ydata)
adaptive_formats = []
format_streams = []
for stream in safeTraverse(ydata, ["formats"], default=[]):
if safeTraverse(stream, ["protocol"], default="storyboard") not in ("http_dash_segments", "https"):
continue
newRow = {}
# Add from ...'s ... to ... as ...
newRow["bitrate"] = str(int(safeTraverse(stream, ["tbr"], default=0) * 1000))
newRow["url"] = safeTraverse(stream, ["url"])
newRow["itag"] = safeTraverse(stream, ["format_id"])
params = ythdd_extractor.paramsFromUrl(newRow["url"])
vcodec = safeTraverse(stream, ["vcodec"], default="none")
acodec = safeTraverse(stream, ["acodec"], default="none")
if vcodec == "none" and acodec == "none":
continue
if safeTraverse(stream, ["acodec"]) != "none":
# audio-only track
type = safeTraverse(stream, ["audio_ext"], default=None)
fnote = safeTraverse(stream, ["format_note"], default="low")
if type is None:
type = "mp4"
abr = safeTraverse(stream, ["abr"], default="0")
if abr is None:
abr = "0"
newRow[ "type"] = "audio/" + type
newRow[ "audioQuality"] = fnote
newRow["audioSampleRate"] = int(safeTraverse(stream, ["asr"], default="44100"))
newRow[ "audioChannels"] = int(safeTraverse(stream, ["audio_channels"], default=2)) # default to stereo so int() never sees None
newRow[ "qualityLabel"] = str(int(abr)) + "k (audio)"
newRow[ "resolution"] = f"{fnote} quality"
newRow[ "size"] = "0x0"
if safeTraverse(stream, ["vcodec"]) != "none":
# either video-only or video+audio
type = safeTraverse(stream, ["video_ext"], default=None)
if type is None:
type = "mp4"
height = str(safeTraverse(stream, ["height"], default=0))
width = str(safeTraverse(stream, [ "width"], default=0))
newRow[ "type"] = "video/" + type
newRow[ "resolution"] = (height if height in ("144", "240", "360", "480", "720", "1080") else "360") + "p" # mpv won't play the video inside of Yattee if it's a non-standard resolution (bug?)
newRow[ "fps"] = safeTraverse(stream, ["fps"], default=30)
newRow[ "qualityLabel"] = height + "p" + str(int(newRow['fps'])) * (newRow["fps"] > 30) + " (video)" # also a placeholder
newRow[ "size"] = width + "x" + height
newRow[ "clen"] = safeTraverse(params, ["clen"], default=safeTraverse(stream, ["filesize"], default="0"))
newRow[ "lmt"] = safeTraverse(params, ["lmt"], default="0")
if newRow["clen"] is None:
# for clipious sake which expects a string
newRow["clen"] = "0"
newRow[ "projectionType"] = "RECTANGULAR" # clipious requires this to be mentioned explicitly and cannot be nulled
newRow[ "container"] = safeTraverse(stream, ["ext"], default="unknown_container")
newRow[ "encoding"] = safeTraverse(invidious_formats.FORMATS, [int("0" + newRow["itag"].split("-")[0]), "ext"], default="unknown_encoding") # not sure this split is necessary
newRow[ "quality"] = newRow["qualityLabel"]
newRow[ "init"] = "0-1" # dummy values
newRow[ "index"] = "2-3" # dummy values
if vcodec != "none" and acodec != "none":
# 360p stream
newRow["qualityLabel"] = height + "p" + str(int(newRow['fps'])) * (newRow["fps"] > 30)
format_streams.append(newRow)
if vcodec != "none" or acodec != "none":
adaptive_formats.append(newRow)
# {
# "url": data[best_bitrate_video]['url'],
# "itag": str(data[best_bitrate_video]['itag']),
# "type": data[best_bitrate_video]['mimeType'],
# "quality": data[best_bitrate_video]['quality'],
# "bitrate": str(data[best_bitrate_video]['averageBitrate']),
# "fps": data[best_bitrate_video]['fps'],
# "size": "", # workaround for clipious, which requires ANYTHING to be passed, or else it will throw an error and won't load the video
# "resolution": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
# "qualityLabel": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
# "container": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['ext'],
# "encoding": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['vcodec']
# }
# {
# "audioChannels": data[best_bitrate_audio]['audioChannels'],
# "init": result[best_bitrate_audio]['init'],
# "index": result[best_bitrate_audio]['index'],
# "bitrate": str(data[best_bitrate_audio]['averageBitrate']),
# "url": data[best_bitrate_audio]['url'],
# "itag": str(data[best_bitrate_audio]['itag']),
# "type": data[best_bitrate_audio]['mimeType'],
# "clen": result[best_bitrate_audio]['clen'],
# "lmt": result[best_bitrate_audio]['lmt'],
# "projectionType": result[best_bitrate_audio]['projectionType'],
# "audioQuality": result[best_bitrate_audio]['audioQuality'],
# "audioSampleRate": result[best_bitrate_audio]['audioSampleRate'],
# "qualityLabel": "audio"
# }
return adaptive_formats, format_streams
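The `qualityLabel` strings built above lean on a small Python idiom: multiplying a string by a boolean (`str(int(fps)) * (fps > 30)`) appends the fps suffix only for high-framerate streams, since `True` and `False` behave as `1` and `0`. A standalone sketch of that label logic (the sample heights and fps values are made up):

```python
def quality_label(height: int, fps: float) -> str:
    # string * bool: the fps suffix survives only when fps > 30
    return str(height) + "p" + str(int(fps)) * (fps > 30)

print(quality_label(1080, 60))  # -> 1080p60
print(quality_label(360, 30))   # -> 360p
```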
def videos(data):
# an attempt on a faithful rewrite of
# https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr
@@ -368,11 +163,24 @@ def videos(data):
wdata = ythdd_extractor.WEBextractSinglePage(data[3])
age_restricted = False
error = getError(wdata)
if error is not None:
if error.startswith("(LOGIN_REQUIRED)") and "inappropriate for some users" in error:
# check if user provided age-gated cookies
if ythdd_globals.config["extractor"]["age_restricted_cookies_path"]:
ythdd_globals.print_debug(f"videos({data[3]}): using agegated cookies to bypass restriction")
ydata = ythdd_extractor.extract(data[3], use_cookies="agegated")
wdata = ythdd_extractor.WEBextractSinglePage(data[3], use_cookies="agegated")
age_restricted = True
else:
# return error if no age-gated cookies are provided
return send(500, {"status": "error", "error": error})
ydata = ythdd_extractor.extract(data[3], manifest_fix=True)
else:
# return error if it doesn't mention age restriction
return send(500, {"status": "error", "error": error})
else:
ydata = ythdd_extractor.extract(data[3])
#return send(200, {'ydata': ydata, 'wdata': wdata})
#return send(200, {'idata': idata, 'wdata': wdata})
@@ -417,14 +225,14 @@ def videos(data):
y = safeTraverse(entry, ['lockupViewModel'])
if not isinstance(y, dict):
continue
is_mix_or_playlist = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="") in ("MIX", "PLAYLISTS")
if is_mix_or_playlist:
if safeTraverse(y, ["contentType"], default="LOCKUP_CONTENT_TYPE_VIDEO") != "LOCKUP_CONTENT_TYPE_VIDEO":
# neither mixes nor playlists are currently supported by the invidious api
continue
# note: this model is similar, but not identical to the one in ythdd_struct_parser. perhaps they can be both handled in the struct parser some time.
lmvm = safeTraverse(y, ['metadata', 'lockupMetadataViewModel'], default=[])
related_entry['videoId'] = safeTraverse(y, ['contentId'])
related_entry['title'] = safeTraverse(lmvm, ['title', 'content'])
related_entry['videoThumbnails'] = ythdd_struct_parser.genThumbs(related_entry['videoId']) #safeTraverse(y, ['thumbnail', 'thumbnails'])
related_entry['videoThumbnails'] = ythdd_struct_builder.genThumbs(related_entry['videoId']) #safeTraverse(y, ['thumbnail', 'thumbnails'])
related_entry['author'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 0, 'metadataParts', 0, 'text', 'content'])
related_entry['authorId'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'rendererContext', 'commandContext', 'onTap', 'innertubeCommand', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID")
related_entry['authorUrl'] = '/channel/' + related_entry['authorId']
@@ -464,15 +272,57 @@ def videos(data):
author_verified = author_verified or safeTraverse(livm, [0, "listItemViewModel", "title", "attachmentRuns", 0, "element", "type", "imageType", "image", "sources", 0, "clientResource", "imageName"]) in ("AUDIO_BADGE", "CHECK_CIRCLE_FILLED")
author_thumbnail = ythdd_extractor.generateChannelAvatarsFromUrl(author_thumbnail)
wdata_streams = safeTraverse(wdata, ["ec1", "streamingData"], default=[])
adaptive_formats = []
format_streams = []
# adaptive_formats, format_streams = rebuildFormats(adaptive_formats)
if not live_now:
adaptive_formats, format_streams = rebuildFormatsFromYtdlpApi(ydata)
# adaptive_formats, format_streams = rebuildFormatsFromYtdlpApi(ydata)
initial_astreams_y = {} # itag is the key
initial_fstreams_y = {} # same here
initial_astreams_w = {}
initial_fstreams_w = {}
for video_stream in ydata["formats"]:
if video_stream["format_note"] in ("storyboard",):
# ignore non-audio/video formats (e.g. storyboards)
continue
if video_stream["format_id"] == "18": # todo: do this dynamically
initial_fstreams_y[int(video_stream["format_id"])] = video_stream
elif video_stream["format_id"].isdigit():
# filter out DRC audio
initial_astreams_y[int(video_stream["format_id"])] = video_stream
else:
continue
# format streams
for video_stream in wdata_streams["formats"]:
initial_fstreams_w[video_stream["itag"]] = video_stream
# adaptive streams
for audiovideo_stream in wdata_streams["adaptiveFormats"]:
if not "isVb" in audiovideo_stream and not "isDrc" in audiovideo_stream:
# skip DRC and VB formats
initial_astreams_w[audiovideo_stream["itag"]] = audiovideo_stream
for itag in initial_astreams_y:
if itag in initial_astreams_w:
adaptive_formats.append(ythdd_struct_parser.parseAdaptiveStreams(initial_astreams_w[itag], initial_astreams_y[itag]))
for itag in initial_fstreams_y:
if itag in initial_fstreams_w:
format_streams.append( ythdd_struct_parser.parseFormatStreams( initial_fstreams_w[itag], initial_fstreams_y[itag]))
hls_url = None
else:
adaptive_formats, format_streams = [{"url": f"http://a/?expire={int(time_start + 5.9 * 60 * 60)}", "itag": "18", "type": "", "clen": "0", "lmt": "", "projectionType": "RECTANGULAR"}], [] # freetube/clipious shenanigans, see: https://github.com/FreeTubeApp/FreeTube/pull/5997 and https://github.com/lamarios/clipious/blob/b9e7885/lib/videos/models/adaptive_format.g.dart
hls_url = safeTraverse(ydata, ["url"], default="ythdd: unable to retrieve stream url")
if age_restricted:
if not adaptive_formats:
adaptive_formats = [{"url": f"http://a/?expire={int(time_start + 5.9 * 60 * 60)}", "itag": "18", "type": "", "clen": "0", "lmt": "", "projectionType": "RECTANGULAR"}] # same as above
if live_now:
video_type = "livestream"
premiere_timestamp = published # ??? that works i guess
@@ -492,6 +342,10 @@ def videos(data):
# requests for the video's comments don't have to
# spawn an additional request for initial ctoken
ensure_comment_continuation(video_id, wdata)
storyboards = []
storyboards_extracted = ensure_storyboards(video_id, wdata, length=length)
if storyboards_extracted:
storyboards = ythdd_struct_builder.genStoryboards(video_id)
time_end = time()
@@ -499,8 +353,8 @@ def videos(data):
"type": video_type,
"title": title,
"videoId": video_id,
"videoThumbnails": ythdd_struct_parser.genThumbs(video_id),
"storyboards": [], # not implemented
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
"storyboards": storyboards,
"description": description, # due to change (include ythdd metadata)
"descriptionHtml": description_html,
"published": published,
@@ -579,14 +433,19 @@ def search(data, req):
# ignore paginated requests as we do nothing with the continuation token
page = req.args.get('page')
if page is not None and page != '1':
return send(404, [])
try:
page = int(page)
except:
return send(400, {"error": "Wrong page."})
else:
page = None # when page is "1"
if (data[-2].lower() != "search" or data[-1].lower() != "") and data[-1].lower() != "search":
previous_query = req.args.get('pq')
suggestions = ythdd_extractor.WEBgetSearchSuggestions(search_query, previous_query)
return send(200, suggestions)
results = ythdd_extractor.WEBextractSearchResults(search_query)
results = ythdd_extractor.WEBextractSearchResults(search_query, page)
results_list = []
for entry in results:
@@ -665,9 +524,9 @@ def get_channel_tab(requested_tab, ucid, req, only_json: bool = False):
# only the renderers differ - but they are taken care of in ythdd_struct_parser.parseRenderers()
if ctoken is None:
inner_contents = safeTraverse(result, ["richGridRenderer", "contents"], default=[[]])
inner_contents = safeTraverse(result, ["richGridRenderer", "contents"], default=[])
else:
inner_contents = safeTraverse(wdata, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"], default=[[]])
inner_contents = safeTraverse(wdata, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"], default=[])
for entry in inner_contents:
# videos from videos tab have no owner info (?) or it's in another place. if it is somewhere, this expression can be made simpler by traversing something else in struct parser.
@@ -675,7 +534,7 @@ def get_channel_tab(requested_tab, ucid, req, only_json: bool = False):
if item is not None:
items.append(ythdd_struct_parser.parseRenderers(item, {"author_name": name, "author_ucid": ucid, "avatar": avatar}))
new_continuation = safeTraverse(inner_contents[-1], ["continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
new_continuation = safeTraverse(inner_contents, [-1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
response = {
"videos": items
@@ -694,16 +553,17 @@ def get_channel_tab(requested_tab, ucid, req, only_json: bool = False):
case "playlists":
# todo: replace this with an on-demand generated ctoken?
if ctoken is None:
inner_contents = safeTraverse(result, ["sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "gridRenderer", "items"], default=[[]])
inner_contents = safeTraverse(result, ["sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "gridRenderer", "items"], default=[])
else:
inner_contents = safeTraverse(wdata, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"], default=[[]])
inner_contents = safeTraverse(wdata, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"], default=[])
for entry in inner_contents:
item = ythdd_struct_parser.parseRenderers(entry, {"author_name": name, "author_ucid": ucid, "avatar": avatar})
items.append(item)
new_continuation = safeTraverse(inner_contents[-1], ["continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
new_continuation = safeTraverse(inner_contents, [-1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
response = {
"playlists": items
@@ -781,6 +641,7 @@ def ensure_comment_continuation(video_id: str, wdata = None):
wdata = ythdd_extractor.WEBextractSinglePage(video_id)
# search for "top comments" continuation token
# todo: replace this with on-demand continuation creation
comment_continuation = safeTraverse(wdata, ["ec2", "engagementPanels", 0, "engagementPanelSectionListRenderer", "header", "engagementPanelTitleHeaderRenderer", "menu", "sortFilterSubMenuRenderer", "subMenuItems", 0, "serviceEndpoint", "continuationCommand", "token"], default=None)
if comment_continuation is not None:
ythdd_globals.general_cache["continuations"]["comments"][video_id].append(comment_continuation)
@@ -788,14 +649,89 @@ def ensure_comment_continuation(video_id: str, wdata = None):
print(f"error: couldn't extract comment continuation token from video page ({video_id}). this video likely has comments disabled.")
ythdd_globals.general_cache["continuations"]["comments"][video_id].append("")
def ensure_storyboards(video_id: str, wdata = None, length = 60):
# Returns True on successful extraction, False when it failed.
# Storyboards don't expire. They can be cached indefinitely.
if not video_id in ythdd_globals.general_cache["storyboards"]:
ythdd_globals.general_cache["storyboards"][video_id] = None
if wdata is None:
wdata = ythdd_extractor.WEBextractSinglePage(video_id)
# get storyboard template string
storyboards = None
storyboard_template = safeTraverse(wdata, ["ec1", "storyboards", "playerStoryboardSpecRenderer", "spec"], default=None)
# silly sanity check, todo: do a regex one instead?
if isinstance(storyboard_template, str):
# sample storyboard template url structure, indented for readability
# https://i.ytimg.com/sb/:videoId/storyboard3_L$L/$N.jpg?sqp=b64encodedprotobuf
# | 48 # 27 # 100 # 10 # 10 # 0 # default # rs$datadatadatadatadatadatadatadatada
# | 80 # 45 # 55 # 10 # 10 # 1000 # M$M # rs$datadatadatadatadatadatadatadatada
# | 160 # 90 # 55 # 5 # 5 # 1000 # M$M # rs$datadatadatadatadatadatadatadatada
# | 320 # 180 # 55 # 3 # 3 # 1000 # M$M # rs$datadatadatadatadatadatadatadatada
# ^ width, height, thumb_count, columns, rows, interval, $N, sigh parameter. $L is just the index of a given storyboard, say, 0 for $N=default
# try to extract data from the storyboard template
try:
base_url, *formats = storyboard_template.split("|")
extracted_formats = []
for index, fmt in enumerate(formats):
fmt = fmt.split("#")
width = int(fmt[0])
height = int(fmt[1])
count = int(fmt[2])
columns = int(fmt[3])
rows = int(fmt[4])
interval = int(fmt[5])
name = fmt[6]
sigh = fmt[7]
thumbs_per_image = columns * rows
images_count = math.ceil(count / thumbs_per_image)
interval = interval if interval != 0 else int((length / count) * 1000) # calculated only for $N=default as it's the only one that has interval=0
extracted_formats.append({
"index": index,
"width": width,
"height": height,
"thumb_count": count,
"columns": columns,
"rows": rows,
"interval": interval,
"name": name,
"sigh": sigh,
"images_count": images_count
})
storyboards = {
"template_url": ythdd_globals.translateLinks(base_url, remove_params=False), # NOT removing params is crucial, otherwise sqp will be dropped!
"formats": extracted_formats
}
ythdd_globals.general_cache["storyboards"][video_id] = storyboards
return True
except:
print("error(ensure_storyboards): storyboard template url layout changed. please update ythdd for latest storyboard extraction fixes.")
return False
else:
print(f"error(ensure_storyboards): couldn't extract storyboards from video page ({video_id}). this video won't have storyboards.")
return False
def channels(data, req, only_json: bool = False):
# prevent potential out of bound read
if len(data) < 4:
return send(400, {"error": "No channel specified."})
if len(data) > 4 and len(data[4]) == 24 and data[4].startswith("UC"):
# reversed order (/api/v1/api/invidious/channels/videos/UC...)
data[3], data[4] = data[4], data[3]
# silly sanity check
if len(data[3]) != 24 or not data[3].startswith("UC"):
# typical order (/api/v1/api/invidious/channels/UC.../videos)
return send(404, {"error": "This channel does not exist."})
if len(data) > 4:
@@ -885,15 +821,15 @@ def playlists(data, req, only_json: bool = False):
# todo: make clipious stop spamming requests for paginated response
page = req.args.get('page')
if page is not None and page != '1':
return send(404, {"error": "Paginated queries are not supported."})
# if page is not None and page != '1':
# return send(404, {"error": "Paginated queries are not supported."})
plid = data[3]
# no info about what lengths playlists can have
# settle for a basic sanity check instead
# if len(plid) not in (18, 34, 36):
if not plid.startswith("PL"):
if not (plid.startswith("PL") or plid.startswith("OL")):
response = {"error": "Only standard playlists are currently supported (no mixes, video-based playlists, etc.)"}
if only_json:
return response
@@ -902,7 +838,9 @@ def playlists(data, req, only_json: bool = False):
# check if request has been cached within the last hour
if ythdd_globals.config['general']['cache'] and plid in ythdd_globals.general_cache['playlists']:
if ythdd_globals.general_cache['playlists'][plid]['cacheTime'] + 1 * 60 * 60 > time():
response = ythdd_globals.general_cache['playlists'][plid]
response = ythdd_globals.general_cache['playlists'][plid].copy()
if page is not None and page != '1':
response['videos'] = []
if only_json:
return response
else:
@@ -938,7 +876,14 @@ def playlists(data, req, only_json: bool = False):
author = safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "title", "runs", 0, "text"], default="Unknown channel")
author_ucid = safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "title", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
author_avatars = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "thumbnail", "thumbnails", 0, "url"], default=DEFAULT_AVATAR))
description = safeTraverse(meta, ["header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "description", "descriptionPreviewViewModel", "description", "content"], default="(ythdd: failed to retrieve description, perhaps it's empty?)")
description = safeTraverse(meta, ["microformat", "microformatDataRenderer", "description"], default="(ythdd: failed to retrieve description, perhaps it's empty?)")
if author_ucid == "UNKNOWNCHANNELID" and author == "Unknown channel":
# most likely a collaborative playlist
author = safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "title", "simpleText"], default="by Unknown user and others").removeprefix("by ")
author_orig = author[:author.rfind(" and ")]
description += f"\n(ythdd: This is a collaborative playlist by \"{author_orig}\" {author.removeprefix(author_orig + ' ')}. You can't view the authors' channels.)"
author = author_orig
description_html = html.escape(description).replace("\r\n", "<br>").replace("\n", "<br>")
video_count = ythdd_struct_parser.parseViewsFromViewText(ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["stats", 0]), default="No videos"))
view_count = ythdd_struct_parser.parseViewsFromViewText(ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["stats", 1]), default="No views"))
@@ -967,14 +912,36 @@ def playlists(data, req, only_json: bool = False):
# todo: cache videos and metadata separately, so that paginated queries can be supported as well
if ythdd_globals.config['general']['cache']:
ythdd_globals.general_cache['playlists'][plid] = response
ythdd_globals.general_cache['playlists'][plid] = response.copy()
ythdd_globals.general_cache['playlists'][plid]['cacheTime'] = time()
if page is not None and page != '1':
response['videos'] = []
if only_json:
return response
return send(200, response)
def storyboards(data, req):
height = req.args.get("height")
width = req.args.get("width")
video_id = data[3]
try:
height = int(height)
except:
pass
try:
width = int(width)
except:
pass
resp = ythdd_struct_builder.genWebvttStoryboard(video_id, width, height)
return Response(resp, mimetype="text/vtt", status=200)
def lookup(data, req):
# possibly TODO: rewrite this mess
if len(data) > 2:
@@ -998,6 +965,8 @@ def lookup(data, req):
return get_comments(data, req)
case 'playlists':
return playlists(data, req)
case 'storyboards':
return storyboards(data, req)
case _:
incrementBadRequests()
return notImplemented(data)
@@ -1022,4 +991,6 @@ def lookup(data, req):
return notImplemented(data)
elif len(data) == 1:
if data[0] == "videoplayback":
return redirect(url_for('videoplaybackProxy', **req.args))
return stats() # /api/invidious/something


@@ -83,3 +83,14 @@ def producePlaylistContinuation(plid: str, offset: int = 0) -> str:
b64_ctoken = bbpbToB64(bbpb_dicts, urlsafe=True, padding=True)
return b64_ctoken
def produceSearchParams(page: int = 1) -> str:
msge = {
"9:int": 20 * (page - 1), # pagination
"30:int": 1 # no self-harm censorship
}
bbpb_dicts = fdictToBbpb(msge)
b64_params = bbpbToB64(bbpb_dicts, urlsafe=True, padding=True)
return b64_params
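`produceSearchParams` packs the pagination offset into field 9 of the search params protobuf as `20 * (page - 1)`, i.e. 20 results per page. The offset arithmetic on its own (the per-page size of 20 mirrors the code above; the helper name is illustrative):

```python
def search_offset(page: int, per_page: int = 20) -> int:
    # number of results to skip before the requested page (field "9:int" above)
    return per_page * (page - 1)

print([search_offset(p) for p in (1, 2, 3)])  # -> [0, 20, 40]
```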

ythdd_struct_builder.py Normal file

@@ -0,0 +1,93 @@
from ythdd_globals import safeTraverse
import ythdd_globals
def genThumbs(videoId: str):
result = []
thumbnails = [
{'height': 720, 'width': 1280, 'quality': "maxres", 'url': "maxres"}, # will always attempt to return the best quality available
{'height': 720, 'width': 1280, 'quality': "maxresdefault", 'url': "maxresdefault"},
{'height': 480, 'width': 640, 'quality': "sddefault", 'url': "sddefault"},
{'height': 360, 'width': 480, 'quality': "high", 'url': "hqdefault"},
{'height': 180, 'width': 320, 'quality': "medium", 'url': "mqdefault"},
{'height': 90, 'width': 120, 'quality': "default", 'url': "default"},
{'height': 90, 'width': 120, 'quality': "start", 'url': "1"},
{'height': 90, 'width': 120, 'quality': "middle", 'url': "2"},
{'height': 90, 'width': 120, 'quality': "end", 'url': "3"},
]
for x in thumbnails:
width = x['width']
height = x['height']
quality = x['quality']
url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/' + x['url'] + '.jpg'
result.append({'quality': quality, 'url': url, 'width': width, 'height': height})
return result
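`genThumbs` derives every thumbnail URL from the same template: the instance's `public_facing_url` followed by `vi/<videoId>/<name>.jpg`. A minimal sketch of that URL shape (the base URL below is a placeholder, not a real config value):

```python
def thumb_url(base_url: str, video_id: str, name: str) -> str:
    # mirrors genThumbs: <public_facing_url>vi/<videoId>/<name>.jpg
    return f"{base_url}vi/{video_id}/{name}.jpg"

print(thumb_url("https://example.invalid/", "dQw4w9WgXcQ", "hqdefault"))
# -> https://example.invalid/vi/dQw4w9WgXcQ/hqdefault.jpg
```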
def genStoryboards(video_id: str) -> list:
# generates storyboards inside of /api/v1/videos/:video_id
storyboards = []
cached_storyboards = safeTraverse(ythdd_globals.general_cache["storyboards"], [video_id], default=[])
for sb in cached_storyboards["formats"]:
built_storyboard = {
"url": f"/api/v1/storyboards/{video_id}?width={sb['width']}&height={sb['height']}",
"templateUrl": cached_storyboards['template_url'].replace("$L", str(sb['index'])).replace("$N", sb['name']) + f"&sigh={sb['sigh']}",
"width": sb['width'],
"height": sb['height'],
"count": sb['thumb_count'],
"interval": sb['interval'],
"storyboardWidth": sb['columns'],
"storyboardHeight": sb['rows'],
"storyboardCount": sb['images_count']
}
storyboards.append(built_storyboard)
return storyboards
def msToWebvttTimestamp(time: int):
ms = time % 1000
time //= 1000
hours = time // (60 * 60)
time -= hours * 60 * 60
minutes = time // 60
time -= minutes * 60
seconds = time
timestamp = f"{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(seconds).zfill(2)}.{str(ms).zfill(3)}"
return timestamp
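`msToWebvttTimestamp` can be sanity-checked by hand; a compact standalone copy of the same arithmetic using `divmod` (the sample value is arbitrary):

```python
def ms_to_webvtt(time: int) -> str:
    # split milliseconds into a zero-padded HH:MM:SS.mmm WebVTT timestamp
    ms = time % 1000
    hours, rest = divmod(time // 1000, 3600)
    minutes, seconds = divmod(rest, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{ms:03d}"

print(ms_to_webvtt(3723456))  # -> 01:02:03.456
```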
def genWebvttStoryboard(video_id: str, width: int = None, height: int = None):
# generates WebVTT storyboards for /api/v1/storyboards/:video_id
webvtt = "WEBVTT\n\n"
cached_storyboards = safeTraverse(ythdd_globals.general_cache["storyboards"], [video_id])
if cached_storyboards is None:
return ""
found_storyboard = {}
for sb in cached_storyboards["formats"]:
if width is not None and width == sb['width']:
found_storyboard = sb
if height is not None and height == sb['height']:
found_storyboard = sb
# could be changed
if not found_storyboard:
found_storyboard = cached_storyboards["formats"][0]
start = 0
thumbs_per_sb = found_storyboard['columns'] * found_storyboard['rows']
for x in range(found_storyboard["thumb_count"]):
# xx/yy: tile coordinates of thumb x within the storyboard grid
xx = x % found_storyboard['columns']
yy = (x // found_storyboard['columns']) % found_storyboard['rows']
xywh = f"#xywh={xx * found_storyboard['width']},{yy * found_storyboard['height']},{found_storyboard['width']},{found_storyboard['height']}"
webvtt += f"{msToWebvttTimestamp(start)} --> {msToWebvttTimestamp(start + found_storyboard['interval'])}\n"
webvtt += cached_storyboards['template_url'].replace("$L", str(found_storyboard['index'])).replace("$N", found_storyboard['name']).replace("$M", str(x // thumbs_per_sb)) + f"&sigh={found_storyboard['sigh']}{xywh}\n"
webvtt += "\n"
start += found_storyboard['interval']
return webvtt


@@ -1,36 +1,14 @@
from ythdd_globals import safeTraverse
from html import escape
from invidious_formats import FORMATS
from ythdd_globals import safeTraverse
import json
import dateparser
import ythdd_globals
import ythdd_extractor
import ythdd_struct_builder
DEFAULT_AVATAR = "https://yt3.ggpht.com/a/default-user=s176-c-k-c0x00ffffff-no-rj"
def genThumbs(videoId: str):
result = []
thumbnails = [
#{'height': 720, 'width': 1280, 'quality': "maxres", 'url': "maxres"}, # for the time being omit the buggy maxres quality
{'height': 720, 'width': 1280, 'quality': "maxresdefault", 'url': "maxresdefault"},
{'height': 480, 'width': 640, 'quality': "sddefault", 'url': "sddefault"},
{'height': 360, 'width': 480, 'quality': "high", 'url': "hqdefault"},
{'height': 180, 'width': 320, 'quality': "medium", 'url': "mqdefault"},
{'height': 90, 'width': 120, 'quality': "default", 'url': "default"},
{'height': 90, 'width': 120, 'quality': "start", 'url': "1"},
{'height': 90, 'width': 120, 'quality': "middle", 'url': "2"},
{'height': 90, 'width': 120, 'quality': "end", 'url': "3"},
]
for x in thumbnails:
width = x['width']
height = x['height']
quality = x['quality']
url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/' + x['url'] + '.jpg'
result.append({'quality': quality, 'url': url, 'width': width, 'height': height})
return result
def doesContainNumber(string: str, numeric_system: int = 10) -> bool:
try:
number = int(string, numeric_system)
@@ -73,6 +51,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
match safeTraverse(list(entry.keys()), [0], default=""):
case "videoRenderer": # represents a video
# as of october 2025 slowly phased out in favor of lockupViewModel(?)
published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now")
published_date = published_date.removeprefix("Streamed ")
@@ -103,7 +82,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
else:
avatar_url = safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
views_or_viewers_model = safeTraverse(entry, ["videoRenderer", "viewCountText"])
views_or_viewers_model = safeTraverse(entry, ["videoRenderer", "viewCountText"], default={})
if "simpleText" in views_or_viewers_model:
# means this is a video with X views
view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["simpleText"])
@@ -141,6 +120,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
# retrieve the main channel's avatar
avatar_url = safeTraverse(livm, [0, "listItemViewModel", "leadingAccessory", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
ythdd_globals.print_debug("videoRenderer fired")
return {
"type": "video",
"title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]),
@@ -150,7 +130,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
"authorUrl": "/channel/" + author_ucid,
"authorVerified": verified, # TODO
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
"videoThumbnails": ythdd_struct_builder.genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")),
"description": description,
"descriptionHtml": description_html,
"viewCount": view_count,
@@ -172,15 +152,67 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
# modify the premiere timestamp afterwards here?
case "lockupViewModel": # represents playlists/mixes (and videos since october 2025)
# related videos lvms are handled in ythdd_inv_tl.videos()
lvm = entry["lockupViewModel"]
playlist_type = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="")
if playlist_type == "MIX":
# mixes aren't currently supported
return
if not playlist_type:
# struct represents a video
ythdd_globals.print_debug("lockupViewModel fired (not a playlist). this is an a/b test; any errors that follow likely stem from it.")
lmvm = safeTraverse(lvm, ['metadata', 'lockupMetadataViewModel'], default={})
video_id = safeTraverse(lvm, ['contentId'])
author_name = safeTraverse(context, ["author_name"], default="Unknown author")
author_ucid = safeTraverse(context, ["author_ucid"], default="UNKNOWNCHANNELID")
verified = safeTraverse(context, ["verified"], default=False) # TODO: check if this can be retrieved here
avatar_url = safeTraverse(context, ["avatar"], default=DEFAULT_AVATAR)
title = safeTraverse(lmvm, ["title", "content"], default="No title")
video_metadata = safeTraverse(lmvm, ["metadata", "contentMetadataViewModel", "metadataRows", 0, "metadataParts"], default=[])
view_count_text = safeTraverse(video_metadata, [0, "text", "content"], default="0 views")
published_date = safeTraverse(video_metadata, [1, "text", "content"], default="now")
length_text = safeTraverse(lvm, ["contentImage", "thumbnailViewModel", "overlays", ..., "thumbnailBottomOverlayViewModel", "badges", -1, "thumbnailBadgeViewModel", "text"], default="0:0")
view_count = parseViewsFromViewText(view_count_text)
length = parseLengthFromTimeBadge(length_text)
resp = {
"type": "video",
"title": title,
"videoId": video_id,
"author": author_name,
"authorId": author_ucid,
"authorUrl": "/channel/" + author_ucid,
"authorVerified": verified, # TODO
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
"description": "", # can't be retrieved from lockupViewModel
"descriptionHtml": "",
"viewCount": view_count,
"viewCountText": view_count_text,
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
"publishedText": published_date,
"lengthSeconds": length,
"liveNow": False, # can't be live if it's in creator's video feed
"premium": False, # todo: check this
"isUpcoming": False,
"isNew": False,
"is4k": False,
"is8k": False,
"isVr180": False,
"isVr360": False,
"is3d": False,
"hasCaptions": False
}
return resp
# struct represents a playlist
meta = safeTraverse(lvm, ["metadata"], default=[])
lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[])
thumbnail = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", -1, "url"], default="no-url?")
@@ -191,7 +223,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos")
length = parseViewsFromViewText(length.split(" ")[0])
# Turns out for some responses we do have some data, while not on others.
# Data from context should be prioritized, thus even if something is found with safeTraverse,
# the parser will ignore it in favour of the context.
ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
@@ -199,6 +231,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
ucid = safeTraverse(context, ["author_ucid"], default=ucid)
author = safeTraverse(context, ["author_name"], default=author)
ythdd_globals.print_debug("lockupViewModel fired (playlist)")
return {
"type": "playlist",
"title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"),
@@ -250,6 +283,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
else:
avatar_url = "unknown"
ythdd_globals.print_debug("shortsLockupViewModel fired")
return {
"type": "video",
"title": title,
@@ -258,7 +292,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
"authorId": author_ucid,
"authorUrl": "/channel/" + author_ucid,
"authorVerified": False,
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
"description": "",
"descriptionHtml": "",
"viewCount": parseViewsFromViewText(views_text),
@@ -287,11 +321,12 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
# thumbnail["url"] = ythdd_globals.translateLinks(thumbnail["url"])
video_id = safeTraverse(entry, ["gridVideoRenderer", "videoId"], default="UnknownVideoId")
thumbnails = ythdd_struct_builder.genThumbs(video_id)
published_date = safeTraverse(entry, ["gridVideoRenderer", "publishedTimeText", "simpleText"], default="now")
published_date = published_date.removeprefix("Streamed ")
ythdd_globals.print_debug("gridVideoRenderer fired")
return {
"type": "video",
"title": safeTraverse(entry, ["gridVideoRenderer", "title", "simpleText"], default="unknown video title"),
@@ -326,6 +361,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["channelRenderer", "descriptionSnippet", "runs"], default=[]))
isVerified = ythdd_extractor.isVerified(safeTraverse(entry, ["channelRenderer", "ownerBadges", 0], default=[]))
ythdd_globals.print_debug("channelRenderer fired")
return {
"type": "channel",
"author": safeTraverse(entry, ["channelRenderer", "title", "simpleText"], default="Unknown channel"),
@@ -371,6 +407,12 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
author_name = ", ".join(all_authors)
author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
# surprisingly, innertube responds with the avatar of the user that added the video to the playlist
# we can extract that information, e.g. for yattee to display
avatar_url = safeTraverse(entry, ["playlistVideoRenderer", "thumbnailOverlays", ..., "thumbnailOverlayAvatarStackViewModel", "avatarStack", "avatarStackViewModel", "avatars", 0, "avatarViewModel", "image", "sources", 0, "url"])
avatars = None if avatar_url is None else ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url)
ythdd_globals.print_debug("playlistVideoRenderer fired")
return {
"type": "video",
"title": title,
@@ -378,7 +420,8 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
"author": author_name,
"authorId": author_ucid,
"authorUrl": "/channel/" + author_ucid,
"authorThumbnails": avatars,
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
"index": video_index,
"lengthSeconds": length,
"liveNow": False, # todo: check this?
@@ -389,7 +432,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
}
case _:
print("received an entry of unknown type (thus can't be parsed):")
print(entry)
print("")
# breakpoint()
@@ -465,6 +508,8 @@ def extractTextFromSimpleOrRuns(obj: dict, default: str = "") -> str:
# Extracts the text both from "runs" and "simpleText"
# with failsafe to default.
text = default
if not isinstance(obj, dict):
return default
if "runs" in obj:
text = runsToText(obj["runs"])
elif "simpleText" in obj:
@@ -472,3 +517,102 @@ def extractTextFromSimpleOrRuns(obj: dict, default: str = "") -> str:
else:
print(f"error(extractTextFromSimpleOrRuns): text extraction failed for {obj}")
return text
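The simpleText/runs duality that extractTextFromSimpleOrRuns handles can be exercised with a standalone sketch. Names here are illustrative: `runs_to_text` is a hypothetical stand-in for the module's `runsToText`, which is assumed to concatenate the `text` field of each run.

```python
# Standalone sketch of the two text shapes innertube returns.
# runs_to_text is a hypothetical stand-in for runsToText.
def runs_to_text(runs: list) -> str:
    return "".join(run.get("text", "") for run in runs)

def extract_text(obj, default: str = "") -> str:
    # Non-dict input (including None from a failed traversal) falls back.
    if not isinstance(obj, dict):
        return default
    if "runs" in obj:
        return runs_to_text(obj["runs"])
    if "simpleText" in obj:
        return obj["simpleText"]
    return default

print(extract_text({"simpleText": "1,234 views"}))                     # 1,234 views
print(extract_text({"runs": [{"text": "1.2M"}, {"text": " views"}]}))  # 1.2M views
print(extract_text(None, default="n/a"))                               # n/a
```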
def findNearestResolution(width: int, height: int) -> int:
# Finds the nearest standard resolution (one of 144p, 240p, ...)
# So far only used for Yattee, as it has trouble playing anything
# without one of the standard resolutions. Playback on other
# clients is unaffected.
# failsafe behaviour
try:
width = int(width)
height = int(height)
res = min(width, height)
except (TypeError, ValueError): # non-numeric or missing dimensions
return 360
standard_resolutions = [144, 240, 360, 720, 1080, 2160, 4320]
if res in standard_resolutions:
return res
# calculate relative distance to one of the standard resolutions
res_normalized = [abs(1 - (x / res)) for x in standard_resolutions]
# pick the one where the distance is the smallest
target_index = res_normalized.index(min(res_normalized))
target_res = standard_resolutions[target_index]
return target_res
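To illustrate the snapping behaviour, the function is copied verbatim below (with the narrowed except) so the examples run standalone; note that 480p and 1440p inputs snap to a neighbouring rung because they are absent from the ladder.

```python
# Verbatim copy of findNearestResolution so the examples run standalone.
def findNearestResolution(width: int, height: int) -> int:
    try:
        width = int(width)
        height = int(height)
        res = min(width, height)
    except (TypeError, ValueError):
        return 360  # failsafe
    standard_resolutions = [144, 240, 360, 720, 1080, 2160, 4320]
    if res in standard_resolutions:
        return res
    # relative distance to each standard resolution; pick the smallest
    res_normalized = [abs(1 - (x / res)) for x in standard_resolutions]
    return standard_resolutions[res_normalized.index(min(res_normalized))]

print(findNearestResolution(1920, 1080))  # 1080 (exact match)
print(findNearestResolution(854, 480))    # 360  (480 snaps down to 360)
print(findNearestResolution(2560, 1440))  # 1080 (1440 snaps down to 1080)
print(findNearestResolution(None, None))  # 360  (failsafe path)
```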
def parseFormatStreams(wdata_fstream: dict, ydata_stream: dict) -> dict:
try:
stream_url = ydata_stream["url"]
except (KeyError, TypeError):
ythdd_globals.print_debug( "could not extract format stream URL from yt-dlp response:")
ythdd_globals.print_debug(f"wdata: {wdata_fstream}")
ythdd_globals.print_debug(f"ydata: {ydata_stream}")
raise # stream_url is required below, so re-raise after logging
fstream = {
"url": stream_url,
"itag": str(wdata_fstream["itag"]),
"type": wdata_fstream["mimeType"],
"quality": wdata_fstream["quality"],
"bitrate": str(wdata_fstream["bitrate"]),
"fps": wdata_fstream["fps"],
"size": f"{wdata_fstream['width']}x{wdata_fstream['height']}",
"resolution": f"{findNearestResolution(wdata_fstream['width'], wdata_fstream['height'])}p", # possibly not really needed here
"qualityLabel": wdata_fstream["qualityLabel"],
"container": safeTraverse(FORMATS.get(wdata_fstream["itag"]), ["ext"], default="mp4"), # invidious_formats
"encoding": safeTraverse(FORMATS.get(wdata_fstream["itag"]), ["vcodec"], default="mp4") # invidious_formats
}
return fstream
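The itag-keyed container/codec lookup used above can be sketched in isolation. `FORMATS` below is a hypothetical two-entry stand-in for the invidious_formats table (real keys and fields may differ); the point is the double fallback to "mp4" when the itag is unknown. Note also that `itag` is serialized as a string, which the commit log says fixes playback on Clipious.

```python
# Hypothetical stand-in for the invidious_formats FORMATS table.
FORMATS = {
    22: {"ext": "mp4", "vcodec": "avc1.64001F", "acodec": "mp4a.40.2"},
    247: {"ext": "webm", "vcodec": "vp9"},
}

def container_and_encoding(itag: int) -> tuple:
    # Mirrors the safeTraverse(FORMATS.get(itag), [...], default="mp4")
    # calls above: unknown itags fall back to "mp4" for both fields.
    fmt = FORMATS.get(itag) or {}
    return fmt.get("ext", "mp4"), fmt.get("vcodec", "mp4")

print(container_and_encoding(247))   # ('webm', 'vp9')
print(container_and_encoding(9999))  # ('mp4', 'mp4')
```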
def parseAdaptiveStreams(wdata_astream: dict, ydata_stream: dict) -> dict:
try:
stream_url = ydata_stream["url"]
except (KeyError, TypeError):
ythdd_globals.print_debug( "could not extract adaptive stream URL from yt-dlp response:")
ythdd_globals.print_debug(f"wdata: {wdata_astream}")
ythdd_globals.print_debug(f"ydata: {ydata_stream}")
raise # stream_url is required below, so re-raise after logging
astream_common = {
"init": f"{wdata_astream['initRange']['start']}-{wdata_astream['initRange']['end']}",
"index": f"{wdata_astream['indexRange']['start']}-{wdata_astream['indexRange']['end']}",
"bitrate": str(wdata_astream["bitrate"]),
"url": stream_url,
"itag": str(wdata_astream["itag"]),
"type": wdata_astream["mimeType"],
"clen": wdata_astream["contentLength"],
"lmt": wdata_astream["lastModified"],
"projectionType": wdata_astream["projectionType"],
"container": safeTraverse(FORMATS.get(wdata_astream["itag"]), ["ext"], default="mp4"), # invidious_formats
"encoding": safeTraverse(FORMATS.get(wdata_astream["itag"]), ["vcodec"], default="mp4") # invidious_formats
}
# innertube includes "audioQuality" only on audio adaptive formats
isVideo = "audioQuality" not in wdata_astream
if isVideo:
astream = astream_common
# video-specific metadata
astream["fps"] = wdata_astream["fps"]
astream["size"] = f"{wdata_astream['width']}x{wdata_astream['height']}"
astream["resolution"] = f"{findNearestResolution(wdata_astream['width'], wdata_astream['height'])}p"
astream["qualityLabel"] = wdata_astream["qualityLabel"]
astream["colorInfo"] = safeTraverse(wdata_astream, ["colorInfo"])
else:
astream = astream_common
# audio-specific metadata
astream["encoding"] = safeTraverse(FORMATS.get(wdata_astream["itag"]), ["acodec"], default="mp4")
astream["audioQuality"] = wdata_astream["audioQuality"]
astream["audioSampleRate"] = int(wdata_astream["audioSampleRate"])
astream["audioChannels"] = wdata_astream["audioChannels"]
return astream
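The video/audio split above keys off the presence of `audioQuality`, which innertube includes only on audio adaptive formats. A minimal sketch of that discriminator (the stream dicts here are abridged and hypothetical):

```python
def is_video_stream(fmt: dict) -> bool:
    # Audio adaptive formats carry "audioQuality"; video ones do not.
    return "audioQuality" not in fmt

streams = [
    {"itag": 137, "mimeType": 'video/mp4; codecs="avc1.640028"', "fps": 30},
    {"itag": 140, "mimeType": 'audio/mp4; codecs="mp4a.40.2"',
     "audioQuality": "AUDIO_QUALITY_MEDIUM", "audioChannels": 2},
]

video = [s["itag"] for s in streams if is_video_stream(s)]
audio = [s["itag"] for s in streams if not is_video_stream(s)]
print(video, audio)  # [137] [140]
```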