#!/usr/bin/python3

import json
import time

import brotli
import requests
import yt_dlp

import ythdd_globals
from ythdd_globals import safeTraverse


# Base options handed to yt_dlp.YoutubeDL by extract(); some keys
# (cookiefile, getcomments, extractor_args entries) are filled in per call.
ytdl_opts = {
    #"format": "bv*[height<=720]+ba", # to be defined by the user
    #"getcomments": True,
    #"extractor_args": {"maxcomments": ...},
    #"writeinfojson": True,
    #"progress_hooks": my_hook,
    # output filename templates, keyed by yt-dlp template type
    "outtmpl": {
        "default": "%(id)s.%(ext)s",
        "chapter": "%(id)s.%(ext)s_%(section_number)03d_%(section_title)s.%(ext)s"
    },
    "extractor_args": {
        "youtube": {
            # "formats": ["dashy"]
        }
    },
    # never download media -- metadata extraction only
    "simulate": True
}


# HTTP headers for the innertube /player call, impersonating the iOS
# YouTube app (client name "5" = IOS); must stay in sync with stage1_body.
stage1_headers = {
    "Connection": "keep-alive",
    "User-Agent": "com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Content-Type": "application/json",
    "X-Youtube-Client-Name": "5",
    "X-Youtube-Client-Version": "19.45.4",
    "Origin": "https://www.youtube.com",
    "Accept-Encoding": "gzip, deflate, br",
    "Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
}


# JSON body template for the innertube /player call (IOS client context);
# IOSextract() injects 'videoId' before posting.
stage1_body = {
    "context":
    {
        "client":
        {
            "clientName": "IOS",
            "clientVersion": "19.45.4",
            "deviceMake": "Apple",
            "deviceModel": "iPhone16,2",
            "userAgent": "com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)",
            "osName": "iPhone",
            "osVersion": "18.1.0.22B83",
            "hl": "en",
            "timeZone": "UTC",
            "utcOffsetMinutes": 0
        }
    },
    #"videoId": uri,
    "playbackContext":
    {
        "contentPlaybackContext":
        {
            "html5Preference": "HTML5_PREF_WANTS"
        }
    },
    "contentCheckOk": True,
    "racyCheckOk": True
}


# Plain desktop-browser headers used for manifest fetches and for the
# search/suggestion endpoints.
stage2_headers = {
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Accept-Encoding": "gzip, deflate, br"
}


# HTTP headers for the innertube /next call, impersonating the desktop web
# client (client name "1" = WEB); must stay in sync with stage3_body.
stage3_headers = {
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Content-Type": "application/json",
    "X-Youtube-Client-Name": "1",
    "X-Youtube-Client-Version": "2.20250829.01.00",
    "Origin": "https://www.youtube.com",
    "Accept-Encoding": "gzip, deflate, br",
    "Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
}


# JSON body template for the innertube /next call (WEB client context);
# IOSextract() injects 'videoId' before posting.
stage3_body = {
    "context":
    {
        "client":
        {
            "clientName": "WEB",
            "clientVersion": "2.20250829.01.00",
            "hl": "en",
            "timeZone": "UTC",
            "utcOffsetMinutes": 0
        }
    },
    #"videoId": uri,
    "contentCheckOk": True,
    "racyCheckOk": True
}


# Template innertube context for WEB-client requests; makeWebContext()
# copies it and merges caller-supplied keys (browseId, query, ...) on top.
web_context_dict = {
    'context': {
        'client': {
            'hl': 'en',
            'gl': 'US',
            'deviceMake': '',
            'deviceModel': '',
            'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0,gzip(gfe)',
            'clientName': 'WEB',
            'clientVersion': '2.20250829.01.00',
            'osName': 'Windows',
            'osVersion': '10.0',
            'screenPixelDensity': 2,
            'platform': 'DESKTOP',
            'screenDensityFloat': 2,
            'userInterfaceTheme': 'USER_INTERFACE_THEME_LIGHT',
            'browserName': 'Firefox',
            'browserVersion': '142.0',
            'acceptHeader': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'utcOffsetMinutes': 0,
        }
    }
}


