#!/usr/bin/python3
# Extraction helpers: a yt-dlp wrapper, watch-page scraping and direct
# requests to the innertube (youtubei/v1) endpoints.
import json
import time

# NOTE(review): brotli is not referenced in this module; presumably it is kept
# so that "br"-encoded responses can be decoded - confirm before removing.
import brotli
import requests
import yt_dlp

import ythdd_globals
from ythdd_globals import safeTraverse

# Default yt-dlp options. extract() works on a per-call copy of this dict.
ytdl_opts = {
    #"format": "bv*[height<=720]+ba", # to be defined by the user
    #"getcomments": True,
    #"extractor_args": {"maxcomments": ...},
    #"writeinfojson": True,
    #"progress_hooks": my_hook,
    "outtmpl": {
        "default": "%(id)s.%(ext)s",
        "chapter": "%(id)s.%(ext)s_%(section_number)03d_%(section_title)s.%(ext)s"
    },
    "extractor_args": {
        "youtube": {
            # "formats": ["dashy"]
        }
    },
    "simulate": True
}

# Headers/body for the IOS-client request to the "player" endpoint (see IOSextract).
stage1_headers = {
    "Connection": "keep-alive",
    "User-Agent": "com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Content-Type": "application/json",
    "X-Youtube-Client-Name": "5",
    "X-Youtube-Client-Version": "19.45.4",
    "Origin": "https://www.youtube.com",
    "Accept-Encoding": "gzip, deflate, br",
    "Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
}

stage1_body = {
    "context": {
        "client": {
            "clientName": "IOS",
            "clientVersion": "19.45.4",
            "deviceMake": "Apple",
            "deviceModel": "iPhone16,2",
            "userAgent": "com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)",
            "osName": "iPhone",
            "osVersion": "18.1.0.22B83",
            "hl": "en",
            "timeZone": "UTC",
            "utcOffsetMinutes": 0
        }
    },
    #"videoId": uri,
    "playbackContext": {
        "contentPlaybackContext": {
            "html5Preference": "HTML5_PREF_WANTS"
        }
    },
    "contentCheckOk": True,
    "racyCheckOk": True
}

# Generic browser-like headers (used for search, suggestions and comments).
stage2_headers = {
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Accept-Encoding": "gzip, deflate, br"
}

# Headers/body for the WEB-client request to the "next" endpoint (see IOSextract).
stage3_headers = {
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-us,en;q=0.5",
    "Sec-Fetch-Mode": "navigate",
    "Content-Type": "application/json",
    "X-Youtube-Client-Name": "1",
    "X-Youtube-Client-Version": "2.20250911.00.00",
    "Origin": "https://www.youtube.com",
    "Accept-Encoding": "gzip, deflate, br",
    "Cookie": "PREF=hl=en&tz=UTC; SOCS=CAI"
}

stage3_body = {
    "context": {
        "client": {
            "clientName": "WEB",
            "clientVersion": "2.20250911.00.00",
            "hl": "en",
            "timeZone": "UTC",
            "utcOffsetMinutes": 0
        }
    },
    #"videoId": uri,
    "contentCheckOk": True,
    "racyCheckOk": True
}

# Base request body for WEB-client innertube calls (expanded by makeWebContext).
web_context_dict = {
    'context': {
        'client': {
            'hl': 'en',
            'gl': 'US',
            'deviceMake': '',
            'deviceModel': '',
            'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0,gzip(gfe)',
            'clientName': 'WEB',
            'clientVersion': '2.20250911.00.00',
            'osName': 'Windows',
            'osVersion': '10.0',
            'screenPixelDensity': 2,
            'platform': 'DESKTOP',
            'screenDensityFloat': 2,
            'userInterfaceTheme': 'USER_INTERFACE_THEME_LIGHT',
            'browserName': 'Firefox',
            'browserVersion': '142.0',
            'acceptHeader': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'utcOffsetMinutes': 0,
        }
    }
}

