1
1
from dataclasses import dataclass
2
2
from datetime import datetime , timezone
3
- from typing import Tuple
3
+ from typing import Optional , Tuple
4
4
5
5
import jwt
6
6
import requests
7
7
8
- from .config import AUTH_SERVER
8
+ from .config import AUTH_SERVER , DEFAULT_REQUEST_TIMEOUT
9
9
from .crypto .identity import Identity
10
- from .encoding import to_base64 , from_base64
10
+ from .encoding import from_base64 , to_base64
11
11
12
12
13
13
@dataclass
@@ -18,18 +18,29 @@ class TokenMetadata:
18
18
expires_at : datetime
19
19
20
20
21
+ def send_post_request (url : str , data : dict ) -> Optional [dict ]:
22
+ """Send a POST request to the given URL with the given data."""
23
+ try :
24
+ response = requests .post (url , json = data , timeout = DEFAULT_REQUEST_TIMEOUT )
25
+ return response .json ()
26
+ except requests .exceptions .RequestException as err :
27
+ print (f"Error: { err } " )
28
+ return None
29
+
30
+
21
31
def authenticate (identity : Identity , name : str = None ) -> Tuple [str , TokenMetadata ]:
22
- r = requests .post (
32
+ """Authenticate the given identity and return the token and metadata."""
33
+ resp = send_post_request (
23
34
f"{ AUTH_SERVER } /auth/login/wallet/challenge" ,
24
- json = {
35
+ {
25
36
"address" : identity .address ,
26
37
"client_id" : name if name else "uagent" ,
27
38
},
28
39
)
29
- r . raise_for_status ()
30
- resp = r . json ()
40
+ if not resp or "challenge" not in resp or "nonce" not in resp :
41
+ return None , None
31
42
32
- payload = resp ["challenge" ]
43
+ payload : str = resp ["challenge" ]
33
44
34
45
# create the signature
35
46
_ , signature = identity .sign_arbitrary (payload .encode ())
@@ -47,20 +58,18 @@ def authenticate(identity: Identity, name: str = None) -> Tuple[str, TokenMetada
47
58
"scope" : "" ,
48
59
}
49
60
50
- r = requests .post (
51
- f"{ AUTH_SERVER } /auth/login/wallet/verify" ,
52
- json = login_request ,
61
+ login_resp = send_post_request (
62
+ f"{ AUTH_SERVER } /auth/login/wallet/verify" , login_request
53
63
)
54
- r .raise_for_status ()
64
+ if not login_resp :
65
+ return None , None
55
66
56
- r = requests .post (
57
- f"{ AUTH_SERVER } /tokens" ,
58
- json = r .json (),
59
- )
60
- r .raise_for_status ()
67
+ token_resp = send_post_request (f"{ AUTH_SERVER } /tokens" , login_resp )
68
+ if not token_resp or "access_token" not in token_resp :
69
+ return None , None
61
70
62
71
# extract the token
63
- token = str (r . json () ["access_token" ])
72
+ token = str (token_resp ["access_token" ])
64
73
65
74
# parse the token
66
75
token_data = jwt .decode (
@@ -78,7 +87,10 @@ def authenticate(identity: Identity, name: str = None) -> Tuple[str, TokenMetada
78
87
expires_at = datetime .fromtimestamp (token_data ["exp" ], timezone .utc ),
79
88
)
80
89
81
- assert metadata .address == identity .address
82
- assert metadata .public_key == identity .public_key
90
+ if (
91
+ not metadata .address == identity .address
92
+ or not metadata .public_key == identity .public_key
93
+ ):
94
+ return None , None
83
95
84
96
return token , metadata
0 commit comments