-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a verifier for token status list, as well as issuer and verifier for bitstring status list. #1
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Daniel Bluhm <[email protected]> Signed-off-by: Athan Massouras <[email protected]>
…refactor Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
…ance Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]> Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]> Signed-off-by: Athan Massouras <[email protected]>
…haviour Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
can curl jwt_example, cwt_example, and an ec256 public key headers are not yet implemented Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
…nd deserialisng a token status list Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
… yet to be implemented Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
untested, only supports jwts Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
…it statuses Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
70fd004
to
3e65ff1
Compare
Signed-off-by: Athan Massouras <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great! I have a few recommendations that I think will help polish the implementation and more clearly define the expected usage of our classes here. Nice work and thanks for reviewing the feedback below!
src/token_status_list/verifier.py
Outdated
def establish_connection( | ||
self, | ||
status_list_format: Literal["CWT", "JWT"], | ||
issuer_uri: str, | ||
) -> bytes: | ||
""" Establish connection. Returns base64 encoded response. """ | ||
response = r.get( | ||
issuer_uri, | ||
headers={"Accept": f"application/statuslist+{status_list_format.lower()}"} | ||
) | ||
|
||
assert 200 <= response.status_code < 300, f"Unable to establish connection." | ||
self.issuer_uri = issuer_uri | ||
self.encoding = status_list_format | ||
|
||
# When establishing a new connection, clear previous cached values. | ||
self.headers = None | ||
self.protected_headers = None | ||
self.unprotected_headers = None | ||
self.payload = None | ||
self._bit_array = None | ||
|
||
return response.content |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a number of recommendations for this method:
- I think
retrieve_list
would be a better name;establish_connection
could be interpreted a number of ways and implies just initial connection and not necessarily a transfer of data. - I think we should use either
aiohttp
orhttpx.AsyncClient
for this instead ofrequests
. That would mean the method will have to be declared async. - I propose we make this method a
@classmethod
that returns a new instance of the verifier. In that model, the headers, protected_headers, payload, and _bit_array attributes would be immutable which I think would help prevent a category of bugs that could arise from having attributes designed to be reset by certain events. It also makes it so the types of these attributes are cleaner; instead of, for example,encoding
beingLiteral["CWT", "JWT"] | None = None
, it could just beLiteral["CWT", "JWT"]
. This makes it easier to work with these attributes since we don't have to worry about the None case. It also makes it easier to constrain the usage in appropriate ways; for example, just by looking at the init method of this class, it's unclear whether the user should provide none or all of the attributes passed in -- or something in between. - I propose we add another classmethod for creating a verifier from a jwt/cwt that the caller might have obtained through some other means. The
retrieve_list
class method would reuse this classmethod. assert
statements should be reserved for catch errors rising from us, the programmers, violating our own assumptions we've encoded into our program. We should raise a "normal" exception in the event that we get a status code outside of the expected range.
Here is an example of what this might look like:
def __init__(
self,
encoding: Literal["JWT", "CWT"],
protected_headers: dict,
unprotected_headers: dict,
payload: dict,
bit_array: BitArray,
):
self.encoding = encoding
self.headers = protected_headers
self.protected_headers = protected_headers
self.unprotected_headers = unprotected_headers
self.payload = payload
self._bit_array = bit_array
@classmethod
async def retrieve_list(
cls,
status_list_uri: str,
encoding: Literal["JWT", "CWT"] = "JWT",
headers: dict | None = None,
) -> "TokenStatusListVerifier":
""" Establish connection. Returns base64 encoded response. """
headers = headers or {}
headers.update({"Accept": f"application/statuslist+{encoding.lower()}"})
async with ClientSession() as session:
async with session.get(status_list_uri, headers=headers) as resp:
# Quick method to raise exception on status outside of 200 range
# TODO: consider using a more semantically rich exception
resp.raise_for_status()
token = await resp.read()
if encoding == "JWT":
return cls.from_jwt(token)
if encoding == "CWT":
return cls.from_cwt(token)
@classmethod
def from_jwt(cls, token: str | bytes) -> "TokenStatusListVerifier":
if isinstance(token, str):
token = token.encode()
if not token.startswith(b"ey"):
raise ValueError("JWT requested but token is not a JWT")
# INSERT DATA EXTRACTION HERE
protected = None
unprotected = None
payload = None
bit_array = None
return cls("JWT", protected, unprotected, payload, bit_array)
@classmethod
def from_cwt(cls, token: bytes) -> "TokenStatusListVerifier":
# TODO: Is there a consistent byte at the beginning of statuslist+cwt?
if token.startswith(b"ey"):
raise ValueError("CWT request but got JWT")
# INSERT DATA EXTRACTION HERE
protected = None
unprotected = None
payload = None
bit_array = None
return cls("CWT", protected, unprotected, payload, bit_array)
src/token_status_list/verifier.py
Outdated
signer in sign_jwt() in issuer.py. | ||
""" | ||
# Ensure that the format is correct | ||
assert self.encoding != "CWT", "Please use TokenStatusListVerifier.cwt_verifier() for tokens in cwt format." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be a single verify
method that detects the encoding of the current status list and then calls an encoding specific verify; e.g.:
def verify(self, verifier: TokenVerifier):
assert self.encoding in ("JWT", "CWT"), f"Invalid value for encoding: {self.encoding}"
if self.encoding == "JWT":
return self.jwt_verify(verifier)
if self.encoding == "CWT":
return self.cwt_verify(verifier)
src/token_status_list/verifier.py
Outdated
def get_status(self, idx: int) -> int: | ||
""" | ||
Returns the status of an object from the status_list in payload. | ||
Requies that the payload has already been checked using jwt_verify or cwt_verify. | ||
Caches the status list as a BitArray for ease of future reference. | ||
|
||
Args: | ||
payload: REQUIRED. A verified payload returned from jwt_verify or cwt_verify. | ||
|
||
index: REQUIRED. The index of the token's status in the list. | ||
|
||
Returns: | ||
The status of the requested token. | ||
""" | ||
|
||
assert self.encoding is not None and self.payload is not None and self._bit_array is not None,\ | ||
"Before accessing the status, please verify using jwt_verify or cwt_verify" | ||
|
||
return self._bit_array[idx] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To encode the expected usage of our verifier class, I wonder if we should have a StatusReporter
that gets returned by the verify function. The StatusReporter
would be the one with this method. So the expected usage (combined with my previous comments) might look something like this:
verifier = await TokenStatusListVerifier.retrieve_list("JWT", "https://status.example.com/0")
reporter = verifier.verify(token_verifier)
reporter.get_status(0)
The first line is responsible for retrieval and basic parsing of the token. The second is responsible for evaluating the cryptographic integrity and origin of the token. The third uses the returned status reporter and is basically just a simple wrapper around BitArray
.
This forces the user to correctly use our classes; this should make it harder for them to use a value that is private to the class in a way that compromises the integrity of the status list (i.e. by somehow skipping signature verification or altering a critical value after signature verification and polluting the results).
@@ -0,0 +1,50 @@ | |||
from bit_array import * |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This (and other imports) should look like from token_status_list.bit_array import ...
. The package is token_status_list
; pdm will install the package into the local scope when you run pdm install
. If you have any issues with this, let me know.
Also, it is generally discouraged to do from x import *
; imports should be precise, importing only what is actually used in the code of the current module.
class StatusVerificationError(Exception): | ||
"""Raised if proofs fail or if format is invalid. See Bitstring Status List Spec S. 3.2""" | ||
|
||
class BitstringStatusListVerifier(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comments above about the TokenStatusListVerifier; many of the same recommendations apply here.
Signed-off-by: Athan Massouras <[email protected]>
…h/token-status-list-py into feature/verifier-status-list
Signed-off-by: Athan Massouras <[email protected]>
…import statements Signed-off-by: Athan Massouras <[email protected]>
Signed-off-by: Athan Massouras <[email protected]>
Create a verifier for token status list, both JWT and CWT formats. Implement the bitstring status list spec, creating both an issuer and verifier equivalent to those in token status list.