-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathcomm.py
212 lines (178 loc) · 7.78 KB
/
comm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import hashlib
import hmac
import json
import sys
import time
from future.moves.urllib.parse import quote_plus
import requests
from aussieaddonscommon import exceptions
from aussieaddonscommon import session
from aussieaddonscommon import utils
import resources.lib.classes as classes
import resources.lib.config as config
import resources.lib.parse as parse
# True when running on a Python 2 interpreter. Consulted where the bytes()
# constructor needs a different call signature on Python 2 and Python 3.
py2 = sys.version_info[0] < 3
def fetch_url(url, headers=None):
    """Issue a GET request for ``url`` and return the response body as text.

    When ``headers`` is truthy it is merged into the session's headers
    before the request is sent; otherwise the session defaults are used.
    """
    with session.Session() as http:
        if headers:
            http.headers.update(headers)
        return http.get(url).text
def get_auth(hn, sess):
    """Request the auth string for a program and return the response text.

    The auth path is built from the current Unix timestamp and the house
    number ``hn``, signed with an HMAC-SHA256 hex digest keyed by
    ``config.SECRET``, and fetched through ``sess``.

    Raises:
        AussieAddonsException: when the server answers 401 or 404; a
            dialog about the system clock is shown to the user first.
        requests.exceptions.HTTPError: for any other HTTP error status.
    """
    timestamp = str(int(time.time()))
    query = config.AUTH_PARAMS.format(ts=timestamp, hn=hn)
    auth_path = config.AUTH_PATH.format(params=query)

    # bytes() only accepts an encoding argument on Python 3.
    if py2:
        message = bytes(auth_path)
        key = bytes(config.SECRET)
    else:
        message = bytes(auth_path, 'utf8')
        key = bytes(config.SECRET, 'utf8')

    signature = hmac.new(
        key, msg=message, digestmod=hashlib.sha256).hexdigest()
    signed_path = '{authpath}&sig={digest}'.format(
        authpath=auth_path, digest=signature)
    auth_url = config.API_BASE_URL.format(path=signed_path)

    try:
        response = sess.get(auth_url)
    except requests.exceptions.HTTPError as e:
        if e.response.status_code not in [401, 404]:
            raise
        # The signature embeds the local timestamp, so the user is told
        # to check the system clock when the server rejects the request.
        utils.dialog_message(
            'Accurate system time required for '
            'playback. Please set the correct system '
            'time/date/timezone for your location and try again.')
        raise exceptions.AussieAddonsException(e)
    return response.text
def cookies_to_string(cookiejar):
    """Serialise every cookie in ``cookiejar`` into a single string.

    Each cookie is rendered as ``name=value; path=<path>; domain=<domain>;``
    and the rendered cookies are joined with a single space. An empty jar
    yields an empty string.
    """
    template = '{0}={1}; path={2}; domain={3};'
    return ' '.join(
        template.format(c.name, c.value, c.path, c.domain)
        for c in cookiejar)
def get_drm_auth(program):
    """Fetch the DRM license value for ``program``, URL-quoted.

    A JWT is first requested with ``config.DRM_AUTH_CLIENT_ID``; it is then
    sent as a Bearer token to the DRM token endpoint for the program's
    house number. The ``license`` field of that JSON response is returned
    after being passed through ``quote_plus``.
    """
    with session.Session() as sess:
        token_url = config.API_BASE_URL.format(path='/v2/token/jwt')
        token_body = sess.post(
            token_url, data={'clientId': config.DRM_AUTH_CLIENT_ID}).text
        bearer = json.loads(token_body).get('token')

        utils.log(str(vars(program)))

        drm_path = '/v2/token/drm/{hn}'.format(hn=program.house_number)
        drm_url = config.API_BASE_URL.format(path=drm_path)
        auth_header = {
            'Authorization': 'Bearer {bearer}'.format(bearer=bearer)}
        drm_response = sess.get(drm_url, headers=auth_header)
        drm_payload = json.loads(drm_response.text)
        return quote_plus(drm_payload.get("license"))
