this applies mainly for meta channels like UC4R8DWoMoI7CAwX8_LjQHig, which ythdd can't parse and thus doesn't support
874 lines
37 KiB
Python
874 lines
37 KiB
Python
#!/usr/bin/python3
|
|
# ythdd Invidious Translation Layer
|
|
# -----
|
|
# Translates requests sent through Invidious API at /api/invidious/
|
|
# to use internal extractors.
|
|
from flask import Response, request, redirect
|
|
from markupsafe import escape
|
|
from time import strftime, gmtime, time
|
|
from ythdd_globals import safeTraverse
|
|
from hashlib import md5
|
|
import json, datetime
|
|
import dateparser
|
|
import html
|
|
import invidious_formats
|
|
import ythdd_globals
|
|
import ythdd_api_v1
|
|
import ythdd_extractor
|
|
import ythdd_struct_parser
|
|
|
|
# TODO:
|
|
# [✓] /api/v1/stats (stats())
|
|
# [✓] /streams/dQw4w9WgXcQ (does nothing)
|
|
# [✓] /vi/videoIdXXXX/maxresdefault.jpg (todo: add a fallback for 404s)
|
|
# [✓] /api/v1/search?q=... (videos and playlists)
|
|
# [✓] /api/v1/search/suggestions?q=...&pq=...
|
|
# [✓] /api/v1/channels/:ucid
|
|
# [✓] /api/v1/channels/:ucid/videos, shorts, playlists
|
|
# [✓] /api/v1/comments/:videoid?continuation=...
|
|
# [✓] /api/v1/videos/videoIdXXXX
|
|
# [X] /api/v1/channels/:ucid/streams
|
|
# [X] /api/v1/playlists/:plid
|
|
# [X] /api/v1/storyboards/:videoIdXXXX
|
|
# [*] /api/v1/auth/subscriptions (stub? db?)
|
|
# [*] /api/v1/auth/feed?page=1 (stub? db?)
|
|
# [*] /api/v1/auth/playlists (stub? db?)
|
|
|
|
DEFAULT_AVATAR = "https://yt3.ggpht.com/a/default-user=s176-c-k-c0x00ffffff-no-rj"
|
|
|
|
def incrementBadRequests():
    """Bump the global counter of failed API requests by one."""
    failed_so_far = ythdd_globals.apiFailedRequests
    ythdd_globals.apiFailedRequests = failed_so_far + 1
|
|
|
|
def greeting():
    """Return a (status code, text) pair that greets the caller and points at the stats endpoint."""
    status_code = 200
    message = 'hello from Invidious TL!\nstats endpoint at /api/invidious/stats'
    return status_code, message
|
|
|
|
def send(status, response):
    """Serialise *response* to JSON and wrap it in a Flask Response.

    status:   HTTP status code of the reply.
    response: any JSON-serialisable object; it becomes the reply body.
    """
    payload = json.dumps(response)
    return Response(payload, status=status, mimetype='application/json')
|
|
|
|
def notImplemented(data):
    """Reply with HTTP 501 for a path the translation layer does not handle.

    data: iterable of path segments; they are joined with "/" for the error message.
    """
    requested_path = '/'.join(data)
    body = {'error': f"not recognised/implemented in Invidious TL: {requested_path}"}
    return send(501, body)
|
|
|
|
def stats():
    """Reply with a static stats document that identifies this instance as the ythdd translation layer."""
    software = {
        "name": "invidious",
        "version": f"invidious TL, ythdd ({ythdd_globals.version})",
        "branch": "https://gitea.7o7.cx/sherl/ythdd",
        "tl_msg": "/api/invidious/api/v1/"
    }
    return send(200, {"version": "2.0", "software": software})
|
|
|
|
def videoIdSanityCheck(videoId: str):
    """Check that *videoId* is 11 characters long.

    Returns None when the length is right. Otherwise the bad-request counter
    is incremented and a 400 response describing the problem is returned.
    Only the length is checked for now.
    """
    actual_length = len(videoId)
    if actual_length == 11:
        return None

    incrementBadRequests()
    return send(400, f'error: bad request. wrong videoId: {videoId} is {actual_length} characters long, but should be 11.')
|
|
|
|
def auth(data):
    """Stub for the authenticated endpoints.

    data[1] names the endpoint. "subscriptions", "feed" and "playlists" answer
    with an empty list (not yet implemented; TODO: make it use the internal db).
    Anything else counts as a bad request and answers 404.
    """
    endpoint = data[1]
    if endpoint in ("subscriptions", "feed", "playlists"):
        return send(200, [])

    incrementBadRequests()
    return send(404, [])
|
|
|
|
def streams():
    """Placeholder endpoint: always replies 200 with an empty string body."""
    empty_body = ''
    return send(200, empty_body)
|
|
|
|
def epochToDate(epoch):
    """Format a Unix timestamp (seconds) as a UTC 'YYYY-MM-DDTHH:MM:SSZ' string."""
    utc_fields = gmtime(epoch)
    return strftime('%Y-%m-%dT%H:%M:%SZ', utc_fields)
|
|
|
|
def dateToEpoch(date: str):
    """Convert an ISO 8601 date/time string to a Unix timestamp (float seconds).

    A trailing "Z" (UTC designator) is rewritten to "+00:00" before parsing,
    because datetime.fromisoformat() only accepts "Z" from Python 3.11 on.
    Strings without any UTC offset are still interpreted by
    datetime.timestamp() as local time, exactly as before.

    Raises ValueError if *date* is not a valid ISO 8601 string.
    """
    if date.endswith("Z"):
        date = date[:-1] + "+00:00"
    return datetime.datetime.fromisoformat(date).timestamp()