_WATCH_URL = "https://www.youtube.com/watch"
_WATCH_URL_PREFIX = _WATCH_URL + "?v="  # len() = 32
# Both JSON documents scraped from the watch page begin with this text.
_RESPONSE_MARKER = '{"responseContext":{"serviceTrackingParams":'


def _decodeEmbeddedJson(page: str, position: int):
    # Decodes the JSON document that starts at page[position].
    # raw_decode() stops where the document ends, so text following it is
    # ignored and a ';' inside a JSON string cannot truncate the document.
    # Raises ValueError when position is negative (marker not found) or the
    # text is not valid JSON (json.JSONDecodeError is a ValueError).
    if position < 0:
        raise ValueError("embedded JSON marker was not found in the watch page")
    decoded, _ = json.JSONDecoder().raw_decode(page[position:])
    return decoded


def _parseSuggestionsResponse(text: str) -> list:
    # Strips the JSONP wrapper "callback( ... )" and returns the first element
    # of every entry of the second array element of the payload.
    # Raises ValueError (json.JSONDecodeError) if the payload is not JSON.
    payload = text[text.find("(") + 1:text.rfind(")")]
    parsed = json.loads(payload)
    return [entry[0] for entry in parsed[1]]


def _parseReplyCount(label) -> int:
    # Turns a label such as "12 replies" or "1,234 replies" into an int.
    # Returns 0 when the first word is not a plain number.
    first_word = str(label).split(" ")[0].replace(",", "")
    try:
        return int(first_word)
    except ValueError:
        return 0


def _hasCommentItems(items) -> bool:
    # True if any entry holds a comment thread or a continuation item.
    return any(
        "commentThreadRenderer" in entry or "continuationItemRenderer" in entry
        for entry in items
    )


def extract(url: str, getcomments=False, maxcomments="", manifest_fix=False):
    # Runs yt-dlp's extract_info (download=False) for the given URL or
    # 11-character video ID and returns the sanitized info.
    #   getcomments  - sets yt-dlp's "getcomments" option
    #   maxcomments  - if truthy, passed as the first "max_comments" value
    #   manifest_fix - uses the ['default', 'web_safari'] player clients
    extractor_config = ythdd_globals.config['extractor']

    # Work on a per-call copy: writing into the module-level ytdl_opts would
    # make getcomments/max_comments/player_client stick for all later calls.
    opts = dict(ytdl_opts)
    extractor_args = dict(opts.get('extractor_args', {}))
    youtube_args = dict(extractor_args.get('youtube', {}))
    extractor_args['youtube'] = youtube_args
    opts['extractor_args'] = extractor_args

    # TODO: check user-agent and cookiefile
    if extractor_config['user-agent']:
        yt_dlp.utils.std_headers['User-Agent'] = extractor_config['user-agent']

    if extractor_config['cookies_path']:
        opts['cookiefile'] = extractor_config['cookies_path']

    if len(url) == 11:
        url = "https://www.youtube.com/watch?v=" + url
    if getcomments:
        opts['getcomments'] = True
    if maxcomments:
        youtube_args['max_comments'] = [maxcomments, "all", "all", "all"]
    if manifest_fix:
        # https://github.com/yt-dlp/yt-dlp/issues/11952#issuecomment-2565802294
        youtube_args['player_client'] = ['default', 'web_safari']

    with yt_dlp.YoutubeDL(opts) as ytdl:
        result = ytdl.sanitize_info(ytdl.extract_info(url, download=False))
    return result


