feat: basic support for search queries

Currently returns videos and playlists; the latter don't yet have
their own API endpoint (/api/v1/playlists/:plid is still TODO).
2025-09-05 06:23:45 +02:00
parent 89f8f2a786
commit a814797363
3 changed files with 188 additions and 24 deletions

View File

@@ -12,4 +12,5 @@ toml>=0.10.2
Flask-APScheduler>=1.13.1
requests>=2.32.3
yt_dlp
brotli>=1.1.0
dateparser>=1.2.2
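dateparser is pulled in for the new search endpoint, which only gets relative upload dates ("3 weeks ago" style strings) from innertube. A minimal sketch of how it is used further down, assuming dateparser's default settings:

import dateparser
# relative date text -> approximate unix timestamp
published = int(dateparser.parse("3 weeks ago").timestamp())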

View File

@@ -229,6 +229,7 @@ def IOSextract(uri: str):
def makeWebContext(secondaryContextDict: dict):
# Uses web_context_dict to create a context, returns a dict.
# Essentially, expands the web_context_dict with a secondary one.
current_web_context_dict = web_context_dict
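web_context_dict itself is not part of this diff; the expansion the comment describes boils down to a dict merge, roughly like the sketch below (the stand-in values are hypothetical, not the real context):

base = {"context": {"client": {"clientName": "WEB"}}}  # stand-in for web_context_dict
merged = dict(base)
merged.update({"query": "example"})  # the secondaryContextDict passed by a caller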
@@ -264,10 +265,37 @@ def getChannelAvatar(response_json: dict):
if avatar['width'] * avatar['height'] > best_avatar['width'] * best_avatar['height']:
best_avatar = avatar
# or use regex substitution and set the size to something like 512x512
# e.g.: =s128 -> =s512
best_avatar['url'] = ythdd_globals.translateLinks(best_avatar['url'])
return best_avatar
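The regex alternative mentioned in the comment above would look roughly like this (not what getChannelAvatar currently does):

import re
# force a fixed avatar size instead of keeping the largest size innertube returned
best_avatar['url'] = re.sub(r"=s\d+", "=s512", best_avatar['url'])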
def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
# Generates a list of avatar dicts at the default sizes by rewriting the =s<size> component of the URL.
avatars = []
if not url.startswith("https://yt3.ggpht.com/"):
return []
url = ythdd_globals.translateLinks(url)
url_size_start = url.rfind("=s") + 2
url_size_end = url.find("-", url_size_start) # keep everything from the first '-' after the size
default_sizes = [32, 48, 76, 100, 176, 512]
for size in default_sizes:
avatars.append(
{
"url": url[:url_size_start] + str(size) + url[url_size_end:],
"width": size,
"height": size
}
)
return avatars
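For illustration (and ignoring the translateLinks() host rewrite, which proxies the image host), a hypothetical input URL is rewritten once per default size:

generateChannelAvatarsFromUrl("https://yt3.ggpht.com/example=s88-c-k-c0x00ffffff-no-rj")
# -> [{"url": ".../example=s32-c-k-c0x00ffffff-no-rj", "width": 32, "height": 32},
#     {"url": ".../example=s48-c-k-c0x00ffffff-no-rj", "width": 48, "height": 48}, ...]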
def isVerified(response_json: dict):
# Returns True if any user badge has been found (verified/artist).
badges = safeTraverse(response_json, [], default=False)
@@ -291,4 +319,27 @@ def browseAbout(ucid: str):
response_json = json.loads(response.text)
return response_json
def WEBextractSearchResults(search_query: str) -> list:
# Posts a search request to the innertube API and extracts only the
# relevant part of the response (the actual result entries).
if search_query is None:
return []
web_context = makeWebContext({"query": search_query})
response = requests.post('https://www.youtube.com/youtubei/v1/search',
params={"prettyPrint": False},
headers=stage2_headers,
data=json.dumps(web_context)
)
results = []
try:
results = json.loads(response.text)
except json.JSONDecodeError: # leave results as [] on an unparseable response
pass
results = safeTraverse(results, ["contents", "twoColumnSearchResultsRenderer", "primaryContents", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents"], default=[])
return results
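Each entry in the returned list is a dict with a single top-level key naming the renderer type, which is what the new search() handler further down in this diff switches on:

results = WEBextractSearchResults("example query")
# entries look like {"videoRenderer": {...}} for videos,
# {"lockupViewModel": {...}} for playlists and mixes,
# {"shelfRenderer": {...}} / {"gridShelfViewModel": {...}} for shelves (skipped)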

View File

@@ -8,6 +8,7 @@ from markupsafe import escape
from time import strftime, gmtime, time
from ythdd_globals import safeTraverse
import json, datetime
import dateparser
import invidious_formats
import ythdd_globals
import ythdd_api_v1
@@ -16,7 +17,9 @@ import ythdd_extractor
# TODO:
# [✓] /api/v1/stats (stats())
# [✓] /streams/dQw4w9WgXcQ (does nothing)
# [✓] /vi/videoIdXXXX/maxresdefault.jpg
# [✓] /vi/videoIdXXXX/maxresdefault.jpg (todo: add a fallback for 404s)
# [✓] /api/v1/search?q=... (videos and playlists)
# [X] /api/v1/playlists/:plid
# [*] /api/v1/auth/subscriptions (stub? db?)
# [*] /api/v1/auth/feed?page=1 (stub? db?)
# [*] /api/v1/auth/playlists (stub? db?)
@@ -450,27 +453,9 @@ def videos(data):
related_video['authorThumbnails'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'avatar', 'avatarViewModel', 'image', 'sources'], default=[])
for z in related_video['authorThumbnails']:
z['url'] = ythdd_globals.translateLinks(z['url'])
related_video['lengthSeconds'] = 0
time_lookup_list = [1, 60, 3_600, 86_400]
time_list = safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0").split(":")
if False in map(doesContainNumber, time_list): # works around ['LIVE'] for livestreams or ['Upcoming'] for scheduled videos
pass
else:
for z in range(len(time_list)):
related_video['lengthSeconds'] += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
related_views_text = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0]
related_video['lengthSeconds'] = parseLengthFromTimeBadge(safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0"))
related_video['viewCountText'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0]
related_views = 0
magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
if related_views_text:
if related_views_text.lower() == "no":
related_views_text = "0"
related_views = int("0" + "".join([z for z in related_views_text if 48 <= ord(z) and ord(z) <= 57]))
related_views_text = related_views_text.split(" ")[0]
for x in magnitude.keys():
if x == related_views_text[-1]:
related_views *= magnitude[x]
related_video['viewCount'] = related_views
related_video['viewCount'] = parseViewsFromViewText(related_video['viewCountText'])
related.append(related_video)
# magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
@@ -621,7 +606,132 @@ def videos(data):
return send(status_code, response)
def lookup(data, request):
def parseLengthFromTimeBadge(time_str: str) -> int:
# Converts an "H:MM:SS" style time badge into seconds.
# Returns 0 if unsuccessful (e.g. 'LIVE' for livestreams or 'Upcoming' for scheduled videos).
length = 0
time_lookup_list = [1, 60, 3_600, 86_400]
time_list = time_str.split(":")
if False in map(str.isdigit, time_list): # non-numeric badge text
return length
for z in range(len(time_list)):
length += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
return length
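A couple of illustrative values:

parseLengthFromTimeBadge("4:20")      # -> 260
parseLengthFromTimeBadge("1:02:03")   # -> 3723
parseLengthFromTimeBadge("LIVE")      # -> 0 (livestream badge instead of a duration)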
def parseViewsFromViewText(viewcounttext: str) -> int:
# Parses view-count strings such as "1,234,567 views", "15K" or "No views" into an integer.
# Note: decimal abbreviations like "1.2M" are only approximated, because the digits are
# concatenated before the magnitude suffix is applied.
views = 0
magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
viewcounttext = (viewcounttext or "").split(" ")[0]
if viewcounttext:
views = int("0" + "".join([z for z in viewcounttext if z in "0123456789"]))
for x in magnitude.keys():
if x == viewcounttext[-1].upper():
views *= magnitude[x]
return views
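Illustrative values:

parseViewsFromViewText("1,234,567 views")   # -> 1234567
parseViewsFromViewText("No views")          # -> 0
parseViewsFromViewText("15K")               # -> 15000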
def search(data, req):
search_query = req.args.get('q')
print(f"search query: {search_query}")
# ignore paginated requests as we do nothing with the continuation token
page = req.args.get('page')
if page is not None and page != '1':
return send(404, [])
if (data[-2].lower() != "search" or data[-1].lower() != "") and data[-1].lower() != "search":
print(f"'{data[-2]}', '{data[-1]}'")
print("search suggestions are not yet supported")
return send(501, {"status": "error", "msg": "search suggestions not supported in this version of ythdd", "data": []})
results = ythdd_extractor.WEBextractSearchResults(search_query)
results_list = []
for entry in results:
match safeTraverse(list(entry.keys()), [0], default=""):
case "videoRenderer": # represents a video
published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now")
published_date = published_date.removeprefix("Streamed ")
results_list.append(
{
"type": "video",
"title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]),
"videoId": safeTraverse(entry, ["videoRenderer", "videoId"]),
"author": safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"]),
"authorId": safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"]),
"authorUrl": "/channel/" + safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID"),
"authorVerified": False, # TODO
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default="unknown")),
"videoThumbnails": genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")),
"description": "",
"descriptionHtml": "",
"viewCount": parseViewsFromViewText(safeTraverse(entry, ["videoRenderer", "viewCountText", "simpleText"], default="No views")),
"viewCountText": safeTraverse(entry, ["videoRenderer", "viewCountText", "simpleText"], default="Unknown amount of views"),
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
"publishedText": published_date,
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")),
"liveNow": False,
"premium": False,
"isUpcoming": False,
"isNew": False,
"is4k": False,
"is8k": False,
"isVr180": False,
"isVr360": False,
"is3d": False,
"hasCaptions": False
}
)
# modify the premiere timestamp afterwards here?
case "lockupViewModel": # represents playlists/mixes
isMix = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="") == "Mix"
if isMix:
# mixes aren't currently supported
continue
lvm = entry["lockupViewModel"]
meta = safeTraverse(lvm, ["metadata"], default=[])
lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[])
ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos")
length = parseViewsFromViewText(length.split(" ")[0])
results_list.append(
{
"type": "playlist",
"title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"),
"playlistId": safeTraverse(lmvm, [2, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "watchEndpoint", "playlistId"], default="UNKNOWNPLAYLISTID"),
"playlistThumbnail": safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", 0, "url"], default="no-url?"), # todo: sanitize this
"author": safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author"),
"authorId": ucid,
"authorUrl": "/channel/" + ucid,
"authorVerified": False,
"videoCount": length,
"videos": [] # provided for historical reasons i guess
}
)
case "shelfRenderer": # "people also watched"
continue
case "gridShelfViewModel": # shorts?
continue
case _:
print("received a search result of unknown type:")
print(entry)
print("")
# breakpoint()
continue
return send(200, results_list)
def lookup(data, req):
# possibly TODO: rewrite this mess
if len(data) > 2:
if (data[0], data[1]) == ("api", "v1"):
@@ -636,6 +746,8 @@ def lookup(data, request):
return videos(data)
case 'auth':
return auth(data)
case 'search':
return search(data, req)
case _:
incrementBadRequests()
return notImplemented(data)
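With the 'search' case wired into lookup(), the endpoint can be exercised like this; host and port are assumptions (use whatever the local ythdd instance listens on), and this assumes send() serializes the results list as a JSON body:

import requests

resp = requests.get("http://localhost:8080/api/v1/search", params={"q": "example"})
for item in resp.json():
    print(item["type"], item.get("title"))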