diff --git a/contrib/trezor_agent_recover.py b/contrib/trezor_agent_recover.py new file mode 100755 index 00000000..93cef003 --- /dev/null +++ b/contrib/trezor_agent_recover.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python +'''Export secret GPG key using BIP13 derivation scheme. +IMPORTANT: Never run this code with your own mnemonic on a PC +with an internet connection or with any kind of persistent storage. +It may leak your mnemonic, exposing any secret key managed by the +TREZOR - which may result in Bitcoin loss!!! +''' + +from __future__ import print_function + +import sys +import logging +import argparse +import getpass +import hashlib +import hmac +import struct + +from typing import Tuple + +from mnemonic import Mnemonic # type: ignore +from ecdsa import curves, SigningKey # type: ignore +from libagent import util, formats # type: ignore +from libagent.gpg import encode, protocol, client # type: ignore +from libagent.formats import KeyFlags # type: ignore + +HARDENED_INDEX = 0x80000000 +curve_name_to_curve = {"nist256p1": curves.NIST256p} +curve_name_to_seed = {"nist256p1": "Nist256p1 seed"} + +logger = logging.getLogger("export_keys") + + +def get_curve(curve_name: str) -> curves.Curve: + return curve_name_to_curve[curve_name] + + +def mnemonic_to_seed(mnemonic): + return Mnemonic("english").to_seed(mnemonic)[:64] + + +def privkey_and_chaincode_from_seed( + seed: bytes, + curve_name: str +) -> Tuple[bytes, bytes]: + """Return private key and chaincode from provided seed + + >>> test_seed = mnemonic_to_seed(test_mnemonic) + >>> privkey_and_chaincode_from_seed(test_seed, "nist256p1") + (b'\\x1e\\xa4\\\\\\x10\\xd3\\x1a\\xd4\\xb8...\\xc9#t\\xf8\\xcf\\xcb') + """ + curve_secret = curve_name_to_seed[curve_name] + secret = hmac.new(curve_secret.encode(), seed, hashlib.sha512).digest() + + return secret[:32], secret[32:] + + +def derive_private_child( + curve_name: str, + privkey: bytes, + chaincode: bytes, + index: int +) -> Tuple[bytes, bytes]: + """Derives private child key and chaincode using parent child key, + chaincode and index + + >>> test_seed = mnemonic_to_seed(test_mnemonic) + >>> p, c = privkey_and_chaincode_from_seed(test_seed, "nist256p1") + >>> derive_private_child("nist256p1", p, c, 2147483661) + (b'(\\x9f\\xccH\\xd7\\x96yKEQ\\x83\\xde\\x11\\xfaW...\\xc9\\xc5\\x82W\\x01`') + """ + curve = get_curve(curve_name) + secexp = util.bytes2num(privkey) + + assert index & HARDENED_INDEX, "index not hardened" + + data = b'\x00' + privkey + index.to_bytes(4, "big") + payload = hmac.new(chaincode, data, hashlib.sha512).digest() + + B = util.bytes2num(payload[:32]) + + assert B < curve.order, "curve order too small" + + B += secexp + B %= curve.order + + child_private = util.num2bytes(B, 32) + + return child_private, payload[32:] + + +def derive(seed, path, curve_name): + """Derives gpg key from provided bip32 path + + >>> seed = mnemonic_to_seed(test_mnemonic) + >>> sk = derive(seed, + ... [2147483661, 3641273873, 3222207101, 2735596413, 2741857293], + ... "nist256p1") + >>> sk.verifying_key.to_string().hex() + '32dd7bda4eb424e57ec2594bc2dad...eb1ca14a6f518c204e32b24c5f18b4' + """ + logger.debug("seed: %s", seed.hex()) + + privkey, chaincode = privkey_and_chaincode_from_seed(seed, curve_name) + logger.debug("master privkey: %s", privkey.hex()) + logger.debug("master chaincode: %s", chaincode.hex()) + + for i in path: + privkey, chaincode = derive_private_child( + curve_name, privkey, chaincode, i) + + logger.debug("ckd: %d -> %s %s", i, privkey.hex(), chaincode.hex()) + + logger.debug("child privkey: %s", privkey.hex()) + + secexp = util.bytes2num(privkey) + + curve = get_curve(curve_name) + sk = SigningKey.from_secret_exponent( + secexp=secexp, + curve=curve, + hashfunc=hashlib.sha256) + + return sk + + +def pack(sk): + secexp = util.bytes2num(sk.to_string()) + mpi_bytes = protocol.mpi(secexp) + checksum = sum(bytearray(mpi_bytes)) & 0xFFFF + return b'\x00' + mpi_bytes + struct.pack('>H', checksum) + + +def sigencode(r, s, _): + return (r, s) + + +def create_signer(signing_key): + def signer(digest): + return signing_key.sign_digest_deterministic( + digest, hashfunc=hashlib.sha256, sigencode=sigencode) + return signer + + +def append_subkeys(seed, signer_func, primary_bytes, user_id, curve_name, creation_time, + signing=True, encryption=True, authentication=False, private=False): + + if signing: + identity = client.create_identity(user_id=user_id, curve_name=curve_name, + keyflag=KeyFlags.SIGN) + sk = derive(seed, identity.get_bip32_address(), curve_name) + cross_signer_func = create_signer(sk) + signing_subkey = protocol.PublicKey(curve_name=curve_name, created=creation_time, + verifying_key=sk.verifying_key, keyflag=KeyFlags.SIGN) + signing_bytes = encode.create_subkey(primary_bytes=primary_bytes, subkey=signing_subkey, + signer_func=signer_func, cross_signer_func=cross_signer_func, + secret_bytes=(pack(sk) if private else b'')) + else: + signing_bytes = b'' + + if encryption: + identity = client.create_identity(user_id=user_id, curve_name=curve_name, + keyflag=KeyFlags.ENCRYPT) + sk = derive(seed, identity.get_bip32_address(), curve_name) + encryption_curve_name = formats.get_ecdh_curve_name(curve_name) + encryption_subkey = protocol.PublicKey(curve_name=encryption_curve_name, + created=creation_time, verifying_key=sk.verifying_key, keyflag=KeyFlags.ENCRYPT) + encryption_bytes = encode.create_subkey(primary_bytes=primary_bytes, + subkey=encryption_subkey, signer_func=signer_func, + secret_bytes=(pack(sk) if private else b'')) + else: + encryption_bytes = b'' + + if authentication: + identity = client.create_identity(user_id=user_id, curve_name=curve_name, + keyflag=KeyFlags.AUTHENTICATE) + sk = derive(seed, identity.get_bip32_address(), curve_name) + authentication_subkey = protocol.PublicKey(curve_name=curve_name, created=creation_time, + verifying_key=sk.verifying_key, keyflag=KeyFlags.AUTHENTICATE) + authentication_bytes = encode.create_subkey(primary_bytes=primary_bytes, + subkey=authentication_subkey, signer_func=signer_func, + secret_bytes=(pack(sk) if private else b'')) + else: + authentication_bytes = b'' + + return primary_bytes + signing_bytes + encryption_bytes + authentication_bytes + +def export_key(user_id, curve_name, + time=0, smartcard=True, private=False, + seed=None, mnemonic=None): + + if seed is None: + assert mnemonic is not None, "seed or mnemonic not provided" + seed = mnemonic_to_seed(mnemonic) + + if smartcard: + certify = KeyFlags.CERTIFY + signing = True + encryption = True + authentication = True + else: + certify = KeyFlags.CERTIFY_AND_SIGN + signing = False + encryption = True + authentication = False + + # primary key for certification/signing + certifying_identity = client.create_identity(user_id=user_id, + curve_name=curve_name, keyflag=certify) + + sk = derive(seed, certifying_identity.get_bip32_address(), curve_name) + certifying_signer_func = create_signer(sk) + + primary = protocol.PublicKey( + curve_name=curve_name, created=time, + verifying_key=sk.verifying_key, keyflag=certify) + primary_bytes = encode.create_primary(user_id=user_id, + pubkey=primary, + signer_func=certifying_signer_func, + secret_bytes=(pack(sk) if private else b'')) + + # subkeys + result = append_subkeys(seed, certifying_signer_func, primary_bytes, user_id, curve_name, time, + signing=signing, encryption=encryption, authentication=authentication, private=private) + + if private: + return protocol.armor(result, 'PRIVATE KEY BLOCK') + else: + return protocol.armor(result, 'PUBLIC KEY BLOCK') + + +def main(): + print(__doc__) + + test_mnemonic = "all all all all all all all all all all all all" + example_seed = mnemonic_to_seed(test_mnemonic).hex() + example_identity = "First Last " + + parser = argparse.ArgumentParser( + description="trezor-agent gpg key recovery tool", + formatter_class=argparse.RawTextHelpFormatter) + + parser.add_argument( + "--mnemonic", type=str, default=None, + help="trezor mnemonic (example: {})".format(test_mnemonic)) + + parser.add_argument( + "--seed", type=str, default=None, + help="trezor seed (example: {})".format(example_seed[:64] + "...")) + + parser.add_argument( + "--identity", type=str, default=None, + help="gpg key user identity (example: '{}')".format(example_identity)) + + parser.add_argument( + "--timestamp", type=int, default=0, + help="timestamp to use (default: 0)") + + parser.add_argument( + "--smartcard", type=bool, default=True, + help="smartcard subkeys (default: true)" + ) + + parser.add_argument( + "--debug", action="store_true", + help="enable debugging") + + args = parser.parse_args() + + if not args.identity: + user_id = input('Enter your identity (example: {}): '.format(example_identity)) # noqa + else: + user_id = args.identity + + if not (args.mnemonic or args.seed): + mnemonic = getpass.getpass('Enter your mnemonic: ') + else: + mnemonic = args.mnemonic + + seed = bytes.fromhex(args.seed) if args.seed else None + + logging.basicConfig( + stream=sys.stderr, + level=logging.DEBUG if args.debug else logging.INFO) + + + curve_name = 'nist256p1' + public_key = export_key(user_id, curve_name, time=args.timestamp, + seed=seed, mnemonic=mnemonic, smartcard=args.smartcard, + private=False) + + private_key = export_key(user_id, curve_name, time=args.timestamp, + seed=seed, mnemonic=mnemonic, smartcard=args.smartcard, + private=True) + + print('Use "gpg2 --import" on the following GPG key blocks:\n') + print(public_key) + print(private_key) + + +if __name__ == '__main__': + main() diff --git a/libagent/device/fake_device.py b/libagent/device/fake_device.py index 82bfe7b9..6a2d81f5 100644 --- a/libagent/device/fake_device.py +++ b/libagent/device/fake_device.py @@ -42,7 +42,7 @@ def connect(self): def close(self): """Close the device.""" - def pubkey(self, identity, ecdh=False): + def pubkey(self, identity): """Return public key.""" _verify_support(identity) data = self.vk.to_string() diff --git a/libagent/device/interface.py b/libagent/device/interface.py index a21aad77..a7616312 100644 --- a/libagent/device/interface.py +++ b/libagent/device/interface.py @@ -9,6 +9,7 @@ import unidecode from .. import formats, util +from ..formats import KeyFlags, keyflag_to_index log = logging.getLogger(__name__) @@ -62,14 +63,16 @@ class DeviceError(Error): class Identity: """Represent SLIP-0013 identity, together with a elliptic curve choice.""" - def __init__(self, identity_str, curve_name): + def __init__(self, identity_str, curve_name, keyflag): """Configure for specific identity and elliptic curve usage.""" self.identity_dict = string_to_identity(identity_str) + self.identity_dict['index'] = keyflag_to_index(keyflag) self.curve_name = curve_name + self.keyflag = keyflag def items(self): """Return a copy of identity_dict items.""" - return [(k, unidecode.unidecode(v)) + return [(k, v if isinstance(v, int) else unidecode.unidecode(v)) for k, v in self.identity_dict.items()] def to_bytes(self): @@ -79,28 +82,41 @@ def to_bytes(self): def to_string(self): """Return identity serialized to string.""" - return '<{}|{}>'.format(identity_to_string(self.identity_dict), self.curve_name) + return u'<{}|{}|{}>'.format(self.identity_dict['index'], + identity_to_string(self.identity_dict), + self.curve_name) - def get_bip32_address(self, ecdh=False): + def get_bip32_address(self): """Compute BIP32 derivation address according to SLIP-0013/0017.""" - index = struct.pack('L', to_hash.tell()) @@ -123,6 +142,9 @@ def _parse_signature(stream): if embedded: log.debug('embedded sigs: %s', embedded) p['embedded'] = embedded + key_id = _parse_key_id(p['unhashed_subpackets']) + if key_id: + p['key_id'] = key_id p['hash_prefix'] = stream.readfmt('2s') if p['pubkey_alg'] in ECDSA_ALGO_IDS: @@ -182,7 +204,8 @@ def _parse_pubkey(stream, packet_type='pubkey'): packet_data = packet.getvalue() data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + packet_data) - p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] + p['fingerprint'] = hashlib.sha1(data_to_hash).digest() + p['key_id'] = p['fingerprint'][-8:] p['_to_hash'] = data_to_hash log.debug('key ID: %s', util.hexlify(p['key_id'])) return p @@ -286,7 +309,8 @@ def _parse_pubkey_packets(pubkey_bytes): stream = io.BytesIO(pubkey_bytes) packets_per_pubkey = [] for p in parse_packets(stream): - if p['type'] == 'pubkey': + if p['type'] == 'pubkey' or \ + p['type'] == 'subkey': # Add a new packet list for each pubkey. packets_per_pubkey.append([]) packets_per_pubkey[-1].append(p) @@ -294,12 +318,46 @@ def _parse_pubkey_packets(pubkey_bytes): def load_by_keygrip(pubkey_bytes, keygrip): - """Return public key and first user ID for specified keygrip.""" - for packets in _parse_pubkey_packets(pubkey_bytes): - user_ids = [p for p in packets if p['type'] == 'user_id'] + """Return key, user IDs, and keyflag for specified keygrip.""" + stream = io.BytesIO(pubkey_bytes) + packets = list(parse_packets(stream)) + packets_per_key = [] + user_ids = [] + for p in packets: + if p['type'] == 'pubkey' or \ + p['type'] == 'subkey': + # Add a new packet list for each key. + packets_per_key.append([]) + packets_per_key[-1].append(p) + + for packets in packets_per_key: + user_ids += [p for p in packets if p['type'] == 'user_id'] + + for packets in packets_per_key: + + # Each key packet is followed by a signature packet + # The key packet contains the keygrip + # The signature packet contains the keyflag in the hashed area + # Map them together + mapping = {} + for i in range(0, len(packets)): + p = packets[i] + if p['type'] == 'pubkey' or \ + p['type'] == 'subkey': + kg = p['keygrip'] + for j in range(i+1, len(packets)): + sp = packets[j] + if sp['type'] == 'signature': + keyflag = sp['keyflag'] + mapping[kg] = keyflag + break + else: + continue + for p in packets: if p.get('keygrip') == keygrip: - return p, user_ids + return p, user_ids, mapping[keygrip] + raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip))) diff --git a/libagent/gpg/encode.py b/libagent/gpg/encode.py index 44c3d2e6..5fb35bb7 100644 --- a/libagent/gpg/encode.py +++ b/libagent/gpg/encode.py @@ -4,6 +4,7 @@ from .. import util from . import decode, keyring, protocol +from ..formats import KeyFlags log = logging.getLogger(__name__) @@ -21,7 +22,7 @@ def create_primary(user_id, pubkey, signer_func, secret_bytes=b''): # https://tools.ietf.org/html/rfc4880#section-5.2.3.7 protocol.subpacket_byte(0x0B, 9), # preferred symmetric algo (AES-256) # https://tools.ietf.org/html/rfc4880#section-5.2.3.4 - protocol.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) + protocol.subpacket_byte(0x1B, 1), # key flags (certify) # https://tools.ietf.org/html/rfc4880#section-5.2.3.21 protocol.subpacket_bytes(0x15, [8, 9, 10]), # preferred hash # https://tools.ietf.org/html/rfc4880#section-5.2.3.8 @@ -48,7 +49,7 @@ def create_primary(user_id, pubkey, signer_func, secret_bytes=b''): return pubkey_packet + user_id_packet + sign_packet -def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''): +def create_subkey(primary_bytes, subkey, signer_func, cross_signer_func=None, secret_bytes=b''): """Export new subkey to GPG primary key.""" subkey_packet = protocol.packet(tag=(7 if secret_bytes else 14), blob=subkey.data() + secret_bytes) @@ -57,31 +58,32 @@ def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''): data_to_sign = primary['_to_hash'] + subkey.data_to_hash() - if subkey.ecdh: - embedded_sig = None - else: + if subkey.keyflag == KeyFlags.SIGN: + # Primary Key Binding Signature hashed_subpackets = [ - protocol.subpacket_time(subkey.created)] # signature time + protocol.subpacket_time(subkey.created + 1), # signature time + protocol.subpacket_bytes(33, b'\x04' + subkey.fingerprint()) + ] unhashed_subpackets = [ protocol.subpacket(16, subkey.key_id())] # issuer key id embedded_sig = protocol.make_signature( - signer_func=signer_func, + signer_func=cross_signer_func, data_to_sign=data_to_sign, public_algo=subkey.algo_id, sig_type=0x19, hashed_subpackets=hashed_subpackets, unhashed_subpackets=unhashed_subpackets) - # Subkey Binding Signature - - # Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21 - # (certify & sign) (encrypt) - flags = (2) if (not subkey.ecdh) else (4 | 8) + else: + embedded_sig = None + # Subkey Binding Signature hashed_subpackets = [ protocol.subpacket_time(subkey.created), # signature time - protocol.subpacket_byte(0x1B, flags)] + protocol.subpacket_byte(0x1B, subkey.keyflag), + protocol.subpacket_bytes(33, b'\x04' + primary['fingerprint']) + ] unhashed_subpackets = [] unhashed_subpackets.append(protocol.subpacket(16, primary['key_id'])) @@ -100,4 +102,4 @@ def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''): hashed_subpackets=hashed_subpackets, unhashed_subpackets=unhashed_subpackets) sign_packet = protocol.packet(tag=2, blob=signature) - return primary_bytes + subkey_packet + sign_packet + return subkey_packet + sign_packet diff --git a/libagent/gpg/keyring.py b/libagent/gpg/keyring.py index 2260b82d..f9899ffd 100644 --- a/libagent/gpg/keyring.py +++ b/libagent/gpg/keyring.py @@ -222,7 +222,7 @@ def get_keygrip(user_id, sp=subprocess): def gpg_version(sp=subprocess): - """Get a keygrip of the primary GPG key of the specified user.""" + """Get the version of the GPG binary.""" args = gpg_command(['--version']) output = check_output(args=args, sp=sp) line = output.split(b'\n', maxsplit=1)[0] # b'gpg (GnuPG) 2.1.11' diff --git a/libagent/gpg/protocol.py b/libagent/gpg/protocol.py index f722b517..68c40f4a 100644 --- a/libagent/gpg/protocol.py +++ b/libagent/gpg/protocol.py @@ -8,6 +8,7 @@ import nacl.signing from .. import formats, util +from ..formats import KeyFlags log = logging.getLogger(__name__) @@ -190,20 +191,27 @@ def get_curve_name_by_oid(oid): class PublicKey: """GPG representation for public key packets.""" - def __init__(self, curve_name, created, verifying_key, ecdh=False): + def __init__(self, curve_name, created, verifying_key, keyflag=KeyFlags.CERTIFY): """Contruct using a ECDSA VerifyingKey object.""" self.curve_name = curve_name self.curve_info = SUPPORTED_CURVES[curve_name] self.created = int(created) # time since Epoch self.verifying_key = verifying_key - self.ecdh = bool(ecdh) - if ecdh: - self.algo_id = ECDH_ALGO_ID - self.ecdh_packet = b'\x03\x01\x08\x07' - else: + self.keyflag = keyflag + + if keyflag == KeyFlags.CERTIFY or \ + keyflag == KeyFlags.SIGN or \ + keyflag == KeyFlags.AUTHENTICATE or \ + keyflag == KeyFlags.CERTIFY_AND_SIGN: + self.algo_id = self.curve_info['algo_id'] self.ecdh_packet = b'' + elif keyflag == KeyFlags.ENCRYPT: + + self.algo_id = ECDH_ALGO_ID + self.ecdh_packet = b'\x03\x01\x08\x07' + def keygrip(self): """Compute GPG keygrip of the verifying key.""" return self.curve_info['keygrip'](self.verifying_key) @@ -222,12 +230,12 @@ def data_to_hash(self): """Data for digest computation.""" return b'\x99' + util.prefix_len('>H', self.data()) - def _fingerprint(self): + def fingerprint(self): return hashlib.sha1(self.data_to_hash()).digest() def key_id(self): """Short (8 byte) GPG key ID.""" - return self._fingerprint()[-8:] + return self.fingerprint()[-8:] def __repr__(self): """Short (8 hexadecimal digits) GPG key ID.""" diff --git a/libagent/gpg/tests/49717CC09A348E5DAAF345903784A7264F609C5F.gpg b/libagent/gpg/tests/49717CC09A348E5DAAF345903784A7264F609C5F.gpg new file mode 100644 index 00000000..29677788 Binary files /dev/null and b/libagent/gpg/tests/49717CC09A348E5DAAF345903784A7264F609C5F.gpg differ diff --git a/libagent/gpg/tests/test_decode.py b/libagent/gpg/tests/test_decode.py index 7cd240eb..5c686424 100644 --- a/libagent/gpg/tests/test_decode.py +++ b/libagent/gpg/tests/test_decode.py @@ -1,5 +1,8 @@ +import binascii +import glob import io import pathlib +import os import pytest @@ -60,6 +63,42 @@ def test_load_by_keygrip_missing(): with pytest.raises(KeyError): decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'') +def test_load_by_keygrip(): + # contrib/trezor_agent_recover.py --identity "test@example.com" --timestamp 0 --mnemonic "all all all all all all all all all all all all" + with open(os.path.join(cwd, "49717CC09A348E5DAAF345903784A7264F609C5F.gpg"), 'rb') as f: + + # Primary Key + p_kg = binascii.unhexlify("930E34F72D88B9BF4FA5372D7ED493D0DC738DAD") + data, uids, keyflag = decode.load_by_keygrip(f.read(), p_kg) + f.seek(0) + + assert data['keygrip'] == p_kg + assert keyflag == 1 + + # Signing Subkey + s_kg = binascii.unhexlify("94F380990548D9644271F149D4FDF0D808F54127") + data, uids, keyflag = decode.load_by_keygrip(f.read(), s_kg) + f.seek(0) + + assert data['keygrip'] == s_kg + assert keyflag == 2 + + # Authentication Subkey + a_kg = binascii.unhexlify("4D2EB4C79914B876AAFA941C7FB51B72E4D348BD") + data, uids, keyflag = decode.load_by_keygrip(f.read(), a_kg) + f.seek(0) + + assert data['keygrip'] == a_kg + assert keyflag == 32 # 0x20 + + # Encryption Subkey + e_kg = binascii.unhexlify("CCAB28DB355C0993CF6E6F994066B08A7F873127") + data, uids, keyflag = decode.load_by_keygrip(f.read(), e_kg) + f.seek(0) + + assert data['keygrip'] == e_kg + assert keyflag == 12 # 0x4 | 0x8 + def test_keygrips(): pubkey_bytes = (cwd / "romanz-pubkey.gpg").open("rb").read() @@ -70,7 +109,7 @@ def test_keygrips(): ] for keygrip in keygrips: - pubkey_dict, user_ids = decode.load_by_keygrip(pubkey_bytes, keygrip) + pubkey_dict, user_ids, keyflag = decode.load_by_keygrip(pubkey_bytes, keygrip) assert pubkey_dict['keygrip'] == keygrip assert [u['value'] for u in user_ids] == [ b'Roman Zeyde ', diff --git a/libagent/gpg/tests/test_protocol.py b/libagent/gpg/tests/test_protocol.py index 233be13d..ea941d09 100644 --- a/libagent/gpg/tests/test_protocol.py +++ b/libagent/gpg/tests/test_protocol.py @@ -3,6 +3,7 @@ import pytest from ... import formats +from ...formats import KeyFlags from .. import protocol @@ -64,20 +65,20 @@ def signer_func(digest): b'\x00\x00\x00\x01\x00\x00\xd0\xe5\x00\x03\x07\x00\x04\x08') -def test_nist256p1(): +def test_nist256p1_signing(): sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p) vk = sk.get_verifying_key() pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256, - created=42, verifying_key=vk) + created=42, verifying_key=vk, keyflag=KeyFlags.SIGN) assert repr(pk) == 'GPG public key nist256p1/F82361D9' assert pk.keygrip() == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0' -def test_nist256p1_ecdh(): +def test_nist256p1_encryption(): sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p) vk = sk.get_verifying_key() pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256, - created=42, verifying_key=vk, ecdh=True) + created=42, verifying_key=vk, keyflag=KeyFlags.ENCRYPT) assert repr(pk) == 'GPG public key nist256p1/5811DF46' assert pk.keygrip() == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0' diff --git a/libagent/ssh/__init__.py b/libagent/ssh/__init__.py index 5d6734ea..8eb9f692 100644 --- a/libagent/ssh/__init__.py +++ b/libagent/ssh/__init__.py @@ -17,6 +17,7 @@ from .. import device, formats, server, util from . import client, protocol +from ..formats import KeyFlags log = logging.getLogger(__name__) @@ -267,7 +268,7 @@ def main(device_type): identities = list(parse_config(contents)) else: identities = [device.interface.Identity( - identity_str=args.identity, curve_name=args.ecdsa_curve_name)] + identity_str=args.identity, curve_name=args.ecdsa_curve_name, keyflag=KeyFlags.CERTIFY_AND_SIGN)] for index, identity in enumerate(identities): identity.identity_dict['proto'] = 'ssh' log.info('identity #%d: %s', index, identity.to_string()) diff --git a/libagent/ssh/tests/test_client.py b/libagent/ssh/tests/test_client.py index 9c982244..4092c33e 100644 --- a/libagent/ssh/tests/test_client.py +++ b/libagent/ssh/tests/test_client.py @@ -1,4 +1,5 @@ import io +from ...formats import KeyFlags import mock import pytest @@ -12,7 +13,7 @@ b'\xdd\xbc+\xfar~\x9dAis') PUBKEY_TEXT = ('ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzd' 'HAyNTYAAABBBNgotaZgvnQwlaw6Wztd3Cy93D/XwOzdvCv6cn6dQWlzNMEQeW' - 'VUfhvrGljR2Z/CMRONY6ejB+9PnpUOPuzYqi8= \n') + 'VUfhvrGljR2Z/CMRONY6ejB+9PnpUOPuzYqi8= <0|localhost:22|nist256p1>\n') class MockDevice(device.interface.Device): # pylint: disable=abstract-method @@ -24,7 +25,7 @@ def package_name(cls): def connect(self): return mock.Mock() - def pubkey(self, identity, ecdh=False): # pylint: disable=unused-argument + def pubkey(self, identity): # pylint: disable=unused-argument assert self.conn return formats.decompress_pubkey(pubkey=PUBKEY, curve_name=identity.curve_name) @@ -51,7 +52,7 @@ def sign(self, identity, blob): def test_ssh_agent(): identity = device.interface.Identity(identity_str='localhost:22', - curve_name=CURVE) + curve_name=CURVE, keyflag=KeyFlags.CERTIFY) c = client.Client(device=MockDevice()) assert c.export_public_keys([identity]) == [PUBKEY_TEXT] signature = c.sign_ssh_challenge(blob=BLOB, identity=identity) diff --git a/libagent/ssh/tests/test_protocol.py b/libagent/ssh/tests/test_protocol.py index e226b809..05469665 100644 --- a/libagent/ssh/tests/test_protocol.py +++ b/libagent/ssh/tests/test_protocol.py @@ -1,3 +1,4 @@ +from ...formats import KeyFlags import mock import pytest @@ -25,7 +26,7 @@ def fake_connection(keys, signer): def test_list(): key = formats.import_public_key(NIST256_KEY) - key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1') + key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1', KeyFlags.CERTIFY) h = protocol.Handler(fake_connection(keys=[key], signer=None)) reply = h.handle(LIST_MSG) assert reply == LIST_NIST256_REPLY @@ -45,14 +46,14 @@ def test_unsupported(): def ecdsa_signer(identity, blob): - assert identity.to_string() == '' + assert identity.to_string() == '<0|ssh://localhost|nist256p1>' assert blob == NIST256_BLOB return NIST256_SIG def test_ecdsa_sign(): key = formats.import_public_key(NIST256_KEY) - key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1') + key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1', KeyFlags.CERTIFY) h = protocol.Handler(fake_connection(keys=[key], signer=ecdsa_signer)) reply = h.handle(NIST256_SIGN_MSG) assert reply == NIST256_SIGN_REPLY @@ -66,12 +67,12 @@ def test_sign_missing(): def test_sign_wrong(): def wrong_signature(identity, blob): - assert identity.to_string() == '' + assert identity.to_string() == '<0|ssh://localhost|nist256p1>' assert blob == NIST256_BLOB return b'\x00' * 64 key = formats.import_public_key(NIST256_KEY) - key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1') + key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1', KeyFlags.CERTIFY) h = protocol.Handler(fake_connection(keys=[key], signer=wrong_signature)) with pytest.raises(ValueError): h.handle(NIST256_SIGN_MSG) @@ -82,7 +83,7 @@ def cancel_signature(identity, blob): # pylint: disable=unused-argument raise IOError() key = formats.import_public_key(NIST256_KEY) - key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1') + key['identity'] = device.interface.Identity('ssh://localhost', 'nist256p1', KeyFlags.CERTIFY) h = protocol.Handler(fake_connection(keys=[key], signer=cancel_signature)) assert h.handle(NIST256_SIGN_MSG) == protocol.failure() @@ -96,14 +97,14 @@ def cancel_signature(identity, blob): # pylint: disable=unused-argument def ed25519_signer(identity, blob): - assert identity.to_string() == '' + assert identity.to_string() == '<0|ssh://localhost|ed25519>' assert blob == ED25519_BLOB return ED25519_SIG def test_ed25519_sign(): key = formats.import_public_key(ED25519_KEY) - key['identity'] = device.interface.Identity('ssh://localhost', 'ed25519') + key['identity'] = device.interface.Identity('ssh://localhost', 'ed25519', KeyFlags.CERTIFY) h = protocol.Handler(fake_connection(keys=[key], signer=ed25519_signer)) reply = h.handle(ED25519_SIGN_MSG) assert reply == ED25519_SIGN_REPLY diff --git a/libagent/tests/test_interface.py b/libagent/tests/test_interface.py index 5df6ee0d..3d06f2d2 100644 --- a/libagent/tests/test_interface.py +++ b/libagent/tests/test_interface.py @@ -1,7 +1,8 @@ +from ..formats import KeyFlags from ..device import interface def test_unicode(): - i = interface.Identity('ko\u017eu\u0161\u010dek@host', 'ed25519') + i = interface.Identity(u'ko\u017eu\u0161\u010dek@host', 'ed25519', KeyFlags.CERTIFY) assert i.to_bytes() == b'kozuscek@host' - assert sorted(i.items()) == [('host', 'host'), ('user', 'kozuscek')] + assert sorted(i.items()) == [('host', 'host'), ('index', 0), ('user', 'kozuscek')]