|
| 1 | +""" |
| 2 | +Testing the PKCS#11 shim layer. |
| 3 | +Heavily inspired by from https://github.com/IdentityPython/pyXMLSecurity by leifj |
| 4 | +under licence "As is", see https://github.com/IdentityPython/pyXMLSecurity/blob/master/LICENSE.txt |
| 5 | +""" |
| 6 | + |
| 7 | +import logging |
| 8 | +import os |
| 9 | +import shutil |
| 10 | +import subprocess |
| 11 | +import tempfile |
| 12 | +import traceback |
| 13 | +import unittest |
| 14 | +from typing import Dict, List, Optional, Tuple |
| 15 | + |
| 16 | +DATA_DIR = os.path.join(os.path.dirname(__file__), "data") |
| 17 | + |
| 18 | + |
| 19 | +def paths_for_component(component: str, default_paths: List[str]): |
| 20 | + env_path = os.environ.get(component) |
| 21 | + return [env_path] if env_path else default_paths |
| 22 | + |
| 23 | + |
| 24 | +def find_alts(component_name, alts: List[str]) -> str: |
| 25 | + for a in alts: |
| 26 | + if os.path.exists(a): |
| 27 | + return a |
| 28 | + raise unittest.SkipTest("Required component is missing: {}".format(component_name)) |
| 29 | + |
| 30 | + |
| 31 | +def run_cmd(args, softhsm_conf=None) -> Tuple[bytes, bytes]: |
| 32 | + env = {} |
| 33 | + if softhsm_conf is not None: |
| 34 | + env['SOFTHSM_CONF'] = softhsm_conf |
| 35 | + env['SOFTHSM2_CONF'] = softhsm_conf |
| 36 | + proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) |
| 37 | + out, err = proc.communicate() |
| 38 | + if err is not None and len(err) > 0: |
| 39 | + logging.error(err) |
| 40 | + if out is not None and len(out) > 0: |
| 41 | + logging.debug(out) |
| 42 | + rv = proc.wait() |
| 43 | + if rv: |
| 44 | + with open(softhsm_conf) as f: |
| 45 | + conf = f.read() |
| 46 | + msg = '[cmd: {cmd}] [code: {code}] [stdout: {out}] [stderr: {err}] [config: {conf}]' |
| 47 | + msg = msg.format( |
| 48 | + cmd=" ".join(args), code=rv, out=out.strip(), err=err.strip(), conf=conf, |
| 49 | + ) |
| 50 | + raise RuntimeError(msg) |
| 51 | + return out, err |
| 52 | + |
| 53 | + |
| 54 | +component_default_paths: Dict[str, List[str]] = { |
| 55 | + 'P11_MODULE': [ |
| 56 | + '/usr/lib/softhsm/libsofthsm2.so', |
| 57 | + '/usr/lib/x86_64-linux-gnu/softhsm/libsofthsm2.so', |
| 58 | + '/usr/lib/softhsm/libsofthsm.so', |
| 59 | + '/usr/lib64/softhsm/libsofthsm2.so', |
| 60 | + ], |
| 61 | + 'P11_ENGINE': [ |
| 62 | + '/usr/lib/ssl/engines/libpkcs11.so', |
| 63 | + '/usr/lib/engines/engine_pkcs11.so', |
| 64 | + '/usr/lib/x86_64-linux-gnu/engines-1.1/pkcs11.so', |
| 65 | + '/usr/lib64/engines-1.1/pkcs11.so', |
| 66 | + '/usr/lib64/engines-1.1/libpkcs11.so', |
| 67 | + '/usr/lib64/engines-3/pkcs11.so', |
| 68 | + '/usr/lib64/engines-3/libpkcs11.so', |
| 69 | + '/usr/lib/x86_64-linux-gnu/engines-3/pkcs11.so', |
| 70 | + '/usr/lib/x86_64-linux-gnu/engines-3/libpkcs11.so', |
| 71 | + ], |
| 72 | + 'PKCS11_TOOL': [ |
| 73 | + '/usr/bin/pkcs11-tool', |
| 74 | + ], |
| 75 | + 'SOFTHSM': [ |
| 76 | + '/usr/bin/softhsm2-util', |
| 77 | + '/usr/bin/softhsm', |
| 78 | + ], |
| 79 | + 'OPENSSL': [ |
| 80 | + '/usr/bin/openssl', |
| 81 | + ], |
| 82 | +} |
| 83 | + |
| 84 | +component_path: Dict[str, str] = { |
| 85 | + component_name: find_alts(component_name, paths_for_component(component_name, default_paths)) |
| 86 | + for component_name, default_paths in component_default_paths.items() |
| 87 | +} |
| 88 | + |
| 89 | +softhsm_version = 1 |
| 90 | +if component_path['SOFTHSM'].endswith('softhsm2-util'): |
| 91 | + softhsm_version = 2 |
| 92 | + |
| 93 | +openssl_version = subprocess.check_output([component_path['OPENSSL'], |
| 94 | + 'version'] |
| 95 | + )[8:11].decode() |
| 96 | + |
| 97 | +p11_test_files: List[str] = [] |
| 98 | +softhsm_conf: Optional[str] = None |
| 99 | +softhsm_db: Optional[str] = None |
| 100 | + |
| 101 | + |
| 102 | +def _temp_file() -> str: |
| 103 | + f = tempfile.NamedTemporaryFile(delete=False) |
| 104 | + p11_test_files.append(f.name) |
| 105 | + return f.name |
| 106 | + |
| 107 | + |
| 108 | +def _temp_dir() -> str: |
| 109 | + d = tempfile.mkdtemp() |
| 110 | + p11_test_files.append(d) |
| 111 | + return d |
| 112 | + |
| 113 | + |
| 114 | +@unittest.skipIf(component_path['P11_MODULE'] is None, "SoftHSM PKCS11 module not installed") |
| 115 | +def setup() -> None: |
| 116 | + logging.debug("Creating test pkcs11 token using softhsm") |
| 117 | + try: |
| 118 | + global softhsm_conf |
| 119 | + softhsm_conf = _temp_file() |
| 120 | + logging.debug("Generating softhsm.conf") |
| 121 | + with open(softhsm_conf, "w") as f: |
| 122 | + if softhsm_version == 2: |
| 123 | + softhsm_db = _temp_dir() |
| 124 | + f.write(""" |
| 125 | +# Generated by test |
| 126 | +directories.tokendir = %s |
| 127 | +objectstore.backend = file |
| 128 | +log.level = DEBUG |
| 129 | +""" % softhsm_db) |
| 130 | + else: |
| 131 | + softhsm_db = _temp_file() |
| 132 | + f.write(""" |
| 133 | +# Generated by test |
| 134 | +0:%s |
| 135 | +""" % softhsm_db) |
| 136 | + |
| 137 | + logging.debug("Initializing the token") |
| 138 | + out, err = run_cmd([component_path['SOFTHSM'], |
| 139 | + '--slot', '0', |
| 140 | + '--label', 'test', |
| 141 | + '--init-token', |
| 142 | + '--pin', 'secret1', |
| 143 | + '--so-pin', 'secret2'], |
| 144 | + softhsm_conf=softhsm_conf) |
| 145 | + |
| 146 | + # logging.debug("Generating 1024 bit RSA key in token") |
| 147 | + # run_cmd([component_path['PKCS11_TOOL'], |
| 148 | + # '--module', component_path['P11_MODULE'], |
| 149 | + # '-l', |
| 150 | + # '-k', |
| 151 | + # '--key-type', 'rsa:1024', |
| 152 | + # '--id', 'a1b2', |
| 153 | + # '--label', 'test', |
| 154 | + # '--pin', 'secret1'], softhsm_conf=softhsm_conf) |
| 155 | + |
| 156 | + hash_priv_key = _temp_file() |
| 157 | + logging.debug("Converting test private key to format for softhsm") |
| 158 | + run_cmd([component_path['OPENSSL'], 'pkcs8', |
| 159 | + '-topk8', |
| 160 | + '-inform', 'PEM', |
| 161 | + '-outform', 'PEM', |
| 162 | + '-nocrypt', |
| 163 | + '-in', os.path.join(DATA_DIR, 'rsakey.pem'), |
| 164 | + '-out', hash_priv_key], softhsm_conf=softhsm_conf) |
| 165 | + |
| 166 | + logging.debug("Importing the test key to softhsm") |
| 167 | + run_cmd([component_path['SOFTHSM'], |
| 168 | + '--import', hash_priv_key, |
| 169 | + '--token', 'test', |
| 170 | + '--id', 'a1b2', |
| 171 | + '--label', 'test', |
| 172 | + '--pin', 'secret1'], |
| 173 | + softhsm_conf=softhsm_conf) |
| 174 | + run_cmd([component_path['PKCS11_TOOL'], |
| 175 | + '--module', component_path['P11_MODULE'], |
| 176 | + '-l', |
| 177 | + '--pin', 'secret1', '-O'], softhsm_conf=softhsm_conf) |
| 178 | + signer_cert_pem = _temp_file() |
| 179 | + openssl_conf = _temp_file() |
| 180 | + logging.debug("Generating OpenSSL config for version {}".format(openssl_version)) |
| 181 | + with open(openssl_conf, "w") as f: |
| 182 | + # Might be needed with some versions of openssl, but in more recent versions dynamic_path breaks it. |
| 183 | + # dynamic_path = ( |
| 184 | + # "dynamic_path = %s" % component_path['P11_ENGINE'] |
| 185 | + # if openssl_version.startswith(b'1.') |
| 186 | + # else "" |
| 187 | + # ) |
| 188 | + f.write("\n".join([ |
| 189 | + "openssl_conf = openssl_def", |
| 190 | + "[openssl_def]", |
| 191 | + "engines = engine_section", |
| 192 | + "[engine_section]", |
| 193 | + "pkcs11 = pkcs11_section", |
| 194 | + "[req]", |
| 195 | + "distinguished_name = req_distinguished_name", |
| 196 | + "[req_distinguished_name]", |
| 197 | + "[pkcs11_section]", |
| 198 | + "engine_id = pkcs11", |
| 199 | + # dynamic_path, |
| 200 | + "MODULE_PATH = %s" % component_path['P11_MODULE'], |
| 201 | + "init = 0", |
| 202 | + ])) |
| 203 | + |
| 204 | + with open(openssl_conf, "r") as f: |
| 205 | + logging.debug('-------- START DEBUG openssl_conf --------') |
| 206 | + logging.debug(f.readlines()) |
| 207 | + logging.debug('-------- END DEBUG openssl_conf --------') |
| 208 | + logging.debug('-------- START DEBUG paths --------') |
| 209 | + logging.debug(run_cmd(['ls', '-ld', component_path['P11_ENGINE']])) |
| 210 | + logging.debug(run_cmd(['ls', '-ld', component_path['P11_MODULE']])) |
| 211 | + logging.debug('-------- END DEBUG paths --------') |
| 212 | + |
| 213 | + signer_cert_der = _temp_file() |
| 214 | + |
| 215 | + logging.debug("Generating self-signed certificate") |
| 216 | + run_cmd([component_path['OPENSSL'], 'req', |
| 217 | + '-new', |
| 218 | + '-x509', |
| 219 | + '-subj', "/CN=Test Signer", |
| 220 | + '-engine', 'pkcs11', |
| 221 | + '-config', openssl_conf, |
| 222 | + '-keyform', 'engine', |
| 223 | + '-key', 'label_test', |
| 224 | + '-passin', 'pass:secret1', |
| 225 | + '-out', signer_cert_pem], softhsm_conf=softhsm_conf) |
| 226 | + |
| 227 | + run_cmd([component_path['OPENSSL'], 'x509', |
| 228 | + '-inform', 'PEM', |
| 229 | + '-outform', 'DER', |
| 230 | + '-in', signer_cert_pem, |
| 231 | + '-out', signer_cert_der], softhsm_conf=softhsm_conf) |
| 232 | + |
| 233 | + logging.debug("Importing certificate into token") |
| 234 | + |
| 235 | + run_cmd([component_path['PKCS11_TOOL'], |
| 236 | + '--module', component_path['P11_MODULE'], |
| 237 | + '-l', |
| 238 | + '--slot-index', '0', |
| 239 | + '--id', 'a1b2', |
| 240 | + '--label', 'test', |
| 241 | + '-y', 'cert', |
| 242 | + '-w', signer_cert_der, |
| 243 | + '--pin', 'secret1'], softhsm_conf=softhsm_conf) |
| 244 | + |
| 245 | + # TODO: Should be teardowned in teardown: |
| 246 | + os.environ['SOFTHSM_CONF'] = softhsm_conf |
| 247 | + os.environ['SOFTHSM2_CONF'] = softhsm_conf |
| 248 | + |
| 249 | + except Exception as ex: |
| 250 | + print("-" * 64) |
| 251 | + traceback.print_exc() |
| 252 | + print("-" * 64) |
| 253 | + logging.error("PKCS11 tests disabled: unable to initialize test token: %s" % ex) |
| 254 | + raise ex |
| 255 | + |
| 256 | + |
| 257 | +def teardown() -> None: |
| 258 | + global p11_test_files |
| 259 | + for o in p11_test_files: |
| 260 | + if os.path.exists(o): |
| 261 | + if os.path.isdir(o): |
| 262 | + shutil.rmtree(o) |
| 263 | + else: |
| 264 | + os.unlink(o) |
| 265 | + p11_test_files = [] |
0 commit comments