-
Notifications
You must be signed in to change notification settings - Fork 228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fillup missing vulnerabilities summary #1767
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,74 @@ | ||||||||||
# | ||||||||||
# Copyright (c) nexB Inc. and others. All rights reserved. | ||||||||||
# VulnerableCode is a trademark of nexB Inc. | ||||||||||
# SPDX-License-Identifier: Apache-2.0 | ||||||||||
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. | ||||||||||
# See https://github.com/aboutcode-org/vulnerablecode for support or download. | ||||||||||
# See https://aboutcode.org for more information about nexB OSS projects. | ||||||||||
# | ||||||||||
|
||||||||||
import logging | ||||||||||
|
||||||||||
from aboutcode.pipeline import LoopProgress | ||||||||||
from django.db.models import Q | ||||||||||
|
||||||||||
from vulnerabilities.models import Advisory | ||||||||||
from vulnerabilities.models import Vulnerability | ||||||||||
from vulnerabilities.pipelines import VulnerableCodePipeline | ||||||||||
|
||||||||||
|
||||||||||
class FillVulnerabilitySummariesPipeline(VulnerableCodePipeline): | ||||||||||
"""Pipeline to fill missing vulnerability summaries from advisories.""" | ||||||||||
|
||||||||||
pipeline_id = "fill_vulnerability_summaries" | ||||||||||
|
||||||||||
@classmethod | ||||||||||
def steps(cls): | ||||||||||
return (cls.fill_missing_summaries,) | ||||||||||
|
||||||||||
def fill_missing_summaries(self): | ||||||||||
"""Find vulnerabilities without summaries and fill them using advisories with the same aliases.""" | ||||||||||
vulnerabilities_qs = Vulnerability.objects.filter(summary="") | ||||||||||
self.log( | ||||||||||
f"Processing {vulnerabilities_qs.count()} vulnerabilities without summaries", | ||||||||||
level=logging.INFO, | ||||||||||
) | ||||||||||
nvd_importer_advisories = Advisory.objects.filter( | ||||||||||
created_by="nvd_importer", summary__isnull=False | ||||||||||
).exclude(summary="") | ||||||||||
Comment on lines
+36
to
+38
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Null check on summary field is not required since default is
Suggested change
|
||||||||||
self.log( | ||||||||||
f"Found {nvd_importer_advisories.count()} advisories with summaries from NVD importer", | ||||||||||
level=logging.INFO, | ||||||||||
) | ||||||||||
|
||||||||||
progress = LoopProgress(total_iterations=vulnerabilities_qs.count(), logger=self.log) | ||||||||||
|
||||||||||
for vulnerability in progress.iter(vulnerabilities_qs.paginated()): | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should stop using
Suggested change
|
||||||||||
aliases = vulnerability.aliases.values_list("alias", flat=True) | ||||||||||
# get alias that start with CVE- with filter | ||||||||||
alias = aliases.filter(alias__startswith="CVE-").first() | ||||||||||
TG1999 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
# check if the vulnerability has an alias | ||||||||||
if not alias: | ||||||||||
self.log( | ||||||||||
f"Vulnerability {vulnerability.vulnerability_id} has no alias", | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
level=logging.INFO, | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
) | ||||||||||
continue | ||||||||||
|
||||||||||
# check if the vulnerability has an alias that matches an advisory | ||||||||||
matching_advisories = nvd_importer_advisories.filter(Q(aliases__contains=alias)) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't need
Suggested change
|
||||||||||
|
||||||||||
if matching_advisories.exists(): | ||||||||||
# Take the first matching advisory with a summary | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
# get the advisory that was collected the most recently | ||||||||||
best_advisory = matching_advisories.order_by("-date_collected").first() | ||||||||||
# Note: we filtered above to only get non-empty summaries | ||||||||||
vulnerability.summary = best_advisory.summary | ||||||||||
TG1999 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
vulnerability.save() | ||||||||||
self.log( | ||||||||||
f"Updated summary for vulnerability {vulnerability.vulnerability_id}", | ||||||||||
level=logging.INFO, | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is debug log
Suggested change
|
||||||||||
) | ||||||||||
else: | ||||||||||
self.log(f"No advisory found for alias {alias}", level=logging.INFO) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Debug logs should be marked as such, and at the end we should log the count of vulnerabilities successfully populated with summary.
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
# | ||
# Copyright (c) nexB Inc. and others. All rights reserved. | ||
# VulnerableCode is a trademark of nexB Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. | ||
# See https://github.com/aboutcode-org/vulnerablecode for support or download. | ||
# See https://aboutcode.org for more information about nexB OSS projects. | ||
# | ||
|
||
import datetime | ||
from pathlib import Path | ||
|
||
import pytz | ||
from django.test import TestCase | ||
|
||
from vulnerabilities.models import Advisory | ||
from vulnerabilities.models import Alias | ||
from vulnerabilities.models import Vulnerability | ||
from vulnerabilities.pipelines.fill_vulnerability_summary_pipeline import ( | ||
FillVulnerabilitySummariesPipeline, | ||
) | ||
|
||
|
||
class FillVulnerabilitySummariesPipelineTest(TestCase): | ||
def setUp(self): | ||
self.data = Path(__file__).parent.parent / "test_data" | ||
|
||
def test_fill_missing_summaries_from_nvd(self): | ||
""" | ||
Test that vulnerabilities without summaries get them from NVD advisories. | ||
""" | ||
|
||
# Create a vulnerability without a summary | ||
vulnerability = Vulnerability.objects.create( | ||
vulnerability_id="VCID-1234", | ||
summary="", | ||
) | ||
alias = Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability) | ||
|
||
# Create an NVD advisory with a summary | ||
Advisory.objects.create( | ||
summary="Test vulnerability summary", | ||
created_by="nvd_importer", | ||
date_collected=datetime.datetime(2024, 1, 1, tzinfo=pytz.UTC), | ||
aliases=["CVE-2024-1234"], | ||
) | ||
|
||
# Run the pipeline | ||
pipeline = FillVulnerabilitySummariesPipeline() | ||
pipeline.fill_missing_summaries() | ||
|
||
# Check that the vulnerability now has a summary | ||
vulnerability.refresh_from_db() | ||
self.assertEqual(vulnerability.summary, "Test vulnerability summary") | ||
|
||
def test_no_matching_advisory(self): | ||
""" | ||
Test handling of vulnerabilities that have no matching NVD advisory. | ||
""" | ||
# Create a vulnerability without a summary | ||
vulnerability = Vulnerability.objects.create( | ||
vulnerability_id="VCID-1234", | ||
summary="", | ||
) | ||
Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability) | ||
|
||
# Run the pipeline | ||
pipeline = FillVulnerabilitySummariesPipeline() | ||
pipeline.fill_missing_summaries() | ||
|
||
# Check that the vulnerability still has no summary | ||
vulnerability.refresh_from_db() | ||
self.assertEqual(vulnerability.summary, "") | ||
|
||
def test_vulnerability_without_alias(self): | ||
""" | ||
Test handling of vulnerabilities that have no aliases. | ||
""" | ||
|
||
# Create a vulnerability without a summary or alias | ||
vulnerability = Vulnerability.objects.create( | ||
vulnerability_id="VCID-1234", | ||
summary="", | ||
) | ||
|
||
# Run the pipeline | ||
pipeline = FillVulnerabilitySummariesPipeline() | ||
pipeline.fill_missing_summaries() | ||
|
||
# Check that the vulnerability still has no summary | ||
vulnerability.refresh_from_db() | ||
self.assertEqual(vulnerability.summary, "") | ||
|
||
def test_non_nvd_advisory_ignored(self): | ||
""" | ||
Test that advisories from sources other than NVD are ignored. | ||
""" | ||
|
||
# Create a vulnerability without a summary | ||
vulnerability = Vulnerability.objects.create( | ||
vulnerability_id="VCID-1234", | ||
summary="", | ||
) | ||
alias = Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability) | ||
|
||
# Create a non-NVD advisory with a summary | ||
Advisory.objects.create( | ||
summary="Test vulnerability summary", | ||
created_by="other_importer", | ||
date_collected=datetime.datetime(2024, 1, 1, tzinfo=pytz.UTC), | ||
aliases=["CVE-2024-1234"], | ||
) | ||
|
||
# Run the pipeline | ||
pipeline = FillVulnerabilitySummariesPipeline() | ||
pipeline.fill_missing_summaries() | ||
|
||
# Check that the vulnerability still has no summary | ||
vulnerability.refresh_from_db() | ||
self.assertEqual(vulnerability.summary, "") | ||
|
||
def test_multiple_matching_advisories(self): | ||
""" | ||
Test that the most recent matching advisory is used when there are multiple. | ||
""" | ||
vulnerability = Vulnerability.objects.create( | ||
vulnerability_id="VCID-1234", | ||
summary="", | ||
) | ||
alias = Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability) | ||
|
||
# Create two NVD advisories with the same alias | ||
Advisory.objects.create( | ||
summary="First matching advisory", | ||
created_by="nvd_importer", | ||
date_collected=datetime.datetime(2024, 1, 1, tzinfo=pytz.UTC), | ||
aliases=["CVE-2024-1234"], | ||
) | ||
|
||
Advisory.objects.create( | ||
summary="Second matching advisory", | ||
created_by="nvd_importer", | ||
date_collected=datetime.datetime(2024, 1, 2, tzinfo=pytz.UTC), | ||
aliases=["CVE-2024-1234"], | ||
) | ||
|
||
# Run the pipeline | ||
pipeline = FillVulnerabilitySummariesPipeline() | ||
pipeline.fill_missing_summaries() | ||
|
||
# Check that the vulnerability now has the most recent summary | ||
vulnerability.refresh_from_db() | ||
self.assertEqual(vulnerability.summary, "Second matching advisory") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest we use
populate
overfill
, we should also update the file name accordingly.