4
4
Protocol ,
5
5
)
6
6
7
- import requests as r
7
+ from aiohttp import ClientSession
8
+ import json
8
9
9
- from bit_array import *
10
+ from bit_array import BitArray , b64url_decode , b64url_encode , dict_to_b64
10
11
from bitstring_status_list .issuer import MIN_LIST_LENGTH , StatusListLengthError
11
12
12
13
class EnvelopingTokenVerifier (Protocol ):
@@ -34,47 +35,68 @@ def __init__(
34
35
self ,
35
36
credential_status : dict ,
36
37
37
- headers : Optional [dict ] = None ,
38
- payload : Optional [ dict ] = None ,
38
+ headers : Optional [dict ],
39
+ payload : dict ,
39
40
40
- bit_array : Optional [ BitArray ] = None ,
41
+ bit_array : BitArray ,
41
42
):
42
43
self .credential_status = credential_status
43
- assert all (key in credential_status .keys () for key in ["id" , "type" , "statusPurpose" , "statusListIndex" , "statusListCredential" ]),\
44
- "Invalid credentialStatus"
44
+ if not all (key in credential_status .keys () for key in ["id" , "type" , "statusPurpose" , "statusListIndex" , "statusListCredential" ]):
45
+ raise StatusVerificationError (f"Invalid credential_status: { credential_status } . \
46
+ credential status is expected to have keys: \
47
+ [id, type, statusPurpose, statusListIndex, statusListCredential]" )
45
48
46
49
self .headers = headers
47
50
self .payload = payload
48
51
49
52
self ._bit_array = bit_array
50
53
51
- def establish_connection (
52
- self ,
53
- status_list_format : Literal ["CWT" , "JWT" ],
54
- ) -> bytes :
55
- """ Establish connection. Returns base64 encoded response. """
56
- issuer_uri = self .credential_status ["statusListCredential" ]
57
- try :
58
- response = r .get (issuer_uri )
59
- except Exception as e :
60
- raise StatusRetrievalError (f"Dereference of uri { issuer_uri } failed: { e } ." )
61
-
62
- if not (200 <= response .status_code < 300 ):
63
- raise StatusRetrievalError (f"Response status from { issuer_uri } was { response .status_code } ." )
64
-
65
- # When establishing a new connection, clear previous cached values.
66
- self .headers = None
67
- self .payload = None
68
- self ._bit_array = None
69
-
70
- return response .content
71
-
72
- def verify_jwt (
73
- self ,
74
- sl_response : bytes ,
75
- verifier : EnvelopingTokenVerifier | EmbeddingTokenVerifier ,
76
- min_list_length : int = MIN_LIST_LENGTH ,
77
- ):
54
+ @classmethod
55
+ async def retrieve_list (
56
+ cls ,
57
+ credential_status : dict ,
58
+ verifier : EnvelopingTokenVerifier | EmbeddingTokenVerifier ,
59
+ headers : dict | None = None ,
60
+ min_list_length : int = MIN_LIST_LENGTH ,
61
+ ) -> "BitstringStatusListVerifier" :
62
+ """
63
+ Establish connection, parse and verify response, and create instance of
64
+ BitstringStatusListVerifier to access it.
65
+
66
+ Args:
67
+ credential_status: REQUIRED. The credentialStatus field of a verifiable credential as
68
+ specified in S. 2.1.
69
+
70
+ verifier: REQUIRED. A callable that verifies the signature of a payload, equivalent to
71
+ signer in sign_jwt() in issuer.py.
72
+
73
+ headers: OPTIONAL. Additional headers for the HTTP request.
74
+
75
+ min_list_length: OPTIONAL. The minimum list length, recommended to be 131,072 (see S. 6.1)
76
+ Returns:
77
+ An instance of BitstringStatusListVerifier which has been verified for correctness and
78
+ integrity.
79
+ """
80
+
81
+ headers = headers or {}
82
+
83
+ async with ClientSession () as session :
84
+ async with session .get (credential_status ["statusListCredential" ], headers = headers ) as resp :
85
+ if not 200 <= resp .status < 300 :
86
+ raise StatusRetrievalError (f"Unable to retrieve token at { credential_status ["statusListCredential" ]} " )
87
+
88
+ token = await resp .read ()
89
+
90
+ return cls .from_jwt (token , credential_status , verifier , min_list_length )
91
+
92
+ @classmethod
93
+ def from_jwt (
94
+ cls ,
95
+ token : bytes | str ,
96
+ credential_status : dict ,
97
+ verifier : EnvelopingTokenVerifier | EmbeddingTokenVerifier ,
98
+ min_list_length : int = MIN_LIST_LENGTH ,
99
+ ) -> "BitstringStatusListVerifier" :
78
100
"""
79
101
Takes a status-list response and a verifier, and ensures that the response matches the
80
102
required format, verifying the signature using verifier.
@@ -83,65 +105,81 @@ def verify_jwt(
83
105
signature is correct, and raise an exception if not.
84
106
85
107
Args:
86
- sl_response : REQUIRED. A base64-encoded status_list response, acquired (eg.) from
108
+ token : REQUIRED. A base64-encoded status_list response, acquired (eg.) from
87
109
establish_connection().
88
110
111
+ credential_status: REQUIRED. The credentialStatus field of a verifiable credential as
112
+ specified in S. 2.1.
113
+
89
114
verifier: REQUIRED. A callable that verifies the signature of a payload. Must match
90
- the proof format of the sl_response (embedded or enveloping)
115
+ the proof format of the token (embedded or enveloping)
91
116
92
117
min_list_length: OPTIONAL. The minimum list length, recommended to be 131,072 (see S. 6.1)
93
118
"""
94
-
95
- if b"." in sl_response :
119
+ # Check that message is in valid JWT format
120
+ if isinstance (token , str ):
121
+ token = token .encode ()
122
+
123
+ if not token .startswith (b"ey" ):
124
+ raise ValueError ("JWT requested but token is not a JWT" )
125
+
126
+ headers = None
127
+ if b"." in token :
96
128
# Enveloping proof
97
129
98
130
# Check that message is in valid JWT format
99
- headers_bytes , payload_bytes , signature = sl_response .split (b"." , maxsplit = 3 )
131
+ headers_bytes , payload_bytes , signature = token .split (b"." , maxsplit = 3 )
100
132
assert headers_bytes and payload_bytes and signature
101
133
102
134
# Verify signature. verifier must be of type EnvelopingTokenVerifier
103
135
if not verifier (headers_bytes + b"." + payload_bytes , signature ):
104
136
raise StatusVerificationError ("Invalid signature on payload." )
105
137
106
138
# Extract data
107
- self . headers = json .loads (b64url_decode (headers_bytes ))
108
- self . payload = json .loads (b64url_decode (payload_bytes ))
139
+ headers = json .loads (b64url_decode (headers_bytes ))
140
+ payload = json .loads (b64url_decode (payload_bytes ))
109
141
else :
110
142
# Embedding proof
111
143
112
144
# Extract data
113
- self . payload = json .loads (b64url_decode (sl_response ))
145
+ payload = json .loads (b64url_decode (token ))
114
146
115
147
# Verify signature
116
- unsigned_payload = {key : self . payload [key ] for key in self . payload if key != "proof" }
117
- if not verifier (dict_to_b64 (unsigned_payload ), self . payload ["proof" ]):
148
+ unsigned_payload = {key : payload [key ] for key in payload if key != "proof" }
149
+ if not verifier (dict_to_b64 (unsigned_payload ), payload ["proof" ]):
118
150
raise StatusVerificationError ("Invalid signature on payload" )
119
151
120
152
# Check values of status list against provided credential
121
- credential_subject = self . payload ["credentialSubject" ]
122
- if credential_subject ["statusPurpose" ] != self . credential_status ["statusPurpose" ]:
153
+ credential_subject = payload ["credentialSubject" ]
154
+ if credential_subject ["statusPurpose" ] != credential_status ["statusPurpose" ]:
123
155
raise StatusVerificationError (
124
- f"statusPurpose in credential is { self . credential_status ["statusPurpose" ]} , while \
156
+ f"statusPurpose in credential is { credential_status ["statusPurpose" ]} , while \
125
157
statusPurpose in status list is { credential_subject ["statusPurpose" ]} "
126
158
)
127
159
128
160
# If statusPurpose = message, ensure that a statusMessage list exists in the credential
129
- bits = self . credential_status .get ("statusSize" )
130
- if bits is not None and bits > 1 and self . credential_status .get ("statusMessage" ) is None :
161
+ bits = credential_status .get ("statusSize" )
162
+ if bits is not None and bits > 1 and credential_status .get ("statusMessage" ) is None :
131
163
raise StatusVerificationError ("For statusSize > 1, a message must exist." )
132
164
133
- if self . credential_status ["statusPurpose" ] == "message" and self . credential_status .get ("statusMessage" ) is None :
165
+ if credential_status ["statusPurpose" ] == "message" and credential_status .get ("statusMessage" ) is None :
134
166
raise StatusVerificationError ("If statusPurpose is `message`, a statusMessage field must \
135
167
be included which provides the message associated with each bit." )
136
168
137
169
# Cache returned status list as BitArray
138
- self . _bit_array = BitArray .from_b64 (1 if bits is None else bits , credential_subject ["encodedList" ])
139
- if self . _bit_array .size < min_list_length :
170
+ bit_array = BitArray .from_b64 (1 if bits is None else bits , credential_subject ["encodedList" ])
171
+ if bit_array .size < min_list_length :
140
172
raise StatusListLengthError (f"Bitstring status list must be at least { min_list_length } \
141
- bits long, but was { self ._bit_array .size } bits long instead." )
173
+ bits long, but was { bit_array .size } bits long instead." )
174
+
175
+ return cls (
176
+ credential_status = credential_status ,
177
+ headers = headers ,
178
+ payload = payload ,
179
+ bit_array = bit_array ,
180
+ )
142
181
143
182
def get_status (self , idx : Optional [int ] = None ):
144
- assert self ._bit_array is not None , "Before accessing the status, please verify using jwt_verify or cwt_verify"
145
183
if idx is None :
146
184
idx = int (self .credential_status ["statusListIndex" ])
147
185
0 commit comments