def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
    # Generates channel avatars at default sizes.
    #
    # url     - avatar URL hosted on https://yt3.ggpht.com/, normally carrying
    #           a size parameter such as "=s68" (e.g. ".../abc=s68-c-k-no-rj").
    # proxied - when True (default) the URL is first passed through
    #           ythdd_globals.translateLinks(); when False it is used as-is.
    #
    # Returns a list of {"url", "width", "height"} dicts, one per default size.
    # Returns an empty list if the URL is empty or is not hosted on yt3.ggpht.com.
    # If the URL carries no "=s<digits>" parameter, it is returned unchanged
    # for every size instead of being spliced at a bogus offset.

    if not url or not url.startswith("https://yt3.ggpht.com/"):
        return []

    if proxied:
        url = ythdd_globals.translateLinks(url)

    default_sizes = [32, 48, 76, 100, 176, 512]

    # Find the digit run that follows the last "=s". Only these digits are
    # replaced, so whatever follows them ("-c-k-...") is kept intact and no
    # digit of the old size leaks into the new URL.
    marker = url.rfind("=s")
    size_start = marker + 2
    size_end = size_start
    if marker != -1:
        while size_end < len(url) and url[size_end] in "0123456789":
            size_end += 1
    has_size = marker != -1 and size_end > size_start

    avatars = []
    for size in default_sizes:
        if has_size:
            sized_url = url[:size_start] + str(size) + url[size_end:]
        else:
            sized_url = url
        avatars.append(
            {
                "url": sized_url,
                "width": size,
                "height": size
            }
        )

    return avatars


