21
21
import base64
22
22
import concurrent .futures
23
23
import datetime
24
+ import hashlib
24
25
import json
25
26
import locale
26
27
import logging
28
+ import os
27
29
import random
28
30
import time
29
31
import uuid
43
45
cast ,
44
46
no_type_check ,
45
47
)
46
- from urllib .parse import urljoin
48
+ from urllib .parse import parse_qs , urlencode , urljoin , urlsplit
47
49
48
50
import requests
49
51
@@ -95,6 +97,8 @@ class Config:
95
97
Additionally, num_videos will turn into num_tracks in playlists.
96
98
"""
97
99
100
+ api_oauth2_token : str = "https://auth.tidal.com/v1/oauth2/token"
101
+ api_pkce_auth : str = "https://login.tidal.com/authorize"
98
102
api_v1_location : str = "https://api.tidal.com/v1/"
99
103
api_v2_location : str = "https://api.tidal.com/v2/"
100
104
api_token : str
@@ -105,6 +109,12 @@ class Config:
105
109
quality : str
106
110
video_quality : str
107
111
video_url : str = "https://resources.tidal.com/videos/%s/%ix%i.mp4"
112
+ # Necessary for PKCE authorization only
113
+ client_unique_key : str
114
+ code_verifier : str
115
+ code_challenge : str
116
+ pkce_uri_redirect : str = "https://tidal.com/android/login/auth"
117
+ client_id_pkce : str
108
118
109
119
@no_type_check
110
120
def __init__ (
@@ -184,6 +194,23 @@ def __init__(
184
194
self .client_id = "" .join (self .client_id )
185
195
self .client_secret = self .client_id
186
196
self .client_id = self .api_token
197
+ # PKCE Authorization. We will keep the former `client_id` as a fallback / will only be used for non PCKE
198
+ # authorizations.
199
+ self .client_unique_key = format (random .getrandbits (64 ), "02x" )
200
+ self .code_verifier = base64 .urlsafe_b64encode (os .urandom (32 ))[:- 1 ].decode (
201
+ "utf-8"
202
+ )
203
+ self .code_challenge = base64 .urlsafe_b64encode (
204
+ hashlib .sha256 (self .code_verifier .encode ("utf-8" )).digest ()
205
+ )[:- 1 ].decode ("utf-8" )
206
+ self .client_id_pkce = base64 .b64decode (
207
+ base64 .b64decode (b"TmtKRVUxSmtjRXM=" )
208
+ + base64 .b64decode (b"NWFIRkZRbFJuVlE9PQ==" )
209
+ ).decode ("utf-8" )
210
+ self .client_secret_pkce = base64 .b64decode (
211
+ base64 .b64decode (b"ZUdWMVVHMVpOMjVpY0ZvNVNVbGlURUZqVVQ=" )
212
+ + base64 .b64decode (b"a3pjMmhyWVRGV1RtaGxWVUZ4VGpaSlkzTjZhbFJIT0QwPQ==" )
213
+ ).decode ("utf-8" )
187
214
188
215
189
216
class Case (Enum ):
@@ -359,7 +386,7 @@ def load_oauth_session(
359
386
:param refresh_token: (Optional) A refresh token that lets you get a new access
360
387
token after it has expired
361
388
:param expiry_time: (Optional) The datetime the access token will expire
362
- :return: True if we believe the log in was successful, otherwise false.
389
+ :return: True if we believe the login was successful, otherwise false.
363
390
"""
364
391
self .token_type = token_type
365
392
self .access_token = access_token
@@ -431,6 +458,109 @@ def login_oauth_file(self, oauth_file: Path) -> bool:
431
458
log .info ("TIDAL Login KO" )
432
459
return False
433
460
461
+ def login_pkce (self , fn_print : Callable [[str ], None ] = print ) -> None :
462
+ """Login handler for PKCE based authentication. This is the only way how to get
463
+ access to HiRes (Up to 24-bit, 192 kHz) FLAC files.
464
+
465
+ This handler will ask you to follow a URL, process with the login in the browser
466
+ and copy & paste the URL of the redirected browser page.
467
+
468
+ :param fn_print: A function which will be called to print the instructions,
469
+ defaults to `print()`.
470
+ :type fn_print: Callable, optional
471
+ :return:
472
+ """
473
+ # Get login url
474
+ url_login : str = self .pkce_login_url ()
475
+
476
+ fn_print ("READ CAREFULLY!" )
477
+ fn_print ("---------------" )
478
+ fn_print (
479
+ "You need to open this link and login with your username and password. "
480
+ "Afterwards you will be redirected to an 'Oops' page. "
481
+ "To complete the login you must copy the URL from this 'Oops' page and paste it to the input field."
482
+ )
483
+ fn_print (url_login )
484
+
485
+ # Get redirect URL from user input.
486
+ url_redirect : str = input ("Paste 'Ooops' page URL here and press <ENTER>:" )
487
+ # Query for auth tokens
488
+ json : dict [str , Union [str , int ]] = self .pkce_get_auth_token (url_redirect )
489
+
490
+ # Parse and set tokens.
491
+ self .process_auth_token (json )
492
+
493
+ # Swap the client_id and secret
494
+ #self.client_enable_hires()
495
+
496
+ def client_enable_hires (self ):
497
+ self .config .client_id = self .config .client_id_pkce
498
+ self .config .client_secret = self .config .client_secret_pkce
499
+
500
+ def pkce_login_url (self ) -> str :
501
+ """Returns the Login-URL to login via web browser.
502
+
503
+ :return: The URL the user has to use for login.
504
+ :rtype: str
505
+ """
506
+ params : request .Params = {
507
+ "response_type" : "code" ,
508
+ "redirect_uri" : self .config .pkce_uri_redirect ,
509
+ "client_id" : self .config .client_id_pkce ,
510
+ "lang" : "EN" ,
511
+ "appMode" : "android" ,
512
+ "client_unique_key" : self .config .client_unique_key ,
513
+ "code_challenge" : self .config .code_challenge ,
514
+ "code_challenge_method" : "S256" ,
515
+ "restrict_signup" : "true" ,
516
+ }
517
+
518
+ return self .config .api_pkce_auth + "?" + urlencode (params )
519
+
520
+ def pkce_get_auth_token (self , url_redirect : str ) -> dict [str , Union [str , int ]]:
521
+ """Parses the redirect url to extract access and refresh tokens.
522
+
523
+ :param url_redirect: URL of the 'Ooops' page, where the user was redirected to
524
+ after login.
525
+ :type url_redirect: str
526
+ :return: A parsed JSON object with access and refresh tokens and other
527
+ information.
528
+ :rtype: dict[str, str | int]
529
+ """
530
+ # w_usr=WRITE_USR, r_usr=READ_USR_DATA, w_sub=WRITE_SUBSCRIPTION
531
+ scope_default : str = "r_usr+w_usr+w_sub"
532
+
533
+ # Extract the code parameter from query string
534
+ if url_redirect and "https://" in url_redirect :
535
+ code : str = parse_qs (urlsplit (url_redirect ).query )["code" ][0 ]
536
+ else :
537
+ raise Exception ("The provided redirect url looks wrong: " + url_redirect )
538
+
539
+ # Set post data and call the API
540
+ data : request .Params = {
541
+ "code" : code ,
542
+ "client_id" : self .config .client_id_pkce ,
543
+ "grant_type" : "authorization_code" ,
544
+ "redirect_uri" : self .config .pkce_uri_redirect ,
545
+ "scope" : scope_default ,
546
+ "code_verifier" : self .config .code_verifier ,
547
+ "client_unique_key" : self .config .client_unique_key ,
548
+ }
549
+ response = self .request_session .post (self .config .api_oauth2_token , data )
550
+
551
+ # Check response
552
+ if not response .ok :
553
+ log .error ("Login failed: %s" , response .text )
554
+ response .raise_for_status ()
555
+
556
+ # Parse the JSON response.
557
+ try :
558
+ token : dict [str , Union [str , int ]] = response .json ()
559
+ except :
560
+ raise Exception ("Wrong one-time authorization code" , response )
561
+
562
+ return token
563
+
434
564
def login_oauth_simple (self , function : Callable [[str ], None ] = print ) -> None :
435
565
"""Login to TIDAL using a remote link. You can select what function you want to
436
566
use to display the link.
@@ -496,6 +626,16 @@ def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]:
496
626
497
627
def _process_link_login (self , json : JsonObj ) -> None :
498
628
json = self ._wait_for_link_login (json )
629
+ self .process_auth_token (json )
630
+
631
+ def process_auth_token (self , json : dict [str , Union [str , int ]]) -> None :
632
+ """Parses the authorization response and sets the token values to the specific
633
+ variables for further usage.
634
+
635
+ :param json: Parsed JSON response after login / authorization.
636
+ :type json: dict[str, str | int]
637
+ :return: None
638
+ """
499
639
self .access_token = json ["access_token" ]
500
640
self .expiry_time = datetime .datetime .utcnow () + datetime .timedelta (
501
641
seconds = json ["expires_in" ]
@@ -512,7 +652,7 @@ def _wait_for_link_login(self, json: JsonObj) -> Any:
512
652
expiry = float (json ["expiresIn" ])
513
653
interval = float (json ["interval" ])
514
654
device_code = json ["deviceCode" ]
515
- url = "https://auth.tidal.com/v1/oauth2/token"
655
+ url = self . config . api_oauth2_token
516
656
params = {
517
657
"client_id" : self .config .client_id ,
518
658
"client_secret" : self .config .client_secret ,
@@ -541,7 +681,7 @@ def token_refresh(self, refresh_token: str) -> bool:
541
681
:return: True if we believe the token was successfully refreshed, otherwise
542
682
False
543
683
"""
544
- url = "https://auth.tidal.com/v1/oauth2/token"
684
+ url = self . config . api_oauth2_token
545
685
params = {
546
686
"grant_type" : "refresh_token" ,
547
687
"refresh_token" : refresh_token ,
0 commit comments