#!/usr/bin/python3
# ythdd Invidious Translation Layer
# -----
# Translates requests sent through Invidious API at /api/invidious/
# to use internal extractors.
from flask import Response, request, redirect
from markupsafe import escape
from time import strftime, gmtime, time
from ythdd_globals import safeTraverse
from hashlib import md5
import json, datetime
import dateparser
import html
import invidious_formats
import ythdd_globals
import ythdd_api_v1
import ythdd_extractor
import ythdd_struct_parser
# TODOs:
# ----------
# DONE:
# [✓] /api/v1/stats (stats())
# [✓] /streams/dQw4w9WgXcQ (does nothing)
# [✓] /vi/:videoIdXXXX/maxresdefault.jpg
# [✓] /api/v1/search?q=... (videos and playlists)
# [✓] /api/v1/search/suggestions?q=...&pq=...
# [✓] /api/v1/channel/:ucid
# [✓] /api/v1/channel/:ucid/videos, shorts, playlists, streams
# [✓] /api/v1/comments/:videoid?continuation=...
# [✓] /api/v1/videos/:videoIdXXXX
# [✓] /api/v1/playlists/:plid
# ----------
# PLANNED:
# [X] /api/v1/channel/{videos, shorts, playlists, streams, latest?}/:ucid (rewrite)
# [X] /api/v1/:videoIdXXXX/maxres.jpg redirects to best quality thumbnail
# [X] /api/v1/storyboards/:videoIdXXXX
# [X] /api/v1/videos/:videoIdXXXX does not depend on yt-dlp and offloads stream retrieval elsewhere (making initial response fast)
# [X] /api/v1/manifest/:videoIdXXXX (above is prerequisite)
# [X] rewrite the awful lookup logic
# ----------
# IDEAS:
# [*] /api/v1/popular returns last requested videos by the IP (serving as multi-device history?)
# [*] /api/v1/trending returns recently archived videos
# ----------
# NOT PLANNED/MAYBE IN THE FUTURE:
# [ ] /api/v1/auth/subscriptions (stub? db?)
# [ ] /api/v1/auth/feed?page=1 (stub? db?)
# [ ] /api/v1/auth/playlists (stub? db?)
# Fallback artwork URLs used whenever an avatar/thumbnail cannot be extracted.
DEFAULT_AVATAR = "https://yt3.ggpht.com/a/default-user=s176-c-k-c0x00ffffff-no-rj"
DEFAULT_VIDEO = "https://i.ytimg.com/img/no_thumbnail.jpg" # todo: replace this with a custom, local asset
def incrementBadRequests():
    # Bump the module-wide failed-request counter kept in ythdd_globals.
    ythdd_globals.apiFailedRequests += 1
def greeting():
    """Return the (status, body) pair shown at the Invidious TL root."""
    body = 'hello from Invidious TL!\nstats endpoint at /api/invidious/stats'
    return 200, body
def send(status, response):
    """Serialize `response` to JSON and wrap it in a Flask Response with `status`."""
    body = json.dumps(response)
    return Response(body, mimetype='application/json', status=status)
def notImplemented(data):
    """Answer 501 for any route the translation layer does not handle."""
    path = '/'.join(data)
    return send(501, {'error': f"not recognised/implemented in Invidious TL: {path}"})
def stats():
    """Mimic Invidious' /api/v1/stats, advertising ythdd's identity/version."""
    software = {
        "name": "invidious",
        "version": f"invidious TL, ythdd ({ythdd_globals.version})",
        "branch": "https://gitea.7o7.cx/sherl/ythdd",
        "tl_msg": "/api/invidious/api/v1/"
    }
    return send(200, {"version": "2.0", "software": software})
