|
| 1 | +# Copyright 2025 Planet Labs PBC. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +import click |
| 16 | +import json |
| 17 | +import pathlib |
| 18 | +import sys |
| 19 | +import textwrap |
| 20 | +import time |
| 21 | +import typing |
| 22 | + |
| 23 | +from planet_auth import ( |
| 24 | + AuthException, |
| 25 | + TokenValidator, |
| 26 | + OidcMultiIssuerValidator, |
| 27 | +) |
| 28 | +from planet_auth.util import custom_json_class_dumper |
| 29 | + |
| 30 | +from .options import ( |
| 31 | + opt_audience, |
| 32 | + opt_issuer, |
| 33 | + opt_token, |
| 34 | + opt_token_file, |
| 35 | + opt_human_readable, |
| 36 | +) |
| 37 | +from .util import recast_exceptions_to_click |
| 38 | + |
| 39 | + |
| 40 | +class _jwt_human_dumps: |
| 41 | + """ |
| 42 | + Wrapper object for controlling the json.dumps behavior of JWTs so that |
| 43 | + we can display a version different from what is stored in memory. |
| 44 | +
|
| 45 | + For pretty printing JWTs, we convert timestamps into |
| 46 | + human-readable strings. |
| 47 | + """ |
| 48 | + |
| 49 | + def __init__(self, data): |
| 50 | + self._data = data |
| 51 | + |
| 52 | + def __json_pretty_dumps__(self): |
| 53 | + def _human_timestamp_iso(d): |
| 54 | + for key, value in list(d.items()): |
| 55 | + if key in ["iat", "exp", "nbf"] and isinstance(value, int): |
| 56 | + fmt_time = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(value)) |
| 57 | + if (key == "exp") and (d[key] < time.time()): |
| 58 | + fmt_time += " (Expired)" |
| 59 | + d[key] = fmt_time |
| 60 | + elif isinstance(value, dict): |
| 61 | + _human_timestamp_iso(value) |
| 62 | + return d |
| 63 | + |
| 64 | + json_dumps = self._data.copy() |
| 65 | + _human_timestamp_iso(json_dumps) |
| 66 | + return json_dumps |
| 67 | + |
| 68 | + |
| 69 | +def json_dumps_for_jwt_dict(data: dict, human_readable: bool, indent: int = 2): |
| 70 | + if human_readable: |
| 71 | + return json.dumps(_jwt_human_dumps(data), indent=indent, sort_keys=True, default=custom_json_class_dumper) |
| 72 | + else: |
| 73 | + return json.dumps(data, indent=2, sort_keys=True) |
| 74 | + |
| 75 | + |
| 76 | +def print_jwt_parts(raw, header, body, signature, human_readable): |
| 77 | + if raw: |
| 78 | + print(f"RAW:\n {raw}\n") |
| 79 | + |
| 80 | + if header: |
| 81 | + print( |
| 82 | + f'HEADER:\n{textwrap.indent(json_dumps_for_jwt_dict(data=header, human_readable=human_readable), prefix=" ")}\n' |
| 83 | + ) |
| 84 | + |
| 85 | + if body: |
| 86 | + print( |
| 87 | + f'BODY:\n{textwrap.indent(json_dumps_for_jwt_dict(body, human_readable=human_readable), prefix=" ")}\n' |
| 88 | + ) |
| 89 | + |
| 90 | + if signature: |
| 91 | + pretty_hex_signature = "" |
| 92 | + i = 0 |
| 93 | + for c in signature: |
| 94 | + if i == 0: |
| 95 | + pass |
| 96 | + elif (i % 16) != 0: |
| 97 | + pretty_hex_signature += ":" |
| 98 | + else: |
| 99 | + pretty_hex_signature += "\n" |
| 100 | + |
| 101 | + pretty_hex_signature += "{:02x}".format(c) |
| 102 | + i += 1 |
| 103 | + |
| 104 | + print(f'SIGNATURE:\n{textwrap.indent(pretty_hex_signature, prefix=" ")}\n') |
| 105 | + |
| 106 | + |
| 107 | +def hazmat_print_jwt(token_str, human_readable): |
| 108 | + print("UNTRUSTED JWT Decoding\n") |
| 109 | + if token_str: |
| 110 | + (hazmat_header, hazmat_body, hazmat_signature) = TokenValidator.hazmat_unverified_decode(token_str) |
| 111 | + print_jwt_parts( |
| 112 | + raw=token_str, |
| 113 | + header=hazmat_header, |
| 114 | + body=hazmat_body, |
| 115 | + signature=hazmat_signature, |
| 116 | + human_readable=human_readable, |
| 117 | + ) |
| 118 | + |
| 119 | + |
| 120 | +@click.group("jwt", invoke_without_command=True) |
| 121 | +@click.pass_context |
| 122 | +def cmd_jwt(ctx): |
| 123 | + """ |
| 124 | + JWT utility for working with tokens. These functions are primarily targeted |
| 125 | + towards debugging usage. Many of the functions do not perform token validation. |
| 126 | + THE CONTENTS OF UNVALIDATED TOKENS MUST BE TREATED AS UNTRUSTED AND POTENTIALLY |
| 127 | + MALICIOUS. |
| 128 | + """ |
| 129 | + if ctx.invoked_subcommand is None: |
| 130 | + click.echo(ctx.get_help()) |
| 131 | + sys.exit(0) |
| 132 | + |
| 133 | + |
| 134 | +def _get_token_or_fail(token_opt: typing.Optional[str], token_file_opt: typing.Optional[pathlib.Path]): |
| 135 | + if token_opt: |
| 136 | + token = token_opt |
| 137 | + elif token_file_opt: |
| 138 | + with open(token_file_opt, mode="r", encoding="UTF-8") as file_r: |
| 139 | + token = file_r.read() |
| 140 | + else: |
| 141 | + # click.echo(ctx.get_help()) |
| 142 | + # click.echo() |
| 143 | + raise click.UsageError("A token must be provided.") |
| 144 | + return token |
| 145 | + |
| 146 | + |
| 147 | +@cmd_jwt.command("decode") |
| 148 | +@click.pass_context |
| 149 | +@opt_human_readable |
| 150 | +@opt_token |
| 151 | +@opt_token_file |
| 152 | +@recast_exceptions_to_click(AuthException, FileNotFoundError) |
| 153 | +def cmd_jwt_decode(ctx, token: str, token_file: pathlib.Path, human_readable): |
| 154 | + """ |
| 155 | + Decode a JWT token WITHOUT PERFORMING ANY VALIDATION. |
| 156 | + """ |
| 157 | + token_to_print = _get_token_or_fail(token_opt=token, token_file_opt=token_file) |
| 158 | + hazmat_print_jwt(token_str=token_to_print, human_readable=human_readable) |
| 159 | + |
| 160 | + |
| 161 | +@cmd_jwt.command("validate-oauth") |
| 162 | +@click.pass_context |
| 163 | +@opt_human_readable |
| 164 | +@opt_token |
| 165 | +@opt_token_file |
| 166 | +@opt_audience() |
| 167 | +@opt_issuer() |
| 168 | +@recast_exceptions_to_click(AuthException, FileNotFoundError) |
| 169 | +def cmd_jwt_validate_oauth(ctx, token, token_file, audience, issuer, human_readable): |
| 170 | + """ |
| 171 | + Perform signature validation on an RFC 9068 compliant JWT token. |
| 172 | + The `iss` and `aud` claims will be used to look up signing keys |
| 173 | + using OAuth2/OIDC discovery protocols and perform basic validation |
| 174 | + checks. |
| 175 | +
|
| 176 | + This command performs only basic signature verification and token validity |
| 177 | + checks. For checks against auth server token revocation lists, see the `oauth` |
| 178 | + command. For deeper checks specific to the claims and structure of |
| 179 | + Identity or Access tokens, see the `oauth` command. |
| 180 | +
|
| 181 | + WARNING:\n |
| 182 | + THIS TOOL IS ABSOLUTELY INAPPROPRIATE FOR PRODUCTION TRUST USAGE. This is a |
| 183 | + development and debugging utility. The default behavior to inspect the token |
| 184 | + for issuer and audience information used to validate the token is wholly |
| 185 | + incorrect for a production use case. The decision of which issuers to |
| 186 | + trust with which audiences MUST be controlled by the service operator. |
| 187 | + """ |
| 188 | + token_to_validate = _get_token_or_fail(token_opt=token, token_file_opt=token_file) |
| 189 | + (hazmat_header, hazmat_body, hazmat_signature) = TokenValidator.hazmat_unverified_decode(token_to_validate) |
| 190 | + |
| 191 | + if issuer: |
| 192 | + validation_iss = issuer |
| 193 | + else: |
| 194 | + if not hazmat_body.get("iss"): |
| 195 | + raise click.BadParameter( |
| 196 | + "The provided token does not contain an `iss` claim. Is the provided JWT RFC 9068 compliant?" |
| 197 | + ) |
| 198 | + validation_iss = hazmat_body.get("iss") |
| 199 | + |
| 200 | + if audience: |
| 201 | + validation_aud = audience |
| 202 | + else: |
| 203 | + if not hazmat_body.get("aud"): |
| 204 | + raise click.BadParameter( |
| 205 | + "The provided token does not contain an `aud` claim. Is the provided JWT RFC 9068 compliant?" |
| 206 | + ) |
| 207 | + hazmat_aud = hazmat_body.get("aud") |
| 208 | + if isinstance(hazmat_aud, list): |
| 209 | + validation_aud = hazmat_aud[0] |
| 210 | + else: |
| 211 | + validation_aud = hazmat_aud |
| 212 | + |
| 213 | + validator = OidcMultiIssuerValidator.from_auth_server_urls( |
| 214 | + trusted_auth_server_urls=[validation_iss], audience=validation_aud, log_result=False |
| 215 | + ) |
| 216 | + validated_body, _ = validator.validate_access_token(token_to_validate, do_remote_revocation_check=False) |
| 217 | + # Validation throws on error |
| 218 | + click.echo("TOKEN OK") |
| 219 | + print_jwt_parts( |
| 220 | + raw=token_to_validate, |
| 221 | + header=hazmat_header, |
| 222 | + body=validated_body, |
| 223 | + signature=hazmat_signature, |
| 224 | + human_readable=human_readable, |
| 225 | + ) |
| 226 | + |
| 227 | + |
| 228 | +@cmd_jwt.command("validate-rs256") |
| 229 | +@click.pass_context |
| 230 | +@opt_human_readable |
| 231 | +@opt_token |
| 232 | +@opt_token_file |
| 233 | +@recast_exceptions_to_click(AuthException, FileNotFoundError, NotImplementedError) |
| 234 | +def cmd_jwt_validate_rs256(ctx, token, token_file, human_readable): |
| 235 | + """ |
| 236 | + Validate a JWT signed with a RS256 signature |
| 237 | + """ |
| 238 | + # token_to_validate = _get_token_or_fail(token_opt=token, token_file_opt=token_file) |
| 239 | + raise NotImplementedError("Command not implemented") |
| 240 | + |
| 241 | + |
| 242 | +@cmd_jwt.command("validate-hs512") |
| 243 | +@click.pass_context |
| 244 | +@opt_human_readable |
| 245 | +@opt_token |
| 246 | +@opt_token_file |
| 247 | +@recast_exceptions_to_click(AuthException, FileNotFoundError, NotImplementedError) |
| 248 | +def cmd_jwt_validate_hs512(ctx, token, token_file, human_readable): |
| 249 | + """ |
| 250 | + Validate a JWT signed with a HS512 signature |
| 251 | + """ |
| 252 | + # token_to_validate = _get_token_or_fail(token_opt=token, token_file_opt=token_file) |
| 253 | + raise NotImplementedError("Command not implemented") |
0 commit comments