168 lines
6.2 KiB
Python
168 lines
6.2 KiB
Python
#!/usr/bin/python3
|
|
# API is expected to return:
|
|
# - HTTP status code,
|
|
# - human-readable status message,
|
|
# - json with appropriate data
|
|
import flask, json, time
|
|
import ythdd_globals, ythdd_extractor
|
|
#from flask_sqlalchemy import SQLAlchemy
|
|
#import ythdd_api_v1_stats, ythdd_api_v1_user, ythdd_api_v1_info, ythdd_api_v1_query, ythdd_api_v1_meta, ythdd_api_v1_admin
|
|
|
|
def requireAuthentication(func):
    """Decorate *func* so that it only runs for requests carrying the API key.

    The wrapped callable has to be invoked with a keyword argument ``r``
    whose ``args.get('token')`` yields the client-supplied token
    (presumably a flask request object -- TODO confirm against callers).

    Args:
        func: callable returning a ``(status, received, data)`` triple.

    Returns:
        A wrapper that returns ``func``'s triple when the token matches the
        configured API key, and ``(401, "error", {'error_msg': "Unauthorized"})``
        otherwise.

    Raises:
        AssertionError: (from the wrapper) when ``func`` raises or does not
            return exactly three values; the underlying error is chained as
            ``__cause__``.
    """
    # Imported here because the module header does not import functools.
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        token = kwargs["r"].args.get('token')

        # NOTE(review): assumes ythdd_globals.config['api']['api_key'] holds
        # the expected key -- ythdd_globals is the only globals module this
        # file imports; confirm the config layout.
        if token != ythdd_globals.config['api']['api_key']:
            return 401, "error", {'error_msg': "Unauthorized"}

        try:
            status, received, data = func(*args, **kwargs)
        except Exception as err:
            # Same exception type as before, but KeyboardInterrupt/SystemExit
            # now propagate and the original error is kept as the cause.
            raise AssertionError(f"Function \"{func.__name__}\" does not return status, code, and data as it should!") from err
        return status, received, data

    return wrapper
|
|
|
|
def incrementBadRequests():
    """Add one to the failed-API-request counter kept in ythdd_globals."""
    failed_so_far = ythdd_globals.apiFailedRequests
    ythdd_globals.apiFailedRequests = failed_so_far + 1
|
|
|
|
def notImplemented(data):
    """Build the 501 response for an unrecognised or unimplemented request.

    Args:
        data: indexable whose first element names the unrecognised item.

    Returns:
        ``(501, message, [])`` where the message ends with ``data[0]``.
    """
    # TODO: change list to string -> data, not data[0]
    unrecognised = data[0]
    message = "not recognised/implemented: {}".format(unrecognised)
    return 501, message, []
|
|
|
|
def stub_hello():
    """Return a fixed 200 greeting with an empty payload."""
    status, message, payload = 200, 'hello from v1!', []
    return status, message, payload
|
|
|
|
def stats():
    """Collect the counters and uptime values exposed by ythdd_globals.

    Returns:
        ``(200, "ok", payload)`` where payload is a dict of start time,
        uptime figures and API request counters.
    """
    g = ythdd_globals
    payload = {}
    # TODO: include yt-dlp version
    payload["start_time"] = g.starttime
    payload["uptime"] = g.getUptime()
    payload["real_uptime"] = g.realUptime
    payload["total_api_requests"] = g.apiRequests
    payload["failed_api_requests"] = g.apiFailedRequests
    payload["outside_api_requests"] = g.outsideApiHits
    # Requests that did not come from outside are counted as local.
    payload["local_api_requests"] = g.apiRequests - g.outsideApiHits
    return 200, "ok", payload
