diff --git a/CveXplore/core/database_maintenance/api_handlers.py b/CveXplore/core/database_maintenance/api_handlers.py
index 5d86e9591..5cf486ace 100644
--- a/CveXplore/core/database_maintenance/api_handlers.py
+++ b/CveXplore/core/database_maintenance/api_handlers.py
@@ -1,3 +1,5 @@
+import re
 from abc import abstractmethod
+from typing import Union
 
 from CveXplore.common.cpe_converters import split_cpe_name
@@ -19,6 +21,8 @@ def __init__(self, feed_type: str, logger_name: str):
 
         self.api_handler = NvdNistApi(proxies=self.config.HTTP_PROXY_DICT)
 
+        self.missing_key_statistics = {}
+
     def process_item(self, item: dict):
         item = self.process_the_item(item)
 
@@ -35,6 +39,69 @@ def process_item(self, item: dict):
         # ).entry
         return item
 
+    def safe_get(
+        self, item: dict, dict_path: Union[list, str]
+    ) -> Union[None, dict, list, str]:
+        """Safely retrieve a value from a nested dictionary or list.
+
+        Every failed lookup is counted per path in ``missing_key_statistics``.
+
+        :param item: Nested dict/list structure to read from.
+        :param dict_path: A list of keys / list indices, or a string such as
+            ``"a.b.[0].c"`` that is split on ``.``, ``[`` and ``]``.
+        :return: The value at ``dict_path``, or ``None`` when any step of the
+            path cannot be resolved.
+        """
+        # Ensure dict_path is a list of keys (or indices)
+        if isinstance(dict_path, str):
+            # Split by '.' or '[' or ']' and drop the empty fragments
+            keys = [key for key in re.split(r"\.|\[|\]", dict_path) if key]
+        else:
+            keys = dict_path
+
+        current_data = item
+
+        for key in keys:
+            if isinstance(current_data, list):
+                # Handle list indices (e.g., [0]) by converting them to integers
+                try:
+                    current_data = current_data[int(key)]
+                except (TypeError, ValueError, IndexError):
+                    self._record_missing_key(dict_path)
+                    return None
+            elif isinstance(current_data, dict):
+                if key not in current_data:
+                    self._record_missing_key(dict_path)
+                    return None
+                current_data = current_data[key]
+            else:
+                # A non-container value was reached before the path was exhausted
+                self._record_missing_key(dict_path)
+                return None
+        return current_data
+
+    def _record_missing_key(self, dict_path: Union[list, str]) -> None:
+        """Increment the miss counter for ``dict_path``.
+
+        :param dict_path: The path that could not be resolved; list paths are
+            joined with ``.`` to form the statistics key.
+        """
+        path_str = (
+            ".".join(map(str, dict_path)) if isinstance(dict_path, list) else dict_path
+        )
+        self.missing_key_statistics[path_str] = (
+            self.missing_key_statistics.get(path_str, 0) + 1
+        )
+
+    def log_statistics(self) -> None:
+        """Log one warning per missing path, then reset the counters."""
+        if self.missing_key_statistics:
+            for path, count in self.missing_key_statistics.items():
+                self.logger.warning(
+                    f"Missing keys in processed data: {path} missing from {count} items"
+                )
+            self.missing_key_statistics.clear()
+
     @staticmethod
     def split_cpe_name(cpename: str) -> list[str]:
         return split_cpe_name(cpename)
diff --git a/CveXplore/core/database_maintenance/sources_process.py b/CveXplore/core/database_maintenance/sources_process.py
index 8481ceddb..d0f482767 100644
--- a/CveXplore/core/database_maintenance/sources_process.py
+++ b/CveXplore/core/database_maintenance/sources_process.py
@@ -407,55 +407,62 @@ def process_the_item(self, item: dict = None):
         if "cvssMetricV40" in item["cve"]["metrics"]:
             cve["impact4"] = {}
             cve["exploitability4"] = {}
