Files
ythdd/ythdd_struct_parser.py
sherl 34e00e2492 fix: handle collaboratively authored videos in playlists and videos
endpoint
currently, the videos endpoint returns the video uploader name,
and not "author1, author2, author3" as is the case for videoRenderer
and playlistVideoRenderer - this might change in the future in order for
the endpoints to return the same data
2025-09-28 05:02:51 +02:00

475 lines
21 KiB
Python

from ythdd_globals import safeTraverse
from html import escape
import json
import dateparser
import ythdd_globals
import ythdd_extractor
# Fallback avatar URL, used as the default whenever an author's/channel's
# avatar can't be extracted from the parsed response.
DEFAULT_AVATAR = "https://yt3.ggpht.com/a/default-user=s176-c-k-c0x00ffffff-no-rj"
def genThumbs(videoId: str):
    """Build the list of thumbnail descriptors for a video.

    Every descriptor is a dict with the keys "quality", "url", "width" and
    "height". The URLs are built from the instance's configured
    `public_facing_url`, followed by `vi/<videoId>/<file name>.jpg`.
    """
    # (height, width, quality label, file name without extension)
    # NOTE: the buggy "maxres" quality is omitted for the time being.
    variants = (
        (720, 1280, "maxresdefault", "maxresdefault"),
        (480, 640, "sddefault", "sddefault"),
        (360, 480, "high", "hqdefault"),
        (180, 320, "medium", "mqdefault"),
        (90, 120, "default", "default"),
        (90, 120, "start", "1"),
        (90, 120, "middle", "2"),
        (90, 120, "end", "3"),
    )
    base_url = ythdd_globals.config['general']['public_facing_url'] + 'vi/' + videoId + '/'
    return [
        {'quality': quality, 'url': base_url + file_name + '.jpg', 'width': width, 'height': height}
        for height, width, quality, file_name in variants
    ]
def doesContainNumber(string: str, numeric_system: int = 10) -> bool:
    """Tell whether `string` as a whole can be parsed by int() in the given base.

    Despite the name, this does not search for digits inside the string:
    the entire string has to be accepted by `int(string, numeric_system)`.

    Args:
        string: text to check.
        numeric_system: base passed to int(), 10 by default.

    Returns:
        True if int() accepts the string, False if it raises ValueError.
    """
    try:
        int(string, numeric_system)
    except ValueError:
        return False
    return True
def parseLengthFromTimeBadge(time_str: str) -> int:
    """Convert a "[[D:]H:]M:S"-style time badge into a number of seconds.

    Args:
        time_str: colon-separated badge text, e.g. "4:20" or "1:02:03".

    Returns:
        The length in seconds, or 0 if unsuccessful: non-numeric badges
        (such as "LIVE" for livestreams or "Upcoming" for scheduled videos),
        badges with more than four fields, or input that isn't a string.
    """
    # seconds, minutes, hours, days - ordered from the rightmost field
    multipliers = (1, 60, 3_600, 86_400)

    if not isinstance(time_str, str):
        return 0

    fields = time_str.split(":")
    if len(fields) > len(multipliers):
        # more fields than known units, refuse to guess
        return 0

    try:
        values = [int(field) for field in fields]
    except ValueError:
        # works around ['LIVE'] for livestreams or ['Upcoming'] for scheduled videos
        return 0

    return sum(multiplier * value for multiplier, value in zip(multipliers, reversed(values)))
