Reports a factually wrong, but close enough, resolution. To be removed once this is fixed on Yattee's end.
619 lines
28 KiB
Python
619 lines
28 KiB
Python
from html import escape
|
|
from invidious_formats import FORMATS
|
|
from ythdd_globals import safeTraverse
|
|
import json
|
|
import dateparser
|
|
import ythdd_globals
|
|
import ythdd_extractor
|
|
import ythdd_struct_builder
|
|
|
|
# Fallback channel avatar URL, used by the parsers below whenever no avatar
# can be extracted from a renderer or from the caller-provided context.
DEFAULT_AVATAR = "https://yt3.ggpht.com/a/default-user=s176-c-k-c0x00ffffff-no-rj"
|
|
|
|
def doesContainNumber(string: str, numeric_system: int = 10) -> bool:
    """Tell whether the whole of *string* parses as an integer.

    Despite the name, this checks the entire string, not a substring:
    "12" is a number, "12 views" is not.

    Args:
        string: Text to check.
        numeric_system: Base passed to int() (10 by default).

    Returns:
        True if int(string, numeric_system) succeeds, False otherwise
        (including when *string* is not a string at all, e.g. None).
    """
    try:
        int(string, numeric_system)
    except (ValueError, TypeError):
        # ValueError: not a number in the given base (or invalid base),
        # TypeError: non-string input combined with an explicit base.
        return False
    return True
|
|
|
|
def parseLengthFromTimeBadge(time_str: str) -> int:
    """Convert a "[[D:]H:]M:S" style time badge into a number of seconds.

    Args:
        time_str: Badge text such as "12:34" or "1:02:03". Non-numeric
            badges (e.g. "LIVE" for livestreams, "Upcoming" for scheduled
            videos) are tolerated.

    Returns:
        The length in seconds, or 0 if the badge can't be interpreted
        (non-numeric part, more than four parts, or non-string input).
    """
    # seconds per unit, least significant first: seconds, minutes, hours, days
    multipliers = (1, 60, 3_600, 86_400)

    if not isinstance(time_str, str):
        return 0

    parts = time_str.split(":")
    if len(parts) > len(multipliers):
        # more components than known units - nothing sensible to return
        return 0

    try:
        values = [int(part) for part in parts]
    except ValueError:
        # works around ['LIVE'] for livestreams or ['Upcoming'] for scheduled videos
        return 0

    # the rightmost component is seconds, so walk the values back to front
    return sum(unit * value for unit, value in zip(multipliers, reversed(values)))
|
|
|
|
def parseViewsFromViewText(viewcounttext: str) -> int:
    """Parse a human-readable counter ("1.2M views", "1,234 views") into an int.

    Also used in this file for other counters with the same shape
    (concurrent viewers, subscribers, likes, playlist lengths).

    Args:
        viewcounttext: Counter text. Only ASCII digits and dots are taken
            into account for the numeric part; a trailing K/M/B
            (case-insensitive) on the first whitespace-separated token is
            treated as a magnitude.

    Returns:
        The parsed count (truncated towards zero), or 0 if unsuccessful.
    """
    magnitude = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}

    if not viewcounttext:
        return 0

    if viewcounttext.lower() == "no":
        viewcounttext = "0"

    # keep digits and the decimal separator, drop thousands separators and words
    digits = "".join(char for char in viewcounttext if char in "0123456789.")
    try:
        # the leading "0" makes an empty digit string parse as zero
        views = float("0" + digits)
    except ValueError:
        # e.g. more than one dot in the text
        return 0

    # the magnitude suffix, if any, is glued to the first token ("1.2M views");
    # split() without arguments also copes with leading whitespace
    tokens = viewcounttext.split()
    if tokens:
        views *= magnitude.get(tokens[0][-1].upper(), 1)

    return int(views)