-            cve["impact4"]["vulnerable_system_confidentiality"] = item["cve"][
-                "metrics"
-            ]["cvssMetricV40"][0]["cvssData"]["vulnerableSystemConfidentiality"]
-            cve["impact4"]["vulnerable_system_integrity"] = item["cve"]["metrics"][
-                "cvssMetricV40"
-            ][0]["cvssData"]["vulnerableSystemIntegrity"]
-            cve["impact4"]["vulnerable_system_availability"] = item["cve"][
-                "metrics"
-            ]["cvssMetricV40"][0]["cvssData"]["vulnerableSystemAvailability"]
-            cve["impact4"]["subsequent_system_confidentiality"] = item["cve"][
-                "metrics"
-            ]["cvssMetricV40"][0]["cvssData"]["subsequentSystemConfidentiality"]
-            cve["impact4"]["subsequent_system_integrity"] = item["cve"]["metrics"][
-                "cvssMetricV40"
-            ][0]["cvssData"]["subsequentSystemIntegrity"]
-            cve["impact4"]["subsequent_system_availability"] = item["cve"][
-                "metrics"
-            ]["cvssMetricV40"][0]["cvssData"]["subsequentSystemAvailability"]
-            cve["exploitability4"]["attackvector"] = item["cve"]["metrics"][
-                "cvssMetricV40"
-            ][0]["cvssData"]["attackVector"]
-            cve["exploitability4"]["attackcomplexity"] = item["cve"]["metrics"][
-                "cvssMetricV40"
-            ][0]["cvssData"]["attackComplexity"]
-            cve["exploitability4"]["attackrequirements"] = item["cve"]["metrics"][
-                "cvssMetricV40"
-            ][0]["cvssData"]["attackRequirements"]
-            cve["exploitability4"]["privilegesrequired"] = item["cve"]["metrics"][
-                "cvssMetricV40"
-            ][0]["cvssData"]["privilegesRequired"]
-            cve["exploitability4"]["userinteraction"] = item["cve"]["metrics"][
-                "cvssMetricV40"
-            ][0]["cvssData"]["userInteraction"]
-            cve["exploitability4"]["exploitmaturity"] = item["cve"]["metrics"][
-                "cvssMetricV40"
-            ][0]["cvssData"]["exploitMaturity"]
-            cve["cvss4"] = float(
-                item["cve"]["metrics"]["cvssMetricV40"][0]["cvssData"]["baseScore"]
+            cve["impact4"]["vulnerable_system_confidentiality"] = self.safe_get(
+                item,
+                "cve.metrics.cvssMetricV40.[0].cvssData.vulnConfidentialityImpact",
+            )
+            cve["impact4"]["vulnerable_system_integrity"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.vulnIntegrityImpact"
+            )
+            cve["impact4"]["vulnerable_system_availability"] = self.safe_get(
+                item,
+                "cve.metrics.cvssMetricV40.[0].cvssData.vulnAvailabilityImpact",
+            )
+            cve["impact4"]["subsequent_system_confidentiality"] = self.safe_get(
+                item,
+                "cve.metrics.cvssMetricV40.[0].cvssData.subConfidentialityImpact",
+            )
+            cve["impact4"]["subsequent_system_integrity"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.subIntegrityImpact"
+            )
+            cve["impact4"]["subsequent_system_availability"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.subAvailabilityImpact"
+            )
+            cve["exploitability4"]["attackvector"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.attackVector"
+            )
+            cve["exploitability4"]["attackcomplexity"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.attackComplexity"
+            )
+            cve["exploitability4"]["attackrequirements"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.attackRequirements"
+            )
+            cve["exploitability4"]["privilegesrequired"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.privilegesRequired"
+            )
+            cve["exploitability4"]["userinteraction"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.userInteraction"
+            )
+            cve["exploitability4"]["exploitmaturity"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.exploitMaturity"
+            )
+            # Compare against None so a legitimate 0.0 base score is kept
+            base_score = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.baseScore"
+            )
+            cve["cvss4"] = float(base_score) if base_score is not None else None
+            cve["cvss4Vector"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].cvssData.vectorString"
             )
-            cve["cvss4Vector"] = item["cve"]["metrics"]["cvssMetricV40"][0][
-                "cvssData"
-            ]["vectorString"]
             cve["cvss4Time"] = parse_datetime(
-                item["cve"]["lastModified"], ignoretz=True
+                self.safe_get(item, "cve.lastModified"), ignoretz=True
+            )
+            cve["cvss4Type"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].type"
+            )
+            cve["cvss4Source"] = self.safe_get(
+                item, "cve.metrics.cvssMetricV40.[0].source"
             )
-            cve["cvss4Type"] = item["cve"]["metrics"]["cvssMetricV40"][0]["type"]
-            cve["cvss4Source"] = item["cve"]["metrics"]["cvssMetricV40"][0][
-                "source"
-            ]
         else:
             cve["cvss4"] = None
 
@@ -977,6 +984,8 @@ def process_downloads(self, sites: list = None, manual_days: int = 0):
         # Set the last update time in the info collection
         self.setColUpdate(self.feed_type.lower(), self.last_modified)
 
+        self.log_statistics()
+
         self.logger.info(
             f"Duration: {datetime.timedelta(seconds=time.time() - start_time)}"
         )