def parseViewsFromViewText(viewcounttext: str) -> int:
    """Extract a count from human-readable text such as "1.2M views".

    All ASCII digits and dots found in the text form the number. If the first
    space-separated token ends with K, M or B (case-insensitive), the number
    is scaled by a thousand, a million or a billion respectively. Any
    remaining fractional part is truncated.

    Args:
        viewcounttext: text to parse, e.g. "1,234 views", "12K", "No views".

    Returns:
        The parsed count, or 0 if unsuccessful (empty or non-string input,
        no digits, or digits/dots which don't form a valid number).
    """
    # Decimal keeps "1.2" * 1_000_000 exact, so truncating to int can't
    # lose a unit to binary floating point error.
    from decimal import Decimal, InvalidOperation

    if not viewcounttext or not isinstance(viewcounttext, str):
        return 0

    magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}

    if viewcounttext.lower() == "no":
        viewcounttext = "0"

    digits = "".join(char for char in viewcounttext if char in "0123456789.")
    try:
        # the leading "0" makes texts without any digit evaluate to zero
        views = Decimal("0" + digits)
    except InvalidOperation:
        # e.g. several dots ("1.234.567"), which can't be interpreted safely
        return 0

    first_token = viewcounttext.split(" ")[0]
    if first_token:  # empty when the text starts with a space
        views *= magnitude.get(first_token[-1].upper(), 1)

    return int(views)
