[ie/sauceplus] Add extractor (#13567)

Authored by: ceandreasen, bashonly

Co-authored-by: bashonly <88596187+bashonly@users.noreply.github.com>
This commit is contained in:
Clark 2025-06-30 18:25:28 -05:00 committed by GitHub
parent b16722ede8
commit 35fc33fbc5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 176 additions and 98 deletions

View File

@ -1824,6 +1824,7 @@ from .safari import (
from .saitosan import SaitosanIE
from .samplefocus import SampleFocusIE
from .sapo import SapoIE
from .sauceplus import SaucePlusIE
from .sbs import SBSIE
from .sbscokr import (
SBSCoKrAllvodProgramIE,

View File

@ -17,8 +17,140 @@ from ..utils import (
from ..utils.traversal import traverse_obj
class FloatplaneIE(InfoExtractor):
class FloatplaneBaseIE(InfoExtractor):
def _real_extract(self, url):
post_id = self._match_id(url)
post_data = self._download_json(
f'{self._BASE_URL}/api/v3/content/post', post_id, query={'id': post_id},
note='Downloading post data', errnote='Unable to download post data',
impersonate=self._IMPERSONATE_TARGET)
if not any(traverse_obj(post_data, ('metadata', ('hasVideo', 'hasAudio')))):
raise ExtractorError('Post does not contain a video or audio track', expected=True)
uploader_url = format_field(
post_data, [('creator', 'urlname')], f'{self._BASE_URL}/channel/%s/home') or None
common_info = {
'uploader_url': uploader_url,
'channel_url': urljoin(f'{uploader_url}/', traverse_obj(post_data, ('channel', 'urlname'))),
'availability': self._availability(needs_subscription=True),
**traverse_obj(post_data, {
'uploader': ('creator', 'title', {str}),
'uploader_id': ('creator', 'id', {str}),
'channel': ('channel', 'title', {str}),
'channel_id': ('channel', 'id', {str}),
'release_timestamp': ('releaseDate', {parse_iso8601}),
}),
}
items = []
for media in traverse_obj(post_data, (('videoAttachments', 'audioAttachments'), ...)):
media_id = media['id']
media_typ = media.get('type') or 'video'
metadata = self._download_json(
f'{self._BASE_URL}/api/v3/content/{media_typ}', media_id, query={'id': media_id},
note=f'Downloading {media_typ} metadata', impersonate=self._IMPERSONATE_TARGET)
stream = self._download_json(
f'{self._BASE_URL}/api/v2/cdn/delivery', media_id, query={
'type': 'vod' if media_typ == 'video' else 'aod',
'guid': metadata['guid'],
}, note=f'Downloading {media_typ} stream data',
impersonate=self._IMPERSONATE_TARGET)
path_template = traverse_obj(stream, ('resource', 'uri', {str}))
def format_path(params):
path = path_template
for i, val in (params or {}).items():
path = path.replace(f'{{qualityLevelParams.{i}}}', val)
return path
formats = []
for quality in traverse_obj(stream, ('resource', 'data', 'qualityLevels', ...)):
url = urljoin(stream['cdn'], format_path(traverse_obj(
stream, ('resource', 'data', 'qualityLevelParams', quality['name'], {dict}))))
format_id = traverse_obj(quality, ('name', {str}))
hls_aes = {}
m3u8_data = None
# If we need impersonation for the API, then we need it for HLS keys too: extract in advance
if self._IMPERSONATE_TARGET is not None:
m3u8_data = self._download_webpage(
url, media_id, fatal=False, impersonate=self._IMPERSONATE_TARGET, headers=self._HEADERS,
note=join_nonempty('Downloading', format_id, 'm3u8 information', delim=' '),
errnote=join_nonempty('Failed to download', format_id, 'm3u8 information', delim=' '))
if not m3u8_data:
continue
key_url = self._search_regex(
r'#EXT-X-KEY:METHOD=AES-128,URI="(https?://[^"]+)"',
m3u8_data, 'HLS AES key URI', default=None)
if key_url:
urlh = self._request_webpage(
key_url, media_id, fatal=False, impersonate=self._IMPERSONATE_TARGET, headers=self._HEADERS,
note=join_nonempty('Downloading', format_id, 'HLS AES key', delim=' '),
errnote=join_nonempty('Failed to download', format_id, 'HLS AES key', delim=' '))
if urlh:
hls_aes['key'] = urlh.read().hex()
formats.append({
**traverse_obj(quality, {
'format_note': ('label', {str}),
'width': ('width', {int}),
'height': ('height', {int}),
}),
**parse_codecs(quality.get('codecs')),
'url': url,
'ext': determine_ext(url.partition('/chunk.m3u8')[0], 'mp4'),
'format_id': format_id,
'hls_media_playlist_data': m3u8_data,
'hls_aes': hls_aes or None,
})
items.append({
**common_info,
'id': media_id,
**traverse_obj(metadata, {
'title': ('title', {str}),
'duration': ('duration', {int_or_none}),
'thumbnail': ('thumbnail', 'path', {url_or_none}),
}),
'formats': formats,
})
post_info = {
**common_info,
'id': post_id,
'display_id': post_id,
**traverse_obj(post_data, {
'title': ('title', {str}),
'description': ('text', {clean_html}),
'like_count': ('likes', {int_or_none}),
'dislike_count': ('dislikes', {int_or_none}),
'comment_count': ('comments', {int_or_none}),
'thumbnail': ('thumbnail', 'path', {url_or_none}),
}),
'http_headers': self._HEADERS,
}
if len(items) > 1:
return self.playlist_result(items, **post_info)
post_info.update(items[0])
return post_info
class FloatplaneIE(FloatplaneBaseIE):
_VALID_URL = r'https?://(?:(?:www|beta)\.)?floatplane\.com/post/(?P<id>\w+)'
_BASE_URL = 'https://www.floatplane.com'
_IMPERSONATE_TARGET = None
_HEADERS = {
'Origin': _BASE_URL,
'Referer': f'{_BASE_URL}/',
}
_TESTS = [{
'url': 'https://www.floatplane.com/post/2Yf3UedF7C',
'info_dict': {
@ -170,105 +302,9 @@ class FloatplaneIE(InfoExtractor):
}]
def _real_initialize(self):
if not self._get_cookies('https://www.floatplane.com').get('sails.sid'):
if not self._get_cookies(self._BASE_URL).get('sails.sid'):
self.raise_login_required()
def _real_extract(self, url):
post_id = self._match_id(url)
post_data = self._download_json(
'https://www.floatplane.com/api/v3/content/post', post_id, query={'id': post_id},
note='Downloading post data', errnote='Unable to download post data')
if not any(traverse_obj(post_data, ('metadata', ('hasVideo', 'hasAudio')))):
raise ExtractorError('Post does not contain a video or audio track', expected=True)
uploader_url = format_field(
post_data, [('creator', 'urlname')], 'https://www.floatplane.com/channel/%s/home') or None
common_info = {
'uploader_url': uploader_url,
'channel_url': urljoin(f'{uploader_url}/', traverse_obj(post_data, ('channel', 'urlname'))),
'availability': self._availability(needs_subscription=True),
**traverse_obj(post_data, {
'uploader': ('creator', 'title', {str}),
'uploader_id': ('creator', 'id', {str}),
'channel': ('channel', 'title', {str}),
'channel_id': ('channel', 'id', {str}),
'release_timestamp': ('releaseDate', {parse_iso8601}),
}),
}
items = []
for media in traverse_obj(post_data, (('videoAttachments', 'audioAttachments'), ...)):
media_id = media['id']
media_typ = media.get('type') or 'video'
metadata = self._download_json(
f'https://www.floatplane.com/api/v3/content/{media_typ}', media_id, query={'id': media_id},
note=f'Downloading {media_typ} metadata')
stream = self._download_json(
'https://www.floatplane.com/api/v2/cdn/delivery', media_id, query={
'type': 'vod' if media_typ == 'video' else 'aod',
'guid': metadata['guid'],
}, note=f'Downloading {media_typ} stream data')
path_template = traverse_obj(stream, ('resource', 'uri', {str}))
def format_path(params):
path = path_template
for i, val in (params or {}).items():
path = path.replace(f'{{qualityLevelParams.{i}}}', val)
return path
formats = []
for quality in traverse_obj(stream, ('resource', 'data', 'qualityLevels', ...)):
url = urljoin(stream['cdn'], format_path(traverse_obj(
stream, ('resource', 'data', 'qualityLevelParams', quality['name'], {dict}))))
formats.append({
**traverse_obj(quality, {
'format_id': ('name', {str}),
'format_note': ('label', {str}),
'width': ('width', {int}),
'height': ('height', {int}),
}),
**parse_codecs(quality.get('codecs')),
'url': url,
'ext': determine_ext(url.partition('/chunk.m3u8')[0], 'mp4'),
})
items.append({
**common_info,
'id': media_id,
**traverse_obj(metadata, {
'title': ('title', {str}),
'duration': ('duration', {int_or_none}),
'thumbnail': ('thumbnail', 'path', {url_or_none}),
}),
'formats': formats,
})
post_info = {
**common_info,
'id': post_id,
'display_id': post_id,
**traverse_obj(post_data, {
'title': ('title', {str}),
'description': ('text', {clean_html}),
'like_count': ('likes', {int_or_none}),
'dislike_count': ('dislikes', {int_or_none}),
'comment_count': ('comments', {int_or_none}),
'thumbnail': ('thumbnail', 'path', {url_or_none}),
}),
}
if len(items) > 1:
return self.playlist_result(items, **post_info)
post_info.update(items[0])
return post_info
class FloatplaneChannelIE(InfoExtractor):
_VALID_URL = r'https?://(?:(?:www|beta)\.)?floatplane\.com/channel/(?P<id>[\w-]+)/home(?:/(?P<channel>[\w-]+))?'

View File

@ -0,0 +1,41 @@
from .floatplane import FloatplaneBaseIE
class SaucePlusIE(FloatplaneBaseIE):
IE_DESC = 'Sauce+'
_VALID_URL = r'https?://(?:(?:www|beta)\.)?sauceplus\.com/post/(?P<id>\w+)'
_BASE_URL = 'https://www.sauceplus.com'
_HEADERS = {
'Origin': _BASE_URL,
'Referer': f'{_BASE_URL}/',
}
_IMPERSONATE_TARGET = True
_TESTS = [{
'url': 'https://www.sauceplus.com/post/YbBwIa2A5g',
'info_dict': {
'id': 'eit4Ugu5TL',
'ext': 'mp4',
'display_id': 'YbBwIa2A5g',
'title': 'Scare the Coyote - Episode 3',
'description': '',
'thumbnail': r're:^https?://.*\.jpe?g$',
'duration': 2975,
'comment_count': int,
'like_count': int,
'dislike_count': int,
'release_date': '20250627',
'release_timestamp': 1750993500,
'uploader': 'Scare The Coyote',
'uploader_id': '683e0a3269688656a5a49a44',
'uploader_url': 'https://www.sauceplus.com/channel/ScareTheCoyote/home',
'channel': 'Scare The Coyote',
'channel_id': '683e0a326968866ceba49a45',
'channel_url': 'https://www.sauceplus.com/channel/ScareTheCoyote/home/main',
'availability': 'subscriber_only',
},
'params': {'skip_download': 'm3u8'},
}]
def _real_initialize(self):
if not self._get_cookies(self._BASE_URL).get('__Host-sp-sess'):
self.raise_login_required()