Compare commits
3 Commits
3e7589aea6
...
a814797363
| Author | SHA1 | Date | |
|---|---|---|---|
| a814797363 | |||
| 89f8f2a786 | |||
| 5e655ddd2c |
@@ -13,3 +13,4 @@ Flask-APScheduler>=1.13.1
|
||||
requests>=2.32.3
|
||||
yt_dlp
|
||||
brotli>=1.1.0
|
||||
dateparser>=1.2.2
|
||||
@@ -66,7 +66,7 @@ stage1_body = {
|
||||
|
||||
stage2_headers = {
|
||||
"Connection": "keep-alive",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-us,en;q=0.5",
|
||||
"Sec-Fetch-Mode": "navigate",
|
||||
@@ -75,13 +75,13 @@ stage2_headers = {
|
||||
|
||||
stage3_headers = {
|
||||
"Connection": "keep-alive",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-us,en;q=0.5",
|
||||
"Sec-Fetch-Mode": "navigate",
|
||||
"Content-Type": "application/json",
|
||||
"X-Youtube-Client-Name": "1",
|
||||
"X-Youtube-Client-Version": "2.20250226.01.00",
|
||||
"X-Youtube-Client-Version": "2.20250829.01.00",
|
||||
"Origin": "https://www.youtube.com",
|
||||
"Accept-Encoding": "gzip, deflate, br",
|
||||
"Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
|
||||
@@ -93,7 +93,7 @@ stage3_body = {
|
||||
"client":
|
||||
{
|
||||
"clientName": "WEB",
|
||||
"clientVersion": "2.20250226.01.00",
|
||||
"clientVersion": "2.20250829.01.00",
|
||||
"hl": "en",
|
||||
"timeZone": "UTC",
|
||||
"utcOffsetMinutes": 0
|
||||
@@ -111,9 +111,9 @@ web_context_dict = {
|
||||
'gl': 'US',
|
||||
'deviceMake': '',
|
||||
'deviceModel': '',
|
||||
'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0,gzip(gfe)',
|
||||
'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0,gzip(gfe)',
|
||||
'clientName': 'WEB',
|
||||
'clientVersion': '2.20250226.01.00',
|
||||
'clientVersion': '2.20250829.01.00',
|
||||
'osName': 'Windows',
|
||||
'osVersion': '10.0',
|
||||
'screenPixelDensity': 2,
|
||||
@@ -121,7 +121,7 @@ web_context_dict = {
|
||||
'screenDensityFloat': 2,
|
||||
'userInterfaceTheme': 'USER_INTERFACE_THEME_LIGHT',
|
||||
'browserName': 'Firefox',
|
||||
'browserVersion': '135.0',
|
||||
'browserVersion': '142.0',
|
||||
'acceptHeader': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
|
||||
'utcOffsetMinutes': 0,
|
||||
}
|
||||
@@ -229,6 +229,7 @@ def IOSextract(uri: str):
|
||||
|
||||
def makeWebContext(secondaryContextDict: dict):
|
||||
# Uses web_context_dict to create a context, returns a dict.
|
||||
# Essentially, expands the web_context_dict with a secondary one.
|
||||
|
||||
current_web_context_dict = web_context_dict
|
||||
|
||||
@@ -264,10 +265,37 @@ def getChannelAvatar(response_json: dict):
|
||||
if avatar['width'] * avatar['height'] > best_avatar['width'] * best_avatar['height']:
|
||||
best_avatar = avatar
|
||||
|
||||
# or use regex substitution and set the size to something like 512x512
|
||||
# e.g.: =s128 -> =s512
|
||||
|
||||
best_avatar['url'] = ythdd_globals.translateLinks(best_avatar['url'])
|
||||
|
||||
return best_avatar
|
||||
|
||||
def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
|
||||
# Generates channel avatars at default sizes.
|
||||
|
||||
avatars = []
|
||||
if not url.startswith("https://yt3.ggpht.com/"):
|
||||
return []
|
||||
|
||||
url = ythdd_globals.translateLinks(url)
|
||||
url_size_start = url.rfind("=s") + 2
|
||||
url_size_end = url. find("-", url_size_start) - 1
|
||||
|
||||
default_sizes = [32, 48, 76, 100, 176, 512]
|
||||
|
||||
for size in default_sizes:
|
||||
avatars.append(
|
||||
{
|
||||
"url": url[:url_size_start] + str(size) + url[url_size_end:],
|
||||
"width": size,
|
||||
"height": size
|
||||
}
|
||||
)
|
||||
|
||||
return avatars
|
||||
|
||||
def isVerified(response_json: dict):
|
||||
# Returns True if any user badge has been found (verified/artist).
|
||||
badges = safeTraverse(response_json, [], default=False)
|
||||
@@ -292,3 +320,26 @@ def browseAbout(ucid: str):
|
||||
response_json = json.loads(response.text)
|
||||
|
||||
return response_json
|
||||
|
||||
def WEBextractSearchResults(search_query: str) -> list:
|
||||
# Posts a search request to innertube API
|
||||
# and processes only the relevant part (the actual results)
|
||||
|
||||
if search_query is None:
|
||||
return []
|
||||
|
||||
web_context = makeWebContext({"query": search_query})
|
||||
response = requests.post('https://www.youtube.com/youtubei/v1/search',
|
||||
params={"prettyPrint": False},
|
||||
headers=stage2_headers,
|
||||
data=json.dumps(web_context)
|
||||
)
|
||||
|
||||
results = []
|
||||
try:
|
||||
results = json.loads(response.text)
|
||||
except:
|
||||
pass
|
||||
results = safeTraverse(results, ["contents", "twoColumnSearchResultsRenderer", "primaryContents", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents"], default=[])
|
||||
|
||||
return results
|
||||
|
||||
158
ythdd_inv_tl.py
158
ythdd_inv_tl.py
@@ -8,6 +8,7 @@ from markupsafe import escape
|
||||
from time import strftime, gmtime, time
|
||||
from ythdd_globals import safeTraverse
|
||||
import json, datetime
|
||||
import dateparser
|
||||
import invidious_formats
|
||||
import ythdd_globals
|
||||
import ythdd_api_v1
|
||||
@@ -16,7 +17,9 @@ import ythdd_extractor
|
||||
# TODO:
|
||||
# [✓] /api/v1/stats (stats())
|
||||
# [✓] /streams/dQw4w9WgXcQ (does nothing)
|
||||
# [✓] /vi/videoIdXXXX/maxresdefault.jpg
|
||||
# [✓] /vi/videoIdXXXX/maxresdefault.jpg (todo: add a fallback for 404s)
|
||||
# [✓] /api/v1/search?q=... (videos and playlists)
|
||||
# [X] /api/v1/playlists/:plid
|
||||
# [*] /api/v1/auth/subscriptions (stub? db?)
|
||||
# [*] /api/v1/auth/feed?page=1 (stub? db?)
|
||||
# [*] /api/v1/auth/playlists (stub? db?)
|
||||
@@ -279,7 +282,7 @@ def rebuildFormatsFromYtdlpApi(ydata: dict):
|
||||
height = str(safeTraverse(stream, ["height"], default=0))
|
||||
width = str(safeTraverse(stream, [ "width"], default=0))
|
||||
newRow[ "type"] = "video/" + type
|
||||
newRow[ "resolution"] = height + "p"
|
||||
newRow[ "resolution"] = (height if height in ("144", "240", "360", "480", "720", "1080") else "360") + "p" # mpv won't play the video inside of Yattee if it's a non-standard resolution (bug?)
|
||||
newRow[ "fps"] = safeTraverse(stream, ["fps"], default=30)
|
||||
newRow[ "qualityLabel"] = height + "p" + str(int(newRow['fps'])) * (newRow["fps"] > 30) # also a placeholder
|
||||
newRow[ "size"] = width + "x" + height
|
||||
@@ -450,27 +453,9 @@ def videos(data):
|
||||
related_video['authorThumbnails'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'avatar', 'avatarViewModel', 'image', 'sources'], default=[])
|
||||
for z in related_video['authorThumbnails']:
|
||||
z['url'] = ythdd_globals.translateLinks(z['url'])
|
||||
related_video['lengthSeconds'] = 0
|
||||
time_lookup_list = [1, 60, 3_600, 86_400]
|
||||
time_list = safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0").split(":")
|
||||
if False in map(doesContainNumber, time_list): # works around ['LIVE'] for livestreams or ['Upcoming'] for scheduled videos
|
||||
pass
|
||||
else:
|
||||
for z in range(len(time_list)):
|
||||
related_video['lengthSeconds'] += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
|
||||
related_views_text = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0]
|
||||
related_video['lengthSeconds'] = parseLengthFromTimeBadge(safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0"))
|
||||
related_video['viewCountText'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0]
|
||||
related_views = 0
|
||||
magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
|
||||
if related_views_text:
|
||||
if related_views_text.lower() == "no":
|
||||
related_views_text = "0"
|
||||
related_views = int("0" + "".join([z for z in related_views_text if 48 <= ord(z) and ord(z) <= 57]))
|
||||
related_views_text = related_views_text.split(" ")[0]
|
||||
for x in magnitude.keys():
|
||||
if x == related_views_text[-1]:
|
||||
related_views *= magnitude[x]
|
||||
related_video['viewCount'] = related_views
|
||||
related_video['viewCount'] = parseViewsFromViewText(related_video['viewCountText'])
|
||||
related.append(related_video)
|
||||
|
||||
# magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
|
||||
@@ -621,7 +606,132 @@ def videos(data):
|
||||
|
||||
return send(status_code, response)
|
||||
|
||||
def lookup(data, request):
|
||||
def parseLengthFromTimeBadge(time_str: str) -> int:
|
||||
# Returns 0 if unsuccessful
|
||||
length = 0
|
||||
time_lookup_list = [1, 60, 3_600, 86_400]
|
||||
time_list = time_str.split(":")
|
||||
for z in range(len(time_list)):
|
||||
length += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
|
||||
return length
|
||||
|
||||
def parseViewsFromViewText(viewcounttext: str) -> int:
|
||||
views = 0
|
||||
magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
|
||||
if viewcounttext:
|
||||
if viewcounttext.lower() == "no":
|
||||
viewcounttext = "0"
|
||||
views = int("0" + "".join([z for z in viewcounttext if 48 <= ord(z) and ord(z) <= 57]))
|
||||
viewcounttext = viewcounttext.split(" ")[0]
|
||||
for x in magnitude.keys():
|
||||
if x == viewcounttext[-1].upper():
|
||||
views *= magnitude[x]
|
||||
return views
|
||||
|
||||
def search(data, req):
|
||||
search_query = req.args.get('q')
|
||||
print(f"search query: {search_query}")
|
||||
|
||||
# ignore paginated requests as we do nothing with the continuation token
|
||||
page = req.args.get('page')
|
||||
if page is not None and page != '1':
|
||||
return send(404, [])
|
||||
|
||||
if (data[-2].lower() != "search" or data[-1].lower() != "") and data[-1].lower() != "search":
|
||||
print(f"'{data[-2]}', '{data[-1]}'")
|
||||
print("search suggestions are not yet supported")
|
||||
return send(501, {"status": "error", "msg": "search suggestions not supported in this version of ythdd", "data": []})
|
||||
|
||||
results = ythdd_extractor.WEBextractSearchResults(search_query)
|
||||
results_list = []
|
||||
|
||||
for entry in results:
|
||||
|
||||
match safeTraverse(list(entry.keys()), [0], default=""):
|
||||
|
||||
case "videoRenderer": # represents a video
|
||||
|
||||
published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now")
|
||||
published_date = published_date.removeprefix("Streamed ")
|
||||
|
||||
results_list.append(
|
||||
{
|
||||
"type": "video",
|
||||
"title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]),
|
||||
"videoId": safeTraverse(entry, ["videoRenderer", "videoId"]),
|
||||
"author": safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"]),
|
||||
"authorId": safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"]),
|
||||
"authorUrl": "/channel/" + safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID"),
|
||||
"authorVerified": False, # TODO
|
||||
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default="unknown")),
|
||||
"videoThumbnails": genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")),
|
||||
"description": "",
|
||||
"descriptionHtml": "",
|
||||
"viewCount": parseViewsFromViewText(safeTraverse(entry, ["videoRenderer", "viewCountText", "simpleText"], default="No views")),
|
||||
"viewCountText": safeTraverse(entry, ["videoRenderer", "viewCountText", "simpleText"], default="Unknown amount of views"),
|
||||
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
|
||||
"publishedText": published_date,
|
||||
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")),
|
||||
"liveNow": False,
|
||||
"premium": False,
|
||||
"isUpcoming": False,
|
||||
"isNew": False,
|
||||
"is4k": False,
|
||||
"is8k": False,
|
||||
"isVr180": False,
|
||||
"isVr360": False,
|
||||
"is3d": False,
|
||||
"hasCaptions": False
|
||||
}
|
||||
)
|
||||
|
||||
# modify the premiere timestamp afterwards here?
|
||||
|
||||
case "lockupViewModel": # represents playlists/mixes
|
||||
|
||||
isMix = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="") == "Mix"
|
||||
if isMix:
|
||||
# mixes aren't currently supported
|
||||
continue
|
||||
|
||||
lvm = entry["lockupViewModel"]
|
||||
meta = safeTraverse(lvm, ["metadata"], default=[])
|
||||
lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[])
|
||||
ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
||||
length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos")
|
||||
length = parseViewsFromViewText(length.split(" ")[0])
|
||||
|
||||
results_list.append(
|
||||
{
|
||||
"type": "playlist",
|
||||
"title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"),
|
||||
"playlistId": safeTraverse(lmvm, [2, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "watchEndpoint", "playlistId"], default="UNKNOWNPLAYLISTID"),
|
||||
"playlistThumbnail": safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", 0, "url"], default="no-url?"), # todo: sanitize this
|
||||
"author": safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author"),
|
||||
"authorId": ucid,
|
||||
"authorUrl": "/channel/" + ucid,
|
||||
"authorVerified": False,
|
||||
"videoCount": length,
|
||||
"videos": [] # provided for historical reasons i guess
|
||||
}
|
||||
)
|
||||
|
||||
case "shelfRenderer": # "people also watched"
|
||||
continue
|
||||
|
||||
case "gridShelfViewModel": # shorts?
|
||||
continue
|
||||
|
||||
case _:
|
||||
print("received a search result of unknown type:")
|
||||
print(entry)
|
||||
print("")
|
||||
# breakpoint()
|
||||
continue
|
||||
|
||||
return send(200, results_list)
|
||||
|
||||
def lookup(data, req):
|
||||
# possibly TODO: rewrite this mess
|
||||
if len(data) > 2:
|
||||
if (data[0], data[1]) == ("api", "v1"):
|
||||
@@ -636,6 +746,8 @@ def lookup(data, request):
|
||||
return videos(data)
|
||||
case 'auth':
|
||||
return auth(data)
|
||||
case 'search':
|
||||
return search(data, req)
|
||||
case _:
|
||||
incrementBadRequests()
|
||||
return notImplemented(data)
|
||||
|
||||
Reference in New Issue
Block a user