def WEBrelated(url: str):
    # WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME
    # Fetches the watch page and returns the "secondaryResults" part of the
    # second JSON document embedded in it.
    # Accepts an 11-character video ID or a URL containing
    # "https://www.youtube.com/watch?v=".
    # Raises ValueError for any other URL or when the JSON cannot be found.
    if len(url) == 11:
        video_id = url
    else:
        prefix_at = url.find(_WATCH_URL_PREFIX)
        if prefix_at == -1:
            raise ValueError(f"WEBrelated expects a video ID or a watch URL, got {url!r}")
        id_start = prefix_at + len(_WATCH_URL_PREFIX)
        video_id = url[id_start:id_start + 11]

    # Always request the watch endpoint itself; a bare ID is not a valid URL.
    response = requests.get(
        _WATCH_URL,
        headers=ythdd_globals.getHeaders(caller='extractor'),
        params={'v': video_id}
    )
    page = response.content.decode('utf8', 'replace')

    first = page.find(_RESPONSE_MARKER)
    second = page.find(_RESPONSE_MARKER, first + 1)
    extracted_json = _decodeEmbeddedJson(page, second)
    return extracted_json["contents"]['twoColumnWatchNextResults']["secondaryResults"]


def WEBextractSinglePage(uri: str):
    # WARNING! HIGHLY EXPERIMENTAL, DUE TO BREAK ANYTIME
    # Fetches the watch page of an 11-character video ID and returns both JSON
    # documents embedded in it: {'ec1': first, 'ec2': second, 'took': seconds}.
    # Raises ValueError on a wrong ID length or when the JSON cannot be found.
    start_time = time.time()

    if len(uri) != 11:
        raise ValueError("WEBextractSinglePage expects a single, 11-character long argument")

    response = requests.get(
        "https://www.youtube.com/watch?v=" + uri,
        headers=ythdd_globals.getHeaders(caller='extractor')
    )
    page = response.content.decode('utf8', 'replace')

    first = page.find(_RESPONSE_MARKER)
    second = page.find(_RESPONSE_MARKER, first + 1)
    extracted_json1 = _decodeEmbeddedJson(page, first)
    extracted_json2 = _decodeEmbeddedJson(page, second)

    end_time = time.time()
    return {'ec1': extracted_json1, 'ec2': extracted_json2, 'took': end_time - start_time}


def paramsFromUrl(url: str) -> dict:
    # Returns a dictionary of params from a given URL.
    # Values are returned as-is (no percent-decoding). A value may itself
    # contain "="; segments without "=" are skipped.
    pieces = url.split("&")
    # everything up to (and including) the first "?" is not part of the query
    pieces[0] = pieces[0][pieces[0].find("?") + 1:]

    params = {}
    for piece in pieces:
        key, separator, value = piece.partition("=")
        if separator:
            params[key] = value
    return params


def IOSextract(uri: str):
    # Posts to the innertube "player" endpoint (IOS client) and the "next"
    # endpoint (WEB client) for an 11-character video ID.
    # Returns {'stage1': player response, 'stage3': next response, 'took': seconds}.
    # Raises ValueError on a wrong ID length.
    start = time.time()

    if len(uri) != 11:
        raise ValueError("IOSextract expects a single, 11-character long uri as an argument")

    # Build per-call bodies so the shared templates are never written to.
    player_body = {**stage1_body, 'videoId': uri}
    stage1_h = requests.post(
        "https://www.youtube.com/youtubei/v1/player?prettyPrint=false",
        headers=stage1_headers,
        json=player_body
    )
    stage1 = json.loads(stage1_h.content.decode('utf-8'))

    #stage2_h = requests.get(stage1['streamingData']['hlsManifestUrl'], headers=stage2_headers)
    #stage2 = stage2_h.content.decode('utf-8')

    next_body = {**stage3_body, 'videoId': uri}
    stage3_h = requests.post(
        "https://www.youtube.com/youtubei/v1/next?prettyPrint=false",
        headers=stage3_headers,
        json=next_body
    )
    stage3 = json.loads(stage3_h.content.decode('utf-8'))

    end = time.time()

    #return {'stage1': stage1, 'stage2': stage2, 'stage3': stage3, 'took': end - start}
    return {'stage1': stage1, 'stage3': stage3, 'took': end - start}


