-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcheck_kernel_commits.py
More file actions
355 lines (317 loc) · 16.1 KB
/
check_kernel_commits.py
File metadata and controls
355 lines (317 loc) · 16.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
#!/usr/bin/env python3
import argparse
import os
import re
import subprocess
import sys
import textwrap
from typing import Optional
from ciq_helpers import (
CIQ_find_fixes_in_mainline,
CIQ_get_commit_body,
CIQ_hash_exists_in_ref,
CIQ_run_git,
)
def ref_exists(repo, ref):
    """Report whether ``ref`` can be resolved inside ``repo``.

    The check runs ``git rev-parse --verify --quiet <ref>`` through
    ``CIQ_run_git``.  A ``RuntimeError`` from that helper is interpreted as
    "the ref is missing" and yields ``False``; any other outcome yields
    ``True``.
    """
    verify_cmd = ["rev-parse", "--verify", "--quiet", ref]
    try:
        CIQ_run_git(repo, verify_cmd)
    except RuntimeError:
        return False
    else:
        return True
def get_pr_commits(repo, pr_branch, base_branch):
    """List the commits that ``pr_branch`` has and ``base_branch`` lacks.

    Runs ``git rev-list <base_branch>..<pr_branch>`` through ``CIQ_run_git``
    and returns one SHA string per output line, in the order git printed
    them.  Output that is empty after stripping produces an empty list.
    """
    commit_range = f"{base_branch}..{pr_branch}"
    rev_list_output = CIQ_run_git(repo, ["rev-list", commit_range])
    return rev_list_output.strip().splitlines()
def get_short_hash_and_subject(repo, sha):
    """Return ``(abbreviated_hash, subject)`` for the commit ``sha``.

    Both fields come from a single ``git log -n 1`` call whose format string
    separates them with a NUL byte (``%x00``); the output is split on the
    first NUL only, so the subject is returned exactly as git printed it.
    """
    log_format = "--format=%h%x00%s"
    raw = CIQ_run_git(repo, ["log", "-n", "1", log_format, sha])
    abbrev, title = raw.strip().split("\x00", 1)
    return abbrev, title
def hash_exists_in_mainline(repo, upstream_ref, hash_):
    """Tell whether ``hash_`` is part of the history of ``upstream_ref``.

    This is a thin wrapper: the answer is whatever
    ``CIQ_hash_exists_in_ref(repo, upstream_ref, hash_)`` returns, passed
    through unchanged.
    """
    found_upstream = CIQ_hash_exists_in_ref(repo, upstream_ref, hash_)
    return found_upstream
def wrap_paragraph(text, width=80, initial_indent="", subsequent_indent=""):
    """Re-flow ``text`` into a single newline-joined paragraph.

    Lines are filled up to ``width`` columns.  The first line starts with
    ``initial_indent`` and every following line with ``subsequent_indent``.
    Words are never split: neither over-long words nor hyphenated words are
    broken, so a single long token may exceed ``width``.
    """
    return textwrap.fill(
        text,
        width=width,
        initial_indent=initial_indent,
        subsequent_indent=subsequent_indent,
        break_long_words=False,
        break_on_hyphens=False,
    )
def extract_cve_from_message(msg):
    """Pull the CVE identifier out of a ``cve CVE-YYYY-NNNN`` tag in ``msg``.

    The keyword must be the bare word ``cve`` (any letter case) located at the
    start of the text or directly after whitespace, followed by whitespace and
    the CVE id.  Keywords such as ``cve-bf`` or ``cve-pre`` do not match
    because the character after ``cve`` has to be whitespace.

    Returns the first matching id converted to upper case, or ``None`` when
    the message carries no such tag.
    """
    tag = re.search(r"(?<!\S)cve\s+(CVE-\d{4}-\d+)", msg, re.IGNORECASE)
    return tag.group(1).upper() if tag else None
def run_cve_search(vulns_repo, kernel_repo, query) -> tuple[bool, Optional[str]]:
    """Invoke ``scripts/cve_search`` from the vulns checkout for ``query``.

    The script is started with a copy of the current environment in which
    ``CVEKERNELTREE`` is set to ``kernel_repo``.  Its exit status is not
    checked here; it is reported to the caller instead.

    Returns:
        ``(success, output)`` where ``success`` is ``True`` when the script
        exited with status 0 and ``output`` is its stripped standard output.

    Raises:
        RuntimeError: if ``<vulns_repo>/scripts/cve_search`` does not exist.
    """
    script = os.path.join(vulns_repo, "scripts", "cve_search")
    if not os.path.exists(script):
        raise RuntimeError(f"cve_search script not found at {script}")

    # Parent environment plus the variable that points the script at the kernel tree.
    child_env = {**os.environ, "CVEKERNELTREE": kernel_repo}
    completed = subprocess.run(
        [script, query],
        env=child_env,
        text=True,
        capture_output=True,
        check=False,
    )
    # The script writes its results to stdout.
    succeeded = completed.returncode == 0
    return succeeded, completed.stdout.strip()
