Skip to content

Commit

Permalink
Fix age gate video
Browse files Browse the repository at this point in the history
Update players headers and data, add signatureTimestamp in context to get working url.
Return back n-sig decrypting to use for embedded age gate video.
Sync jsinterp code with youtube_dl.
  • Loading branch information
Taapat committed Aug 29, 2023
1 parent f9d1a23 commit 5824f84
Show file tree
Hide file tree
Showing 6 changed files with 1,233 additions and 19 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ jobs:
python -O -m compileall ./src/
- name: Check code with flake8
run: |
flake8 --ignore=W191,W504,E126,E127,E128,E501 --show-source --exclude=./src/compat.py,./test/enigmahelper.py,./test/try_plugin.py,./src/__init__.py
flake8 --ignore=W191,W504,E126,E127,E128,E501 --show-source --exclude=./src/compat.py,./src/jsinterp.py,./test/enigmahelper.py,./test/try_plugin.py,./src/__init__.py
flake8 --ignore=W191,E501,F821 --show-source --filename=./test/try_plugin.py
flake8 --ignore=W191,F401,E128,E402,E501 --show-source --filename=./test/enigmahelper.py,./src/__init__.py
flake8 --ignore=W191,F401,E128,E402,E501 --show-source --filename=./src/jsinterp.py,./test/enigmahelper.py,./src/__init__.py
flake8 --ignore=W191,F821,F401 --show-source --filename=./src/compat.py
- name: Remove enigma2 imports and variables for test
run: |
echo "" > src/__init__.py
sed -i 's/from Components/# from Components/g' src/YouTubeVideoUrl.py
sed -i 's/config.plugins.YouTube.maxResolution.value/"38"/g' src/YouTubeVideoUrl.py
sed -i 's/config.plugins.YouTube.maxResolution.value/"22"/g' src/YouTubeVideoUrl.py
sed -i 's/config.plugins.YouTube.useDashMP4.value/True/g' src/YouTubeVideoUrl.py
- name: Test code with pytest
run: |
Expand Down
1 change: 1 addition & 0 deletions src/OAuth.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def get_key(x):
API_KEY = get_key('Xhi3_LoIzw_OizD15SyCNReMvKL27nw_OizDWRR395T5uGWpvn451I2VYc78Gy463')
CLIENT_ID = get_key('4113447027255-v15bgs05u1o3m278mpjs2vcd0394w_OizDfrg5160drbw_Oiz63D.w_OizDpp75s.googleus87ercontent.99com')
CLIENT_SECRET = get_key('Zf93pqd2rxgY2ro159rK20BMxif27')
YT_EMBKEY = get_key('Xhi3_LoIzw_OizD15SyXhi_LoO_27FJ2SlqU8Q439STEHLGCilw51_Y9_11qcW863')
YT_KEY = get_key('Xhi3_LoIzw_OizD15SyXhi_Lo8e27iZmM1Fw_Oi39zDDVjRy-df512KTyQ_vz_y63YM39w')

if path.exists('/etc/enigma2/YouTube.key'): # pragma: no cover
Expand Down
118 changes: 104 additions & 14 deletions src/YouTubeVideoUrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@

from __future__ import print_function

from re import search, match, sub
from json import loads, dumps
from re import search
from re import match
from re import sub
from re import escape
from json import loads
from json import dumps

from Components.config import config

Expand All @@ -13,6 +17,8 @@
from .compat import compat_URLError
from .compat import compat_Request
from .compat import SUBURI
from .jsinterp import JSInterpreter
from .OAuth import YT_EMBKEY
from .OAuth import YT_KEY


Expand Down Expand Up @@ -83,6 +89,8 @@ def clean_html(html):
class YouTubeVideoUrl():
def __init__(self):
self.use_dash_mp4 = ()
self._code_cache = {}
self._player_cache = {}

@staticmethod
def _guess_encoding_from_content(content_type, webpage_bytes):
Expand Down Expand Up @@ -125,6 +133,66 @@ def _download_webpage(self, url, data=None, headers={}):

