new boilerplate extractor for related videos
This commit is contained in:
3
ythdd.py
3
ythdd.py
@@ -4,10 +4,7 @@ from flask_sqlalchemy import SQLAlchemy
|
|||||||
from markupsafe import escape
|
from markupsafe import escape
|
||||||
import requests, json, toml, time
|
import requests, json, toml, time
|
||||||
import views, downloader, ythdd_api, ythdd_globals, ythdd_db
|
import views, downloader, ythdd_api, ythdd_globals, ythdd_db
|
||||||
#from ythdd_db import db
|
|
||||||
|
|
||||||
#config = toml.load("config.toml")
|
|
||||||
#global app
|
|
||||||
ythdd_globals.starttime = int(time.time())
|
ythdd_globals.starttime = int(time.time())
|
||||||
ythdd_globals.apiRequests = 0
|
ythdd_globals.apiRequests = 0
|
||||||
ythdd_globals.apiFailedRequests = 0
|
ythdd_globals.apiFailedRequests = 0
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ def incrementBadRequests():
|
|||||||
ythdd_globals.apiFailedRequests += 1
|
ythdd_globals.apiFailedRequests += 1
|
||||||
|
|
||||||
def notImplemented(data):
    """Build the "not implemented" API response for an unrecognised request.

    Args:
        data: the request, either as a sequence whose first element names
            what was requested, or as a plain string naming it directly.

    Returns:
        A ``(status, message, payload)`` tuple: ``501``, a message naming the
        unrecognised item, and an empty list.
    """
    # TODO: change list to string -> data, not data[0]
    if isinstance(data, str):
        # Indexing a string would report only its first character.
        requested = data
    elif data:
        requested = data[0]
    else:
        # Empty request: nothing to name, but do not crash with IndexError.
        requested = ""
    return 501, f"not recognised/implemented: {requested}", []