def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False):
    """Run a yt-dlp metadata extraction for *url* and return the sanitized
    info dict (no media is downloaded -- 'simulate' is set in ytdl_opts).

    url          -- full URL, or a bare 11-character YouTube video id
    getcomments  -- also fetch comments when True
    maxcomments  -- cap passed to yt-dlp's youtube max_comments extractor arg
    manifest_fix -- add the web_safari player client to work around broken
                    manifests, see
                    https://github.com/yt-dlp/yt-dlp/issues/11952#issuecomment-2565802294
    """
    # TODO: check user-agent and cookiefile
    import copy

    # Work on a per-call deep copy: the previous version mutated the
    # module-level ytdl_opts, so options set for one call (getcomments,
    # max_comments, player_client, cookiefile) leaked into every later call.
    opts = copy.deepcopy(ytdl_opts)

    if ythdd_globals.config['extractor']['user-agent']:
        # yt-dlp keeps its default headers in a module-level dict
        yt_dlp.utils.std_headers['User-Agent'] = ythdd_globals.config['extractor']['user-agent']

    if ythdd_globals.config['extractor']['cookies_path']:
        opts['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']

    # a bare 11-character video id is promoted to a full watch URL
    if len(url) == 11:
        url = "https://www.youtube.com/watch?v=" + url
    if getcomments:
        opts['getcomments'] = True
    if maxcomments:
        opts['extractor_args']['youtube']['max_comments'] = [maxcomments, "all", "all", "all"]
    if manifest_fix:
        # https://github.com/yt-dlp/yt-dlp/issues/11952#issuecomment-2565802294
        opts['extractor_args']['youtube']['player_client'] = ['default', 'web_safari']

    with yt_dlp.YoutubeDL(opts) as ytdl:
        return ytdl.sanitize_info(ytdl.extract_info(url, download=False))


def WEBrelated(url: str):
    """Scrape the related-videos section from a YouTube watch page.

    WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME

    url -- a bare 11-character video id or a full
           https://www.youtube.com/watch?v=... URL

    Returns the 'secondaryResults' subtree of the watch-next JSON embedded
    in the page. Raises ValueError for an unrecognized URL.
    """
    watch_prefix = "https://www.youtube.com/watch?v="  # len() = 32

    if len(url) == 11:
        videoId = url
    else:
        if not url.startswith(watch_prefix):
            # was: raise BaseException -- uncatchable by `except Exception`
            # and carried no message
            raise ValueError("WEBrelated cannot parse URL: " + url)
        # video ids are 11 characters; the old slice [32:44] grabbed 12
        videoId = url[32:43]

    params = {'v': videoId}

    # Always request the canonical watch endpoint. The old code passed the
    # bare 11-character id straight to requests.get(), which is not a URL.
    response = requests.get("https://www.youtube.com/watch",
                            headers=ythdd_globals.getHeaders(caller='extractor'),
                            params=params)
    extracted_string = response.content.decode('utf8', 'unicode_escape')

    # the page embeds two '{"responseContext":...' blobs; the second one
    # holds the watch-next (related videos) data
    marker = '{"responseContext":{"serviceTrackingParams":'
    start = extracted_string.find(marker)
    start2 = extracted_string.find(marker, start + 1)
    end = extracted_string.find(';</script>', start2)
    extracted_json = json.loads(extracted_string[start2:end])

    return extracted_json["contents"]['twoColumnWatchNextResults']["secondaryResults"]


def WEBextractSinglePage(uri: str):
    """Fetch a watch page and pull out the two JSON blobs YouTube embeds
    in it (player response and watch-next response).

    uri -- an 11-character video id.

    Returns {'ec1': <first blob>, 'ec2': <second blob>, 'took': seconds}.
    Raises ValueError when *uri* is not 11 characters long.
    """
    # WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME

    start_time = time.time()

    if len(uri) != 11:
        raise ValueError("WEBextractSinglePage expects a single, 11-character long argument")

    response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'))
    extracted_string = str(response.content.decode('utf8', 'unicode_escape'))
    # the two blobs share the same opening marker; blob 1 ends at ';var ',
    # blob 2 at the closing script tag
    start = extracted_string.find('{"responseContext":{"serviceTrackingParams":')
    end = extracted_string.find(';var ', start)
    start2 = extracted_string.find('{"responseContext":{"serviceTrackingParams":', start + 1)
    end2 = extracted_string.find(';</script>', start2)
    extracted_json1 = json.loads(extracted_string[start:end])
    extracted_json2 = json.loads(extracted_string[start2:end2])

    end_time = time.time()

    return {'ec1': extracted_json1, 'ec2': extracted_json2, 'took': end_time - start_time}


def paramsFromUrl(url: str) -> dict:
    """Return a dict of query parameters parsed from *url*.

    Values are kept verbatim (no URL-decoding). A '=' inside a value is
    preserved and a parameter without '=' yields an empty-string value --
    the previous str.split("=") raised ValueError in both cases.
    """
    params = {}

    for num, chunk in enumerate(url.split("&")):
        if num == 0:
            # strip scheme/host/path up to and including the '?'
            # (find() == -1 leaves a bare query string untouched)
            chunk = chunk[chunk.find("?") + 1:]
        key, _, value = chunk.partition("=")
        params[key] = value

    return params


