-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmsys2-repo-verify
executable file
·260 lines (208 loc) · 8.12 KB
/
msys2-repo-verify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#!/usr/bin/env python3
"""
This script verifies that all files in a repository are properly signed by trusted keys.
It ensures that:
* all required files in the repo have signatures
* signatures are from keys we know about
* signatures are valid
* checksums of archives are correct (since we are already reading the files)
"""
from __future__ import annotations
import os
import sys
import tempfile
import shutil
import subprocess
import argparse
import logging
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from fastprogress.fastprogress import progress_bar
# Key fingerprints that main() passes (together with INSTALLER_KEYS) to
# GpgManager, which fetches them from a keyserver and marks each one as
# ultimately trusted. Presumably the keys of the package signers, going by
# the name -- the script itself treats both lists identically.
PACKAGER_KEYS = [
    "AD35 1C50 AE08 5775 EB59 333B 5F92 EFC1 A47D 45A1",
    "8777 1331 B3F1 FF52 6385 6A6D 974C 8BE4 9078 F532",
    "5F94 4B02 7F7F E209 1985 AA2E FA11 531A A0AA 7F57",
]
# Additional trusted fingerprint(s); presumably used for signing installers
# (per the name). Merged with PACKAGER_KEYS in main().
INSTALLER_KEYS = ["0EBF 782C 5D53 F7E5 FB02 A667 46BD 761F 7A49 B0EC"]
# File names that verify_file() accepts without a ".sig" companion. If such a
# file does have a signature next to it, that signature is still verified.
NOT_SIGNED = {"lastupdate", "lastsync", "README.txt"}
def is_archive(path: str | Path) -> bool:
    """Return True when *path* names a compressed archive.

    A path qualifies if its own suffix is ``.zst``, ``.xz`` or ``.gz``, or,
    failing that, if the suffix of the fully resolved path (symlinks
    followed) is one of those.
    """
    candidate = Path(path)
    known_suffixes = (".zst", ".xz", ".gz")
    if candidate.suffix in known_suffixes:
        return True
    # Only resolve the path when the name as given did not already match.
    return candidate.resolve().suffix in known_suffixes
def test_archive(path: str | Path) -> bool:
    """Check the integrity of a compressed file with its command-line tool.

    The tool is picked from the suffix of *path* or of its resolved path
    (``.zst`` -> ``zstd``, ``.xz`` -> ``xz``, ``.gz`` -> ``gzip``) and is run
    as ``<tool> --quiet --test <resolved path>``.

    Returns:
        True when the tool exits with status 0, False otherwise.

    Raises:
        Exception: if neither suffix is one of the three known extensions.
    """
    given = Path(path)
    target = given.resolve()
    suffixes = (given.suffix, target.suffix)

    # Order matters: if the two suffixes disagree, ".zst" wins over ".xz",
    # which wins over ".gz".
    tools = ((".zst", "zstd"), (".xz", "xz"), (".gz", "gzip"))
    tool = next((name for ext, name in tools if ext in suffixes), None)
    if tool is None:
        raise Exception(f"Unknown file extension: {given.suffix!r}")

    completed = subprocess.run(
        [tool, "--quiet", "--test", str(target)],
        capture_output=True,
        check=False,
        text=True,
    )
    return completed.returncode == 0