def check_playlist(program):
    """Report whether the program's playlist names AAC audio without an
    audio media entry.

    The playlist is downloaded from ``program.stream_url`` (with anything
    from the first ``|`` onwards removed) using ``program.auth_string`` as
    the ``hdnea`` query parameter.

    Returns:
        bool: True when the playlist text contains ``AUDIO="audio_aac"``
        but no ``TYPE=AUDIO``; False otherwise.
    """
    # stream_url may carry a '|'-separated header suffix (this file's
    # get_stream_program builds it that way). partition() leaves the URL
    # intact when no '|' is present, whereas slicing with find() would
    # silently drop the last character because find() returns -1.
    url = program.stream_url.partition('|')[0]
    with requests.Session() as sess:
        playlist_text = sess.get(
            url, params={'hdnea': program.auth_string}).text
    return ('AUDIO="audio_aac"' in playlist_text
            and 'TYPE=AUDIO' not in playlist_text)
def _select_hls_stream(playlists):
    """Pick the HLS stream URL and captions URL from ``playlists``.

    Only entries whose ``type`` is 'program' or 'livestream' are
    considered. For each of those, the first non-empty HLS URL in the
    order 1080, 720, sd, sd-low wins.

    Returns:
        tuple: ``(stream_url, captions_url)``; ``(None, None)`` when no
        entry offers a usable HLS URL.
    """
    for playlist in playlists:
        if playlist.get('type') not in ('program', 'livestream'):
            continue
        hls_streams = (playlist.get('streams') or {}).get('hls') or {}
        for quality in ('1080', '720', 'sd', 'sd-low'):
            stream_url = hls_streams.get(quality)
            if stream_url:
                captions = playlist.get('captions') or {}
                return stream_url, captions.get('src-vtt')
    return None, None


def get_stream_program(params):
    """Resolve the playable stream for the program described by ``params``.

    ``params['url']`` selects the video endpoint and
    ``params['house_number']`` is used to obtain the auth string.

    Returns:
        A program object. On success it carries ``stream_url``,
        ``headers_string``, ``cookies``, ``captions_url`` and
        ``auth_string``. When the API answers 404, or marks the video as
        not playable, a bare ``classes.Program`` with ``failure_msg`` set
        is returned instead.

    Raises:
        requests.exceptions.HTTPError: for HTTP errors other than 404.
        AussieAddonsException: when no HLS stream URL can be found in the
            response.
    """
    with session.Session() as sess:
        video_url = config.API_BASE_URL.format(
            path='/v2{0}'.format(params.get('url')))
        utils.log("Fetching stream URL: {0}".format(video_url))
        try:
            video_data = sess.get(video_url).text
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                # Re-raise rather than falling through to code that
                # would fail on the never-assigned video_data.
                raise
            error_data = json.loads(e.response.text)
            p = classes.Program()
            p.failure_msg = {
                'msg': error_data.get('message'),
                'availability': 'The content may have expired, '
                                'please refresh the listing and try again'}
            return p

        video_json = json.loads(video_data)
        if video_json.get('playable') is False:
            p = classes.Program()
            p.failure_msg = {'msg': video_json.get('playableMessage'),
                             'availability': video_json.get('availability')}
            return p

        sess.headers = {'User-Agent': config.USER_AGENT}
        stream_url_base, captions_url = _select_hls_stream(
            video_json['_embedded']['playlist'])
        if not stream_url_base:
            # Fail with a clear error instead of an unbound-variable
            # crash or a request for a None URL.
            raise exceptions.AussieAddonsException(
                'No HLS stream URL found for {0}'.format(video_url))

        akamai_auth = get_auth(params.get('house_number'), sess)
        request = sess.get(stream_url_base, params={'hdnea': akamai_auth})
        cookies = cookies_to_string(request.cookies)
        headers_string = 'User-Agent={0}&Cookie={1}'.format(
            quote_plus(config.USER_AGENT),
            quote_plus(cookies))

        p = parse.parse_stream_from_json(video_json)
        # The request headers are appended to the final URL after '|'.
        p.stream_url = '{0}|{1}'.format(request.url, headers_string)
        p.headers_string = headers_string
        p.cookies = request.cookies
        p.captions_url = captions_url
        p.auth_string = akamai_auth
        return p
