-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo-as.py
149 lines (125 loc) · 5.34 KB
/
demo-as.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright 2022 EDF (Électricité de France S.A.)
# SPDX-License-Identifier: BSD-3-Clause
# See README for all details on copyright, authorship and license.
import socket
import secrets
from http.server import HTTPServer, BaseHTTPRequestHandler, HTTPStatus
import time
from pathlib import Path
import yaml
import cbor2
import pycose.messages, pycose.keys, pycose.headers, pycose.algorithms
configs_by_audience = {}
for config in Path("configs/").glob('*.yaml'):
data = yaml.safe_load(open(config))
configs_by_audience[data['audience']] = data
def generate_token_for(key, scope, exp):
"""Generate an ACE-OSCORE token with fresh key material for communication
with the RS with which the given `key` is agreed."""
# for encrypted token
#
# A realistic server should either implement a counter here, or keep track
# of issued tokens; otherwise an attacking client could keep requesting a
# token, wait for a birthday to happen after 2**52 requests, and use that
# to geain the AS-RS key
iv = secrets.token_bytes(13)
# for cnf (we don't provide extra salt or contextId)
master_secret = secrets.token_bytes(16)
identifier = secrets.token_bytes(4) # we don't allow upgrades, so we just pick random ones; a longer running server would ensure they don't conflict
# keys are CWT confirmation methods; inner are from OSCORE Security Context Parameters
cnf = {4: {
0: identifier,
2: master_secret,
}}
# keys from OAuth Parameters CBOR Mappings
encrypted = {
4: int(exp),
8: cnf,
9: scope,
# Not repeating RS or AS: our RS only have one AS, and they'll know from the unique keys that it's for them
}
enc0 = pycose.messages.Enc0Message(
{
pycose.headers.Algorithm.identifier:
pycose.algorithms.AESCCM16128256.identifier
}, {
pycose.headers.IV.identifier: iv
},
cbor2.dumps(encrypted))
enc0.key = key
token = enc0.encode()
# keys from OAuth Parameters CBOR Mappings
message = {
1: token,
8: cnf,
9: scope,
}
return cbor2.dumps(message)
class AceServer(BaseHTTPRequestHandler):
"""HTTP handler implementing a simple ACE AS.
This supports POSTs to /token as per the ACE OSCORE profile. An OPTIONS
handler ensures that CORS requirements are met.
Unauthenticated requests are redirected to `/`, where the index.html page
is served. That page takes care of the bespoke OAuth-like protocol the
client webapp expects.
"""
def do_GET(self):
if self.path == "/" or self.path.startswith("/?"):
self.send_response(HTTPStatus.OK)
self.end_headers()
self.wfile.write(open('index.html', 'rb').read())
elif self.path == "/token":
self.send_error(HTTPStatus.METHOD_NOT_ALLOWED)
else:
self.send_error(HTTPStatus.NOT_FOUND)
def do_OPTIONS(self):
self.send_response(HTTPStatus.OK)
self.send_header('Access-Control-Allow-Origin', self.headers.get('Origin'))
self.send_header('Access-Control-Allow-Headers', 'Authorization')
self.end_headers()
def do_POST(self):
if self.path != '/token':
self.send_error(HTTPStatus.NOT_FOUND)
return
scope = self.headers.get('Authorization')
BITS_GET = 1 << 0
BITS_POST = 1 << 1
BITS_PUT = 1 << 2
if scope == 'bearer senior':
# GET is not actually implemented
aif = [['/temp', BITS_GET], ['/identify', BITS_POST], ['/leds', BITS_PUT | BITS_GET]]
expiry = 2 * 60 * 60
elif scope == 'bearer junior':
# Not giving the junior a GET on leds yet because the firmware
# can't keep the bits apart yet
aif = [['/temp', BITS_GET], ['/identify', BITS_POST]]
expiry = 5 * 60
else:
self.send_response(HTTPStatus.UNAUTHORIZED)
self.send_header('Access-Control-Allow-Origin', self.headers.get('Origin'))
self.send_header('Access-Control-Expose-Headers', 'Location')
self.send_header('Location', './')
self.end_headers()
return
scope = cbor2.dumps(aif)
request = cbor2.load(self.rfile)
# from ACE Authorization Server Request Creation Hints
audience = request[5]
# ignoring scope, we'll tell them what they can have
rs_key = pycose.keys.SymmetricKey(bytes.fromhex(configs_by_audience[audience]['key']))
self.send_response(HTTPStatus.CREATED)
self.send_header('Content-Format', 'application/ace+cbor')
self.send_header('Access-Control-Allow-Origin', self.headers.get('Origin'))
self.send_header('Access-Control-Allow-Credentials', 'true')
self.end_headers()
self.wfile.write(generate_token_for(rs_key, scope, time.time() + expiry))
class HTTP6Server(HTTPServer):
address_family = socket.AF_INET6
def run():
httpd = HTTP6Server(('::1', 8119), AceServer)
httpd.serve_forever()
if __name__ == "__main__":
#rs_key = pycose.keys.SymmetricKey(bytes(list(b'abc') + list(range(4, 33))))
#print(cbor2.loads(generate_token_for(rs_key, scope=b"somescope", exp=2**32-1)))
run()