16
16
# You should have received a copy of the GNU Lesser General Public License
17
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
18
19
+ from __future__ import print_function
19
20
from __future__ import unicode_literals
20
21
from collections import namedtuple
21
22
from enum import Enum
22
23
24
+ import concurrent .futures
23
25
import datetime
24
26
import json
25
27
import logging
26
28
import requests
27
29
import base64
30
+ import time
31
+ import uuid
28
32
from .models import Artist , Album , Track , Video , Playlist , SearchResult , Category , Role
29
33
30
34
try :
@@ -47,6 +51,26 @@ class VideoQuality(Enum):
47
51
low = 'LOW'
48
52
49
53
54
+ class LinkLogin (object ):
55
+ """
56
+ The data required for logging in to TIDAL using a remote link, json is the data returned from TIDAL
57
+ """
58
+ #: Amount of seconds until the code expires
59
+ expires_in = None
60
+ #: The code the user should enter at the uri
61
+ user_code = None
62
+ #: The link the user has to visit
63
+ verification_uri = None
64
+ #: The link the user has to visit, with the code already included
65
+ verification_uri_complete = None
66
+
67
+ def __init__ (self , json ):
68
+ self .expires_in = json ['expiresIn' ]
69
+ self .user_code = json ['userCode' ]
70
+ self .verification_uri = json ['verificationUri' ]
71
+ self .verification_uri_complete = json ['verificationUriComplete' ]
72
+
73
+
50
74
class Config (object ):
51
75
def __init__ (self , quality = Quality .high , video_quality = VideoQuality .high ):
52
76
self .quality = quality .value
@@ -62,15 +86,57 @@ def __init__(self, quality=Quality.high, video_quality=VideoQuality.high):
62
86
decode ("" .join (map (chr , [117 , 116 , 70 , 95 , 56 ]))))
63
87
token = self .api_token
64
88
self .api_token = list ((base64 .b64decode ("d3RjaThkamFfbHlhQnBKaWQuMkMwb3puT2ZtaXhnMA==" ).decode ()))
65
- for B in token :
66
- self .api_token .remove (B )
89
+ tok = "" .join (([chr (ord (x )+ 1 ) for x in token [- 6 :]]))
90
+ token2 = token
91
+ token = token [:9 ]
92
+ token += tok
93
+ tok2 = "" .join (([chr (ord (x )+ 2 ) for x in token [:- 7 ]]))
94
+ token = token [8 :]
95
+ token = tok2 + token
96
+ self .api_token = list ((base64 .b64decode ("UHZ6a2RmMGNFbnhjTnJWa0gua0R5cFpvTGdpalloTg==" ).decode ()))
97
+ for word in token :
98
+ self .api_token .remove (word )
67
99
self .api_token = "" .join (self .api_token )
100
+ string = ""
101
+ save = False
102
+ if not isinstance (token2 , str ):
103
+ save = True
104
+ string = "" .encode ('ISO-8859-1' )
105
+ token2 = token2 .encode ('ISO-8859-1' )
106
+ tok = string .join (([chr (ord (x ) + 20 ) for x in token2 [:- 7 ]]))
107
+ token2 = token2 [8 :]
108
+ token2 = tok + token2
109
+ tok2 = string .join (([chr (ord (x )+ 22 ) for x in token2 [- 6 :]]))
110
+ token2 = token2 [:9 ]
111
+ token2 += tok2
112
+ self .client_id = list ((base64 .b64decode ("V4g3fVh4NnVVgHZ1QoRhfWgub1krhViET3xpfzF9TVVl"
113
+ "Q1g2ZXd2MnpUZFNPVjNZN3FDM3AzNjc1ST0=" ).decode ('ISO-8859-1' )))
114
+ if save :
115
+ token2 .decode ('ISO-8859-1' ).encode ('utf-16' )
116
+ self .client_id = [x .encode ('ISO-8859-1' ) for x in self .client_id ]
117
+ for word in token2 :
118
+ self .client_id .remove (word )
119
+ self .client_id = "" .join (self .client_id )
120
+ self .client_secret = self .client_id
121
+ self .client_id = self .api_token
122
+
68
123
69
124
class Session (object ):
125
+ #: The TIDAL access token, this is what you use with load_oauth_session
126
+ access_token = None
127
+ #: A :class:`datetime` object containing the date the access token will expire
128
+ expiry_time = None
129
+ #: A refresh token for retrieving a new access token through refresh_token
130
+ refresh_token = None
131
+ #: The type of access token, e.g. Bearer
132
+ token_type = None
133
+ #: The id for a TIDAL session, you also need this to use load_oauth_session
134
+ session_id = None
135
+ country_code = None
136
+ #: A :class:`.User` object containing the currently logged in user.
137
+ user = None
138
+
70
139
def __init__ (self , config = Config ()):
71
- self .session_id = None
72
- self .country_code = None
73
- self .user = None
74
140
self ._config = config
75
141
""":type _config: :class:`Config`"""
76
142
@@ -84,6 +150,40 @@ def load_session(self, session_id, country_code=None, user_id=None):
84
150
self .country_code = country_code
85
151
self .user = User (self , id = user_id )
86
152
153
+ def load_oauth_session (self , session_id , token_type , access_token , refresh_token = None ):
154
+ """
155
+ Login to TIDAL using details from a previous OAuth login, automatically
156
+ refreshes expired access tokens if refresh_token is supplied as well.
157
+
158
+ :param token_type: The type of token, e.g. Bearer
159
+ :param session_id: The TIDAL session id, has to be a UUID
160
+ :param access_token: The access token received from an oauth login or refresh
161
+ :param refresh_token: (Optional) A refresh token that lets you get a new access token after it has expired
162
+ :return: True if we believe the log in was successful, otherwise false.
163
+ """
164
+ try :
165
+ uuid .UUID (session_id )
166
+ except ValueError :
167
+ log .error ("Session id did not have a valid UUID format" )
168
+ return False
169
+ self .session_id = session_id
170
+ self .token_type = token_type
171
+ self .access_token = access_token
172
+ self .refresh_token = refresh_token
173
+
174
+ request = self .basic_request ('GET' , 'sessions' )
175
+ json = request .json ()
176
+ if not request .ok and json ['userMessage' ].startswith ("The token has expired." ) and refresh_token :
177
+ log .debug ("The access token has expired, trying to refresh it." )
178
+ refreshed = self .token_refresh (refresh_token )
179
+ if not refreshed :
180
+ return False
181
+
182
+ self .country_code = json ['countryCode' ]
183
+ self .user = User (self , id = json ['userId' ])
184
+
185
+ return True
186
+
87
187
def login (self , username , password ):
88
188
url = urljoin (self ._config .api_location , 'login/username' )
89
189
headers = {"X-Tidal-Token" : self ._config .api_token }
@@ -103,23 +203,132 @@ def login(self, username, password):
103
203
self .user = User (self , id = body ['userId' ])
104
204
return True
105
205
206
+ def login_oauth_simple (self , function = print ):
207
+ """
208
+ Login to TIDAL using a remote link. You can select what function you want to use to display the link
209
+
210
+ :param function: The function you want to display the link with
211
+ :raises: TimeoutError: If the login takes too long
212
+ """
213
+ login , future = self .login_oauth ()
214
+ text = "Visit {0} to log in, the code will expire in {1} seconds"
215
+ function (text .format (login .verification_uri_complete , login .expires_in ))
216
+ future .result ()
217
+
218
+ def login_oauth (self ):
219
+ """
220
+ Login to TIDAL with a remote link for limited input devices. The function will return everything you
221
+ need to log in through a web browser, and will return an future that will run until login.
222
+ :return: A :class:`LinkLogin` object containing all the data needed to log in remotely, and
223
+ a :class:`concurrent.futures.Future` that will poll until the login is completed, or until the link expires.
224
+ :raises: TimeoutError: If the login takes too long
225
+ """
226
+ login , future = self ._login_with_link ()
227
+ return login , future
228
+
229
+ def _login_with_link (self ):
230
+ url = 'https://auth.tidal.com/v1/oauth2/device_authorization'
231
+ params = {
232
+ 'client_id' : self ._config .client_id ,
233
+ 'scope' : 'r_usr w_usr w_sub'
234
+ }
235
+
236
+ request = requests .post (url , params )
237
+
238
+ if not request .ok :
239
+ log .error ("Login failed: %s" , request .text )
240
+ request .raise_for_status ()
241
+
242
+ json = request .json ()
243
+ executor = concurrent .futures .ThreadPoolExecutor ()
244
+ return LinkLogin (json ), executor .submit (self ._process_link_login , json )
245
+
246
+ def _process_link_login (self , json ):
247
+ json = self ._wait_for_link_login (json )
248
+ self .access_token = json ['access_token' ]
249
+ self .expiry_time = datetime .datetime .now () + datetime .timedelta (seconds = json ['expires_in' ])
250
+ self .refresh_token = json ['refresh_token' ]
251
+ self .token_type = json ['token_type' ]
252
+ session = self .request ('GET' , 'sessions' )
253
+ json = session .json ()
254
+ self .session_id = json ['sessionId' ]
255
+ self .country_code = json ['countryCode' ]
256
+ self .user = User (self , id = json ['userId' ])
257
+
258
+ def _wait_for_link_login (self , json ):
259
+ expiry = json ['expiresIn' ]
260
+ interval = json ['interval' ]
261
+ device_code = json ['deviceCode' ]
262
+ url = 'https://auth.tidal.com/v1/oauth2/token'
263
+ params = {
264
+ 'client_id' : self ._config .client_id ,
265
+ 'client_secret' : self ._config .client_secret ,
266
+ 'device_code' : device_code ,
267
+ 'grant_type' : 'urn:ietf:params:oauth:grant-type:device_code' ,
268
+ 'scope' : 'r_usr w_usr w_sub'
269
+ }
270
+ while expiry > 0 :
271
+ request = requests .post (url , params )
272
+ json = request .json ()
273
+ if request .ok :
274
+ return json
275
+ # Because the requests take time, the expiry variable won't be accurate, so stop if TIDAL says it's expired
276
+ if json ['error' ] == 'expired_token' :
277
+ break
278
+ time .sleep (interval )
279
+ expiry = expiry - interval
280
+
281
+ raise TimeoutError ('You took too long to log in' )
282
+
283
+ def token_refresh (self , refresh_token ):
284
+ """
285
+ Retrieves a new access token using the specified refresh token, updating the current access token
286
+ :param refresh_token: The refresh token retrieved when using the OAuth login.
287
+ :return: True if we believe the token was successfully refreshed, otherwise False
288
+ """
289
+ url = 'https://auth.tidal.com/v1/oauth2/token'
290
+ params = {
291
+ 'grant_type' : 'refresh_token' ,
292
+ 'refresh_token' : refresh_token ,
293
+ 'client_id' : self ._config .client_id ,
294
+ 'client_secret' : self ._config .client_id
295
+ }
296
+
297
+ request = requests .post (url , params )
298
+ json = request .json ()
299
+ if not request .ok :
300
+ log .debug ("The refresh token has expired, a new login is required." )
301
+ return False
302
+ self .access_token = json ['access_token' ]
303
+ self .expiry_time = datetime .datetime .now () + datetime .timedelta (seconds = json ['expires_in' ])
304
+ self .token_type = json ['token_type' ]
305
+ return True
306
+
106
307
def check_login (self ):
107
308
""" Returns true if current session is valid, false otherwise. """
108
309
if self .user is None or not self .user .id or not self .session_id :
109
310
return False
110
- url = urljoin (self ._config .api_location , 'users/%s/subscription' % self .user .id )
111
- return requests .get (url , params = {'sessionId' : self .session_id }).ok
311
+ return self .basic_request ('GET' , 'users/%s/subscription' % self .user .id ).ok
112
312
113
- def request (self , method , path , params = None , data = None ):
313
+ def basic_request (self , method , path , params = None , data = None , headers = None ):
114
314
request_params = {
115
315
'sessionId' : self .session_id ,
116
316
'countryCode' : self .country_code ,
117
317
'limit' : '999' ,
118
318
}
119
319
if params :
120
320
request_params .update (params )
321
+
322
+ if not headers :
323
+ headers = {}
324
+ if self .token_type :
325
+ headers ['authorization' ] = self .token_type + ' ' + self .access_token
121
326
url = urljoin (self ._config .api_location , path )
122
- request = requests .request (method , url , params = request_params , data = data )
327
+ request = requests .request (method , url , params = request_params , data = data , headers = headers )
328
+ return request
329
+
330
+ def request (self , method , path , params = None , data = None , headers = None ):
331
+ request = self .basic_request (method , path , params , data , headers )
123
332
log .debug ("request: %s" , request .request .url )
124
333
if not request .ok :
125
334
print (request .text )
0 commit comments