# -*- coding: UTF-8 -*-
# This video extraction code is based on youtube-dl: https://github.com/ytdl-org/youtube-dl
from __future__ import print_function
from re import escape
from re import findall
from re import match
from re import search
from re import sub
from json import dumps
from json import loads
from Components.config import config
from .compat import compat_parse_qs
from .compat import compat_Request
from .compat import compat_urlopen
from .compat import compat_URLError
from .compat import SUBURI
from .jsinterp import JSInterpreter
# itags that _url_map_append() and _skip_fmt() refuse to use.  The order is
# irrelevant for the membership tests but is kept as in the original list.
IGNORE_VIDEO_FORMAT = (
    # webm
    '43', '44', '45', '46',
    # 3D
    '82', '83', '84', '85',
    '100', '101', '102',
    # webm
    '167', '168', '169',
    '170', '171', '172',
    '218', '219',
    '242', '243', '244', '245', '246', '247',
    # AV1
    '394', '395', '396', '397', '398', '399', '400', '401', '402',
    '694', '695', '696', '697', '698', '699', '700', '701',
    '571',
    # webm
    '249', '250', '251',
    '302',
)

# Ordered itag preference list; filled in by create_priority_formats().
PRIORITY_VIDEO_FORMAT = ()
def create_priority_formats():
    """Rebuild the module-level PRIORITY_VIDEO_FORMAT tuple.

    The itag groups below are listed from the lowest to the highest
    resolution.  Groups are prepended one by one, so the highest accepted
    resolution ends up first, and the walk stops after the group whose first
    itag equals ``config.plugins.YouTube.maxResolution.value``.  If no group
    starts with the configured itag, every group is included.

    The tuple is built in a local variable and assigned once, so calling
    this function again replaces the previous list instead of prepending to
    it (which would leave duplicated and stale entries behind).
    """
    global PRIORITY_VIDEO_FORMAT
    itag = config.plugins.YouTube.maxResolution.value
    video_formats = (
        ('17', '91', '13', '151', '160', '269'),  # 176x144
        ('5', '36', '92', '132', '133', '229'),  # 400x240
        ('18', '93', '34', '6', '134', '230'),  # 640x360
        ('35', '59', '78', '94', '135', '212', '231'),  # 854x480
        ('22', '95', '300', '136', '298', '232'),  # 1280x720
        ('37', '96', '301', '137', '299', '248', '303', '271', '270'),  # 1920x1080
        ('38', '266', '264', '138', '313', '315', '272', '308')  # 4096x3072
    )
    priority = ()
    for video_format in video_formats:
        priority = video_format + priority
        if video_format[0] == itag:
            break
    PRIORITY_VIDEO_FORMAT = priority
# Fill PRIORITY_VIDEO_FORMAT at import time from the configured maximum resolution.
create_priority_formats()
class YouTubeVideoUrl():
def __init__(self):
self.use_dash_mp4 = ()
self._code_cache = {}
self._player_cache = {}
self.nsig_cache = (None, None)
@staticmethod
def try_get(src, getter):
    """Follow the keys in ``getter`` through nested dicts.

    Returns the value reached after the last key, or None as soon as the
    current value is not a dict or does not contain the next key.
    """
    node = src
    for key in getter:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node
