feat: playlist browsing

pagination still needs refinement for some of the clients.
on another note, this is an anniversary commit, as ythdd turns 1 year
today.
This commit is contained in:
2025-09-25 23:30:59 +02:00
parent 6d0c70696b
commit 1c9174c888
4 changed files with 228 additions and 3 deletions

View File

@@ -517,3 +517,75 @@ def WEBgetVideoComments(ctoken: str) -> tuple:
}
return actual_comments, new_continuation
def WEBextractPlaylist(plid: str = "", ctoken: str = "", prefix: str = "VL"):
additional_context = {'browseId': prefix + plid}
if ctoken:
# playlist id can be omitted if ctoken is provided
additional_context = {'continuation': ctoken}
context = makeWebContext(additional_context)
response = requests.post(
'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
headers = ythdd_globals.getHeaders(),
json = context
)
resp_json = json.loads(response.text)
# if this is a first-time fetch (no ctoken passed), extract metadata
metadata = None
if not ctoken:
metadata = {
"header": safeTraverse(resp_json, ["header"]),
#"microformat": safeTraverse(resp_json, ["microformat"]),
"sidebar": safeTraverse(resp_json, ["sidebar"])
}
# TODO (after python protodec implementation/wrapper is done):
# # try to extract ctoken containing the full playlist, including delisted/deleted videos
# full_playlist_ctoken = urllib.parse.quote(protodec.from_json({
# "80226972:embedded": {
# "2:string": prefix + plid,
# "3:base64": {
# "1:varint": request_count, # todo: increment by 200 with an external index
# "15:string": "PT:" + urllib.parse.quote(protodec.from_json({"1:varint": index})),
# "104:embedded": {"1:0:varint": 0}
# },
# "35:string": plid
# }
# }))
# # if ctoken creation succeeded
# if full_playlist_ctoken:
# # make another request
# response = requests.post(
# 'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
# headers = ythdd_globals.getHeaders(),
# json = makeWebContext({'continuation': full_playlist_ctoken})
# )
# resp_json = json.loads(response.text)
# else:
# print("error(WEBextractPlaylist): full playlist metadata extraction failed. Delisted/deleted videos will be missing.")
# extract continuation
new_continuation = None
if ctoken:
# subsequent request
new_continuation = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems", -1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"])
else:
# first-time request
new_continuation = safeTraverse(resp_json, ["contents", "twoColumnBrowseResultsRenderer", "tabs", 0, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "playlistVideoListRenderer", "contents", -1, "continuationItemRenderer", "continuationEndpoint", "commandExecutorCommand", "commands", -1, "continuationCommand", "token"])
# "best-effort" playlist's videos extraction
# "best-effort" because None's (unsuccessful video extraction = None) are passed as they are
# warning! todo: iterate over this, as shorts cannot be currently extracted (they use richGridRenderer, not playlistVideoListRenderer)
videos = None
if ctoken: # or full_playlist_ctoken:
videos = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"]) # includes continuation as last element of list, which will be ignored
else:
videos = safeTraverse(resp_json, ["contents", "twoColumnBrowseResultsRenderer", "tabs", 0, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "playlistVideoListRenderer", "contents"])
return metadata, new_continuation, videos

View File

@@ -23,7 +23,7 @@ version = "0.0.1"
apiVersion = "1"
randomly_generated_passcode = 0
video_cache = {}
general_cache = {"search": [], "continuations": {"channels": {}, "comments": {}}, "channels": {}}
general_cache = {"search": [], "continuations": {"channels": {}, "comments": {}}, "channels": {}, "playlists": {}}
def getConfig(configfile):

View File