def IOSextract(uri: str):
    """Query innertube directly: /player as the iOS client (stage 1) and
    /next as the web client (stage 3).

    uri -- an 11-character video id.

    Returns {'stage1': <player response>, 'stage3': <next response>,
    'took': seconds}. Raises ValueError when *uri* is not 11 characters.
    """
    start = time.time()

    if len(uri) != 11:
        raise ValueError("IOSextract expects a single, 11-character long uri as an argument")

    # NOTE(review): this writes 'videoId' into the module-level body dicts
    # in place; concurrent calls would race on that key -- presumably the
    # caller is single-threaded, confirm.
    stage1_body['videoId'] = uri
    stage1_h = requests.post("https://www.youtube.com/youtubei/v1/player?prettyPrint=false", headers=stage1_headers, json=stage1_body)
    stage1 = json.loads(stage1_h.content.decode('utf-8'))

    # stage 2 (HLS manifest fetch) is currently disabled
    #stage2_h = requests.get(stage1['streamingData']['hlsManifestUrl'], headers=stage2_headers)
    #stage2 = stage2_h.content.decode('utf-8')

    stage3_body['videoId'] = uri
    stage3_h = requests.post("https://www.youtube.com/youtubei/v1/next?prettyPrint=false", headers=stage3_headers, json=stage3_body)
    stage3 = json.loads(stage3_h.content.decode('utf-8'))

    end = time.time()

    #return {'stage1': stage1, 'stage2': stage2, 'stage3': stage3, 'took': end - start}
    return {'stage1': stage1, 'stage3': stage3, 'took': end - start}


def makeWebContext(secondaryContextDict: dict):
    """Build a web-client innertube context.

    Returns a shallow copy of web_context_dict with the top-level keys of
    *secondaryContextDict* merged on top (caller keys win).
    """
    merged = dict(web_context_dict)
    merged.update(secondaryContextDict)
    return merged


def getChannelAvatar(response_json: dict):
    """Return the best channel avatar as a dictionary:
    {url: <proxied url to remote server>, width: ..., height: ...},
    picking the entry with the highest pixel count.

    Returns None when no avatar data can be found in *response_json*
    (the previous version crashed with TypeError on the None subscript).

    A great majority of the code has been influenced by
    https://github.com/iv-org/invidious/blob/master/src/invidious/channels/about.cr.
    """
    avatars = safeTraverse(response_json, ['metadata', 'channelMetadataRenderer', 'avatar', 'thumbnails'], default=None)

    if avatars is None:
        # fallback to lower resolution avatars from the page header
        avatars = safeTraverse(response_json, ['header',
                                               'pageHeaderRenderer',
                                               'content',
                                               'pageHeaderViewModel',
                                               'image',
                                               'decoratedAvatarViewModel',
                                               'avatar',
                                               'avatarViewModel',
                                               'image',
                                               'sources'], default=None)

    if not avatars:
        # TODO: serve a local placeholder avatar instead of None
        return None

    best_avatar = avatars[-1]  # usually, the best avatar is stored last
    for avatar in avatars:
        if avatar['width'] * avatar['height'] > best_avatar['width'] * best_avatar['height']:
            best_avatar = avatar

    # or use regex substitution and set the size to something like 512x512
    # e.g.: =s128 -> =s512

    best_avatar['url'] = ythdd_globals.translateLinks(best_avatar['url'])

    return best_avatar


def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
    """Generate channel avatar variants at the default YouTube sizes by
    rewriting the '=s<NN>' size token inside *url*.

    Returns a list of {url, width, height} dicts, or [] when *url* is not
    a recognized avatar URL or carries no size token.
    NOTE(review): *proxied* is currently unused -- translateLinks() is
    always applied; confirm intended behavior.
    """
    # avatar urls for channels in search results start with //yt3.ggpht.com/
    if url.startswith("//yt3.ggpht.com/"):
        url = "https:" + url

    if not url.startswith(("https://yt3.ggpht.com/", "https://yt3.googleusercontent.com/")):
        return []

    url = ythdd_globals.translateLinks(url)

    # the size is encoded as '=s<NN>' near the end, e.g. '...=s176-c-k-...'
    url_size_start = url.rfind("=s") + 2
    if url_size_start == 1:
        # rfind() returned -1: no size token to rewrite (the old slicing
        # produced garbage URLs in this case)
        return []
    url_size_end = url.find("-", url_size_start)
    if url_size_end == -1:
        # size token runs to the end of the url (the old code sliced
        # url[-1:] here, duplicating the last character)
        url_size_end = len(url)

    default_sizes = [32, 48, 76, 100, 176, 512]

    return [
        {
            "url": url[:url_size_start] + str(size) + url[url_size_end:],
            "width": size,
            "height": size
        }
        for size in default_sizes
    ]