@staticmethod
def _guess_encoding_from_content(content_type, webpage_bytes):
m = match(r'[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+\s*;\s*charset=(.+)', content_type)
if m:
encoding = m.group(1)
else:
m = search(br']+charset=[\'"]?([^\'")]+)[ /\'">]', webpage_bytes[:1024])
if m:
encoding = m.group(1).decode('ascii')
elif webpage_bytes.startswith(b'\xff\xfe'):
encoding = 'utf-16'
else:
encoding = 'utf-8'
return encoding
def _download_webpage(self, url, data=None, headers=None):
    """Return the data of the page as a string.

    ``data``, when truthy, is serialised to JSON and sent as the request
    body.  Whenever ``data`` or ``headers`` is given, the request is built
    explicitly and its method is forced to POST.  The body is decoded with
    the encoding chosen by _guess_encoding_from_content(), falling back to
    UTF-8 if that decode fails.

    Raises RuntimeError carrying the URLError reason when the URL cannot
    be opened.
    """
    if data:
        data = dumps(data).encode('utf8')
    if data or headers:
        # "headers or {}" keeps the call valid now that the default is None
        # instead of a shared mutable dict.
        url = compat_Request(url, data=data, headers=headers or {})
        url.get_method = lambda: 'POST'
    try:
        urlh = compat_urlopen(url, timeout=5)
    except compat_URLError as e:  # pragma: no cover
        raise RuntimeError(e.reason)
    try:
        content_type = urlh.headers.get('Content-Type', '')
        webpage_bytes = urlh.read()
    finally:
        # Release the connection even if reading the response fails.
        urlh.close()
    encoding = self._guess_encoding_from_content(content_type, webpage_bytes)
    try:
        content = webpage_bytes.decode(encoding, 'replace')
    except Exception:  # pragma: no cover
        # e.g. the server announced an encoding name Python does not know
        content = webpage_bytes.decode('utf-8', 'replace')
    return content
@staticmethod
def _extract_n_function_name(jscode):
func_name, idx = search(
r'''(?x)
(?:
\.get\("n"\)\)&&\(b=|
(?:
b=String\.fromCharCode\(110\)|
(?P[a-zA-Z0-9_$.]+)&&\(b="nn"\[\+(?P=str_idx)\]
)
(?:
,[a-zA-Z0-9_$]+\(a\))?,c=a\.
(?:
get\(b\)|
[a-zA-Z0-9_$]+\[b\]\|\|null
)\)&&\(c=|
\b(?P[a-zA-Z0-9_$]+)=
)(?P[a-zA-Z0-9_$]+)(?:\[(?P\d+)\])?\([a-zA-Z]\)
(?(var),[a-zA-Z0-9_$]+\.set\((?:"n+"|[a-zA-Z0-9_$]+)\,(?P=var)\))
''', jscode
).group('nfunc', 'idx')
if not func_name:
print('[YouTubeVideoUrl] Falling back to generic n function search')
return search(
r'''(?xs)
;\s*(?P[a-zA-Z0-9_$]+)\s*=\s*function\([a-zA-Z0-9_$]+\)
\s*\{(?:(?!};).)+?return\s*(?P["'])[\w-]+_w8_(?P=q)\s*\+\s*[a-zA-Z0-9_$]+
''', jscode
).group('name')
if not idx:
return func_name
if int(idx) == 0:
real_nfunc = search(
r'var %s\s*=\s*(\[.+?\])\s*[,;]' % (escape(func_name), ),
jscode
)
if real_nfunc:
return real_nfunc.group(1)[1:-1]
def _extract_player_info(self):
    """Return the 8 hex digit player id found in the iframe API page.

    Prints a message and returns None when the page is empty or contains
    no player id.
    """
    page = self._download_webpage('https://www.youtube.com/iframe_api')
    found = search(r'player\\?/([0-9a-fA-F]{8})\\?/', page) if page else None
    if found:
        return found.group(1)
    print('[YouTubeVideoUrl] Cannot get player info')
def _load_player(self, player_id):
    """Download the player JavaScript for ``player_id`` into the cache.

    Does nothing for a falsy id or an id that is already cached.
    """
    if not player_id or player_id in self._player_cache:
        return
    base_js_url = 'https://www.youtube.com/s/player/%s/player_ias.vflset/en_US/base.js' % player_id
    self._player_cache[player_id] = self._download_webpage(base_js_url)
@staticmethod
def _fixup_n_function_code(argnames, code):
return argnames, sub(
r';\s*if\s*\(\s*typeof\s+[a-zA-Z0-9_$]+\s*===?\s*(["\'])undefined\1\s*\)\s*return\s+%s;' % argnames[0],
';', code)
def _extract_function(self, player_id, s_id):
    """Return a callable that runs the cached JS function for ``s_id``.

    The player JavaScript is loaded on demand.  When ``s_id`` is not yet in
    the code cache, the function name is looked up (with the n-function
    search for ids starting with 'nsig_', otherwise with the signature
    search), its code is extracted, patched and cached.

    The returned callable takes one string and passes it as the single
    argument to the interpreted function.
    """
    if player_id not in self._player_cache:
        self._load_player(player_id)
    player_js = self._player_cache[player_id]
    interpreter = JSInterpreter(player_js)
    if s_id not in self._code_cache:
        if s_id.startswith('nsig_'):
            find_name = self._extract_n_function_name
        else:
            find_name = self._parse_sig_js
        raw_code = interpreter.extract_function_code(find_name(player_js))
        self._code_cache[s_id] = self._fixup_n_function_code(*raw_code)

    def run(s):
        # The cache is read at call time, exactly as the original lambda did.
        return interpreter.extract_function_from_code(*self._code_cache[s_id])([s])
    return run
