Files
ythdd/ythdd_inv_tl.py

493 lines
20 KiB
Python

#!/usr/bin/python3
# ythdd Invidious Translation Layer
# -----
# Translates requests sent through Invidious API at /api/invidious/
# to use internal extractors.
from flask import Response, request, redirect
from markupsafe import escape
from time import strftime, gmtime, time
from ythdd_globals import safeTraverse
import json, datetime
import invidious_formats
import ythdd_globals
import ythdd_api_v1
import ythdd_extractor
# TODO:
# [✓] /api/v1/stats (stats())
# [✓] /streams/dQw4w9WgXcQ (does nothing)
# [✓] /vi/videoIdXXXX/maxresdefault.jpg
# [*] /api/v1/auth/subscriptions (stub? db?)
# [*] /api/v1/auth/feed?page=1 (stub? db?)
# [*] /api/v1/auth/playlists (stub? db?)
# [*] /api/v1/videos/videoIdXXXX
def incrementBadRequests():
ythdd_globals.apiFailedRequests += 1
def greeting():
return 200, 'hello from Invidious TL!\nstats endpoint at /api/invidious/stats'
def send(status, response):
return Response(json.dumps(response), mimetype='application/json', status=status)
def notImplemented(data):
return send(501, {'error': f"not recognised/implemented: {'/'.join(data)}"})
def stats():
data_to_send = {
"version": "2.0",
"software":
{
"name": "invidious",
"version": f"invidious TL, ythdd ({ythdd_globals.version})",
"branch": "https://gitea.7o7.cx/sherl/ythdd",
"tl_msg": "/api/invidious/api/v1/"
}
}
return send(200, data_to_send)
def videoIdSanityCheck(videoId: str):
if len(videoId) != 11:
incrementBadRequests()
return send(400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.')
# elif...?
def auth(data):
# can be either subscriptions, feed or playlists
match data[1]:
# NOT YET IMPLEMENTED
# TODO: make it use the internal db
case "subscriptions" | "feed" | "playlists":
return send(200, [])
case _:
incrementBadRequests()
return send(404, [])
def streams():
return send(200, '')
def epochToDate(epoch):
return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(epoch))
def dateToEpoch(date: str):
return datetime.datetime.fromisoformat(date).timestamp()
def trending():
return send(200, [{}])
def popular():
return send(200, [{}])
def getError(idata: dict):
unknown_error = {"status": "Unknown error", "reason": "This is a generic ythdd error."}
error = ""
try:
playabilityStatus = safeTraverse(idata, ['stage1', 'playabilityStatus'], default=unknown_error)
print(playabilityStatus)
error += f"({playabilityStatus['status']}) {playabilityStatus['reason']}"
except:
error += f"Generic error"
try:
errorScreen = safeTraverse(idata, ['stage1', 'playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'subreason', 'runs'], default=[])
error += " - "
for x in errorScreen:
error += f"{x['text']} "
except:
pass
return error
def genThumbs(videoId: str):
result = []
thumbnails = [
#{'height': 720, 'width': 1280, 'quality': "maxres", 'url': "maxres"}, # for the time being omit the buggy maxres quality
{'height': 720, 'width': 1280, 'quality': "maxresdefault", 'url': "maxresdefault"},
{'height': 480, 'width': 640, 'quality': "sddefault", 'url': "sddefault"},
{'height': 360, 'width': 480, 'quality': "high", 'url': "hqdefault"},
{'height': 180, 'width': 320, 'quality': "medium", 'url': "mqdefault"},
{'height': 90, 'width': 120, 'quality': "default", 'url': "default"},
{'height': 90, 'width': 120, 'quality': "start", 'url': "1"},
{'height': 90, 'width': 120, 'quality': "middle", 'url': "2"},
{'height': 90, 'width': 120, 'quality': "end", 'url': "3"},
]
for x in thumbnails:
width = x['width']
height = x['height']
quality = x['quality']
url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/' + x['url'] + '.jpg'
result.append({'quality': quality, 'url': url, 'width': width, 'height': height})
return result
def rebuildFormats(data):
result = [{} for x in data]
formatStreams = []
best_bitrate_video = 0
best_bitrate_audio = -1
for x in range(len(data)):
try:
result[x]['audioChannels'] = data[x]['audioChannels']
isVideo = 0
except:
isVideo = 1
if not "initRange" in data[x]: # for livestreams?
continue
result[x]['init'] = str(data[x]['initRange']['start']) + "-" + str(data[x]['initRange']['end'])
result[x]['index'] = str(data[x]['indexRange']['start']) + "-" + str(data[x]['indexRange']['end'])
result[x]['bitrate'] = str(data[x]['averageBitrate'])
result[x]['url'] = data[x]['url']
result[x]['itag'] = str(data[x]['itag'])
result[x]['type'] = data[x]['mimeType']
result[x]['clen'] = data[x]['contentLength']
result[x]['lmt'] = data[x]['lastModified']
result[x]['projectionType'] = data[x]['projectionType']
try:
result[x]['colorInfo'] = data[x]['colorInfo']
except:
pass
if "audio" == data[x]['mimeType'][:5]:
isAudio = 1
else:
isAudio = 0
if isVideo:
result[x]['fps'] = str(data[x]['fps'])
else:
result[x]['audioQuality'] = data[x]['audioQuality']
result[x]['audioSampleRate'] = data[x]['audioSampleRate']
if data[x]['itag'] in invidious_formats.FORMATS.keys():
result[x]['container'] = invidious_formats.FORMATS[data[x]['itag']]['ext']
try:
result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['vcodec']
except:
result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['acodec']
if isVideo:
try:
result[x]['resolution'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p"
result[x]['qualityLabel'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p" + str(result[x]['fps']) * (data[x]['fps'] > 30) # NOT IMPLEMENTED, that's just a placeholder
result[x]['size'] = str(invidious_formats.FORMATS[data[x]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[x]['itag']]['height'])
except:
pass
# we assume here that a stream with the highest bitrate must be a video stream- that may not be the case
if data[x]['averageBitrate'] > data[best_bitrate_video]['averageBitrate'] and isVideo:
best_bitrate_video = x
if data[x]['averageBitrate'] > data[best_bitrate_audio]['averageBitrate'] and isAudio:
best_bitrate_audio = x
# makes FreeTube work, unfortunately it's a video-only stream
formatStreams = [
{
"url": data[best_bitrate_video]['url'],
"itag": str(data[best_bitrate_video]['itag']),
"type": data[best_bitrate_video]['mimeType'],
"quality": data[best_bitrate_video]['quality'],
"bitrate": str(data[best_bitrate_video]['averageBitrate']),
"fps": data[best_bitrate_video]['fps'],
"size": "", # workaround for clipious, which requires ANYTHING to be passed, or else it will throw and error and won't load the video
"resolution": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
"qualityLabel": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
"container": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['ext'],
"encoding": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['vcodec']
},
# {
# "audioChannels": data[best_bitrate_audio]['audioChannels'],
# "init": result[best_bitrate_audio]['init'],
# "index": result[best_bitrate_audio]['index'],
# "bitrate": str(data[best_bitrate_audio]['averageBitrate']),
# "url": data[best_bitrate_audio]['url'],
# "itag": str(data[best_bitrate_audio]['itag']),
# "type": data[best_bitrate_audio]['mimeType'],
# "clen": result[best_bitrate_audio]['clen'],
# "lmt": result[best_bitrate_audio]['lmt'],
# "projectionType": result[best_bitrate_audio]['projectionType'],
# "audioQuality": result[best_bitrate_audio]['audioQuality'],
# "audioSampleRate": result[best_bitrate_audio]['audioSampleRate'],
# "qualityLabel": "audio"
# }
]
# not all itags have width and/or height
try:
formatStreams[0]["size"] = str(invidious_formats.FORMATS[data[best_bitrate]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[best_bitrate]['itag']]['height'])
except:
pass
return result, formatStreams
def videos(data):
# an attempt on a faithful rewrite of
# https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr
response = {}
#print(f"got data: {data}")
#print("requesting idata from IOSextract")
# idata = ythdd_extractor.IOSextract(data[3])
# hls_url = safeTraverse(idata, ['stage1', 'streamingData', 'hlsManifestUrl'], default="")
# adaptive_formats = safeTraverse(idata, ['stage1', 'streamingData', 'adaptiveFormats'], default=[])
# if not hls_url or not adaptive_formats:
# print(f"serious error: couldn't get hls_url or adaptive_formats!\n"
# f"dumping idata:\n"
# f"{idata}")
# return send(500, {'error': getError(idata)})
time_start = time()
ydata = ythdd_extractor.extract(data[3])
wdata = ythdd_extractor.WEBextractSinglePage(data[3])
#return send(200, {'ydata': ydata, 'wdata': wdata})
#return send(200, {'idata': idata, 'wdata': wdata})
# main_results = idata['stage3']['contents']['twoColumnWatchNextResults']
# primary_results = safeTraverse(main_results, ['results', 'results', 'contents'])
# if primary_results:
# video_primary_renderer = safeTraverse(primary_results, [0, 'videoPrimaryInfoRenderer'])
# video_secondary_renderer = safeTraverse(primary_results, [1, 'videoSecondaryInfoRenderer'])
# else:
# print("error: primary_results not found in invidious TL videos()")
video_details = safeTraverse(wdata, ['ec1', 'videoDetails'])
microformat = safeTraverse(wdata, ['ec1', 'microformat', 'playerMicroformatRenderer'], default={})
video_id = safeTraverse(video_details, ['videoId'], default=f"[{data[3]}] (errors occurred, check logs)")
title = safeTraverse(video_details, ['title'], default=video_id)
views = int(safeTraverse(video_details, ['viewCount'], default=0))
length = int(safeTraverse(video_details, ['lengthSeconds'], default=1))
published = dateToEpoch(safeTraverse(microformat, ['publishDate'], default="1970-01-02T00:00:00Z")) # ISO format to Unix timestamp
published_date = epochToDate(published)
premiere_timestamp = safeTraverse(microformat, ['liveBroadcastDetails', 'startTimestamp'], default=0) # let's ignore the nitty gritty for the time being
premiere_timestamp = premiere_timestamp if premiere_timestamp else safeTraverse(microformat, ['playabilityStatus', 'liveStreamability', 'liveStreamabilityRenderer', 'offlineSlate', 'liveStreamOfflineSlateRenderer', 'scheduledStartTime'], default=0)
live_now = safeTraverse(microformat, ['liveBroadcastDetails', 'isLiveNow'], default=False)
post_live_dvr = safeTraverse(video_details, ['isPostLiveDvr'], default=False)
allowed_regions = safeTraverse(microformat, ['availableCountries'], default=[])
allow_ratings = safeTraverse(video_details, ['allowRatings'], default=True)
family_friendly = safeTraverse(microformat, ['isFamilySafe'], default=True)
is_listed = safeTraverse(video_details, ['isCrawlable'], default=True)
is_upcoming = safeTraverse(video_details, ['isUpcoming'], default=False)
keywords = safeTraverse(video_details, ['keywords'], default=[])
related_raw = safeTraverse(wdata, ['ec2', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results'], default=[]) # can possibly change in the future
related = []
for x in related_raw:
y = safeTraverse(x, ['compactVideoRenderer'])
if type(y) != dict:
continue
related_video = {}
related_video['videoId'] = safeTraverse(y, ['videoId'])
related_video['title'] = safeTraverse(y, ['title', 'simpleText'])
related_video['videoThumbnails'] = genThumbs(related_video['videoId']) #safeTraverse(y, ['thumbnail', 'thumbnails'])
related_video['author'] = safeTraverse(y, ['longBylineText', 'runs', 0, 'text'])
related_video['authorId'] = safeTraverse(y, ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID")
related_video['authorUrl'] = '/channel/' + related_video['authorId']
related_video['authorVerified'] = False
if "ownerBadges" in y:
related_video['authorVerified'] = True # hopefully this won't break things, as invidious API doesn't distinguish music and normal verified badges
related_video['authorThumbnails'] = safeTraverse(y, ['channelThumbnail', 'thumbnails'], default=[])
for z in related_video['authorThumbnails']:
z['url'] = ythdd_globals.translateLinks(z['url'])
related_video['lengthSeconds'] = 0
time_lookup_list = [1, 60, 3_600, 86_400]
time_list = safeTraverse(y, ['lengthText', 'simpleText'], default="0:0").split(":")
for z in range(len(time_list)):
related_video['lengthSeconds'] += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
related_views_text = safeTraverse(y, ['viewCountText', 'simpleText'], default="0").split(" ")[0]
related_video['viewCountText'] = safeTraverse(y, ['shortViewCountText', 'simpleText'], default="0").split(" ")[0]
related_views = 0
if related_views_text:
related_views = int("".join([z for z in related_views_text if 48 <= ord(z) and ord(z) <= 57]))
related_views_text = related_views_text.split(" ")[0]
related_video['viewCount'] = related_views
related.append(related_video)
# magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
# toplevel_buttons = safeTraverse(video_primary_renderer, ['videoActions', 'menuRenderer', 'topLevelButtons'], default={}) # hacky solution
# likes_text = safeTraverse(toplevel_buttons, [0, 'segmentedLikeDislikeButtonViewModel', 'likeButtonViewModel', 'likeButtonViewModel', 'toggleButtonViewModel', 'toggleButtonViewModel', 'defaultButtonViewModel', 'buttonViewModel', 'title'], default="") # hacky solution
# likes = 0
# if likes_text:
# likes = int("".join([x for x in likes_text if 48 <= ord(x) and ord(x) <= 57])) # ASCII for 0-9, no regex needed
# likes_text = likes_text.split(" ")[0]
# for x in magnitude.keys():
# if x in likes_text:
# likes *= magnitude[x]
likes = safeTraverse(ydata, ['like_count'], default=0)
description = safeTraverse(microformat, ['description', 'simpleText'], default="\n(ythdd: failed to retrieve description, perhaps it's empty?)")
short_description = safeTraverse(wdata, ['ec1', 'videoDetails', 'shortDescription'], default="(ythdd: failed to retrieve short description, perhaps it's empty?)")
description_html = "<p>" + description + "</p>" # sorry, not happening right now, TODO: https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr#L329
# metadata = safeTraverse(video_secondary_renderer, ['metadataRowContainer', 'metadataRowContainerRenderer', 'rows'], default={})
genre = safeTraverse(microformat, ['category'])
# TODO: genre blah blah blah...
author = safeTraverse(video_details, ['author'], default="Unknown Author")
ucid = safeTraverse(video_details, ['channelId'], default="UNKNOWNCHANNELID")
# author_info = safeTraverse(video_secondary_renderer, ['owner', 'videoOwnerRenderer'], default={})
# author_thumbnail = safeTraverse(author_info, ['thumbnail', 'thumbnails']) # lowest quality thumbnail
# subs_text = safeTraverse(author_info, ['subscriberCountText', 'simpleText'], default="0")
# subs = 0
# if subs_text:
# subs = int("".join([x for x in subs_text if 48 <= ord(x) and ord(x) <= 57]))
# subs_text = subs_text.split(" ")[0]
# for x in magnitude.keys():
# if x in subs_text:
# subs *= magnitude[x]
subs = ydata['channel_follower_count']
channel_about_info = ythdd_extractor.browseAbout(ucid)
author_thumbnail = ythdd_extractor.getChannelAvatar(channel_about_info)
# for x in author_thumbnail:
# # rewrite to use views.py
# x['url'] = ythdd_globals.translateLinks(x['url'])
# so far it seems to be impossible to tell if a channel is verified or not,
# that is - without making another request
author_verified = ythdd_extractor.isVerified(channel_about_info)
format_streams = []
# adaptive_formats, format_streams = rebuildFormats(adaptive_formats)
if live_now:
video_type = "livestream"
elif premiere_timestamp:
video_type = "scheduled"
published = dateToEpoch(premiere_timestamp) if premiere_timestamp else int(time())
else:
video_type = "video"
premium = False
if "YouTube Red" in keywords:
premium = True
# TODO: detect paywalled patron-only videos
time_end = time()
#'''
response = {
"type": video_type,
"title": title,
"videoId": video_id,
"videoThumbnails": genThumbs(video_id),
"storyboards": [], # not implemented
"description": description, # due to change (include ythdd metadata)
"descriptionHtml": description_html, # basically the same as normal description for the time being
"published": published,
"publishedText": published_date,
"keywords": keywords,
"viewCount": views,
"viewCountText": str(views), # not implemented
"likeCount": likes,
"dislikeCount": 0,
"paid": False, # not implemented
"premium": premium,
"isFamilyFriendly": family_friendly,
"allowedRegions": allowed_regions,
"genre": genre,
"genreUrl": "/genreUrl/not/implemented/", # not implemented
"author": author,
"authorId": ucid,
"authorUrl": "/channel/" + ucid,
"authorVerified": author_verified,
"authorThumbnails": author_thumbnail,
"subCountText": str(subs),
"lengthSeconds": length,
"allowRatings": allow_ratings,
"rating": 0,
"isListed": is_listed,
"liveNow": live_now,
"isPostLiveDvr": post_live_dvr,
"isUpcoming": is_upcoming,
"dashUrl": ythdd_globals.config['general']['public_facing_url'] + "/dash/not/implemented/", # not implemented
"premiereTimestamp": premiere_timestamp,
#"hlsUrl": hls_url, # broken after a change in iOS player
#"adaptiveFormats": adaptive_formats, # same as hlsUrl
"formatStreams": format_streams, # very bare bones, empty actually xD
"captions": [], # not implemented
# "captions": [
# {
# "label": String,
# "language_code": String,
# "url": String
# }
# ],
# "musicTracks": [
# {
# "song": String,
# "artist": String,
# "album": String,
# "license": String
# }
# ],
"recommendedVideos": related,
"took": time_end - time_start
}
#'''
# for debugging:
#return send(200, ythdd_extractor.WEBextractSinglePage(data[3]))
#return send(200, ythdd_extractor.IOSextract(data[3]))
#return send(200, {'idata': idata, 'wdata': wdata})
# if youtube returns not the videoId we aksed
# then it means that the instance is ratelimited
status_code = 200 if data[3] == response['videoId'] else 403
return send(status_code, response)
def lookup(data, request):
# possibly TODO: rewrite this mess
if len(data) > 2:
if (data[0], data[1]) == ("api", "v1"):
match data[2]:
case 'stats' | '': # /api/invidious/api/v1/stats and /api/invidious/api/v1/
return stats()
case 'trending':
return trending()
case 'popular':
return popular()
case 'videos':
return videos(data)
case 'auth':
return auth(data)
case _:
incrementBadRequests()
return notImplemented(data)
elif data[0] in ('ggpht', 'vi'):
# for some reason the Materialous client
# and FreeTube keep making requests to these
if data[1] in ('ggpht', 'vi'):
return redirect('/' + "/".join(data[1:]))
return redirect('/' + "/".join(data[0:]))
else:
incrementBadRequests()
return notImplemented(data)
elif len(data) == 2:
if (data[0], data[1]) == ("api", "v1"): # /api/invidious/api/v1
return stats()
elif data[0] == "streams":
return streams()
elif data[0] in ('ggpht', 'vi'):
return redirect('/' + "/".join(data[0:]))
else:
incrementBadRequests()
return notImplemented(data)
elif len(data) == 1:
return stats() # /api/invidious/something