Compare commits

..

2 Commits

Author SHA1 Message Date
b98aa718b0 fix: fix shorts-only playlists by using on-demand ctoken generator
this will allow for retrieving playlist videos as playlistVideoRenderer
which we already have a parser for. another benefit is being able to
list videos in a playlist which have been deleted/made private.
also fixes livestream parsing
2025-09-26 22:49:16 +02:00
30850a7ce0 feat: protobuf ctoken generation
this commit introduces on-demand ctoken generation with a wrapper
for more concise, protodec-like syntax (see producePlaylistContinuation)
2025-09-26 22:40:29 +02:00
4 changed files with 101 additions and 43 deletions

View File

@@ -14,3 +14,4 @@ requests>=2.32.3
yt_dlp yt_dlp
brotli>=1.1.0 brotli>=1.1.0
dateparser>=1.2.2 dateparser>=1.2.2
bbpb>=1.4.2

View File

@@ -1,6 +1,7 @@
#!/usr/bin/python3 #!/usr/bin/python3
import brotli, yt_dlp, requests, json, time import brotli, yt_dlp, requests, json, time
from ythdd_globals import safeTraverse from ythdd_globals import safeTraverse
import ythdd_proto
import ythdd_globals import ythdd_globals
ytdl_opts = { ytdl_opts = {
@@ -518,12 +519,15 @@ def WEBgetVideoComments(ctoken: str) -> tuple:
return actual_comments, new_continuation return actual_comments, new_continuation
def WEBextractPlaylist(plid: str = "", ctoken: str = "", prefix: str = "VL"): def WEBextractPlaylist(plid: str = "", ctoken: str = ""):
additional_context = {'browseId': prefix + plid} # if ctoken has been provided, use it
if ctoken: if ctoken:
# playlist id can be omitted if ctoken is provided # playlist id can be omitted if ctoken is provided
additional_context = {'continuation': ctoken} additional_context = {'continuation': ctoken}
else:
# try to create ctoken which will allow for accessing the full playlist, including delisted/deleted videos
additional_context = {'continuation': ythdd_proto.producePlaylistContinuation(plid, offset=0)}
context = makeWebContext(additional_context) context = makeWebContext(additional_context)
@@ -540,52 +544,15 @@ def WEBextractPlaylist(plid: str = "", ctoken: str = "", prefix: str = "VL"):
if not ctoken: if not ctoken:
metadata = { metadata = {
"header": safeTraverse(resp_json, ["header"]), "header": safeTraverse(resp_json, ["header"]),
#"microformat": safeTraverse(resp_json, ["microformat"]),
"sidebar": safeTraverse(resp_json, ["sidebar"]) "sidebar": safeTraverse(resp_json, ["sidebar"])
} }
# TODO (after python protodec implementation/wrapper is done):
# # try to extract ctoken containing the full playlist, including delisted/deleted videos
# full_playlist_ctoken = urllib.parse.quote(protodec.from_json({
# "80226972:embedded": {
# "2:string": prefix + plid,
# "3:base64": {
# "1:varint": request_count, # todo: increment by 200 with an external index
# "15:string": "PT:" + urllib.parse.quote(protodec.from_json({"1:varint": index})),
# "104:embedded": {"1:0:varint": 0}
# },
# "35:string": plid
# }
# }))
# # if ctoken creation succeeded
# if full_playlist_ctoken:
# # make another request
# response = requests.post(
# 'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
# headers = ythdd_globals.getHeaders(),
# json = makeWebContext({'continuation': full_playlist_ctoken})
# )
# resp_json = json.loads(response.text)
# else:
# print("error(WEBextractPlaylist): full playlist metadata extraction failed. Delisted/deleted videos will be missing.")
# extract continuation # extract continuation
new_continuation = None
if ctoken:
# subsequent request
new_continuation = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems", -1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"]) new_continuation = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems", -1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"])
else:
# first-time request
new_continuation = safeTraverse(resp_json, ["contents", "twoColumnBrowseResultsRenderer", "tabs", 0, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "playlistVideoListRenderer", "contents", -1, "continuationItemRenderer", "continuationEndpoint", "commandExecutorCommand", "commands", -1, "continuationCommand", "token"])
# "best-effort" playlist's videos extraction # "best-effort" playlist's videos extraction
# "best-effort" because None's (unsuccessful video extraction = None) are passed as they are # "best-effort" because None's (unsuccessful video extraction = None) are passed as they are
# warning! todo: iterate over this, as shorts cannot be currently extracted (they use richGridRenderer, not playlistVideoListRenderer)
videos = None
if ctoken: # or full_playlist_ctoken:
videos = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"]) # includes continuation as last element of list, which will be ignored videos = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"]) # includes continuation as last element of list, which will be ignored
else:
videos = safeTraverse(resp_json, ["contents", "twoColumnBrowseResultsRenderer", "tabs", 0, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "playlistVideoListRenderer", "contents"])
return metadata, new_continuation, videos return metadata, new_continuation, videos