def _unthrottle_url(self, url, player_id):
n_param = search(r'&n=(.+?)&', url).group(1)
n_id = 'nsig_%s_%s' % (player_id, '.'.join(str(len(p)) for p in n_param.split('.')))
if self.nsig_cache[0] != n_param:
print('[YouTubeVideoUrl] Decrypt nsig', n_id)
self.nsig_cache = (None, None)
try:
ret = self._extract_function(player_id, n_id)(n_param)
except Exception as ex:
print('[YouTubeVideoUrl] Unable to decode nsig', ex)
else:
if ret.startswith('enhanced_except_') or ret.endswith(n_param):
print('[YouTubeVideoUrl] Unhandled exception in decode', ret)
else:
self.nsig_cache = (n_param, ret)
if self.nsig_cache[1]:
print('[YouTubeVideoUrl] Decrypted nsig %s => %s' % self.nsig_cache)
return url.replace(self.nsig_cache[0], self.nsig_cache[1])
if n_id in self._code_cache:
del self._code_cache[n_id]
return url
def _decrypt_signature_url(self, sc, player_id):
    """Turn the encrypted s field into a working signature.

    ``sc`` is the parsed signatureCipher mapping (every value a list).
    Returns the URL with the signature appended under the parameter named
    by ``sp`` (or 'signature' when ``sp`` is absent).  When the signature
    cannot be computed, the cached code for it is dropped and None is
    returned.
    """
    encrypted = sc.get('s', [''])[0]
    part_lengths = [str(len(part)) for part in encrypted.split('.')]
    s_id = 'sig_%s_%s' % (player_id, '.'.join(part_lengths))
    print('[YouTubeVideoUrl] Decrypt signature', s_id)
    try:
        sig = self._extract_function(player_id, s_id)(encrypted)
    except Exception as ex:
        print('[YouTubeVideoUrl] Signature extraction failed', ex)
        self._code_cache.pop(s_id, None)
        return None
    base_url = sc['url'][0]
    sig_param = sc['sp'][0] if 'sp' in sc else 'signature'
    return '%s&%s=%s' % (base_url, sig_param, sig)
def _parse_sig_js(self, jscode):
def _search_regex(pattern, string):
mobj = ''
for p in pattern:
mobj = search(p, string, 0)
if mobj:
break
return mobj
return _search_regex(
(r'\b(?P[a-zA-Z0-9_$]+)&&\((?P=var)=(?P[a-zA-Z0-9_$]{2,})\(decodeURIComponent\((?P=var)\)\)',
r'(?P[a-zA-Z0-9_$]+)\s*=\s*function\(\s*(?P[a-zA-Z0-9_$]+)\s*\)\s*{\s*(?P=arg)\s*=\s*(?P=arg)\.split\(\s*""\s*\)\s*;\s*[^}]+;\s*return\s+(?P=arg)\.join\(\s*""\s*\)',
r'(?:\b|[^a-zA-Z0-9_$])(?P[a-zA-Z0-9_$]{2,})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)(?:;[a-zA-Z0-9_$]{2}\.[a-zA-Z0-9_$]{2}\(a,\d+\))?',
# Old patterns
r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P[a-zA-Z0-9$]+)\(',
r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P[a-zA-Z0-9$]+)\(',
r'\bm=(?P[a-zA-Z0-9$]{2,})\(decodeURIComponent\(h\.s\)\)',
# Obsolete patterns
r'("|\')signature\1\s*,\s*(?P[a-zA-Z0-9$]+)\(',
r'\.sig\|\|(?P[a-zA-Z0-9$]+)\(',
r'yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P[a-zA-Z0-9$]+)\(',
r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P[a-zA-Z0-9$]+)\(',
r'\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P[a-zA-Z0-9$]+)\('),
jscode
).group('sig')
@staticmethod
def _parse_m3u8_attributes(attrib):
return {key: val[1:-1] if val.startswith('"') else val for (key, val) in findall(r'(?P[A-Z0-9-]+)=(?P"[^"]+"|[^",]+)(?:,|$)', attrib)}
def _get_m3u8_audio_urls(self, manifest):
    """Map the GROUP-ID of every #EXT-X-MEDIA line to its URI.

    Returns an empty dict when the manifest has no such line.
    """
    media_tag = '#EXT-X-MEDIA:'
    urls_by_group = {}
    if media_tag not in manifest:
        return urls_by_group
    for row in manifest.splitlines():
        if not row.startswith(media_tag):
            continue
        attrs = self._parse_m3u8_attributes(row)
        urls_by_group[attrs.get('GROUP-ID')] = attrs.get('URI')
    return urls_by_group
