#!/usr/bin/python3
# ythdd Invidious Translation Layer
# -----
# Translates requests sent through the Invidious API at /api/invidious/
# to use the internal extractors.
from flask import Response, request, redirect
from markupsafe import escape
from time import strftime, gmtime, time
from ythdd_globals import safeTraverse
import json, datetime
import dateparser
import invidious_formats
import ythdd_globals
import ythdd_api_v1
import ythdd_extractor

# TODO:
# [✓] /api/v1/stats (stats())
# [✓] /streams/dQw4w9WgXcQ (does nothing)
# [✓] /vi/videoIdXXXX/maxresdefault.jpg (todo: add a fallback for 404s)
# [✓] /api/v1/search?q=... (videos and playlists)
# [X] /api/v1/playlists/:plid
# [*] /api/v1/auth/subscriptions (stub? db?)
# [*] /api/v1/auth/feed?page=1 (stub? db?)
# [*] /api/v1/auth/playlists (stub? db?)
# [*] /api/v1/videos/videoIdXXXX

def incrementBadRequests():
    ythdd_globals.apiFailedRequests += 1

def greeting():
    return 200, 'hello from Invidious TL!\nstats endpoint at /api/invidious/stats'

def send(status, response):
    return Response(json.dumps(response), mimetype='application/json', status=status)
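
# Illustrative example (not from the original source): send(200, {"ok": True})
# returns a flask Response with status 200, mimetype 'application/json' and the
# body '{"ok": true}'.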

def notImplemented(data):
    return send(501, {'error': f"not recognised/implemented in Invidious TL: {'/'.join(data)}"})

def stats():
    data_to_send = {
        "version": "2.0",
        "software":
        {
            "name": "invidious",
            "version": f"invidious TL, ythdd ({ythdd_globals.version})",
            "branch": "https://gitea.7o7.cx/sherl/ythdd",
            "tl_msg": "/api/invidious/api/v1/"
        }
    }
    return send(200, data_to_send)

def videoIdSanityCheck(videoId: str):
    if len(videoId) != 11:
        incrementBadRequests()
        return send(400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.')
    # elif...?
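
# Note (added for clarity): videoIdSanityCheck() returns a 400 Response when the id
# is not exactly 11 characters long and implicitly returns None otherwise, so callers
# are expected to check its return value before continuing.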

def auth(data):
    # can be either subscriptions, feed or playlists
    match data[1]:
        # NOT YET IMPLEMENTED
        # TODO: make it use the internal db
        case "subscriptions" | "feed" | "playlists":
            return send(200, [])
        case _:
            incrementBadRequests()
            return send(404, [])

def streams():
    return send(200, '')

def epochToDate(epoch):
    return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(epoch))

def dateToEpoch(date: str):
    return datetime.datetime.fromisoformat(date).timestamp()
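
# Illustrative examples (not from the original source):
#   epochToDate(0)                            -> '1970-01-01T00:00:00Z'
#   dateToEpoch('2024-01-01T00:00:00+00:00')  -> 1704067200.0
# epochToDate() always formats in UTC (gmtime), while dateToEpoch() uses the offset
# embedded in the ISO string when one is present; naive strings are interpreted in
# local time.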

def doesContainNumber(string: str, numeric_system: int = 10) -> bool:
    try:
        int(string, numeric_system)
        return True
    except ValueError:
        return False
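
# Despite its name, this checks whether the whole string parses as a number.
# Illustrative examples (not from the original source):
#   doesContainNumber("42")      -> True
#   doesContainNumber("LIVE")    -> False
#   doesContainNumber("ff", 16)  -> True   (hex digits parse under base 16)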

def trending():
    return send(200, [{}])

def popular():
    return send(200, [{}])

def getError(idata: dict):
    unknown_error = {"status": "Unknown error", "reason": "This is a generic ythdd error."}
    error = ""

    try:
        playabilityStatus = safeTraverse(idata, ['stage1', 'playabilityStatus'], default=unknown_error)
        print(playabilityStatus)
        error += f"({playabilityStatus['status']}) {playabilityStatus['reason']}"
    except:
        error += "Generic error"

    try:
        errorScreen = safeTraverse(idata, ['stage1', 'playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'subreason', 'runs'], default=[])
        error += " - "
        for x in errorScreen:
            error += f"{x['text']} "
    except:
        pass

    return error
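
# Shape of the returned string (illustrative only, the actual values depend on what
# YouTube sends back), e.g.:
#   "(ERROR) Video unavailable - This video is private "
# i.e. "(<status>) <reason>", optionally followed by " - " and the subreason runs.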

def genThumbs(videoId: str):

    result = []
    thumbnails = [
        #{'height': 720, 'width': 1280, 'quality': "maxres", 'url': "maxres"}, # for the time being omit the buggy maxres quality
        {'height': 720, 'width': 1280, 'quality': "maxresdefault", 'url': "maxresdefault"},
        {'height': 480, 'width': 640, 'quality': "sddefault", 'url': "sddefault"},
        {'height': 360, 'width': 480, 'quality': "high", 'url': "hqdefault"},
        {'height': 180, 'width': 320, 'quality': "medium", 'url': "mqdefault"},
        {'height': 90, 'width': 120, 'quality': "default", 'url': "default"},
        {'height': 90, 'width': 120, 'quality': "start", 'url': "1"},
        {'height': 90, 'width': 120, 'quality': "middle", 'url': "2"},
        {'height': 90, 'width': 120, 'quality': "end", 'url': "3"},
    ]

    for x in thumbnails:
        width = x['width']
        height = x['height']
        quality = x['quality']
        url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/' + x['url'] + '.jpg'
        result.append({'quality': quality, 'url': url, 'width': width, 'height': height})

    return result
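
# Illustrative example (assuming public_facing_url is "https://example.org/"):
#   genThumbs("dQw4w9WgXcQ")[0] ->
#     {'quality': 'maxresdefault',
#      'url': 'https://example.org/vi/dQw4w9WgXcQ/maxresdefault.jpg',
#      'width': 1280, 'height': 720}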

def rebuildFormats(data):
    result = [{} for x in data]
    formatStreams = []
    best_bitrate_video = 0
    best_bitrate_audio = -1

    for x in range(len(data)):

        try:
            result[x]['audioChannels'] = data[x]['audioChannels']
            isVideo = 0
        except:
            isVideo = 1

        if "initRange" not in data[x]: # for livestreams?
            continue

        result[x]['init'] = str(data[x]['initRange']['start']) + "-" + str(data[x]['initRange']['end'])
        result[x]['index'] = str(data[x]['indexRange']['start']) + "-" + str(data[x]['indexRange']['end'])
        result[x]['bitrate'] = str(data[x]['averageBitrate'])
        result[x]['url'] = data[x]['url']
        result[x]['itag'] = str(data[x]['itag'])
        result[x]['type'] = data[x]['mimeType']
        result[x]['clen'] = data[x]['contentLength']
        result[x]['lmt'] = data[x]['lastModified']
        result[x]['projectionType'] = data[x]['projectionType']
        try:
            result[x]['colorInfo'] = data[x]['colorInfo']
        except:
            pass

        if "audio" == data[x]['mimeType'][:5]:
            isAudio = 1
        else:
            isAudio = 0

        if isVideo:
            result[x]['fps'] = str(data[x]['fps'])
        else:
            result[x]['audioQuality'] = data[x]['audioQuality']
            result[x]['audioSampleRate'] = data[x]['audioSampleRate']

        if data[x]['itag'] in invidious_formats.FORMATS.keys():
            result[x]['container'] = invidious_formats.FORMATS[data[x]['itag']]['ext']
            try:
                result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['vcodec']
            except:
                result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['acodec']

            if isVideo:
                try:
                    result[x]['resolution'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p"
                    result[x]['qualityLabel'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p" + str(result[x]['fps']) * (data[x]['fps'] > 30) # NOT IMPLEMENTED, that's just a placeholder
                    result[x]['size'] = str(invidious_formats.FORMATS[data[x]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[x]['itag']]['height'])
                except:
                    pass

        # we assume here that the stream with the highest bitrate must be a video stream - that may not be the case
        if data[x]['averageBitrate'] > data[best_bitrate_video]['averageBitrate'] and isVideo:
            best_bitrate_video = x
        if data[x]['averageBitrate'] > data[best_bitrate_audio]['averageBitrate'] and isAudio:
            best_bitrate_audio = x

    # makes FreeTube work, unfortunately it's a video-only stream
    formatStreams = [
        {
            "url": data[best_bitrate_video]['url'],
            "itag": str(data[best_bitrate_video]['itag']),
            "type": data[best_bitrate_video]['mimeType'],
            "quality": data[best_bitrate_video]['quality'],
            "bitrate": str(data[best_bitrate_video]['averageBitrate']),
            "fps": data[best_bitrate_video]['fps'],
            "size": "", # workaround for clipious, which requires ANYTHING to be passed, or else it will throw an error and won't load the video
            "resolution": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
            "qualityLabel": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
            "container": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['ext'],
            "encoding": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['vcodec']
        },
        # {
        #     "audioChannels": data[best_bitrate_audio]['audioChannels'],
        #     "init": result[best_bitrate_audio]['init'],
        #     "index": result[best_bitrate_audio]['index'],
        #     "bitrate": str(data[best_bitrate_audio]['averageBitrate']),
        #     "url": data[best_bitrate_audio]['url'],
        #     "itag": str(data[best_bitrate_audio]['itag']),
        #     "type": data[best_bitrate_audio]['mimeType'],
        #     "clen": result[best_bitrate_audio]['clen'],
        #     "lmt": result[best_bitrate_audio]['lmt'],
        #     "projectionType": result[best_bitrate_audio]['projectionType'],
        #     "audioQuality": result[best_bitrate_audio]['audioQuality'],
        #     "audioSampleRate": result[best_bitrate_audio]['audioSampleRate'],
        #     "qualityLabel": "audio"
        # }
    ]

    # not all itags have width and/or height
    try:
        formatStreams[0]["size"] = str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height'])
    except:
        pass

    return result, formatStreams
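
# Rough shape of one rebuilt adaptiveFormats entry (illustrative only; the exact keys
# depend on whether the stream is audio or video and on what invidious_formats.FORMATS
# knows about the itag):
#   {'init': '0-219', 'index': '220-1500', 'bitrate': '1000000', 'url': '...',
#    'itag': '248', 'type': 'video/webm; codecs="vp9"', 'clen': '12345678',
#    'lmt': '1700000000000000', 'projectionType': 'RECTANGULAR', 'fps': '30',
#    'container': 'webm', 'encoding': 'vp9', 'resolution': '1080p', ...}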

def rebuildFormatsFromYtdlpApi(ydata: dict):
    # Rebuild invidious-compatible formats from yt-dlp's output (ydata)

    adaptive_formats = []
    format_streams = []

    for stream in safeTraverse(ydata, ["formats"], default=[]):

        if safeTraverse(stream, ["protocol"], default="storyboard") not in ("http_dash_segments", "https"):
            continue

        newRow = {}
        # Add from ...'s ... to ... as ...
        newRow["bitrate"] = str(int(safeTraverse(stream, ["tbr"], default=0) * 1000))
        newRow["url"] = safeTraverse(stream, ["url"])
        newRow["itag"] = safeTraverse(stream, ["format_id"])
        params = ythdd_extractor.paramsFromUrl(newRow["url"])

        vcodec = safeTraverse(stream, ["vcodec"], default="none")
        acodec = safeTraverse(stream, ["acodec"], default="none")
        if vcodec == "none" and acodec == "none":
            continue

        if safeTraverse(stream, ["acodec"]) != "none":
            # audio-only track
            type = safeTraverse(stream, ["audio_ext"], default=None)
            fnote = safeTraverse(stream, ["format_note"], default="low")
            if type is None:
                type = "mp4"
            newRow["type"] = "audio/" + type
            newRow["audioQuality"] = fnote
            newRow["audioSampleRate"] = int(safeTraverse(stream, ["asr"], default="44100"))
            newRow["audioChannels"] = int(safeTraverse(stream, ["audio_channels"]))
            newRow["qualityLabel"] = str(safeTraverse(stream, ["abr"], default="?"))
            newRow["resolution"] = f"{fnote} quality"
            newRow["size"] = "0x0"
        if safeTraverse(stream, ["vcodec"]) != "none":
            # either video-only or video+audio
            type = safeTraverse(stream, ["video_ext"], default=None)
            if type is None:
                type = "mp4"
            height = str(safeTraverse(stream, ["height"], default=0))
            width = str(safeTraverse(stream, ["width"], default=0))
            newRow["type"] = "video/" + type
            newRow["resolution"] = (height if height in ("144", "240", "360", "480", "720", "1080") else "360") + "p" # mpv won't play the video inside of Yattee if it's a non-standard resolution (bug?)
            newRow["fps"] = safeTraverse(stream, ["fps"], default=30)
            newRow["qualityLabel"] = height + "p" + str(int(newRow['fps'])) * (newRow["fps"] > 30) # also a placeholder
            newRow["size"] = width + "x" + height

        newRow["clen"] = safeTraverse(params, ["clen"], default=safeTraverse(stream, ["filesize"], default="0"))
        newRow["lmt"] = safeTraverse(params, ["lmt"], default="0")

        if newRow["clen"] is None:
            # for clipious' sake, which expects a string
            newRow["clen"] = "0"

        newRow["projectionType"] = "RECTANGULAR" # clipious requires this to be mentioned explicitly and cannot be nulled
        newRow["container"] = safeTraverse(stream, ["ext"], default="unknown_container")
        newRow["encoding"] = safeTraverse(invidious_formats.FORMATS, [int("0" + newRow["itag"].split("-")[0]), "ext"], default="unknown_encoding") # not sure this split is necessary
        newRow["quality"] = newRow["qualityLabel"]
        newRow["init"] = "0-1" # dummy values
        newRow["index"] = "2-3" # dummy values

        if vcodec != "none" and acodec != "none":
            # 360p stream
            format_streams.append(newRow.copy())
        if vcodec != "none" or acodec != "none":
            adaptive_formats.append(newRow.copy())


    # {
    #     "url": data[best_bitrate_video]['url'],
    #     "itag": str(data[best_bitrate_video]['itag']),
    #     "type": data[best_bitrate_video]['mimeType'],
    #     "quality": data[best_bitrate_video]['quality'],
    #     "bitrate": str(data[best_bitrate_video]['averageBitrate']),
    #     "fps": data[best_bitrate_video]['fps'],
    #     "size": "", # workaround for clipious, which requires ANYTHING to be passed, or else it will throw an error and won't load the video
    #     "resolution": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
    #     "qualityLabel": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
    #     "container": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['ext'],
    #     "encoding": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['vcodec']
    # }

    # {
    #     "audioChannels": data[best_bitrate_audio]['audioChannels'],
    #     "init": result[best_bitrate_audio]['init'],
    #     "index": result[best_bitrate_audio]['index'],
    #     "bitrate": str(data[best_bitrate_audio]['averageBitrate']),
    #     "url": data[best_bitrate_audio]['url'],
    #     "itag": str(data[best_bitrate_audio]['itag']),
    #     "type": data[best_bitrate_audio]['mimeType'],
    #     "clen": result[best_bitrate_audio]['clen'],
    #     "lmt": result[best_bitrate_audio]['lmt'],
    #     "projectionType": result[best_bitrate_audio]['projectionType'],
    #     "audioQuality": result[best_bitrate_audio]['audioQuality'],
    #     "audioSampleRate": result[best_bitrate_audio]['audioSampleRate'],
    #     "qualityLabel": "audio"
    # }

    return adaptive_formats, format_streams
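
# Note (added for clarity): format_streams only receives progressive streams that carry
# both audio and video, which for YouTube is typically the single 360p entry; anything
# with at least one of the two codecs ends up in adaptive_formats.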

def videos(data):
    # an attempt at a faithful rewrite of
    # https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr
    response = {}

    #print(f"got data: {data}")

    #print("requesting idata from IOSextract")
    # idata = ythdd_extractor.IOSextract(data[3])

    # hls_url = safeTraverse(idata, ['stage1', 'streamingData', 'hlsManifestUrl'], default="")
    # adaptive_formats = safeTraverse(idata, ['stage1', 'streamingData', 'adaptiveFormats'], default=[])
    # if not hls_url or not adaptive_formats:
    #     print(f"serious error: couldn't get hls_url or adaptive_formats!\n"
    #           f"dumping idata:\n"
    #           f"{idata}")
    #     return send(500, {'error': getError(idata)})

    time_start = time()

    if ythdd_globals.config['general']['cache'] and data[3] in ythdd_globals.video_cache:
        if ythdd_globals.video_cache[data[3]]['cacheTime'] + 3 * 60 * 60 > time():
            response = ythdd_globals.video_cache[data[3]]
            response['fromCache'] = True
            return send(200, response)
        else:
            del ythdd_globals.video_cache[data[3]]

    ydata = ythdd_extractor.extract(data[3], manifest_fix=True)
    wdata = ythdd_extractor.WEBextractSinglePage(data[3])

    #return send(200, {'ydata': ydata, 'wdata': wdata})
    #return send(200, {'idata': idata, 'wdata': wdata})

    # main_results = idata['stage3']['contents']['twoColumnWatchNextResults']
    # primary_results = safeTraverse(main_results, ['results', 'results', 'contents'])
    # if primary_results:
    #     video_primary_renderer = safeTraverse(primary_results, [0, 'videoPrimaryInfoRenderer'])
    #     video_secondary_renderer = safeTraverse(primary_results, [1, 'videoSecondaryInfoRenderer'])
    # else:
    #     print("error: primary_results not found in invidious TL videos()")

    video_details = safeTraverse(wdata, ['ec1', 'videoDetails'])
    microformat = safeTraverse(wdata, ['ec1', 'microformat', 'playerMicroformatRenderer'], default={})

    video_id = safeTraverse(video_details, ['videoId'], default=f"[{data[3]}] (errors occurred, check logs)")
    title = safeTraverse(video_details, ['title'], default=video_id)
    views = int(safeTraverse(video_details, ['viewCount'], default=0))
    length = int(safeTraverse(video_details, ['lengthSeconds'], default=1))
    published = dateToEpoch(safeTraverse(microformat, ['publishDate'], default="1970-01-02T00:00:00Z")) # ISO format to Unix timestamp
    published_date = epochToDate(published)
    premiere_timestamp = safeTraverse(microformat, ['liveBroadcastDetails', 'startTimestamp'], default=None) # let's ignore the nitty gritty for the time being
    premiere_timestamp = premiere_timestamp if premiere_timestamp else safeTraverse(microformat, ['playabilityStatus', 'liveStreamability', 'liveStreamabilityRenderer', 'offlineSlate', 'liveStreamOfflineSlateRenderer', 'scheduledStartTime'], default=None)
    live_now = safeTraverse(microformat, ['liveBroadcastDetails', 'isLiveNow'], default=False)
    post_live_dvr = safeTraverse(video_details, ['isPostLiveDvr'], default=False)
    allowed_regions = safeTraverse(microformat, ['availableCountries'], default=[])
    allow_ratings = safeTraverse(video_details, ['allowRatings'], default=True)
    family_friendly = safeTraverse(microformat, ['isFamilySafe'], default=True)
    is_listed = safeTraverse(video_details, ['isCrawlable'], default=True)
    is_upcoming = safeTraverse(video_details, ['isUpcoming'], default=False)
    keywords = safeTraverse(video_details, ['keywords'], default=[])

    related_raw = safeTraverse(wdata, ['ec2', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results'], default=[]) # can possibly change in the future
    related = []
    for x in related_raw[:-1]:
        if safeTraverse(x, ['compactVideoRenderer'], default=[]):
            y = safeTraverse(x, ['compactVideoRenderer'])
            if type(y) != dict:
                continue
            related_video = {}
            related_video['videoId'] = safeTraverse(y, ['videoId'])
            related_video['title'] = safeTraverse(y, ['title', 'simpleText'])
            related_video['videoThumbnails'] = genThumbs(related_video['videoId']) #safeTraverse(y, ['thumbnail', 'thumbnails'])
            related_video['author'] = safeTraverse(y, ['longBylineText', 'runs', 0, 'text'])
            related_video['authorId'] = safeTraverse(y, ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID")
            related_video['authorUrl'] = '/channel/' + related_video['authorId']
            related_video['authorVerified'] = False
            if "ownerBadges" in y:
                related_video['authorVerified'] = True # hopefully this won't break things, as the invidious API doesn't distinguish music and normal verified badges
            related_video['authorThumbnails'] = safeTraverse(y, ['channelThumbnail', 'thumbnails'], default=[])
            for z in related_video['authorThumbnails']:
                z['url'] = ythdd_globals.translateLinks(z['url'])
            related_video['lengthSeconds'] = 0
            time_lookup_list = [1, 60, 3_600, 86_400]
            time_list = safeTraverse(y, ['lengthText', 'simpleText'], default="0:0").split(":")
            for z in range(len(time_list)):
                related_video['lengthSeconds'] += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
            related_views_text = safeTraverse(y, ['viewCountText', 'simpleText'], default="0").split(" ")[0]
            related_video['viewCountText'] = safeTraverse(y, ['shortViewCountText', 'simpleText'], default="0").split(" ")[0]
            related_views = 0
            if related_views_text:
                if related_views_text.lower() == "no":
                    related_views_text = "0"
                related_views = int("0" + "".join([z for z in related_views_text if 48 <= ord(z) and ord(z) <= 57]))
                related_views_text = related_views_text.split(" ")[0]
            related_video['viewCount'] = related_views
            related.append(related_video)
        else:
            y = safeTraverse(x, ['lockupViewModel'])
            if type(y) != dict:
                continue
            lmvm = safeTraverse(y, ['metadata', 'lockupMetadataViewModel'], default=[])
            related_video = {}
            related_video['videoId'] = safeTraverse(y, ['contentId'])
            related_video['title'] = safeTraverse(lmvm, ['title', 'content'])
            related_video['videoThumbnails'] = genThumbs(related_video['videoId']) #safeTraverse(y, ['thumbnail', 'thumbnails'])
            related_video['author'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 0, 'metadataParts', 0, 'text', 'content'])
            related_video['authorId'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'rendererContext', 'commandContext', 'onTap', 'innertubeCommand', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID")
            related_video['authorUrl'] = '/channel/' + related_video['authorId']
            related_video['authorVerified'] = False if safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 0, 'metadataParts', 0, 'text', 'attachmentRuns']) is None else True # seems to do the job
            related_video['authorThumbnails'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'avatar', 'avatarViewModel', 'image', 'sources'], default=[])
            for z in related_video['authorThumbnails']:
                z['url'] = ythdd_globals.translateLinks(z['url'])
            related_video['lengthSeconds'] = parseLengthFromTimeBadge(safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0"))
            related_video['viewCountText'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0]
            related_video['viewCount'] = parseViewsFromViewText(related_video['viewCountText'])
            related.append(related_video)
# magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
|
|
# toplevel_buttons = safeTraverse(video_primary_renderer, ['videoActions', 'menuRenderer', 'topLevelButtons'], default={}) # hacky solution
|
|
# likes_text = safeTraverse(toplevel_buttons, [0, 'segmentedLikeDislikeButtonViewModel', 'likeButtonViewModel', 'likeButtonViewModel', 'toggleButtonViewModel', 'toggleButtonViewModel', 'defaultButtonViewModel', 'buttonViewModel', 'title'], default="") # hacky solution
|
|
# likes = 0
|
|
# if likes_text:
|
|
# likes = int("".join([x for x in likes_text if 48 <= ord(x) and ord(x) <= 57])) # ASCII for 0-9, no regex needed
|
|
# likes_text = likes_text.split(" ")[0]
|
|
# for x in magnitude.keys():
|
|
# if x in likes_text:
|
|
# likes *= magnitude[x]
|
|
likes = safeTraverse(ydata, ['like_count'], default=0)
|
|
description = safeTraverse(microformat, ['description', 'simpleText'], default="\n(ythdd: failed to retrieve description, perhaps it's empty?)")
|
|
short_description = safeTraverse(wdata, ['ec1', 'videoDetails', 'shortDescription'], default="(ythdd: failed to retrieve short description, perhaps it's empty?)")
|
|
description_html = "<p>" + description + "</p>" # sorry, not happening right now, TODO: https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr#L329
|
|
|
|
# metadata = safeTraverse(video_secondary_renderer, ['metadataRowContainer', 'metadataRowContainerRenderer', 'rows'], default={})
|
|
genre = safeTraverse(microformat, ['category'])
|
|
# TODO: genre blah blah blah...
|
|
author = safeTraverse(video_details, ['author'], default="Unknown Author")
|
|
ucid = safeTraverse(video_details, ['channelId'], default="UNKNOWNCHANNELID")
|
|
# author_info = safeTraverse(video_secondary_renderer, ['owner', 'videoOwnerRenderer'], default={})
|
|
# author_thumbnail = safeTraverse(author_info, ['thumbnail', 'thumbnails']) # lowest quality thumbnail
|
|
# subs_text = safeTraverse(author_info, ['subscriberCountText', 'simpleText'], default="0")
|
|
# subs = 0
|
|
# if subs_text:
|
|
# subs = int("".join([x for x in subs_text if 48 <= ord(x) and ord(x) <= 57]))
|
|
# subs_text = subs_text.split(" ")[0]
|
|
# for x in magnitude.keys():
|
|
# if x in subs_text:
|
|
# subs *= magnitude[x]
|
|
subs = ydata['channel_follower_count']
|
|
channel_about_info = ythdd_extractor.browseAbout(ucid)
|
|
author_thumbnail = [ # must be a list
|
|
ythdd_extractor.getChannelAvatar(channel_about_info)
|
|
] * 3 # yes really
|
|
# for x in author_thumbnail:
|
|
# # rewrite to use views.py
|
|
# x['url'] = ythdd_globals.translateLinks(x['url'])
|
|
|
|
# so far it seems to be impossible to tell if a channel is verified or not,
|
|
# that is - without making another request
|
|
author_verified = ythdd_extractor.isVerified(channel_about_info)
|
|
|
|
format_streams = []
|
|
# adaptive_formats, format_streams = rebuildFormats(adaptive_formats)
|
|
adaptive_formats, format_streams = rebuildFormatsFromYtdlpApi(ydata)
|
|
|
|
if live_now:
|
|
video_type = "livestream"
|
|
elif premiere_timestamp:
|
|
video_type = "scheduled"
|
|
published = dateToEpoch(premiere_timestamp) if premiere_timestamp else int(time())
|
|
else:
|
|
video_type = "video"
|
|
|
|
premium = False
|
|
if "YouTube Red" in keywords:
|
|
premium = True
|
|
# TODO: detect paywalled patron-only videos
|
|
|
|
time_end = time()
|
|
|
|
#'''
|
|
response = {
|
|
"type": video_type,
|
|
"title": title,
|
|
"videoId": video_id,
|
|
"videoThumbnails": genThumbs(video_id),
|
|
"storyboards": [], # not implemented
|
|
|
|
"description": description, # due to change (include ythdd metadata)
|
|
"descriptionHtml": description_html, # basically the same as normal description for the time being
|
|
"published": published,
|
|
"publishedText": published_date,
|
|
|
|
"keywords": keywords,
|
|
"viewCount": views,
|
|
"viewCountText": str(views), # not implemented
|
|
"likeCount": likes,
|
|
"dislikeCount": 0,
|
|
|
|
"paid": False, # not implemented
|
|
"premium": premium,
|
|
"isFamilyFriendly": family_friendly,
|
|
"allowedRegions": allowed_regions,
|
|
"genre": genre,
|
|
"genreUrl": "/genreUrl/not/implemented/", # not implemented
|
|
|
|
"author": author,
|
|
"authorId": ucid,
|
|
"authorUrl": "/channel/" + ucid,
|
|
"authorVerified": author_verified,
|
|
"authorThumbnails": author_thumbnail,
|
|
|
|
"subCountText": str(subs),
|
|
"lengthSeconds": length,
|
|
"allowRatings": allow_ratings,
|
|
"rating": 0,
|
|
"isListed": is_listed,
|
|
"liveNow": live_now,
|
|
"isPostLiveDvr": post_live_dvr,
|
|
"isUpcoming": is_upcoming,
|
|
"dashUrl": ythdd_globals.config['general']['public_facing_url'] + "/dash/not/implemented/", # not implemented
|
|
"premiereTimestamp": premiere_timestamp,
|
|
|
|
#"hlsUrl": hls_url, # broken after a change in iOS player
|
|
"adaptiveFormats": adaptive_formats, # same as hlsUrl
|
|
"formatStreams": format_streams, # very bare bones, empty actually xD
|
|
"captions": [], # not implemented
|
|
# "captions": [
|
|
# {
|
|
# "label": String,
|
|
# "language_code": String,
|
|
# "url": String
|
|
# }
|
|
# ],
|
|
# "musicTracks": [
|
|
# {
|
|
# "song": String,
|
|
# "artist": String,
|
|
# "album": String,
|
|
# "license": String
|
|
# }
|
|
# ],
|
|
"recommendedVideos": related,
|
|
"took": time_end - time_start
|
|
}
|
|
#'''
|
|
|
|
if ythdd_globals.config['general']['debug']:
|
|
response["ydata"] = ydata
|
|
response["wdata"] = wdata
|
|
|
|
if ythdd_globals.config['general']['cache']:
|
|
ythdd_globals.video_cache[data[3]] = response
|
|
ythdd_globals.video_cache[data[3]]['cacheTime'] = time()
|
|
|
|
# for debugging:
|
|
#return send(200, ythdd_extractor.WEBextractSinglePage(data[3]))
|
|
#return send(200, ythdd_extractor.IOSextract(data[3]))
|
|
#return send(200, {'idata': idata, 'wdata': wdata})
|
|
|
|
# if youtube returns not the videoId we aksed
|
|
# then it means that the instance is ratelimited
|
|
status_code = 200 if data[3] == response['videoId'] else 403
|
|
|
|
return send(status_code, response)
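
# Note (added for clarity): judging by lookup() further down, a request such as
#   GET /api/invidious/api/v1/videos/dQw4w9WgXcQ
# reaches this function with data like ['api', 'v1', 'videos', 'dQw4w9WgXcQ'], so
# data[3] is the videoId that gets extracted, cached and returned (403 when YouTube
# answers with a different id, which is treated as a ratelimit signal).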

def parseLengthFromTimeBadge(time_str: str) -> int:
    # Returns 0 if unsuccessful
    length = 0
    time_lookup_list = [1, 60, 3_600, 86_400]
    time_list = time_str.split(":")
    if False in map(doesContainNumber, time_list): # works around ['LIVE'] for livestreams or ['Upcoming'] for scheduled videos
        pass
    else:
        for z in range(len(time_list)):
            length += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z])
    return length
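
# Illustrative examples (not from the original source):
#   parseLengthFromTimeBadge("4:13")     -> 253
#   parseLengthFromTimeBadge("1:02:03")  -> 3723
#   parseLengthFromTimeBadge("LIVE")     -> 0   (non-numeric badges fall through)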

def parseViewsFromViewText(viewcounttext: str) -> int:
    views = 0
    magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
    if viewcounttext:
        if viewcounttext.lower() == "no":
            viewcounttext = "0"
        views = int("0" + "".join([z for z in viewcounttext if 48 <= ord(z) and ord(z) <= 57]))
        viewcounttext = viewcounttext.split(" ")[0]
        for x in magnitude.keys():
            if x == viewcounttext[-1].upper():
                views *= magnitude[x]
    return views
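
# Illustrative examples (not from the original source):
#   parseViewsFromViewText("1,234 views") -> 1234
#   parseViewsFromViewText("15K")         -> 15000
# Only the digits are kept before the K/M/B multiplier is applied, so a rounded value
# like "1.2M" comes out as 12000000 rather than 1200000.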

def search(data, req):
    search_query = req.args.get('q')
    print(f"search query: {search_query}")

    # ignore paginated requests as we do nothing with the continuation token
    page = req.args.get('page')
    if page is not None and page != '1':
        return send(404, [])

    if (data[-2].lower() != "search" or data[-1].lower() != "") and data[-1].lower() != "search":
        print(f"'{data[-2]}', '{data[-1]}'")
        print("search suggestions are not yet supported")
        return send(501, {"status": "error", "msg": "search suggestions not supported in this version of ythdd", "data": []})

    results = ythdd_extractor.WEBextractSearchResults(search_query)
    results_list = []

    for entry in results:

        match safeTraverse(list(entry.keys()), [0], default=""):

            case "videoRenderer": # represents a video

                published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now")
                published_date = published_date.removeprefix("Streamed ")

                results_list.append(
                    {
                        "type": "video",
                        "title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]),
                        "videoId": safeTraverse(entry, ["videoRenderer", "videoId"]),
                        "author": safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"]),
                        "authorId": safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"]),
                        "authorUrl": "/channel/" + safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID"),
                        "authorVerified": False, # TODO
                        "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default="unknown")),
                        "videoThumbnails": genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")),
                        "description": "",
                        "descriptionHtml": "",
                        "viewCount": parseViewsFromViewText(safeTraverse(entry, ["videoRenderer", "viewCountText", "simpleText"], default="No views")),
                        "viewCountText": safeTraverse(entry, ["videoRenderer", "viewCountText", "simpleText"], default="Unknown amount of views"),
                        "published": int(dateparser.parse(published_date).timestamp()), # sadly the best we can do, invidious does this too
                        "publishedText": published_date,
                        "lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")),
                        "liveNow": False,
                        "premium": False,
                        "isUpcoming": False,
                        "isNew": False,
                        "is4k": False,
                        "is8k": False,
                        "isVr180": False,
                        "isVr360": False,
                        "is3d": False,
                        "hasCaptions": False
                    }
                )

                # modify the premiere timestamp afterwards here?

            case "lockupViewModel": # represents playlists/mixes

                isMix = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="") == "Mix"
                if isMix:
                    # mixes aren't currently supported
                    continue

                lvm = entry["lockupViewModel"]
                meta = safeTraverse(lvm, ["metadata"], default=[])
                lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[])
                ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
                length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos")
                length = parseViewsFromViewText(length.split(" ")[0])

                results_list.append(
                    {
                        "type": "playlist",
                        "title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"),
                        "playlistId": safeTraverse(lmvm, [2, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "watchEndpoint", "playlistId"], default="UNKNOWNPLAYLISTID"),
                        "playlistThumbnail": safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", 0, "url"], default="no-url?"), # todo: sanitize this
                        "author": safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author"),
                        "authorId": ucid,
                        "authorUrl": "/channel/" + ucid,
                        "authorVerified": False,
                        "videoCount": length,
                        "videos": [] # provided for historical reasons I guess
                    }
                )

            case "shelfRenderer": # "people also watched"
                continue

            case "gridShelfViewModel": # shorts?
                continue

            case _:
                print("received a search result of unknown type:")
                print(entry)
                print("")
                # breakpoint()
                continue

    return send(200, results_list)
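
# Example flow (illustrative): a client request to
#   /api/invidious/api/v1/search?q=never+gonna
# is routed here by lookup() and answered with a JSON list mixing the "video" and
# "playlist" dicts built above; pages beyond the first return 404 since the
# continuation token is ignored.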

def lookup(data, req):
    # possibly TODO: rewrite this mess
    if len(data) > 2:
        if (data[0], data[1]) == ("api", "v1"):
            match data[2]:
                case 'stats' | '': # /api/invidious/api/v1/stats and /api/invidious/api/v1/
                    return stats()
                case 'trending':
                    return trending()
                case 'popular':
                    return popular()
                case 'videos':
                    return videos(data)
                case 'auth':
                    return auth(data)
                case 'search':
                    return search(data, req)
                case _:
                    incrementBadRequests()
                    return notImplemented(data)
        elif data[0] in ('ggpht', 'vi'):
            # for some reason the Materialous client
            # and FreeTube keep making requests to these
            if data[1] in ('ggpht', 'vi'):
                return redirect('/' + "/".join(data[1:]))
            return redirect('/' + "/".join(data[0:]))
        else:
            incrementBadRequests()
            return notImplemented(data)
    elif len(data) == 2:
        if (data[0], data[1]) == ("api", "v1"): # /api/invidious/api/v1
            return stats()
        elif data[0] == "streams":
            return streams()
        elif data[0] in ('ggpht', 'vi'):
            return redirect('/' + "/".join(data[0:]))
        else:
            incrementBadRequests()
            return notImplemented(data)

    elif len(data) == 1:
        return stats() # /api/invidious/something
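
# Routing summary (added for clarity, path segments relative to /api/invidious/), e.g.:
#   ['api', 'v1', 'stats']                -> stats()
#   ['api', 'v1', 'videos', '<videoId>']  -> videos(data)
#   ['api', 'v1', 'search', '']           -> search(data, req)
#   ['vi', '<videoId>', 'hqdefault.jpg']  -> redirect to /vi/<videoId>/hqdefault.jpg
#   most unrecognised paths               -> notImplemented(data) with HTTP 501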