Compare commits
2 Commits
1c9174c888
...
b98aa718b0
| Author | SHA1 | Date | |
|---|---|---|---|
| b98aa718b0 | |||
| 30850a7ce0 |
@@ -14,3 +14,4 @@ requests>=2.32.3
|
||||
yt_dlp
|
||||
brotli>=1.1.0
|
||||
dateparser>=1.2.2
|
||||
bbpb>=1.4.2
|
||||
@@ -1,6 +1,7 @@
|
||||
#!/usr/bin/python3
|
||||
import brotli, yt_dlp, requests, json, time
|
||||
from ythdd_globals import safeTraverse
|
||||
import ythdd_proto
|
||||
import ythdd_globals
|
||||
|
||||
ytdl_opts = {
|
||||
@@ -518,12 +519,15 @@ def WEBgetVideoComments(ctoken: str) -> tuple:
|
||||
|
||||
return actual_comments, new_continuation
|
||||
|
||||
def WEBextractPlaylist(plid: str = "", ctoken: str = "", prefix: str = "VL"):
|
||||
def WEBextractPlaylist(plid: str = "", ctoken: str = ""):
|
||||
|
||||
additional_context = {'browseId': prefix + plid}
|
||||
# if ctoken has been provided, use it
|
||||
if ctoken:
|
||||
# playlist id can be omitted if ctoken is provided
|
||||
additional_context = {'continuation': ctoken}
|
||||
else:
|
||||
# try to create ctoken which will allow for accessing the full playlist, including delisted/deleted videos
|
||||
additional_context = {'continuation': ythdd_proto.producePlaylistContinuation(plid, offset=0)}
|
||||
|
||||
context = makeWebContext(additional_context)
|
||||
|
||||
@@ -540,52 +544,15 @@ def WEBextractPlaylist(plid: str = "", ctoken: str = "", prefix: str = "VL"):
|
||||
if not ctoken:
|
||||
metadata = {
|
||||
"header": safeTraverse(resp_json, ["header"]),
|
||||
#"microformat": safeTraverse(resp_json, ["microformat"]),
|
||||
"sidebar": safeTraverse(resp_json, ["sidebar"])
|
||||
}
|
||||
|
||||
# TODO (after python protodec implementation/wrapper is done):
|
||||
# # try to extract ctoken containing the full playlist, including delisted/deleted videos
|
||||
# full_playlist_ctoken = urllib.parse.quote(protodec.from_json({
|
||||
# "80226972:embedded": {
|
||||
# "2:string": prefix + plid,
|
||||
# "3:base64": {
|
||||
# "1:varint": request_count, # todo: increment by 200 with an external index
|
||||
# "15:string": "PT:" + urllib.parse.quote(protodec.from_json({"1:varint": index})),
|
||||
# "104:embedded": {"1:0:varint": 0}
|
||||
# },
|
||||
# "35:string": plid
|
||||
# }
|
||||
# }))
|
||||
# # if ctoken creation succeeded
|
||||
# if full_playlist_ctoken:
|
||||
# # make another request
|
||||
# response = requests.post(
|
||||
# 'https://www.youtube.com/youtubei/v1/browse?prettyPrint=false',
|
||||
# headers = ythdd_globals.getHeaders(),
|
||||
# json = makeWebContext({'continuation': full_playlist_ctoken})
|
||||
# )
|
||||
# resp_json = json.loads(response.text)
|
||||
# else:
|
||||
# print("error(WEBextractPlaylist): full playlist metadata extraction failed. Delisted/deleted videos will be missing.")
|
||||
|
||||
# extract continuation
|
||||
new_continuation = None
|
||||
if ctoken:
|
||||
# subsequent request
|
||||
new_continuation = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems", -1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"])
|
||||
else:
|
||||
# first-time request
|
||||
new_continuation = safeTraverse(resp_json, ["contents", "twoColumnBrowseResultsRenderer", "tabs", 0, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "playlistVideoListRenderer", "contents", -1, "continuationItemRenderer", "continuationEndpoint", "commandExecutorCommand", "commands", -1, "continuationCommand", "token"])
|
||||
new_continuation = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems", -1, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token"])
|
||||
|
||||
# "best-effort" playlist's videos extraction
|
||||
# "best-effort" because None's (unsuccessful video extraction = None) are passed as they are
|
||||
# warning! todo: iterate over this, as shorts cannot be currently extracted (they use richGridRenderer, not playlistVideoListRenderer)
|
||||
videos = None
|
||||
if ctoken: # or full_playlist_ctoken:
|
||||
videos = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"]) # includes continuation as last element of list, which will be ignored
|
||||
else:
|
||||
videos = safeTraverse(resp_json, ["contents", "twoColumnBrowseResultsRenderer", "tabs", 0, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "playlistVideoListRenderer", "contents"])
|
||||
videos = safeTraverse(resp_json, ["onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems"]) # includes continuation as last element of list, which will be ignored
|
||||
|
||||
return metadata, new_continuation, videos
|
||||
|
||||
|
||||
85
ythdd_proto.py
Normal file
85
ythdd_proto.py
Normal file
@@ -0,0 +1,85 @@
|
||||
from ythdd_globals import safeTraverse
|
||||
import base64
|
||||
import blackboxprotobuf as bbpb
|
||||
import json
|
||||
import urllib.parse
|
||||
import ythdd_globals
|
||||
|
||||
def bbpbToB64(msg_and_typedef: tuple, urlsafe: bool = False, padding: bool = False) -> str:
    """Serialize a (message, typedef) pair with blackboxprotobuf and return it as base64 text.

    msg_and_typedef: tuple that is unpacked into bbpb.encode_message().
    urlsafe: use base64.urlsafe_b64encode instead of base64.b64encode.
    padding: if True, the base64 text (trailing '=' included) is passed through
             urllib.parse.quote; if False, the trailing '=' characters are stripped
             and no url-quoting is done.
    """
    raw_protobuf = bbpb.encode_message(*msg_and_typedef)

    encoder = base64.urlsafe_b64encode if urlsafe else base64.b64encode
    b64_text = encoder(raw_protobuf).decode()

    if not padding:
        return b64_text.rstrip('=')
    return urllib.parse.quote(b64_text)