def makeWebContext(secondaryContextDict: dict):
    # Uses web_context_dict to create a context, returns a dict.
    # Essentially, expands the web_context_dict with a secondary one.
    # The copy is shallow: the nested 'context' dict is shared with
    # web_context_dict, so callers should not modify it in place.
    return {**web_context_dict, **secondaryContextDict}


def getChannelAvatar(response_json: dict):
    # Returns a dictionary: {url: , width: ..., height: ...}
    # containing the best resolution in terms of pixel count.
    # The chosen entry's 'url' is rewritten in place with translateLinks().
    # A great majority of the code has been influenced by https://github.com/iv-org/invidious/blob/master/src/invidious/channels/about.cr.
    avatars = safeTraverse(response_json, ['metadata', 'channelMetadataRenderer', 'avatar', 'thumbnails'], default=None)

    if avatars is None:
        # fallback to lower resolution avatars
        avatars = safeTraverse(response_json, ['header',
                                               'pageHeaderRenderer',
                                               'content',
                                               'pageHeaderViewModel',
                                               'image',
                                               'decoratedAvatarViewModel',
                                               'avatar',
                                               'avatarViewModel',
                                               'image',
                                               'sources'], default=None)

    # if avatars is None:
    # TODO: if avatars is still None, use a local avatar
    # (until then, a missing or empty avatar list raises on the next line)

    best_avatar = avatars[-1] # usually, the best avatar is stored last
    best_area = best_avatar['width'] * best_avatar['height']
    for candidate in avatars:
        candidate_area = candidate['width'] * candidate['height']
        if candidate_area > best_area:
            best_avatar, best_area = candidate, candidate_area

    # or use regex substitution and set the size to something like 512x512
    # e.g.: =s128 -> =s512

    best_avatar['url'] = ythdd_globals.translateLinks(best_avatar['url'])

    return best_avatar


def generateChannelAvatarsFromUrl(url: str, proxied: bool = True) -> list:
    # Generates channel avatars at default sizes.
    # Returns a list of {"url", "width", "height"} dicts, or [] when the URL
    # is not hosted on yt3.ggpht.com / yt3.googleusercontent.com.
    # NOTE: `proxied` is currently unused; the URL always goes through
    # translateLinks().
    default_sizes = [32, 48, 76, 100, 176, 512]

    # avatar urls for channels in search results start with //yt3.ggpht.com/
    if url.startswith("//yt3.ggpht.com/"):
        url = "https:" + url

    if not url.startswith(("https://yt3.ggpht.com/", "https://yt3.googleusercontent.com/")):
        return []

    url = ythdd_globals.translateLinks(url)

    marker_at = url.rfind("=s")
    if marker_at == -1:
        # No "=s<size>" marker to rewrite: keep the URL intact rather than
        # splicing sizes into an arbitrary position of it.
        return [{"url": url, "width": size, "height": size} for size in default_sizes]

    size_start = marker_at + 2
    size_end = url.find("-", size_start)
    if size_end == -1:
        # the size is the last part of the URL
        size_end = len(url)

    prefix, suffix = url[:size_start], url[size_end:]
    return [
        {"url": f"{prefix}{size}{suffix}", "width": size, "height": size}
        for size in default_sizes
    ]


def isVerified(response_json: dict) -> bool:
    # Returns True if any user badge has been found (verified/artist).
    if not isinstance(response_json, dict):
        return False

    # only the first key of the dict decides what kind of badge this is
    first_key = next(iter(response_json), "")
    if first_key == "metadataBadgeRenderer": # channels in search results
        tooltip = safeTraverse(response_json, ["metadataBadgeRenderer", "tooltip"], default="")
        # perhaps look for badge styles?
        return tooltip in ("Verified", "Official Artist Channel")

    return False