def parseRenderers(entry: dict, context: dict = {}) -> dict:
if not isinstance(entry, dict):
raise ValueError("parsed entry is not of type dict")
match safeTraverse(list(entry.keys()), [0], default=""):
case "videoRenderer": # represents a video
published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now")
published_date = published_date.removeprefix("Streamed ")
description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["videoRenderer", "descriptionSnippet", "runs"], default=[]))
collaborative = False
if "author_name" in context:
author_name = context["author_name"]
else:
author_name = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"], default="Unknown author")
if "author_ucid" in context:
author_ucid = context["author_ucid"]
else:
author_ucid = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
if author_ucid == "UNKNOWNCHANNELID":
# this is a first indicator that a video is a collaborative (has multiple authors)
# if that's the case, let's take the first author's ucid as the ucid
collaborative = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "header", "dialogHeaderViewModel", "headline", "content"]) == "Collaborators"
if "verified" in context:
verified = context["verified"]
else:
verified = ythdd_extractor.isVerified(safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]))
if "avatar" in context:
avatar_url = context["avatar"]
else:
avatar_url = safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
views_or_viewers_model = safeTraverse(entry, ["videoRenderer", "viewCountText"])
if "simpleText" in views_or_viewers_model:
# means this is a video with X views
view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["simpleText"])
view_count_text = entry["videoRenderer"]["viewCountText"]["simpleText"]
elif "runs" in views_or_viewers_model:
# means this is a livestream with X concurrent viewers
view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching")
view_count_text = entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching"
else:
# unknown model, assume no views
view_count = 0
view_count_text = "Unknown amount of views"
if collaborative:
livm = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
if "author_name" not in context:
# override the default "name1 and others" or "name1 and name2" text
# with full author info
all_authors = []
for collaborative_author in livm:
collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"])
if collaborative_author_name is not None:
all_authors.append(collaborative_author_name)
if all_authors != []: # check if custom extraction succeeded
author_name = ", ".join(all_authors)
if author_ucid == "UNKNOWNCHANNELID":
# retrieve main author's ucid
author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
if safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]) is None:
# check if the main author is verified
verified = False
if safeTraverse(livm, [0, "listItemViewModel", "title", "attachmentRuns", 0, "element", "type", "imageType", "image", "sources", 0, "clientResource", "imageName"]) in ("AUDIO_BADGE", "CHECK_CIRCLE_FILLED"):
verified = True
if avatar_url == DEFAULT_AVATAR:
# retrieve the main channel's avatar
avatar_url = safeTraverse(livm, [0, "listItemViewModel", "leadingAccessory", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
return {
"type": "video",
"title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]),
"videoId": safeTraverse(entry, ["videoRenderer", "videoId"]),
"author": author_name,
"authorId": author_ucid,
"authorUrl": "/channel/" + author_ucid,
"authorVerified": verified, # TODO
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
"videoThumbnails": genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")),
"description": description,
"descriptionHtml": description_html,
"viewCount": view_count,
"viewCountText": view_count_text,
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
"publishedText": published_date,
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")),
"liveNow": False,
"premium": ythdd_extractor.isPremium(safeTraverse(entry, ["videoRenderer", "badges", 0])), # will fail if it's not the only badge
"isUpcoming": False,
"isNew": False,
"is4k": False,
"is8k": False,
"isVr180": False,
"isVr360": False,
"is3d": False,
"hasCaptions": False
}
# modify the premiere timestamp afterwards here?
case "lockupViewModel": # represents playlists/mixes
playlist_type = safeTraverse(entry, ["lockupViewModel", "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="PLAYLISTS")
if playlist_type == "MIX":
# mixes aren't currently supported
return
lvm = entry["lockupViewModel"]
meta = safeTraverse(lvm, ["metadata"], default=[])
lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[])
thumbnail = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", -1, "url"], default="no-url?")
thumbnail = ythdd_globals.translateLinks(thumbnail[:thumbnail.rfind("?")])
verified = safeTraverse(context, ["verified"], default=False)
playlist_id = safeTraverse(lvm, ["contentId"], default="UNKNOWNPLAYLISTID")
length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos")
length = parseViewsFromViewText(length.split(" ")[0])
# Turns out for some responses we do some data, while not on others.
# Data from context should be prioritized, thus even if something is found with safeTraverse,
# the parser will ignore it in favour of the context.
ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
author = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author")
ucid = safeTraverse(context, ["author_ucid"], default=ucid)
author = safeTraverse(context, ["author_name"], default=author)
return {
"type": "playlist",
"title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"),
"playlistId": playlist_id,
"playlistThumbnail": thumbnail,
"author": author,
"authorId": ucid,
"authorUrl": "/channel/" + ucid,
"authorVerified": verified,
"videoCount": length,
"videos": [] # provided for historical reasons i guess
}
case "shelfRenderer": # "people also watched"
return
case "gridShelfViewModel": # shorts?
return
case "shortsLockupViewModel": # shorts on channel pages
video_id = safeTraverse(entry, ["shortsLockupViewModel", "onTap", "innertubeCommand", "reelWatchEndpoint", "videoId"], default="UnknownVideoId")
title = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "primaryText", "content"], default="ythdd: couldn't find title")
views_text = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "secondaryText", "content"], default="No views")
published_date = "No data about published time" # the view model doesn't provide data about the date a short is published
if video_id == "UnknownVideoId": # failsafe
video_id = safeTraverse(entry, ["shortsLockupViewModel", "entityId"], default="-UnknownVideoId")
video_id = video_id[video_id.rfind("-") + 1:]
if "author_name" in context:
author_name = context["author_name"]
else:
author_name = "Unknown author"
if "author_ucid" in context:
author_ucid = context["author_ucid"]
else:
author_ucid = "UNKNOWNCHANNELID"
if "verified" in context:
verified = context["verified"]
else:
verified = False
if "avatar" in context:
avatar_url = context["avatar"]
else:
avatar_url = "unknown"
return {
"type": "video",
"title": title,
"videoId": video_id,
"author": author_name,
"authorId": author_ucid,
"authorUrl": "/channel/" + author_ucid,
"authorVerified": False,
"videoThumbnails": genThumbs(video_id),
"description": "",
"descriptionHtml": "",
"viewCount": parseViewsFromViewText(views_text),
"viewCountText": views_text,
"published": int(0),
"publishedText": published_date,
"lengthSeconds": int(60), # invidious locks this to 60s no matter what the actual duration is
"liveNow": False,
"premium": False,
"isUpcoming": False,
"premiereTimestamp": 0,
"isNew": False,
"is4k": False,
"is8k": False,
"isVr180": False,
"isVr360": False,
"is3d": False,
"hasCaptions": False
}
case "gridVideoRenderer": # videos on channel pages
# doesn't work on Yattee
# thumbnails = safeTraverse(entry, ["gridVideoRenderer", "thumbnail", "thumbnails"], default=[])
# for thumbnail in thumbnails:
# thumbnail["url"] = ythdd_globals.translateLinks(thumbnail["url"])
video_id = safeTraverse(entry, ["gridVideoRenderer", "videoId"], default="UnknownVideoId")
thumbnails = genThumbs(video_id)
published_date = safeTraverse(entry, ["gridVideoRenderer", "publishedTimeText", "simpleText"], default="now")
published_date = published_date.removeprefix("Streamed ")
return {
"type": "video",
"title": safeTraverse(entry, ["gridVideoRenderer", "title", "simpleText"], default="unknown video title"),
"videoId": video_id,
"author": context["author_name"],
"authorId": context["author_ucid"],
"authorUrl": "/channel/" + context["author_ucid"],
"authorVerified": False, # TODO: handle badge related tasks here using context
"videoThumbnails": thumbnails,
"description": "", # won't work without using an RSS feed (?)
"descriptionHtml": "", # -||-
"viewCount": parseViewsFromViewText(safeTraverse(entry, ["gridVideoRenderer", "viewCountText", "simpleText"], default="0 views")),
"viewCountText": safeTraverse(entry, ["gridVideoRenderer", "shortViewCountText", "simpleText"], default="0 views"),
"published": int(dateparser.parse(published_date).timestamp()),
"publishedText": published_date,
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["gridVideoRenderer", "thumbnailOverlays", 0, "thumbnailOverlayTimeStatusRenderer", "text", "simpleText"], default="0:0")),
"liveNow": True if published_date == "now" else False,
"premium": False,
"isUpcoming": False,
"isNew": False,
"is4k": False,
"is8k": False,
"isVr180": False,
"isVr360": False,
"is3d": False,
"hasCaptions": False
}
case "channelRenderer": # channels in search results
avatars = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["channelRenderer", "thumbnail", "thumbnails", 0, "url"], default=DEFAULT_AVATAR))
description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["channelRenderer", "descriptionSnippet", "runs"], default=[]))
isVerified = ythdd_extractor.isVerified(safeTraverse(entry, ["channelRenderer", "ownerBadges", 0], default=[]))
return {
"type": "channel",
"author": safeTraverse(entry, ["channelRenderer", "title", "simpleText"], default="Unknown channel"),
"authorId": safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"),
"authorUrl": "/channel/" + safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"),
"authorVerified": isVerified,
"authorThumbnails": avatars,
"autoGenerated": False,
"subCount": parseViewsFromViewText(safeTraverse(entry, ["channelRenderer", "videoCountText", "simpleText"], default="0 subscribers")),
"videoCount": 0,
"channelHandle": safeTraverse(entry, ["channelRenderer", "navigationEndpoint", "browseEndpoint", "canonicalBaseUrl"], default="/@ythdd_unknown_handle")[1:],
"description": description,
"descriptionHtml": description_html
}
case "playlistVideoRenderer":
# used by all content inside of playlists which have at least one non-shorts video/livestream
video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId")
title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title")
author_ucid = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"])
author_name = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "text"], default="Unknown author")
video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1
length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0"))
published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01")
published_date = published_date.removeprefix("Streamed ").removeprefix(" watching")
# handle livestreams
if not published_date:
published_date = "now"
if author_ucid is None:
# likely a collaborative video, let's try
# to fetch the uploader's ucid with that in mind
livm = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
# name extraction logic the same as in videoRenderer
all_authors = []
for collaborative_author in livm:
collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"])
if collaborative_author_name is not None:
all_authors.append(collaborative_author_name)
if all_authors != []:
author_name = ", ".join(all_authors)
author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
return {
"type": "video",
"title": title,
"videoId": video_id,
"author": author_name,
"authorId": author_ucid,
"authorUrl": "/channel/" + author_ucid,
"videoThumbnails": genThumbs(video_id),
"index": video_index,
"lengthSeconds": length,
"liveNow": False, # todo: check this?
# these do not need to be returned, but some clients try to read it
# so we return an approximation here:
"published": int(dateparser.parse(published_date).timestamp()),
"publishedText": published_date
}
case _:
print("received an entry of unknown type:")
print(entry)
print("")
# breakpoint()
return
def customCommentRendererParser(comment: dict, context: dict | None = None) -> dict:
    """Convert a comment entity into an Invidious-style comment dict.

    Args:
        comment: dict holding the comment under ["payload"]["commentEntityPayload"];
            if it has a "replies" key, its value is copied to the result as is.
        context: currently unused.

    Returns:
        The comment dict. "published" is 0 when the publication date
        can't be interpreted by dateparser.
    """
    cep = safeTraverse(comment, ["payload", "commentEntityPayload"], default={})
    content = safeTraverse(cep, ["properties", "content", "content"], default="")
    content_html = escape(content).replace("\r\n", "<br>").replace("\n", "<br>")
    author = safeTraverse(cep, ["author"], default={})
    verified = safeTraverse(author, ["isVerified"], default=False) or safeTraverse(author, ["isArtist"], default=False)
    ucid = safeTraverse(author, ["channelId"], default="UNKNOWNCHANNELID")
    published_date = safeTraverse(cep, ["properties", "publishedTime"], default="now")

    # edited comments carry a suffix which has to be removed before parsing the date
    edited = published_date.endswith(" (edited)")
    parsed_date = dateparser.parse(published_date.removesuffix(" (edited)"))
    # dateparser returns None for text it can't interpret
    published_date_unix = int(parsed_date.timestamp()) if parsed_date is not None else 0

    inv_comment = {
        "authorId": ucid,
        "authorUrl": "/channel/" + ucid,
        "author": safeTraverse(author, ["displayName"], default="@ythdd-unknown-user"),
        "verified": verified,
        "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(author, ["avatarThumbnailUrl"], default=DEFAULT_AVATAR)), # proxy them!
        "authorIsChannelOwner": safeTraverse(author, ["isCreator"], default=False), # ???
        "isSponsor": False, # not sure how to retrieve this
        "likeCount": parseViewsFromViewText("0" + safeTraverse(cep, ["toolbar", "likeCountNotliked"], default="0") + " likes"),
        "isPinned": False,
        "commentId": safeTraverse(cep, ["properties", "commentId"], default="UNKNOWNCOMMENTID"),
        "content": content,
        "contentHtml": content_html,
        "isEdited": edited,
        "published": published_date_unix,
        "publishedText": published_date if published_date != "now" else "unknown amount of time ago"
    }

    if "replies" in comment:
        inv_comment["replies"] = comment["replies"]

    return inv_comment