|
|
|
|
def trending():
    """Placeholder endpoint: replies 200 with a list holding one empty object."""
    placeholder = [{}]
    return send(200, placeholder)
|
|
|
|
def popular():
    """Placeholder endpoint: replies 200 with a list holding one empty object."""
    placeholder = [{}]
    return send(200, placeholder)
|
|
|
|
def getError(idata: dict):
    """Build a human-readable error string from an extractor result.

    Reads idata['stage1']['playabilityStatus'] and returns
    "(<status>) <reason>", followed by " - <subreason>" when the error screen
    carries subreason runs. Falls back to "Generic error" when the status or
    reason cannot be read. The function is best-effort and is not meant to raise.
    """
    unknown_error = {"status": "Unknown error", "reason": "This is a generic ythdd error."}

    playabilityStatus = safeTraverse(idata, ['stage1', 'playabilityStatus'], default=unknown_error)
    print(playabilityStatus)
    try:
        error = f"({playabilityStatus['status']}) {playabilityStatus['reason']}"
    except (KeyError, TypeError, IndexError):
        # status/reason missing, or playabilityStatus is not a mapping
        error = "Generic error"

    runs = safeTraverse(idata, ['stage1', 'playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'subreason', 'runs'], default=[])
    try:
        subreason = " ".join(str(run['text']) for run in runs)
    except (KeyError, TypeError, IndexError):
        # malformed runs: report the main reason only
        subreason = ""

    # only add the separator when there is something to put after it
    if subreason:
        error += " - " + subreason

    return error
|
|
|
|
def rebuildFormats(data):
    """Rebuild Invidious-style format lists from a list of adaptive formats.

    data: list of stream dicts (keys read here include 'itag', 'url',
          'mimeType', 'averageBitrate', 'initRange', 'indexRange', ...).

    Returns (result, formatStreams):
      result        - one dict per input stream, in input order; streams
                      without 'initRange' (livestreams?) stay mostly empty.
      formatStreams - a single-element list describing the video stream with
                      the highest average bitrate, or [] if there is none.

    Raises KeyError if a stream that has 'initRange' lacks one of the other
    mandatory keys, or if the best video stream's itag has no known height.
    """
    known_formats = invidious_formats.FORMATS
    result = [{} for _ in data]
    best_video = None  # index of the video stream with the highest bitrate so far

    for idx, stream in enumerate(data):
        row = result[idx]

        # a stream carrying 'audioChannels' is treated as audio, anything else as video
        if 'audioChannels' in stream:
            row['audioChannels'] = stream['audioChannels']
            isVideo = False
        else:
            isVideo = True

        if "initRange" not in stream:  # for livestreams?
            continue

        row['init'] = f"{stream['initRange']['start']}-{stream['initRange']['end']}"
        row['index'] = f"{stream['indexRange']['start']}-{stream['indexRange']['end']}"
        row['bitrate'] = str(stream['averageBitrate'])
        row['url'] = stream['url']
        row['itag'] = str(stream['itag'])
        row['type'] = stream['mimeType']
        row['clen'] = stream['contentLength']
        row['lmt'] = stream['lastModified']
        row['projectionType'] = stream['projectionType']
        if 'colorInfo' in stream:
            row['colorInfo'] = stream['colorInfo']

        if isVideo:
            row['fps'] = str(stream['fps'])
        else:
            row['audioQuality'] = stream['audioQuality']
            row['audioSampleRate'] = stream['audioSampleRate']

        fmt = known_formats.get(stream['itag'])
        if fmt is not None:
            row['container'] = fmt['ext']
            row['encoding'] = fmt['vcodec'] if 'vcodec' in fmt else fmt['acodec']

            # not all itags have width and/or height
            if isVideo and 'height' in fmt:
                height = str(fmt['height'])
                row['resolution'] = height + "p"
                # NOT IMPLEMENTED, that's just a placeholder
                row['qualityLabel'] = height + "p" + (row['fps'] if stream['fps'] > 30 else "")
                if 'width' in fmt:
                    row['size'] = str(fmt['width']) + "x" + height

        # only video streams compete, so an audio stream at index 0 can no
        # longer end up as the "best video"
        if isVideo and (best_video is None or stream['averageBitrate'] > data[best_video]['averageBitrate']):
            best_video = idx

    if best_video is None:
        # empty input or no usable video stream
        return result, []

    best = data[best_video]
    best_fmt = known_formats[best['itag']]

    # makes FreeTube work, unfortunately it's a video-only stream
    formatStreams = [
        {
            "url": best['url'],
            "itag": str(best['itag']),
            "type": best['mimeType'],
            "quality": best['quality'],
            "bitrate": str(best['averageBitrate']),
            "fps": best['fps'],
            "size": "",  # workaround for clipious, which requires ANYTHING to be passed, or else it will throw and error and won't load the video
            "resolution": str(best_fmt['height']) + "p",
            "qualityLabel": str(best_fmt['height']) + "p",
            "container": best_fmt['ext'],
            "encoding": best_fmt['vcodec']
        },
    ]

    # not all itags have width and/or height
    try:
        formatStreams[0]["size"] = str(best_fmt['width']) + "x" + str(best_fmt['height'])
    except KeyError:
        pass

    return result, formatStreams
|
|
|
|
def rebuildFormatsFromYtdlpApi(ydata: dict):
    """Rebuild Invidious-compatible formats from yt-dlp's output (ydata).

    Walks ydata["formats"] and translates every stream whose protocol is
    "https" or "http_dash_segments" and which has at least one codec.

    Returns (adaptive_formats, format_streams):
      adaptive_formats - every translated stream.
      format_streams   - only the streams that carry both video and audio;
                         these are the same dict objects as in adaptive_formats.
    """
    adaptive_formats = []
    format_streams = []

    for stream in safeTraverse(ydata, ["formats"], default=[]):

        # entries without a protocol are treated like storyboards and skipped
        if safeTraverse(stream, ["protocol"], default="storyboard") not in ("http_dash_segments", "https"):
            continue

        vcodec = safeTraverse(stream, ["vcodec"], default="none")
        acodec = safeTraverse(stream, ["acodec"], default="none")
        has_video = vcodec != "none"
        has_audio = acodec != "none"
        if not has_video and not has_audio:
            continue

        newRow = {}
        # `or 0` guards against an explicit null bitrate
        # (presumably tbr is in kbit/s, hence the * 1000 - TODO confirm)
        newRow["bitrate"] = str(int((safeTraverse(stream, ["tbr"], default=0) or 0) * 1000))
        newRow["url"] = safeTraverse(stream, ["url"])
        newRow["itag"] = safeTraverse(stream, ["format_id"])
        params = ythdd_extractor.paramsFromUrl(newRow["url"])

        if has_audio:
            # audio-only track (for muxed streams the video branch below overrides the shared keys)
            audio_ext = safeTraverse(stream, ["audio_ext"], default=None)
            if audio_ext is None:
                audio_ext = "mp4"
            fnote = safeTraverse(stream, ["format_note"], default="low")
            abr = safeTraverse(stream, ["abr"], default="0")
            if abr is None:
                abr = "0"
            newRow["type"] = "audio/" + audio_ext
            newRow["audioQuality"] = fnote
            newRow["audioSampleRate"] = int(safeTraverse(stream, ["asr"], default="44100") or 44100)
            # fall back to stereo when the channel count is missing or null
            newRow["audioChannels"] = int(safeTraverse(stream, ["audio_channels"], default=2) or 2)
            newRow["qualityLabel"] = str(int(abr)) + "k (audio)"
            newRow["resolution"] = f"{fnote} quality"
            newRow["size"] = "0x0"

        high_fps_suffix = ""
        height = "0"
        if has_video:
            # either video-only or video+audio
            video_ext = safeTraverse(stream, ["video_ext"], default=None)
            if video_ext is None:
                video_ext = "mp4"
            height = str(safeTraverse(stream, ["height"], default=0) or 0)
            width = str(safeTraverse(stream, ["width"], default=0) or 0)
            fps = safeTraverse(stream, ["fps"], default=30) or 30
            if fps > 30:
                high_fps_suffix = str(int(fps))
            newRow["type"] = "video/" + video_ext
            # mpv won't play the video inside of Yattee if it's a non-standard resolution (bug?)
            newRow["resolution"] = (height if height in ("144", "240", "360", "480", "720", "1080") else "360") + "p"
            newRow["fps"] = fps
            newRow["qualityLabel"] = height + "p" + high_fps_suffix + " (video)"  # also a placeholder
            newRow["size"] = width + "x" + height

        clen = safeTraverse(params, ["clen"], default=safeTraverse(stream, ["filesize"], default="0"))
        # for clipious sake which expects a string
        newRow["clen"] = "0" if clen is None else str(clen)
        newRow["lmt"] = safeTraverse(params, ["lmt"], default="0")

        # format ids may carry a suffix after "-"; only a numeric prefix can be
        # looked up in the itag table (not sure this split is necessary)
        itag_prefix = str(newRow["itag"] or "").split("-")[0]
        itag_number = int(itag_prefix) if itag_prefix.isdecimal() else 0

        newRow["projectionType"] = "RECTANGULAR"  # clipious requires this to be mentioned explicitly and cannot be nulled
        newRow["container"] = safeTraverse(stream, ["ext"], default="unknown_container")
        newRow["encoding"] = safeTraverse(invidious_formats.FORMATS, [itag_number, "ext"], default="unknown_encoding")
        newRow["quality"] = newRow["qualityLabel"]
        newRow["init"] = "0-1"  # dummy values
        newRow["index"] = "2-3"  # dummy values

        if has_video and has_audio:
            # muxed stream (typically the 360p one)
            newRow["qualityLabel"] = height + "p" + high_fps_suffix
            format_streams.append(newRow)

        adaptive_formats.append(newRow)

    return adaptive_formats, format_streams
|
|
|
|
def videos(data):
|
|
# an attempt on a faithful rewrite of
|
|
# https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr
|
|
response = {}
|
|
|
|
# hls_url = safeTraverse(idata, ['stage1', 'streamingData', 'hlsManifestUrl'], default="")
|
|
# adaptive_formats = safeTraverse(idata, ['stage1', 'streamingData', 'adaptiveFormats'], default=[])
|
|
# if not hls_url or not adaptive_formats:
|
|
# print(f"serious error: couldn't get hls_url or adaptive_formats!\n"
|
|
# f"dumping idata:\n"
|
|
# f"{idata}")
|
|
# return send(500, {'error': getError(idata)})
|
|
|
|
time_start = time()
|
|
|
|
if ythdd_globals.config['general']['cache'] and data[3] in ythdd_globals.video_cache:
|
|
if ythdd_globals.video_cache[data[3]]['cacheTime'] + 3 * 60 * 60 > time():
|
|
response = ythdd_globals.video_cache[data[3]]
|
|
response['fromCache'] = True
|
|
return send(200, response)
|
|
else:
|
|
del ythdd_globals.video_cache[data[3]]
|
|
|
|
ydata = ythdd_extractor.extract(data[3], manifest_fix=True)
|
|
wdata = ythdd_extractor.WEBextractSinglePage(data[3])
|
|
|
|
#return send(200, {'ydata': ydata, 'wdata': wdata})
|
|
#return send(200, {'idata': idata, 'wdata': wdata})
|
|
|
|
main_results = wdata['ec2']['contents']['twoColumnWatchNextResults']
|
|
primary_results = safeTraverse(main_results, ['results', 'results', 'contents'])
|
|
# video_primary_renderer = safeTraverse(primary_results, [0, 'videoPrimaryInfoRenderer'])
|
|
video_secondary_renderer = safeTraverse(primary_results, [1, 'videoSecondaryInfoRenderer'])
|
|
|
|
video_details = safeTraverse(wdata, ['ec1', 'videoDetails'])
|
|
microformat = safeTraverse(wdata, ['ec1', 'microformat', 'playerMicroformatRenderer'], default={})
|
|
|
|
video_id = safeTraverse(video_details, ['videoId'], default=f"[{data[3]}] (errors occurred, check logs)")
|
|
title = safeTraverse(video_details, ['title'], default=video_id)
|
|
views = int(safeTraverse(video_details, ['viewCount'], default=0))
|
|
length = int(safeTraverse(video_details, ['lengthSeconds'], default=1))
|
|
published = dateToEpoch(safeTraverse(microformat, ['publishDate'], default="1970-01-02T00:00:00Z")) # ISO format to Unix timestamp
|
|
published_date = epochToDate(published)
|
|
premiere_timestamp = safeTraverse(microformat, ['liveBroadcastDetails', 'startTimestamp'], default=None) # let's ignore the nitty gritty for the time being
|
|
premiere_timestamp = premiere_timestamp if premiere_timestamp else safeTraverse(microformat, ['playabilityStatus', 'liveStreamability', 'liveStreamabilityRenderer', 'offlineSlate', 'liveStreamOfflineSlateRenderer', 'scheduledStartTime'], default=None)
|
|
live_now = safeTraverse(microformat, ['liveBroadcastDetails', 'isLiveNow'], default=False)
|
|
post_live_dvr = safeTraverse(video_details, ['isPostLiveDvr'], default=False)
|
|
allowed_regions = safeTraverse(microformat, ['availableCountries'], default=[])
|
|
allow_ratings = safeTraverse(video_details, ['allowRatings'], default=True)
|
|
family_friendly = safeTraverse(microformat, ['isFamilySafe'], default=True)
|
|
is_listed = safeTraverse(video_details, ['isCrawlable'], default=True)
|
|
is_upcoming = safeTraverse(video_details, ['isUpcoming'], default=False)
|
|
keywords = safeTraverse(video_details, ['keywords'], default=[])
|
|
|
|
related_raw = safeTraverse(wdata, ['ec2', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results'], default=[]) # can possibly change in the future
|
|
related = []
|
|
for entry in related_raw[:-1]:
|
|
|
|
related_entry = {}
|
|
match safeTraverse(list(entry.keys()), [0], default=""):
|
|
|
|
case "compactVideoRenderer":
|
|
# legacy renderer, a/b tested and later phased out in summer 2025
|
|
continue
|
|
|
|
case "lockupViewModel":
|
|
y = safeTraverse(entry, ['lockupViewModel'])
|
|
if not isinstance(y, dict):
|
|
continue
|
|
isMix = playlist_type = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="") == "MIX"
|
|
if isMix:
|
|
# mixes aren't currently supported
|
|
continue
|
|
lmvm = safeTraverse(y, ['metadata', 'lockupMetadataViewModel'], default=[])
|
|
related_entry['videoId'] = safeTraverse(y, ['contentId'])
|
|
related_entry['title'] = safeTraverse(lmvm, ['title', 'content'])
|
|
related_entry['videoThumbnails'] = ythdd_struct_parser.genThumbs(related_entry['videoId']) #safeTraverse(y, ['thumbnail', 'thumbnails'])
|
|
related_entry['author'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 0, 'metadataParts', 0, 'text', 'content'])
|
|
related_entry['authorId'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'rendererContext', 'commandContext', 'onTap', 'innertubeCommand', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID")
|
|
related_entry['authorUrl'] = '/channel/' + related_entry['authorId']
|
|
related_entry['authorVerified'] = False if safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 0, 'metadataParts', 0, 'text', 'attachmentRuns']) is None else True # seens to do the job
|
|
author_avatar_url = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'avatar', 'avatarViewModel', 'image', 'sources', 0, 'url'], default=DEFAULT_AVATAR)
|
|
related_entry['authorThumbnails'] = ythdd_extractor.generateChannelAvatarsFromUrl(author_avatar_url)
|
|
related_entry['lengthSeconds'] = ythdd_struct_parser.parseLengthFromTimeBadge(safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0"))
|
|
related_entry['viewCountText'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0]
|
|
related_entry['viewCount'] = ythdd_struct_parser.parseViewsFromViewText(related_entry['viewCountText'])
|
|
|
|
case _:
|
|
# unsupported model: print info into stdout
|
|
print("received an entry of unknown type during parsing of related videos:")
|
|
print(entry)
|
|
print("")
|
|
continue
|
|
|
|
related.append(related_entry)
|
|
|
|
likes = safeTraverse(ydata, ['like_count'], default=0)
|
|
description = safeTraverse(microformat, ['description', 'simpleText'], default="\n(ythdd: failed to retrieve description, perhaps it's empty?)")
|
|
short_description = safeTraverse(wdata, ['ec1', 'videoDetails', 'shortDescription'], default="(ythdd: failed to retrieve short description, perhaps it's empty?)")
|
|
description_html = html.escape(description).replace("\r\n", "<br>").replace("\n", "<br>") # still TODO: https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr#L329
|
|
|
|
genre = safeTraverse(microformat, ['category'])
|
|
# TODO: genre blah blah blah...
|
|
author = safeTraverse(video_details, ['author'], default="Unknown Author")
|
|
ucid = safeTraverse(video_details, ['channelId'], default="UNKNOWNCHANNELID")
|
|
subs = ydata['channel_follower_count']
|
|
author_thumbnail = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(video_secondary_renderer, ['owner', 'videoOwnerRenderer', 'thumbnail', 'thumbnails', 0, 'url'], default=DEFAULT_AVATAR))
|
|
|
|
# so far it seems to be impossible to tell if a channel is verified or not,
|
|
# that is - without making another request
|
|
author_verified = ythdd_extractor.isVerified(safeTraverse(video_secondary_renderer, ['owner', 'videoOwnerRenderer', 'badges', 0], default=[]))
|
|
|
|
format_streams = []
|
|
# adaptive_formats, format_streams = rebuildFormats(adaptive_formats)
|
|
adaptive_formats, format_streams = rebuildFormatsFromYtdlpApi(ydata)
|
|
|
|
if live_now:
|
|
video_type = "livestream"
|
|
elif premiere_timestamp:
|
|
video_type = "scheduled"
|
|
published = dateToEpoch(premiere_timestamp) if premiere_timestamp else int(time())
|
|
else:
|
|
video_type = "video"
|
|
|
|
premium = False
|
|
if "YouTube Red" in keywords:
|
|
premium = True
|
|
# TODO: detect paywalled patron-only videos
|
|
|
|
# because we fetched the video's wdata, we might as
|
|
# well save it inside of general cache so that
|
|
# requests for the video's comments don't have to
|
|
# spawn an additional request for initial ctoken
|
|
ensure_comment_continuation(video_id, wdata)
|
|
|
|
time_end = time()
|
|
|
|
response = {
|
|
"type": video_type,
|
|
"title": title,
|
|
"videoId": video_id,
|
|
"videoThumbnails": ythdd_struct_parser.genThumbs(video_id),
|
|
"storyboards": [], # not implemented
|
|
"description": description, # due to change (include ythdd metadata)
|
|
"descriptionHtml": description_html,
|
|
"published": published,
|
|
"publishedText": published_date,
|
|
"keywords": keywords,
|
|
"viewCount": views,
|
|
"viewCountText": str(views), # not implemented
|
|
"likeCount": likes,
|
|
"dislikeCount": 0,
|
|
"paid": False, # not implemented
|
|
"premium": premium,
|
|
"isFamilyFriendly": family_friendly,
|
|
"allowedRegions": allowed_regions,
|
|
"genre": genre,
|
|
"genreUrl": "/genreUrl/not/implemented/", # not implemented
|
|
"author": author,
|
|
"authorId": ucid,
|
|
"authorUrl": "/channel/" + ucid,
|
|
"authorVerified": author_verified,
|
|
"authorThumbnails": author_thumbnail,
|
|
"subCountText": str(subs),
|
|
"lengthSeconds": length,
|
|
"allowRatings": allow_ratings,
|
|
"rating": 0,
|
|
"isListed": is_listed,
|
|
"liveNow": live_now,
|
|
"isPostLiveDvr": post_live_dvr,
|
|
"isUpcoming": is_upcoming,
|
|
"dashUrl": ythdd_globals.config['general']['public_facing_url'] + "api/invidious/api/v1/manifest/" + video_id, # not implemented
|
|
"premiereTimestamp": premiere_timestamp,
|
|
#"hlsUrl": hls_url, # broken after a change in iOS player, only usable for livestreams
|
|
"adaptiveFormats": adaptive_formats, # same as hlsUrl
|
|
"formatStreams": format_streams,
|
|
"captions": [], # not implemented
|
|
# "captions": [
|
|
# {
|
|
# "label": String,
|
|
# "language_code": String,
|
|
# "url": String
|
|
# }
|
|
# ],
|
|
# "musicTracks": [
|
|
# {
|
|
# "song": String,
|
|
# "artist": String,
|
|
# "album": String,
|
|
# "license": String
|
|
# }
|
|
# ],
|
|
"recommendedVideos": related,
|
|
"took": time_end - time_start
|
|
}
|
|
|
|
if ythdd_globals.config['general']['debug']:
|
|
response["ydata"] = ydata
|
|
response["wdata"] = wdata
|
|
|
|
if ythdd_globals.config['general']['cache']:
|
|
ythdd_globals.video_cache[data[3]] = response
|
|
ythdd_globals.video_cache[data[3]]['cacheTime'] = time()
|
|
|
|
# for debugging:
|
|
#return send(200, ythdd_extractor.WEBextractSinglePage(data[3]))
|
|
#return send(200, ythdd_extractor.IOSextract(data[3]))
|
|
#return send(200, {'idata': idata, 'wdata': wdata})
|
|
|
|
# if youtube returns not the videoId we aksed
|
|
# then it means that the instance is ratelimited
|
|
status_code = 200 if data[3] == response['videoId'] else 403
|
|
|
|
return send(status_code, response)
|
|
|
|
def search(data, req):
    """Handle both the search endpoint and the search suggestions endpoint.

    data: request path split into segments; the last two decide which
          endpoint was meant.
    req:  request object; 'q', 'page' and 'pq' are read from req.args.
    """
    search_query = req.args.get('q')

    # ignore paginated requests as we do nothing with the continuation token
    page = req.args.get('page')
    if page not in (None, '1'):
        return send(404, [])

    # a plain search ends with ".../search/" or ".../search";
    # everything else is treated as a request for suggestions
    wants_results = (data[-2].lower() == "search" and data[-1].lower() == "") or data[-1].lower() == "search"
    if not wants_results:
        previous_query = req.args.get('pq')
        return send(200, ythdd_extractor.WEBgetSearchSuggestions(search_query, previous_query))

    raw_results = ythdd_extractor.WEBextractSearchResults(search_query)

    # drop entries the struct parser could not translate
    results_list = [
        parsed
        for raw in raw_results
        if (parsed := ythdd_struct_parser.parseRenderers(raw)) is not None
    ]

    return send(200, results_list)
