from ythdd_globals import safeTraverse from html import escape import json import dateparser import ythdd_globals import ythdd_extractor DEFAULT_AVATAR = "https://yt3.ggpht.com/a/default-user=s176-c-k-c0x00ffffff-no-rj" def genThumbs(videoId: str): result = [] thumbnails = [ {'height': 720, 'width': 1280, 'quality': "maxres", 'url': "maxres"}, # will always attempt to return the best quality available {'height': 720, 'width': 1280, 'quality': "maxresdefault", 'url': "maxresdefault"}, {'height': 480, 'width': 640, 'quality': "sddefault", 'url': "sddefault"}, {'height': 360, 'width': 480, 'quality': "high", 'url': "hqdefault"}, {'height': 180, 'width': 320, 'quality': "medium", 'url': "mqdefault"}, {'height': 90, 'width': 120, 'quality': "default", 'url': "default"}, {'height': 90, 'width': 120, 'quality': "start", 'url': "1"}, {'height': 90, 'width': 120, 'quality': "middle", 'url': "2"}, {'height': 90, 'width': 120, 'quality': "end", 'url': "3"}, ] for x in thumbnails: width = x['width'] height = x['height'] quality = x['quality'] url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/' + x['url'] + '.jpg' result.append({'quality': quality, 'url': url, 'width': width, 'height': height}) return result def doesContainNumber(string: str, numeric_system: int = 10) -> bool: try: number = int(string, numeric_system) return True except ValueError: return False raise BaseException("doesContainNumber(): Unknown error while determining if a string contains a number") def parseLengthFromTimeBadge(time_str: str) -> int: # Returns 0 if unsuccessful length = 0 time_lookup_list = [1, 60, 3_600, 86_400] time_list = time_str.split(":") if False in map(doesContainNumber, time_list): # works around ['LIVE'] for livestreams or ['Upcoming'] for scheduled videos pass else: for z in range(len(time_list)): length += time_lookup_list[z] * int(time_list[len(time_list) - 1 - z]) return length def parseViewsFromViewText(viewcounttext: str) -> int: # Returns 0 if unsuccessful views = 0 magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000} if viewcounttext: if viewcounttext.lower() == "no": viewcounttext = "0" views = float("0" + "".join([z for z in viewcounttext if 48 <= ord(z) and ord(z) <= 57 or ord(z) == 46])) viewcounttext = viewcounttext.split(" ")[0] for x in magnitude.keys(): if x == viewcounttext[-1].upper(): views *= magnitude[x] return int(views) def parseRenderers(entry: dict, context: dict = {}) -> dict: if not isinstance(entry, dict): raise ValueError("parsed entry is not of type dict") match safeTraverse(list(entry.keys()), [0], default=""): case "videoRenderer": # represents a video published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now") published_date = published_date.removeprefix("Streamed ") description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["videoRenderer", "descriptionSnippet", "runs"], default=[])) collaborative = False if "author_name" in context: author_name = context["author_name"] else: author_name = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"], default="Unknown author") if "author_ucid" in context: author_ucid = context["author_ucid"] else: author_ucid = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID") if author_ucid == "UNKNOWNCHANNELID": # this is a first indicator that a video is a collaborative (has multiple authors) # if that's the case, let's take the first author's ucid as the ucid collaborative = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "header", "dialogHeaderViewModel", "headline", "content"]) == "Collaborators" if "verified" in context: verified = context["verified"] else: verified = ythdd_extractor.isVerified(safeTraverse(entry, ["videoRenderer", "ownerBadges", 0])) if "avatar" in context: avatar_url = context["avatar"] else: avatar_url = safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR) views_or_viewers_model = safeTraverse(entry, ["videoRenderer", "viewCountText"], default={}) if "simpleText" in views_or_viewers_model: # means this is a video with X views view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["simpleText"]) view_count_text = entry["videoRenderer"]["viewCountText"]["simpleText"] elif "runs" in views_or_viewers_model: # means this is a livestream with X concurrent viewers view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching") view_count_text = entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching" else: # unknown model, assume no views view_count = 0 view_count_text = "Unknown amount of views" if collaborative: livm = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[]) if "author_name" not in context: # override the default "name1 and others" or "name1 and name2" text # with full author info all_authors = [] for collaborative_author in livm: collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"]) if collaborative_author_name is not None: all_authors.append(collaborative_author_name) if all_authors != []: # check if custom extraction succeeded author_name = ", ".join(all_authors) if author_ucid == "UNKNOWNCHANNELID": # retrieve main author's ucid author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID") if safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]) is None: # check if the main author is verified verified = False if safeTraverse(livm, [0, "listItemViewModel", "title", "attachmentRuns", 0, "element", "type", "imageType", "image", "sources", 0, "clientResource", "imageName"]) in ("AUDIO_BADGE", "CHECK_CIRCLE_FILLED"): verified = True if avatar_url == DEFAULT_AVATAR: # retrieve the main channel's avatar avatar_url = safeTraverse(livm, [0, "listItemViewModel", "leadingAccessory", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR) return { "type": "video", "title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]), "videoId": safeTraverse(entry, ["videoRenderer", "videoId"]), "author": author_name, "authorId": author_ucid, "authorUrl": "/channel/" + author_ucid, "authorVerified": verified, # TODO "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url), "videoThumbnails": genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")), "description": description, "descriptionHtml": description_html, "viewCount": view_count, "viewCountText": view_count_text, "published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too "publishedText": published_date, "lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")), "liveNow": False, "premium": ythdd_extractor.isPremium(safeTraverse(entry, ["videoRenderer", "badges", 0])), # will fail if it's not the only badge "isUpcoming": False, "isNew": False, "is4k": False, "is8k": False, "isVr180": False, "isVr360": False, "is3d": False, "hasCaptions": False } # modify the premiere timestamp afterwards here? case "lockupViewModel": # represents playlists/mixes playlist_type = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="PLAYLISTS") if playlist_type == "MIX": # mixes aren't currently supported return lvm = entry["lockupViewModel"] meta = safeTraverse(lvm, ["metadata"], default=[]) lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[]) thumbnail = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", -1, "url"], default="no-url?") thumbnail = ythdd_globals.translateLinks(thumbnail[:thumbnail.rfind("?")]) verified = safeTraverse(context, ["verified"], default=False) playlist_id = safeTraverse(lvm, ["contentId"], default="UNKNOWNPLAYLISTID") length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos") length = parseViewsFromViewText(length.split(" ")[0]) # Turns out for some responses we do some data, while not on others. # Data from context should be prioritized, thus even if something is found with safeTraverse, # the parser will ignore it in favour of the context. ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID") author = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author") ucid = safeTraverse(context, ["author_ucid"], default=ucid) author = safeTraverse(context, ["author_name"], default=author) return { "type": "playlist", "title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"), "playlistId": playlist_id, "playlistThumbnail": thumbnail, "author": author, "authorId": ucid, "authorUrl": "/channel/" + ucid, "authorVerified": verified, "videoCount": length, "videos": [] # provided for historical reasons i guess } case "shelfRenderer": # "people also watched" return case "gridShelfViewModel": # shorts? return case "shortsLockupViewModel": # shorts on channel pages video_id = safeTraverse(entry, ["shortsLockupViewModel", "onTap", "innertubeCommand", "reelWatchEndpoint", "videoId"], default="UnknownVideoId") title = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "primaryText", "content"], default="ythdd: couldn't find title") views_text = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "secondaryText", "content"], default="No views") published_date = "No data about published time" # the view model doesn't provide data about the date a short is published if video_id == "UnknownVideoId": # failsafe video_id = safeTraverse(entry, ["shortsLockupViewModel", "entityId"], default="-UnknownVideoId") video_id = video_id[video_id.rfind("-") + 1:] if "author_name" in context: author_name = context["author_name"] else: author_name = "Unknown author" if "author_ucid" in context: author_ucid = context["author_ucid"] else: author_ucid = "UNKNOWNCHANNELID" if "verified" in context: verified = context["verified"] else: verified = False if "avatar" in context: avatar_url = context["avatar"] else: avatar_url = "unknown" return { "type": "video", "title": title, "videoId": video_id, "author": author_name, "authorId": author_ucid, "authorUrl": "/channel/" + author_ucid, "authorVerified": False, "videoThumbnails": genThumbs(video_id), "description": "", "descriptionHtml": "", "viewCount": parseViewsFromViewText(views_text), "viewCountText": views_text, "published": int(0), "publishedText": published_date, "lengthSeconds": int(60), # invidious locks this to 60s no matter what the actual duration is "liveNow": False, "premium": False, "isUpcoming": False, "premiereTimestamp": 0, "isNew": False, "is4k": False, "is8k": False, "isVr180": False, "isVr360": False, "is3d": False, "hasCaptions": False } case "gridVideoRenderer": # videos on channel pages # doesn't work on Yattee # thumbnails = safeTraverse(entry, ["gridVideoRenderer", "thumbnail", "thumbnails"], default=[]) # for thumbnail in thumbnails: # thumbnail["url"] = ythdd_globals.translateLinks(thumbnail["url"]) video_id = safeTraverse(entry, ["gridVideoRenderer", "videoId"], default="UnknownVideoId") thumbnails = genThumbs(video_id) published_date = safeTraverse(entry, ["gridVideoRenderer", "publishedTimeText", "simpleText"], default="now") published_date = published_date.removeprefix("Streamed ") return { "type": "video", "title": safeTraverse(entry, ["gridVideoRenderer", "title", "simpleText"], default="unknown video title"), "videoId": video_id, "author": context["author_name"], "authorId": context["author_ucid"], "authorUrl": "/channel/" + context["author_ucid"], "authorVerified": False, # TODO: handle badge related tasks here using context "videoThumbnails": thumbnails, "description": "", # won't work without using an RSS feed (?) "descriptionHtml": "", # -||- "viewCount": parseViewsFromViewText(safeTraverse(entry, ["gridVideoRenderer", "viewCountText", "simpleText"], default="0 views")), "viewCountText": safeTraverse(entry, ["gridVideoRenderer", "shortViewCountText", "simpleText"], default="0 views"), "published": int(dateparser.parse(published_date).timestamp()), "publishedText": published_date, "lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["gridVideoRenderer", "thumbnailOverlays", 0, "thumbnailOverlayTimeStatusRenderer", "text", "simpleText"], default="0:0")), "liveNow": True if published_date == "now" else False, "premium": False, "isUpcoming": False, "isNew": False, "is4k": False, "is8k": False, "isVr180": False, "isVr360": False, "is3d": False, "hasCaptions": False } case "channelRenderer": # channels in search results avatars = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["channelRenderer", "thumbnail", "thumbnails", 0, "url"], default=DEFAULT_AVATAR)) description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["channelRenderer", "descriptionSnippet", "runs"], default=[])) isVerified = ythdd_extractor.isVerified(safeTraverse(entry, ["channelRenderer", "ownerBadges", 0], default=[])) return { "type": "channel", "author": safeTraverse(entry, ["channelRenderer", "title", "simpleText"], default="Unknown channel"), "authorId": safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"), "authorUrl": "/channel/" + safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"), "authorVerified": isVerified, "authorThumbnails": avatars, "autoGenerated": False, "subCount": parseViewsFromViewText(safeTraverse(entry, ["channelRenderer", "videoCountText", "simpleText"], default="0 subscribers")), "videoCount": 0, "channelHandle": safeTraverse(entry, ["channelRenderer", "navigationEndpoint", "browseEndpoint", "canonicalBaseUrl"], default="/@ythdd_unknown_handle")[1:], "description": description, "descriptionHtml": description_html } case "playlistVideoRenderer": # used by all content inside of playlists which have at least one non-shorts video/livestream video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId") title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title") author_ucid = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"]) author_name = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "text"], default="Unknown author") video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1 length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0")) published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01") published_date = published_date.removeprefix("Streamed ").removeprefix(" watching") # handle livestreams if not published_date: published_date = "now" if author_ucid is None: # likely a collaborative video, let's try # to fetch the uploader's ucid with that in mind livm = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[]) # name extraction logic the same as in videoRenderer all_authors = [] for collaborative_author in livm: collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"]) if collaborative_author_name is not None: all_authors.append(collaborative_author_name) if all_authors != []: author_name = ", ".join(all_authors) author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID") # surprisingly, innertube responds with the avatar of the user that added the video to the playlist # we can extract that information, e.g. for yattee to display avatar_url = safeTraverse(entry, ["playlistVideoRenderer", "thumbnailOverlays", ..., "thumbnailOverlayAvatarStackViewModel", "avatarStack", "avatarStackViewModel", "avatars", 0, "avatarViewModel", "image", "sources", 0, "url"]) avatars = None if avatar_url is None else ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url) return { "type": "video", "title": title, "videoId": video_id, "author": author_name, "authorId": author_ucid, "authorUrl": "/channel/" + author_ucid, "authorThumbnails": avatars, "videoThumbnails": genThumbs(video_id), "index": video_index, "lengthSeconds": length, "liveNow": False, # todo: check this? # these do not need to be returned, but some clients try to read it # so we return an approximation here: "published": int(dateparser.parse(published_date).timestamp()), "publishedText": published_date } case _: print("received an entry of unknown type:") print(entry) print("") # breakpoint() return def customCommentRendererParser(comment: dict, context: dict = {}) -> dict: cep = safeTraverse(comment, ["payload", "commentEntityPayload"], default={}) content = safeTraverse(cep, ["properties", "content", "content"], default="") content_html = escape(content).replace("\r\n", "
").replace("\n", "
") author = safeTraverse(cep, ["author"], default={}) verified = safeTraverse(author, ["isVerified"], default=False) or safeTraverse(author, ["isArtist"], default=False) ucid = safeTraverse(author, ["channelId"], default="UNKNOWNCHANNELID") published_date = safeTraverse(cep, ["properties", "publishedTime"], default="now") edited = False if published_date.endswith(" (edited)"): edited = True published_date_unix = int(dateparser.parse(published_date.removesuffix(" (edited)")).timestamp()) else: published_date_unix = int(dateparser.parse(published_date).timestamp()) inv_comment = { "authorId": ucid, "authorUrl": "/channel/" + ucid, "author": safeTraverse(author, ["displayName"], default="@ythdd-unknown-user"), "verified": verified, "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(author, ["avatarThumbnailUrl"], default=DEFAULT_AVATAR)), # proxy them! "authorIsChannelOwner": safeTraverse(author, ["isCreator"], default=False), # ??? "isSponsor": False, # not sure how to retrieve this "likeCount": parseViewsFromViewText("0" + safeTraverse(cep, ["toolbar", "likeCountNotliked"], default="0") + " likes"), "isPinned": False, "commentId": safeTraverse(cep, ["properties", "commentId"], default="UNKNOWNCOMMENTID"), "content": content, "contentHtml": content_html, "isEdited": edited, "published": published_date_unix, "publishedText": published_date if published_date != "now" else "unknown amount of time ago" } if "replies" in comment: inv_comment["replies"] = comment["replies"] return inv_comment def parseDescriptionSnippet(snippet: list): text = "" text_html = "" for entry in snippet: text += entry["text"] if "bold" in entry: # is checking entry["bold"] == True necessary? text_html += "" + entry["text"] + "" else: text_html += entry["text"] text_html = escape(text_html).replace("\r\n", "
").replace("\n", "
") return text, text_html def runsToText(runs: list, default: str = "") -> str: # "default" will be returned when text extraction fails. extracted_text = "" for field in runs: extracted_text += safeTraverse(field, ["text"], default="") if extracted_text: return extracted_text return default def extractTextFromSimpleOrRuns(obj: dict, default: str = "") -> str: # Extracts the text both from "runs" and "simpleText" # with failsafe to default. text = default if not isinstance(obj, str): return default if "runs" in obj: text = runsToText(obj["runs"]) elif "simpleText" in obj: text = obj["simpleText"] else: print(f"error(extractTextFromSimpleOrRuns): text extraction failed for {obj}") return text