def parseDescriptionSnippet(snippet: list):
    """Turn description snippet runs into plain text and HTML.

    Args:
        snippet: list of run dicts, each with a "text" key and optionally
            a truthy "bold" key.

    Returns:
        A (text, text_html) tuple. In text_html the text of every run is
        HTML-escaped, bold runs are wrapped in <b></b> and newlines are
        replaced with <br>.
    """
    text_parts = []
    html_parts = []

    for entry in snippet:
        run_text = entry.get("text", "")
        text_parts.append(run_text)
        # escape the text of the run only; escaping the assembled string
        # would neutralize the <b> tags added below as well
        run_html = escape(run_text)
        if entry.get("bold"):
            run_html = "<b>" + run_html + "</b>"
        html_parts.append(run_html)

    text = "".join(text_parts)
    text_html = "".join(html_parts).replace("\r\n", "<br>").replace("\n", "<br>")

    return text, text_html
def runsToText(runs: list, default: str = "") -> str:
    """Concatenate the "text" of every run into a single string.

    Runs without a "text" contribute nothing. When the concatenation
    comes out empty, "default" is returned instead.
    """
    joined = "".join(safeTraverse(run, ["text"], default="") for run in runs)
    return joined or default
def extractTextFromSimpleOrRuns(obj: dict, default: str = "") -> str:
    """Extract the text both from "runs" and "simpleText", with failsafe to default.

    Args:
        obj: dict expected to have either a "runs" or a "simpleText" key;
            "runs" takes precedence when both are present.
        default: returned when obj isn't a dict, has neither key,
            or its runs yield no text.

    Returns:
        The extracted text or "default".
    """
    if isinstance(obj, dict):
        if "runs" in obj:
            # pass the default on, so that runs without any text honor it too
            return runsToText(obj["runs"], default)
        if "simpleText" in obj:
            return obj["simpleText"]

    print(f"error(extractTextFromSimpleOrRuns): text extraction failed for {obj}")
    return default