Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Huawei Pipeline Added with Tests #1770

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from vulnerabilities.pipelines import alpine_linux_importer
from vulnerabilities.pipelines import github_importer
from vulnerabilities.pipelines import gitlab_importer
from vulnerabilities.pipelines import huawei_bulletin_importer
from vulnerabilities.pipelines import nginx_importer
from vulnerabilities.pipelines import npm_importer
from vulnerabilities.pipelines import nvd_importer
Expand Down Expand Up @@ -78,6 +79,7 @@
nvd_importer.NVDImporterPipeline,
pysec_importer.PyPIImporterPipeline,
alpine_linux_importer.AlpineLinuxImporterPipeline,
huawei_bulletin_importer.HuaweiBulletinImporterPipeline,
]

IMPORTERS_REGISTRY = {
Expand Down
187 changes: 187 additions & 0 deletions vulnerabilities/pipelines/huawei_bulletin_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import logging
from datetime import datetime
from datetime import timezone
from typing import Iterable
from typing import Tuple

import requests
from bs4 import BeautifulSoup
from packageurl import PackageURL
from univers.version_range import VersionRange
from univers.versions import GenericVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
from vulnerabilities.severity_systems import GENERIC

logger = logging.getLogger(__name__)


def extract_version(version_str: str) -> Tuple[str, str]:
"""
Gets the version type and number from a messy Huawei version string.
Returns tuple of (type, version) - returns empty type if cant parse
"""
version_str = version_str.lower().strip()
if version_str.startswith("harmonyos"):
return "harmonyos", version_str.split("harmonyos")[-1].strip()
elif version_str.startswith("emui"):
return "emui", version_str.split("emui")[-1].strip()
return "", version_str


class HuaweiBulletinImporterPipeline(VulnerableCodeBaseImporterPipeline):
"""
Scrapes security bullitin data from Huawei's website:
https://consumer.huawei.com/en/support/bulletin/
"""

pipeline_id = "huawei_bulletin_importer"
spdx_license_expression = "LicenseRef-Terms-Of-Use"
license_url = "https://consumer.huawei.com/en/legal/terms-of-use/"
url = "https://consumer.huawei.com/en/support/bulletin/"
importer_name = "Huawei Bulletin Importer"

def __init__(self):
super().__init__()
self.raw_data = None

@classmethod
def steps(cls):
"""The steps we need to run in order"""
return (
cls.fetch_bulletin,
cls.collect_and_store_advisories,
cls.import_new_advisories,
)

def fetch_bulletin(self):
"""
Gets the bullitin data from Huawei's site.
Stores raw html for processing later.
"""
self.log(f"Fetching {self.url}")
try:
response = requests.get(f"{self.url}2024/9/")
response.raise_for_status()
self.raw_data = BeautifulSoup(response.text, "html.parser")
except Exception as e:
self.log(f"Failed to fetch Huawei bulletin: {e}", logging.ERROR)
raise

def advisories_count(self) -> int:
"""
Counts how many advisorys we found.
Returns 0 if something went wrong.
"""
if not self.raw_data:
return 0

tables = self.raw_data.find_all("table")
if not tables:
return 0
huawei_rows = len(tables[0].find_all("tr")[1:])
thirdparty_rows = len(tables[1].find_all("tr")[1:])
return huawei_rows + thirdparty_rows

def collect_advisories(self) -> Iterable[AdvisoryData]:
"""
Parse the bullitin and extract all the advisorys.
Returns empty list if something goes wrong.
"""
if not self.raw_data:
return []

tables = self.raw_data.find_all("table")
if len(tables) < 2:
return []

for row in tables[0].find_all("tr")[1:]:
cols = row.find_all("td")
if len(cols) != 5:
continue
advisory = {
"cve_id": cols[0].text.strip(),
"description": cols[1].text.strip(),
"impact": cols[2].text.strip(),
"severity": cols[3].text.strip(),
"affected_versions": cols[4].text.strip(),
"is_huawei": True,
}
advisory_data = self.to_advisory_data(advisory)
if advisory_data:
yield advisory_data
for row in tables[1].find_all("tr")[1:]:
cols = row.find_all("td")
if len(cols) != 3:
continue

advisory = {
"cve_id": cols[0].text.strip(),
"severity": cols[1].text.strip(),
"affected_versions": cols[2].text.strip(),
"is_huawei": False,
}
advisory_data = self.to_advisory_data(advisory)
if advisory_data:
yield advisory_data

def to_advisory_data(self, data) -> AdvisoryData:
"""
Takes raw data and makes it into propper advisory format.
Returns None if theres any problems.
"""
try:
if not data.get("cve_id"):
return None
affected_packages = []
versions = [v.strip() for v in data["affected_versions"].split(",") if v.strip()]

for version in versions:
system_type, version_number = extract_version(version)
if not system_type:
continue
affected_packages.append(
AffectedPackage(
package=PackageURL(type="huawei", name=system_type),
affected_version_range=VersionRange.from_string(
f"vers:generic/={version_number}"
),
)
)
if not affected_packages:
return None
severity = VulnerabilitySeverity(system=GENERIC, value=data["severity"].lower())
references = [
Reference(
reference_id=data["cve_id"],
url=f"https://nvd.nist.gov/vuln/detail/{data['cve_id']}",
severities=[severity],
)
]
if data.get("is_huawei"):
summary = "\n".join(filter(None, [data.get("description"), data.get("impact")]))
else:
summary = f"Third-party vulnerability affecting {''.join(data['affected_versions'].split())}"
return AdvisoryData(
summary=summary if summary else f"Security update for {data['cve_id']}",
aliases=[data["cve_id"]],
affected_packages=affected_packages,
references=references,
date_published=None,
url=f"{self.url}2024/9/",
)
except Exception as e:
self.log(f"Failed to process advisory {data.get('cve_id')}: {e}", logging.ERROR)
return None
193 changes: 193 additions & 0 deletions vulnerabilities/tests/pipelines/test_huawei_bulletin_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from pathlib import Path
from unittest.mock import patch

import pytest
from bs4 import BeautifulSoup
from packageurl import PackageURL
from univers.version_range import VersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.severity_systems import GENERIC
from vulnerabilities.tests import util_tests

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
TEST_DATA = os.path.join(BASE_DIR, "test_data", "huawei")


def normalize_version_range(version_str):
"""Quick helper to fix version string format"""
if not version_str.startswith("vers:generic/="):
version = version_str.split("/")[-1]
return f"vers:generic/={version}"
return version_str


def test_to_advisory_data():
"""Main test for parsing the bullitin data"""
with open(os.path.join(TEST_DATA, "huawei-test.html")) as f:
mock_response = BeautifulSoup(f.read(), features="html.parser")

expected_file = os.path.join(TEST_DATA, "huawei-expected.json")

with open(expected_file) as f:
import json

expected = json.load(f)
with patch("requests.get") as mock_response_get:
mock_response_get.return_value.text = mock_response
from vulnerabilities.pipelines.huawei_bulletin_importer import (
HuaweiBulletinImporterPipeline,
)

pipeline = HuaweiBulletinImporterPipeline()
pipeline.raw_data = mock_response
results = [data.to_dict() for data in pipeline.collect_advisories()]

for result, exp in zip(
sorted(results, key=lambda x: x["aliases"][0]),
sorted(expected, key=lambda x: x["aliases"][0]),
):
assert result["aliases"] == exp["aliases"]
assert result["summary"] == exp["summary"]
assert len(result["affected_packages"]) == len(exp["affected_packages"])
for r_pkg, e_pkg in zip(
sorted(result["affected_packages"], key=lambda x: x["affected_version_range"]),
sorted(exp["affected_packages"], key=lambda x: x["affected_version_range"]),
):
assert normalize_version_range(
r_pkg["affected_version_range"]
) == normalize_version_range(e_pkg["affected_version_range"])
assert r_pkg["package"]["name"] == e_pkg["package"]["name"]
assert r_pkg["package"]["type"] == e_pkg["package"]["type"]


test_cases = [
(
{
"cve_id": "CVE-2024-45449",
"description": "Permission verification vulnerability",
"impact": "May affect service confidentiality",
"severity": "Medium",
"affected_versions": "HarmonyOS4.2.0, HarmonyOS2.0.0",
"is_huawei": True,
},
AdvisoryData(
summary="Permission verification vulnerability\nMay affect service confidentiality",
aliases=["CVE-2024-45449"],
affected_packages=[
AffectedPackage(
package=PackageURL(type="huawei", name="harmonyos"),
affected_version_range=VersionRange.from_string("vers:generic/=4.2.0"),
),
AffectedPackage(
package=PackageURL(type="huawei", name="harmonyos"),
affected_version_range=VersionRange.from_string("vers:generic/=2.0.0"),
),
],
references=[
Reference(
url="https://nvd.nist.gov/vuln/detail/CVE-2024-45449",
reference_id="CVE-2024-45449",
severities=[VulnerabilitySeverity(system=GENERIC, value="medium")],
)
],
date_published=None,
url="https://consumer.huawei.com/en/support/bulletin/2024/9/",
),
),
(
{
"cve_id": "CVE-2024-34740",
"severity": "High",
"affected_versions": "HarmonyOS4.2.0, HarmonyOS4.0.0",
"is_huawei": False,
},
AdvisoryData(
aliases=["CVE-2024-34740"],
summary="Third-party vulnerability affecting HarmonyOS4.2.0,HarmonyOS4.0.0",
affected_packages=[
AffectedPackage(
package=PackageURL(type="huawei", name="harmonyos"),
affected_version_range=VersionRange.from_string("vers:generic/=4.2.0"),
),
AffectedPackage(
package=PackageURL(type="huawei", name="harmonyos"),
affected_version_range=VersionRange.from_string("vers:generic/=4.0.0"),
),
],
references=[
Reference(
url="https://nvd.nist.gov/vuln/detail/CVE-2024-34740",
reference_id="CVE-2024-34740",
severities=[VulnerabilitySeverity(system=GENERIC, value="high")],
)
],
date_published=None,
url="https://consumer.huawei.com/en/support/bulletin/2024/9/",
),
),
]


@pytest.mark.parametrize(("test_data", "expected"), test_cases)
def test_to_advisory_data_conversion(test_data, expected):
"""Test converting raw data into proper advisorys"""
from vulnerabilities.pipelines.huawei_bulletin_importer import HuaweiBulletinImporterPipeline

pipeline = HuaweiBulletinImporterPipeline()
found = pipeline.to_advisory_data(test_data)
assert expected.aliases == found.aliases
assert expected.summary == found.summary
exp_pkgs = sorted(expected.affected_packages, key=lambda x: str(x.affected_version_range))
found_pkgs = sorted(found.affected_packages, key=lambda x: str(x.affected_version_range))
assert len(exp_pkgs) == len(found_pkgs)
for exp_pkg, found_pkg in zip(exp_pkgs, found_pkgs):
assert exp_pkg.package.type == found_pkg.package.type
assert exp_pkg.package.name == found_pkg.package.name
assert str(exp_pkg.affected_version_range) == str(found_pkg.affected_version_range)
assert len(expected.references) == len(found.references)
for exp_ref, found_ref in zip(expected.references, found.references):
assert exp_ref.reference_id == found_ref.reference_id
assert exp_ref.url == found_ref.url
assert len(exp_ref.severities) == len(found_ref.severities)

for exp_sev, found_sev in zip(exp_ref.severities, found_ref.severities):
assert exp_sev.system == found_sev.system
assert exp_sev.value == found_sev.value


def test_invalid_data():
"""Make sure we handle bad input properly"""
from vulnerabilities.pipelines.huawei_bulletin_importer import HuaweiBulletinImporterPipeline

pipeline = HuaweiBulletinImporterPipeline()
bad_data = {
"description": "Test desc",
"severity": "High",
"affected_versions": "HarmonyOS4.2.0",
}
assert pipeline.to_advisory_data(bad_data) is None
bad_data = {"cve_id": "CVE-2024-12345", "severity": "High", "affected_versions": ""}
assert pipeline.to_advisory_data(bad_data) is None


def test_extract_version():
"""Check that we parse version strings ok"""
from vulnerabilities.pipelines.huawei_bulletin_importer import extract_version

assert extract_version("HarmonyOS4.2.0") == ("harmonyos", "4.2.0")
assert extract_version("EMUI 14.0.0") == ("emui", "14.0.0")
assert extract_version("invalid version") == ("", "invalid version")
Loading