#!/usr/bin/python3
# ythdd Invidious Translation Layer
# -----
# Translates requests sent through Invidious API at /api/invidious/
# to use internal extractors.
from flask import Response, request, redirect
from markupsafe import escape
from time import strftime, gmtime, time
import json, datetime
import invidious_formats
import ythdd_globals
import ythdd_api_v1
import ythdd_extractor

# TODO:
# [✓] /api/v1/stats (stats())
# [✓] /streams/dQw4w9WgXcQ (does nothing)
# [✓] /vi/videoIdXXXX/maxresdefault.jpg
# [*] /api/v1/auth/subscriptions (stub? db?)
# [*] /api/v1/auth/feed?page=1 (stub? db?)
# [*] /api/v1/auth/playlists (stub? db?)
# [*] /api/v1/videos/videoIdXXXX


def incrementBadRequests():
    """Bump the failed-request counter kept in ythdd_globals by one."""
    ythdd_globals.apiFailedRequests = ythdd_globals.apiFailedRequests + 1


def greeting():
    """Return a (status, text) pair with a short hello message."""
    message = 'hello from Invidious TL!\nstats endpoint at /api/invidious/stats'
    return 200, message


def send(status, response):
    """Serialize `response` as JSON and wrap it in a flask Response.

    status:   HTTP status code for the reply.
    response: any value json.dumps() can serialize.
    """
    payload = json.dumps(response)
    return Response(payload, mimetype='application/json', status=status)


def notImplemented(data):
    """Reply with 501 and the '/'-joined request path that was not recognised."""
    requested_path = '/'.join(data)
    return send(501, {'error': f"not recognised/implemented: {requested_path}"})


def stats():
    """Reply with 200 and a description of this translation layer."""
    software = {
        "name": "invidious",
        "version": f"invidious TL, ythdd ({ythdd_globals.version})",
        "branch": "https://gitea.7o7.cx/sherl/ythdd",
        "tl_msg": "/api/invidious/api/v1/",
    }
    return send(200, {"version": "2.0", "software": software})


def videoIdSanityCheck(videoId: str):
    """Return a 400 response when `videoId` is not 11 characters long.

    Returns None when the length is fine. A rejected id also counts
    as a bad request.
    """
    id_length = len(videoId)
    if id_length == 11:
        return None
    incrementBadRequests()
    return send(400, f'error: bad request. wrong videoId: {videoId} is {id_length} characters long, but should be 11.')
    # elif...?