def WEBextractSearchResults(search_query: str) -> list:
    # Posts a search request to innertube API
    # and processes only the relevant part (the actual results)
    #
    # Returns the "contents" list of the first itemSectionRenderer of the
    # response, or an empty list when the query is None, the response body
    # is not valid JSON, or the expected path is missing.

    if search_query is None:
        return []

    web_context = makeWebContext({"query": search_query})
    response = requests.post(
        'https://www.youtube.com/youtubei/v1/search',
        params={"prettyPrint": False},
        headers=stage2_headers,
        data=json.dumps(web_context)
    )

    try:
        response_json = json.loads(response.text)
    except ValueError:
        # json.JSONDecodeError is a ValueError; an unparsable body simply
        # means there are no results to show.
        return []

    return safeTraverse(
        response_json,
        ["contents", "twoColumnSearchResultsRenderer", "primaryContents", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents"],
        default=[]
    )
(videos and playlists) +# [X] /api/v1/playlists/:plid # [*] /api/v1/auth/subscriptions (stub? db?) # [*] /api/v1/auth/feed?page=1 (stub? db?) # [*] /api/v1/auth/playlists (stub? db?) @@ -450,27 +453,9 @@ def videos(data): related_video['authorThumbnails'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'avatar', 'avatarViewModel', 'image', 'sources'], default=[]) for z in related_video['authorThumbnails']: z['url'] = ythdd_globals.translateLinks(z['url']) - related_video['lengthSeconds'] = 0 - time_lookup_list = [1, 60, 3_600, 86_400] - time_list = safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0").split(":") - if False in map(doesContainNumber, time_list): # works around ['LIVE'] for livestreams or ['Upcoming'] for scheduled videos - pass - else: - for z in range(len(time_list)): - related_video['lengthSeconds'] += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z]) - related_views_text = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0] + related_video['lengthSeconds'] = parseLengthFromTimeBadge(safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0")) related_video['viewCountText'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0] - related_views = 0 - magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000} - if related_views_text: - if related_views_text.lower() == "no": - related_views_text = "0" - related_views = int("0" + "".join([z for z in related_views_text if 48 <= ord(z) and ord(z) <= 57])) - related_views_text = related_views_text.split(" ")[0] - for x in magnitude.keys(): - if x == 
related_views_text[-1]: - related_views *= magnitude[x] - related_video['viewCount'] = related_views + related_video['viewCount'] = parseViewsFromViewText(related_video['viewCountText']) related.append(related_video) # magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000} @@ -621,7 +606,132 @@ def videos(data): return send(status_code, response) -def lookup(data, request): +def parseLengthFromTimeBadge(time_str: str) -> int: + # Returns 0 if unsuccessful + length = 0 + time_lookup_list = [1, 60, 3_600, 86_400] + time_list = time_str.split(":") + for z in range(len(time_list)): + length += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z]) + return length + +def parseViewsFromViewText(viewcounttext: str) -> int: + views = 0 + magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000} + if viewcounttext: + if viewcounttext.lower() == "no": + viewcounttext = "0" + views = int("0" + "".join([z for z in viewcounttext if 48 <= ord(z) and ord(z) <= 57])) + viewcounttext = viewcounttext.split(" ")[0] + for x in magnitude.keys(): + if x == viewcounttext[-1].upper(): + views *= magnitude[x] + return views + +def search(data, req): + search_query = req.args.get('q') + print(f"search query: {search_query}") + + # ignore paginated requests as we do nothing with the continuation token + page = req.args.get('page') + if page is not None and page != '1': + return send(404, []) + + if (data[-2].lower() != "search" or data[-1].lower() != "") and data[-1].lower() != "search": + print(f"'{data[-2]}', '{data[-1]}'") + print("search suggestions are not yet supported") + return send(501, {"status": "error", "msg": "search suggestions not supported in this version of ythdd", "data": []}) + + results = ythdd_extractor.WEBextractSearchResults(search_query) + results_list = [] + + for entry in results: + + match safeTraverse(list(entry.keys()), [0], default=""): + + case "videoRenderer": # represents a video + + published_date = safeTraverse(entry, ["videoRenderer", 
"publishedTimeText", "simpleText"], default="now") + published_date = published_date.removeprefix("Streamed ") + + results_list.append( + { + "type": "video", + "title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]), + "videoId": safeTraverse(entry, ["videoRenderer", "videoId"]), + "author": safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"]), + "authorId": safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"]), + "authorUrl": "/channel/" + safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID"), + "authorVerified": False, # TODO + "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default="unknown")), + "videoThumbnails": genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")), + "description": "", + "descriptionHtml": "", + "viewCount": parseViewsFromViewText(safeTraverse(entry, ["videoRenderer", "viewCountText", "simpleText"], default="No views")), + "viewCountText": safeTraverse(entry, ["videoRenderer", "viewCountText", "simpleText"], default="Unknown amount of views"), + "published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too + "publishedText": published_date, + "lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")), + "liveNow": False, + "premium": False, + "isUpcoming": False, + "isNew": False, + "is4k": False, + "is8k": False, + "isVr180": False, + "isVr360": False, + "is3d": False, + "hasCaptions": False + } + ) + + # modify the premiere timestamp afterwards here? 
+ + case "lockupViewModel": # represents playlists/mixes + + isMix = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="") == "Mix" + if isMix: + # mixes aren't currently supported + continue + + lvm = entry["lockupViewModel"] + meta = safeTraverse(lvm, ["metadata"], default=[]) + lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[]) + ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID") + length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos") + length = parseViewsFromViewText(length.split(" ")[0]) + + results_list.append( + { + "type": "playlist", + "title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"), + "playlistId": safeTraverse(lmvm, [2, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "watchEndpoint", "playlistId"], default="UNKNOWNPLAYLISTID"), + "playlistThumbnail": safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", 0, "url"], default="no-url?"), # todo: sanitize this + "author": safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author"), + "authorId": ucid, + "authorUrl": "/channel/" + ucid, + "authorVerified": False, + "videoCount": length, + "videos": [] # provided for historical reasons i guess + } + ) + + case "shelfRenderer": # "people also watched" + continue + + case "gridShelfViewModel": # shorts? 
+ continue + + case _: + print("received a search result of unknown type:") + print(entry) + print("") + # breakpoint() + continue + + return send(200, results_list) + +def lookup(data, req): # possibly TODO: rewrite this mess if len(data) > 2: if (data[0], data[1]) == ("api", "v1"): @@ -636,6 +746,8 @@ def lookup(data, request): return videos(data) case 'auth': return auth(data) + case 'search': + return search(data, req) case _: incrementBadRequests() return notImplemented(data)