|
||||||
|
|
||||||
def stub_hello():
|
def stub_hello():
|
||||||
@@ -29,13 +30,75 @@ def stats():
|
|||||||
return 200, "OK", data_to_send
|
return 200, "OK", data_to_send
|
||||||
|
|
||||||
def hot(data):
|
def hot(data):
|
||||||
if len(data) <= 3:
|
#print(data)
|
||||||
|
if len(data) <= 2:
|
||||||
incrementBadRequests()
|
incrementBadRequests()
|
||||||
return 400, f'error: bad request. supply required arguments.', []
|
return 400, f'error: bad request. supply required arguments.', []
|
||||||
comment_count = ""
|
match data[1]:
|
||||||
if data[1] not in ("video", "channel", "handle", "playlist"):
|
case "video" | "channel" | "handle" | "playlist":
|
||||||
|
url_lookup = {'video': 'https://www.youtube.com/watch?v=', 'channel': 'https://www.youtube.com/channel/', 'handle': 'https://www.youtube.com/@', 'playlist': 'https://www.youtube.com/playlist?list='}
|
||||||
|
comment_count = ""
|
||||||
|
if len(data) <= 3:
|
||||||
|
incrementBadRequests()
|
||||||
|
return 400, f'error: bad request. supply required arguments.', []
|
||||||
|
if data[2] not in ("c", "nc", "lc"):
|
||||||
|
incrementBadRequests()
|
||||||
|
return notImplemented(data)
|
||||||
|
if data[2] == "lc" and len(data) <= 4:
|
||||||
|
incrementBadRequests()
|
||||||
|
return 400, f'error: bad request. limited comments (lc) requires an extra argument specifying amount of comments.', []
|
||||||
|
elif data[2] == "lc":
|
||||||
|
try:
|
||||||
|
comment_count = str(int(data[3]))
|
||||||
|
except:
|
||||||
|
incrementBadRequests()
|
||||||
|
return 400, f'error: bad request. {data[3]} is not a number.', []
|
||||||
|
videoId = data[4]
|
||||||
|
else:
|
||||||
|
videoId = data[3]
|
||||||
|
|
||||||
|
if len(videoId) != 11: # videoId sanity check
|
||||||
|
incrementBadRequests()
|
||||||
|
return 400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.', []
|
||||||
|
|
||||||
|
getcomments = True
|
||||||
|
if data[2] == "nc":
|
||||||
|
getcomments = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
started = int(time.time())
|
||||||
|
extracted_dict = ythdd_extractor.extract(url_lookup[data[1]] + videoId, getcomments=getcomments, maxcomments=comment_count)
|
||||||
|
extracted_dict["took"] = int(time.time()) - started
|
||||||
|
return 200, "OK", extracted_dict
|
||||||
|
except Exception as e:
|
||||||
|
incrementBadRequests()
|
||||||
|
return 400, f'error: failed to get "{videoId}" ({data[2]}). {e}', []
|
||||||
|
case "related":
|
||||||
|
videoId = data[2]
|
||||||
|
if len(videoId) != 11: # videoId sanity check
|
||||||
|
incrementBadRequests()
|
||||||
|
return 400, f'error: bad request. wrong videoId: {videoId} is {len(videoId)} characters long, but should be 11.', []
|
||||||
|
|
||||||
|
started = int(time.time())
|
||||||
|
try:
|
||||||
|
extracted_related = ythdd_extractor.related('https://www.youtube.com/watch?v=' + videoId)
|
||||||
|
extracted_related['took'] = int(time.time()) - started
|
||||||
|
return 200, "OK", extracted_related
|
||||||
|
except Exception as e:
|
||||||
|
incrementBadRequests()
|
||||||
|
return 400, f'error: unknown error while parsing {videoId}: {e}', []
|
||||||
|
|
||||||
|
case _:
|
||||||
|
incrementBadRequests()
|
||||||
|
return notImplemented(data)
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
if data[1] not in ("video", "channel", "handle", "playlist", "related"):
|
||||||
|
incrementBadRequests()
|
||||||
return notImplemented(data)
|
return notImplemented(data)
|
||||||
if data[2] not in ("c", "nc", "lc"): # comments, no comments, limited comments
|
if data[2] not in ("c", "nc", "lc"): # comments, no comments, limited comments
|
||||||
|
incrementBadRequests()
|
||||||
return notImplemented(data)
|
return notImplemented(data)
|
||||||
if data[2] == "lc":
|
if data[2] == "lc":
|
||||||
if len(data) <= 4:
|
if len(data) <= 4:
|
||||||
@@ -62,6 +125,7 @@ def hot(data):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
incrementBadRequests()
|
incrementBadRequests()
|
||||||
return 400, f'error: failed to get "{videoId}" ({data[2]}). {e}', []
|
return 400, f'error: failed to get "{videoId}" ({data[2]}). {e}', []
|
||||||
|
'''
|
||||||
|
|
||||||
def lookup(data):
|
def lookup(data):
|
||||||
match data[0]:
|
match data[0]:
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
import yt_dlp, toml
|
import yt_dlp, requests, json
|
||||||
|
import ythdd_globals
|
||||||
|
|
||||||
ytdl_opts = {
|
ytdl_opts = {
|
||||||
#"format": "bv*[height<=720]+ba", # to be defined by the user
|
#"format": "bv*[height<=720]+ba", # to be defined by the user
|
||||||
@@ -14,11 +15,53 @@ ytdl_opts = {
|
|||||||
"simulate": True
|
"simulate": True
|
||||||
}
|
}
|
||||||
|
|
||||||
def extract(url: str, getcomments=False, maxcomments=""):
    """Run yt-dlp's info extraction (no download) for ``url``.

    Args:
        url: the URL to extract; an 11-character value is treated as a bare
            video id and expanded to a full watch URL.
        getcomments: when true, sets yt-dlp's ``getcomments`` option.
        maxcomments: when non-empty, passed as the first element of the
            youtube extractor's ``max_comments`` argument.

    Returns:
        Whatever ``YoutubeDL.extract_info(url, download=False)`` returns.
    """
    if len(url) == 11:
        url = "https://www.youtube.com/watch?v=" + url

    # Work on a per-call copy: writing into the module-level ytdl_opts made
    # 'getcomments' / 'extractor_args' stick for every later call, so a single
    # request with comments turned comments on for all following requests.
    # A shallow copy is enough because only top-level keys are assigned.
    opts = dict(ytdl_opts)
    if getcomments:
        opts['getcomments'] = True
    if maxcomments:
        opts['extractor_args'] = {'youtube': {'max_comments': [maxcomments, "all", "all", "all"]}}

    with yt_dlp.YoutubeDL(opts) as ytdl:
        result = ytdl.extract_info(url, download=False)
    return result
|
||||||
|
|
||||||
|
def _related_video_id(url: str) -> str:
    """Return the 11-character video id named by ``url`` (bare id or watch URL)."""
    if len(url) == 11:
        return url
    watch_prefix = "https://www.youtube.com/watch?v="  # len() = 32
    prefix_at = url.find(watch_prefix)
    if prefix_at == -1:
        # ValueError (not BaseException) so callers' `except Exception`
        # handlers actually see the failure.
        raise ValueError(f"not a video id or watch URL: {url}")
    id_start = prefix_at + len(watch_prefix)
    # Exactly 11 characters; the previous [32:44] slice took 12 and pulled in
    # the character following the id (e.g. '&' of a further query parameter).
    video_id = url[id_start:id_start + 11]
    if len(video_id) != 11:
        raise ValueError(f"watch URL does not contain an 11-character video id: {url}")
    return video_id


def _embedded_watch_json(page: str):
    """Parse the second embedded ``responseContext`` JSON object out of ``page``."""
    marker = '{"responseContext":{"serviceTrackingParams":'
    # The second occurrence is the one used; presumably the first belongs to
    # a different embedded object on the page -- confirm if the layout changes.
    first = page.find(marker)
    second = page.find(marker, first + 1)
    end = page.find(';</script>', second)
    if second == -1 or end == -1:
        raise ValueError("embedded page JSON not found in watch page")
    return json.loads(page[second:end])


def related(url: str):
    """Fetch the related-videos data embedded in a YouTube watch page.

    WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME: the result is scraped
    from JSON embedded in the page's HTML.

    Args:
        url: a bare 11-character video id, or a URL containing
            ``https://www.youtube.com/watch?v=`` followed by the id.

    Returns:
        The ``secondaryResults`` value found under
        ``contents.twoColumnWatchNextResults`` of the embedded page JSON.

    Raises:
        ValueError: ``url`` has neither accepted form, or the embedded JSON
            cannot be located or parsed.
        requests.RequestException: the HTTP request failed or timed out.
        KeyError: the page JSON lacks the expected keys.
    """
    video_id = _related_video_id(url)

    user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:130.0) Gecko/20100101 Firefox/130.0'
    try:
        configured_agent = ythdd_globals.config['extractor']['user-agent']
    except KeyError:
        # No override configured: keep the built-in default.
        configured_agent = None
    if configured_agent:
        user_agent = configured_agent

    headers = {
        'User-Agent': user_agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'DNT': '1',
        'Sec-GPC': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Priority': 'u=0, i',
        'Pragma': 'no-cache',
        'Cache-Control': 'no-cache',
    }

    # Always request the watch endpoint and let `params` carry the id.
    # Passing the caller's value straight through broke for bare ids (not a
    # URL) and duplicated the `v` parameter for full watch URLs.
    response = requests.get(
        'https://www.youtube.com/watch',
        headers=headers,
        params={'v': video_id},
        timeout=10,  # without a timeout a stalled connection blocks forever
    )
    response.raise_for_status()

    # The second argument of decode() is the error handler, and
    # 'unicode_escape' is not one; 'replace' keeps undecodable bytes from
    # aborting the whole lookup.
    page = response.content.decode('utf-8', errors='replace')
    extracted_json = _embedded_watch_json(page)

    return extracted_json["contents"]['twoColumnWatchNextResults']["secondaryResults"]
|
||||||
Reference in New Issue
Block a user