-
Notifications
You must be signed in to change notification settings - Fork 228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Apache Log4j Advisories #1744
Open
NucleonGodX
wants to merge
6
commits into
aboutcode-org:main
Choose a base branch
from
NucleonGodX:apachelog
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
208a9d5
initial push, add apachelog4importer
NucleonGodX 434f637
codestyle changes
NucleonGodX baac7bd
Merge branch 'main' into apachelog
NucleonGodX 32c9345
suggestions applied, importer converted to pipeline
NucleonGodX 79e83d1
clean-up
NucleonGodX ee6578a
severity_systems and weaknesses updated for importer
NucleonGodX File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,320 @@ | ||
# | ||
# Copyright (c) nexB Inc. and others. All rights reserved. | ||
# VulnerableCode is a trademark of nexB Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. | ||
# See https://github.com/aboutcode-org/vulnerablecode for support or download. | ||
# See https://aboutcode.org for more information about nexB OSS projects. | ||
# | ||
|
||
import logging | ||
import re | ||
from collections import defaultdict | ||
from typing import Iterable | ||
|
||
import pytz | ||
from cyclonedx.model.bom import Bom | ||
from dateutil.parser import parse | ||
from defusedxml import ElementTree as SafeElementTree | ||
from packageurl import PackageURL | ||
from univers.versions import MavenVersion | ||
|
||
from vulnerabilities import severity_systems | ||
from vulnerabilities.importer import AdvisoryData | ||
from vulnerabilities.importer import AffectedPackage | ||
from vulnerabilities.importer import Reference | ||
from vulnerabilities.importer import VulnerabilitySeverity | ||
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline | ||
from vulnerabilities.utils import fetch_response | ||
from vulnerabilities.utils import get_cwe_id | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ApacheLog4jImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """
    Import security advisories from Apache Log4j's security database.

    Advisories are read from the CycloneDX VDR XML document published at
    ``XML_URL``. Each vulnerability entry is converted to one ``AdvisoryData``
    whose affected packages are grouped by the version that fixes them.
    """

    pipeline_id = "apache_log4j_importer"
    XML_URL = "https://logging.apache.org/cyclonedx/vdr.xml"
    ASF_PAGE_URL = "https://logging.apache.org/security.html"
    spdx_license_expression = "Apache-2.0"
    license_url = "https://www.apache.org/licenses/"
    importer_name = "Apache Log4j Importer"

    # Known Log4j 2.x versions. This list is used both to validate versions
    # found in recommendation text and to expand version ranges into concrete
    # versions; any version missing from it is ignored by those code paths.
    # NOTE(review): the list is hard-coded and stops at 2.17.1 -- confirm
    # whether later releases need to be added or fetched dynamically.
    version_set = [
        "2.0-beta1",
        "2.0-beta2",
        "2.0-beta3",
        "2.0-beta5",
        "2.0-alpha1",
        "2.0-beta7",
        "2.0-beta8",
        "2.0-beta9",
        "2.0-rc1",
        "2.0-beta4",
        "2.0-beta6",
        "2.0-rc2",
        "2.0",
        "2.0.1",
        "2.0.2",
        "2.1",
        "2.2",
        "2.3",
        "2.3.1",
        "2.3.2",
        "2.4",
        "2.4.1",
        "2.5",
        "2.6",
        "2.6.1",
        "2.6.2",
        "2.7",
        "2.8",
        "2.8.1",
        "2.8.2",
        "2.9.0",
        "2.9.1",
        "2.10.0",
        "2.11.0",
        "2.11.1",
        "2.11.2",
        "2.12.0",
        "2.12.1",
        "2.12.2",
        "2.12.3",
        "2.12.4",
        "2.13.0",
        "2.13.1",
        "2.13.2",
        "2.13.3",
        "2.14.0",
        "2.14.1",
        "2.15.0",
        "2.15.1",
        "2.16.0",
        "2.17.0",
        "2.17.1",
    ]

    @classmethod
    def steps(cls):
        """
        Return pipeline steps.
        """
        return (
            cls.collect_and_store_advisories,
            cls.import_new_advisories,
        )

    def advisories_count(self) -> int:
        """
        Return total number of advisories.

        Always returns 0: the count is not computed up front because it would
        require fetching and parsing the VDR document.
        """
        return 0

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Collect Apache Log4j advisories from the CycloneDX VDR file.

        Yield one ``AdvisoryData`` per vulnerability that has an id and at
        least one resolvable affected package. Yield nothing when the fetched
        document is empty.
        """
        xml_content = fetch_response(self.XML_URL).content
        if not xml_content:
            logger.error("No XML content fetched.")
            # This is a generator: a bare return simply ends the iteration.
            return

        cleaned_xml_data = self._clean_xml_data(xml_content)
        if not cleaned_xml_data:
            logger.error("Failed to clean XML data")
            return

        bom = Bom.from_xml(SafeElementTree.fromstring(cleaned_xml_data))

        for vulnerability in bom.vulnerabilities:
            if not vulnerability.id:
                continue

            yield from self._process_vulnerability(vulnerability)

    def _clean_xml_data(self, xml_content):
        """
        Clean XML data by removing XML schema instance attributes.

        Return the re-serialized document as UTF-8 encoded bytes with every
        attribute in the ``http://www.w3.org/2001/XMLSchema-instance``
        namespace removed from every element.
        """
        xsi_namespace = "{http://www.w3.org/2001/XMLSchema-instance}"
        root = SafeElementTree.fromstring(xml_content)
        for elem in root.iter():
            # Collect the keys first: deleting while iterating over
            # ``elem.attrib`` would mutate the mapping being iterated.
            attribs_to_remove = [key for key in elem.attrib if xsi_namespace in key]
            for attrib in attribs_to_remove:
                del elem.attrib[attrib]
        return SafeElementTree.tostring(root, encoding="utf-8")

    def _process_vulnerability(self, vulnerability) -> Iterable[AdvisoryData]:
        """
        Process a single vulnerability and return AdvisoryData.

        Yield a single ``AdvisoryData`` built from ``vulnerability``, or
        nothing when no affected package could be derived from it.
        """
        cve_id = vulnerability.id
        description = vulnerability.description or ""
        advisory_url = f"{self.ASF_PAGE_URL}#{cve_id}"

        date_published = None
        if vulnerability.published:
            parsed_date = parse(str(vulnerability.published))
            if parsed_date.tzinfo is None:
                # Naive timestamps are taken to be UTC.
                date_published = parsed_date.replace(tzinfo=pytz.UTC)
            else:
                # Convert rather than overwrite the offset so that the instant
                # in time is preserved for non-UTC timestamps.
                date_published = parsed_date.astimezone(pytz.UTC)

        weaknesses = [get_cwe_id(f"CWE-{cwe}") for cwe in vulnerability.cwes]

        references = [
            Reference(url=f"https://nvd.nist.gov/vuln/detail/{cve_id}", reference_id=cve_id),
            Reference(url=advisory_url, reference_id=cve_id),
        ]

        for rating in vulnerability.ratings:
            if rating.score is None:
                # Without a score there is no severity value to record.
                continue

            # NOTE(review): every rating is recorded as CVSS v3 regardless of
            # the scoring method declared by the rating -- confirm.
            severity = VulnerabilitySeverity(
                system=severity_systems.CVSSV3,
                value=str(rating.score),
                scoring_elements=rating.vector,
            )
            # The rating source and its URL may be absent; fall back to the
            # advisory page so that the severity is still attached to a
            # reference with a real URL.
            source_url = getattr(rating.source, "url", None)
            rating_url = str(source_url) if source_url else advisory_url
            # Each reference carries only its own rating, never a list shared
            # with the other references.
            references.append(Reference(url=rating_url, severities=[severity]))

        fixed_versions = self._extract_fixed_versions(vulnerability.recommendation)
        affected_packages = self._get_affected_packages(vulnerability, fixed_versions)

        if affected_packages:
            yield AdvisoryData(
                aliases=[cve_id],
                summary=description,
                affected_packages=affected_packages,
                references=references,
                date_published=date_published,
                weaknesses=weaknesses,
                url=advisory_url,
            )

    def _extract_fixed_versions(self, recommendation):
        """
        Extract fixed versions from recommendation text.

        Return the known 2.x versions (see ``version_set``) mentioned in
        ``recommendation``, de-duplicated and in order of first appearance.
        Return an empty list when ``recommendation`` is empty.
        """
        if not recommendation:
            return []

        # The leading ``(?<!\.)`` keeps the tail of a longer dotted version
        # (e.g. the "2.3" inside "1.2.3") from being reported as a 2.x version.
        version_pattern = r"(?<!\.)\b(2\.\d+(?:\.\d+)?(?:-[a-zA-Z0-9]+)?)\b"
        found_versions = re.findall(version_pattern, str(recommendation))

        known_versions = set(self.version_set)
        # dict.fromkeys() removes duplicates while keeping first-seen order.
        return list(dict.fromkeys(ver for ver in found_versions if ver in known_versions))

    def _get_affected_packages(self, vulnerability, fixed_versions):
        """
        Get affected packages for a vulnerability.

        Return a list of ``AffectedPackage``, one per fixed version, each
        listing the affected versions resolved by that fix. Affected versions
        for which no fixed version could be determined are left out.
        """
        # Maps a fixed version to the affected versions it resolves.
        version_groups = defaultdict(list)

        for vuln_target in vulnerability.affects:
            for version_range in vuln_target.versions:
                if version_range.version and not version_range.range:
                    self._process_single_version(version_range, fixed_versions, version_groups)
                elif version_range.range:
                    self._process_version_range(version_range, fixed_versions, version_groups)

        affected_packages = []
        for fixed_version, versions in version_groups.items():
            unique_versions = sorted(set(versions), key=MavenVersion)
            version_range_str = f"vers:maven/{'|'.join(unique_versions)}"

            affected_packages.append(
                AffectedPackage(
                    # NOTE(review): the purl type is "apache" while the version
                    # range uses the "maven" scheme -- confirm the intended
                    # package type and namespace.
                    package=PackageURL(
                        type="apache",
                        name="log4j-core",
                    ),
                    affected_version_range=version_range_str,
                    fixed_version=fixed_version,
                )
            )

        return affected_packages

    def _process_single_version(self, version_range, fixed_versions, version_groups):
        """
        Process a single version and add it to version groups.

        The version is recorded under the first entry of ``fixed_versions``
        that is greater than or equal to it; it is skipped when there is none.
        """
        current_version = version_range.version.replace("vers:maven/", "")
        fixed_version = self._get_fixed_version(fixed_versions, current_version)
        if fixed_version:
            version_groups[fixed_version].append(current_version)

    def _process_version_range(self, version_range, fixed_versions, version_groups):
        """
        Process a version range and add affected versions to version groups.

        The range is expanded to the known versions it contains and those are
        recorded under the first entry of ``fixed_versions`` that is greater
        than or equal to the end of the range. Nothing is recorded when the
        range cannot be parsed, is empty, or has no matching fixed version.
        """
        start_version, end_version = self._parse_version_range(version_range.range)
        if not start_version or not end_version:
            return

        if start_version == end_version:
            # A range naming exactly one version: the half-open interval
            # [start, end) would be empty, so use the version itself.
            affected_versions = [start_version]
        else:
            affected_versions = self._get_versions_in_range(
                start_version, end_version, self.version_set
            )
        if not affected_versions:
            return

        fixed_version = self._get_fixed_version(fixed_versions, end_version)
        if fixed_version:
            version_groups[fixed_version].extend(affected_versions)

    def _parse_version_range(self, range_str):
        """
        Parse version range string and return start and end versions.

        Two forms are supported:
        - ``vers:maven/<version>``: return ``(version, version)``
        - ``vers:maven/>=<start>|<<end>``: return ``(start, end)`` where the
          end is exclusive. Only the first two constraints are considered.

        Return ``(None, None)`` for any other form.
        """
        if re.match(r"^vers:maven/\d+\.\d+(?:\.\d+)?(?:-[a-zA-Z0-9]+)?$", range_str):
            single_version = range_str.replace("vers:maven/", "").strip()
            return single_version, single_version

        range_parts = [part.strip() for part in range_str.replace("vers:maven/", "").split("|")]

        # A single constraint has no second part to read the end bound from.
        if len(range_parts) >= 2:
            lower_bound, upper_bound = range_parts[0], range_parts[1]
            # "<=" is rejected: the callers treat the end as exclusive and
            # stripping only "<" would leave a bogus "=<version>" string.
            if (
                lower_bound.startswith(">=")
                and upper_bound.startswith("<")
                and not upper_bound.startswith("<=")
            ):
                return lower_bound[2:].strip(), upper_bound[1:].strip()

        logger.warning("Unsupported version range: %r", range_str)
        return None, None

    def _get_versions_in_range(self, start_version, end_version, version_set):
        """
        Get list of versions between start and end versions.

        Return the entries of ``version_set`` that are greater than or equal
        to ``start_version`` and strictly lower than ``end_version``, in the
        order they appear in ``version_set``.
        """
        start_mv = MavenVersion(start_version)
        end_mv = MavenVersion(end_version)

        return [ver for ver in version_set if start_mv <= MavenVersion(ver) < end_mv]

    def _get_fixed_version(self, fixed_versions, end_version):
        """
        Get appropriate fixed version for a given end version.

        Return the first entry of ``fixed_versions`` that is greater than or
        equal to ``end_version``, or None when there is no such entry.
        """
        end_mv = MavenVersion(end_version)

        return next((ver for ver in fixed_versions if MavenVersion(ver) >= end_mv), None)
36 changes: 36 additions & 0 deletions
36
vulnerabilities/tests/pipelines/test_apache_log4j_importer_pipeline.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# | ||
# Copyright (c) nexB Inc. and others. All rights reserved. | ||
# VulnerableCode is a trademark of nexB Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. | ||
# See https://github.com/aboutcode-org/vulnerablecode for support or download. | ||
# See https://aboutcode.org for more information about nexB OSS projects. | ||
# | ||
|
||
import os | ||
from pathlib import Path | ||
|
||
from cyclonedx.model.bom import Bom | ||
from defusedxml import ElementTree as SafeElementTree | ||
|
||
from vulnerabilities.pipelines.apache_log4j_importer import ApacheLog4jImporterPipeline | ||
from vulnerabilities.tests import util_tests | ||
|
||
TEST_DATA = Path(__file__).parent.parent / "test_data" / "apache_log4j" | ||
|
||
|
||
def test_to_advisories():
    """
    Check that the advisories built from the sample VDR document match the
    expected JSON fixture.
    """
    # Read raw bytes rather than text: the pipeline hands the raw response
    # body to the XML parser, and a text-mode read would decode the fixture
    # with the platform default encoding.
    raw_data = (TEST_DATA / "log4j.xml").read_bytes()

    importer = ApacheLog4jImporterPipeline()
    cleaned_data = importer._clean_xml_data(raw_data)
    bom = Bom.from_xml(SafeElementTree.fromstring(cleaned_data))

    advisories = []
    for vulnerability in bom.vulnerabilities:
        advisories.extend(importer._process_vulnerability(vulnerability))

    result = [data.to_dict() for data in advisories]

    expected_file = str(TEST_DATA / "parse-advisory-apache-log4j-expected.json")
    util_tests.check_results_against_json(result, expected_file)
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will likely need an improver to fetch and expand the list of known versions.