|
|
|
|
def get_channel_tab(requested_tab, ucid, req, only_json: bool = False):
|
|
|
|
# check for page/cont
|
|
ctoken = req.args.get('continuation')
|
|
|
|
# perhaps continuation tokens should be checked here (whether they are inside of general_cache)
|
|
# this way, malicious requests containing bogus ctokens can't be sent to potentially ban/ratelimit the instance (?)
|
|
# if ctoken is not None and ctoken not in ythdd_globals.general_cache...
|
|
|
|
# unique req fingerprint allows for this exact query to be cached in memory.
|
|
# md5 sum serves as a "unique" deterministic value which can be checked for cache hit/miss
|
|
unique_request_fingerprint = md5(f"{ucid}_{requested_tab}_{ctoken}".encode('utf-8')).hexdigest()
|
|
|
|
# if we haven't discovered parameters required for browsing a specific tab,
|
|
# then load them now
|
|
if ucid not in ythdd_globals.general_cache["continuations"]["channels"]:
|
|
channels(["", "", "", ucid], req, True)
|
|
|
|
# check if request has been cached within the last hour
|
|
if ythdd_globals.config['general']['cache'] and unique_request_fingerprint in ythdd_globals.general_cache["channels"]:
|
|
if ythdd_globals.general_cache["channels"][unique_request_fingerprint]['cacheTime'] + 1 * 60 * 60 > time():
|
|
response = ythdd_globals.general_cache["channels"][unique_request_fingerprint]
|
|
if only_json:
|
|
return response
|
|
else:
|
|
return send(200, response)
|
|
else:
|
|
del ythdd_globals.general_cache["channels"][unique_request_fingerprint]
|
|
|
|
# load relevant data from global (general) cache
|
|
param = safeTraverse(ythdd_globals.general_cache["continuations"]["channels"][ucid], ["tabs", requested_tab, "param"], default=None)
|
|
name = safeTraverse(ythdd_globals.general_cache["continuations"]["channels"][ucid], ["name"], default="")
|
|
avatar = safeTraverse(ythdd_globals.general_cache["continuations"]["channels"][ucid], ["avatar"], default=DEFAULT_AVATAR)
|
|
verified = safeTraverse(ythdd_globals.general_cache["continuations"]["channels"][ucid], ["verified"], default=False)
|
|
|
|
# if provided, ctoken will be used for browsing as well
|
|
wdata = ythdd_extractor.browseChannel(ucid, params=param, ctoken=ctoken)
|
|
|
|
# sanity check (whether we got what we requested)
|
|
received_tab = safeTraverse(wdata, ["responseContext", "serviceTrackingParams", 0, "params", 0, "value"])
|
|
if received_tab != f"channel.{requested_tab}":
|
|
# if that's not the case, either something changed in the innertube API,
|
|
# or content that was asked for isn't available
|
|
print(f"INFO: couldn't verify server returned channel data we asked for. "
|
|
f"Requested channel.{requested_tab}, got {received_tab}. Most likely we sent a request to Innertube which got rejected.")
|
|
|
|
# load requested tab
|
|
result = {}
|
|
if ctoken is None:
|
|
tabs = safeTraverse(wdata, ["contents", "twoColumnBrowseResultsRenderer", "tabs"], default=[])
|
|
for tab in tabs:
|
|
tab_name = safeTraverse(tab, ["tabRenderer", "title"], default="").lower()
|
|
if tab_name and tab_name == requested_tab:
|
|
result = safeTraverse(tab, ["tabRenderer", "content"], default=[])
|
|
break
|
|
|
|
items = []
|
|
inner_contents = []
|
|
new_continuation = ""
|
|
response = {}
|
|
match requested_tab:
|
|
case "videos" | "shorts":
|
|
# videos/shorts have actually the same response schema,
|
|
# only the renderers differ - but they are taken care of in ythdd_struct_parser.parseRenderers()
|
|
|
|
if ctoken is None:
|
|
inner_contents = safeTraverse(result, ["richGridRenderer", "contents"], default=[[]])
|
|
else:
|
|
inner_contents = safeTraverse(wdata, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"], default=[[]])
|
|
|
|
for entry in inner_contents:
|
|
# videos from videos tab have no owner info (?) or it's in another place. if it is somewhere, this expression can be made simpler by traversing something else in struct parser.
|
|
item = safeTraverse(entry, ["richItemRenderer", "content"])
|
|
if item is not None:
|
|
items.append(ythdd_struct_parser.parseRenderers(item, {"author_name": name, "author_ucid": ucid, "avatar": avatar}))
|
|
|
|
new_continuation = safeTraverse(inner_contents[-1], ["continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
|
|
|
|
response = {
|
|
"videos": items,
|
|
"continuation": new_continuation
|
|
}
|
|
|
|
# cache response
|
|
if ythdd_globals.config['general']['cache']:
|
|
ythdd_globals.general_cache["channels"][unique_request_fingerprint] = response
|
|
ythdd_globals.general_cache["channels"][unique_request_fingerprint]['cacheTime'] = time()
|
|
|
|
# todo: save continuation(?)
|
|
# or... is there a usecase for saving it?
|
|
|
|
case "playlists":
|
|
|
|
if ctoken is None:
|
|
inner_contents = safeTraverse(result, ["sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "gridRenderer", "items"], default=[[]])
|
|
else:
|
|
inner_contents = safeTraverse(wdata, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"], default=[[]])
|
|
|
|
for entry in inner_contents:
|
|
item = ythdd_struct_parser.parseRenderers(entry, {"author_name": name, "author_ucid": ucid, "avatar": avatar})
|
|
items.append(item)
|
|
|
|
new_continuation = safeTraverse(inner_contents[-1], ["continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
|
|
|
|
response = {
|
|
"playlists": items,
|
|
"continuation": new_continuation
|
|
}
|
|
|
|
# cache response
|
|
if ythdd_globals.config['general']['cache']:
|
|
ythdd_globals.general_cache["channels"][unique_request_fingerprint] = response
|
|
ythdd_globals.general_cache["channels"][unique_request_fingerprint]['cacheTime'] = time()
|
|
|
|
case _:
|
|
# for all other renderers, which aren't currently supported
|
|
response = {
|
|
# "wdata": wdata
|
|
}
|
|
|
|
if ythdd_globals.config["general"]["debug"]:
|
|
response["wdata"] = wdata
|
|
|
|
if only_json:
|
|
return response
|
|
|
|
return send(200, response)
|
|
|
|
def get_comments(data, req, only_json: bool = False):
    """Return one page of comments for the video id in data[3].

    data:      request path split into segments.
    req:       request object; 'continuation' is read from req.args.
    only_json: return the response dict instead of wrapping it with send().

    Answers 400 when the video id is missing or not 11 characters long.
    """
    # get comment continuation
    ctoken = req.args.get('continuation')

    # perform some basic video id validation
    has_valid_id = len(data) >= 4 and len(data[3]) == 11
    if not has_valid_id:
        return send(400, {"error": "Bad request: invalid videoId."})

    video_id = data[3]

    # if ctoken isn't provided, get it from the general cache
    if ctoken in (None, ''):
        # but first ensure it's there
        ensure_comment_continuation(video_id)
        known_tokens = ythdd_globals.general_cache["continuations"]["comments"][video_id]
        ctoken = known_tokens[0]

    # get joined video comment models
    wdata, new_continuation = ythdd_extractor.WEBgetVideoComments(ctoken)

    # parse every comment, leaving out the ones the parser rejects
    parsed_comments = (ythdd_struct_parser.customCommentRendererParser(raw) for raw in wdata)
    comments = [parsed for parsed in parsed_comments if parsed is not None]

    response = {
        "videoId": video_id,
        "comments": comments,
        "continuation": new_continuation
    }

    return response if only_json else send(200, response)
|
|
|
|
|
|
def ensure_comment_continuation(video_id: str, wdata = None):
    """Make sure a comment continuation token is cached for ``video_id``.

    The token is kept in
    ``ythdd_globals.general_cache["continuations"]["comments"][video_id]``
    as a list whose first element is the token. If a non-empty list is
    already cached, the function returns immediately: the video page is not
    fetched again and no duplicate entries are added.

    Args:
        video_id: id of the video whose token should be cached.
        wdata: already extracted video page data; when ``None`` the page is
            fetched with ``ythdd_extractor.WEBextractSinglePage``.

    An empty string is cached when no token can be found in the page data.
    """
    comment_cache = ythdd_globals.general_cache["continuations"]["comments"]

    # already ensured: don't refetch the page or pile up duplicate tokens
    if video_id in comment_cache and comment_cache[video_id]:
        return

    if wdata is None:
        # perhaps saving related videos to cache might be not a bad idea?
        wdata = ythdd_extractor.WEBextractSinglePage(video_id)

    # search for "top comments" continuation token
    comment_continuation = safeTraverse(wdata, ["ec2", "engagementPanels", 0, "engagementPanelSectionListRenderer", "header", "engagementPanelTitleHeaderRenderer", "menu", "sortFilterSubMenuRenderer", "subMenuItems", 0, "serviceEndpoint", "continuationCommand", "token"], default=None)

    if comment_continuation is None:
        print(f"error: couldn't extract comment continuation token from video page ({video_id}). this video likely has comments disabled.")
        comment_continuation = ""

    # save the continuation token in the global comment cache only once the
    # extraction has finished, so that a failed fetch can never leave an
    # empty list behind (readers index the list with [0])
    comment_cache[video_id] = [comment_continuation]
|
|
|
|
def channels(data, req, only_json: bool = False):
|
|
|
|
# prevent potential out of bound read
|
|
if len(data) < 4:
|
|
return send(400, {"error": "No channel specified."})
|
|
|
|
# silly sanity check
|
|
if len(data[3]) != 24 or not data[3].startswith("UC"):
|
|
return send(404, {"error": "This channel does not exist."})
|
|
|
|
if len(data) > 4:
|
|
match data[4]:
|
|
case "videos" | "shorts" | "playlists" | "podcasts":
|
|
return get_channel_tab(data[4], data[3], req)
|
|
case _:
|
|
return send(400, {"error": f"Bad request, unrecognized/unsupported tab \"{data[4]}\"."})
|
|
|
|
wdata = ythdd_extractor.browseChannel(data[3])
|
|
channel_meta = safeTraverse(wdata, ["metadata", "channelMetadataRenderer"])
|
|
banners = safeTraverse(wdata, ["header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "banner", "imageBannerViewModel", "image", "sources"], default=[])
|
|
avatar = safeTraverse(wdata, ["header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "image", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
|
|
subscribers = ythdd_struct_parser.parseViewsFromViewText(safeTraverse(wdata, ["header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "metadata", "contentMetadataViewModel", "metadataRows", 1, "metadataParts", 0, "text", "content"], default="0"))
|
|
verified = False # to be replaced later with ythdd_extractor.isVerified(...)
|
|
|
|
author_name = safeTraverse(channel_meta, ["title"], default="Unknown Channel")
|
|
author_ucid = safeTraverse(channel_meta, ["externalId"], default=data[3]) # prevent recursion with fallback to provided ucid
|
|
|
|
ythdd_globals.general_cache["continuations"]["channels"][author_ucid] = {
|
|
"avatar": avatar,
|
|
"name": author_name,
|
|
"tabs": {},
|
|
"verified": verified
|
|
}
|
|
tabs = safeTraverse(wdata, ["contents", "twoColumnBrowseResultsRenderer", "tabs"], default=[])
|
|
tab_names = []
|
|
for tab in tabs:
|
|
# collect tab names
|
|
tab_name = safeTraverse(tab, ["tabRenderer", "title"], default="").lower()
|
|
if tab_name:
|
|
tab_names.append(tab_name)
|
|
# and their params (used to retrieve data about them)
|
|
ythdd_globals.general_cache["continuations"]["channels"][author_ucid]["tabs"][tab_name] = dict()
|
|
ythdd_globals.general_cache["continuations"]["channels"][author_ucid]["tabs"][tab_name]["param"] = safeTraverse(tab, ["tabRenderer", "endpoint", "browseEndpoint", "params"], default=None)
|
|
|
|
latest_videos = get_channel_tab("videos", data[3], req, only_json=True)
|
|
|
|
for banner in banners:
|
|
banner["url"] = ythdd_globals.translateLinks(banner["url"])
|
|
avatars = ythdd_extractor.generateChannelAvatarsFromUrl(avatar)
|
|
|
|
response = {
|
|
"author": author_name,
|
|
"authorId": author_ucid,
|
|
"authorUrl": "https://www.youtube.com/channel/" + author_ucid,
|
|
"authorBanners": banners,
|
|
"authorThumbnails": avatars,
|
|
"subCount": subscribers,
|
|
"totalViews": 0,
|
|
"joined": 0,
|
|
"autoGenerated": False, # todo: actually check this
|
|
"ageGated": False,
|
|
"isFamilyFriendly": safeTraverse(channel_meta, ["isFamilySafe"], default=False),
|
|
"description": safeTraverse(channel_meta, ["description"], default="ythdd: no channel description"),
|
|
"descriptionHtml": html.escape(safeTraverse(channel_meta, ["description"], default="ythdd: no channel description (html)")).replace("\r\n", "<br>").replace("\n", "<br>"),
|
|
"allowedRegions": safeTraverse(channel_meta, ["availableCountryCodes"], default=[]),
|
|
"tabs": tab_names,
|
|
"tags": [safeTraverse(channel_meta, ["keywords"], default="")],
|
|
"authorVerified": verified,
|
|
"latestVideos": latest_videos["videos"], # using struct parser
|
|
"relatedChannels": []
|
|
}
|
|
|
|
if ythdd_globals.config["general"]["debug"]:
|
|
response["wdata"] = wdata
|
|
|
|
if only_json:
|
|
return response
|
|
|
|
return send(200, response)
|
|
|
|
def lookup(data, req):
|
|
# possibly TODO: rewrite this mess
|
|
if len(data) > 2:
|
|
if (data[0], data[1]) == ("api", "v1"):
|
|
match data[2]:
|
|
case 'stats' | '': # /api/invidious/api/v1/stats and /api/invidious/api/v1/
|
|
return stats()
|
|
case 'trending':
|
|
return trending()
|
|
case 'popular':
|
|
return popular()
|
|
case 'videos':
|
|
return videos(data)
|
|
case 'auth':
|
|
return auth(data)
|
|
case 'search':
|
|
return search(data, req)
|
|
case 'channels':
|
|
return channels(data, req)
|
|
case 'comments':
|
|
return get_comments(data, req)
|
|
case _:
|
|
incrementBadRequests()
|
|
return notImplemented(data)
|
|
elif data[0] in ('ggpht', 'vi'):
|
|
# for some reason the Materialous client
|
|
# and FreeTube keep making requests to these
|
|
if data[1] in ('ggpht', 'vi'):
|
|
return redirect('/' + "/".join(data[1:]))
|
|
return redirect('/' + "/".join(data[0:]))
|
|
else:
|
|
incrementBadRequests()
|
|
return notImplemented(data)
|
|
elif len(data) == 2:
|
|
if (data[0], data[1]) == ("api", "v1"): # /api/invidious/api/v1
|
|
return stats()
|
|
elif data[0] == "streams":
|
|
return streams()
|
|
elif data[0] in ('ggpht', 'vi'):
|
|
return redirect('/' + "/".join(data[0:]))
|
|
else:
|
|
incrementBadRequests()
|
|
return notImplemented(data)
|
|
|
|
elif len(data) == 1:
|
|
return stats() # /api/invidious/something |