402 lines
15 KiB
Python
402 lines
15 KiB
Python
#!/usr/bin/python3
|
|
# ythdd Invidious Translation Layer
|
|
# -----
|
|
# Translates requests sent through Invidious API at /api/invidious/
|
|
# to use internal extractors.
|
|
from flask import Response, request, redirect
|
|
from markupsafe import escape
|
|
from time import strftime, localtime, time
|
|
import json, datetime
|
|
import invidious_formats
|
|
import ythdd_globals
|
|
import ythdd_api_v1
|
|
import ythdd_extractor
|
|
|
|
# TODO:
|
|
# [✓] /api/v1/stats (stats())
|
|
# [✓] /streams/dQw4w9WgXcQ (does nothing)
|
|
# [✓] /vi/videoIdXXXX/maxresdefault.jpg
|
|
# [*] /api/v1/auth/subscriptions (stub? db?)
|
|
# [*] /api/v1/auth/feed?page=1 (stub? db?)
|
|
# [*] /api/v1/auth/playlists (stub? db?)
|
|
# [*] /api/v1/videos/videoIdXXXX
|
|
|
|
def incrementBadRequests():
    """Add one to ythdd_globals.apiFailedRequests."""
    ythdd_globals.apiFailedRequests = ythdd_globals.apiFailedRequests + 1
|
|
|
|
def greeting():
    """Return a (status code, text) pair carrying a short welcome message."""
    status = 200
    message = 'hello from Invidious TL!\nstats endpoint at /api/invidious/stats'
    return status, message
|
|
|
|
def send(status, response):
    """Serialise `response` with json.dumps and wrap it in a flask Response.

    The Response carries the given HTTP `status` and the
    'application/json' mimetype.
    """
    body = json.dumps(response)
    return Response(body, status=status, mimetype='application/json')
|
|
|
|
def notImplemented(data):
    """Reply with HTTP 501 and an error object naming the requested path.

    `data` is the list of URL path segments; they are joined with '/'
    to rebuild the path shown in the error message.
    """
    path = '/'.join(data)
    payload = {'error': f"not recognised/implemented: {path}"}
    return send(501, payload)
|
|
|
|
def stats():
    """Reply with HTTP 200 and a static software/version description.

    The only dynamic part is ythdd_globals.version, which is embedded
    in the software version string.
    """
    software = {
        "name": "invidious",
        "version": f"invidious TL, ythdd ({ythdd_globals.version})",
        "branch": "https://gitea.7o7.cx/sherl/ythdd",
        "tl_msg": "/api/invidious/api/v1/"
    }
    return send(200, {"version": "2.0", "software": software})
|
|
|
|
def videoIdSanityCheck(videoId: str):
    """Check that `videoId` has the expected length of 11 characters.

    Returns:
        None when the length is correct. Otherwise the request is counted
        as failed and an HTTP 400 JSON response (built with send()) that
        describes the problem is returned.
    """
    if len(videoId) != 11:
        incrementBadRequests()
        # send() takes (status, payload); the error is reported under the
        # 'error' key, the same shape notImplemented() uses.
        return send(400, {'error': f'bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.'})
    # elif...?
    return None
|
|
|
|
def auth(data):
|
|
# can be either subscriptions, feed or playlists
|
|
match data[1]:
|
|
# NOT YET IMPLEMENTED
|
|
# TODO: make it use the internal db
|
|
case "subscriptions" | "feed" | "playlists":
|
|
return send(200, [])
|
|
case _:
|
|
incrementBadRequests()
|
|
return send(404, [])
|
|
|
|
def streams():
    """Stub endpoint: reply with HTTP 200 and an empty JSON string."""
    empty_body = ''
    return send(200, empty_body)
|
|
|
|
def epochToDate(epoch):
    """Format a Unix timestamp as 'YYYY-MM-DD HH:MM:SS' in local time."""
    # TODO: replace with UTC time
    local_struct = localtime(epoch)
    return strftime('%Y-%m-%d %H:%M:%S', local_struct)
|
|
|
|
def trending():
    """Stub endpoint: reply with HTTP 200 and a list holding one empty object."""
    placeholder = [{}]
    return send(200, placeholder)
