Skip to content

Commit

Permalink
Fix NVD API CVSSv4 structure with the help of new safe_get with loggi…
Browse files Browse the repository at this point in the history
…ng (#315)
  • Loading branch information
oh2fih authored Feb 28, 2025
1 parent 44e747a commit 18846c7
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 46 deletions.
54 changes: 54 additions & 0 deletions CveXplore/core/database_maintenance/api_handlers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import re
from typing import Union
from abc import abstractmethod

from CveXplore.common.cpe_converters import split_cpe_name
Expand All @@ -19,6 +21,8 @@ def __init__(self, feed_type: str, logger_name: str):

self.api_handler = NvdNistApi(proxies=self.config.HTTP_PROXY_DICT)

self.missing_key_statistics = {}

def process_item(self, item: dict):
item = self.process_the_item(item)

Expand All @@ -35,6 +39,56 @@ def process_item(self, item: dict):
# ).entry
return item

def safe_get(
self, item: dict, dict_path: Union[list, str]
) -> Union[None, dict, list, str]:
"""Safely retrieve a value from a nested dictionary or list and track missing keys."""
# Ensure dict_path is a list of keys (or indices)
if isinstance(dict_path, str):
keys = re.split(r"\.|\[|\]", dict_path) # Split by '.' or '[' or ']'
keys = [key for key in keys if key]
else:
keys = dict_path

current_data = item

for key in keys:
if isinstance(current_data, list):
# Handle list indices (e.g., [0]) by converting them to integers
try:
key = int(key)
current_data = current_data[key]
except (ValueError, IndexError):
self._record_missing_key(dict_path)
return None
elif isinstance(current_data, dict):
if key not in current_data:
self._record_missing_key(dict_path)
return None
current_data = current_data[key]
else:
self._record_missing_key(dict_path)
return None
return current_data

def _record_missing_key(self, dict_path: Union[list, str]) -> None:
"""Track missing keys with their path in the missing_key_statistics."""
path_str = (
".".join(map(str, dict_path)) if isinstance(dict_path, list) else dict_path
)
self.missing_key_statistics[path_str] = (
self.missing_key_statistics.get(path_str, 0) + 1
)

def log_statistics(self) -> None:
"""Log statistics for missing keys."""
if self.missing_key_statistics:
for path, count in self.missing_key_statistics.items():
self.logger.warning(
f"Missing keys in processed data: {path} missing from {count} items"
)
self.missing_key_statistics.clear()

@staticmethod
def split_cpe_name(cpename: str) -> list[str]:
return split_cpe_name(cpename)
Expand Down
106 changes: 60 additions & 46 deletions CveXplore/core/database_maintenance/sources_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,55 +407,67 @@ def process_the_item(self, item: dict = None):
if "cvssMetricV40" in item["cve"]["metrics"]:
cve["impact4"] = {}
cve["exploitability4"] = {}
cve["impact4"]["vulnerable_system_confidentiality"] = item["cve"][
"metrics"
]["cvssMetricV40"][0]["cvssData"]["vulnerableSystemConfidentiality"]
cve["impact4"]["vulnerable_system_integrity"] = item["cve"]["metrics"][
"cvssMetricV40"
][0]["cvssData"]["vulnerableSystemIntegrity"]
cve["impact4"]["vulnerable_system_availability"] = item["cve"][
"metrics"
]["cvssMetricV40"][0]["cvssData"]["vulnerableSystemAvailability"]
cve["impact4"]["subsequent_system_confidentiality"] = item["cve"][
"metrics"
]["cvssMetricV40"][0]["cvssData"]["subsequentSystemConfidentiality"]
cve["impact4"]["subsequent_system_integrity"] = item["cve"]["metrics"][
"cvssMetricV40"
][0]["cvssData"]["subsequentSystemIntegrity"]
cve["impact4"]["subsequent_system_availability"] = item["cve"][
"metrics"
]["cvssMetricV40"][0]["cvssData"]["subsequentSystemAvailability"]
cve["exploitability4"]["attackvector"] = item["cve"]["metrics"][
"cvssMetricV40"
][0]["cvssData"]["attackVector"]
cve["exploitability4"]["attackcomplexity"] = item["cve"]["metrics"][
"cvssMetricV40"
][0]["cvssData"]["attackComplexity"]
cve["exploitability4"]["attackrequirements"] = item["cve"]["metrics"][
"cvssMetricV40"
][0]["cvssData"]["attackRequirements"]
cve["exploitability4"]["privilegesrequired"] = item["cve"]["metrics"][
"cvssMetricV40"
][0]["cvssData"]["privilegesRequired"]
cve["exploitability4"]["userinteraction"] = item["cve"]["metrics"][
"cvssMetricV40"
][0]["cvssData"]["userInteraction"]
cve["exploitability4"]["exploitmaturity"] = item["cve"]["metrics"][
"cvssMetricV40"
][0]["cvssData"]["exploitMaturity"]
cve["cvss4"] = float(
item["cve"]["metrics"]["cvssMetricV40"][0]["cvssData"]["baseScore"]
cve["impact4"]["vulnerable_system_confidentiality"] = self.safe_get(
item,
"cve.metrics.cvssMetricV40.[0].cvssData.vulnConfidentialityImpact",
)
cve["impact4"]["vulnerable_system_integrity"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.vulnIntegrityImpact"
)
cve["impact4"]["vulnerable_system_availability"] = self.safe_get(
item,
"cve.metrics.cvssMetricV40.[0].cvssData.vulnAvailabilityImpact",
)
cve["impact4"]["subsequent_system_confidentiality"] = self.safe_get(
item,
"cve.metrics.cvssMetricV40.[0].cvssData.subConfidentialityImpact",
)
cve["impact4"]["subsequent_system_integrity"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.subIntegrityImpact"
)
cve["impact4"]["subsequent_system_availability"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.subAvailabilityImpact"
)
cve["impact4"]["attackvector"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.attackVector"
)
cve["exploitability4"]["attackcomplexity"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.attackComplexity"
)
cve["exploitability4"]["attackrequirements"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.attackRequirements"
)
cve["exploitability4"]["privilegesrequired"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.privilegesRequired"
)
cve["exploitability4"]["userinteraction"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.userInteraction"
)
cve["exploitability4"]["exploitmaturity"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.exploitMaturity"
)
if self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.baseScore"
):
cve["cvss4"] = float(
self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.baseScore"
)
)
else:
cve["cvss4"] = None
cve["cvss4Vector"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].cvssData.vectorString"
)
cve["cvss4Vector"] = item["cve"]["metrics"]["cvssMetricV40"][0][
"cvssData"
]["vectorString"]
cve["cvss4Time"] = parse_datetime(
item["cve"]["lastModified"], ignoretz=True
self.safe_get(item, "cve.lastModified"), ignoretz=True
)
cve["cvss4Type"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].type"
)
cve["cvss4Source"] = self.safe_get(
item, "cve.metrics.cvssMetricV40.[0].source"
)
cve["cvss4Type"] = item["cve"]["metrics"]["cvssMetricV40"][0]["type"]
cve["cvss4Source"] = item["cve"]["metrics"]["cvssMetricV40"][0][
"source"
]
else:
cve["cvss4"] = None

Expand Down Expand Up @@ -977,6 +989,8 @@ def process_downloads(self, sites: list = None, manual_days: int = 0):
# Set the last update time in the info collection
self.setColUpdate(self.feed_type.lower(), self.last_modified)

self.log_statistics()

self.logger.info(
f"Duration: {datetime.timedelta(seconds=time.time() - start_time)}"
)
Expand Down

0 comments on commit 18846c7

Please sign in to comment.