#!/usr/bin/python3
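"""Extraction layer for ythdd.

Wraps yt-dlp for full video extraction and talks directly to YouTube's
innertube endpoints (player, next, browse, search, suggestions) for the
lighter requests. Anything parsing raw innertube responses or watch-page
HTML is best-effort and may break whenever YouTube changes its output.
"""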
import brotli, yt_dlp, requests, json, time, copy  # brotli: not used directly, presumably imported so "br"-encoded responses stay decodable
from http.cookiejar import MozillaCookieJar
from ythdd_globals import safeTraverse
import ythdd_proto
import ythdd_globals
ytdl_opts = {
#"format": "bv*[height<=720]+ba", # to be defined by the user
#"getcomments": True,
#"extractor_args": {"maxcomments": ...},
#"writeinfojson": True,
#"progress_hooks": my_hook,
"outtmpl": {
"default": "%(id)s.%(ext)s",
"chapter": "%(id)s.%(ext)s_%(section_number)03d_%(section_title)s.%(ext)s"
},
"extractor_args": {
"youtube": {
# "formats": ["dashy"]
}
},
"simulate": True,
"js_runtimes": {
"deno": {}
},
    "remote_components": ["ejs:github"]
}
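# NB: extract() below deep-copies ytdl_opts before specializing it per request,
# so the nested extractor_args/js_runtimes dicts above stay pristine.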
stage1_headers = {
"Connection": "keep-alive",
"User-Agent": "com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-us,en;q=0.5",
"Sec-Fetch-Mode": "navigate",
"Content-Type": "application/json",
"X-Youtube-Client-Name": "5",
"X-Youtube-Client-Version": "19.45.4",
"Origin": "https://www.youtube.com",
"Accept-Encoding": "gzip, deflate, br",
"Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
}
stage1_body = {
"context":
{
"client":
{
"clientName": "IOS",
"clientVersion": "19.45.4",
"deviceMake": "Apple",
"deviceModel": "iPhone16,2",
"userAgent": "com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)",
"osName": "iPhone",
"osVersion": "18.1.0.22B83",
"hl": "en",
"timeZone": "UTC",
"utcOffsetMinutes": 0
}
},
#"videoId": uri,
"playbackContext":
{
"contentPlaybackContext":
{
"html5Preference": "HTML5_PREF_WANTS"
}
},
"contentCheckOk": True,
"racyCheckOk": True
}
stage2_headers = {
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-us,en;q=0.5",
"Sec-Fetch-Mode": "navigate",
"Accept-Encoding": "gzip, deflate, br"
}
stage3_headers = {
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-us,en;q=0.5",
"Sec-Fetch-Mode": "navigate",
"Content-Type": "application/json",
"X-Youtube-Client-Name": "1",
"X-Youtube-Client-Version": "2.20251103.01.00",
"Origin": "https://www.youtube.com",
"Accept-Encoding": "gzip, deflate, br",
"Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
}
stage3_body = {
"context":
{
"client":
{
"clientName": "WEB",
"clientVersion": "2.20251103.01.00",
"hl": "en",
"timeZone": "UTC",
"utcOffsetMinutes": 0
}
},
#"videoId": uri,
"contentCheckOk": True,
"racyCheckOk": True
}
web_context_dict = {
'context': {
'client': {
'hl': 'en',
'gl': 'US',
'deviceMake': '',
'deviceModel': '',
'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0,gzip(gfe)',
'clientName': 'WEB',
'clientVersion': '2.20251103.01.00',
'osName': 'Windows',
'osVersion': '10.0',
'screenPixelDensity': 2,
'platform': 'DESKTOP',
'screenDensityFloat': 2,
'userInterfaceTheme': 'USER_INTERFACE_THEME_LIGHT',
'browserName': 'Firefox',
        'browserVersion': '144.0',  # keep in sync with the Firefox/144.0 userAgent above
'acceptHeader': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'utcOffsetMinutes': 0,
}
}
}
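# Base innertube context for requests impersonating the WEB client;
# makeWebContext() merges request-specific keys (browseId, query,
# continuation, ...) on top of it.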
def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False, use_cookies=None):
# TODO: check user-agent and cookiefile
    # deep copy: the nested extractor_args/js_runtimes dicts are mutated below,
    # and a shallow .copy() would leak those changes into the shared ytdl_opts
    ytdl_context = copy.deepcopy(ytdl_opts)
if ythdd_globals.config['extractor']['user-agent']:
yt_dlp.utils.std_headers['User-Agent'] = ythdd_globals.config['extractor']['user-agent']
if len(url) == 11:
url = "https://www.youtube.com/watch?v=" + url
if getcomments:
ytdl_context['getcomments'] = True
if maxcomments:
ytdl_context['extractor_args']['youtube']['max_comments'] = [maxcomments, "all", "all", "all"]
if manifest_fix:
# https://github.com/yt-dlp/yt-dlp/issues/11952#issuecomment-2565802294
ytdl_context['extractor_args']['youtube']['player_client'] = ['default', 'web_safari']
elif ythdd_globals.config['extractor']['preferred_extractor']:
ytdl_context['extractor_args']['youtube']['player_client'] = [ythdd_globals.config['extractor']['preferred_extractor']]
else:
ytdl_context['extractor_args']['youtube']['player_client'] = ['android_vr']
if use_cookies is not None:
# can be either "global", "agegated" or None
deno_path = ythdd_globals.config['extractor']['deno_path']
match use_cookies:
case "global":
ytdl_context['cookiefile'] = ythdd_globals.config['extractor']['cookies_path']
ytdl_context['extractor_args']['youtube']['player_client'] = ['tv']
if not deno_path:
print("FATAL ERROR: deno path is required for playback using cookies!")
ytdl_context['js_runtimes']['deno']['path'] = deno_path if deno_path else ""
case "agegated":
ytdl_context['cookiefile'] = ythdd_globals.config['extractor']['age_restricted_cookies_path']
ytdl_context['extractor_args']['youtube']['player_client'] = ['tv']
if not deno_path:
print("FATAL ERROR: deno path is required for playback of age-restricted content!")
ytdl_context['js_runtimes']['deno']['path'] = deno_path if deno_path else ""
            case _:
                pass
with yt_dlp.YoutubeDL(ytdl_context) as ytdl:
result = ytdl.sanitize_info(ytdl.extract_info(url, download=False))
return result
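# Example (hypothetical values): fetch metadata and up to 100 comments without
# downloading anything, using the cookie jar configured for age-gated content:
#   info = extract("dQw4w9WgXcQ", getcomments=True, maxcomments="100", use_cookies="agegated")
#   print(info["title"], len(info.get("comments", [])))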
def WEBrelated(url: str):
# WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME
    if len(url) == 11:
        params = {'v': url}
    else:
        if not url.startswith("https://www.youtube.com/watch?v="):  # prefix is 32 characters long
            raise ValueError(f"WEBrelated expects a video ID or a full watch URL, got: {url}")
        params = {'v': url[32:43]}  # video IDs are 11 characters long
    response = requests.get("https://www.youtube.com/watch", headers=ythdd_globals.getHeaders(caller='extractor'), params=params)
    extracted_string = response.content.decode('utf-8', 'replace')
start = extracted_string.find('{"responseContext":{"serviceTrackingParams":')
start2 = extracted_string.find('{"responseContext":{"serviceTrackingParams":', start + 1)
end = extracted_string.find(';</script>', start2)
extracted_json = json.loads(extracted_string[start2:end])
return extracted_json["contents"]['twoColumnWatchNextResults']["secondaryResults"]
def WEBextractSinglePage(uri: str, use_cookies=None):
# WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME
start_time = time.time()
if len(uri) != 11:
raise ValueError("WEBextractSinglePage expects a single, 11-character long argument")
cookies = None
if use_cookies is not None:
match use_cookies:
case "global":
ythdd_globals.print_debug("wdata: using global cookies")
cookies = MozillaCookieJar(ythdd_globals.config["extractor"]["cookies_path"])
cookies.load()
case "agegated":
ythdd_globals.print_debug("wdata: using agegated cookies")
cookies = MozillaCookieJar(ythdd_globals.config["extractor"]["age_restricted_cookies_path"])
cookies.load()
            case _:
                pass
response = requests.get("https://www.youtube.com/watch?v=" + uri, headers=ythdd_globals.getHeaders(caller='extractor'), cookies=cookies)
    extracted_string = response.content.decode('utf-8', 'replace')
start = extracted_string.find('{"responseContext":')
end = extracted_string.find(';var ', start)
start2 = extracted_string.find('{"responseContext":', start + 1)
end2 = extracted_string.find(';</script>', start2)
extracted_json1 = json.loads(extracted_string[start:end])
extracted_json2 = json.loads(extracted_string[start2:end2])
end_time = time.time()
return {'ec1': extracted_json1, 'ec2': extracted_json2, 'took': end_time - start_time}
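# ec1 and ec2 correspond (at the time of writing) to the two JSON blobs embedded
# in a watch page: ytInitialPlayerResponse and ytInitialData, respectively.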
def paramsFromUrl(url: str) -> dict:
# Returns a dictionary of params from a given URL.
split_list = url.split("&")
params = {}
for num, string in enumerate(split_list):
if num == 0:
string = string[string.find("?") + 1:]
        key, _, value = string.partition("=")  # tolerates "=" inside values and keys without a value
params[key] = value
return params
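# Example:
#   paramsFromUrl("https://www.youtube.com/watch?v=dQw4w9WgXcQ&t=42")
#   -> {'v': 'dQw4w9WgXcQ', 't': '42'}
# (values are returned as-is, without URL-decoding; urllib.parse.parse_qs is the
# stricter alternative)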
def IOSextract(uri: str):
start = time.time()
if len(uri) != 11:
raise ValueError("IOSextract expects a single, 11-character long uri as an argument")
    stage1_payload = dict(stage1_body, videoId=uri)  # copy, so the module-level template is not mutated
    stage1_h = requests.post("https://www.youtube.com/youtubei/v1/player?prettyPrint=false", headers=stage1_headers, json=stage1_payload)
stage1 = json.loads(stage1_h.content.decode('utf-8'))
#stage2_h = requests.get(stage1['streamingData']['hlsManifestUrl'], headers=stage2_headers)
#stage2 = stage2_h.content.decode('utf-8')
    stage3_payload = dict(stage3_body, videoId=uri)  # copy, so the module-level template is not mutated
    stage3_h = requests.post("https://www.youtube.com/youtubei/v1/next?prettyPrint=false", headers=stage3_headers, json=stage3_payload)
stage3 = json.loads(stage3_h.content.decode('utf-8'))
end = time.time()
#return {'stage1': stage1, 'stage2': stage2, 'stage3': stage3, 'took': end - start}
return {'stage1': stage1, 'stage3': stage3, 'took': end - start}
def makeWebContext(secondaryContextDict: dict):
    # Uses web_context_dict to create a context, returns a dict.
    # Essentially, expands web_context_dict with a secondary one: a shallow,
    # top-level merge in which keys from secondaryContextDict win.
    return {**web_context_dict, **secondaryContextDict}
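# Example: makeWebContext({'browseId': 'UC' + 'x' * 22}) returns the WEB client
# context above with a top-level 'browseId' key added (placeholder channel ID).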
def getChannelAvatar(response_json: dict):
# Returns a dictionary: {url: <proxied url to remote server>, width: ..., height: ...}
# containing the best resolution in terms of pixel count.
# A great majority of the code has been influenced by https://github.com/iv-org/invidious/blob/master/src/invidious/channels/about.cr.
avatars = safeTraverse(response_json, ['metadata', 'channelMetadataRenderer', 'avatar', 'thumbnails'], default=None)
if avatars is None:
# fallback to lower resolution avatars
avatars = safeTraverse(response_json, ['header',
'pageHeaderRenderer',
'content',
'pageHeaderViewModel',
'image',
'decoratedAvatarViewModel',
'avatar',
'avatarViewModel',
'image',
'sources'], default=None)
    # if avatars is None: # TODO: if avatars is still None, fall back to a local avatar (right now a missing avatar raises a TypeError below)
    best_avatar = avatars[-1] # usually, the best avatar is stored last
for avatar in avatars:
if avatar['width'] * avatar['height'] > best_avatar['width'] * best_avatar['height']:
best_avatar = avatar
# or use regex substitution and set the size to something like 512x512
# e.g.: =s128 -> =s512
best_avatar['url'] = ythdd_globals.translateLinks(best_avatar['url'])
return best_avatar
def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
# Generates channel avatars at default sizes.
# avatar urls for channels in search results start with //yt3.ggpht.com/
if url.startswith("//"):
url = "https:" + url
avatars = []
if not url.startswith("https://yt3.ggpht.com/") and not url.startswith("https://yt3.googleusercontent.com/"):
return []
url = ythdd_globals.translateLinks(url)
    url_size_start = url.rfind("=s") + 2  # assumes an "=s<size>" segment is present
    url_size_end = url.find("-", url_size_start)
    if url_size_end == -1:
        url_size_end = len(url)  # size was the last modifier in the URL
default_sizes = [32, 48, 76, 100, 176, 512]
for size in default_sizes:
avatars.append(
{
"url": url[:url_size_start] + str(size) + url[url_size_end:],
"width": size,
"height": size
}
)
return avatars
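# Example (illustrative URL, ignoring the translateLinks() proxying): from
# "https://yt3.ggpht.com/abc=s88-c-k-no-rj" this derives six entries whose URLs
# differ only in the "=s<size>" segment, from "=s32..." up to "=s512...".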
def isVerified(response_json: dict) -> bool:
# Returns True if any user badge has been found (verified/artist).
if not isinstance(response_json, dict):
return False
match safeTraverse(list(response_json.keys()), [0], default=""):
case "metadataBadgeRenderer": # channels in search results
verified = safeTraverse(response_json, ["metadataBadgeRenderer", "tooltip"], default="") in ("Verified", "Official Artist Channel") # perhaps look for badge styles?
return verified
return False
def isPremium(response_json: dict) -> bool:
# Returns True if content is paid (member-only).
if not isinstance(response_json, dict):
return False
match safeTraverse(list(response_json.keys()), [0], default=""):
case "metadataBadgeRenderer": # channels in search results
            paid = safeTraverse(response_json, ["metadataBadgeRenderer", "style"], default="") in ("BADGE_STYLE_TYPE_MEMBERS_ONLY",) # tuple membership, not a substring test
return paid
return False
def browseChannel(ucid: str, params: str = None, ctoken: str = None):
# Returns the response from innertubes browse endpoint for channels (as a dict).
if len(ucid) != 24:
raise ValueError(f"Something is wrong with the UCID {ucid}. Expected a 24-character long channel ID, not {len(ucid)}.")
additional_context = {'browseId': ucid}
if params is not None:
additional_context['params'] = params
if ctoken is not None:
additional_context['continuation'] = ctoken
context = makeWebContext(additional_context)
response = requests.post(
'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
headers = ythdd_globals.getHeaders(),
json = context,
)
response_json = json.loads(response.text)
return response_json
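# Example (placeholder ID): first page, then pagination via the continuation
# token found in the previous response:
#   page1 = browseChannel("UC" + "x" * 22)
#   page2 = browseChannel("UC" + "x" * 22, ctoken="<token from page1>")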
def WEBextractSearchResults(search_query: str, page: int) -> list:
# Posts a search request to innertube API
# and processes only the relevant part (the actual results)
if search_query is None:
return []
additional_context = {"query": search_query}
if page is not None:
params = ythdd_proto.produceSearchParams(page)
additional_context["params"] = params
web_context = makeWebContext(additional_context)
response = requests.post('https://www.youtube.com/youtubei/v1/search',
params={"prettyPrint": False},
headers=stage2_headers,
data=json.dumps(web_context)
)
results = []
try:
results = json.loads(response.text)
    except ValueError:
        pass  # not JSON; leave results empty
results = safeTraverse(results, ["contents", "twoColumnSearchResultsRenderer", "primaryContents", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents"], default=[])
return results
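# Example: WEBextractSearchResults("rickroll", page=1) yields a list of renderer
# dicts (videoRenderer, channelRenderer, ...) ready for further parsing.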
def WEBgetSearchSuggestions(query: str, previous_query: str = '') -> dict:
# Takes in a search query and returns relevant suggestions.
    # Can optionally take the previous query, but that's rather novel and
    # supported neither across players nor by the invidious API itself.
suggestions = []
if not isinstance(query, str):
print("WEBgetSearchSuggestions: query is not a string (as it should)")
return {}
if not isinstance(previous_query, str):
previous_query = ''
if ythdd_globals.config["general"]["cache"]:
# look for cached suggestions
for cached_search in ythdd_globals.general_cache["search"]:
if cached_search["q"] == query.lower() and cached_search["pq"] == previous_query.lower():
# found it? skip ahead
suggestions = cached_search["resp"]
break
# request wasn't cached? query the API
if suggestions == []:
params = {
'ds': 'yt',
'hl': 'en', # host language
'gl': 'us', # geolocation
'client': 'youtube',
'gs_ri': 'youtube',
'q': query, # query
'pq': previous_query # previous query
}
response = requests.get(
'https://suggestqueries-clients6.youtube.com/complete/search',
params=params,
headers=stage2_headers
)
# can break anytime but hopefully the tiny speed gain will make up for it
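        # (the endpoint replies with a JSONP-style wrapper, something like
        #   window.google.ac.h(["<query>",[["suggestion", ...], ...],{...}])
        # so the slicing below strips everything up to the suggestions array
        # and drops the trailing metadata object)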
results = response.text[23 + len(query):]
results = results[:results.rfind("{") - 1]
results = json.loads(results)
for result in results:
suggestions.append(result[0])
# cache response
if ythdd_globals.config["general"]["cache"]:
ythdd_globals.general_cache["search"].append(
{
"q": query.lower(),
"pq": previous_query.lower(),
"resp": suggestions
}
)
return {
"query": query,
"suggestions": suggestions
}
def WEBgetVideoComments(ctoken: str) -> tuple:
# ctoken needs to be passed explicitly.
# no guessing or retrieving it from globals.
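    # Returns (comments, continuation): the commentEntityPayload mutations (with
    # a "replies" dict attached to comments that have replies) plus the ctoken
    # for the next page ("" when there is none).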
if ctoken is None:
return [], ""
# build web context containing the relevant ctoken
web_context = makeWebContext({"continuation": ctoken})
response = requests.post('https://www.youtube.com/youtubei/v1/next',
params={"prettyPrint": False},
headers=stage2_headers,
data=json.dumps(web_context)
)
results = []
try:
results = json.loads(response.text)
    except ValueError:
        pass  # not JSON; leave results empty
comments = safeTraverse(results, ["frameworkUpdates", "entityBatchUpdate", "mutations"], default=[])
comment_continuations = []
comment_continuations_re = safeTraverse(results, ["onResponseReceivedEndpoints"], default=[])
for received_endpoint in comment_continuations_re:
# this is horrible...
acia = safeTraverse(received_endpoint, ["appendContinuationItemsAction", "continuationItems"], default=[])
rcic = safeTraverse(received_endpoint, ["reloadContinuationItemsCommand", "continuationItems"], default=[])
for entry in acia:
if "commentThreadRenderer" in entry or "continuationItemRenderer" in entry:
comment_continuations = acia
break
for entry in rcic:
if "commentThreadRenderer" in entry or "continuationItemRenderer" in entry:
comment_continuations = rcic
break
if comment_continuations != []:
break
if comment_continuations == []:
print("error: received an unknown comment structure, unable to parse continuations (replies)")
# breakpoint()
# return [], ""
# extract new continuation
new_continuation = ""
if "continuationItemRenderer" in safeTraverse(comment_continuations, [-1], default=[]):
# first, look for ctoken inside of response for next page of comments
new_continuation = safeTraverse(comment_continuations, [-1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default=None)
# or search elsewhere in case this is a reply thread
if new_continuation is None:
new_continuation = safeTraverse(comment_continuations, [-1, "continuationItemRenderer", "button", "buttonRenderer", "command", "continuationCommand", "token"], default="")
# perform a basic mutation check before parsing
# will ignore replies liked by video uploader ("hearts")
actual_comments = [x for x in comments if "properties" in safeTraverse(x, ["payload", "commentEntityPayload"], default=[], quiet=True)]
actual_comment_continuations = [x for x in comment_continuations if "replies" in safeTraverse(x, ["commentThreadRenderer"], default=[], quiet=True)]
# link reply data (reply count and ctoken) for comments with replies
for reply_renderer in actual_comment_continuations:
mutual_key = safeTraverse(reply_renderer, ["commentThreadRenderer", "commentViewModel", "commentViewModel", "commentKey"], default="unknown-key")
reply_ctoken = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
reply_count = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "viewReplies", "buttonRenderer", "text", "runs", 0, "text"], default="0 replies").split(" ")[0]
        # suspected A/B test; can be worked around with on-demand ctoken creation.
        # workaround for yt not showing replies when sorting by "top" comments
try:
int(reply_count) # can be just "Replies"
        except ValueError:
reply_count = "0"
for comment in actual_comments:
found_key = safeTraverse(comment, ["entityKey"], default="unknown-key")
# try to link a relevant ctoken if a comment has response
if found_key == mutual_key:
                if ythdd_globals.config["general"]["debug"]:
                    print(f"found reply for {found_key}")
comment["replies"] = {
"replyCount": int(reply_count),
"continuation": reply_ctoken
}
return actual_comments, new_continuation
def WEBextractPlaylist(plid: str = "", ctoken: str = ""):
# if ctoken has been provided, use it
if ctoken:
# playlist id can be omitted if ctoken is provided
additional_context = {'continuation': ctoken}
else:
# try to create ctoken which will allow for accessing the full playlist, including delisted/deleted videos
additional_context = {'continuation': ythdd_proto.producePlaylistContinuation(plid, offset=0)}
context = makeWebContext(additional_context)
response = requests.post(
'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
headers = ythdd_globals.getHeaders(),
json = context
)
resp_json = json.loads(response.text)
# if this is a first-time fetch (no ctoken passed), extract metadata
metadata = None
if not ctoken:
metadata = {
"microformat": safeTraverse(resp_json, ["microformat"]),
"sidebar": safeTraverse(resp_json, ["sidebar"])
}
# extract continuation
new_continuation = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems", -1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"])
# "best-effort" playlist's videos extraction
# "best-effort" because None's (unsuccessful video extraction = None) are passed as they are
videos = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"]) # includes continuation as last element of list, which will be ignored
return metadata, new_continuation, videos
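# Example (placeholder ID): fetch a whole playlist page by page:
#   metadata, ctoken, videos = WEBextractPlaylist(plid="PL" + "x" * 32)
#   while ctoken:
#       _, ctoken, more_videos = WEBextractPlaylist(ctoken=ctoken)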