def main():
parser = argparse.ArgumentParser(description="Check upstream references and Fixes: tags in PR branch commits.")
parser.add_argument("--repo", help="Path to the git repo", required=True)
parser.add_argument("--pr_branch", help="Name of the PR branch", required=True)
parser.add_argument("--base_branch", help="Name of the base branch", required=True)
parser.add_argument("--markdown", action="store_true", help="Output in Markdown, suitable for GitHub PR comments")
parser.add_argument(
"--upstream-ref",
default="origin/kernel-mainline",
help="Reference to upstream mainline branch (default: origin/kernel-mainline)",
)
parser.add_argument(
"--check-cves",
action="store_true",
help="Check that CVE references in commit messages match upstream commit hashes",
)
parser.add_argument(
"--vulns-dir", default="../vulns", help="Path to the kernel vulnerabilities repo (default: ../vulns)"
)
args = parser.parse_args()
upstream_ref = args.upstream_ref
# Set up vulns repo path if CVE checking is enabled
vulns_repo = None
if args.check_cves:
vulns_repo = args.vulns_dir
vulns_repo_url = "https://git.kernel.org/pub/scm/linux/security/vulns.git"
if os.path.exists(vulns_repo):
# Repository exists, update it with git pull
try:
CIQ_run_git(vulns_repo, ["pull"])
except RuntimeError as e:
print(f"WARNING: Failed to update vulns repo: {e}")
print("Continuing with existing repository...")
else:
# Repository doesn't exist, clone it
try:
result = subprocess.run(
["git", "clone", vulns_repo_url, vulns_repo], text=True, capture_output=True, check=False
)
if result.returncode != 0:
print(f"ERROR: Failed to clone vulns repo: {result.stderr}")
sys.exit(1)
except Exception as e:
print(f"ERROR: Failed to clone vulns repo: {e}")
sys.exit(1)
# Validate that all required refs exist before continuing
missing_refs = []
for refname, refval in [
("upstream reference", upstream_ref),
("PR branch", args.pr_branch),
("base branch", args.base_branch),
]:
if not ref_exists(args.repo, refval):
missing_refs.append((refname, refval))
if missing_refs:
for refname, refval in missing_refs:
print(f"ERROR: The {refname} '{refval}' does not exist in the given repo.")
print("Please fetch or create the required references before running this script.")
sys.exit(1)
pr_commits = get_pr_commits(args.repo, args.pr_branch, args.base_branch)
if not pr_commits:
if args.markdown:
print("> ℹ️ **No commits found in PR branch that are not in base branch.**")
else:
print("No commits found in PR branch that are not in base branch.")
sys.exit(0)
any_findings = False
out_lines = []
for sha in reversed(pr_commits): # oldest first
short_hash, subject = get_short_hash_and_subject(args.repo, sha)
pr_commit_desc = f"{short_hash} ({subject})"
msg = CIQ_get_commit_body(args.repo, sha)
upstream_hashes = re.findall(r"^commit\s+([0-9a-fA-F]{40})", msg, re.MULTILINE)
for uhash in upstream_hashes:
short_uhash = uhash[:12]
# Ensure the referenced commit in the PR actually exists in the upstream ref.
exists = hash_exists_in_mainline(args.repo, upstream_ref, uhash)
if not exists:
any_findings = True
if args.markdown:
out_lines.append(
f"- ❗ PR commit `{pr_commit_desc}` references upstream commit \n"
f" `{short_uhash}` which does **not** exist in the upstream Linux kernel.\n"
)
else:
prefix = "[NOTFOUND] "
header = (
f"{prefix}PR commit {pr_commit_desc} references upstream commit "
f"{short_uhash}, which does not exist in kernel-mainline."
)
out_lines.append(
wrap_paragraph(
header, width=80, initial_indent="", subsequent_indent=" " * len(prefix)
) # spaces for '[NOTFOUND] '
)
out_lines.append("") # blank line
continue
fixes = CIQ_find_fixes_in_mainline(args.repo, args.pr_branch, upstream_ref, uhash)
if fixes:
any_findings = True
# Check CVEs for bugfix commits if enabled
fix_cves = {}
if args.check_cves:
for fix_hash, fix_display in fixes:
try:
success, cve_output = run_cve_search(vulns_repo, args.repo, fix_hash)
if success:
# Parse the CVE from the result
match = re.search(r"(CVE-\d{4}-\d+)\s+is assigned to git id", cve_output)
if match:
bugfix_cve = match.group(1)
fix_cves[fix_hash] = bugfix_cve
except (RuntimeError, subprocess.SubprocessError) as e:
# Log a warning instead of silently ignoring errors when checking bugfix CVEs
print(f"Warning: Failed to check CVE for bugfix commit {fix_hash}: {e}", file=sys.stderr)
# Build the fixes display text with CVE info
fixes_lines = []
for fix_hash, display_str in fixes:
if fix_hash in fix_cves:
fixes_lines.append(f"{display_str} ({fix_cves[fix_hash]})")
else:
fixes_lines.append(display_str)
fixes_text = "\n".join(fixes_lines)
if args.markdown:
fixes_block = " " + fixes_text.replace("\n", "\n ")
out_lines.append(
f"- ⚠️ PR commit `{pr_commit_desc}` references upstream commit \n"
f" `{short_uhash}` which has been referenced by a `Fixes:` tag in the upstream \n"
f" Linux kernel:\n\n"
f"```text\n{fixes_block}\n```\n"
)
else:
prefix = "[FIXES] "
header = (
f"{prefix}PR commit {pr_commit_desc} references upstream commit "
f"{short_uhash}, which has Fixes tags:"
)
out_lines.append(
wrap_paragraph(
header, width=80, initial_indent="", subsequent_indent=" " * len(prefix)
) # spaces for '[FIXES] '
)
out_lines.append("") # blank line after 'Fixes tags:'
for line in fixes_text.splitlines():
out_lines.append(" " + line)
out_lines.append("") # blank line
# Check CVE if enabled
if args.check_cves:
cve_id = extract_cve_from_message(msg)
# Check if the upstream commit has a CVE associated with it
try:
success, cve_output = run_cve_search(vulns_repo, args.repo, uhash)
if success:
# Parse the output to get the CVE from the result
# Expected format: "CVE-2024-35962 is assigned to git id
# 65acf6e0501ac8880a4f73980d01b5d27648b956"
match = re.search(r"(CVE-\d{4}-\d+)\s+is assigned to git id", cve_output)
if match:
found_cve = match.group(1)
if cve_id:
# PR commit has a CVE reference - check if it matches
if found_cve != cve_id:
any_findings = True
if args.markdown:
out_lines.append(
f"- ❌ PR commit `{pr_commit_desc}` references `{cve_id}` but \n"
f" upstream commit `{short_uhash}` is associated with `{found_cve}`\n"
)
else:
prefix = "[CVE-MISMATCH] "
header = (
f"{prefix}PR commit {pr_commit_desc} references {cve_id} but "
f"upstream commit {short_uhash} is associated with {found_cve}"
)
out_lines.append(
wrap_paragraph(
header, width=80, initial_indent="", subsequent_indent=" " * len(prefix)
)
)
out_lines.append("") # blank line
else:
# PR commit doesn't reference a CVE, but upstream has one
any_findings = True
if args.markdown:
out_lines.append(
f"- ⚠️ PR commit `{pr_commit_desc}` does not reference a CVE but \n"
f" upstream commit `{short_uhash}` is associated with `{found_cve}`\n"
)
else:
prefix = "[CVE-MISSING] "
header = (
f"{prefix}PR commit {pr_commit_desc} does not reference a CVE but "
f"upstream commit {short_uhash} is associated with {found_cve}"
)
out_lines.append(
wrap_paragraph(
header, width=80, initial_indent="", subsequent_indent=" " * len(prefix)
)
)
out_lines.append("") # blank line
else:
# The upstream commit has no CVE assigned
if cve_id:
# PR commit claims a CVE but upstream has none
any_findings = True
if args.markdown:
out_lines.append(
f"- ❌ PR commit `{pr_commit_desc}` references `{cve_id}` but \n"
f" upstream commit `{short_uhash}` has no CVE assigned\n"
)
else:
prefix = "[CVE-NOTFOUND] "
header = (
f"{prefix}PR commit {pr_commit_desc} references {cve_id} but "
f"upstream commit {short_uhash} has no CVE assigned"
)
out_lines.append(
wrap_paragraph(
header, width=80, initial_indent="", subsequent_indent=" " * len(prefix)
)
)
out_lines.append("") # blank line
except (subprocess.SubprocessError, OSError) as e:
# Error running cve_search
if cve_id:
any_findings = True
if args.markdown:
out_lines.append(
f"- ⚠️ PR commit `{pr_commit_desc}` references `{cve_id}` but \n"
f" failed to verify: {e}\n"
)
else:
prefix = "[CVE-ERROR] "
header = f"{prefix}PR commit {pr_commit_desc} references {cve_id} but failed to verify: {e}"
out_lines.append(
wrap_paragraph(header, width=80, initial_indent="", subsequent_indent=" " * len(prefix))
)
out_lines.append("") # blank line
if any_findings:
if args.markdown:
print("## :mag: Upstream Linux Kernel Commit Check\n")
print("\n".join(out_lines))
print("*This is an automated message from the kernel commit checker workflow.*")
else:
print("\n".join(out_lines))
else:
if args.markdown:
print("> ✅ **All referenced commits exist upstream and have no Fixes: tags.**")
else:
print("All referenced commits exist upstream and have no Fixes: tags.")
# Run the checker only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()