|
|
|
|
def popular():
    """Stub endpoint: reply with HTTP 200 and a list holding one empty object."""
    placeholder = [{}]
    return send(200, placeholder)
|
|
|
|
def safeTraverse(obj: dict, path: list, default=None):
    """Follow `path` through nested containers, returning `default` on a miss.

    Args:
        obj: the outermost container (dicts and lists may be mixed freely).
        path: keys and/or indices to apply one after another.
        default: value returned when any step cannot be resolved.

    Returns:
        The value found at the end of `path` (`obj` itself for an empty
        path), or `default` if a key is missing, an index is out of range,
        or an intermediate value is not subscriptable (e.g. None).
    """
    result = obj
    try:
        for x in path:
            result = result[x]
    except (KeyError, IndexError, TypeError):
        # KeyError: missing dict key; IndexError: list too short;
        # TypeError: stepping into None/scalars or using a str key on a list.
        # No `return` inside `finally` here: that would silently discard any
        # other exception and hand back a half-traversed value.
        return default
    return result
|
|
|
|
def genThumbs(videoId: str):
    """Build the list of thumbnail descriptors for `videoId`.

    Every entry is a dict with 'quality', 'url', 'width' and 'height'.
    URLs are formed from the configured public_facing_url followed by
    'vi/<videoId>/<name>.jpg'.
    """
    # (quality label, file name without extension, width, height)
    variants = (
        # ("maxres", "maxres", 1280, 720),  # for the time being omit the buggy maxres quality
        ("maxresdefault", "maxresdefault", 1280, 720),
        ("sddefault", "sddefault", 640, 480),
        ("high", "hqdefault", 480, 360),
        ("medium", "mqdefault", 320, 180),
        ("default", "default", 120, 90),
        ("start", "1", 120, 90),
        ("middle", "2", 120, 90),
        ("end", "3", 120, 90),
    )

    prefix = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/'

    return [
        {'quality': quality, 'url': prefix + stem + '.jpg', 'width': width, 'height': height}
        for quality, stem, width, height in variants
    ]
|
|
|
|
def rebuildFormats(data):
    """Convert extractor format entries into Invidious-style format dicts.

    Args:
        data: list of format dicts. Each one must provide 'initRange',
            'indexRange', 'averageBitrate', 'url', 'itag', 'mimeType',
            'contentLength', 'lastModified' and 'projectionType'; video
            entries additionally 'fps', audio entries 'audioQuality' and
            'audioSampleRate'. 'colorInfo' is optional.

    Returns:
        A tuple (result, formatStreams): `result` holds one rebuilt dict per
        input entry, `formatStreams` the subset whose itag is <= 80.

    Raises:
        KeyError: when an entry lacks one of the mandatory keys.
    """
    result = []
    formatStreams = []

    for fmt in data:
        entry = {}

        # An entry that carries 'audioChannels' is treated as audio,
        # anything else as video.
        isVideo = 'audioChannels' not in fmt
        if not isVideo:
            entry['audioChannels'] = fmt['audioChannels']

        entry['init'] = str(fmt['initRange']['start']) + "-" + str(fmt['initRange']['end'])
        entry['index'] = str(fmt['indexRange']['start']) + "-" + str(fmt['indexRange']['end'])
        entry['bitrate'] = str(fmt['averageBitrate'])
        entry['url'] = fmt['url']
        entry['itag'] = str(fmt['itag'])
        entry['type'] = fmt['mimeType']
        entry['clen'] = fmt['contentLength']
        entry['lmt'] = fmt['lastModified']
        entry['projectionType'] = fmt['projectionType']
        if 'colorInfo' in fmt:
            entry['colorInfo'] = fmt['colorInfo']

        if isVideo:
            entry['fps'] = str(fmt['fps'])
        else:
            entry['audioQuality'] = fmt['audioQuality']
            entry['audioSampleRate'] = fmt['audioSampleRate']

        itag = fmt['itag']
        if itag in invidious_formats.FORMATS:
            known = invidious_formats.FORMATS[itag]
            entry['container'] = known['ext']
            # prefer the video codec, fall back to the audio codec
            entry['encoding'] = known['vcodec'] if 'vcodec' in known else known['acodec']

            if isVideo:
                # Best effort: keep whatever could be filled in before a
                # missing 'height'/'width' (or an fps that cannot be compared).
                try:
                    height = str(known['height'])
                    entry['resolution'] = height + "p"
                    # NOT IMPLEMENTED, that's just a placeholder:
                    # the fps is appended only above 30 fps (e.g. "1080p60")
                    entry['qualityLabel'] = height + "p" + (entry['fps'] if fmt['fps'] > 30 else "")
                    entry['size'] = str(known['width']) + "x" + height
                except (KeyError, TypeError):
                    pass

        result.append(entry)

        if itag <= 80: # temporary solution, I promise!
            formatStreams.append(entry)

    return result, formatStreams