def isPremium(response_json: dict) -> bool:
    # Returns True if content is paid (member-only).
    if not isinstance(response_json, dict):
        return False

    # only the first key of the dict decides what kind of badge this is
    first_key = next(iter(response_json), "")
    if first_key == "metadataBadgeRenderer": # channels in search results
        style = safeTraverse(response_json, ["metadataBadgeRenderer", "style"], default="")
        # exact comparison: `in ("...")` would be a substring test on a str,
        # which is True for a missing style ("")
        return style == "BADGE_STYLE_TYPE_MEMBERS_ONLY"

    return False


def browseChannel(ucid: str, params: str | None = None, ctoken: str | None = None):
    # Returns the response from innertubes browse endpoint for channels (as a dict).
    # `params` and `ctoken` (sent as "continuation") are included only when given.
    # Raises ValueError if ucid is not 24 characters long.
    if len(ucid) != 24:
        raise ValueError(f"Something is wrong with the UCID {ucid}. Expected a 24-character long channel ID, not {len(ucid)}.")

    additional_context = {'browseId': ucid}
    if params is not None:
        additional_context['params'] = params
    if ctoken is not None:
        additional_context['continuation'] = ctoken

    context = makeWebContext(additional_context)

    response = requests.post(
        'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
        headers = ythdd_globals.getHeaders(),
        json = context,
    )

    response_json = json.loads(response.text)

    return response_json


def WEBextractSearchResults(search_query: str) -> list:
    # Posts a search request to innertube API
    # and processes only the relevant part (the actual results)
    # Returns [] when the query is None or the response cannot be used.
    if search_query is None:
        return []

    web_context = makeWebContext({"query": search_query})
    response = requests.post('https://www.youtube.com/youtubei/v1/search',
        params={"prettyPrint": "false"}, # a bool would be sent as "False"
        headers=stage2_headers,
        data=json.dumps(web_context)
    )

    try:
        results = json.loads(response.text)
    except ValueError:
        # not JSON: fall through and return the default below
        results = []

    return safeTraverse(results, ["contents",
                                  "twoColumnSearchResultsRenderer",
                                  "primaryContents",
                                  "sectionListRenderer",
                                  "contents",
                                  0,
                                  "itemSectionRenderer",
                                  "contents"], default=[])


def WEBgetSearchSuggestions(query: str, previous_query: str = '') -> dict:
    # Takes in a search query and returns relevant suggestions.
    # Can optionally take the previous query but that's rather novel and
    # not supported across players nor invidious API itself.
    # Returns {"query": ..., "suggestions": [...]}, or {} if query is not a str.
    if not isinstance(query, str):
        print("WEBgetSearchSuggestions: query is not a string (as it should)")
        return {}
    if not isinstance(previous_query, str):
        previous_query = ''

    cache_enabled = ythdd_globals.config["general"]["cache"]

    # None means "not cached"; an empty list is a valid cached answer
    suggestions = None
    if cache_enabled:
        # look for cached suggestions
        for cached_search in ythdd_globals.general_cache["search"]:
            if cached_search["q"] == query.lower() and cached_search["pq"] == previous_query.lower():
                # found it? skip ahead
                suggestions = cached_search["resp"]
                break

    # request wasn't cached? query the API
    if suggestions is None:
        params = {
            'ds': 'yt',
            'hl': 'en', # host language
            'gl': 'us', # geolocation
            'client': 'youtube',
            'gs_ri': 'youtube',
            'q': query, # query
            'pq': previous_query # previous query
        }

        response = requests.get(
            'https://suggestqueries-clients6.youtube.com/complete/search',
            params=params,
            headers=stage2_headers
        )

        # Parse the whole payload instead of slicing at offsets derived from
        # len(query): the echoed query may be JSON-escaped and differ in length.
        suggestions = _parseSuggestionsResponse(response.text)

        # cache response
        if cache_enabled:
            ythdd_globals.general_cache["search"].append(
                {
                    "q": query.lower(),
                    "pq": previous_query.lower(),
                    "resp": suggestions
                }
            )

    return {
        "query": query,
        "suggestions": suggestions
    }