|
||||
|
||||
def fdictToBbpb(msg: dict) -> tuple:
    """Convert a "field dict" into a (message, typedef) pair of plain dicts.

    Every key of msg has the form "<field number>:<type>", where type is one of:
      - "int", "string": the value is stored as-is under that type,
      - "message": the value is another field dict; it is converted
        recursively and embedded along with its typedef and field order,
      - "base64", "base64u", "base64p", "base64up": the value is another
        field dict; it is converted recursively, encoded with bbpbToB64()
        and stored as a string ("u" sets urlsafe=True, "p" sets padding=True).

    Requires Python 3.7+ or CPython 3.6+, as these versions preserve
    dictionary insertion order (field_order is taken from it).

    Returns a tuple (clean_msg, clean_type), both keyed by the field number
    exactly as it was written in the key (a str).
    Raises KeyError if a key is malformed or names an unsupported type.
    """
    clean_msg = {}
    clean_type = {}

    for key, value in msg.items():
        # partition() always yields three parts: a key without a colon gives
        # an empty type and extra colons stay inside the type, so every
        # malformed key ends up in the KeyError branch below instead of
        # failing with an unrelated unpacking ValueError
        num, _, field_type = key.partition(":")

        if field_type in ("int", "string"):
            clean_msg[num] = value
            clean_type[num] = {'type': field_type}

        elif field_type == "message":
            # if the type is an embedded message
            internal_msg, internal_type = fdictToBbpb(value)
            # msg can just be appended as usual
            clean_msg[num] = internal_msg
            # type contains more fields than normally
            clean_type[num] = {
                'field_order': list(internal_msg),
                'message_typedef': internal_type,
                'type': field_type
            }

        elif field_type in ("base64", "base64u", "base64p", "base64up"):
            # if the type is a base64-embedded message
            internal_msg, internal_type = fdictToBbpb(value)
            # whatever follows "base64" selects the encoding flags
            flags = field_type[len("base64"):]
            clean_msg[num] = bbpbToB64(
                (internal_msg, internal_type),
                urlsafe="u" in flags,
                padding="p" in flags
            )
            clean_type[num] = {'type': 'string'}

        else:
            raise KeyError(f'error(fdictToBbpb): invalid type "{field_type}" in key "{key}"')

    return (clean_msg, clean_type)
|
||||
|
||||
def producePlaylistContinuation(plid: str, offset: int = 0) -> str:
    """Build a playlist continuation token (ctoken) for the playlist id plid.

    plid: playlist id; embedded both as-is and with a "VL" prefix.
    offset: embedded in the inner "PT:" token and, divided by 100, in the
            outer message. Presumably the index of the first video to
            return - confirm against the caller.

    Returns the token encoded by bbpbToB64 with urlsafe=True and padding=True.
    """
    # inner token, encoded with the default (non-urlsafe, unpadded) settings
    index_token = bbpbToB64(fdictToBbpb({"1:int": offset}))

    paging_fields = {
        '1:int': int(offset / 100),
        '15:string': f'PT:{index_token}',
        '104:message': {
            '1:int': 0
        }
    }

    envelope = {
        '80226972:message': {
            '2:string': f'VL{plid}',
            '3:base64': paging_fields,
            '35:string': plid
        }
    }

    return bbpbToB64(fdictToBbpb(envelope), urlsafe=True, padding=True)
|
||||
@@ -313,6 +313,7 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
|
||||
}
|
||||
|
||||
case "playlistVideoRenderer":
|
||||
# used by all content inside of playlists which have at least one non-shorts video/livestream
|
||||
|
||||
video_id = safeTraverse(entry, ["playlistVideoRenderer", "videoId"], default="UnknownVideoId")
|
||||
title = safeTraverse(entry, ["playlistVideoRenderer", "title", "runs", 0, "text"], default="Unknown video title")
|
||||
@@ -321,7 +322,11 @@ def parseRenderers(entry: dict, context: dict = {}) -> dict:
|
||||
video_index = int(safeTraverse(entry, ["playlistVideoRenderer", "index", "simpleText"], default="1")) - 1
|
||||
length = parseLengthFromTimeBadge(safeTraverse(entry, ["playlistVideoRenderer", "lengthText", "simpleText"], default="0:0"))
|
||||
published_date = safeTraverse(entry, ["playlistVideoRenderer", "videoInfo", "runs", -1, "text"], default="2000-01-01")
|
||||
published_date = published_date.removeprefix("Streamed ")
|
||||
published_date = published_date.removeprefix("Streamed ").removeprefix(" watching")
|
||||
|
||||
# handle livestreams
|
||||
if not published_date:
|
||||
published_date = "now"
|
||||
|
||||
return {
|
||||
"type": "video",
|
||||
|
||||
Reference in New Issue
Block a user