|
|
|
|
def _parseAbbreviatedCount(text):
    """Turn a count such as "1.2K", "12,345" or "3M subscribers" into an int (0 if unparsable)."""
    if not isinstance(text, str) or not text:
        return 0

    magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
    token = text.split(" ")[0].replace(",", "")
    multiplier = 1
    if token and token[-1] in magnitude:
        multiplier = magnitude[token[-1]]
        token = token[:-1]

    try:
        # Parse the number as a whole so "1.2K" becomes 1200 (joining the
        # digits first would give 12 * 1000). round() absorbs float artefacts.
        return int(round(float(token) * multiplier))
    except (ValueError, OverflowError):
        pass

    # Fallback: glue together every ASCII digit found anywhere in the text.
    digits = "".join(ch for ch in text if '0' <= ch <= '9')
    return int(digits) * multiplier if digits else 0


def videos(data):
    """Build the Invidious-style video response for the requested video.

    Args:
        data: list of URL path segments; the video id is read from data[3].

    Returns:
        The JSON response produced by send(): status 200 when the extracted
        metadata belongs to the requested id, 403 otherwise. When the path
        carries no video id, the request is counted as failed and the
        notImplemented() response is returned.
    """
    # an attempt on a faithful rewrite of
    # https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr

    # Without an id, data[3] would raise IndexError.
    if len(data) < 4:
        incrementBadRequests()
        return notImplemented(data)

    #print(f"got data: {data}")

    #print("requesting idata from IOSextract")
    idata = ythdd_extractor.IOSextract(data[3])
    wdata = ythdd_extractor.WEBextractSinglePage(data[3])

    main_results = safeTraverse(idata, ['stage3', 'contents', 'twoColumnWatchNextResults'], default={})
    primary_results = safeTraverse(main_results, ['results', 'results', 'contents'])
    # Both renderers get a value up front, so a missing primary_results
    # degrades to defaults instead of a NameError further down.
    video_primary_renderer = {}
    video_secondary_renderer = {}
    if primary_results:
        video_primary_renderer = safeTraverse(primary_results, [0, 'videoPrimaryInfoRenderer'], default={})
        video_secondary_renderer = safeTraverse(primary_results, [1, 'videoSecondaryInfoRenderer'], default={})
    else:
        print("error: primary_results not found in invidious TL videos()")

    video_details = safeTraverse(wdata, ['ec1', 'videoDetails'], default={}) or {}
    microformat = safeTraverse(wdata, ['ec1', 'microformat', 'playerMicroformatRenderer'], default={}) or {}

    video_id = safeTraverse(video_details, ['videoId'], default=f"[{data[3]}] (errors occurred, check logs)")
    title = safeTraverse(video_details, ['title'], default=video_id)
    views = int(safeTraverse(video_details, ['viewCount'], default=0))
    length = int(safeTraverse(video_details, ['lengthSeconds'], default=1))
    published = datetime.datetime.fromisoformat(safeTraverse(microformat, ['publishDate'], default="2004-10-29T10:30:00-07:00")).timestamp() # ISO format to Unix timestamp
    published_date = epochToDate(published)
    premiere_timestamp = safeTraverse(microformat, ['liveBroadcastDetails', 'startTimestamp'], default=0) # let's ignore the nitty gritty for the time being
    premiere_timestamp = premiere_timestamp if premiere_timestamp else safeTraverse(microformat, ['playabilityStatus', 'liveStreamability', 'liveStreamabilityRenderer', 'offlineSlate', 'liveStreamOfflineSlateRenderer', 'scheduledStartTime'], default=0)
    live_now = safeTraverse(microformat, ['liveBroadcastDetails', 'isLiveNow'], default=False)
    post_live_dvr = safeTraverse(video_details, ['isPostLiveDvr'], default=False)
    allowed_regions = safeTraverse(microformat, ['availableCountries'], default=[])
    allow_ratings = safeTraverse(video_details, ['allowRatings'], default=True)
    family_friendly = safeTraverse(microformat, ['isFamilySafe'], default=True)
    is_listed = safeTraverse(video_details, ['isCrawlable'], default=True)
    is_upcoming = safeTraverse(video_details, ['isUpcoming'], default=False)
    keywords = safeTraverse(video_details, ['keywords'], default=[])

    # TODO: https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr#L258
    # (fetched, but not used in the response yet)
    related = safeTraverse(wdata, ['ec2', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results'], default=[]) # can possibly change in the future

    toplevel_buttons = safeTraverse(video_primary_renderer, ['videoActions', 'menuRenderer', 'topLevelButtons'], default={}) # hacky solution
    likes_text = safeTraverse(toplevel_buttons, [0, 'segmentedLikeDislikeButtonViewModel', 'likeButtonViewModel', 'likeButtonViewModel', 'toggleButtonViewModel', 'toggleButtonViewModel', 'defaultButtonViewModel', 'buttonViewModel', 'title'], default="") # hacky solution
    likes = _parseAbbreviatedCount(likes_text)

    description = safeTraverse(microformat, ['description', 'simpleText'], default="\n(ythdd: error ocurred, failed to retrieve description)")
    # (fetched, but not used in the response yet)
    short_description = safeTraverse(wdata, ['ec1', 'videoDetails', 'shortDescription'], default="(ythdd: error occurred, failed to retrieve short description)")
    description_html = "<p>" + description + "</p>" # sorry, not happening right now, TODO: https://github.com/iv-org/invidious/blob/master/src/invidious/videos/parser.cr#L329

    # (fetched, but not used in the response yet)
    metadata = safeTraverse(video_secondary_renderer, ['metadataRowContainer', 'metadataRowContainerRenderer', 'rows'], default={})
    genre = safeTraverse(microformat, ['category'])
    # TODO: genre blah blah blah...
    author = safeTraverse(video_details, ['author'], default="Unknown Author")
    ucid = safeTraverse(video_details, ['channelId'], default="UNKNOWNCHANNELID")
    author_info = safeTraverse(video_secondary_renderer, ['owner', 'videoOwnerRenderer'], default={})
    # An empty list stands in for missing thumbnails so the loop below is skipped.
    author_thumbnail = safeTraverse(author_info, ['thumbnail', 'thumbnails'], default=[]) or []
    subs_text = safeTraverse(author_info, ['subscriberCountText', 'simpleText'], default="0")
    subs = _parseAbbreviatedCount(subs_text) # numeric value, not part of the response yet
    # only the leading token (e.g. "1.23M") is reported as subCountText
    subs_text = subs_text.split(" ")[0] if isinstance(subs_text, str) else "0"
    for thumb in author_thumbnail:
        # rewrite to use views.py
        thumb['url'] = ythdd_globals.translateLinks(thumb['url'])
    # TODO: author_verified = ...

    hls_url = safeTraverse(idata, ['stage1', 'streamingData', 'hlsManifestUrl'], default="")
    adaptive_formats = safeTraverse(idata, ['stage1', 'streamingData', 'adaptiveFormats'], default=[])
    adaptive_formats, format_streams = rebuildFormats(adaptive_formats)

    if live_now:
        video_type = "livestream"
    elif premiere_timestamp:
        video_type = "scheduled"
        # premiere_timestamp is known to be truthy in this branch, so no
        # "current time" fallback is needed.
        published = premiere_timestamp
    else:
        video_type = "video"

    if not format_streams:
        format_streams = []
        # providing format streams breaks Clipious client
        #format_streams.append(adaptive_formats[0])
        #format_streams.append(adaptive_formats[1])

    response = {
        "type": video_type,
        "title": title,
        "videoId": video_id,
        "videoThumbnails": genThumbs(video_id),
        "storyboards": [], # not implemented

        "description": description, # due to change (include ythdd metadata)
        "descriptionHtml": description_html, # basically the same as normal description for the time being
        "published": published,
        "publishedText": published_date,

        "keywords": keywords,
        "viewCount": views,
        "viewCountText": str(views), # not implemented
        "likeCount": likes,
        "dislikeCount": 0,

        "paid": False, # not implemented
        "premium": False, # not implemented
        "isFamilyFriendly": family_friendly,
        "allowedRegions": allowed_regions,
        "genre": genre,
        "genreUrl": "/genreUrl/not/implemented/", # not implemented

        "author": author,
        "authorId": ucid,
        "authorUrl": "/channel/" + ucid,
        "authorVerified": False, # not implemented
        "authorThumbnails": author_thumbnail,

        "subCountText": subs_text,
        "lengthSeconds": length,
        "allowRatings": allow_ratings,
        "rating": 0,
        "isListed": is_listed,
        "liveNow": live_now,
        "isPostLiveDvr": post_live_dvr,
        "isUpcoming": is_upcoming,
        "dashUrl": "/dash/not/implemented/", # not implemented
        "premiereTimestamp": premiere_timestamp,

        "hlsUrl": hls_url,
        "adaptiveFormats": adaptive_formats,
        "formatStreams": format_streams, # very bare bones, empty actually xD
        "captions": [], # not implemented
        # "captions": [
        #   {
        #     "label": String,
        #     "language_code": String,
        #     "url": String
        #   }
        # ],
        # "musicTracks": [
        #   {
        #     "song": String,
        #     "artist": String,
        #     "album": String,
        #     "license": String
        #   }
        # ],
        'recommendedVideos': [] # not yet implemented
        # "recommendedVideos": [
        #   {
        #     "videoId": String,
        #     "title": String,
        #     "videoThumbnails": [
        #       {
        #         "quality": String,
        #         "url": String,
        #         "width": Int32,
        #         "height": Int32
        #       }
        #     ],
        #     "author": String,
        #     "authorUrl": String,
        #     "authorId": String?,
        #     "authorVerified": Boolean,
        #     "authorThumbnails": [
        #       {
        #         "url": string,
        #         "width": Int32,
        #         "height": Int32
        #       }
        #     ],
        #     "lengthSeconds": Int32,
        #     "viewCount":
        #     "viewCountText": String
        #   }
        # ]
    }

    # for debugging:
    #return send(200, ythdd_extractor.WEBextractSinglePage(data[3]))
    #return send(200, ythdd_extractor.IOSextract(data[3]))

    # if youtube returns not the videoId we asked
    # then it means that the instance is ratelimited
    status_code = 200 if data[3] == response['videoId'] else 403

    return send(status_code, response)
|
|
|
|
def lookup(data):
    """Route a request, given as a list of URL path segments, to its handler.

    One segment always yields stats(). Two segments cover 'api/v1',
    'streams/...' and 'ggpht/...'. Three or more segments cover the
    'api/v1/<endpoint>' family and 'ggpht/...' redirects. Anything
    unrecognised is counted as a failed request and answered through
    notImplemented(). An empty list matches no rule and yields None.
    """
    # possibly TODO: rewrite this mess
    segments = len(data)

    if segments == 1:
        return stats() # /api/invidious/something

    if segments == 2:
        if data[0] == "api" and data[1] == "v1": # /api/invidious/api/v1
            return stats()
        if data[0] == "streams":
            return streams()
        if data[0] == 'ggpht':
            return redirect('/' + "/".join(data))
        incrementBadRequests()
        return notImplemented(data)

    if segments > 2:
        if data[0] == "api" and data[1] == "v1":
            endpoint = data[2]
            if endpoint in ('stats', ''): # /api/invidious/api/v1/stats and /api/invidious/api/v1/
                return stats()
            if endpoint == 'trending':
                return trending()
            if endpoint == 'popular':
                return popular()
            if endpoint == 'videos':
                return videos(data)
            if endpoint == 'auth':
                return auth(data)
            incrementBadRequests()
            return notImplemented(data)

        if data[0] == 'ggpht':
            # for some reason the Materialous client
            # keeps making requests to these
            # (a doubled leading 'ggpht' segment is collapsed into one)
            tail = data[1:] if data[1] == 'ggpht' else data
            return redirect('/' + "/".join(tail))

        incrementBadRequests()
        return notImplemented(data)

    # no segments at all: nothing matches
    return None