def WEBgetVideoComments(ctoken: str) -> tuple:
    # Requests a page of comments (or replies) for the given continuation
    # token and returns (comments, new_continuation).
    # ctoken needs to be passed explicitly.
    # no guessing or retrieving it from globals.
    if ctoken is None:
        return [], ""

    # build web context containing the relevant ctoken
    web_context = makeWebContext({"continuation": ctoken})
    response = requests.post('https://www.youtube.com/youtubei/v1/next',
        params={"prettyPrint": "false"}, # a bool would be sent as "False"
        headers=stage2_headers,
        data=json.dumps(web_context)
    )

    try:
        results = json.loads(response.text)
    except ValueError:
        # not JSON: the lookups below fall back to their defaults
        results = []

    comments = safeTraverse(results, ["frameworkUpdates", "entityBatchUpdate", "mutations"], default=[])
    comment_continuations = []
    comment_continuations_re = safeTraverse(results, ["onResponseReceivedEndpoints"], default=[])
    for received_endpoint in comment_continuations_re:

        # this is horrible...

        acia = safeTraverse(received_endpoint, ["appendContinuationItemsAction", "continuationItems"], default=[])
        rcic = safeTraverse(received_endpoint, ["reloadContinuationItemsCommand", "continuationItems"], default=[])

        if _hasCommentItems(acia):
            comment_continuations = acia
        # checked second on purpose: wins when both lists qualify
        if _hasCommentItems(rcic):
            comment_continuations = rcic

        if comment_continuations != []:
            break

    if comment_continuations == []:
        print("error: received an unknown comment structure, unable to parse continuations (replies)")
        # return [], ""

    # extract new continuation
    new_continuation = ""
    if "continuationItemRenderer" in safeTraverse(comment_continuations, [-1], default=[]):
        # first, look for ctoken inside of response for next page of comments
        new_continuation = safeTraverse(comment_continuations, [-1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default=None)
        # or search elsewhere in case this is a reply thread
        if new_continuation is None:
            new_continuation = safeTraverse(comment_continuations, [-1, "continuationItemRenderer", "button", "buttonRenderer", "command", "continuationCommand", "token"], default="")

    # perform a basic mutation check before parsing
    # will ignore replies liked by video uploader ("hearts")
    actual_comments = [x for x in comments if "properties" in safeTraverse(x, ["payload", "commentEntityPayload"], default=[], quiet=True)]
    actual_comment_continuations = [x for x in comment_continuations if "replies" in safeTraverse(x, ["commentThreadRenderer"], default=[], quiet=True)]

    # link reply data (reply count and ctoken) for comments with replies
    for reply_renderer in actual_comment_continuations:

        mutual_key = safeTraverse(reply_renderer, ["commentThreadRenderer", "commentViewModel", "commentViewModel", "commentKey"], default="unknown-key")
        reply_ctoken = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"], default="")
        reply_label = safeTraverse(reply_renderer, ["commentThreadRenderer", "replies", "commentRepliesRenderer", "viewReplies", "buttonRenderer", "text", "runs", 0, "text"], default="0 replies")
        # tolerant parse: a label whose first word is not a plain number
        # yields 0 instead of raising
        reply_count = _parseReplyCount(reply_label)

        for comment in actual_comments:
            found_key = safeTraverse(comment, ["entityKey"], default="unknown-key")
            # try to link a relevant ctoken if a comment has response
            if found_key == mutual_key:
                if ythdd_globals.config["general"]["debug"]: print(f"found reply for {found_key}")
                comment["replies"] = {
                    "replyCount": reply_count,
                    "continuation": reply_ctoken
                }

    return actual_comments, new_continuation