return content

def _extract_n_function_name(self, jscode):
nfunc, idx = search(
r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z0-9$]+)(?:\[(?P<idx>\d+)\])?\([a-zA-Z0-9]\)',
jscode
).group('nfunc', 'idx')
if not idx:
return nfunc
if int(idx) == 0:
real_nfunc = search(
r'var %s\s*=\s*\[([a-zA-Z_$][\w$]*)\];' % (escape(nfunc), ),
jscode
)
if real_nfunc:
return real_nfunc.group(1)

def _extract_player_info(self):
res = self._download_webpage('https://www.youtube.com/iframe_api')
if res:
player_id = search(r'player\\?/([0-9a-fA-F]{8})\\?/', res)
if player_id:
return player_id.group(1)
print('[YouTubeVideoUrl] Cannot get player info')

def _load_player(self, player_id):
if player_id and player_id not in self._player_cache:
self._player_cache[player_id] = self._download_webpage(
'https://www.youtube.com/s/player/%s/player_ias.vflset/en_US/base.js' % player_id
)

def _extract_n_function(self, player_id):
if player_id not in self._player_cache:
self._load_player(player_id)
if player_id not in self._code_cache:
jsi = JSInterpreter(self._player_cache[player_id])
funcname = self._extract_n_function_name(self._player_cache[player_id])
self._code_cache[player_id] = jsi.extract_function_code(funcname)
else:
jsi = JSInterpreter(self._player_cache[player_id])
return lambda s: jsi.extract_function_from_code(*self._code_cache[player_id])([s])

def _unthrottle_url(self, url, player_id):
print('[YouTubeVideoUrl] Try unthrottle url')
if not player_id:
print('[YouTubeVideoUrl] Cannot decrypt nsig without player info')
return url
n_param = search(r'&n=(.+?)&', url).group(1)
try:
ret = self._extract_n_function(player_id)(n_param)
except Exception as ex:
print('[YouTubeVideoUrl] Unable to decode nsig', ex)
else:
if ret.startswith('enhanced_except_'):
print('[YouTubeVideoUrl] Unhandled exception in decode', ret)
else:
print('[YouTubeVideoUrl] Decrypted nsig %s => %s' % (n_param, ret))
return url.replace(n_param, ret)
if player_id in self._code_cache:
del self._code_cache[player_id]
return url

def _extract_from_m3u8(self, manifest_url):
url_map = {}

Expand Down Expand Up @@ -176,37 +244,48 @@ def _extract_dash_audio_format(self, streaming_formats):
return ''

def _extract_player_response(self, video_id, client):
url = 'https://www.youtube.com/youtubei/v1/player?key=%s&bpctr=9999999999&has_verified=1' % YT_KEY
player_id = None
url = 'https://www.youtube.com/youtubei/v1/player?key=%s&bpctr=9999999999&has_verified=1' % (YT_EMBKEY if client == 85 else YT_KEY)
USER_AGENT = 'com.google.android.youtube/17.31.35 (Linux; U; Android 12) gzip'
data = {
'videoId': video_id,
'params': 'CgIQBg',
'playbackContext': {
'contentPlaybackContext': {
'html5Preference': 'HTML5_PREF_WANTS'
}
}
}
headers = {
'Content-Type': 'application/json',
'Origin': 'https://www.youtube.com',
'User-Agent': USER_AGENT,
'X-YouTube-Client-Version': '17.31.35'
'content-type': 'application/json',
'Origin': 'https://www.youtube.com'
}
if client == 85:
player_id = self._extract_player_info()
if player_id:
if player_id not in self._player_cache:
self._load_player(player_id)
sts = search(
r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>[0-9]{5})',
self._player_cache[player_id]
).group('sts')
if sts:
data['playbackContext']['contentPlaybackContext']['signatureTimestamp'] = sts
data['context'] = {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
'clientVersion': '2.0',
'clientScreen': 'EMBED'
},
'thirdParty': {
'embedUrl': 'https://www.youtube.com/'
}
}
headers['X-YouTube-Client-Name'] = 85
headers['X-YouTube-Client-Version'] = '2.0'
else:
data['context'] = {
'client': {
'hl': 'en',
'gl': 'US',
'clientVersion': '17.31.35',
'androidSdkVersion': 31,
'clientName': 'ANDROID',
Expand All @@ -215,16 +294,20 @@ def _extract_player_response(self, video_id, client):
'userAgent': USER_AGENT
}
}
data['params'] = 'CgIQBg'
headers['X-YouTube-Client-Name'] = 3
headers['X-YouTube-Client-Version'] = '17.31.35'
headers['User-Agent'] = USER_AGENT
try:
return loads(self._download_webpage(url, data, headers))
return loads(self._download_webpage(url, data, headers)), player_id
except ValueError: # pragma: no cover
print('[YouTubeVideoUrl] Failed to parse JSON')
return None, None

