#!/usr/bin/python3
import copy
import json
import time
import brotli  # needed so requests/urllib3 can decode the brotli ("br") encoding advertised in the headers below
import requests
import yt_dlp
from ythdd_globals import safeTraverse
import ythdd_globals
ytdl_opts = {
    #"format": "bv*[height<=720]+ba", # to be defined by the user
    #"getcomments": True,
    #"extractor_args": {"maxcomments": ...},
    #"writeinfojson": True,
    #"progress_hooks": my_hook,
    "outtmpl": {
        "default": "%(id)s.%(ext)s",
        "chapter": "%(id)s.%(ext)s_%(section_number)03d_%(section_title)s.%(ext)s"
    },
    "extractor_args": {
        "youtube": {
            # "formats": ["dashy"]
        }
    },
    "simulate": True
}
stage1_headers = {
    "Connection": "keep-alive",
    "User-Agent": "com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Content-Type": "application/json",
    "X-Youtube-Client-Name": "5",
    "X-Youtube-Client-Version": "19.45.4",
    "Origin": "https://www.youtube.com",
    "Accept-Encoding": "gzip, deflate, br",
    "Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
}
stage1_body = {
    "context":
    {
        "client":
        {
            "clientName": "IOS",
            "clientVersion": "19.45.4",
            "deviceMake": "Apple",
            "deviceModel": "iPhone16,2",
            "userAgent": "com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)",
            "osName": "iPhone",
            "osVersion": "18.1.0.22B83",
            "hl": "en",
            "timeZone": "UTC",
            "utcOffsetMinutes": 0
        }
    },
    #"videoId": uri,
    "playbackContext":
    {
        "contentPlaybackContext":
        {
            "html5Preference": "HTML5_PREF_WANTS"
        }
    },
    "contentCheckOk": True,
    "racyCheckOk": True
}
stage2_headers = {
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Accept-Encoding": "gzip, deflate, br"
}
stage3_headers = {
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Content-Type": "application/json",
    "X-Youtube-Client-Name": "1",
    "X-Youtube-Client-Version": "2.20250911.00.00",
    "Origin": "https://www.youtube.com",
    "Accept-Encoding": "gzip, deflate, br",
    "Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
}
stage3_body = {
    "context":
    {
        "client":
        {
            "clientName": "WEB",
            "clientVersion": "2.20250911.00.00",
            "hl": "en",
            "timeZone": "UTC",
            "utcOffsetMinutes": 0
        }
    },
    #"videoId": uri,
    "contentCheckOk": True,
    "racyCheckOk": True
}
web_context_dict = {
    'context': {
        'client': {
            'hl': 'en',
            'gl': 'US',
            'deviceMake': '',
            'deviceModel': '',
            'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0,gzip(gfe)',
            'clientName': 'WEB',
            'clientVersion': '2.20250911.00.00',
            'osName': 'Windows',
            'osVersion': '10.0',
            'screenPixelDensity': 2,
            'platform': 'DESKTOP',
            'screenDensityFloat': 2,
            'userInterfaceTheme': 'USER_INTERFACE_THEME_LIGHT',
            'browserName': 'Firefox',
            'browserVersion': '142.0',
            'acceptHeader': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'utcOffsetMinutes': 0,
        }
    }
}
def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False):
    # TODO: check user-agent and cookiefile
    # Work on a per-call copy so per-request tweaks (comments, player_client, ...)
    # do not leak into later calls through the module-level ytdl_opts.
    opts = copy.deepcopy(ytdl_opts)
    if ythdd_globals.config['extractor']['user-agent']:
        yt_dlp.utils.std_headers['User-Agent'] = ythdd_globals.config['extractor']['user-agent']
    if ythdd_globals.config['extractor']['cookies_path']:
        opts['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']
    if len(url) == 11:
        url = "https://www.youtube.com/watch?v=" + url
    if getcomments:
        opts['getcomments'] = True
    if maxcomments:
        opts['extractor_args']['youtube']['max_comments'] = [maxcomments, "all", "all", "all"]
    if manifest_fix:
        # https://github.com/yt-dlp/yt-dlp/issues/11952#issuecomment-2565802294
        opts['extractor_args']['youtube']['player_client'] = ['default', 'web_safari']
    with yt_dlp.YoutubeDL(opts) as ytdl:
        result = ytdl.sanitize_info(ytdl.extract_info(url, download=False))
    return result
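# A minimal usage sketch (the video id below is illustrative, not taken from this project):
# info = extract("dQw4w9WgXcQ", getcomments=True, maxcomments="100")
# print(info["title"], len(info.get("comments") or []))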
def WEBrelated(url: str):
    # WARNING! HIGHLY EXPERIMENTAL, LIABLE TO BREAK AT ANY TIME
    if len(url) == 11:
        params = {'v': url}
    else:
        prefix = url.find("https://www.youtube.com/watch?v=")  # len() = 32
        if prefix == -1:
            raise ValueError("WEBrelated expects an 11-character video id or a full watch URL")
        videoId = url[prefix + 32:prefix + 43]  # video ids are 11 characters long
        params = {'v': videoId}
    response = requests.get("https://www.youtube.com/watch", headers=ythdd_globals.getHeaders(caller='extractor'), params=params)
    extracted_string = response.content.decode('utf8', 'unicode_escape')
    start = extracted_string.find('{"responseContext":{"serviceTrackingParams":')
    start2 = extracted_string.find('{"responseContext":{"serviceTrackingParams":', start + 1)
    end = extracted_string.find(';</script>', start2)
    extracted_json = json.loads(extracted_string[start2:end])
    return extracted_json["contents"]['twoColumnWatchNextResults']["secondaryResults"]
def WEBextractSinglePage(uri: str):
    # WARNING! HIGHLY EXPERIMENTAL, LIABLE TO BREAK AT ANY TIME
    start_time = time.time()
    if len(uri) != 11:
        raise ValueError("WEBextractSinglePage expects a single, 11-character long argument")
    response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'))
    extracted_string = response.content.decode('utf8', 'unicode_escape')
    start = extracted_string.find('{"responseContext":{"serviceTrackingParams":')
    end = extracted_string.find(';var ', start)
    start2 = extracted_string.find('{"responseContext":{"serviceTrackingParams":', start + 1)
    end2 = extracted_string.find(';</script>', start2)
    extracted_json1 = json.loads(extracted_string[start:end])
    extracted_json2 = json.loads(extracted_string[start2:end2])
    end_time = time.time()
    return {'ec1': extracted_json1, 'ec2': extracted_json2, 'took': end_time - start_time}
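# 'ec1' and 'ec2' appear to correspond to the two JSON blobs embedded in a watch page
# (ytInitialPlayerResponse and ytInitialData, in that order). That mapping is an
# assumption inferred from the ';var ' / ';</script>' terminators above, not a guarantee.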
def paramsFromUrl(url: str) -> dict:
    # Returns a dictionary of params from a given URL.
    split_list = url.split("&")
    params = {}
    for num, string in enumerate(split_list):
        if num == 0:
            string = string[string.find("?") + 1:]
        key, value = string.split("=", 1)  # split once; values may themselves contain '='
        params[key] = value
    return params
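# Example (hypothetical URL):
# paramsFromUrl("https://www.youtube.com/watch?v=abcdefghijk&t=42s")
# -> {'v': 'abcdefghijk', 't': '42s'}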
def IOSextract(uri: str):
    start = time.time()
    if len(uri) != 11:
        raise ValueError("IOSextract expects a single, 11-character long uri as an argument")
    stage1_body['videoId'] = uri
    stage1_h = requests.post("https://www.youtube.com/youtubei/v1/player?prettyPrint=false", headers=stage1_headers, json=stage1_body)
    stage1 = json.loads(stage1_h.content.decode('utf-8'))
    #stage2_h = requests.get(stage1['streamingData']['hlsManifestUrl'], headers=stage2_headers)
    #stage2 = stage2_h.content.decode('utf-8')
    stage3_body['videoId'] = uri
    stage3_h = requests.post("https://www.youtube.com/youtubei/v1/next?prettyPrint=false", headers=stage3_headers, json=stage3_body)
    stage3 = json.loads(stage3_h.content.decode('utf-8'))
    end = time.time()
    #return {'stage1': stage1, 'stage2': stage2, 'stage3': stage3, 'took': end - start}
    return {'stage1': stage1, 'stage3': stage3, 'took': end - start}
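# Usage sketch: IOSextract("dQw4w9WgXcQ")['stage1'] is the innertube player response
# (videoDetails, streamingData, ...); 'stage3' is the watch-next response. The video
# id above is illustrative only.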
def makeWebContext(secondaryContextDict: dict):
    # Uses web_context_dict to create a context, returns a dict.
    # Essentially, expands the web_context_dict with a secondary one.
    # Note: .copy() is shallow; callers only add top-level keys, so the nested
    # 'context' dict stays shared with the module-level template and must not be mutated.
    current_web_context_dict = web_context_dict.copy()
    for key in secondaryContextDict:
        current_web_context_dict[key] = secondaryContextDict[key]
    return current_web_context_dict
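# Example: makeWebContext({'browseId': 'UCxxxxxxxxxxxxxxxxxxxxxx'}) yields the WEB client
# context plus a top-level 'browseId', ready to POST to an innertube endpoint
# (the browseId value here is a placeholder).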
def getChannelAvatar(response_json: dict):
    # Returns a dictionary: {url: <proxied url to remote server>, width: ..., height: ...}
    # containing the best resolution in terms of pixel count.
    # Much of this code is based on https://github.com/iv-org/invidious/blob/master/src/invidious/channels/about.cr.
    avatars = safeTraverse(response_json, ['metadata', 'channelMetadataRenderer', 'avatar', 'thumbnails'], default=None)
    if avatars is None:
        # fallback to lower resolution avatars
        avatars = safeTraverse(response_json, ['header',
                                               'pageHeaderRenderer',
                                               'content',
                                               'pageHeaderViewModel',
                                               'image',
                                               'decoratedAvatarViewModel',
                                               'avatar',
                                               'avatarViewModel',
                                               'image',
                                               'sources'], default=None)
    # if avatars is None: # TODO: if avatars is still None, use a local avatar
    best_avatar = avatars[-1]  # usually, the best avatar is stored last
    for avatar in avatars:
        if avatar['width'] * avatar['height'] > best_avatar['width'] * best_avatar['height']:
            best_avatar = avatar
    # or use regex substitution and set the size to something like 512x512
    # e.g.: =s128 -> =s512
    best_avatar['url'] = ythdd_globals.translateLinks(best_avatar['url'])
    return best_avatar
def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
    # Generates channel avatars at default sizes.
    # avatar urls for channels in search results start with //yt3.ggpht.com/
    if url.startswith("//yt3.ggpht.com/"):
        url = url.replace("//yt3.ggpht.com/", "https://yt3.ggpht.com/")
    avatars = []
    if not url.startswith("https://yt3.ggpht.com/") and not url.startswith("https://yt3.googleusercontent.com/"):
        return []
    url = ythdd_globals.translateLinks(url)
    url_size_start = url.rfind("=s") + 2
    url_size_end = url.find("-", url_size_start)
    default_sizes = [32, 48, 76, 100, 176, 512]
    for size in default_sizes:
        avatars.append(
            {
                "url": url[:url_size_start] + str(size) + url[url_size_end:],
                "width": size,
                "height": size
            }
        )
    return avatars
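# Example: given a source url ending in "=s88-c-k-c0x00ffffff-no-rj", the slicing above
# rewrites the "=s<N>" size token, producing entries ending in "=s32-...", "=s48-...",
# ..., "=s512-..." (one per entry of default_sizes). The suffix shown is a typical
# ggpht pattern, used here only as an illustration.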
def isVerified(response_json: dict) -> bool:
    # Returns True if any user badge has been found (verified/artist).
    if not isinstance(response_json, dict):
        return False
    match safeTraverse(list(response_json.keys()), [0], default=""):
        case "metadataBadgeRenderer":  # channels in search results
            # note the 1-tuple: `in ("Verified")` would be a substring test on a plain string
            verified = safeTraverse(response_json, ["metadataBadgeRenderer", "tooltip"], default="") in ("Verified",)  # room for support of artist channels
            return verified
    return False
def isPremium(response_json: dict) -> bool:
    # Returns True if content is paid (member-only).
    if not isinstance(response_json, dict):
        return False
    match safeTraverse(list(response_json.keys()), [0], default=""):
        case "metadataBadgeRenderer":  # channels in search results
            paid = safeTraverse(response_json, ["metadataBadgeRenderer", "style"], default="") in ("BADGE_STYLE_TYPE_MEMBERS_ONLY",)
            return paid
    return False
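# Both badge helpers expect renderer dicts of the shape
# {"metadataBadgeRenderer": {"style": "...", "tooltip": "...", ...}} as found in
# search result items; any other shape simply returns False.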
def browseChannel(ucid: str, params: str = None, ctoken: str = None):
    # Returns the response from innertube's browse endpoint for channels (as a dict).
    if len(ucid) != 24:
        raise ValueError(f"Something is wrong with the UCID {ucid}. Expected a 24-character long channel ID, not {len(ucid)} characters.")
    additional_context = {'browseId': ucid}
    if params is not None:
        additional_context['params'] = params
    if ctoken is not None:
        additional_context['continuation'] = ctoken
    context = makeWebContext(additional_context)
    response = requests.post(
        'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
        headers=ythdd_globals.getHeaders(),
        json=context,
    )
    response_json = json.loads(response.text)
    return response_json
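# Usage sketch (placeholder values): browseChannel("UC" + "x" * 22, params="...") fetches
# a channel tab selected by the protobuf params blob; passing ctoken instead continues a
# previously returned page. Neither value shown here is defined by this project.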
def WEBextractSearchResults(search_query: str) -> list:
    # Posts a search request to the innertube API
    # and processes only the relevant part (the actual results)
    if search_query is None:
        return []
    web_context = makeWebContext({"query": search_query})
    response = requests.post('https://www.youtube.com/youtubei/v1/search',
                             params={"prettyPrint": "false"},  # a string, so it serializes as "false" rather than "False"
                             headers=stage2_headers,
                             json=web_context  # also sets Content-Type: application/json
                             )
    results = []
    try:
        results = json.loads(response.text)
    except json.JSONDecodeError:
        pass
    results = safeTraverse(results, ["contents", "twoColumnSearchResultsRenderer", "primaryContents", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents"], default=[])
    return results
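# Usage sketch: each entry of WEBextractSearchResults("some query") is one renderer dict
# (videoRenderer, channelRenderer, ...) from the first itemSectionRenderer of the search
# response; continuation pages are not fetched here.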
def WEBgetSearchSuggestions(query: str, previous_query: str = '') -> dict:
    # Takes in a search query and returns relevant suggestions.
    # Can optionally take the previous query, but that is a niche feature,
    # not supported across players nor by the invidious API itself.
    suggestions = []
    if not isinstance(query, str):
        print("WEBgetSearchSuggestions: query is not a string (as it should be)")
        return {}
    if not isinstance(previous_query, str):
        previous_query = ''
    if ythdd_globals.config["general"]["cache"]:
        # look for cached suggestions
        for cached_search in ythdd_globals.general_cache["search"]:
            if cached_search["q"] == query.lower() and cached_search["pq"] == previous_query.lower():
                # found it? skip ahead
                suggestions = cached_search["resp"]
                break
    # request wasn't cached? query the API
    if suggestions == []:
        params = {
            'ds': 'yt',
            'hl': 'en',  # host language
            'gl': 'us',  # geolocation
            'client': 'youtube',
            'gs_ri': 'youtube',
            'q': query,  # query
            'pq': previous_query  # previous query
        }
        response = requests.get(
            'https://suggestqueries-clients6.youtube.com/complete/search',
            params=params,
            headers=stage2_headers
        )
        # the response is JSONP; slice off the fixed-length wrapper up to the suggestions
        # array, then drop the trailing metadata object after the last '{' --
        # can break anytime but hopefully the tiny speed gain will make up for it
        results = response.text[23 + len(query):]
        results = results[:results.rfind("{") - 1]
        results = json.loads(results)
        for result in results:
            suggestions.append(result[0])
        # cache response
        if ythdd_globals.config["general"]["cache"]:
            ythdd_globals.general_cache["search"].append(
                {
                    "q": query.lower(),
                    "pq": previous_query.lower(),
                    "resp": suggestions
                }
            )
    return {
        "query": query,
        "suggestions": suggestions
    }
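# Usage sketch: WEBgetSearchSuggestions("never gonna") might return
# {"query": "never gonna", "suggestions": ["never gonna give you up", ...]}
# (suggestion strings here are illustrative; actual values come from the endpoint).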