def videoIdSanityCheck(videoId: str):
    """Return a 400 Response when videoId is not 11 characters, else None."""
    if len(videoId) == 11:
        # elif...?
        return None
    incrementBadRequests()
    return send(400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.')
def auth(data):
    """Stub for /api/v1/auth/*: subscriptions, feed and playlists.

    NOT YET IMPLEMENTED
    TODO: make it use the internal db
    """
    # can be either subscriptions, feed or playlists
    if data[1] in ("subscriptions", "feed", "playlists"):
        return send(200, [])
    incrementBadRequests()
    return send(404, [])
def streams():
    """Deliberate no-op endpoint (see the TODO list at the top of the file)."""
    return send(200, '')
def epochToDate(epoch):
    """Format a Unix timestamp as an ISO-8601 UTC string, e.g. 1970-01-01T00:00:00Z."""
    utc = gmtime(epoch)
    return strftime('%Y-%m-%dT%H:%M:%SZ', utc)
def dateToEpoch(date: str):
    """Convert an ISO-8601 date string to a Unix timestamp (float).

    Fixes over the naive one-liner:
    - a trailing 'Z' is normalized to '+00:00' so parsing works on
      Python < 3.11 (fromisoformat only accepts 'Z' from 3.11 onward);
      note this file's own defaults use 'Z'-suffixed strings.
    - naive datetimes (no UTC offset) are treated as UTC so the result
      round-trips with epochToDate(), which formats using gmtime (UTC).
      A bare .timestamp() would interpret them in the server's local zone.
    """
    parsed = datetime.datetime.fromisoformat(date.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed.timestamp()
def trending():
    """Stub; a future version may return recently archived videos here."""
    return send(200, [{}])
def popular():
    """Stub; a future version may return the caller's recently requested videos."""
    return send(200, [{}])
def getError(wdata: dict):
    """Extract a human-readable error string from a watch-page extraction.

    Returns None when playabilityStatus reports "OK" (no error), otherwise
    a string combining status, reason and any sub-reason text runs.
    Narrowed the bare `except:` clauses to the lookup errors that can
    actually occur here, so unrelated bugs are no longer silently hidden.
    """
    unknown_error = {"status": "Unknown error", "reason": "This is a generic ythdd error."}
    error = ""
    try:
        playabilityStatus = safeTraverse(wdata, ['ec1', 'playabilityStatus'], default=unknown_error)
        # check for error
        if safeTraverse(playabilityStatus, ["status"], default="error") == "OK":
            return None # no error? return None
        error += f"({playabilityStatus['status']}) {playabilityStatus['reason']}"
    except (KeyError, TypeError):
        # 'status'/'reason' may be missing or playabilityStatus not a dict
        error += "Generic error"
    try:
        errorScreen = safeTraverse(wdata, ['ec1', 'playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'subreason', 'runs'], default=[])
        error += " - "
        for x in errorScreen:
            error += f"{x['text']} "
    except (KeyError, TypeError):
        pass
    return error
def rebuildFormats(data):
    """Convert player-response adaptive format entries into the Invidious schema.

    Returns (result, formatStreams):
      - result: one Invidious "adaptiveFormats"-style dict per input stream
      - formatStreams: a single-entry list describing the best video stream
        (makes FreeTube work; unfortunately it's a video-only stream)

    Bug fixes vs the previous version:
      - the final size lookup referenced an undefined name `best_bitrate`
        (a NameError that a bare `except:` silently swallowed, so the size
        workaround never ran) — now uses `best_bitrate_video`;
      - empty input now returns ([], []) instead of raising IndexError.
    """
    if not data:
        return [], []
    result = [{} for x in data]
    formatStreams = []
    best_bitrate_video = 0
    best_bitrate_audio = -1
    for x in range(len(data)):
        try:
            # audio streams carry 'audioChannels'; its absence marks a video stream
            result[x]['audioChannels'] = data[x]['audioChannels']
            isVideo = 0
        except (KeyError, TypeError):
            isVideo = 1
        if not "initRange" in data[x]: # for livestreams?
            continue
        result[x]['init'] = str(data[x]['initRange']['start']) + "-" + str(data[x]['initRange']['end'])
        result[x]['index'] = str(data[x]['indexRange']['start']) + "-" + str(data[x]['indexRange']['end'])
        result[x]['bitrate'] = str(data[x]['averageBitrate'])
        result[x]['url'] = data[x]['url']
        result[x]['itag'] = str(data[x]['itag'])
        result[x]['type'] = data[x]['mimeType']
        result[x]['clen'] = data[x]['contentLength']
        result[x]['lmt'] = data[x]['lastModified']
        result[x]['projectionType'] = data[x]['projectionType']
        try:
            result[x]['colorInfo'] = data[x]['colorInfo']
        except (KeyError, TypeError):
            pass
        if "audio" == data[x]['mimeType'][:5]:
            isAudio = 1
        else:
            isAudio = 0
        if isVideo:
            result[x]['fps'] = str(data[x]['fps'])
        else:
            result[x]['audioQuality'] = data[x]['audioQuality']
            result[x]['audioSampleRate'] = data[x]['audioSampleRate']
        if data[x]['itag'] in invidious_formats.FORMATS.keys():
            result[x]['container'] = invidious_formats.FORMATS[data[x]['itag']]['ext']
            try:
                result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['vcodec']
            except KeyError:
                result[x]['encoding'] = invidious_formats.FORMATS[data[x]['itag']]['acodec']
            if isVideo:
                # not all itags have width and/or height in the table
                try:
                    result[x]['resolution'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p"
                    result[x]['qualityLabel'] = str(invidious_formats.FORMATS[data[x]['itag']]['height']) + "p" + str(result[x]['fps']) * (data[x]['fps'] > 30) # NOT IMPLEMENTED, that's just a placeholder
                    result[x]['size'] = str(invidious_formats.FORMATS[data[x]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[x]['itag']]['height'])
                except KeyError:
                    pass
        # we assume here that a stream with the highest bitrate must be a video stream- that may not be the case
        if data[x]['averageBitrate'] > data[best_bitrate_video]['averageBitrate'] and isVideo:
            best_bitrate_video = x
        if data[x]['averageBitrate'] > data[best_bitrate_audio]['averageBitrate'] and isAudio:
            best_bitrate_audio = x
    # makes FreeTube work, unfortunately it's a video-only stream
    formatStreams = [
        {
            "url": data[best_bitrate_video]['url'],
            "itag": str(data[best_bitrate_video]['itag']),
            "type": data[best_bitrate_video]['mimeType'],
            "quality": data[best_bitrate_video]['quality'],
            "bitrate": str(data[best_bitrate_video]['averageBitrate']),
            "fps": data[best_bitrate_video]['fps'],
            "size": "", # workaround for clipious, which requires ANYTHING to be passed, or else it will throw and error and won't load the video
            "resolution": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
            "qualityLabel": str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height']) + "p",
            "container": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['ext'],
            "encoding": invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['vcodec']
        }
    ]
    # not all itags have width and/or height
    try:
        formatStreams[0]["size"] = str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['width']) + "x" + str(invidious_formats.FORMATS[data[best_bitrate_video]['itag']]['height'])
    except (KeyError, IndexError):
        pass
    return result, formatStreams
def rebuildFormatsFromYtdlpApi(ydata: dict):
    """Rebuild Invidious-compatible formats from yt-dlp's output (ydata).

    Returns (adaptive_formats, format_streams): every usable stream goes into
    adaptive_formats; muxed video+audio streams additionally go into
    format_streams (without the " (video)" label suffix).

    Fixes vs the previous version:
      - the audio/video branch checks re-traversed 'acodec'/'vcodec' WITHOUT
        the default="none" used when computing the `acodec`/`vcodec` locals,
        so a stream missing those keys took the wrong branch; the locals are
        now used consistently;
      - `type` no longer shadows the builtin;
      - None-safe handling of audio_channels/format_id to avoid int(None)
        and None.split() crashes (assumes yt-dlp may emit nulls — TODO confirm).
    """
    adaptive_formats = []
    format_streams = []
    for stream in safeTraverse(ydata, ["formats"], default=[]):
        # skip storyboards / HLS-only entries; we only translate direct HTTP(S) streams
        if safeTraverse(stream, ["protocol"], default="storyboard") not in ("http_dash_segments", "https"):
            continue
        newRow = {}
        newRow["bitrate"] = str(int(safeTraverse(stream, ["tbr"], default=0) * 1000))
        newRow["url"] = safeTraverse(stream, ["url"])
        newRow["itag"] = safeTraverse(stream, ["format_id"], default="0") # fallback so the itag split below can't hit None
        params = ythdd_extractor.paramsFromUrl(newRow["url"])
        vcodec = safeTraverse(stream, ["vcodec"], default="none")
        acodec = safeTraverse(stream, ["acodec"], default="none")
        if vcodec == "none" and acodec == "none":
            continue
        if acodec != "none":
            # audio-only track (or the audio half of a muxed stream)
            audio_ext = safeTraverse(stream, ["audio_ext"], default=None)
            fnote = safeTraverse(stream, ["format_note"], default="low")
            if audio_ext is None:
                audio_ext = "mp4"
            abr = safeTraverse(stream, ["abr"], default="0")
            if abr is None:
                abr = "0"
            channels = safeTraverse(stream, ["audio_channels"], default=2)
            newRow["type"] = "audio/" + audio_ext
            newRow["audioQuality"] = fnote
            newRow["audioSampleRate"] = int(safeTraverse(stream, ["asr"], default="44100"))
            newRow["audioChannels"] = int(channels) if channels is not None else 2
            newRow["qualityLabel"] = str(int(abr)) + "k (audio)"
            newRow["resolution"] = f"{fnote} quality"
            newRow["size"] = "0x0"
        if vcodec != "none":
            # either video-only or video+audio
            video_ext = safeTraverse(stream, ["video_ext"], default=None)
            if video_ext is None:
                video_ext = "mp4"
            height = str(safeTraverse(stream, ["height"], default=0))
            width = str(safeTraverse(stream, [ "width"], default=0))
            newRow["type"] = "video/" + video_ext
            newRow["resolution"] = (height if height in ("144", "240", "360", "480", "720", "1080") else "360") + "p" # mpv won't play the video inside of Yattee if it's a non-standard resolution (bug?)
            newRow["fps"] = safeTraverse(stream, ["fps"], default=30)
            if newRow["fps"] is None:
                # yt-dlp can report fps as null; keep the > 30 comparison safe
                newRow["fps"] = 30
            newRow["qualityLabel"] = height + "p" + str(int(newRow['fps'])) * (newRow["fps"] > 30) + " (video)" # also a placeholder
            newRow["size"] = width + "x" + height
        newRow["clen"] = safeTraverse(params, ["clen"], default=safeTraverse(stream, ["filesize"], default="0"))
        newRow["lmt"] = safeTraverse(params, ["lmt"], default="0")
        if newRow["clen"] is None:
            # for clipious sake which expects a string
            newRow["clen"] = "0"
        newRow["projectionType"] = "RECTANGULAR" # clipious requires this to be mentioned explicitly and cannot be nulled
        newRow["container"] = safeTraverse(stream, ["ext"], default="unknown_container")
        newRow["encoding"] = safeTraverse(invidious_formats.FORMATS, [int("0" + newRow["itag"].split("-")[0]), "ext"], default="unknown_encoding") # not sure this split is necessary
        newRow["quality"] = newRow["qualityLabel"]
        newRow["init"] = "0-1" # dummy values
        newRow["index"] = "2-3" # dummy values
        if vcodec != "none" and acodec != "none":
            # muxed (video+audio) stream: strip the " (video)" suffix
            newRow["qualityLabel"] = height + "p" + str(int(newRow['fps'])) * (newRow["fps"] > 30)
            format_streams.append(newRow)
        if vcodec != "none" or acodec != "none":
            adaptive_formats.append(newRow)
    return adaptive_formats, format_streams
def videos(data):
# an attempt on a faithful rewrite of
# https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr
response = {}
# hls_url = safeTraverse(idata, ['stage1', 'streamingData', 'hlsManifestUrl'], default="")
# adaptive_formats = safeTraverse(idata, ['stage1', 'streamingData', 'adaptiveFormats'], default=[])
# if not hls_url or not adaptive_formats:
# print(f"serious error: couldn't get hls_url or adaptive_formats!\n"
# f"dumping idata:\n"
# f"{idata}")
# return send(500, {'error': getError(idata)})
time_start = time()
if len(data) < 4 or len(data[3]) != 11:
return send(400, {"error": "No valid video id."})
if ythdd_globals.config['general']['cache'] and data[3] in ythdd_globals.video_cache:
if ythdd_globals.video_cache[data[3]]['cacheTime'] + 3 * 60 * 60 > time():
response = ythdd_globals.video_cache[data[3]]
response['fromCache'] = True
return send(200, response)
else:
del ythdd_globals.video_cache[data[3]]
wdata = ythdd_extractor.WEBextractSinglePage(data[3])
error = getError(wdata)
if error is not None:
return send(500, {"status": "error", "error": error})
ydata = ythdd_extractor.extract(data[3], manifest_fix=True)
#return send(200, {'ydata': ydata, 'wdata': wdata})
#return send(200, {'idata': idata, 'wdata': wdata})
main_results = wdata['ec2']['contents']['twoColumnWatchNextResults']
primary_results = safeTraverse(main_results, ['results', 'results', 'contents'])
# video_primary_renderer = safeTraverse(primary_results, [..., 'videoPrimaryInfoRenderer'])
video_secondary_renderer = safeTraverse(primary_results, [..., 'videoSecondaryInfoRenderer'])
video_details = safeTraverse(wdata, ['ec1', 'videoDetails'])
microformat = safeTraverse(wdata, ['ec1', 'microformat', 'playerMicroformatRenderer'], default={})
video_id = safeTraverse(video_details, ['videoId'], default=f"[{data[3]}] (errors occurred, check logs)")
title = safeTraverse(video_details, ['title'], default=video_id)
views = int(safeTraverse(video_details, ['viewCount'], default=0))
length = int(safeTraverse(video_details, ['lengthSeconds'], default=1))
published = dateToEpoch(safeTraverse(microformat, ['publishDate'], default="2000-01-01T00:00:00Z")) # ISO format to Unix timestamp
published_date = epochToDate(published)
premiere_timestamp = safeTraverse(microformat, ['liveBroadcastDetails', 'startTimestamp'], default=None) # let's ignore the nitty gritty for the time being
premiere_timestamp = premiere_timestamp if premiere_timestamp else safeTraverse(microformat, ['playabilityStatus', 'liveStreamability', 'liveStreamabilityRenderer', 'offlineSlate', 'liveStreamOfflineSlateRenderer', 'scheduledStartTime'], default=None)
live_now = safeTraverse(microformat, ['liveBroadcastDetails', 'isLiveNow'], default=False)
post_live_dvr = safeTraverse(video_details, ['isPostLiveDvr'], default=False)
allowed_regions = safeTraverse(microformat, ['availableCountries'], default=[])
allow_ratings = safeTraverse(video_details, ['allowRatings'], default=True)
family_friendly = safeTraverse(microformat, ['isFamilySafe'], default=True)
is_listed = safeTraverse(video_details, ['isCrawlable'], default=True)
is_upcoming = safeTraverse(video_details, ['isUpcoming'], default=False)
keywords = safeTraverse(video_details, ['keywords'], default=[])
related_raw = safeTraverse(wdata, ['ec2', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results'], default=[]) # can possibly change in the future
related = []
for entry in related_raw[:-1]:
related_entry = {}
match safeTraverse(list(entry.keys()), [0], default=""):
case "compactVideoRenderer":
# legacy renderer, a/b tested and later phased out in summer 2025
continue
case "lockupViewModel":
y = safeTraverse(entry, ['lockupViewModel'])
if not isinstance(y, dict):
continue
is_mix_or_playlist = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="") in ("MIX", "PLAYLISTS")
if is_mix_or_playlist:
# neither mixes nor playlists are currently supported by the invidious api
continue
lmvm = safeTraverse(y, ['metadata', 'lockupMetadataViewModel'], default=[])
related_entry['videoId'] = safeTraverse(y, ['contentId'])
related_entry['title'] = safeTraverse(lmvm, ['title', 'content'])
related_entry['videoThumbnails'] = ythdd_struct_parser.genThumbs(related_entry['videoId']) #safeTraverse(y, ['thumbnail', 'thumbnails'])
related_entry['author'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 0, 'metadataParts', 0, 'text', 'content'])
related_entry['authorId'] = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'rendererContext', 'commandContext', 'onTap', 'innertubeCommand', 'browseEndpoint', 'browseId'], default="UNKNOWNCHANNELID")
related_entry['authorUrl'] = '/channel/' + related_entry['authorId']
related_entry['authorVerified'] = False if safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 0, 'metadataParts', 0, 'text', 'attachmentRuns']) is None else True # seens to do the job
author_avatar_url = safeTraverse(lmvm, ['image', 'decoratedAvatarViewModel', 'avatar', 'avatarViewModel', 'image', 'sources', 0, 'url'], default=DEFAULT_AVATAR)
related_entry['authorThumbnails'] = ythdd_extractor.generateChannelAvatarsFromUrl(author_avatar_url)
related_entry['lengthSeconds'] = ythdd_struct_parser.parseLengthFromTimeBadge(safeTraverse(y, ['contentImage', 'thumbnailViewModel', 'overlays', 0, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', 0, 'thumbnailBadgeViewModel', 'text'], default="0:0"))
related_entry['viewCountText'] = safeTraverse(lmvm, ['metadata', 'contentMetadataViewModel', 'metadataRows', 1, 'metadataParts', 0, 'text', 'content'], default="0").split(" ")[0]
related_entry['viewCount'] = ythdd_struct_parser.parseViewsFromViewText(related_entry['viewCountText'])
case _:
# unsupported model: print info into stdout
print("received an entry of unknown type during parsing of related videos:")
print(entry)
print("")
continue
related.append(related_entry)
likes = safeTraverse(ydata, ['like_count'], default=0)
description = safeTraverse(microformat, ['description', 'simpleText'], default="\n(ythdd: failed to retrieve description, perhaps it's empty?)")
short_description = safeTraverse(wdata, ['ec1', 'videoDetails', 'shortDescription'], default="(ythdd: failed to retrieve short description, perhaps it's empty?)")
description_html = html.escape(description).replace("\r\n", "
").replace("\n", "
") # still TODO: https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr#L329
genre = safeTraverse(microformat, ['category'])
# TODO: genre blah blah blah...
author = safeTraverse(video_details, ['author'], default="Unknown Author")
ucid = safeTraverse(video_details, ['channelId'], default="UNKNOWNCHANNELID")
subs = ydata['channel_follower_count']
author_thumbnail = safeTraverse(video_secondary_renderer, ['owner', 'videoOwnerRenderer', 'thumbnail', 'thumbnails', 0, 'url'])
author_verified = ythdd_extractor.isVerified(safeTraverse(video_secondary_renderer, ['owner', 'videoOwnerRenderer', 'badges', 0], default=[]))
if author_thumbnail is None:
# there might be multiple authors (on a collaborative video)
# if so, then try to extract first channel's (uploader's) avatar
livm = safeTraverse(video_secondary_renderer, ["owner", "videoOwnerRenderer", "attributedTitle", "commandRuns", 0, "onTap", "innertubeCommand", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
author_thumbnail = safeTraverse(livm, [0, "listItemViewModel", "leadingAccessory", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
author_verified = author_verified or safeTraverse(livm, [0, "listItemViewModel", "title", "attachmentRuns", 0, "element", "type", "imageType", "image", "sources", 0, "clientResource", "imageName"]) in ("AUDIO_BADGE", "CHECK_CIRCLE_FILLED")
author_thumbnail = ythdd_extractor.generateChannelAvatarsFromUrl(author_thumbnail)
format_streams = []
# adaptive_formats, format_streams = rebuildFormats(adaptive_formats)
if not live_now:
adaptive_formats, format_streams = rebuildFormatsFromYtdlpApi(ydata)
hls_url = None
else:
adaptive_formats, format_streams = [{"url": f"http://a/?expire={int(time_start + 5.9 * 60 * 60)}", "itag": "18", "type": "", "clen": "0", "lmt": "", "projectionType": "RECTANGULAR"}], [] # freetube/clipious shenanigans, see: https://github.com/FreeTubeApp/FreeTube/pull/5997 and https://github.com/lamarios/clipious/blob/b9e7885/lib/videos/models/adaptive_format.g.dart
hls_url = safeTraverse(ydata, ["url"], default="ythdd: unable to retrieve stream url")
if live_now:
video_type = "livestream"
premiere_timestamp = published # ??? that works i guess
elif premiere_timestamp:
video_type = "scheduled"
published = dateToEpoch(premiere_timestamp) if premiere_timestamp else int(time())
else:
video_type = "video"
premium = False
if "YouTube Red" in keywords:
premium = True
# TODO: detect paywalled patron-only videos
# because we fetched the video's wdata, we might as
# well save it inside of general cache so that
# requests for the video's comments don't have to
# spawn an additional request for initial ctoken
ensure_comment_continuation(video_id, wdata)
time_end = time()
response = {
"type": video_type,
"title": title,
"videoId": video_id,
"videoThumbnails": ythdd_struct_parser.genThumbs(video_id),
"storyboards": [], # not implemented
"description": description, # due to change (include ythdd metadata)
"descriptionHtml": description_html,
"published": published,
"publishedText": published_date,
"keywords": keywords,
"viewCount": views,
"viewCountText": str(views), # not implemented
"likeCount": likes,
"dislikeCount": 0,
"paid": False, # not implemented
"premium": premium,
"isFamilyFriendly": family_friendly,
"allowedRegions": allowed_regions,
"genre": genre,
"genreUrl": "/genreUrl/not/implemented/", # not implemented
"author": author,
"authorId": ucid,
"authorUrl": "/channel/" + ucid,
"authorVerified": author_verified,
"authorThumbnails": author_thumbnail,
"subCountText": str(subs),
"lengthSeconds": length,
"allowRatings": allow_ratings,
"rating": 0,
"isListed": is_listed,
"liveNow": live_now,
"isPostLiveDvr": post_live_dvr,
"isUpcoming": is_upcoming,
"dashUrl": ythdd_globals.config['general']['public_facing_url'] + "api/invidious/api/v1/manifest/" + video_id, # not implemented
"premiereTimestamp": premiere_timestamp,
"hlsUrl": hls_url, # broken after a change in iOS player, only usable for livestreams
"adaptiveFormats": adaptive_formats, # same as hlsUrl
"formatStreams": format_streams,
"captions": [], # not implemented
# "captions": [
# {
# "label": String,
# "language_code": String,
# "url": String
# }
# ],
# "musicTracks": [
# {
# "song": String,
# "artist": String,
# "album": String,
# "license": String
# }
# ],
"recommendedVideos": related,
"took": time_end - time_start
}
if ythdd_globals.config['general']['debug']:
response["ydata"] = ydata
response["wdata"] = wdata
if ythdd_globals.config['general']['cache']:
ythdd_globals.video_cache[data[3]] = response
ythdd_globals.video_cache[data[3]]['cacheTime'] = time()
# for debugging:
#return send(200, ythdd_extractor.WEBextractSinglePage(data[3]))
#return send(200, ythdd_extractor.IOSextract(data[3]))
#return send(200, {'idata': idata, 'wdata': wdata})
# if youtube returns not the videoId we aksed
# then it means that the instance is ratelimited
status_code = 200 if data[3] == response['videoId'] else 403
return send(status_code, response)
def search(data, req):
    """Handle /api/v1/search and /api/v1/search/suggestions."""
    search_query = req.args.get('q')
    # ignore paginated requests as we do nothing with the continuation token
    page = req.args.get('page')
    if page not in (None, '1'):
        return send(404, [])
    last = data[-1].lower()
    second_last = data[-2].lower()
    # anything that isn't the plain search route is a suggestions request
    if last != "search" and (second_last != "search" or last != ""):
        previous_query = req.args.get('pq')
        suggestions = ythdd_extractor.WEBgetSearchSuggestions(search_query, previous_query)
        return send(200, suggestions)
    raw_results = ythdd_extractor.WEBextractSearchResults(search_query)
    parsed = (ythdd_struct_parser.parseRenderers(entry) for entry in raw_results)
    results_list = [item for item in parsed if item is not None]
    return send(200, results_list)
def get_channel_tab(requested_tab, ucid, req, only_json: bool = False):
    """Serve one tab of a channel (videos/shorts/streams/playlists).

    Supports continuation tokens via the 'continuation' query argument and a
    one-hour in-memory cache keyed on an md5 fingerprint of (ucid, tab, token).
    Returns a Flask Response, or the raw dict when only_json is True.
    """
    # check for page/cont
    ctoken = req.args.get('continuation')
    # perhaps continuation tokens should be checked here (whether they are inside of general_cache)
    # this way, malicious requests containing bogus ctokens can't be sent to potentially ban/ratelimit the instance (?)
    # if ctoken is not None and ctoken not in ythdd_globals.general_cache...
    # unique req fingerprint allows for this exact query to be cached in memory.
    # md5 sum serves as a "unique" deterministic value which can be checked for cache hit/miss
    unique_request_fingerprint = md5(f"{ucid}_{requested_tab}_{ctoken}".encode('utf-8')).hexdigest()
    # if we haven't discovered parameters required for browsing a specific tab,
    # then load them now
    if ucid not in ythdd_globals.general_cache["continuations"]["channels"]:
        # channels() populates the per-channel continuation cache as a side effect
        channels(["", "", "", ucid], req, True)
    # check if request has been cached within the last hour
    if ythdd_globals.config['general']['cache'] and unique_request_fingerprint in ythdd_globals.general_cache["channels"]:
        if ythdd_globals.general_cache["channels"][unique_request_fingerprint]['cacheTime'] + 1 * 60 * 60 > time():
            response = ythdd_globals.general_cache["channels"][unique_request_fingerprint]
            if only_json:
                return response
            else:
                return send(200, response)
        else:
            # cache entry is stale: evict and fall through to a fresh fetch
            del ythdd_globals.general_cache["channels"][unique_request_fingerprint]
    # load relevant data from global (general) cache
    param = safeTraverse(ythdd_globals.general_cache["continuations"]["channels"][ucid], ["tabs", requested_tab, "param"], default=None)
    name = safeTraverse(ythdd_globals.general_cache["continuations"]["channels"][ucid], ["name"], default="")
    avatar = safeTraverse(ythdd_globals.general_cache["continuations"]["channels"][ucid], ["avatar"], default=DEFAULT_AVATAR)
    verified = safeTraverse(ythdd_globals.general_cache["continuations"]["channels"][ucid], ["verified"], default=False)
    # if provided, ctoken will be used for browsing as well
    wdata = ythdd_extractor.browseChannel(ucid, params=param, ctoken=ctoken)
    # sanity check (whether we got what we requested)
    received_tab = safeTraverse(wdata, ["responseContext", "serviceTrackingParams", 0, "params", 0, "value"])
    if received_tab != f"channel.{requested_tab}":
        # if that's not the case, either something changed in the innertube API,
        # or content that was asked for isn't available
        print(f"INFO: couldn't verify server returned channel data we asked for. "
              f"Requested channel.{requested_tab}, got {received_tab}. Most likely we sent a request to Innertube which got rejected.")
    # load requested tab
    result = {}
    if ctoken is None:
        # no continuation: locate the requested tab in the full browse response
        tabs = safeTraverse(wdata, ["contents", "twoColumnBrowseResultsRenderer", "tabs"], default=[])
        for tab in tabs:
            tab_name = safeTraverse(tab, ["tabRenderer", "title"], default="").lower()
            # rewrite livestream tab for backwards compatibility with invidious (and clients like freetube)
            if tab_name == "live":
                tab_name = "streams"
            if tab_name and tab_name == requested_tab:
                result = safeTraverse(tab, ["tabRenderer", "content"], default=[])
                break
    items = []
    inner_contents = []
    new_continuation = ""
    response = {}
    match requested_tab:
        case "videos" | "shorts" | "streams":
            # videos/shorts/livestreams have actually the same response schema,
            # only the renderers differ - but they are taken care of in ythdd_struct_parser.parseRenderers()
            if ctoken is None:
                inner_contents = safeTraverse(result, ["richGridRenderer", "contents"], default=[[]])
            else:
                inner_contents = safeTraverse(wdata, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"], default=[[]])
            for entry in inner_contents:
                # videos from videos tab have no owner info (?) or it's in another place. if it is somewhere, this expression can be made simpler by traversing something else in struct parser.
                item = safeTraverse(entry, ["richItemRenderer", "content"])
                if item is not None:
                    items.append(ythdd_struct_parser.parseRenderers(item, {"author_name": name, "author_ucid": ucid, "avatar": avatar}))
            # the trailing entry carries the next continuation token (if any)
            new_continuation = safeTraverse(inner_contents[-1], ["continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
            response = {
                "videos": items
            }
            if new_continuation:
                response["continuation"] = new_continuation
            # cache response
            if ythdd_globals.config['general']['cache']:
                ythdd_globals.general_cache["channels"][unique_request_fingerprint] = response
                ythdd_globals.general_cache["channels"][unique_request_fingerprint]['cacheTime'] = time()
            # todo: save continuation(?)
            # or... is there a usecase for saving it?
        case "playlists":
            if ctoken is None:
                inner_contents = safeTraverse(result, ["sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "gridRenderer", "items"], default=[[]])
            else:
                inner_contents = safeTraverse(wdata, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"], default=[[]])
            for entry in inner_contents:
                item = ythdd_struct_parser.parseRenderers(entry, {"author_name": name, "author_ucid": ucid, "avatar": avatar})
                items.append(item)
            new_continuation = safeTraverse(inner_contents[-1], ["continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
            response = {
                "playlists": items
            }
            if new_continuation:
                response["continuation"] = new_continuation
            # cache response
            if ythdd_globals.config['general']['cache']:
                ythdd_globals.general_cache["channels"][unique_request_fingerprint] = response
                ythdd_globals.general_cache["channels"][unique_request_fingerprint]['cacheTime'] = time()
        case _:
            # for all other renderers, which aren't currently supported
            response = {
                # "wdata": wdata
            }
    if ythdd_globals.config["general"]["debug"]:
        response["wdata"] = wdata
    if only_json:
        return response
    return send(200, response)
def get_comments(data, req, only_json: bool = False):
    """Return parsed comments for a video, following an optional continuation."""
    # get comment continuation
    ctoken = req.args.get('continuation')
    # perform some basic video id validation
    if len(data) < 4 or len(data[3]) != 11:
        return send(400, {"error": "Bad request: invalid videoId."})
    video_id = data[3]
    # if ctoken isn't provided, get it from the general cache
    if not ctoken:
        # but first ensure it's there
        ensure_comment_continuation(video_id)
        ctoken = ythdd_globals.general_cache["continuations"]["comments"][video_id][0]
    # get joined video comment models
    wdata, new_continuation = ythdd_extractor.WEBgetVideoComments(ctoken)
    comments = []
    for raw_comment in wdata:
        # parse the comment
        parsed = ythdd_struct_parser.customCommentRendererParser(raw_comment)
        if parsed is not None:
            comments.append(parsed)
    response = {
        "videoId": video_id,
        "comments": comments,
        "continuation": new_continuation
    }
    return response if only_json else send(200, response)
def ensure_comment_continuation(video_id: str, wdata = None):
    """Make sure a comment continuation token for video_id is cached.

    Saves the "top comments" continuation token in the global comment cache.
    When extraction fails (the video likely has comments disabled), an empty
    string is cached instead so callers can still index [0].

    Fix: previously every call re-fetched the video page and appended another
    token even when one was already cached; now we return early in that case.
    """
    cache = ythdd_globals.general_cache["continuations"]["comments"]
    if video_id not in cache:
        cache[video_id] = []
    elif cache[video_id] and wdata is None:
        # A token (possibly "") is already cached and no fresh page data was
        # supplied -- avoid a redundant page fetch and a duplicate append.
        return
    if wdata is None:
        # perhaps saving related videos to cache might be not a bad idea?
        wdata = ythdd_extractor.WEBextractSinglePage(video_id)
    # search for "top comments" continuation token
    comment_continuation = safeTraverse(wdata, ["ec2", "engagementPanels", 0, "engagementPanelSectionListRenderer", "header", "engagementPanelTitleHeaderRenderer", "menu", "sortFilterSubMenuRenderer", "subMenuItems", 0, "serviceEndpoint", "continuationCommand", "token"], default=None)
    if comment_continuation is not None:
        cache[video_id].append(comment_continuation)
    else:
        print(f"error: couldn't extract comment continuation token from video page ({video_id}). this video likely has comments disabled.")
        cache[video_id].append("")
def channels(data, req, only_json: bool = False):
# prevent potential out of bound read
if len(data) < 4:
return send(400, {"error": "No channel specified."})
# silly sanity check
if len(data[3]) != 24 or not data[3].startswith("UC"):
return send(404, {"error": "This channel does not exist."})
if len(data) > 4:
match data[4]:
case "videos" | "shorts" | "playlists" | "podcasts" | "streams":
return get_channel_tab( data[4], data[3], req)
case "live":
return get_channel_tab("streams", data[3], req)
case "latest":
return get_channel_tab( "videos", data[3], req)
case _:
return send(400, {"error": f"Bad request, unrecognized/unsupported tab \"{data[4]}\"."})
wdata = ythdd_extractor.browseChannel(data[3])
channel_meta = safeTraverse(wdata, ["metadata", "channelMetadataRenderer"])
banners = safeTraverse(wdata, ["header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "banner", "imageBannerViewModel", "image", "sources"], default=[])
avatar = safeTraverse(wdata, ["header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "image", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
subscribers = ythdd_struct_parser.parseViewsFromViewText(safeTraverse(wdata, ["header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "metadata", "contentMetadataViewModel", "metadataRows", 1, "metadataParts", 0, "text", "content"], default="0"))
verified = False # to be replaced later with ythdd_extractor.isVerified(...)
author_name = safeTraverse(channel_meta, ["title"], default="Unknown Channel")
author_ucid = safeTraverse(channel_meta, ["externalId"], default=data[3]) # prevent recursion with fallback to provided ucid
ythdd_globals.general_cache["continuations"]["channels"][author_ucid] = {
"avatar": avatar,
"name": author_name,
"tabs": {},
"verified": verified
}
tabs = safeTraverse(wdata, ["contents", "twoColumnBrowseResultsRenderer", "tabs"], default=[])
tab_names = []
for tab in tabs:
# collect tab names
tab_name = safeTraverse(tab, ["tabRenderer", "title"], default="").lower()
if tab_name:
# same as in get_channel_tab
if tab_name == "live":
tab_name = "streams"
tab_names.append(tab_name)
# and their params (used to retrieve data about them)
ythdd_globals.general_cache["continuations"]["channels"][author_ucid]["tabs"][tab_name] = dict()
ythdd_globals.general_cache["continuations"]["channels"][author_ucid]["tabs"][tab_name]["param"] = safeTraverse(tab, ["tabRenderer", "endpoint", "browseEndpoint", "params"], default=None)
latest_videos = get_channel_tab("videos", data[3], req, only_json=True)
for banner in banners:
banner["url"] = ythdd_globals.translateLinks(banner["url"])
avatars = ythdd_extractor.generateChannelAvatarsFromUrl(avatar)
response = {
"author": author_name,
"authorId": author_ucid,
"authorUrl": "https://www.youtube.com/channel/" + author_ucid,
"authorBanners": banners,
"authorThumbnails": avatars,
"subCount": subscribers,
"totalViews": 0,
"joined": 0,
"autoGenerated": False, # todo: actually check this
"ageGated": False,
"isFamilyFriendly": safeTraverse(channel_meta, ["isFamilySafe"], default=False),
"description": safeTraverse(channel_meta, ["description"], default="ythdd: no channel description"),
"descriptionHtml": html.escape(safeTraverse(channel_meta, ["description"], default="ythdd: no channel description (html)")).replace("\r\n", "
").replace("\n", "
"),
"allowedRegions": safeTraverse(channel_meta, ["availableCountryCodes"], default=[]),
"tabs": tab_names,
"tags": [safeTraverse(channel_meta, ["keywords"], default="")],
"authorVerified": verified,
"latestVideos": latest_videos["videos"], # using struct parser
"relatedChannels": []
}
if ythdd_globals.config["general"]["debug"]:
response["wdata"] = wdata
# todo: cache response
if only_json:
return response
return send(200, response)
def playlists(data, req, only_json: bool = False):
    """Serve /api/v1/playlists/:plid for standard ("PL...") playlists.

    Iteratively follows playlist continuations, parses every video entry and
    assembles an Invidious-style playlist response, which is cached for one
    hour when caching is enabled.

    Fixes: restored the "<br>" replacement strings in descriptionHtml (they
    were broken/unterminated literals), `!= None` -> `is not None`, and
    guarded against dateparser.parse() returning None.
    """
    # read playlist id and sanity check
    if len(data) < 4:
        return send(400, {"error": "No playlist specified."})
    # todo: make clipious stop spamming requests for paginated response
    page = req.args.get('page')
    if page is not None and page != '1':
        return send(404, {"error": "Paginated queries are not supported."})
    plid = data[3]
    # no info about what length can playlists be
    # settle for a basic sanity check instead
    # if len(plid) not in (18, 34, 36):
    if not plid.startswith("PL"):
        response = {"error": "Only standard playlists are currently supported (no mixes, video-based playlists, etc.)"}
        if only_json:
            return response
        return send(400, response)
    # check if request has been cached within the last hour
    if ythdd_globals.config['general']['cache'] and plid in ythdd_globals.general_cache['playlists']:
        if ythdd_globals.general_cache['playlists'][plid]['cacheTime'] + 1 * 60 * 60 > time():
            response = ythdd_globals.general_cache['playlists'][plid]
            if only_json:
                return response
            return send(200, response)
        # stale entry: drop it and re-fetch below
        del ythdd_globals.general_cache['playlists'][plid]
    # browse the playlist iteratively, first fetch is without any continuation
    all_unparsed_videos = []
    meta, new_continuation, videos = ythdd_extractor.WEBextractPlaylist(plid=plid)
    if isinstance(videos, list):
        all_unparsed_videos = videos.copy()
    while new_continuation is not None:
        # fetch subsequent playlist videos
        _, new_continuation, videos = ythdd_extractor.WEBextractPlaylist(ctoken=new_continuation)
        if videos is not None:
            all_unparsed_videos.extend(videos)
    # process videos, dropping entries the struct parser can't handle
    parsed_videos = []
    for video in all_unparsed_videos:
        parsed_video = ythdd_struct_parser.parseRenderers(video)
        if parsed_video is not None:
            parsed_videos.append(parsed_video)
    # process metadata
    primary_sidebar = safeTraverse(meta, ["sidebar", "playlistSidebarRenderer", "items", 0, "playlistSidebarPrimaryInfoRenderer"], default={})
    secondary_sidebar = safeTraverse(meta, ["sidebar", "playlistSidebarRenderer", "items", 1, "playlistSidebarSecondaryInfoRenderer"], default={})
    # apparently fields can be stored inside of simpleText one time, only to be stored inside of runs another time
    title = ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["title"]), default="Unknown playlist title")
    playlist_thumb = ythdd_globals.translateLinks(safeTraverse(primary_sidebar, ["thumbnailRenderer", "playlistVideoThumbnailRenderer", "thumbnail", "thumbnails", -1, "url"], default=DEFAULT_VIDEO))
    author = safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "title", "runs", 0, "text"], default="Unknown channel")
    author_ucid = safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "title", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
    author_avatars = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "thumbnail", "thumbnails", 0, "url"], default=DEFAULT_AVATAR))
    description = safeTraverse(meta, ["microformat", "microformatDataRenderer", "description"], default="(ythdd: failed to retrieve description, perhaps it's empty?)")
    if author_ucid == "UNKNOWNCHANNELID" and author == "Unknown channel":
        # most likely a collaborative playlist
        author = safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "title", "simpleText"], default="by Unknown user and others").removeprefix("by ")
        author_orig = author[:author.rfind(" and ")]
        description += f"\n(ythdd: This is a collaborative playlist by \"{author_orig}\" {author.removeprefix(author_orig + ' ')}. You can't view the authors' channels.)"
        author = author_orig
    # fix: the newline -> <br> replacement strings were broken (unterminated literals)
    description_html = html.escape(description).replace("\r\n", "<br>").replace("\n", "<br>")
    video_count = ythdd_struct_parser.parseViewsFromViewText(ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["stats", 0]), default="No videos"))
    view_count = ythdd_struct_parser.parseViewsFromViewText(ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["stats", 1]), default="No views"))
    updated = ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["stats", 2]), default="2000-01-01").removeprefix("Last updated on ").removeprefix("Updated ")
    # dateparser.parse() returns None on unparsable input -- don't crash on .timestamp()
    parsed_updated = dateparser.parse(updated)
    updated = int(parsed_updated.timestamp()) if parsed_updated is not None else 0
    is_unlisted = safeTraverse(primary_sidebar, ["badges", 0, "metadataBadgeRenderer", "icon", "iconType"], default="PRIVACY_LISTED") == "PRIVACY_UNLISTED"  # this needs further research https://gitea.invidious.io/iv-org/invidious/src/commit/325e013e0d9e5670fa0df7635ff30a0ee029e05e/src/invidious/playlists.cr#L133
    response = {
        "type": "playlist",
        "title": title,
        "playlistId": plid,
        "playlistThumbnail": playlist_thumb,
        "author": author,
        "authorId": author_ucid,
        "authorUrl": "/channel/" + author_ucid,
        "subtitle": None,  # todo?
        "authorThumbnails": author_avatars,
        "description": description,
        "descriptionHtml": description_html,
        "videoCount": video_count,
        "viewCount": view_count,
        "updated": updated,
        "isListed": not is_unlisted,
        "videos": parsed_videos
    }
    # todo: cache videos and metadata separately, so that paginated queries can be supported as well
    if ythdd_globals.config['general']['cache']:
        ythdd_globals.general_cache['playlists'][plid] = response
        ythdd_globals.general_cache['playlists'][plid]['cacheTime'] = time()
    if only_json:
        return response
    return send(200, response)
def lookup(data, req):
# possibly TODO: rewrite this mess
if len(data) > 2:
if (data[0], data[1]) == ("api", "v1"):
match data[2]:
case 'stats' | '': # /api/invidious/api/v1/stats and /api/invidious/api/v1/
return stats()
case 'trending':
return trending()
case 'popular':
return popular()
case 'videos':
return videos(data)
case 'auth':
return auth(data)
case 'search':
return search(data, req)
case 'channels':
return channels(data, req)
case 'comments':
return get_comments(data, req)
case 'playlists':
return playlists(data, req)
case _:
incrementBadRequests()
return notImplemented(data)
elif data[0] in ('ggpht', 'vi'):
# for some reason the Materialous client
# and FreeTube keep making requests to these
if data[1] in ('ggpht', 'vi'):
return redirect('/' + "/".join(data[1:]))
return redirect('/' + "/".join(data[0:]))
else:
incrementBadRequests()
return notImplemented(data)
elif len(data) == 2:
if (data[0], data[1]) == ("api", "v1"): # /api/invidious/api/v1
return stats()
elif data[0] == "streams":
return streams()
elif data[0] in ('ggpht', 'vi'):
return redirect('/' + "/".join(data[0:]))
else:
incrementBadRequests()
return notImplemented(data)
elif len(data) == 1:
return stats() # /api/invidious/something