def _real_extract(self, video_id):
url = ''

player_response = self._extract_player_response(video_id, 3)
player_response, player_id = self._extract_player_response(video_id, 3)
if not player_response:
raise RuntimeError('Player response not found!')

Expand All @@ -233,7 +316,10 @@ def _real_extract(self, video_id):

if not is_live and playability_status.get('status') == 'LOGIN_REQUIRED':
print('[YouTubeVideoUrl] Age gate content')
player_response = self._extract_player_response(video_id, 85)
player_response, player_id = self._extract_player_response(video_id, 85)
if not player_response:
raise RuntimeError('Age gate content player response not found!')

playability_status = player_response.get('playabilityStatus', {})

trailer_video_id = try_get(
Expand Down Expand Up @@ -262,9 +348,13 @@ def _real_extract(self, video_id):
self.use_dash_mp4 = DASHMP4_FORMAT

url, our_format = self._extract_fmt_video_format(streaming_formats)
if url and '&n=' in url:
url = self._unthrottle_url(url, player_id)
if url and our_format in DASHMP4_FORMAT:
audio_url = self._extract_dash_audio_format(streaming_formats)
if audio_url:
if '&n=' in audio_url:
audio_url = self._unthrottle_url(audio_url, player_id)
url += SUBURI + audio_url
if not url: # pragma: no cover
for fmt in streaming_formats:
Expand Down
45 changes: 43 additions & 2 deletions src/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

if version_info[0] == 2:
# Python 2
compat_str = unicode
compat_chr, compat_str = (unichr, unicode)

from itertools import izip_longest as compat_zip_longest
from urllib import urlencode as compat_urlencode
from urllib import quote as compat_quote
from urllib import urlretrieve as compat_urlretrieve
Expand All @@ -21,8 +22,9 @@
from urllib2 import URLError as compat_URLError
else:
# Python 3
compat_str = str
compat_chr, compat_str = (chr, str)

from itertools import zip_longest as compat_zip_longest
from urllib.parse import urlencode as compat_urlencode
from urllib.parse import quote as compat_quote
from urllib.request import urlretrieve as compat_urlretrieve
Expand All @@ -32,6 +34,45 @@
from urllib.error import URLError as compat_URLError


if version_info >= (3, 4):
from collections import ChainMap as compat_map
else:
from collections import MutableMapping

class compat_map(MutableMapping):
def __init__(self, *maps):
self.maps = list(maps) or [{}]

def __getitem__(self, k):
for m in self.maps:
if k in m:
return m[k]
raise KeyError(k)

def __setitem__(self, k, v):
self.maps[0][k] = v

def __contains__(self, k):
return any((k in m) for m in self.maps)

def __delitem__(self, k):
raise NotImplementedError('Deleting is not supported')

def __iter__(self):
d = {}
for m in reversed(self.maps):
d.update(dict.fromkeys(m))
return iter(d)

def __len__(self):
return len(set().union(*self.maps))

def new_child(self, m=None, **kwargs):
m = m or {}
m.update(kwargs)
return self.__class__(m, *self.maps)


SUBURI = '&suburi='


Expand Down
Loading

0 comments on commit 5824f84

Please sign in to comment.