Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement n-param descrambling using JSInterp #6

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 99 additions & 16 deletions youtube_dl/extractor/youtube.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,17 @@ def _extract_player_info(cls, player_url):
raise ExtractorError('Cannot identify player %r' % player_url)
return id_m.group('id')

def _get_player_code(self, video_id, player_url, player_id=None):
if not player_id:
player_id = self._extract_player_info(player_url)

if player_id not in self._code_cache:
self._code_cache[player_id] = self._download_webpage(
player_url, video_id,
note='Downloading player ' + player_id,
errnote='Download of %s failed' % player_url)
return self._code_cache[player_id]

def _extract_signature_function(self, video_id, player_url, example_sig):
player_id = self._extract_player_info(player_url)

Expand All @@ -1266,12 +1277,7 @@ def _extract_signature_function(self, video_id, player_url, example_sig):
if cache_spec is not None:
return lambda s: ''.join(s[i] for i in cache_spec)

if player_id not in self._code_cache:
self._code_cache[player_id] = self._download_webpage(
player_url, video_id,
note='Downloading player ' + player_id,
errnote='Download of %s failed' % player_url)
code = self._code_cache[player_id]
code = self._get_player_code(video_id, player_url, player_id)
res = self._parse_sig_js(code)

test_string = ''.join(map(compat_chr, range(len(example_sig))))
Expand Down Expand Up @@ -1350,11 +1356,6 @@ def _decrypt_signature(self, s, video_id, player_url):
if player_url is None:
raise ExtractorError('Cannot decrypt signature without player_url')

if player_url.startswith('//'):
player_url = 'https:' + player_url
elif not re.match(r'https?://', player_url):
player_url = compat_urlparse.urljoin(
'https://www.youtube.com', player_url)
try:
player_id = (player_url, self._signature_cache_id(s))
if player_id not in self._player_cache:
Expand All @@ -1371,6 +1372,88 @@ def _decrypt_signature(self, s, video_id, player_url):
raise ExtractorError(
'Signature extraction failed: ' + tb, cause=e)

def _extract_player_url(self, webpage):
player_url = self._search_regex(
r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
webpage or '', 'player URL', fatal=False)
if not player_url:
return
if player_url.startswith('//'):
player_url = 'https:' + player_url
elif not re.match(r'https?://', player_url):
player_url = compat_urlparse.urljoin(
'https://www.youtube.com', player_url)
return player_url

# from yt-dlp
# See also:
# 1. https://github.com/ytdl-org/youtube-dl/issues/29326#issuecomment-894619419
# 2. https://code.videolan.org/videolan/vlc/-/blob/4fb284e5af69aa9ac2100ccbdd3b88debec9987f/share/lua/playlist/youtube.lua#L116
# 3. https://github.com/ytdl-org/youtube-dl/issues/30097#issuecomment-950157377
def _extract_n_function_name(self, jscode):
return self._search_regex(
(r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z0-9$]{3})\([a-zA-Z0-9]\)',),
jscode, 'Initial JS player n function name', group='nfunc')

def _extract_n_function(self, video_id, player_url):
player_id = self._extract_player_info(player_url)
func_code = self._downloader.cache.load('youtube-nsig', player_id)

if func_code:
jsi = JSInterpreter(func_code)
else:
player_id = self._extract_player_info(player_url)
jscode = self._get_player_code(video_id, player_url, player_id)
funcname = self._extract_n_function_name(jscode)
jsi = JSInterpreter(jscode)
func_code = jsi.extract_function_code(funcname)
self._downloader.cache.store('youtube-nsig', player_id, func_code)

if self._downloader.params.get('youtube_print_sig_code'):
self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(player_id, func_code[1]))

return lambda s: jsi.extract_function_from_code(*func_code)([s])

def _n_descramble(self, n_param, player_url, video_id):
"""Compute the response to YT's "n" parameter challenge

Args:
n_param -- challenge string that is the value of the
URL's "n" query parameter
player_url -- URL of YT player JS
video_id
"""

sig_id = ('nsig_value', n_param)
if sig_id in self._player_cache:
return self._player_cache[sig_id]

try:
player_id = ('nsig', player_url)
if player_id not in self._player_cache:
self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
func = self._player_cache[player_id]
self._player_cache[sig_id] = func(n_param)
if self._downloader.params.get('verbose', False):
self._downloader.to_screen('[debug] [%s] %s' % (self.IE_NAME, 'Decrypted nsig {0} => {1}'.format(n_param, self._player_cache[sig_id])))
return self._player_cache[sig_id]
except Exception as e:
raise ExtractorError(traceback.format_exc(), cause=e, video_id=video_id)

def _unthrottle_format_urls(self, video_id, player_url, formats):
for fmt in formats:
parsed_fmt_url = compat_urlparse.urlparse(fmt['url'])
qs = compat_urlparse.parse_qs(parsed_fmt_url.query)
n_param = qs.get('n')
if not n_param:
continue
n_param = n_param[-1]
n_response = self._n_descramble(n_param, player_url, video_id)
if n_response:
qs['n'] = [n_response]
fmt['url'] = compat_urlparse.urlunparse(
parsed_fmt_url._replace(query=compat_urllib_parse_urlencode(qs, True)))

def _mark_watched(self, video_id, player_response):
playback_url = url_or_none(try_get(
player_response,
Expand Down Expand Up @@ -1632,11 +1715,7 @@ def feed_entry(name):
if not (sc and fmt_url and encrypted_sig):
continue
if not player_url:
if not webpage:
continue
player_url = self._search_regex(
r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
webpage, 'player URL', fatal=False)
player_url = self._extract_player_url(webpage)
if not player_url:
continue
signature = self._decrypt_signature(sc['s'][0], video_id, player_url)
Expand Down Expand Up @@ -1782,6 +1861,10 @@ def feed_entry(name):
is_live = video_details.get('isLive')
owner_profile_url = microformat.get('ownerProfileUrl')

if not player_url:
player_url = self._extract_player_url(webpage)
self._unthrottle_format_urls(video_id, player_url, formats)

info = {
'id': video_id,
'title': self._live_title(video_title) if is_live else video_title,
Expand Down