|
|
|
|
def videoIdSanityCheck(videoId: str):
|
|
if len(videId) != 11:
|
|
incrementBadRequests()
|
|
return 400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.', []
|
|
# elif... check
|
|
|
|
def hot(data):
|
|
#print(data)
|
|
# if we are given not enough data to work with, return bad request.
|
|
# example:
|
|
# hot related videoId
|
|
# 0. 1. 2. [No. of argument]
|
|
if len(data) <= 2:
|
|
incrementBadRequests()
|
|
return 400, f'error: bad request. supply required arguments.', []
|
|
|
|
# check our first argument
|
|
match data[1]:
|
|
|
|
# if it can be handled by yt_dlp, use yt_dlp
|
|
case "video" | "channel" | "handle" | "playlist":
|
|
url_lookup = {'video': 'https://www.youtube.com/watch?v=', 'channel': 'https://www.youtube.com/channel/', 'handle': 'https://www.youtube.com/@', 'playlist': 'https://www.youtube.com/playlist?list='}
|
|
comment_count = ""
|
|
# require 3 arguments (if using "c" or "nc")
|
|
if len(data) <= 3:
|
|
incrementBadRequests()
|
|
return 400, f'error: bad request. supply required arguments.', []
|
|
# comment settings should be either Comments, No Comments, or Limited Comments
|
|
if data[2] not in ("c", "nc", "lc"):
|
|
incrementBadRequests()
|
|
return notImplemented(data)
|
|
# require 4 arguments (when using "lc" we need to know the number of comments to retrieve)
|
|
if data[2] == "lc" and len(data) <= 4:
|
|
incrementBadRequests()
|
|
return 400, f'error: bad request. limited comments (lc) requires an extra argument specifying amount of comments.', []
|
|
# check if the additional "lc" argument is a number
|
|
elif data[2] == "lc":
|
|
try:
|
|
comment_count = str(int(data[3]))
|
|
except:
|
|
incrementBadRequests()
|
|
return 400, f'error: bad request. {data[3]} is not a number.', []
|
|
videoId = data[4]
|
|
# if the user didn't choose Limited Comments, then the videoId must be the fourth (zero included) argument
|
|
else:
|
|
videoId = data[3]
|
|
|
|
if len(videoId) != 11: # videoId sanity check
|
|
incrementBadRequests()
|
|
return 400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.', []
|
|
|
|
# assume we want to get the comments
|
|
getcomments = True
|
|
# unless we're given "nc"
|
|
if data[2] == "nc":
|
|
getcomments = False
|
|
|
|
# try to get the data
|
|
try:
|
|
started = time.time()
|
|
extracted_dict = ythdd_extractor.extract(url_lookup[data[1]] + videoId, getcomments=getcomments, maxcomments=comment_count, manifest_fix=True)
|
|
extracted_dict["took"] = time.time() - started
|
|
return 200, "OK", extracted_dict
|
|
except Exception as e:
|
|
# shrink yt_dlp's unnecessarily long error message
|
|
# example: \u001b[0;31mERROR:\u001b[0m [youtube] videoIdABCD: Sign in to confirm your age. This video may be inappropriate for some users.
|
|
# TODO: perhaps implement proper error handling? so that we don't need this hacky solution
|
|
incrementBadRequests()
|
|
error_msg = str(e).replace("\u001b[0;31mERROR:\u001b[0m [youtube] " + videoId + ": ", "") # yt_dlp error message with color codes
|
|
return 400, f'error: failed to get "{videoId}" ({data[2]})', {'error_msg': error_msg}
|
|
|
|
# if some functionality is not supported or buried deep within yt_dlp, as is the case with "related videos" feed, use our own (naive) approach
|
|
case "related":
|
|
# no logic needed for additional arguments here
|
|
videoId = data[2]
|
|
if len(videoId) != 11: # videoId sanity check
|
|
incrementBadRequests()
|
|
return 400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.', []
|
|
|
|
started = time.time()
|
|
try:
|
|
# try to actually get the data
|
|
extracted_related = ythdd_extractor.WEBrelated('https://www.youtube.com/watch?v=' + videoId)
|
|
extracted_related['took'] = time.time() - started
|
|
return 200, "OK", extracted_related
|
|
except KeyError:
|
|
# instead of throwing error at age restricted videos for not having "related videos" feed,
|
|
# return an empty feed
|
|
return 200, "OK", {'secondaryResults': {'results': []}, 'took': time.time() - started}
|
|
except Exception as e:
|
|
# general exception handler
|
|
incrementBadRequests()
|
|
return 400, f'error: unknown error while parsing {videoId}: {e}', []
|
|
|
|
case _:
|
|
incrementBadRequests()
|
|
return notImplemented([data[1]]) # workaround before notImplemented is reworked
|
|
|
|
def lookup(data, request):
    """Route an API v1 call to its handler based on the first argument.

    Args:
        data: indexable of arguments; ``data[0]`` selects the endpoint.
        request: accepted for the caller's benefit; not read here.

    Returns:
        The ``(status, message, payload)`` triple produced by the handler,
        or the 501 triple from ``notImplemented`` for unknown endpoints
        (which also increments the failed-request counter).
    """
    endpoint = data[0]

    if endpoint == 'stats':
        return stats()

    if endpoint == 'hot':  # retrieve live, uncached data
        return hot(data)

    # Endpoints that only answer with the placeholder greeting for now.
    # NOTE: 'admin' is meant to REQUIRE CREDENTIALS once implemented.
    if endpoint in ('user', 'info', 'query', 'meta', 'admin'):
        return stub_hello()

    incrementBadRequests()
    return notImplemented(data)