class GpgManager:
    """Run ``gpg`` against a private, throw-away GnuPG home directory.

    The directory is created lazily the first time a gpg command is about to
    be run and is removed again by :meth:`cleanup`, which is also called when
    the instance is used as a context manager and the ``with`` block ends.

    Args:
        keys: Key fingerprints that :meth:`setup` fetches and marks as
            ultimately trusted.
    """

    def __init__(self, keys: list[str]) -> None:
        self.keys = keys
        # Path of the temporary GNUPGHOME; None while no directory exists.
        self.gnupghome: str | None = None
        self.logger = logging.getLogger("GpgManager")

    def __enter__(self) -> GpgManager:
        return self

    def __exit__(
        self, exc_type: type | None, exc_val: BaseException | None, exc_tb: object
    ) -> None:
        self.cleanup()

    def cleanup(self) -> None:
        """Remove the temporary GnuPG home, if one was created.

        Calling this more than once is harmless: after the directory has been
        removed the stored path is cleared, so later calls do nothing.
        """
        if self.gnupghome is not None:
            shutil.rmtree(self.gnupghome)
            # Forget the path so a repeated cleanup (e.g. an explicit call
            # followed by __exit__) does not try to delete it again.
            self.gnupghome = None

    def _get_env(self) -> dict[str, str]:
        """Return the environment to use for every gpg invocation.

        It is a copy of the current environment with ``GNUPGHOME`` pointing at
        the private directory (created on first use, mode 0700) and
        ``LC_ALL`` forced to ``C``.
        """
        if self.gnupghome is None:
            self.gnupghome = tempfile.mkdtemp(prefix="gpg_verify_")
            os.chmod(self.gnupghome, 0o700)
        env: dict[str, str] = os.environ.copy()
        env["GNUPGHOME"] = self.gnupghome
        # verify_signature() matches the literal text "[ultimate]" in gpg's
        # output. gpg translates that label according to the locale, so pin
        # the locale to keep the output in untranslated English.
        env["LC_ALL"] = "C"
        return env

    def setup(self) -> None:
        """Fetch all keys from the keyserver and give each ultimate trust.

        Raises:
            subprocess.CalledProcessError: if any gpg command exits non-zero.
        """
        self.logger.info("Importing trusted keys")
        subprocess.run(
            [
                "gpg",
                "--batch",
                "--quiet",
                "--keyserver",
                "keyserver.ubuntu.com",
                "--recv",
            ]
            + self.keys,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._get_env(),
            text=True,
        )
        for key in self.keys:
            self.logger.debug(f"Setting trust for key: {key}")
            subprocess.run(
                [
                    "gpg",
                    "--quiet",
                    "--batch",
                    "--no-tty",
                    "--command-fd",
                    "0",
                    "--expert",
                    "--edit-key",
                    key,
                    "trust",
                ],
                # Answers for gpg's trust prompt, read from stdin because of
                # "--command-fd 0": "5" selects ultimate trust, "y" confirms.
                input="5\ny\n",
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=self._get_env(),
                text=True,
            )

    def verify_signature(self, sig_path: Path, file_path: Path) -> bool:
        """Verify the detached signature *sig_path* against *file_path*.

        Returns:
            True only if gpg exits with status 0 and its stderr output ends
            with ``[ultimate]``, i.e. the last reported trust level is
            ultimate. Otherwise the failure and gpg's stderr are logged and
            False is returned.
        """
        result = subprocess.run(
            ["gpg", "--batch", "--quiet", "--verify", str(sig_path), str(file_path)],
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._get_env(),
            text=True,
        )
        # A zero exit status alone is not enough: the trust label must also
        # be "[ultimate]", which setup() assigns to the configured keys.
        if result.returncode != 0 or not result.stderr.strip().endswith("[ultimate]"):
            self.logger.error(f"GPG verification failed for: {file_path}")
            self.logger.error(f"GPG error: {result.stderr}")
            return False
        return True