@staticmethod
def _url_map_append(url_map, itag, url):
    """Append ``url`` to ``url_map`` unless its itag is ignored.

    The stored 'preference' is the position of the itag in
    PRIORITY_VIDEO_FORMAT, or 100 for itags that are not listed there.
    """
    if itag in IGNORE_VIDEO_FORMAT:
        return
    if itag in PRIORITY_VIDEO_FORMAT:
        preference = PRIORITY_VIDEO_FORMAT.index(itag)
    else:
        preference = 100
    url_map.append({'url': url, 'preference': preference})
# Download the m3u8 manifest at manifest_url and return its stream entries as
# a list of {'url', 'preference'} dicts sorted by ascending 'preference'.
# Entries are added through _url_map_append(), so ignored itags are left out.
# NOTE(review): the indentation of this block is missing in this copy of the
# file; confirm the nesting (in particular whether audio_url is reset only
# after a URL with an itag was appended) before restructuring the code.
def _extract_from_m3u8(self, manifest_url):
url_map = []
# Audio URI (prefixed with SUBURI) that is appended to the next stream URL.
audio_url = ''
manifest = self._download_webpage(manifest_url)
audio_urls = self._get_m3u8_audio_urls(manifest)
for line in manifest.splitlines():
# A stream info line names, via its AUDIO attribute, the audio group to use.
if audio_urls and line.startswith('#EXT-X-STREAM-INF:'):
audio_id = self._parse_m3u8_attributes(line).get('AUDIO')
if audio_id and audio_id in audio_urls:
audio_url = SUBURI + audio_urls.get(audio_id)
elif line.startswith('https'):
# The itag is read from the URL path, in either of the two layouts below.
itag = search(r'/sgovp/[^/]+itag%3D(\d+?)/', line) or search(r'/itag/(\d+?)/', line)
if itag:
self._url_map_append(url_map, itag.group(1), line + audio_url)
audio_url = ''
return sorted(url_map, key=lambda k: k['preference'])
def _skip_fmt(self, fmt, itag):
return (
fmt.get('targetDurationSec') or
fmt.get('drmFamilies') or
fmt.get('type') == 'FORMAT_STREAM_TYPE_OTF' or
itag in IGNORE_VIDEO_FORMAT or
itag in self.use_dash_mp4
)
def _extract_url(self, fmt, player_id):
url = fmt.get('url')
if not url and 'signatureCipher' in fmt:
url = self._decrypt_signature_url(compat_parse_qs(fmt.get('signatureCipher', '')), player_id)
if url:
if '&n=' in url:
url = self._unthrottle_url(url, player_id)
return url
@staticmethod
def _video_pref(fmt, prefer):
if prefer == 100:
prefer = 20 if 'video/mp4' in fmt.get('mimeType').lower() else 200
return prefer
@staticmethod
def _audio_pref(fmt, prefer, get_audio):
audio_track = fmt.get('audioTrack', {})
if get_audio == '' and 'original' in audio_track.get('displayName', '').lower():
prefer -= 40
if audio_track.get('audioIsDefault'):
prefer -= 20
if prefer == 100:
prefer = 20 if 'audio/mp4' in fmt.get('mimeType').lower() else 200
return prefer
def _sort_formats(self, priority_formats, streaming_formats, get_audio=None):
    """Return the usable formats ordered by ascending preference.

    Formats rejected by _skip_fmt() are dropped.  The starting preference
    is the position of the itag in ``priority_formats`` (100 when absent);
    it is then resolved by _video_pref() when ``get_audio`` is None and by
    _audio_pref() otherwise.  Formats whose preference reaches 200 are
    dropped; the others get the value stored under 'preference'.
    """
    want_video = get_audio is None
    accepted = []
    for fmt in streaming_formats:
        itag = str(fmt.get('itag', ''))
        if self._skip_fmt(fmt, itag):
            continue
        try:
            rank = priority_formats.index(itag)
        except ValueError:
            rank = 100
        if want_video:
            rank = self._video_pref(fmt, rank)
        else:
            rank = self._audio_pref(fmt, rank, get_audio)
        if rank >= 200:
            continue
        fmt['preference'] = rank
        accepted.append(fmt)
    accepted.sort(key=lambda k: k['preference'])
    return accepted