@@ -29,9 +29,10 @@ import ythdd_struct_parser
# [✓] /api/v1/channel/:ucid/videos, shorts, playlists, streams
# [✓] /api/v1/comments/:videoid?continuation=...
# [✓] /api/v1/videos/:videoIdXXXX
# [✓] /api/v1/playlists/:plid
# ----------
# PLANNED:
# [X] /api/v1/playlists/:plid
# [X] /api/v1/channel/{videos, shorts, playlists, streams, latest?}/:ucid (rewrite)
# [X] /api/v1/:videoIdXXXX/maxres.jpg redirects to best quality thumbnail
# [X] /api/v1/storyboards/:videoIdXXXX
# [X] /api/v1/videos/:videoIdXXXX does not depend on yt-dlp and offloads stream retrieval elsewhere (making initial response fast)
@@ -388,7 +389,7 @@ def videos(data):
title = safeTraverse(video_details, ['title'], default=video_id)
views = int(safeTraverse(video_details, ['viewCount'], default=0))
length = int(safeTraverse(video_details, ['lengthSeconds'], default=1))
published = dateToEpoch(safeTraverse(microformat, ['publishDate'], default="1970-01-02T00:00:00Z")) # ISO format to Unix timestamp
published = dateToEpoch(safeTraverse(microformat, ['publishDate'], default="2000-01-01T00:00:00Z")) # ISO format to Unix timestamp
published_date = epochToDate(published)
premiere_timestamp = safeTraverse(microformat, ['liveBroadcastDetails', 'startTimestamp'], default=None) # let's ignore the nitty gritty for the time being
premiere_timestamp = premiere_timestamp if premiere_timestamp else safeTraverse(microformat, ['playabilityStatus', 'liveStreamability', 'liveStreamabilityRenderer', 'offlineSlate', 'liveStreamOfflineSlateRenderer', 'scheduledStartTime'], default=None)
@@ -865,6 +866,104 @@ def channels(data, req, only_json: bool = False):
if ythdd_globals.config["general"]["debug"]:
response["wdata"] = wdata
# todo: cache response
if only_json:
return response
return send(200, response)
def playlists(data, req, only_json: bool = False):
# read playlist id and sanity check
if len(data) < 4:
return send(400, {"error": "No playlist specified."})
# todo: make clipious stop spamming requests for paginated response
page = req.args.get('page')
if page is not None and page != '1':
return send(404, {"error": "Paginated queries are not supported."})
plid = data[3]
if len(plid) not in (34, 36):
print("error(playlists): len(plid) is not 34 or 36...!")
response = {"error": "Only standard playlists are currently supported (no mixes, video-based playlists, etc.)"}
if only_json:
return response
return send(400, response)
# check if request has been cached within the last hour
if ythdd_globals.config['general']['cache'] and plid in ythdd_globals.general_cache['playlists']:
if ythdd_globals.general_cache['playlists'][plid]['cacheTime'] + 1 * 60 * 60 > time():
response = ythdd_globals.general_cache['playlists'][plid]
if only_json:
return response
else:
return send(200, response)
else:
del ythdd_globals.general_cache['playlists'][plid]
# browse the playlist iteratively, first fetch is without any continuation
all_unparsed_videos = []
meta, new_continuation, videos = ythdd_extractor.WEBextractPlaylist(plid=plid)
if isinstance(videos, list):
all_unparsed_videos = videos.copy()
while new_continuation != None:
# fetch subsequent playlist videos
_, new_continuation, videos = ythdd_extractor.WEBextractPlaylist(ctoken=new_continuation)
if videos is not None:
all_unparsed_videos.extend(videos)
# process videos
parsed_videos = []
for video in all_unparsed_videos:
parsed_video = ythdd_struct_parser.parseRenderers(video)
if parsed_video is not None:
parsed_videos.append(parsed_video)
# process metadata
primary_sidebar = safeTraverse(meta, ["sidebar", "playlistSidebarRenderer", "items", 0, "playlistSidebarPrimaryInfoRenderer"], default={})
secondary_sidebar = safeTraverse(meta, ["sidebar", "playlistSidebarRenderer", "items", 1, "playlistSidebarSecondaryInfoRenderer"], default={})
# apparently fields can be stored inside of simpleText one time, only to be stored inside of runs another time
title = ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["title"]), default="Unknown playlist title")
playlist_thumb = ythdd_globals.translateLinks(safeTraverse(primary_sidebar, ["thumbnailRenderer", "playlistVideoThumbnailRenderer", "thumbnail", "thumbnails", -1, "url"], default=DEFAULT_VIDEO))
author = safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "title", "runs", 0, "text"], default="Unknown channel")
author_ucid = safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "title", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
author_avatars = ythdd_extractor.generateChannelAvatarsFromUrl(safeTraverse(secondary_sidebar, ["videoOwner", "videoOwnerRenderer", "thumbnail", "thumbnails", 0, "url"], default=DEFAULT_AVATAR))
description = safeTraverse(meta, ["header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "description", "descriptionPreviewViewModel", "description", "content"], default="(ythdd: failed to retrieve description, perhaps it's empty?)")
description_html = html.escape(description).replace("\r\n", "<br>").replace("\n", "<br>")
video_count = ythdd_struct_parser.parseViewsFromViewText(ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["stats", 0]), default="No videos"))
view_count = ythdd_struct_parser.parseViewsFromViewText(ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["stats", 1]), default="No views"))
updated = ythdd_struct_parser.extractTextFromSimpleOrRuns(safeTraverse(primary_sidebar, ["stats", 2]), default="2000-01-01").removeprefix("Last updated on ").removeprefix("Updated ")
updated = int(dateparser.parse(updated).timestamp())
is_unlisted = safeTraverse(primary_sidebar, ["badges", 0, "metadataBadgeRenderer", "icon", "iconType"], default="PRIVACY_LISTED") == "PRIVACY_UNLISTED" # this needs further research https://gitea.invidious.io/iv-org/invidious/src/commit/325e013e0d9e5670fa0df7635ff30a0ee029e05e/src/invidious/playlists.cr#L133
response = {
"type": "playlist",
"title": title,
"playlistId": plid,
"playlistThumbnail": playlist_thumb,
"author": author,
"authorId": author_ucid,
"authorUrl": "/channel/" + author_ucid,
"subtitle": None, # todo?
"authorThumbnails": author_avatars,
"description": description,
"descriptionHtml": description_html,
"videoCount": video_count,
"viewCount": view_count,
"updated": updated,
"isListed": not is_unlisted,
"videos": parsed_videos
}
# todo: cache videos and metadata separately, so that paginated queries can be supported as well
if ythdd_globals.config['general']['cache']:
ythdd_globals.general_cache['playlists'][plid] = response
ythdd_globals.general_cache['playlists'][plid]['cacheTime'] = time()
if only_json:
return response
@@ -891,6 +990,8 @@ def lookup(data, req):
return channels(data, req)
case 'comments':
return get_comments(data, req)
case 'playlists':
return playlists(data, req)
case _:
incrementBadRequests()
return notImplemented(data)