class SignatureVerifier:
    """Check every file below a repository directory for a valid signature.

    Args:
        repo_path: Root directory of the repository to verify.
        gpg_manager: Object whose ``verify_signature(sig_path, file_path)``
            decides whether a detached signature is acceptable.
    """

    def __init__(self, repo_path: str | Path, gpg_manager: GpgManager) -> None:
        self.repo_path: Path = Path(repo_path)
        self.gpg_manager: GpgManager = gpg_manager
        self.logger = logging.getLogger("SignatureVerifier")

    def verify_file(self, filepath: Path) -> tuple[bool, str]:
        """Verify a single file of the repository.

        Args:
            filepath: Path of the file; must be located below ``repo_path``.

        Returns:
            ``(ok, message)`` where *message* is one of ``"ok"``,
            ``"skipped"``, ``"signature file without base file"``,
            ``"missing signature file"``, ``"invalid archive"`` or
            ``"invalid signature"``.

        Raises:
            ValueError: if *filepath* is not below ``repo_path``.
        """
        rel_path: Path = filepath.relative_to(self.repo_path)
        self.logger.debug(f"Processing: {rel_path}")

        # A ".sig" file is not verified itself; it only has to belong to an
        # existing file, which is verified when that file is processed.
        if filepath.name.endswith(".sig"):
            base_path: Path = filepath.with_suffix("")
            if not base_path.is_file():
                return False, "signature file without base file"
            return True, "ok"

        sig_path: Path = Path(f"{filepath}.sig")
        if not sig_path.is_file():
            # Only the names listed in NOT_SIGNED may lack a signature.
            if filepath.name in NOT_SIGNED:
                return True, "skipped"
            return False, "missing signature file"

        if is_archive(filepath):
            self.logger.debug(f"Testing archive: {rel_path}")
            if not test_archive(filepath):
                return False, "invalid archive"

        self.logger.debug(f"Verifying signature: {rel_path}")
        if self.gpg_manager.verify_signature(sig_path, filepath):
            return True, "ok"
        return False, "invalid signature"

    def verify_repository(self) -> bool:
        """Verify all files found below ``repo_path`` using a thread pool.

        Returns:
            True if every file passed, False if at least one failed (the
            failures are logged).

        Raises:
            FileNotFoundError: if ``repo_path`` does not exist.
            NotADirectoryError: if ``repo_path`` exists but is not a
                directory.
        """
        failed_files: list[tuple[Path, str]] = []
        if not self.repo_path.exists():
            self.logger.error(f"Repository path does not exist: {self.repo_path}")
            raise FileNotFoundError(f"Repository not found: {self.repo_path}")
        # os.walk() silently yields nothing for a non-directory, which would
        # make the run "succeed" without having checked a single file.
        if not self.repo_path.is_dir():
            self.logger.error(f"Repository path is not a directory: {self.repo_path}")
            raise NotADirectoryError(
                f"Repository is not a directory: {self.repo_path}"
            )

        files: list[Path] = []
        for root, _, filenames in os.walk(self.repo_path):
            files.extend(Path(root) / filename for filename in filenames)
        self.logger.info(f"Found {len(files)} files to verify")

        def verify(filepath: Path) -> tuple[tuple[bool, str], Path]:
            # Pair each result with its path so failures can be reported.
            return (self.verify_file(filepath), filepath)

        with ThreadPoolExecutor(max_workers=os.cpu_count() or 8) as executor:
            for result, filepath in progress_bar(
                executor.map(verify, files), leave=False, total=len(files)
            ):
                status, message = result
                if not status:
                    failed_files.append((filepath, message))

        if failed_files:
            self.logger.error("Failed files:")
            for filepath, message in failed_files:
                self.logger.error(f"{filepath}: {message}")
            return False
        return True
def main() -> int:
    """Parse the command line, verify the repository and report the outcome.

    Returns:
        0 when ``verify_repository()`` reports success, 1 otherwise; the
        value is meant to be used as the process exit status.
    """
    cli = argparse.ArgumentParser(
        description="Verify signatures for all files in a repository"
    )
    cli.add_argument("repo", metavar="REPO", help="Path to the repository to verify")
    cli.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose output"
    )
    options = cli.parse_args()

    if options.verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logging.basicConfig(
        level=level, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s"
    )
    log = logging.getLogger("main")

    trusted_keys = PACKAGER_KEYS + INSTALLER_KEYS
    # Reporting happens inside the "with" block, i.e. before the temporary
    # GnuPG home is removed on exit.
    with GpgManager(trusted_keys) as gpg:
        gpg.setup()
        checker = SignatureVerifier(options.repo, gpg)
        if not checker.verify_repository():
            log.error("Some files have missing or invalid signatures, or are corrupt")
            print(
                "\nVERIFICATION FAILED! Some files have missing or invalid signatures, or are corrupt."
            )
            return 1
        log.info("All files have valid signatures")
        print("\nALL OK! All files have valid signatures.")
        return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())