def _extract_fmt_video_format(self, streaming_formats, player_id):
    """Return ``(url, itag)`` of the best video format with a usable URL.

    Formats are tried in the order given by _sort_formats() with
    PRIORITY_VIDEO_FORMAT.  Returns ``('', '')`` when none yields a URL.
    """
    print('[YouTubeVideoUrl] Try fmt url')
    candidates = self._sort_formats(PRIORITY_VIDEO_FORMAT, streaming_formats)
    for candidate in candidates:
        url = self._extract_url(candidate, player_id)
        if not url:
            continue
        print('[YouTubeVideoUrl] Found fmt url')
        return url, str(candidate.get('itag'))
    return '', ''
def _extract_dash_audio_format(self, streaming_formats, player_id, lang):
    """ If DASH MP4 video add also DASH MP4 audio track

    Returns the URL of the best audio format with a usable URL, trying the
    formats in the order given by _sort_formats() for the itags listed
    below, or '' when none yields a URL.
    """
    print('[YouTubeVideoUrl] Try fmt audio url')
    DASH_AUDIO_FORMATS = ('141', '140', '139', '258', '265', '325', '328', '233', '234')
    candidates = self._sort_formats(DASH_AUDIO_FORMATS, streaming_formats, lang)
    for candidate in candidates:
        url = self._extract_url(candidate, player_id)
        if not url:
            continue
        print('[YouTubeVideoUrl] Found fmt audio url')
        return url
    return ''
def _extract_signature_timestamp(self):
    """Return ``(sts, player_id)`` for the current player.

    ``player_id`` comes from _extract_player_info().  ``sts`` is the five
    digit value following ``signatureTimestamp:`` or ``sts:`` in the player
    JavaScript, as a string.  ``sts`` is None when there is no player id or
    the value is not present in the script.
    """
    sts = None
    player_id = self._extract_player_info()
    if player_id:
        if player_id not in self._player_cache:
            self._load_player(player_id)
        found = search(
            r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>\d{5})',
            self._player_cache[player_id]
        )
        # Leave sts as None instead of failing on a script without the value.
        if found:
            sts = found.group('sts')
    return sts, player_id
def _extract_visitor_id(self, webpage):
    """Return INNERTUBE_CONTEXT.client.visitorData from the page's ytcfg.

    The first ``ytcfg.set({...});`` call in ``webpage`` is parsed as JSON.
    Returns None when that JSON lacks the value.  When there is no such
    call or its argument is not valid JSON, a message is printed and None
    is returned.
    """
    found = search(r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage)
    if found:
        try:
            ytcfg = loads(found.group(1))
        except ValueError:  # pragma: no cover
            pass
        else:
            return self.try_get(ytcfg, ('INNERTUBE_CONTEXT', 'client', 'visitorData'))
    print('[YouTubeVideoUrl] Failed to extract visitor id')
def _extract_web_response(self, webpage):
player_response = search(r'ytInitialPlayerResponse\s*=\s*({[^>]*})\s*;\s*(?:var\s+meta|