|
|
|
|
def parseRenderers(entry: dict, context: dict = {}) -> dict:
|
|
|
|
if not isinstance(entry, dict):
|
|
raise ValueError("parsed entry is not of type dict")
|
|
|
|
match safeTraverse(list(entry.keys()), [0], default=""):
|
|
|
|
case "videoRenderer": # represents a video
|
|
# as of october 2025 slowly phased out in favor of lockupViewModel(?)
|
|
|
|
published_date = safeTraverse(entry, ["videoRenderer", "publishedTimeText", "simpleText"], default="now")
|
|
published_date = published_date.removeprefix("Streamed ")
|
|
description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["videoRenderer", "descriptionSnippet", "runs"], default=[]))
|
|
collaborative = False
|
|
|
|
if "author_name" in context:
|
|
author_name = context["author_name"]
|
|
else:
|
|
author_name = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "text"], default="Unknown author")
|
|
|
|
if "author_ucid" in context:
|
|
author_ucid = context["author_ucid"]
|
|
else:
|
|
author_ucid = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
|
if author_ucid == "UNKNOWNCHANNELID":
|
|
# this is a first indicator that a video is a collaborative (has multiple authors)
|
|
# if that's the case, let's take the first author's ucid as the ucid
|
|
collaborative = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "header", "dialogHeaderViewModel", "headline", "content"]) == "Collaborators"
|
|
|
|
if "verified" in context:
|
|
verified = context["verified"]
|
|
else:
|
|
verified = ythdd_extractor.isVerified(safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]))
|
|
|
|
if "avatar" in context:
|
|
avatar_url = context["avatar"]
|
|
else:
|
|
avatar_url = safeTraverse(entry, ["videoRenderer", "avatar", "decoratedAvatarViewModel", "avatar", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
|
|
|
|
views_or_viewers_model = safeTraverse(entry, ["videoRenderer", "viewCountText"], default={})
|
|
if "simpleText" in views_or_viewers_model:
|
|
# means this is a video with X views
|
|
view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["simpleText"])
|
|
view_count_text = entry["videoRenderer"]["viewCountText"]["simpleText"]
|
|
elif "runs" in views_or_viewers_model:
|
|
# means this is a livestream with X concurrent viewers
|
|
view_count = parseViewsFromViewText(entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching")
|
|
view_count_text = entry["videoRenderer"]["viewCountText"]["runs"][0]["text"] + " watching"
|
|
else:
|
|
# unknown model, assume no views
|
|
view_count = 0
|
|
view_count_text = "Unknown amount of views"
|
|
|
|
if collaborative:
|
|
livm = safeTraverse(entry, ["videoRenderer", "ownerText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
|
|
if "author_name" not in context:
|
|
# override the default "name1 and others" or "name1 and name2" text
|
|
# with full author info
|
|
all_authors = []
|
|
for collaborative_author in livm:
|
|
collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"])
|
|
if collaborative_author_name is not None:
|
|
all_authors.append(collaborative_author_name)
|
|
if all_authors != []: # check if custom extraction succeeded
|
|
author_name = ", ".join(all_authors)
|
|
if author_ucid == "UNKNOWNCHANNELID":
|
|
# retrieve main author's ucid
|
|
author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
|
if safeTraverse(entry, ["videoRenderer", "ownerBadges", 0]) is None:
|
|
# check if the main author is verified
|
|
verified = False
|
|
if safeTraverse(livm, [0, "listItemViewModel", "title", "attachmentRuns", 0, "element", "type", "imageType", "image", "sources", 0, "clientResource", "imageName"]) in ("AUDIO_BADGE", "CHECK_CIRCLE_FILLED"):
|
|
verified = True
|
|
if avatar_url == DEFAULT_AVATAR:
|
|
# retrieve the main channel's avatar
|
|
avatar_url = safeTraverse(livm, [0, "listItemViewModel", "leadingAccessory", "avatarViewModel", "image", "sources", 0, "url"], default=DEFAULT_AVATAR)
|
|
|
|
ythdd_globals.print_debug("videoRenderer fired")
|
|
return {
|
|
"type": "video",
|
|
"title": safeTraverse(entry, ["videoRenderer", "title", "runs", 0, "text"]),
|
|
"videoId": safeTraverse(entry, ["videoRenderer", "videoId"]),
|
|
"author": author_name,
|
|
"authorId": author_ucid,
|
|
"authorUrl": "/channel/" + author_ucid,
|
|
"authorVerified": verified, # TODO
|
|
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
|
|
"videoThumbnails": ythdd_struct_builder.genThumbs(safeTraverse(entry, ["videoRenderer", "videoId"], default="unknown")),
|
|
"description": description,
|
|
"descriptionHtml": description_html,
|
|
"viewCount": view_count,
|
|
"viewCountText": view_count_text,
|
|
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
|
|
"publishedText": published_date,
|
|
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["videoRenderer", "lengthText", "simpleText"], default="0:0")),
|
|
"liveNow": False,
|
|
"premium": ythdd_extractor.isPremium(safeTraverse(entry, ["videoRenderer", "badges", 0])), # will fail if it's not the only badge
|
|
"isUpcoming": False,
|
|
"isNew": False,
|
|
"is4k": False,
|
|
"is8k": False,
|
|
"isVr180": False,
|
|
"isVr360": False,
|
|
"is3d": False,
|
|
"hasCaptions": False
|
|
}
|
|
|
|
# modify the premiere timestamp afterwards here?
|
|
|
|
case "lockupViewModel": # represents playlists/mixes (and videos since october 2025)
|
|
# related videos lvms are handled in ythdd_inv_tl.videos()
|
|
|
|
lvm = entry["lockupViewModel"]
|
|
playlist_type = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "icon", "sources", 0, "clientResource", "imageName"], default="")
|
|
|
|
if playlist_type == "MIX":
|
|
# mixes aren't currently supported
|
|
return
|
|
|
|
if not playlist_type:
|
|
# struct represents a video
|
|
ythdd_globals.print_debug("lockupViewModel fired (not a playlist). this is an a/b test; any following errors stem from it.")
|
|
|
|
lmvm = safeTraverse(lvm, ['metadata', 'lockupMetadataViewModel'], default={})
|
|
video_id = safeTraverse(lvm, ['contentId'])
|
|
|
|
author_name = safeTraverse(context, ["author_name"], default="Unknown author")
|
|
author_ucid = safeTraverse(context, ["author_ucid"], default="UNKNOWNCHANNELID")
|
|
verified = safeTraverse(context, ["verified"], default=False) # TODO: check if this can be retrieved here
|
|
avatar_url = safeTraverse(context, ["avatar"], default=DEFAULT_AVATAR)
|
|
|
|
title = safeTraverse(lmvm, ["title", "content"], default="No title")
|
|
video_metadata = safeTraverse(lmvm, ["metadata", "contentMetadataViewModel", "metadataRows", 0, "metadataParts"], default=[])
|
|
view_count_text = safeTraverse(video_metadata, [0, "text", "content"], default="0 views")
|
|
published_date = safeTraverse(video_metadata, [1, "text", "content"], default="now")
|
|
length_text = safeTraverse(lvm, ["contentImage", "thumbnailViewModel", "overlays", ..., "thumbnailBottomOverlayViewModel", "badges", -1, "thumbnailBadgeViewModel", "text"], default="0:0")
|
|
view_count = parseViewsFromViewText(view_count_text)
|
|
length = parseLengthFromTimeBadge(length_text)
|
|
|
|
resp = {
|
|
"type": "video",
|
|
"title": title,
|
|
"videoId": video_id,
|
|
"author": author_name,
|
|
"authorId": author_ucid,
|
|
"authorUrl": "/channel/" + author_ucid,
|
|
"authorVerified": verified, # TODO
|
|
"authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url),
|
|
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
|
"description": "", # can't be retrieved from lockupViewModel
|
|
"descriptionHtml": "",
|
|
"viewCount": view_count,
|
|
"viewCountText": view_count_text,
|
|
"published": int(dateparser.parse(published_date).timestamp()), # sadly best we can do, invidious does this too
|
|
"publishedText": published_date,
|
|
"lengthSeconds": length,
|
|
"liveNow": False, # can't be live if it's in creator's video feed
|
|
"premium": False, # todo: check this
|
|
"isUpcoming": False,
|
|
"isNew": False,
|
|
"is4k": False,
|
|
"is8k": False,
|
|
"isVr180": False,
|
|
"isVr360": False,
|
|
"is3d": False,
|
|
"hasCaptions": False
|
|
}
|
|
return resp
|
|
|
|
# struct represents a playlist
|
|
meta = safeTraverse(lvm, ["metadata"], default=[])
|
|
lmvm = safeTraverse(meta, ["lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows"], default=[])
|
|
thumbnail = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", -1, "url"], default="no-url?")
|
|
thumbnail = ythdd_globals.translateLinks(thumbnail[:thumbnail.rfind("?")])
|
|
verified = safeTraverse(context, ["verified"], default=False)
|
|
|
|
playlist_id = safeTraverse(lvm, ["contentId"], default="UNKNOWNPLAYLISTID")
|
|
length = safeTraverse(lvm, ["contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text"], default="0 videos")
|
|
length = parseViewsFromViewText(length.split(" ")[0])
|
|
|
|
# Turns out for some responses we do have some data, while not on others.
|
|
# Data from context should be prioritized, thus even if something is found with safeTraverse,
|
|
# the parser will ignore it in favour of the context.
|
|
ucid = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
|
author = safeTraverse(lmvm, [0, "metadataParts", 0, "text", "content"], default="ythdd: unknown author")
|
|
ucid = safeTraverse(context, ["author_ucid"], default=ucid)
|
|
author = safeTraverse(context, ["author_name"], default=author)
|
|
|
|
ythdd_globals.print_debug("lockupViewModel fired (playlist)")
|
|
return {
|
|
"type": "playlist",
|
|
"title": safeTraverse(meta, ["lockupMetadataViewModel", "title", "content"], default="ythdd: unknown title"),
|
|
"playlistId": playlist_id,
|
|
"playlistThumbnail": thumbnail,
|
|
"author": author,
|
|
"authorId": ucid,
|
|
"authorUrl": "/channel/" + ucid,
|
|
"authorVerified": verified,
|
|
"videoCount": length,
|
|
"videos": [] # provided for historical reasons i guess
|
|
}
|
|
|
|
case "shelfRenderer": # "people also watched"
|
|
return
|
|
|
|
case "gridShelfViewModel": # shorts?
|
|
return
|
|
|
|
case "shortsLockupViewModel": # shorts on channel pages
|
|
|
|
video_id = safeTraverse(entry, ["shortsLockupViewModel", "onTap", "innertubeCommand", "reelWatchEndpoint", "videoId"], default="UnknownVideoId")
|
|
title = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "primaryText", "content"], default="ythdd: couldn't find title")
|
|
views_text = safeTraverse(entry, ["shortsLockupViewModel", "overlayMetadata", "secondaryText", "content"], default="No views")
|
|
|
|
published_date = "No data about published time" # the view model doesn't provide data about the date a short is published
|
|
|
|
if video_id == "UnknownVideoId": # failsafe
|
|
video_id = safeTraverse(entry, ["shortsLockupViewModel", "entityId"], default="-UnknownVideoId")
|
|
video_id = video_id[video_id.rfind("-") + 1:]
|
|
|
|
if "author_name" in context:
|
|
author_name = context["author_name"]
|
|
else:
|
|
author_name = "Unknown author"
|
|
|
|
if "author_ucid" in context:
|
|
author_ucid = context["author_ucid"]
|
|
else:
|
|
author_ucid = "UNKNOWNCHANNELID"
|
|
|
|
if "verified" in context:
|
|
verified = context["verified"]
|
|
else:
|
|
verified = False
|
|
|
|
if "avatar" in context:
|
|
avatar_url = context["avatar"]
|
|
else:
|
|
avatar_url = "unknown"
|
|
|
|
ythdd_globals.print_debug("shortsLockupViewModel fired")
|
|
return {
|
|
"type": "video",
|
|
"title": title,
|
|
"videoId": video_id,
|
|
"author": author_name,
|
|
"authorId": author_ucid,
|
|
"authorUrl": "/channel/" + author_ucid,
|
|
"authorVerified": False,
|
|
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
|
"description": "",
|
|
"descriptionHtml": "",
|
|
"viewCount": parseViewsFromViewText(views_text),
|
|
"viewCountText": views_text,
|
|
"published": int(0),
|
|
"publishedText": published_date,
|
|
"lengthSeconds": int(60), # invidious locks this to 60s no matter what the actual duration is
|
|
"liveNow": False,
|
|
"premium": False,
|
|
"isUpcoming": False,
|
|
"premiereTimestamp": 0,
|
|
"isNew": False,
|
|
"is4k": False,
|
|
"is8k": False,
|
|
"isVr180": False,
|
|
"isVr360": False,
|
|
"is3d": False,
|
|
"hasCaptions": False
|
|
}
|
|
|
|
case "gridVideoRenderer": # videos on channel pages
|
|
|
|
# doesn't work on Yattee
|
|
# thumbnails = safeTraverse(entry, ["gridVideoRenderer", "thumbnail", "thumbnails"], default=[])
|
|
# for thumbnail in thumbnails:
|
|
# thumbnail["url"] = ythdd_globals.translateLinks(thumbnail["url"])
|
|
|
|
video_id = safeTraverse(entry, ["gridVideoRenderer", "videoId"], default="UnknownVideoId")
|
|
thumbnails = ythdd_struct_builder.genThumbs(video_id)
|
|
|
|
published_date = safeTraverse(entry, ["gridVideoRenderer", "publishedTimeText", "simpleText"], default="now")
|
|
published_date = published_date.removeprefix("Streamed ")
|
|
|
|
ythdd_globals.print_debug("gridVideoRenderer fired")
|
|
return {
|
|
"type": "video",
|
|
"title": safeTraverse(entry, ["gridVideoRenderer", "title", "simpleText"], default="unknown video title"),
|
|
"videoId": video_id,
|
|
"author": context["author_name"],
|
|
"authorId": context["author_ucid"],
|
|
"authorUrl": "/channel/" + context["author_ucid"],
|
|
"authorVerified": False, # TODO: handle badge related tasks here using context
|
|
"videoThumbnails": thumbnails,
|
|
"description": "", # won't work without using an RSS feed (?)
|
|
"descriptionHtml": "", # -||-
|
|
"viewCount": parseViewsFromViewText(safeTraverse(entry, ["gridVideoRenderer", "viewCountText", "simpleText"], default="0 views")),
|
|
"viewCountText": safeTraverse(entry, ["gridVideoRenderer", "shortViewCountText", "simpleText"], default="0 views"),
|
|
"published": int(dateparser.parse(published_date).timestamp()),
|
|
"publishedText": published_date,
|
|
"lengthSeconds": parseLengthFromTimeBadge(safeTraverse(entry, ["gridVideoRenderer", "thumbnailOverlays", 0, "thumbnailOverlayTimeStatusRenderer", "text", "simpleText"], default="0:0")),
|
|
"liveNow": True if published_date == "now" else False,
|
|
"premium": False,
|
|
"isUpcoming": False,
|
|
"isNew": False,
|
|
"is4k": False,
|
|
"is8k": False,
|
|
"isVr180": False,
|
|
"isVr360": False,
|
|
"is3d": False,
|
|
"hasCaptions": False
|
|
}
|
|
|
|
case "channelRenderer": # channels in search results
|
|
|
|
avatars = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(entry, ["channelRenderer", "thumbnail", "thumbnails", 0, "url"], default=DEFAULT_AVATAR))
|
|
description, description_html = parseDescriptionSnippet(safeTraverse(entry, ["channelRenderer", "descriptionSnippet", "runs"], default=[]))
|
|
isVerified = ythdd_extractor.isVerified(safeTraverse(entry, ["channelRenderer", "ownerBadges", 0], default=[]))
|
|
|
|
ythdd_globals.print_debug("channelRenderer fired")
|
|
return {
|
|
"type": "channel",
|
|
"author": safeTraverse(entry, ["channelRenderer", "title", "simpleText"], default="Unknown channel"),
|
|
"authorId": safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"),
|
|
"authorUrl": "/channel/" + safeTraverse(entry, ["channelRenderer", "channelId"], default="UNKNOWNCHANNELID"),
|
|
"authorVerified": isVerified,
|
|
"authorThumbnails": avatars,
|
|
"autoGenerated": False,
|
|
"subCount": parseViewsFromViewText(safeTraverse(entry, ["channelRenderer", "videoCountText", "simpleText"], default="0 subscribers")),
|
|
"videoCount": 0,
|
|
"channelHandle": safeTraverse(entry, ["channelRenderer", "navigationEndpoint", "browseEndpoint", "canonicalBaseUrl"], default="/@ythdd_unknown_handle")[1:],
|
|
"description": description,
|
|
"descriptionHtml": description_html
|
|
}
|
|
|
|
case "playlistVideoRenderer":
|
|
# used by all content inside of playlists which have at least one non-shorts video/livestream
|
|
|
|
video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId")
|
|
title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title")
|
|
author_ucid = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"])
|
|
author_name = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "text"], default="Unknown author")
|
|
video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1
|
|
length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0"))
|
|
published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01")
|
|
published_date = published_date.removeprefix("Streamed ").removeprefix(" watching")
|
|
|
|
# handle livestreams
|
|
if not published_date:
|
|
published_date = "now"
|
|
|
|
if author_ucid is None:
|
|
# likely a collaborative video, let's try
|
|
# to fetch the uploader's ucid with that in mind
|
|
livm = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "showDialogCommand", "panelLoadingStrategy", "inlineContent", "dialogViewModel", "customContent", "listViewModel", "listItems"], default=[])
|
|
# name extraction logic the same as in videoRenderer
|
|
all_authors = []
|
|
for collaborative_author in livm:
|
|
collaborative_author_name = safeTraverse(collaborative_author, ["listItemViewModel", "title", "content"])
|
|
if collaborative_author_name is not None:
|
|
all_authors.append(collaborative_author_name)
|
|
if all_authors != []:
|
|
author_name = ", ".join(all_authors)
|
|
author_ucid = safeTraverse(livm, [0, "listItemViewModel", "title", "commandRuns", 0, "onTap", "innertubeCommand", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
|
|
|
|
# surprisingly, innertube responds with the avatar of the user that added the video to the playlist
|
|
# we can extract that information, e.g. for yattee to display
|
|
avatar_url = safeTraverse(entry, ["playlistVideoRenderer", "thumbnailOverlays", ..., "thumbnailOverlayAvatarStackViewModel", "avatarStack", "avatarStackViewModel", "avatars", 0, "avatarViewModel", "image", "sources", 0, "url"])
|
|
avatars = None if avatar_url is None else ythdd_extractor.generateChannelAvatarsFromUrl(avatar_url)
|
|
|
|
ythdd_globals.print_debug("playlistVideoRenderer fired")
|
|
return {
|
|
"type": "video",
|
|
"title": title,
|
|
"videoId": video_id,
|
|
"author": author_name,
|
|
"authorId": author_ucid,
|
|
"authorUrl": "/channel/" + author_ucid,
|
|
"authorThumbnails": avatars,
|
|
"videoThumbnails": ythdd_struct_builder.genThumbs(video_id),
|
|
"index": video_index,
|
|
"lengthSeconds": length,
|
|
"liveNow": False, # todo: check this?
|
|
# these do not need to be returned, but some clients try to read it
|
|
# so we return an approximation here:
|
|
"published": int(dateparser.parse(published_date).timestamp()),
|
|
"publishedText": published_date
|
|
}
|
|
|
|
case _:
|
|
print("received an entry of unknown type (thus can't be parsed):")
|
|
print(entry)
|
|
print("")
|
|
# breakpoint()
|
|
return
|
|
|
|
def customCommentRendererParser(comment: dict, context: dict | None = None) -> dict:
    """Convert an innertube comment entity into an Invidious-like comment struct.

    Args:
        comment: Dict with the comment under ["payload"]["commentEntityPayload"]
            and, optionally, an already prepared "replies" entry which is
            copied to the result as-is.
        context: Currently unused; kept for interface compatibility.

    Returns:
        The comment struct. Missing fields are substituted with placeholders
        (e.g. "UNKNOWNCHANNELID", "@ythdd-unknown-user").
    """

    cep = safeTraverse(comment, ["payload", "commentEntityPayload"], default={})
    content = safeTraverse(cep, ["properties", "content", "content"], default="")
    content_html = escape(content).replace("\r\n", "<br>").replace("\n", "<br>")
    author = safeTraverse(cep, ["author"], default={})
    verified = safeTraverse(author, ["isVerified"], default=False) or safeTraverse(author, ["isArtist"], default=False)
    ucid = safeTraverse(author, ["channelId"], default="UNKNOWNCHANNELID")
    published_date = safeTraverse(cep, ["properties", "publishedTime"], default="now")

    # edited comments carry a suffix, which has to go before the date is parsed
    edited = published_date.endswith(" (edited)")
    parsed_date = dateparser.parse(published_date.removesuffix(" (edited)"))
    # dateparser returns None for text it can't interpret; report 0 in that case
    published_date_unix = 0 if parsed_date is None else int(parsed_date.timestamp())

    inv_comment = {
        "authorId": ucid,
        "authorUrl": "/channel/" + ucid,
        "author": safeTraverse(author, ["displayName"], default="@ythdd-unknown-user"),
        "verified": verified,
        "authorThumbnails": ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(author, ["avatarThumbnailUrl"], default=DEFAULT_AVATAR)), # proxy them!
        "authorIsChannelOwner": safeTraverse(author, ["isCreator"], default=False), # ???
        "isSponsor": False, # not sure how to retrieve this
        # the leading "0" keeps an empty like counter parseable
        "likeCount": parseViewsFromViewText("0" + safeTraverse(cep, ["toolbar", "likeCountNotliked"], default="0") + " likes"),
        "isPinned": False,
        "commentId": safeTraverse(cep, ["properties", "commentId"], default="UNKNOWNCOMMENTID"),
        "content": content,
        "contentHtml": content_html,
        "isEdited": edited,
        "published": published_date_unix,
        # note: the text still includes the " (edited)" suffix, if there was one
        "publishedText": published_date if published_date != "now" else "unknown amount of time ago"
    }

    if "replies" in comment:
        inv_comment["replies"] = comment["replies"]

    return inv_comment
|
|
|
|
def parseDescriptionSnippet(snippet: list):
    """Build plain-text and HTML versions of a description snippet.

    Args:
        snippet: List of "runs", i.e. dicts with a "text" key and
            an optional "bold" key marking highlighted text.

    Returns:
        A (text, text_html) tuple. In text_html the run texts are
        HTML-escaped, bold runs are wrapped in <b></b> and newlines
        are turned into <br>.
    """

    plain_parts = []
    html_parts = []
    for entry in snippet or []:
        run_text = entry.get("text", "")
        plain_parts.append(run_text)
        # escape the text itself *before* adding markup,
        # otherwise the <b> tags would get escaped along with it
        escaped_text = escape(run_text)
        if entry.get("bold"):
            html_parts.append("<b>" + escaped_text + "</b>")
        else:
            html_parts.append(escaped_text)

    text = "".join(plain_parts)
    text_html = "".join(html_parts).replace("\r\n", "<br>").replace("\n", "<br>")

    return text, text_html
|
|
|
|
def runsToText(runs: list, default: str = "") -> str:
    """Concatenate the "text" of every run into a single string.

    Runs without a "text" contribute nothing. If the result is empty
    (text extraction failed), "default" is returned instead.
    """
    joined = "".join(safeTraverse(run, ["text"], default="") for run in runs)
    return joined if joined else default
|
|
|
|
def extractTextFromSimpleOrRuns(obj: dict, default: str = "") -> str:
    """Extract the text from an object holding either "runs" or "simpleText".

    Args:
        obj: Dict with a "runs" list or a "simpleText" string.
            "runs" takes precedence if both are present.
        default: Returned when obj is not a dict, has neither key,
            or its runs yield no text.

    Returns:
        The extracted text, or default.
    """
    if not isinstance(obj, dict):
        return default

    if "runs" in obj:
        # pass the default on, so that empty runs fail over to it as well
        return runsToText(obj["runs"], default)

    if "simpleText" in obj:
        return obj["simpleText"]

    print(f"error(extractTextFromSimpleOrRuns): text extraction failed for {obj}")
    return default
|
|
|
|
|
|
def findNearestResolution(width: int, height: int) -> int:
    """Find the standard resolution (144, 240, ...) nearest to the given size.

    So far only used for Yattee, as it has trouble playing anything
    without one of the standard resolutions. Playback on other
    clients is unaffected.

    Args:
        width: Frame width (anything int() accepts).
        height: Frame height (anything int() accepts).

    Returns:
        The standard resolution relatively closest to the smaller of the
        two dimensions, or 360 if the dimensions aren't usable
        (not convertible to int, or not positive).
    """
    standard_resolutions = [144, 240, 360, 720, 1080, 2160, 4320]

    # failsafe behaviour
    try:
        # the smaller dimension is used, so vertical videos are handled too
        res = min(int(width), int(height))
    except (TypeError, ValueError):
        return 360
    if res <= 0:
        # would divide by zero (or compare against nonsense) below
        return 360

    if res in standard_resolutions:
        return res

    # pick the resolution with the smallest relative distance;
    # on a tie the lower resolution wins
    return min(standard_resolutions, key=lambda standard: abs(1 - (standard / res)))
|
|
|
|
def parseFormatStreams(wdata_fstream: dict, ydata_stream: dict) -> dict:
    """Build an Invidious-like format stream entry.

    Args:
        wdata_fstream: Stream metadata; the keys read are "itag", "mimeType",
            "quality", "bitrate", "fps", "width", "height" and "qualityLabel".
        ydata_stream: yt-dlp's data on the same stream; only "url" is read.

    Returns:
        The format stream struct.

    Raises:
        KeyError, TypeError: If the URL can't be read from ydata_stream
            (after logging both inputs), or if wdata_fstream lacks a key.
    """

    try:
        stream_url = ydata_stream["url"]
    except (KeyError, TypeError):
        ythdd_globals.print_debug("could not extract format stream URL from yt-dlp response:")
        ythdd_globals.print_debug(f"wdata: {wdata_fstream}")
        ythdd_globals.print_debug(f"ydata: {ydata_stream}")
        # there's no stream without a URL; fail with the actual cause
        # rather than with an unbound variable a few lines later
        raise

    # invidious_formats
    known_format = FORMATS.get(wdata_fstream["itag"])
    width, height = wdata_fstream["width"], wdata_fstream["height"]

    fstream = {
        "url": stream_url,
        "itag": str(wdata_fstream["itag"]),
        "type": wdata_fstream["mimeType"],
        "quality": wdata_fstream["quality"],
        "bitrate": str(wdata_fstream["bitrate"]),
        "fps": wdata_fstream["fps"],
        "size": f"{width}x{height}",
        "resolution": f"{findNearestResolution(width, height)}p", # possibly not really needed here
        "qualityLabel": wdata_fstream["qualityLabel"],
        "container": safeTraverse(known_format, ["ext"], default="mp4"),
        "encoding": safeTraverse(known_format, ["vcodec"], default="mp4")
    }

    return fstream
|
|
|
|
def parseAdaptiveStreams(wdata_astream: dict, ydata_stream: dict) -> dict:
    """Build an Invidious-like adaptive (audio-only or video-only) stream entry.

    A stream is considered audio if its metadata has an "audioQuality" key,
    and video otherwise.

    Args:
        wdata_astream: Stream metadata; read are the common keys ("initRange",
            "indexRange", "bitrate", "itag", "mimeType", "contentLength",
            "lastModified", "projectionType") and the audio- or video-specific
            ones used below.
        ydata_stream: yt-dlp's data on the same stream; only "url" is read.

    Returns:
        The adaptive stream struct.

    Raises:
        KeyError, TypeError: If the URL can't be read from ydata_stream
            (after logging both inputs), or if wdata_astream lacks a key.
    """

    try:
        stream_url = ydata_stream["url"]
    except (KeyError, TypeError):
        ythdd_globals.print_debug("could not extract adaptive stream URL from yt-dlp response:")
        ythdd_globals.print_debug(f"wdata: {wdata_astream}")
        ythdd_globals.print_debug(f"ydata: {ydata_stream}")
        # there's no stream without a URL; fail with the actual cause
        # rather than with an unbound variable a few lines later
        raise

    # invidious_formats
    known_format = FORMATS.get(wdata_astream["itag"])

    # metadata common for audio and video
    astream = {
        "init": f"{wdata_astream['initRange']['start']}-{wdata_astream['initRange']['end']}",
        "index": f"{wdata_astream['indexRange']['start']}-{wdata_astream['indexRange']['end']}",
        "bitrate": str(wdata_astream["bitrate"]),
        "url": stream_url,
        "itag": str(wdata_astream["itag"]),
        "type": wdata_astream["mimeType"],
        "clen": wdata_astream["contentLength"],
        "lmt": wdata_astream["lastModified"],
        "projectionType": wdata_astream["projectionType"],
        "container": safeTraverse(known_format, ["ext"], default="mp4"),
        "encoding": safeTraverse(known_format, ["vcodec"], default="mp4")
    }

    if "audioQuality" in wdata_astream:
        # audio-specific metadata
        astream["encoding"] = safeTraverse(known_format, ["acodec"], default="mp4")
        # plain values here: a trailing comma would turn them into 1-tuples
        astream["audioQuality"] = wdata_astream["audioQuality"]
        astream["audioSampleRate"] = int(wdata_astream["audioSampleRate"])
        astream["audioChannels"] = wdata_astream["audioChannels"]
    else:
        # video-specific metadata
        astream["fps"] = wdata_astream["fps"]
        astream["size"] = f"{wdata_astream['width']}x{wdata_astream['height']}"
        astream["resolution"] = f"{findNearestResolution(wdata_astream['width'], wdata_astream['height'])}p"
        astream["qualityLabel"] = wdata_astream["qualityLabel"]
        astream["colorInfo"] = safeTraverse(wdata_astream, ["colorInfo"])

    return astream
|
|
|