forked from zpoo32/RedSea
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredsea.py
520 lines (437 loc) · 21.5 KB
/
redsea.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
#!/usr/bin/env python
import traceback
import sys
import os
import re
import urllib3
import redsea.cli as cli
from redsea.mediadownloader import MediaDownloader
from redsea.tagger import Tagger
from redsea.tidal_api import TidalApi, TidalError
from redsea.sessions import RedseaSessionFile
from config.settings import PRESETS, BRUTEFORCEREGION
# ASCII-art banner printed once before the download queue is processed.
# Backslashes are written as "\\" so the literal contains no invalid escape
# sequences (which newer Python versions warn about); the resulting string is
# unchanged. The final "\n" escape adds one extra blank line after the banner.
LOGO = """
/$$$$$$$ /$$ /$$$$$$
| $$__ $$ | $$ /$$__ $$
| $$ \\ $$ /$$$$$$ /$$$$$$$| $$ \\__/ /$$$$$$ /$$$$$$
| $$$$$$$/ /$$__ $$ /$$__ $$| $$$$$$ /$$__ $$ |____ $$
| $$__ $$| $$$$$$$$| $$ | $$ \\____ $$| $$$$$$$$ /$$$$$$$
| $$ \\ $$| $$_____/| $$ | $$ /$$ \\ $$| $$_____/ /$$__ $$
| $$ | $$| $$$$$$$| $$$$$$$| $$$$$$/| $$$$$$$| $$$$$$$
|__/ |__/ \\_______/ \\_______/ \\______/ \\_______/ \\_______/
(c) 2016 Joe Thatcher
https://github.com/svbnet/RedSea
\n"""

# Single-letter media type codes mapped to the names shown in status messages.
# Queue entries whose 'type' is not a key of this mapping are skipped.
MEDIA_TYPES = {'t': 'track', 'p': 'playlist', 'a': 'album', 'r': 'artist', 'v': 'video'}
def _prompt_int(prompt):
    """Ask the user for a whole number, re-prompting until one is entered."""
    while True:
        try:
            return int(input(prompt))
        except ValueError:
            print("Enter an existing number")


def _print_auth_help():
    """Print the description of the methods offered by the "auth" command."""
    print('\nThe "auth" command provides the following methods:')
    print('\n list: Lists stored sessions if any exist')
    print(' add: Prompts for a TV or Mobile session. The TV option displays a 6 digit key which should be '
          'entered inside link.tidal.com where the user can login. The Mobile option prompts for a Tidal username '
          'and password. Both options authorize a session which then gets stored in the sessions file')
    print(' remove: Removes a stored session from the sessions file by name')
    print(' default: Set a default account for redsea to use when the -a flag has not been passed')
    print(' reauth: Reauthenticates with server to get new sessionId')
    print('\nUsage: redsea.py auth add\n')