def isVerified(response_json: dict) -> bool:
|
|
# Returns True if any user badge has been found (verified/artist).
|
|
|
|
if not isinstance(response_json, dict):
|
|
return False
|
|
|
|
match safeTraverse(list(response_json.keys()), [0], default=""):
|
|
case "metadataBadgeRenderer": # channels in search results
|
|
verified = safeTraverse(response_json, ["metadataBadgeRenderer", "tooltip"], default="") in ("Verified") # room for support of artist channels
|
|
return verified
|
|
|
|
return False
|
|
|
|
def isPremium(response_json: dict) -> bool:
|
|
# Returns True if content is paid (member-only).
|
|
|
|
if not isinstance(response_json, dict):
|
|
return False
|
|
|
|
match safeTraverse(list(response_json.keys()), [0], default=""):
|
|
case "metadataBadgeRenderer": # channels in search results
|
|
paid = safeTraverse(response_json, ["metadataBadgeRenderer", "style"], default="") in ("BADGE_STYLE_TYPE_MEMBERS_ONLY")
|
|
return paid
|
|
|
|
return False
|
|
|
|
def browseChannel(ucid: str, params: str = None, ctoken: str = None):
    """Query innertube's /browse endpoint for a channel.

    ucid   -- 24-character channel id (browseId)
    params -- optional protobuf params blob
    ctoken -- optional continuation token

    Returns the decoded JSON response as a dict; raises ValueError on a
    malformed ucid.
    """
    if len(ucid) != 24:
        raise ValueError(f"Something is wrong with the UCID {ucid}. Expected a 24-character long channel ID, not {len(ucid)}.")

    extra = {'browseId': ucid}
    if params is not None:
        extra['params'] = params
    if ctoken is not None:
        extra['continuation'] = ctoken

    payload = makeWebContext(extra)

    response = requests.post(
        'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
        headers = ythdd_globals.getHeaders(),
        json = payload,
    )

    return json.loads(response.text)


def WEBextractSearchResults(search_query: str) -> list:
    """POST a search request to the innertube API and return only the
    relevant part (the list of actual result renderers).

    Returns [] when *search_query* is None or the response cannot be
    parsed/traversed.
    """
    if search_query is None:
        return []

    web_context = makeWebContext({"query": search_query})
    response = requests.post('https://www.youtube.com/youtubei/v1/search',
                             params={"prettyPrint": False},
                             headers=stage2_headers,
                             data=json.dumps(web_context)
                             )

    results = []
    try:
        results = json.loads(response.text)
    except ValueError:
        # was a bare `except: pass`, which also swallowed KeyboardInterrupt
        # and genuine bugs; ValueError covers json.JSONDecodeError, and the
        # safeTraverse below degrades [] to the default anyway
        pass
    results = safeTraverse(results, ["contents", "twoColumnSearchResultsRenderer", "primaryContents", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents"], default=[])

    return results


def WEBgetSearchSuggestions(query: str, previous_query: str = '') -> dict:
    """Take in a search query and return relevant suggestions.

    Can optionally take the previous query, but that's rather novel and
    not supported across players nor the invidious API itself.

    Returns {'query': ..., 'suggestions': [...]}; returns {} when *query*
    is not a string. (The previous return annotation said `-> list`, but
    the function has always returned a dict.)
    """
    suggestions = []

    if not isinstance(query, str):
        print("WEBgetSearchSuggestions: query is not a string (as it should)")
        return {}
    if not isinstance(previous_query, str):
        previous_query = ''

    if ythdd_globals.config["general"]["cache"]:
        # look for cached suggestions
        for cached_search in ythdd_globals.general_cache["search"]:
            if cached_search["q"] == query.lower() and cached_search["pq"] == previous_query.lower():
                # found it? skip ahead
                suggestions = cached_search["resp"]
                break

    # request wasn't cached? query the API
    if suggestions == []:

        params = {
            'ds': 'yt',
            'hl': 'en',  # host language
            'gl': 'us',  # geolocation
            'client': 'youtube',
            'gs_ri': 'youtube',
            'q': query,  # query
            'pq': previous_query  # previous query
        }

        response = requests.get(
            'https://suggestqueries-clients6.youtube.com/complete/search',
            params=params,
            headers=stage2_headers
        )

        # strip the JSONP wrapper by fixed offsets instead of a real parse;
        # can break anytime but hopefully the tiny speed gain will make up for it
        results = response.text[23 + len(query):]
        results = results[:results.rfind("{") - 1]
        results = json.loads(results)

        for result in results:
            suggestions.append(result[0])

        # cache the freshly fetched response (inside the fetch branch, so
        # cache hits are not re-appended as duplicates)
        if ythdd_globals.config["general"]["cache"]:
            ythdd_globals.general_cache["search"].append(
                {
                    "q": query.lower(),
                    "pq": previous_query.lower(),
                    "resp": suggestions
                }
            )

    return {
        "query": query,
        "suggestions": suggestions
    }