#!/usr/bin/python3

'''
    Implements signing with RoboSignatory via fedora-messaging. To run this,
    one needs credentials to the restricted Fedora broker. In a developer
    workflow, one can also run it (and RoboSignatory) against a local rabbitmq
    instance. For more details, see:

    https://fedora-messaging.readthedocs.io/en/latest/quick-start.html
'''
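
# Example invocation (illustrative only; assumes this script is saved as
# `sign.py`, and the bucket and config path are hypothetical):
#
#   ./sign.py --build latest robosignatory --s3 my-bucket/prefix \
#       --images --fedmsg-conf my-fedora-messaging.toml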

import argparse
import gi
import os
import shutil
import subprocess
import sys
import tarfile
import tempfile
import threading
import uuid

from multiprocessing import Process

import boto3

from fedora_messaging import message
from fedora_messaging.api import publish, twisted_consume
from fedora_messaging.config import conf

from twisted.internet import reactor

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cosalib.meta import GenericBuildMeta as Meta
from cosalib.builds import Builds
from cosalib.cmdlib import (
    get_basearch,
    sha256sum_file,
    import_ostree_commit)

gi.require_version('OSTree', '1.0')
from gi.repository import Gio, OSTree

# these files are part of fedora-messaging
FEDORA_MESSAGING_PUBLIC_CONF = {
    'prod': '/etc/fedora-messaging/fedora.toml',
    'stg': '/etc/fedora-messaging/fedora.stg.toml',
}
FEDORA_MESSAGING_TOPIC_PREFIX = {
    'prod': 'org.fedoraproject.prod.coreos.build.request',
    'stg': 'org.fedoraproject.stg.coreos.build.request',
}
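# e.g. the 'ostree-sign' request in prod is published on
# 'org.fedoraproject.prod.coreos.build.request.ostree-sign', and the response
# arrives on the same topic with a '.finished' suffix (see
# get_request_finished_topic below)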
ROBOSIGNATORY_REQUEST_TIMEOUT_SEC = 60 * 10

fedenv = 'prod'


def main():
    args = parse_args()
    if args.stg:
        global fedenv
        fedenv = 'stg'
    args.func(args)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--build", help="Build ID", default='latest')
    subparsers = parser.add_subparsers(dest='cmd', title='subcommands')
    subparsers.required = True

    robosig = subparsers.add_parser('robosignatory', help='sign with '
                                    'RoboSignatory via fedora-messaging')
    robosig.add_argument("--s3", metavar='<BUCKET>[/PREFIX]', required=True,
                         help="bucket and prefix to S3 builds/ dir")
    group = robosig.add_mutually_exclusive_group(required=True)
    group.add_argument("--ostree", help="sign commit", action='store_true')
    group.add_argument("--images", help="sign images", action='store_true')
    robosig.add_argument("--extra-fedmsg-keys", action='append',
                         metavar='KEY=VAL', default=[],
                         help="extra keys to inject into messages")
    robosig.add_argument("--fedmsg-conf", metavar='CONFIG.TOML',
                         help="fedora-messaging config file for publishing")
    robosig.add_argument("--stg", action='store_true',
                         help="target the stg infra rather than prod")
    robosig.add_argument("--gpgkeypath", help="path to directory containing "
                         "public keys to use for signature verification",
                         default="/etc/pki/rpm-gpg")
    robosig.set_defaults(func=cmd_robosignatory)

    return parser.parse_args()


def cmd_robosignatory(args):
    builds = Builds()
    if args.build == 'latest':
        args.build = builds.get_latest()

    s3 = boto3.client('s3')
    args.bucket, args.prefix = get_bucket_and_prefix(args.s3)

    args.extra_keys = {}
    for keyval in args.extra_fedmsg_keys:
        key, val = keyval.split('=', 1)  # will throw exception if there's no =
        args.extra_keys[key] = val

    request = 'ostree-sign' if args.ostree else 'artifacts-sign'

    global request_state
    request_state = {"status": "pending"}
    cond = threading.Condition()
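    # start listening for the response *before* publishing the request so a
    # fast reply from RoboSignatory can't slip past us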
    start_consumer_thread(cond, request, {
        'build_id': args.build,
        'basearch': get_basearch(),
        **args.extra_keys
    })

    # these two are different enough that they deserve separate handlers
    if args.ostree:
        robosign_ostree(args, s3, cond)
    else:
        assert args.images
        robosign_images(args, s3, cond)


def robosign_ostree(args, s3, cond):
    build = Meta(build=args.build)
    builds = Builds()
    builddir = builds.get_build_dir(args.build)
    checksum = build['ostree-commit']

    # Copy commit object to a temporary location. A preferred approach here is
    # to require the pipeline to do a preliminary buildupload and then just
    # point at the final object location instead. Though we'd want
    # https://github.com/coreos/coreos-assembler/issues/668 before doing this
    # so we at least GC on failure. For now, just use a stable path so we
    # clobber previous runs.
    build_dir_commit_obj = os.path.join(builddir, 'ostree-commit-object')
    commit_key = f'{args.prefix}/tmp/ostree-commit-object'
    commitmeta_key = f'{args.prefix}/tmp/ostree-commitmeta-object'
    print(f"Uploading s3://{args.bucket}/{commit_key}")
    s3.upload_file(build_dir_commit_obj, args.bucket, commit_key)
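    # make sure no commitmeta object from a previous (possibly failed) run is
    # lying around at the stable path; RoboSignatory writes the detached
    # signature back to that key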
    s3.delete_object(Bucket=args.bucket, Key=commitmeta_key)

    send_message(args, 'ostree-sign', {
        'commit_object': f's3://{args.bucket}/{commit_key}',
        'checksum': f'sha256:{checksum}'
    })

    validate_response(cond)

    # download back sig and verify it in a throwaway repo
    print("Verifying OSTree signature")
    with tempfile.TemporaryDirectory(prefix="cosa-sign", dir="tmp") as d:
        repo = OSTree.Repo.new(Gio.File.new_for_path(d))
        repo.create(OSTree.RepoMode.ARCHIVE)
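        # OSTree content-addresses a commit at objects/<first two hex
        # chars>/<remaining chars>.commit; the detached signature lives
        # alongside it in a ".commitmeta" object, which is what we download
        # back from S3 below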
        commit_obj_relpath = f'objects/{checksum[:2]}/{checksum[2:]}.commit'
        commit_obj_path = os.path.join(d, commit_obj_relpath)
        commitmeta_obj_relpath = f'{commit_obj_relpath}meta'
        commitmeta_obj_path = os.path.join(d, commitmeta_obj_relpath)

        os.makedirs(os.path.dirname(commit_obj_path), exist_ok=True)
        shutil.copyfile(build_dir_commit_obj, commit_obj_path)
        s3.download_file(args.bucket, commitmeta_key, commitmeta_obj_path)

        # this is a bit awkward, though the remote API is the only way to
        # point libostree at armored GPG keys
        config = repo.copy_config()
        config.set_string('remote "tmpremote"', 'url', 'https://example.com')
        config.set_string('remote "tmpremote"', 'gpgkeypath', args.gpgkeypath)
        config.set_boolean('remote "tmpremote"', 'gpg-verify', True)
        repo.write_config(config)
        # XXX: work around ostree_repo_write_config not reloading remotes too
        repo.reload_config()

        result = repo.verify_commit_for_remote(checksum, 'tmpremote')
        assert result.count_all() == 1
        t = result.get(0, [OSTree.GpgSignatureAttr.FINGERPRINT,
                           OSTree.GpgSignatureAttr.USER_NAME,
                           OSTree.GpgSignatureAttr.USER_EMAIL,
                           OSTree.GpgSignatureAttr.VALID])
        fp = t.get_child_value(0).get_string()
        name = t.get_child_value(1).get_string()
        email = t.get_child_value(2).get_string()
        valid = t.get_child_value(3).get_boolean()
        msg = (("Valid " if valid else "Invalid ")
               + f"signature from {name} <{email}> ({fp})")
        # allow unknown signatures in stg
        if not valid and fedenv != 'stg':
            raise Exception(msg)
        print(msg)

        # ok, it's valid -- add it to the tarfile
        ostree_image = build['images']['ostree']
        commit_tarfile = os.path.join(builddir, ostree_image['path'])
        with tarfile.open(commit_tarfile, 'a:') as t:
            t.add(commitmeta_obj_path, arcname=commitmeta_obj_relpath)
        ostree_image['size'] = os.path.getsize(commit_tarfile)
        ostree_image['sha256'] = sha256sum_file(commit_tarfile)
        build.write()

        # and finally add it to the tmprepo too so that buildextend-(qemu|metal)
        # will pull it: we could just nuke the repo to force a re-untar, but it
        # might nuke a more recent commit if we're not operating on the latest
        import_ostree_commit('tmp/repo', checksum, commit_tarfile, force=True)


def robosign_images(args, s3, cond):
    build = Meta(build=args.build)
    builds = Builds()
    builddir = builds.get_build_dir(args.build)

    # NB: we just handle the current basearch for now
    full_prefix = f'{args.prefix}/{args.build}/{get_basearch()}'

    # collect all the image paths to sign
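    # each entry pairs the image's S3 URL with its expected checksum, e.g.
    # (values illustrative):
    #   {'file': 's3://bucket/prefix/build-id/x86_64/disk.qcow2',
    #    'checksum': 'sha256:abc123...'}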
    artifacts = [{
        'file': f's3://{args.bucket}/{full_prefix}/{img["path"]}',
        'checksum': f'sha256:{img["sha256"]}'
    } for img in build['images'].values()]

    send_message(args, 'artifacts-sign', {'artifacts': artifacts})
    validate_response(cond)

    # download sigs and verify (use /tmp to avoid gpg hitting ENAMETOOLONG)
    with tempfile.TemporaryDirectory(prefix="cosa-sign-") as d:
        def gpg(*args):
            subprocess.check_call(['gpg', '--homedir', d, *args])

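        # seed the throwaway keyring (homedir'd in the tempdir) with only the
        # public keys under --gpgkeypath, so those are the sole trust roots
        # for verification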
        with os.scandir(args.gpgkeypath) as it:
            keys = [entry.path for entry in it if entry.is_file()]
            gpg('--quiet', '--import', *keys)

        for img in build['images'].values():
            sig_s3_key = f'{full_prefix}/{img["path"]}.sig'

            tmp_sig_path = f'tmp/{img["path"]}.sig'
            s3.download_file(args.bucket, sig_s3_key, tmp_sig_path)

            local_artifact = f'{builddir}/{img["path"]}'

            print(f"Verifying signature for {local_artifact}")
            try:
                gpg('--verify', tmp_sig_path, local_artifact)
            except subprocess.CalledProcessError as e:
                # allow unknown signatures in stg
                if fedenv != 'stg':
                    raise e

            # move into final location
            os.rename(tmp_sig_path, f'{local_artifact}.sig')

            # and make S3 object public (XXX: fix robosignatory for this?)
            s3.put_object_acl(Bucket=args.bucket, Key=sig_s3_key,
                              ACL='public-read')


def get_bucket_and_prefix(path):
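    # e.g. 'mybucket/some/prefix' -> ('mybucket', 'some/prefix'), and a bare
    # 'mybucket' -> ('mybucket', '')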
    split = path.split("/", 1)
    if len(split) == 1:
        return (split[0], "")
    return split


def get_request_topic(request):
    return f'{FEDORA_MESSAGING_TOPIC_PREFIX[fedenv]}.{request}'


def get_request_finished_topic(request):
    return get_request_topic(request) + '.finished'


def send_message(args, request, body):
    print(f"Sending {request} request for build {args.build}")
    # This is a bit hacky; we fork to publish the message here so that we can
    # load the publishing fedora-messaging config. The TL;DR is: we need auth
    # to publish, but we need to use the public endpoint for consuming so we
    # can create temporary queues.
    p = Process(target=send_message_impl, args=(args, request, body))
    p.start()
    p.join()


def send_message_impl(args, request, body):
    if args.fedmsg_conf:
        conf.load_config(args.fedmsg_conf)
    publish(message.Message(topic=get_request_topic(request), body={
        'build_id': args.build,
        'basearch': get_basearch(),
        **args.extra_keys,
        **body
    }))


def validate_response(cond):
    with cond:
        print("Waiting for response from RoboSignatory")
        cond.wait_for(lambda: request_state['status'] != 'pending',
                      timeout=ROBOSIGNATORY_REQUEST_TIMEOUT_SEC)
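        # wait_for() also returns on timeout, so a still-pending status here
        # means we never heard back from RoboSignatory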
        if request_state['status'] == 'pending':
            raise Exception("Timed out waiting for RoboSignatory")
        if request_state['status'].lower() == 'failure':
            # https://pagure.io/robosignatory/pull-request/38
            if 'failure-message' not in request_state:
                raise Exception("Signing failed")
            raise Exception(f"Signing failed: {request_state['failure-message']}")
        assert request_state['status'].lower() == 'success', str(request_state)


def start_consumer_thread(cond, request, filters):
    registered = threading.Event()
    t = threading.Thread(target=watch_finished_messages,
                         args=(cond, registered, request, filters),
                         daemon=True)
    t.start()
    registered.wait()
    print("Successfully started consumer thread")


def watch_finished_messages(cond, registered, request, filters):
    def callback(message):
        # XXX: For now, we filter like this. In the future, we'll generate a
        # request_id and just look for that:
        # https://pagure.io/robosignatory/pull-request/37
        for (k, v) in filters.items():
            if k not in message.body or message.body[k] != v:
                return
        with cond:
            global request_state
            request_state = message.body
            cond.notify()

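    # a fresh, uniquely-named queue so that concurrent invocations don't
    # consume each other's responses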
    queue = str(uuid.uuid4())

    def registered_cb(consumers):
        for consumer in consumers:
            if consumer.queue == queue:
                registered.set()
                break

    def error_cb(failure):
        print(f"Consumer hit failure {failure}")
        reactor.stop()  # pylint: disable=E1101

    # use the public config for this; see related comment in send_message
    conf.load_config(FEDORA_MESSAGING_PUBLIC_CONF[fedenv])

    bindings = [{
        'exchange': 'amq.topic',
        'queue': queue,
        'routing_keys': [get_request_finished_topic(request)]
    }]
    queues = {
        queue: {
            "durable": False,
            "auto_delete": True,
            "exclusive": True,
            "arguments": {}
        }
    }

    consumers = twisted_consume(callback, bindings=bindings, queues=queues)
    consumers.addCallback(registered_cb)
    consumers.addErrback(error_cb)
    reactor.run(installSignalHandlers=False)  # pylint: disable=E1101


if __name__ == '__main__':
    sys.exit(main())