def main():
    """Run the command-line interface.

    The first positional argument selects the mode:

    * ``auth``    - manage stored sessions (list/add/remove/default/reauth),
      then exit.
    * ``id``      - download a single item given only its numeric id or a
      playlist UUID.
    * ``explore`` - list albums from the "dolby atmos" or "sony 360" page and
      download the chosen one.
    * ``search``  - search for a track/album/video and download the chosen
      result.
    * anything else is handed to ``cli.parse_media_option`` to build the
      download queue.

    Every queued item is then resolved to its tracks and downloaded. When
    brute-forcing is enabled (``args.bruteforce`` or ``BRUTEFORCEREGION``),
    the other stored sessions are tried on region / stream-privilege errors.
    The session file is saved at the end.
    """
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    os.chdir(sys.path[0])

    # Get args
    args = cli.get_args()

    # Load config
    bruteforce = args.bruteforce or BRUTEFORCEREGION
    preset = PRESETS[args.preset]

    # Parse options: translate the enabled preset flags into quality names,
    # keeping the HI_RES -> LOW order.
    quality_flags = (
        ('MQA_FLAC_24', 'HI_RES'),
        ('FLAC_16', 'LOSSLESS'),
        ('AAC_320', 'HIGH'),
        ('AAC_96', 'LOW'),
    )
    preset['quality'] = [quality for flag, quality in quality_flags if preset[flag]]

    # Check for auth flag / session settings
    RSF = RedseaSessionFile('./config/sessions.pk')

    # Guard against an empty positional list instead of raising IndexError.
    command = args.urls[0] if args.urls else None

    if command == 'auth':
        if len(args.urls) == 1:
            _print_auth_help()
            sys.exit()

        method = args.urls[1]
        if method == 'list':
            RSF.list_sessions()
        elif method == 'add':
            if len(args.urls) == 5:
                RSF.create_session(args.urls[2], args.urls[3], args.urls[4])
            else:
                RSF.new_session()
        elif method == 'remove':
            RSF.remove_session()
        elif method == 'default':
            RSF.set_default()
        elif method == 'reauth':
            RSF.reauth()
        else:
            # An unknown method used to fall through to the download loop
            # with no queue defined; show the help instead.
            print('Unknown auth method "{}"'.format(method))
            _print_auth_help()
        sys.exit()

    elif command == 'id':
        media_type = None
        md = MediaDownloader(TidalApi(RSF.load_session(args.account)), preset, Tagger(preset))

        if len(args.urls) != 2:
            print('Example usage: python redsea.py id 92265335')
            sys.exit()

        media_id = args.urls[1]
        if not media_id.isdigit():
            # Check if id is playlist (UUIDv4)
            pattern = re.compile('^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$')
            if not pattern.match(media_id):
                print('The id ' + str(media_id) + ' is not valid.')
                sys.exit()
            try:
                # Only called to verify that the playlist exists.
                md.playlist_from_id(media_id)
                media_type = 'p'
            except TidalError:
                print("The playlist id " + str(media_id) + " could not be found!")
                sys.exit()

        if media_type is None:
            media_type = md.type_from_id(media_id)

        if media_type:
            media_to_download = [{'id': media_id, 'type': media_type}]
        else:
            print("The id " + str(media_id) + " could not be found!")
            sys.exit()

    elif command == 'explore':
        explore_pages = {('dolby', 'atmos'): 'dolby_atmos', ('sony', '360'): '360'}
        page = explore_pages.get(tuple(args.urls[1:])) if len(args.urls) == 3 else None
        if page is None:
            print("Example usage of explore: python redsea.py explore (dolby atmos/sony 360)")
            sys.exit()

        md = MediaDownloader(TidalApi(RSF.load_session(args.account)), preset, Tagger(preset))
        page_content = md.page(page)

        # The last row's first module carries the "show more" link; its first
        # six characters are stripped before it is requested as a page.
        show_more_link = page_content['rows'][-1]['modules'][0]['showMore']['apiPath']
        now_available = md.page(show_more_link[6:])
        items = now_available['rows'][0]['modules'][0]['pagedList']['items']
        total_items = len(items)

        for number, item in enumerate(items, 1):
            if item['audioModes'] == ['DOLBY_ATMOS']:
                specialtag = " [Dolby Atmos]"
            elif item['audioModes'] == ['SONY_360RA']:
                specialtag = " [360 Reality Audio]"
            else:
                specialtag = ""

            explicittag = " [E]" if item['explicit'] else ""
            date = " (" + item['streamStartDate'].split('T')[0] + ")"

            print(str(number) + ") " + str(item['title']) + " - " + str(
                item['artists'][0]['name']) + explicittag + specialtag + date)

        print(str(total_items + 1) + ") Exit")

        while True:
            chosen = _prompt_int("Album Selection: ") - 1
            if chosen == total_items:
                sys.exit()
            elif chosen > total_items or chosen < 0:
                print("Enter an existing number")
            else:
                break
        print()

        media_to_download = [{'id': str(items[chosen]['id']), 'type': 'a'}]

    elif command == 'search':
        md = MediaDownloader(TidalApi(RSF.load_session(args.account)), preset, Tagger(preset))

        # CLI search type -> (key in the search result, media type code)
        search_types = {
            'track': ('tracks', 't'),
            'album': ('albums', 'a'),
            'video': ('videos', 'v'),
        }

        while True:
            # Validate before querying so a missing/unknown type or an empty
            # query prints the usage instead of raising IndexError.
            if len(args.urls) < 3 or args.urls[1] not in search_types:
                print("Example usage of search: python redsea.py search [track/album/video] Darkside Alan Walker")
                sys.exit()

            searchtype, media_code = search_types[args.urls[1]]
            searchresult = md.search_for_id(args.urls[2:])

            # Show at most the first ten results.
            numberofsongs = min(int(searchresult[searchtype]['totalNumberOfItems']), 10)

            for i in range(numberofsongs):
                song = searchresult[searchtype]['items'][i]

                if searchtype != 'videos':
                    if song['audioModes'] == ['DOLBY_ATMOS']:
                        specialtag = " [Dolby Atmos]"
                    elif song['audioModes'] == ['SONY_360RA']:
                        specialtag = " [360 Reality Audio]"
                    elif song['audioQuality'] == 'HI_RES':
                        specialtag = " [MQA]"
                    else:
                        specialtag = ""
                else:
                    specialtag = " [" + song['quality'].replace('MP4_', '') + "]"

                explicittag = " [E]" if song['explicit'] else ""

                print(str(i + 1) + ") " + str(song['title']) + " - " + str(
                    song['artists'][0]['name']) + explicittag + specialtag)

            query = None

            if numberofsongs > 0:
                print(str(numberofsongs + 1) + ") Not found? Try a new search")
                while True:
                    chosen = _prompt_int("Song Selection: ") - 1
                    if chosen == numberofsongs:
                        query = input("Enter new search query: [track/album/video] Darkside Alan Walker: ")
                        break
                    elif chosen > numberofsongs or chosen < 0:
                        # Negative values would otherwise index from the end.
                        print("Enter an existing number")
                    else:
                        break
                print()
            else:
                print("No results found for '" + ' '.join(args.urls[2:]) + "'")
                print("1) Not found? Try a new search")
                print("2) Quit")
                if _prompt_int("Selection: ") != 1:
                    sys.exit()
                query = input("Enter new search query: [track/album/video] Darkside Alan Walker: ")
                print()

            if query is not None:
                # Restart with the new query; an empty or malformed one is
                # rejected by the validation at the top of the loop.
                args.urls = ("search " + query).split()
                continue

            media_to_download = [{'id': str(searchresult[searchtype]['items'][chosen]['id']), 'type': media_code}]
            break

    else:
        media_to_download = cli.parse_media_option(args.urls, args.file)

    print(LOGO)

    # Loop through media and download if possible
    cm = 0
    for mt in media_to_download:

        # Is it an acceptable media type? (skip if not)
        if mt['type'] not in MEDIA_TYPES:
            print('Unknown media type - ' + mt['type'])
            continue

        cm += 1
        print('<<< Getting {0} info... >>>'.format(MEDIA_TYPES[mt['type']]))

        # Create a new TidalApi and pass it to a new MediaDownloader
        md = MediaDownloader(TidalApi(RSF.load_session(args.account)), preset, Tagger(preset))

        # Create a new session generator in case we need to switch sessions
        session_gen = RSF.get_session()

        # Get media info
        def get_tracks(media):
            """Resolve a queue entry to its tracks.

            Returns ``(media_name, track_info)``. ``media_name`` is the artist
            name for artist entries and ``None`` otherwise; ``track_info`` is
            a list of ``(tracks, media_info)`` tuples.

            Raises ``StopIteration`` when brute-forcing is enabled and every
            session has been tried on a region-lock error; any other
            ``TidalError`` is re-raised.
            """
            media_name = None
            tracks = []
            media_info = None
            track_info = []

            while True:
                try:
                    if media['type'] == 'f':
                        lines = media['content'].split('\n')
                        for number, line in enumerate(lines, 1):
                            print('Getting info for track {}/{}'.format(number, len(lines)), end='\r')
                            tracks.append(md.api.get_track(line))
                        print()

                    # Track
                    elif media['type'] == 't':
                        tracks.append(md.api.get_track(media['id']))

                    # Playlist
                    elif media['type'] == 'p':
                        # Get playlist title to create path
                        playlist = md.api.get_playlist(media['id'])
                        md.opts['path'] += '/' + md._sanitise_name(playlist['title'])

                        # Make sure only tracks are in playlist items
                        playlist_items = md.api.get_playlist_items(media['id'])['items']
                        for item in playlist_items:
                            if item['type'] == 'track':
                                tracks.append(item['item'])

                    # Album
                    elif media['type'] == 'a':
                        # Get album information
                        media_info = md.api.get_album(media['id'])

                        # Get a list of the tracks from the album
                        tracks = md.api.get_album_tracks(media['id'])['items']

                    # Video
                    elif media['type'] == 'v':
                        # Get video information
                        tracks.append(md.api.get_video(media['id']))

                    # Artist
                    else:
                        # Get the name of the artist for display to user
                        media_name = md.api.get_artist(media['id'])['name']

                        # Collect all of the tracks from all of the artist's albums
                        albums = (md.api.get_artist_albums(media['id'])['items']
                                  + md.api.get_artist_albums_ep_singles(media['id'])['items'])

                        eps_info = []
                        singles_info = []

                        for album in albums:
                            if preset.get('aggressive_remix_filtering'):
                                title = album['title'].lower()
                                if 'remix' in title or 'commentary' in title or 'karaoke' in title:
                                    print('\tSkipping ' + album['title'])
                                    continue

                            # Remove Sony 360 Reality Audio albums if there's another
                            # (duplicate) album that isn't 360 Reality Audio. The album
                            # itself and other 360RA releases are excluded from the
                            # comparison; otherwise every 360RA album matches itself.
                            if preset.get('skip_360ra') and 'SONY_360RA' in album['audioModes']:
                                has_regular_twin = any(
                                    other is not album
                                    and 'SONY_360RA' not in other['audioModes']
                                    and other['title'] == album['title']
                                    and other['numberOfTracks'] == album['numberOfTracks']
                                    for other in albums
                                )
                                if has_regular_twin:
                                    print('\tSkipping duplicate Sony 360 Reality Audio album - ' + album['title'])
                                    continue

                            # Get album information
                            media_info = md.api.get_album(album['id'])

                            # Get a list of the tracks from the album
                            tracks = md.api.get_album_tracks(album['id'])['items']

                            if 'type' in media_info and str(media_info['type']).lower() == 'single':
                                singles_info.append((tracks, media_info))
                            else:
                                eps_info.append((tracks, media_info))

                        if preset.get('skip_singles_when_possible'):
                            # Filter singles that also appear in albums (EPs)
                            ep_titles = {t['title'] for ep_tracks, _ in eps_info for t in ep_tracks}

                            # Iterate over copies because entries are removed in place.
                            for single in singles_info[:]:
                                for t in single[0][:]:
                                    if t['title'] in ep_titles:
                                        print('\tSkipping ' + t['title'])
                                        single[0].remove(t)
                                        if len(single[0]) == 0:
                                            singles_info.remove(single)

                        track_info = eps_info + singles_info

                    if not track_info:
                        track_info = [(tracks, media_info)]
                    return media_name, track_info

                # Catch region error
                except TidalError as e:
                    if 'not found. This might be region-locked.' in str(e) and bruteforce:
                        # Try again with a different session
                        try:
                            session, session_name = next(session_gen)
                        # Ran out of sessions
                        except StopIteration:
                            print(e)
                            raise
                        md.api = TidalApi(session)
                        print('Checking info fetch with session "{}" in region {}'.format(
                            session_name, session.country_code))
                        continue

                    # Skip or halt
                    else:
                        raise

        try:
            media_name, track_info = get_tracks(media=mt)
        except StopIteration:
            # Let the user know we cannot download this release and skip it
            print('None of the available accounts were able to get info for release {}. Skipping..'.format(mt['id']))
            continue

        total = sum(len(group[0]) for group in track_info)

        # Single
        if total == 1:
            print('<<< Downloading single track... >>>')

        # Playlist or album
        else:
            if mt['type'] == 'p':
                name = md.playlist_from_id(mt['id'])['title']
            else:
                name = track_info[0][1]['title']

            print('<<< Downloading {0} "{1}": {2} track(s) in total >>>'.format(
                MEDIA_TYPES[mt['type']] + (' ' + media_name if media_name else ''), name, total))

        # Resuming only applies when a single playlist is queued; the 1-based
        # track number is turned into a 0-based slice start.
        if args.resumeon and len(media_to_download) == 1 and mt['type'] == 'p':
            print('<<< Resuming on track {} >>>'.format(args.resumeon))
            args.resumeon -= 1
        else:
            args.resumeon = 0

        cur = args.resumeon

        for tracks, media_info in track_info:
            for track in tracks[args.resumeon:]:
                first = True

                # Actually download the track (finally)
                while True:
                    try:
                        md.download_media(track, preset, media_info, overwrite=args.overwrite)
                        break

                    # Catch quality error
                    except ValueError as e:
                        print("\t" + str(e))
                        traceback.print_exc()
                        if args.skip is True:
                            print('Skipping track "{} - {}" due to insufficient quality'.format(
                                track['artist']['name'], track['title']))
                            break
                        else:
                            print('Halting on track "{} - {}" due to insufficient quality'.format(
                                track['artist']['name'], track['title']))
                            sys.exit()

                    # Catch file name errors
                    except OSError as e:
                        print(e)
                        print("\tFile name too long or contains apostrophes")
                        # Record the failed track; the context manager closes
                        # the file even if the write fails.
                        with open('failed_tracks.txt', 'a') as failed_file:
                            failed_file.write(str(track['url']) + "\n")
                        break

                    # Catch session audio stream privilege error
                    except AssertionError as e:
                        if 'Unable to download track' in str(e) and bruteforce:
                            # Try again with a different session
                            try:
                                # Reset generator if this is the first attempt
                                if first:
                                    session_gen = RSF.get_session()
                                    first = False
                                session, session_name = next(session_gen)
                                md.api = TidalApi(session)
                                print('Attempting audio stream with session "{}" in region {}'.format(
                                    session_name, session.country_code))
                                continue

                            # Ran out of sessions, skip track
                            except StopIteration:
                                # Let the user know we cannot download this release and skip it
                                print('None of the available accounts were able to download track {}. Skipping..'.format(track['id']))
                                break

                        elif 'Please use a mobile session' in str(e):
                            print(e)
                            print('Choose one of the following mobile sessions: ')
                            RSF.list_sessions(True)
                            break

                        # Skip
                        else:
                            print(str(e) + '. Skipping..')
                            # Leave the retry loop; without this the same
                            # failing download is attempted forever.
                            break

                # Progress of current track
                cur += 1
                print('=== {0}/{1} complete ({2:.0f}% done) ===\n'.format(
                    cur, total, (cur / total) * 100))

        # Progress of queue
        print('> Download queue: {0}/{1} items complete ({2:.0f}% done) <\n'.
              format(cm, len(media_to_download),
                     (cm / len(media_to_download)) * 100))

    print('> All downloads completed. <')

    # since oauth sessions can change while downloads are happening if the token gets refreshed
    RSF._save()
# Run from CLI - catch Ctrl-C and handle it gracefully
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('\n^C pressed - abort')
        # sys.exit() rather than the site-provided exit(): the latter is an
        # interactive-shell helper and is not guaranteed to be defined.
        sys.exit()