#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import logging
from aboutcode.pipeline import LoopProgress
from django.db.models import Q
from vulnerabilities.models import Advisory
from vulnerabilities.models import Vulnerability
from vulnerabilities.pipelines import VulnerableCodePipeline
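class FillVulnerabilitySummariesPipeline(VulnerableCodePipeline):
    """Pipeline to fill missing vulnerability summaries from advisories.

    Only advisories created by the ``nvd_importer`` are used as a source of
    summaries, and a vulnerability is matched to an advisory through its
    ``CVE-`` alias. When several advisories match, the most recently collected
    one is used.
    """

    pipeline_id = "fill_vulnerability_summaries"

    @classmethod
    def steps(cls):
        """Return the ordered pipeline steps; this pipeline has a single step."""
        return (cls.fill_missing_summaries,)

    def fill_missing_summaries(self):
        """Find vulnerabilities without summaries and fill them using advisories with the same aliases.

        Vulnerabilities whose ``summary`` is the empty string are visited one by
        one. For each, the first alias starting with ``CVE-`` is looked up among
        the ``nvd_importer`` advisories that carry a non-empty summary; when a
        match exists, the most recently collected advisory's summary is copied
        onto the vulnerability and the vulnerability is saved. Vulnerabilities
        with no ``CVE-`` alias, or with no matching advisory, are logged and
        left unchanged.
        """
        vulnerabilities_qs = Vulnerability.objects.filter(summary="")
        # Evaluate the count once: it is both logged and used to size the
        # progress tracker, and each call is a separate COUNT query.
        vulnerabilities_count = vulnerabilities_qs.count()
        self.log(
            f"Processing {vulnerabilities_count} vulnerabilities without summaries",
            level=logging.INFO,
        )

        # Restrict the source of summaries to advisories produced by the NVD
        # importer, and to those that actually carry a non-empty summary.
        nvd_importer_advisories = Advisory.objects.filter(
            created_by="nvd_importer", summary__isnull=False
        ).exclude(summary="")
        self.log(
            f"Found {nvd_importer_advisories.count()} advisories with summaries from NVD importer",
            level=logging.INFO,
        )

        progress = LoopProgress(total_iterations=vulnerabilities_count, logger=self.log)
        for vulnerability in progress.iter(vulnerabilities_qs.paginated()):
            aliases = vulnerability.aliases.values_list("alias", flat=True)
            # Only a CVE- alias is used as the join key to NVD advisories.
            alias = aliases.filter(alias__startswith="CVE-").first()
            if not alias:
                # The vulnerability may still carry non-CVE aliases; name what
                # is actually missing so the log line is actionable.
                self.log(
                    f"Vulnerability {vulnerability.vulnerability_id} has no CVE- alias",
                    level=logging.INFO,
                )
                continue

            best_advisory = self._find_best_advisory(nvd_importer_advisories, alias)
            if best_advisory is None:
                self.log(f"No advisory found for alias {alias}", level=logging.INFO)
                continue

            # nvd_importer_advisories excludes empty summaries, so this never
            # writes an empty string back onto the vulnerability.
            vulnerability.summary = best_advisory.summary
            vulnerability.save()
            self.log(
                f"Updated summary for vulnerability {vulnerability.vulnerability_id}",
                level=logging.INFO,
            )

    @staticmethod
    def _find_best_advisory(advisories, alias):
        """Return the most recently collected advisory whose ``aliases`` contains ``alias``.

        Returns ``None`` when no advisory in ``advisories`` matches. A single
        query is issued: ``first()`` on the ordered queryset replaces the
        separate ``exists()`` probe followed by a second fetch.

        NOTE(review): the match relies on the ``__contains`` lookup on
        ``Advisory.aliases``; whether that is list containment or substring
        matching depends on the field type, which is defined outside this
        file — confirm that a ``CVE-`` alias cannot match a longer alias that
        merely shares its prefix.
        """
        return advisories.filter(Q(aliases__contains=alias)).order_by("-date_collected").first()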