def auth(data): # can be either subscriptions, feed or playlists match data[1]: # NOT YET IMPLEMENTED # TODO: make it use the internal db case "subscriptions" | "feed" | "playlists": return send(200, []) case _: incrementBadRequests() return send(404, []) def streams(): return send(200, '') def epochToDate(epoch): return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(epoch)) def trending(): return send(200, [{}]) def popular(): return send(200, [{}]) def safeTraverse(obj: dict, path: list, default=None): result = obj try: for x in path: result = result[x] except KeyError: result = default finally: return result def genThumbs(videoId: str): result = [] thumbnails = [ #{'height': 720, 'width': 1280, 'quality': "maxres", 'url': "maxres"}, # for the time being omit the buggy maxres quality {'height': 720, 'width': 1280, 'quality': "maxresdefault", 'url': "maxresdefault"}, {'height': 480, 'width': 640, 'quality': "sddefault", 'url': "sddefault"}, {'height': 360, 'width': 480, 'quality': "high", 'url': "hqdefault"}, {'height': 180, 'width': 320, 'quality': "medium", 'url': "mqdefault"}, {'height': 90, 'width': 120, 'quality': "default", 'url': "default"}, {'height': 90, 'width': 120, 'quality': "start", 'url': "1"}, {'height': 90, 'width': 120, 'quality': "middle", 'url': "2"}, {'height': 90, 'width': 120, 'quality': "end", 'url': "3"}, ] for x in thumbnails: width = x['width'] height = x['height'] quality = x['quality'] url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/' + x['url'] + '.jpg' #url = '/vi/' + videoId + '/' + x['url'] + '.jpg' result.append({'quality': quality, 'url': url, 'width': width, 'height': height}) return result def rebuildFormats(data): result = [{} for x in data] formatStreams = [] for x in range(len(data)): try: result[x]['audioChannels'] = data[x]['audioChannels'] isVideo = 0 except: isVideo = 1 result[x]['init'] = str(data[x]['initRange']['start']) + "-" + str(data[x]['initRange']['end']) result[x]['index'] = 
str(data[x]['indexRange']['start']) + "-" + str(data[x]['indexRange']['end']) result[x]['bitrate'] = str(data[x]['averageBitrate']) result[x]['url'] = data[x]['url'] result[x]['itag'] = str(data[x]['itag']) result[x]['type'] = data[x]['mimeType'] result[x]['clen'] = data[x]['contentLength'] result[x]['lmt'] = data[x]['lastModified'] result[x]['projectionType'] = data[x]['projectionType'] try: result[x]['colorInfo'] = data[x]['colorInfo'] except: pass if isVideo: result[x]['fps'] = str(data[x]['fps']) else: result[x]['audioQuality'] = data[x]['audioQuality'] result[x]['audioSampleRate'] = data[x]['audioSampleRate'] if data[x]['itag'] in invidious_formats.FORMATS.keys(): result[x]['container'] = invidious_formats.FORMATS[data[x]['itag']]['ext'] try: result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['vcodec'] except: result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['acodec'] if isVideo: try: result[x]['resolution'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p" result[x]['qualityLabel'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p" + str(result[x]['fps']) * (data[x]['fps'] > 30) # NOT IMPLEMENTED, that's just a placeholder result[x]['size'] = str(invidious_formats.FORMATS[data[x]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[x]['itag']]['height']) except: pass #if data[x]['itag'] <= 80: # won't be triggered for iOS player as it has no progressive streams # formatStreams.append(result[x]) return result, formatStreams def videos(data): # an attempt on a faithful rewrite of # https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr response = {} #print(f"got data: {data}") #print("requesting idata from IOSextract") idata = ythdd_extractor.IOSextract(data[3]) wdata = ythdd_extractor.WEBextractSinglePage(data[3]) main_results = idata['stage3']['contents']['twoColumnWatchNextResults'] primary_results = safeTraverse(main_results, ['results', 'results', 'contents']) 
if primary_results: video_primary_renderer = safeTraverse(primary_results, [0, 'videoPrimaryInfoRenderer']) video_secondary_renderer = safeTraverse(primary_results, [1, 'videoSecondaryInfoRenderer']) else: print("error: primary_results not found in invidious TL videos()") video_details = safeTraverse(wdata, ['ec1', 'videoDetails']) microformat = safeTraverse(wdata, ['ec1', 'microformat', 'playerMicroformatRenderer'], default={}) video_id = safeTraverse(video_details, ['videoId'], default=f"[{data[3]}] (errors occurred, check logs)") title = safeTraverse(video_details, ['title'], default=video_id) views = int(safeTraverse(video_details, ['viewCount'], default=0)) length = int(safeTraverse(video_details, ['lengthSeconds'], default=1)) published = datetime.datetime.fromisoformat(safeTraverse(microformat, ['publishDate'], default="1970-01-02T00:00:00Z")).timestamp() # ISO format to Unix timestamp published_date = epochToDate(published) premiere_timestamp = safeTraverse(microformat, ['liveBroadcastDetails', 'startTimestamp'], default=0) # let's ignore the nitty gritty for the time being premiere_timestamp = premiere_timestamp if premiere_timestamp else safeTraverse(microformat, ['playabilityStatus', 'liveStreamability', 'liveStreamabilityRenderer', 'offlineSlate', 'liveStreamOfflineSlateRenderer', 'scheduledStartTime'], default=0) live_now = safeTraverse(microformat, ['liveBroadcastDetails', 'isLiveNow'], default=False) post_live_dvr = safeTraverse(video_details, ['isPostLiveDvr'], default=False) allowed_regions = safeTraverse(microformat, ['availableCountries'], default=[]) allow_ratings = safeTraverse(video_details, ['allowRatings'], default=True) family_friendly = safeTraverse(microformat, ['isFamilySafe'], default=True) is_listed = safeTraverse(video_details, ['isCrawlable'], default=True) is_upcoming = safeTraverse(video_details, ['isUpcoming'], default=False) keywords = safeTraverse(video_details, ['keywords'], default=[]) related_raw = safeTraverse(wdata, ['ec2', 
'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results'], default=[]) # can possibly change in the future related = [] for x in related_raw: y = safeTraverse(x, ['compactVideoRenderer']) if type(y) != dict: continue related_video = {} related_video['videoId'] = safeTraverse(y, ['videoId']) related_video['title'] = safeTraverse(y, ['title', 'simpleText']) related_video['videoThumbnails'] = genThumbs(related_video['videoId']) #safeTraverse(y, ['thumbnail', 'thumbnails']) related_video['author'] = safeTraverse(y, ['longBylineText', 'runs', 0, 'text']) related_video['authorId'] = safeTraverse(y, ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID") related_video['authorUrl'] = '/channel/' + related_video['authorId'] related_video['authorVerified'] = False if "ownerBadges" in y: related_video['authorVerified'] = True # hopefully this won't break things, as invidious API doesn't distinguish music and normal verified badges related_video['authorThumbnails'] = safeTraverse(y, ['channelThumbnail', 'thumbnails'], default=[]) for z in related_video['authorThumbnails']: z['url'] = ythdd_globals.translateLinks(z['url']) related_video['lengthSeconds'] = 0 time_lookup_list = [1, 60, 3_600, 86_400] time_list = safeTraverse(y, ['lengthText', 'simpleText'], default="0:0").split(":") for z in range(len(time_list)): related_video['lengthSeconds'] += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z]) related_views_text = safeTraverse(y, ['viewCountText', 'simpleText'], default="0").split(" ")[0] related_video['viewCountText'] = safeTraverse(y, ['shortViewCountText', 'simpleText'], default="0").split(" ")[0] related_views = 0 if related_views_text: related_views = int("".join([z for z in related_views_text if 48 <= ord(z) and ord(z) <= 57])) related_views_text = related_views_text.split(" ")[0] related_video['viewCount'] = related_views related.append(related_video) magnitude = {'K': 
1_000, 'M': 1_000_000, 'B': 1_000_000_000} toplevel_buttons = safeTraverse(video_primary_renderer, ['videoActions', 'menuRenderer', 'topLevelButtons'], default={}) # hacky solution likes_text = safeTraverse(toplevel_buttons, [0, 'segmentedLikeDislikeButtonViewModel', 'likeButtonViewModel', 'likeButtonViewModel', 'toggleButtonViewModel', 'toggleButtonViewModel', 'defaultButtonViewModel', 'buttonViewModel', 'title'], default="") # hacky solution likes = 0 if likes_text: likes = int("".join([x for x in likes_text if 48 <= ord(x) and ord(x) <= 57])) # ASCII for 0-9, no regex needed likes_text = likes_text.split(" ")[0] for x in magnitude.keys(): if x in likes_text: likes *= magnitude[x] description = safeTraverse(microformat, ['description', 'simpleText'], default="\n(ythdd: error ocurred, failed to retrieve description)") short_description = safeTraverse(wdata, ['ec1', 'videoDetails', 'shortDescription'], default="(ythdd: error occurred, failed to retrieve short description)") description_html = "
" + description + "
" # sorry, not happening right now, TODO: https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr#L329 metadata = safeTraverse(video_secondary_renderer, ['metadataRowContainer', 'metadataRowContainerRenderer', 'rows'], default={}) genre = safeTraverse(microformat, ['category']) # TODO: genre blah blah blah... author = safeTraverse(video_details, ['author'], default="Unknown Author") ucid = safeTraverse(video_details, ['channelId'], default="UNKNOWNCHANNELID") author_info = safeTraverse(video_secondary_renderer, ['owner', 'videoOwnerRenderer'], default={}) author_thumbnail = safeTraverse(author_info, ['thumbnail', 'thumbnails']) # lowest quality thumbnail subs_text = safeTraverse(author_info, ['subscriberCountText', 'simpleText'], default="0") subs = 0 if subs_text: subs = int("".join([x for x in subs_text if 48 <= ord(x) and ord(x) <= 57])) subs_text = subs_text.split(" ")[0] for x in magnitude.keys(): if x in subs_text: subs *= magnitude[x] for x in author_thumbnail: # rewrite to use views.py x['url'] = ythdd_globals.translateLinks(x['url']) # so far it seems to be impossible to tell if a channel is verified or not, # that is - without making another request author_verified = False hls_url = safeTraverse(idata, ['stage1', 'streamingData', 'hlsManifestUrl'], default="") adaptive_formats = safeTraverse(idata, ['stage1', 'streamingData', 'adaptiveFormats'], default=[]) adaptive_formats, format_streams = rebuildFormats(adaptive_formats) if live_now: video_type = "livestream" elif premiere_timestamp: video_type = "scheduled" published = premiere_timestamp if premiere_timestamp else int(time.time()) else: video_type = "video" premium = False if "YouTube Red" in keywords: premium = True # TODO: detect paywalled patron-only videos if not format_streams: format_streams = [] # providing format streams breaks Clipious client #format_streams.append(adaptive_formats[0]) #format_streams.append(adaptive_formats[1]) #''' response = { "type": video_type, 
"title": title, "videoId": video_id, "videoThumbnails": genThumbs(video_id), "storyboards": [], # not implemented "description": description, # due to change (include ythdd metadata) "descriptionHtml": description_html, # basically the same as normal description for the time being "published": published, "publishedText": published_date, "keywords": keywords, "viewCount": views, "viewCountText": str(views), # not implemented "likeCount": likes, "dislikeCount": 0, "paid": False, # not implemented "premium": premium, "isFamilyFriendly": family_friendly, "allowedRegions": allowed_regions, "genre": genre, "genreUrl": "/genreUrl/not/implemented/", # not implemented "author": author, "authorId": ucid, "authorUrl": "/channel/" + ucid, "authorVerified": author_verified, "authorThumbnails": author_thumbnail, "subCountText": subs_text, "lengthSeconds": length, "allowRatings": allow_ratings, "rating": 0, "isListed": is_listed, "liveNow": live_now, "isPostLiveDvr": post_live_dvr, "isUpcoming": is_upcoming, "dashUrl": "/dash/not/implemented/", # not implemented "premiereTimestamp": premiere_timestamp, "hlsUrl": hls_url, "adaptiveFormats": adaptive_formats, "formatStreams": format_streams, # very bare bones, empty actually xD "captions": [], # not implemented # "captions": [ # { # "label": String, # "language_code": String, # "url": String # } # ], # "musicTracks": [ # { # "song": String, # "artist": String, # "album": String, # "license": String # } # ], "recommendedVideos": related # "recommendedVideos": [ # { # "videoId": String, # "title": String, # "videoThumbnails": [ # { # "quality": String, # "url": String, # "width": Int32, # "height": Int32 # } # ], # "author": String, # "authorUrl": String, # "authorId": String?, # "authorVerified": Boolean, # "authorThumbnails": [ # { # "url": string, # "width": Int32, # "height": Int32 # } # ], # "lengthSeconds": Int32, # "viewCount": # "viewCountText": String # } # ] } #''' # for debugging: #return send(200, 
ythdd_extractor.WEBextractSinglePage(data[3])) #return send(200, ythdd_extractor.IOSextract(data[3])) #return send(200, {'idata': idata, 'wdata': wdata}) # if youtube returns not the videoId we aksed # then it means that the instance is ratelimited status_code = 200 if data[3] == response['videoId'] else 403 return send(status_code, response) def lookup(data): # possibly TODO: rewrite this mess if len(data) > 2: if (data[0], data[1]) == ("api", "v1"): match data[2]: case 'stats' | '': # /api/invidious/api/v1/stats and /api/invidious/api/v1/ return stats() case 'trending': return trending() case 'popular': return popular() case 'videos': return videos(data) case 'auth': return auth(data) case _: incrementBadRequests() return notImplemented(data) elif data[0] == 'ggpht': # for some reason the Materialous client # keeps making requests to these if data[1] == 'ggpht': return redirect('/' + "/".join(data[1:])) return redirect('/' + "/".join(data[0:])) else: incrementBadRequests() return notImplemented(data) elif len(data) == 2: if (data[0], data[1]) == ("api", "v1"): # /api/invidious/api/v1 return stats() elif data[0] == "streams": return streams() elif data[0] == 'ggpht': return redirect('/' + "/".join(data[0:])) else: incrementBadRequests() return notImplemented(data) elif len(data) == 1: return stats() # /api/invidious/something