View File

@@ -312,6 +312,34 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
"descriptionHtml": description_html
}
case "playlistVideoRenderer":
video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId")
title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title")
author_ucid = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId"], default="UNKNOWNCHANNELID")
author_name = safeTraverse(entry, ["playlistVideoRenderer", "shortBylineText", "runs", 0, "text"], default="Unknown author")
video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1
length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0"))
published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01")
published_date = published_date.removeprefix("Streamed ")
return {
"type": "video",
"title": title,
"videoId": video_id,
"author": author_name,
"authorId": author_ucid,
"authorUrl": "/channel/" + author_ucid,
"videoThumbnails": genThumbs(video_id),
"index": video_index,
"lengthSeconds": length,
"liveNow": False, # todo: check this?
# these do not need to be returned, but some clients try to read it
# so we return an approximation here:
"published": int(dateparser.parse(published_date).timestamp()),
"publishedText": published_date
}
case _:
print("received an entry of unknown type:")
print(entry)
@@ -372,3 +400,27 @@ def parseDescriptionSnippet(snippet: list):
text_html = escape(text_html).replace("\r\n", "<br>").replace("\n", "<br>")
return text, text_html
def runsToText(runs: list, default: str = "") -> str:
# "default" will be returned when text extraction fails.
extracted_text = ""
for field in runs:
extracted_text += safeTraverse(field, ["text"], default="")
if extracted_text:
return extracted_text
return default
def extractTextFromSimpleOrRuns(obj: dict, default: str = "") -> str:
# Extracts the text both from "runs" and "simpleText"
# with failsafe to default.
text = default
if "runs" in obj:
text = runsToText(obj["runs"])
elif "simpleText" in obj:
text = obj["simpleText"]
else:
print(f"error(extractTextFromSimpleOrRuns): text extraction failed for {obj}")
return text