85
ythdd_proto.py Normal file
View File

@@ -0,0 +1,85 @@
from ythdd_globals import safeTraverse
import base64
import blackboxprotobuf as bbpb
import json
import urllib.parse
import ythdd_globals
def bbpbToB64(msg_and_typedef: tuple, urlsafe: bool = False, padding: bool = False) -> str:
encoded_protobuf = bbpb.encode_message(*msg_and_typedef)
if urlsafe:
b64_protobuf = base64.urlsafe_b64encode(encoded_protobuf)
else:
b64_protobuf = base64.b64encode(encoded_protobuf)
if padding:
url_encoded_b64 = urllib.parse.quote(b64_protobuf.decode())
else:
url_encoded_b64 = b64_protobuf.decode().rstrip('=')
return url_encoded_b64
def fdictToBbpb(msg: dict) -> tuple:
# Requires Python 3.7+ or CPython 3.6+,
# as these versions preserve dictionary insertion order.
# Structural matching (match, case) requires Python 3.10+.
clean_msg = {}
clean_type = {}
for key in msg:
num, type = key.split(":")
match type:
case "message":
# if the type is an embedded message
internal_msg, internal_type = fdictToBbpb(msg[key])
# msg can just be appended as usual
clean_msg[num] = internal_msg
# type contains more fields than normally
clean_type[num] = {
'field_order': list(internal_msg.keys()),
'message_typedef': internal_type,
'type': type
}
case "base64" | "base64u" | "base64p" | "base64up":
# if the type is a base64-embedded message
internal_msg, internal_type = fdictToBbpb(msg[key])
match type.removeprefix("base64"):
case "":
b64_encoded_msg = bbpbToB64((internal_msg, internal_type))
case "u":
b64_encoded_msg = bbpbToB64((internal_msg, internal_type), urlsafe=True)
case "p":
b64_encoded_msg = bbpbToB64((internal_msg, internal_type), padding=True)
case "up":
b64_encoded_msg = bbpbToB64((internal_msg, internal_type), urlsafe=True, padding=True)
clean_msg[num] = b64_encoded_msg
clean_type[num] = {'type': 'string'}
case "int" | "string":
clean_msg[num] = msg[key]
clean_type[num] = {'type': type}
case _:
raise KeyError(f'error(fmsgToBBPBTuple): invalid key "{type}"')
return (clean_msg, clean_type)
def producePlaylistContinuation(plid: str, offset: int = 0) -> str:
msge = {
'80226972:message': {
'2:string': f'VL{plid}',
'3:base64': {
'1:int': int(offset / 100),
'15:string': f'PT:{bbpbToB64(fdictToBbpb({"1:int": offset}))}',
'104:message': {
'1:int': 0
}
},
'35:string': plid
}
}
bbpb_dicts = fdictToBbpb(msge)
b64_ctoken = bbpbToB64(bbpb_dicts, urlsafe=True, padding=True)
return b64_ctoken

View File

@@ -313,6 +313,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
} }
case "playlistVideoRenderer": case "playlistVideoRenderer":
# used by all content inside of playlists which have at least one non-shorts video/livestream
video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId") video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId")
title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title") title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title")
@@ -321,7 +322,11 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1 video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1
length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0")) length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0"))
published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01") published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01")
published_date = published_date.removeprefix("Streamed ") published_date = published_date.removeprefix("Streamed ").removeprefix(" watching")
# handle livestreams
if not published_date:
published_date = "now"
return { return {
"type": "video", "type": "video",