For some reason, Yattee's mpv backend won't play videos that use an "unusual" aspect ratio (at least ones that are not 360p).
663 lines
29 KiB
Python
663 lines
29 KiB
Python
#!/usr/bin/python3
|
|
# ythdd Invidious Translation Layer
|
|
# -----
|
|
# Translates requests sent through Invidious API at /api/invidious/
|
|
# to use internal extractors.
|
|
from flask import Response, request, redirect
|
|
from markupsafe import escape
|
|
from time import strftime, gmtime, time
|
|
from ythdd_globals import safeTraverse
|
|
import json, datetime
|
|
import invidious_formats
|
|
import ythdd_globals
|
|
import ythdd_api_v1
|
|
import ythdd_extractor
|
|
|
|
# TODO:
|
|
# [✓] /api/v1/stats (stats())
|
|
# [✓] /streams/dQw4w9WgXcQ (does nothing)
|
|
# [✓] /vi/videoIdXXXX/maxresdefault.jpg
|
|
# [*] /api/v1/auth/subscriptions (stub? db?)
|
|
# [*] /api/v1/auth/feed?page=1 (stub? db?)
|
|
# [*] /api/v1/auth/playlists (stub? db?)
|
|
# [*] /api/v1/videos/videoIdXXXX
|
|
|
|
def incrementBadRequests():
    """Add one to the failed-request counter stored on ythdd_globals."""
    failed_so_far = ythdd_globals.apiFailedRequests
    ythdd_globals.apiFailedRequests = failed_so_far + 1
|
|
|
|
def greeting():
    """Return a (status code, text) pair carrying a short welcome message."""
    status = 200
    message = 'hello from Invidious TL!\nstats endpoint at /api/invidious/stats'
    return status, message
|
|
|
|
def send(status, response):
    """Serialize `response` as JSON and wrap it in a flask Response.

    Args:
        status: HTTP status code to set on the response.
        response: any object accepted by json.dumps().
    """
    payload = json.dumps(response)
    return Response(payload, status=status, mimetype='application/json')
|
|
|
|
def notImplemented(data):
    """Reply with HTTP 501, echoing the unrecognised path segments in `data`."""
    body = {'error': f"not recognised/implemented in Invidious TL: {'/'.join(data)}"}
    return send(501, body)
|
|
|
|
def stats():
    """Reply with HTTP 200 and a small description of this instance."""
    software = {
        "name": "invidious",
        "version": f"invidious TL, ythdd ({ythdd_globals.version})",
        "branch": "https://gitea.7o7.cx/sherl/ythdd",
        "tl_msg": "/api/invidious/api/v1/"
    }
    data_to_send = {"version": "2.0", "software": software}
    return send(200, data_to_send)