def get_categories():
    """Fetch the mobile navigation feed and return its parsed categories."""
    navigation_url = config.API_BASE_URL.format(path='/v2/navigation/mobile')
    return parse.parse_categories(fetch_url(navigation_url))
def validate_category(keyword):
    """Map a legacy category keyword onto its current equivalent.

    Keywords found in ``config.OLD_CATEGORIES`` (left over from the old/LG
    iview API) are translated to the value stored there, which keeps old
    favourites links working. Any other keyword is returned unchanged.
    """
    return (config.OLD_CATEGORIES[keyword]
            if keyword in config.OLD_CATEGORIES
            else keyword)
def get_collections_from_feed(params):
    """Fetch the feed for ``params['category']`` and return the
    collections parsed from it."""
    utils.log(
        'Getting collections from feed ({0})'.format(params.get('category')))
    category_path = '/v2{0}'.format(params.get('category'))
    feed_url = config.API_BASE_URL.format(path=category_path)
    return parse.parse_collections_from_feed(fetch_url(feed_url), params)
def get_collection_from_feed(params):
    """Fetch the collection identified by ``params['collection_id']`` and
    return the programmes parsed from it."""
    collection_id = params.get('collection_id')
    utils.log('Getting collection from feed ({0})'.format(params.get('title')))
    collection_path = '/v2/collection/{0}'.format(collection_id)
    feed_url = config.API_BASE_URL.format(path=collection_path)
    return parse.parse_programme_from_feed(fetch_url(feed_url), params)
def get_atoz_programme_from_feed(params):
    """Return the programmes of the category's A-Z collection.

    ``params['category']`` is normalised in place via
    ``validate_category``. The first collection whose title contains
    'a-z' (case-insensitive) is fetched and parsed. When the category has
    no such collection, an empty list is returned.
    """
    params['category'] = validate_category(params['category'])
    atoz_list = [
        collection
        for collection in get_collections_from_feed(params)
        if 'a-z' in collection.get_title().lower()
    ]
    if not atoz_list:
        return atoz_list

    atoz_path = '/v2/collection/{0}'.format(atoz_list[0].collection_id)
    feed = fetch_url(config.API_BASE_URL.format(path=atoz_path))
    return parse.parse_programme_from_feed(feed, params)
def get_series_from_feed(series_url, from_series_list=False):
    """Fetch the feed at ``series_url`` with the series list and selected
    series embedded, and return the programs parsed from it.

    ``from_series_list`` is forwarded unchanged to the parser.
    """
    utils.log('Fetching series from feed')
    embed_query = '?embed=seriesList,selectedSeries'
    series_path = '/v2{0}{1}'.format(series_url, embed_query)
    feed_url = config.API_BASE_URL.format(path=series_path)
    return parse.parse_programs_from_feed(fetch_url(feed_url), from_series_list)
def get_livestreams_from_feed():
    """Fetch the home feed and return the livestreams parsed from it."""
    utils.log('Fetching livestreams from feed')
    home_path = '/v2{0}'.format('/home')
    home_url = config.API_BASE_URL.format(path=home_path)
    return parse.parse_livestreams_from_feed(fetch_url(home_url))
def get_search_results(search_string):
    """Query the search endpoint for ``search_string`` and return the
    parsed results.

    The search string is percent-encoded before being placed in the query
    string, so characters such as ``&``, ``#``, ``+`` and ``%`` are sent as
    part of the keyword instead of altering the URL.

    NOTE(review): assumes callers pass the raw, un-encoded search text;
    confirm that no caller pre-encodes it.
    """
    utils.log('Fetching search results')
    if py2 and not isinstance(search_string, bytes):
        # Python 2's quote_plus fails on non-ASCII unicode objects, so
        # hand it UTF-8 encoded bytes instead.
        search_string = search_string.encode('utf8')
    search_path = '/v2/search?keyword={0}'.format(quote_plus(search_string))
    feed = fetch_url(config.API_BASE_URL.format(path=search_path))
    return parse.parse_search_results(feed)