|
|
|
|
def videoIdSanityCheck(videoId: str):
    """Check that `videoId` is 11 characters long.

    Returns None when the length is fine; otherwise counts a bad request and
    returns a ready-to-send HTTP 400 response.
    """
    if len(videoId) == 11:
        # nothing wrong (further checks could be added here)
        return None

    incrementBadRequests()
    return send(400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.')
|
|
|
|
def auth(data):
|
|
# can be either subscriptions, feed or playlists
|
|
match data[1]:
|
|
# NOT YET IMPLEMENTED
|
|
# TODO: make it use the internal db
|
|
case "subscriptions" | "feed" | "playlists":
|
|
return send(200, [])
|
|
case _:
|
|
incrementBadRequests()
|
|
return send(404, [])
|
|
|
|
def streams():
    """Reply with HTTP 200 and an empty JSON string (the endpoint does nothing)."""
    empty_body = ''
    return send(200, empty_body)
|
|
|
|
def epochToDate(epoch):
    """Format a Unix timestamp as a UTC 'YYYY-MM-DDTHH:MM:SSZ' string."""
    utc_moment = gmtime(epoch)
    return strftime('%Y-%m-%dT%H:%M:%SZ', utc_moment)
|
|
|
|
def dateToEpoch(date: str):
    """Convert an ISO 8601 date string into a Unix timestamp (float).

    Args:
        date: ISO 8601 string, e.g. "2020-01-01T00:00:00+00:00" or
            "2020-01-01T00:00:00Z".

    Returns:
        Seconds since the epoch as returned by datetime.timestamp(). A string
        without UTC offset is interpreted in the local timezone (that is how
        timestamp() treats naive datetimes).

    Raises:
        ValueError: if `date` is not a valid ISO 8601 string.
    """
    # datetime.fromisoformat() accepts the "Z" (UTC) suffix only since
    # Python 3.11; spell it as an explicit offset so older versions work too.
    if date.endswith("Z"):
        date = date[:-1] + "+00:00"
    return datetime.datetime.fromisoformat(date).timestamp()
|
|
|
|
def doesContainNumber(string: str, numeric_system: int = 10) -> bool:
    """Tell whether `string` can be parsed by int() in the given base.

    Args:
        string: text to test.
        numeric_system: base handed to int(); defaults to 10.

    Returns:
        True if int(string, numeric_system) succeeds, False if it raises
        ValueError.
    """
    try:
        int(string, numeric_system)
    except ValueError:
        return False
    return True
|
|
|
|
def trending():
    """Reply with HTTP 200 and a list holding one empty object (placeholder)."""
    placeholder = [{}]
    return send(200, placeholder)
|
|
|
|
def popular():
    """Reply with HTTP 200 and a list holding one empty object (placeholder)."""
    placeholder = [{}]
    return send(200, placeholder)
|
|
|
|
def getError(idata: dict):
    """Build a one-line, human-readable error message from extractor output.

    Args:
        idata: extractor result; the playability status is looked up under
            ['stage1', 'playabilityStatus'].

    Returns:
        A string "(<status>) <reason> - <subreason runs...>". The first part
        falls back to "Generic error" when status/reason cannot be read; the
        " - " separator is appended even when there are no subreason runs.
    """
    unknown_error = {"status": "Unknown error", "reason": "This is a generic ythdd error."}
    error = ""

    try:
        playabilityStatus = safeTraverse(idata, ['stage1', 'playabilityStatus'], default=unknown_error)
        print(playabilityStatus)
        error += f"({playabilityStatus['status']}) {playabilityStatus['reason']}"
    except Exception:
        # best effort: a malformed status must not hide the fact that an error occurred
        error += "Generic error"

    try:
        errorScreen = safeTraverse(idata, ['stage1', 'playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'subreason', 'runs'], default=[])
        error += " - "
        for x in errorScreen:
            error += f"{x['text']} "
    except Exception:
        # best effort: keep whatever part of the subreason was collected so far
        pass

    return error
|
|
|
|
def genThumbs(videoId: str):
    """List the thumbnail variants of a video as Invidious-style dicts.

    Every entry has the keys 'quality', 'url', 'width' and 'height'; the URLs
    point at <public_facing_url>vi/<videoId>/<name>.jpg.
    """
    # (height, width, quality label, file name without extension)
    variants = (
        # (720, 1280, "maxres", "maxres"), # for the time being omit the buggy maxres quality
        (720, 1280, "maxresdefault", "maxresdefault"),
        (480, 640, "sddefault", "sddefault"),
        (360, 480, "high", "hqdefault"),
        (180, 320, "medium", "mqdefault"),
        (90, 120, "default", "default"),
        (90, 120, "start", "1"),
        (90, 120, "middle", "2"),
        (90, 120, "end", "3"),
    )

    prefix = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/'

    return [
        {'quality': quality, 'url': prefix + name + '.jpg', 'width': width, 'height': height}
        for height, width, quality, name in variants
    ]
|
|
|
|
def rebuildFormats(data):
    """Rebuild Invidious-compatible format lists from raw adaptive-format entries.

    Args:
        data: list of dicts (presumably innertube `adaptiveFormats` - verify
            against the caller), read for keys such as 'itag', 'mimeType',
            'averageBitrate', 'url', 'initRange' and 'indexRange'.

    Returns:
        A `(result, formatStreams)` tuple. `result` holds one dict per input
        entry (entries without 'initRange' are left almost empty).
        `formatStreams` holds a single dict describing the video stream with
        the highest average bitrate, or is empty when `data` is empty.

    Raises:
        KeyError: if a processed entry lacks one of the mandatory keys, or the
            best video itag is unknown to invidious_formats.FORMATS.
    """
    result = [{} for _ in data]
    formatStreams = []

    if not data:
        # nothing to pick a "best" stream from
        return result, formatStreams

    formats = invidious_formats.FORMATS
    best_bitrate_video = 0
    best_bitrate_audio = -1

    for x, entry in enumerate(data):
        row = result[x]

        # only audio entries carry 'audioChannels'
        try:
            row['audioChannels'] = entry['audioChannels']
            isVideo = 0
        except KeyError:
            isVideo = 1

        if "initRange" not in entry:  # for livestreams?
            continue

        row['init'] = str(entry['initRange']['start']) + "-" + str(entry['initRange']['end'])
        row['index'] = str(entry['indexRange']['start']) + "-" + str(entry['indexRange']['end'])
        row['bitrate'] = str(entry['averageBitrate'])
        row['url'] = entry['url']
        row['itag'] = str(entry['itag'])
        row['type'] = entry['mimeType']
        row['clen'] = entry['contentLength']
        row['lmt'] = entry['lastModified']
        row['projectionType'] = entry['projectionType']
        try:
            row['colorInfo'] = entry['colorInfo']
        except KeyError:
            pass

        isAudio = 1 if "audio" == entry['mimeType'][:5] else 0

        if isVideo:
            row['fps'] = str(entry['fps'])
        else:
            row['audioQuality'] = entry['audioQuality']
            row['audioSampleRate'] = entry['audioSampleRate']

        if entry['itag'] in formats:
            known = formats[entry['itag']]
            row['container'] = known['ext']
            try:
                row['encoding'] = known['vcodec']
            except KeyError:
                row['encoding'] = known['acodec']

            if isVideo:
                # not all itags have width and/or height
                try:
                    row['resolution'] = str(known['height']) + "p"
                    row['qualityLabel'] = str(known['height']) + "p" + str(row['fps']) * (entry['fps'] > 30)  # NOT IMPLEMENTED, that's just a placeholder
                    row['size'] = str(known['width']) + "x" + str(known['height'])
                except (KeyError, TypeError):
                    pass

        # we assume here that a stream with the highest bitrate must be a video stream- that may not be the case
        if entry['averageBitrate'] > data[best_bitrate_video]['averageBitrate'] and isVideo:
            best_bitrate_video = x
        if entry['averageBitrate'] > data[best_bitrate_audio]['averageBitrate'] and isAudio:
            best_bitrate_audio = x

    best_video = data[best_bitrate_video]
    best_video_format = formats[best_video['itag']]

    # makes FreeTube work, unfortunately it's a video-only stream
    formatStreams = [
        {
            "url": best_video['url'],
            "itag": str(best_video['itag']),
            "type": best_video['mimeType'],
            "quality": best_video['quality'],
            "bitrate": str(best_video['averageBitrate']),
            "fps": best_video['fps'],
            "size": "",  # workaround for clipious, which requires ANYTHING to be passed, or else it will throw an error and won't load the video
            "resolution": str(best_video_format['height']) + "p",
            "qualityLabel": str(best_video_format['height']) + "p",
            "container": best_video_format['ext'],
            "encoding": best_video_format['vcodec']
        },
        # {
        #     "audioChannels": data[best_bitrate_audio]['audioChannels'],
        #     "init": result[best_bitrate_audio]['init'],
        #     "index": result[best_bitrate_audio]['index'],
        #     "bitrate": str(data[best_bitrate_audio]['averageBitrate']),
        #     "url": data[best_bitrate_audio]['url'],
        #     "itag": str(data[best_bitrate_audio]['itag']),
        #     "type": data[best_bitrate_audio]['mimeType'],
        #     "clen": result[best_bitrate_audio]['clen'],
        #     "lmt": result[best_bitrate_audio]['lmt'],
        #     "projectionType": result[best_bitrate_audio]['projectionType'],
        #     "audioQuality": result[best_bitrate_audio]['audioQuality'],
        #     "audioSampleRate": result[best_bitrate_audio]['audioSampleRate'],
        #     "qualityLabel": "audio"
        # }
    ]

    # not all itags have width and/or height, in which case "size" stays ""
    try:
        formatStreams[0]["size"] = str(best_video_format['width']) + "x" + str(best_video_format['height'])
    except KeyError:
        pass

    return result, formatStreams
|
|
|
|
def rebuildFormatsFromYtdlpApi(ydata: dict):
    """Rebuild invidious-compatible formats from yt-dlp's output (ydata).

    Args:
        ydata: yt-dlp info dict; only its "formats" list is read.

    Returns:
        A `(adaptive_formats, format_streams)` tuple of lists of dicts.
        Every kept stream lands in `adaptive_formats`; streams that carry both
        video and audio are additionally copied into `format_streams`.
        Streams whose protocol is neither "https" nor "http_dash_segments",
        or that have neither a video nor an audio codec, are skipped.
    """
    adaptive_formats = []
    format_streams = []

    for stream in safeTraverse(ydata, ["formats"], default=[]):

        if safeTraverse(stream, ["protocol"], default="storyboard") not in ("http_dash_segments", "https"):
            continue

        newRow = {}
        # Add from ...'s ... to ... as ...
        # NOTE(review): values may be present-but-null in yt-dlp output (the
        # clen handling below already expects that), hence the `or` fallbacks.
        newRow["bitrate"] = str(int((safeTraverse(stream, ["tbr"], default=0) or 0) * 1000))
        newRow["url"] = safeTraverse(stream, ["url"])
        newRow["itag"] = safeTraverse(stream, ["format_id"])
        params = ythdd_extractor.paramsFromUrl(newRow["url"])

        vcodec = safeTraverse(stream, ["vcodec"], default="none")
        acodec = safeTraverse(stream, ["acodec"], default="none")
        if vcodec == "none" and acodec == "none":
            continue

        if acodec != "none":
            # audio-only track (or the audio part of a combined stream)
            audio_ext = safeTraverse(stream, ["audio_ext"], default=None)
            fnote = safeTraverse(stream, ["format_note"], default="low")
            if audio_ext is None:
                audio_ext = "mp4"
            newRow["type"] = "audio/" + audio_ext
            newRow["audioQuality"] = fnote
            newRow["audioSampleRate"] = int(safeTraverse(stream, ["asr"], default="44100") or 44100)
            # assume stereo when yt-dlp does not report the channel count
            newRow["audioChannels"] = int(safeTraverse(stream, ["audio_channels"], default=2) or 2)
            newRow["qualityLabel"] = str(safeTraverse(stream, ["abr"], default="?"))
            newRow["resolution"] = f"{fnote} quality"
            newRow["size"] = "0x0"

        if vcodec != "none":
            # either video-only or video+audio; overrides the audio values above
            video_ext = safeTraverse(stream, ["video_ext"], default=None)
            if video_ext is None:
                video_ext = "mp4"
            height = str(safeTraverse(stream, ["height"], default=0) or 0)
            width = str(safeTraverse(stream, ["width"], default=0) or 0)
            fps = safeTraverse(stream, ["fps"], default=30) or 30
            newRow["type"] = "video/" + video_ext
            # mpv won't play the video inside of Yattee if it's a non-standard resolution (bug?)
            newRow["resolution"] = (height if height in ("144", "240", "360", "480", "720", "1080") else "360") + "p"
            newRow["fps"] = fps
            newRow["qualityLabel"] = height + "p" + str(int(fps)) * (fps > 30)  # also a placeholder
            newRow["size"] = width + "x" + height

        clen = safeTraverse(params, ["clen"], default=safeTraverse(stream, ["filesize"], default="0"))
        # for clipious sake which expects a string
        newRow["clen"] = "0" if clen is None else str(clen)
        newRow["lmt"] = safeTraverse(params, ["lmt"], default="0")

        # format ids may look like "251-drc"; only the numeric part is an itag
        itag_number = str(newRow["itag"] or "").split("-")[0]
        itag_key = int(itag_number) if itag_number.isdecimal() else 0

        newRow["projectionType"] = "RECTANGULAR"  # clipious requires this to be mentioned explicitly and cannot be nulled
        newRow["container"] = safeTraverse(stream, ["ext"], default="unknown_container")
        newRow["encoding"] = safeTraverse(invidious_formats.FORMATS, [itag_key, "ext"], default="unknown_encoding")
        newRow["quality"] = newRow["qualityLabel"]
        newRow["init"] = "0-1"  # dummy values
        newRow["index"] = "2-3"  # dummy values

        if vcodec != "none" and acodec != "none":
            # combined video+audio stream (the 360p one)
            format_streams.append(newRow.copy())
        # at least one codec is present here (see the `continue` above)
        adaptive_formats.append(newRow.copy())

    return adaptive_formats, format_streams
|
|
|
|
def _durationToSeconds(text):
    """Convert a clock string such as "1:02:03" into seconds (0 if not numeric)."""
    parts = text.split(":")
    # works around "LIVE" for livestreams or "Upcoming" for scheduled videos
    if not all(doesContainNumber(part) for part in parts):
        return 0
    units = (1, 60, 3_600, 86_400)  # seconds, minutes, hours, days
    return sum(unit * int(part) for unit, part in zip(units, reversed(parts)))


def _viewCountToInt(text):
    """Convert a view count such as "1,234", "1.2M" or "No" into an int."""
    if not text or text.lower() == "no":
        return 0
    magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
    multiplier = magnitude.get(text[-1])
    if multiplier is None:
        # plain number, possibly with thousands separators: keep the digits only
        return int("0" + "".join(ch for ch in text if ch in "0123456789"))
    # abbreviated number: the decimal point matters ("1.2M" is 1 200 000)
    number = "".join(ch for ch in text[:-1] if ch in "0123456789.")
    try:
        return round(float(number) * multiplier)
    except ValueError:
        return 0


def _relatedFromCompactRenderer(y):
    """Build a recommended-video entry from a `compactVideoRenderer` dict."""
    related_video = {}
    related_video['videoId'] = safeTraverse(y, ['videoId'])
    related_video['title'] = safeTraverse(y, ['title', 'simpleText'])
    related_video['videoThumbnails'] = genThumbs(related_video['videoId'])
    related_video['author'] = safeTraverse(y, ['longBylineText', 'runs', 0, 'text'])
    related_video['authorId'] = safeTraverse(y, ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID")
    related_video['authorUrl'] = '/channel/' + related_video['authorId']
    # hopefully this won't break things, as invidious API doesn't distinguish music and normal verified badges
    related_video['authorVerified'] = "ownerBadges" in y
    related_video['authorThumbnails'] = safeTraverse(y, ['channelThumbnail', 'thumbnails'], default=[])
    for thumbnail in related_video['authorThumbnails']:
        thumbnail['url'] = ythdd_globals.translateLinks(thumbnail['url'])
    related_video['lengthSeconds'] = _durationToSeconds(safeTraverse(y, ['lengthText', 'simpleText'], default="0:0"))
    views_text = safeTraverse(y, ['viewCountText', 'simpleText'], default="0").split(" ")[0]
    related_video['viewCountText'] = safeTraverse(y, ['shortViewCountText', 'simpleText'], default="0").split(" ")[0]
    related_video['viewCount'] = _viewCountToInt(views_text)
    return related_video


def _relatedFromLockup(y):
    """Build a recommended-video entry from a `lockupViewModel` dict."""
    lmvm = safeTraverse(y, ['metadata', 'lockupMetadataViewModel'], default=[])
    author_text_path = ['metadata', 'contentMetadataViewModel', 'metadataRows', 0, 'metadataParts', 0, 'text']
    views_path = ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content']
    duration_path = ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text']

    related_video = {}
    related_video['videoId'] = safeTraverse(y, ['contentId'])
    related_video['title'] = safeTraverse(lmvm, ['title', 'content'])
    related_video['videoThumbnails'] = genThumbs(related_video['videoId'])
    related_video['author'] = safeTraverse(lmvm, author_text_path + ['content'])
    related_video['authorId'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'rendererContext', 'commandContext', 'onTap', 'innertubeCommand', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID")
    related_video['authorUrl'] = '/channel/' + related_video['authorId']
    # a badge attached to the author's name seems to do the job
    related_video['authorVerified'] = safeTraverse(lmvm, author_text_path + ['attachmentRuns']) is not None
    related_video['authorThumbnails'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'avatar', 'avatarViewModel', 'image', 'sources'], default=[])
    for thumbnail in related_video['authorThumbnails']:
        thumbnail['url'] = ythdd_globals.translateLinks(thumbnail['url'])
    related_video['lengthSeconds'] = _durationToSeconds(safeTraverse(y, duration_path, default="0:0"))
    views_text = safeTraverse(lmvm, views_path, default="0").split(" ")[0]
    related_video['viewCountText'] = views_text
    related_video['viewCount'] = _viewCountToInt(views_text)
    return related_video


def videos(data):
    """Serve /api/v1/videos/<videoId> in the shape of the Invidious API.

    An attempt on a faithful rewrite of
    https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr

    Args:
        data: the split request path as passed by lookup(); data[3] is the
            requested video id.

    Returns:
        A JSON response: 200 on success (possibly served from the in-memory
        cache, then marked with 'fromCache'), 403 when the id reported by the
        extractor differs from the requested one, 400 when the path carries
        no video id.
    """
    if len(data) < 4:
        incrementBadRequests()
        return send(400, {'error': 'bad request: missing video id'})

    requested_id = data[3]
    time_start = time()

    # cached responses are reused for up to three hours
    if ythdd_globals.config['general']['cache'] and requested_id in ythdd_globals.video_cache:
        if ythdd_globals.video_cache[requested_id]['cacheTime'] + 3 * 60 * 60 > time():
            response = ythdd_globals.video_cache[requested_id]
            response['fromCache'] = True
            return send(200, response)
        else:
            del ythdd_globals.video_cache[requested_id]

    ydata = ythdd_extractor.extract(requested_id, manifest_fix=True)
    wdata = ythdd_extractor.WEBextractSinglePage(requested_id)

    video_details = safeTraverse(wdata, ['ec1', 'videoDetails'])
    microformat = safeTraverse(wdata, ['ec1', 'microformat', 'playerMicroformatRenderer'], default={})

    video_id = safeTraverse(video_details, ['videoId'], default=f"[{data[3]}] (errors occurred, check logs)")
    title = safeTraverse(video_details, ['title'], default=video_id)
    views = int(safeTraverse(video_details, ['viewCount'], default=0))
    length = int(safeTraverse(video_details, ['lengthSeconds'], default=1))
    published = dateToEpoch(safeTraverse(microformat, ['publishDate'], default="1970-01-02T00:00:00Z"))  # ISO format to Unix timestamp
    published_date = epochToDate(published)
    premiere_timestamp = safeTraverse(microformat, ['liveBroadcastDetails', 'startTimestamp'], default=None)  # let's ignore the nitty gritty for the time being
    premiere_timestamp = premiere_timestamp if premiere_timestamp else safeTraverse(microformat, ['playabilityStatus', 'liveStreamability', 'liveStreamabilityRenderer', 'offlineSlate', 'liveStreamOfflineSlateRenderer', 'scheduledStartTime'], default=None)
    live_now = safeTraverse(microformat, ['liveBroadcastDetails', 'isLiveNow'], default=False)
    post_live_dvr = safeTraverse(video_details, ['isPostLiveDvr'], default=False)
    allowed_regions = safeTraverse(microformat, ['availableCountries'], default=[])
    allow_ratings = safeTraverse(video_details, ['allowRatings'], default=True)
    family_friendly = safeTraverse(microformat, ['isFamilySafe'], default=True)
    is_listed = safeTraverse(video_details, ['isCrawlable'], default=True)
    is_upcoming = safeTraverse(video_details, ['isUpcoming'], default=False)
    keywords = safeTraverse(video_details, ['keywords'], default=[])

    related_raw = safeTraverse(wdata, ['ec2', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results'], default=[])  # can possibly change in the future
    related = []
    # the last entry is skipped, as before
    for item in related_raw[:-1]:
        renderer = safeTraverse(item, ['compactVideoRenderer'], default=[])
        if renderer:
            if not isinstance(renderer, dict):
                continue
            related.append(_relatedFromCompactRenderer(renderer))
        else:
            lockup = safeTraverse(item, ['lockupViewModel'])
            if not isinstance(lockup, dict):
                continue
            related.append(_relatedFromLockup(lockup))

    likes = safeTraverse(ydata, ['like_count'], default=0)
    description = safeTraverse(microformat, ['description', 'simpleText'], default="\n(ythdd: failed to retrieve description, perhaps it's empty?)")
    description_html = "<p>" + description + "</p>"  # sorry, not happening right now, TODO: https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr#L329

    genre = safeTraverse(microformat, ['category'])
    # TODO: genre blah blah blah...
    author = safeTraverse(video_details, ['author'], default="Unknown Author")
    ucid = safeTraverse(video_details, ['channelId'], default="UNKNOWNCHANNELID")
    subs = safeTraverse(ydata, ['channel_follower_count'], default=0)
    channel_about_info = ythdd_extractor.browseAbout(ucid)
    author_thumbnail = [  # must be a list
        ythdd_extractor.getChannelAvatar(channel_about_info)
    ] * 3  # yes really

    # so far it seems to be impossible to tell if a channel is verified or not,
    # that is - without making another request
    author_verified = ythdd_extractor.isVerified(channel_about_info)

    adaptive_formats, format_streams = rebuildFormatsFromYtdlpApi(ydata)

    if live_now:
        video_type = "livestream"
    elif premiere_timestamp:
        video_type = "scheduled"
        published = dateToEpoch(premiere_timestamp)
    else:
        video_type = "video"

    premium = "YouTube Red" in keywords
    # TODO: detect paywalled patron-only videos

    time_end = time()

    response = {
        "type": video_type,
        "title": title,
        "videoId": video_id,
        "videoThumbnails": genThumbs(video_id),
        "storyboards": [],  # not implemented

        "description": description,  # due to change (include ythdd metadata)
        "descriptionHtml": description_html,  # basically the same as normal description for the time being
        "published": published,
        "publishedText": published_date,

        "keywords": keywords,
        "viewCount": views,
        "viewCountText": str(views),  # not implemented
        "likeCount": likes,
        "dislikeCount": 0,

        "paid": False,  # not implemented
        "premium": premium,
        "isFamilyFriendly": family_friendly,
        "allowedRegions": allowed_regions,
        "genre": genre,
        "genreUrl": "/genreUrl/not/implemented/",  # not implemented

        "author": author,
        "authorId": ucid,
        "authorUrl": "/channel/" + ucid,
        "authorVerified": author_verified,
        "authorThumbnails": author_thumbnail,

        "subCountText": str(subs),
        "lengthSeconds": length,
        "allowRatings": allow_ratings,
        "rating": 0,
        "isListed": is_listed,
        "liveNow": live_now,
        "isPostLiveDvr": post_live_dvr,
        "isUpcoming": is_upcoming,
        "dashUrl": ythdd_globals.config['general']['public_facing_url'] + "/dash/not/implemented/",  # not implemented
        "premiereTimestamp": premiere_timestamp,

        #"hlsUrl": hls_url, # broken after a change in iOS player
        "adaptiveFormats": adaptive_formats,
        "formatStreams": format_streams,
        "captions": [],  # not implemented; entries would be {"label", "language_code", "url"}
        # "musicTracks": [],  # not implemented; entries would be {"song", "artist", "album", "license"}
        "recommendedVideos": related,
        "took": time_end - time_start
    }

    if ythdd_globals.config['general']['debug']:
        response["ydata"] = ydata
        response["wdata"] = wdata

    if ythdd_globals.config['general']['cache']:
        ythdd_globals.video_cache[requested_id] = response
        ythdd_globals.video_cache[requested_id]['cacheTime'] = time()

    # if youtube returns not the videoId we asked for
    # then it means that the instance is ratelimited
    status_code = 200 if requested_id == response['videoId'] else 403

    return send(status_code, response)
|
|
|
|
def lookup(data, request):
    """Route a split request path to the matching Invidious TL handler.

    Args:
        data: list of path segments following /api/invidious/.
        request: the incoming request object (not used by the routing itself).

    Returns:
        The response produced by the selected handler; None when `data` is
        empty.
    """
    # possibly TODO: rewrite this mess
    depth = len(data)

    if depth == 1:
        return stats()  # /api/invidious/something

    if depth == 2:
        if (data[0], data[1]) == ("api", "v1"):  # /api/invidious/api/v1
            return stats()
        if data[0] == "streams":
            return streams()
        if data[0] in ('ggpht', 'vi'):
            return redirect('/' + "/".join(data[0:]))
        incrementBadRequests()
        return notImplemented(data)

    if depth > 2:
        if (data[0], data[1]) == ("api", "v1"):
            section = data[2]
            if section in ('stats', ''):  # /api/invidious/api/v1/stats and /api/invidious/api/v1/
                return stats()
            if section == 'trending':
                return trending()
            if section == 'popular':
                return popular()
            if section == 'videos':
                return videos(data)
            if section == 'auth':
                return auth(data)
            incrementBadRequests()
            return notImplemented(data)

        if data[0] in ('ggpht', 'vi'):
            # for some reason the Materialous client
            # and FreeTube keep making requests to these
            start = 1 if data[1] in ('ggpht', 'vi') else 0
            return redirect('/' + "/".join(data[start:]))

        incrementBadRequests()
        return notImplemented(data)

    return None