diff --git a/CveXplore/core/database_maintenance/sources_process.py b/CveXplore/core/database_maintenance/sources_process.py index d0f48276..e7a62705 100644 --- a/CveXplore/core/database_maintenance/sources_process.py +++ b/CveXplore/core/database_maintenance/sources_process.py @@ -386,25 +386,37 @@ def process_the_item(self, item: dict = None): return None cve = { - "id": item["cve"]["id"], - "assigner": item["cve"]["sourceIdentifier"], - "status": item["cve"]["vulnStatus"], - "published": parse_datetime(item["cve"]["published"], ignoretz=True), - "modified": parse_datetime(item["cve"]["lastModified"], ignoretz=True), - "lastModified": parse_datetime(item["cve"]["lastModified"], ignoretz=True), + "id": self.safe_get(item, "cve.id"), + "assigner": self.safe_get(item, "cve.sourceIdentifier"), + "status": self.safe_get(item, "cve.vulnStatus"), + "published": ( + parse_datetime(self.safe_get(item, "cve.published"), ignoretz=True) + if self.safe_get(item, "cve.published") + else None + ), + "modified": ( + parse_datetime(self.safe_get(item, "cve.lastModified"), ignoretz=True) + if self.safe_get(item, "cve.lastModified") + else None + ), + "lastModified": ( + parse_datetime(self.safe_get(item, "cve.lastModified"), ignoretz=True) + if self.safe_get(item, "cve.lastModified") + else None + ), } - for description in item["cve"]["descriptions"]: + for description in self.safe_get(item, "cve.descriptions"): if description["lang"] == "en": if "summary" in cve: cve["summary"] += f" {description['value']}" else: cve["summary"] = description["value"] - if "metrics" in item["cve"]: + if "metrics" in self.safe_get(item, "cve"): cve["access"] = {} cve["impact"] = {} - if "cvssMetricV40" in item["cve"]["metrics"]: + if "cvssMetricV40" in self.safe_get(item, "cve.metrics"): cve["impact4"] = {} cve["exploitability4"] = {} cve["impact4"]["vulnerable_system_confidentiality"] = self.safe_get( @@ -446,21 +458,26 @@ def process_the_item(self, item: dict = None): cve["exploitability4"]["exploitmaturity"] = self.safe_get( item, "cve.metrics.cvssMetricV40.[0].cvssData.exploitMaturity" ) - if self.safe_get( - item, "cve.metrics.cvssMetricV40.[0].cvssData.baseScore" - ): - cve["cvss4"] = float( + cve["cvss4"] = ( + float( self.safe_get( item, "cve.metrics.cvssMetricV40.[0].cvssData.baseScore" ) ) - else: - cve["cvss4"] = None + if self.safe_get( + item, "cve.metrics.cvssMetricV40.[0].cvssData.baseScore" + ) + else None + ) cve["cvss4Vector"] = self.safe_get( item, "cve.metrics.cvssMetricV40.[0].cvssData.vectorString" ) - cve["cvss4Time"] = parse_datetime( - self.safe_get(item, "cve.lastModified"), ignoretz=True + cve["cvss4Time"] = ( + parse_datetime( + self.safe_get(item, "cve.lastModified"), ignoretz=True + ) + if self.safe_get(item, "cve.lastModified") + else None ) cve["cvss4Type"] = self.safe_get( item, "cve.metrics.cvssMetricV40.[0].type" @@ -471,137 +488,214 @@ def process_the_item(self, item: dict = None): else: cve["cvss4"] = None - if "cvssMetricV31" in item["cve"]["metrics"]: + if "cvssMetricV31" in self.safe_get(item, "cve.metrics"): cve["impact3"] = {} cve["exploitability3"] = {} - cve["impact3"]["availability"] = item["cve"]["metrics"][ - "cvssMetricV31" - ][0]["cvssData"]["availabilityImpact"] - cve["impact3"]["confidentiality"] = item["cve"]["metrics"][ - "cvssMetricV31" - ][0]["cvssData"]["confidentialityImpact"] - cve["impact3"]["integrity"] = item["cve"]["metrics"]["cvssMetricV31"][ - 0 - ]["cvssData"]["integrityImpact"] - cve["exploitability3"]["attackvector"] = item["cve"]["metrics"][ - "cvssMetricV31" - ][0]["cvssData"]["attackVector"] - cve["exploitability3"]["attackcomplexity"] = item["cve"]["metrics"][ - "cvssMetricV31" - ][0]["cvssData"]["attackComplexity"] - cve["exploitability3"]["privilegesrequired"] = item["cve"]["metrics"][ - "cvssMetricV31" - ][0]["cvssData"]["privilegesRequired"] - cve["exploitability3"]["userinteraction"] = item["cve"]["metrics"][ - "cvssMetricV31" - ][0]["cvssData"]["userInteraction"] - cve["exploitability3"]["scope"] = item["cve"]["metrics"][ - "cvssMetricV31" - ][0]["cvssData"]["scope"] - cve["cvss3"] = float( - item["cve"]["metrics"]["cvssMetricV31"][0]["cvssData"]["baseScore"] + cve["impact3"]["availability"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.availabilityImpact" ) - cve["cvss3Vector"] = item["cve"]["metrics"]["cvssMetricV31"][0][ - "cvssData" - ]["vectorString"] - cve["impactScore3"] = float( - item["cve"]["metrics"]["cvssMetricV31"][0]["impactScore"] + cve["impact3"]["confidentiality"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.confidentialityImpact" ) - cve["exploitabilityScore3"] = float( - item["cve"]["metrics"]["cvssMetricV31"][0]["exploitabilityScore"] + cve["impact3"]["integrity"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.integrityImpact" ) - cve["cvss3Time"] = parse_datetime( - item["cve"]["lastModified"], ignoretz=True + cve["exploitability3"]["attackvector"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.attackVector" ) - cve["cvss3Type"] = item["cve"]["metrics"]["cvssMetricV31"][0]["type"] - cve["cvss3Source"] = item["cve"]["metrics"]["cvssMetricV31"][0][ - "source" - ] - elif "cvssMetricV30" in item["cve"]["metrics"]: + cve["exploitability3"]["attackcomplexity"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.attackComplexity" + ) + cve["exploitability3"]["privilegesrequired"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.privilegesRequired" + ) + cve["exploitability3"]["userinteraction"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.userInteraction" + ) + cve["exploitability3"]["scope"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.scope" + ) + cve["cvss3"] = ( + float( + self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.baseScore" + ) + ) + if self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.baseScore" + ) + else None + ) + cve["cvss3Vector"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].cvssData.vectorString" + ) + cve["impactScore3"] = ( + float( + self.safe_get(item, "cve.metrics.cvssMetricV31.[0].impactScore") + ) + if self.safe_get(item, "cve.metrics.cvssMetricV31.[0].impactScore") + else None + ) + cve["exploitabilityScore3"] = ( + float( + self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].exploitabilityScore" + ) + ) + if self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].exploitabilityScore" + ) + else None + ) + cve["cvss3Time"] = ( + parse_datetime( + self.safe_get(item, "cve.lastModified"), ignoretz=True + ) + if self.safe_get(item, "cve.lastModified") + else None + ) + cve["cvss3Type"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].type" + ) + cve["cvss3Source"] = self.safe_get( + item, "cve.metrics.cvssMetricV31.[0].source" + ) + elif "cvssMetricV30" in self.safe_get(item, "cve.metrics"): cve["impact3"] = {} cve["exploitability3"] = {} - cve["impact3"]["availability"] = item["cve"]["metrics"][ - "cvssMetricV30" - ][0]["cvssData"]["availabilityImpact"] - cve["impact3"]["confidentiality"] = item["cve"]["metrics"][ - "cvssMetricV30" - ][0]["cvssData"]["confidentialityImpact"] - cve["impact3"]["integrity"] = item["cve"]["metrics"]["cvssMetricV30"][ - 0 - ]["cvssData"]["integrityImpact"] - cve["exploitability3"]["attackvector"] = item["cve"]["metrics"][ - "cvssMetricV30" - ][0]["cvssData"]["attackVector"] - cve["exploitability3"]["attackcomplexity"] = item["cve"]["metrics"][ - "cvssMetricV30" - ][0]["cvssData"]["attackComplexity"] - cve["exploitability3"]["privilegesrequired"] = item["cve"]["metrics"][ - "cvssMetricV30" - ][0]["cvssData"]["privilegesRequired"] - cve["exploitability3"]["userinteraction"] = item["cve"]["metrics"][ - "cvssMetricV30" - ][0]["cvssData"]["userInteraction"] - cve["exploitability3"]["scope"] = item["cve"]["metrics"][ - "cvssMetricV30" - ][0]["cvssData"]["scope"] - cve["cvss3"] = float( - item["cve"]["metrics"]["cvssMetricV30"][0]["cvssData"]["baseScore"] + cve["impact3"]["availability"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.availabilityImpact" ) - cve["cvss3Vector"] = item["cve"]["metrics"]["cvssMetricV30"][0][ - "cvssData" - ]["vectorString"] - cve["impactScore3"] = float( - item["cve"]["metrics"]["cvssMetricV30"][0]["impactScore"] + cve["impact3"]["confidentiality"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.confidentialityImpact" ) - cve["exploitabilityScore3"] = float( - item["cve"]["metrics"]["cvssMetricV30"][0]["exploitabilityScore"] + cve["impact3"]["integrity"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.integrityImpact" ) - cve["cvss3Time"] = parse_datetime( - item["cve"]["lastModified"], ignoretz=True + cve["exploitability3"]["attackvector"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.attackVector" + ) + cve["exploitability3"]["attackcomplexity"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.attackComplexity" + ) + cve["exploitability3"]["privilegesrequired"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.privilegesRequired" + ) + cve["exploitability3"]["userinteraction"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.userInteraction" + ) + cve["exploitability3"]["scope"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.scope" + ) + cve["cvss3"] = ( + float( + self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.baseScore" + ) + ) + if self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].cvssData.baseScore" + ) + else None + ) + cve["impactScore3"] = ( + float( + self.safe_get(item, "cve.metrics.cvssMetricV30.[0].impactScore") + ) + if self.safe_get(item, "cve.metrics.cvssMetricV30.[0].impactScore") + else None + ) + cve["exploitabilityScore3"] = ( + float( + self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].exploitabilityScore" + ) + ) + if self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].exploitabilityScore" + ) + else None + ) + cve["cvss3Time"] = ( + parse_datetime( + self.safe_get(item, "cve.lastModified"), ignoretz=True + ) + if self.safe_get(item, "cve.lastModified") + else None + ) + cve["cvss3Type"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].type" + ) + cve["cvss3Source"] = self.safe_get( + item, "cve.metrics.cvssMetricV30.[0].source" ) - cve["cvss3Type"] = item["cve"]["metrics"]["cvssMetricV30"][0]["type"] - cve["cvss3Source"] = item["cve"]["metrics"]["cvssMetricV30"][0][ - "source" - ] else: cve["cvss3"] = None - if "cvssMetricV2" in item["cve"]["metrics"]: - cve["access"]["authentication"] = item["cve"]["metrics"][ - "cvssMetricV2" - ][0]["cvssData"]["authentication"] - cve["access"]["complexity"] = item["cve"]["metrics"]["cvssMetricV2"][0][ - "cvssData" - ]["accessComplexity"] - cve["access"]["vector"] = item["cve"]["metrics"]["cvssMetricV2"][0][ - "cvssData" - ]["accessVector"] - cve["impact"]["availability"] = item["cve"]["metrics"]["cvssMetricV2"][ - 0 - ]["cvssData"]["availabilityImpact"] - cve["impact"]["confidentiality"] = item["cve"]["metrics"][ - "cvssMetricV2" - ][0]["cvssData"]["confidentialityImpact"] - cve["impact"]["integrity"] = item["cve"]["metrics"]["cvssMetricV2"][0][ - "cvssData" - ]["integrityImpact"] - cve["cvss"] = float( - item["cve"]["metrics"]["cvssMetricV2"][0]["cvssData"]["baseScore"] + if "cvssMetricV2" in self.safe_get(item, "cve.metrics"): + cve["access"]["authentication"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.authentication" + ) + cve["access"]["complexity"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.accessComplexity" + ) + cve["access"]["vector"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.accessVector" + ) + cve["impact"]["availability"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.availabilityImpact" + ) + cve["impact"]["confidentiality"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.confidentialityImpact" + ) + cve["impact"]["integrity"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.integrityImpact" + ) + cve["cvss"] = ( + float( + self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.baseScore" + ) + ) + if self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.baseScore" + ) + else None ) - cve["exploitabilityScore"] = float( - item["cve"]["metrics"]["cvssMetricV2"][0]["exploitabilityScore"] + cve["exploitabilityScore"] = ( + float( + self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].exploitabilityScore" + ) + ) + if self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].exploitabilityScore" + ) + else None ) - cve["impactScore"] = float( - item["cve"]["metrics"]["cvssMetricV2"][0]["impactScore"] + cve["impactScore"] = ( + float( + self.safe_get(item, "cve.metrics.cvssMetricV2.[0].impactScore") + ) + if self.safe_get(item, "cve.metrics.cvssMetricV2.[0].impactScore") + else None ) - cve["cvssTime"] = parse_datetime( - item["cve"]["lastModified"], ignoretz=True + cve["cvssTime"] = ( + parse_datetime( + self.safe_get(item, "cve.lastModified"), ignoretz=True + ) + if self.safe_get(item, "cve.lastModified") + else None ) # NVD JSON lacks the CVSS time which was present in the original XML format - cve["cvssVector"] = item["cve"]["metrics"]["cvssMetricV2"][0][ - "cvssData" - ]["vectorString"] - cve["cvssType"] = item["cve"]["metrics"]["cvssMetricV2"][0]["type"] - cve["cvssSource"] = item["cve"]["metrics"]["cvssMetricV2"][0]["source"] + cve["cvssVector"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].cvssData.vectorString" + ) + cve["cvssType"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].type" + ) + cve["cvssSource"] = self.safe_get( + item, "cve.metrics.cvssMetricV2.[0].source" + ) else: cve["cvss"] = None @@ -613,7 +707,7 @@ def process_the_item(self, item: dict = None): "cvssMetricV30", "cvssMetricV2", ]: - if version in item["cve"]["metrics"]: + if version in self.safe_get(item, "cve.metrics"): for metric in item["cve"]["metrics"][version]: cvss_key = ( "cvss4" @@ -624,96 +718,102 @@ def process_the_item(self, item: dict = None): else "cvss2" ) ) - source = metric["source"] + source = self.safe_get(metric, "source") entry = { - "type": metric["type"], - "vectorString": metric["cvssData"]["vectorString"], - "baseScore": metric["cvssData"]["baseScore"], + "type": self.safe_get(metric, "type"), + "vectorString": self.safe_get(metric, "cvssData.vectorString"), + "baseScore": self.safe_get(metric, "cvssData.baseScore"), } if cvss_key == "cvss4": entry.update( { - "vulnerable_system_confidentiality": metric[ - "cvssData" - ].get("vulnerableSystemConfidentiality"), - "vulnerable_system_integrity": metric["cvssData"].get( - "vulnerableSystemIntegrity" + "vulnerable_system_confidentiality": self.safe_get( + metric, "cvssData.vulnConfidentialityImpact" + ), + "vulnerable_system_integrity": self.safe_get( + metric, "cvssData.vulnIntegrityImpact" + ), + "vulnerable_system_availability": self.safe_get( + metric, "cvssData.vulnAvailabilityImpact" + ), + "subsequent_system_confidentiality": self.safe_get( + metric, "cvssData.subConfidentialityImpact" ), - "vulnerable_system_availability": metric[ - "cvssData" - ].get("vulnerableSystemAvailability"), - "subsequent_system_confidentiality": metric[ - "cvssData" - ].get("subsequentSystemConfidentiality"), - "subsequent_system_integrity": metric["cvssData"].get( - "subsequentSystemIntegrity" + "subsequent_system_integrity": self.safe_get( + metric, "cvssData.subIntegrityImpact" ), - "subsequent_system_availability": metric[ - "cvssData" - ].get("subsequentSystemAvailability"), - "attackVector": metric["cvssData"].get("attackVector"), - "attackComplexity": metric["cvssData"].get( - "attackComplexity" + "subsequent_system_availability": self.safe_get( + metric, "cvssData.subAvailabilityImpact" ), - "attackRequirements": metric["cvssData"].get( - "attackRequirements" + "attackVector": self.safe_get( + metric, "cvssData.attackVector" ), - "privilegesRequired": metric["cvssData"].get( - "privilegesRequired" + "attackComplexity": self.safe_get( + metric, "cvssData.attackComplexity" ), - "userInteraction": metric["cvssData"].get( - "userInteraction" + "attackRequirements": self.safe_get( + metric, "cvssData.attackRequirements" ), - "exploitMaturity": metric["cvssData"].get( - "exploitMaturity" + "privilegesRequired": self.safe_get( + metric, "cvssData.privilegesRequired" + ), + "userInteraction": self.safe_get( + metric, "cvssData.userInteraction" + ), + "exploitMaturity": self.safe_get( + metric, "cvssData.exploitMaturity" ), } ) elif cvss_key == "cvss3": entry.update( { - "confidentialityImpact": metric["cvssData"].get( - "confidentialityImpact" + "confidentialityImpact": self.safe_get( + metric, "cvssData.confidentialityImpact" ), - "integrityImpact": metric["cvssData"].get( - "integrityImpact" + "integrityImpact": self.safe_get( + metric, "cvssData.integrityImpact" ), - "availabilityImpact": metric["cvssData"].get( - "availabilityImpact" + "availabilityImpact": self.safe_get( + metric, "cvssData.availabilityImpact" ), - "attackVector": metric["cvssData"].get("attackVector"), - "attackComplexity": metric["cvssData"].get( - "attackComplexity" + "attackVector": self.safe_get( + metric, "cvssData.attackVector" ), - "privilegesRequired": metric["cvssData"].get( - "privilegesRequired" + "attackComplexity": self.safe_get( + metric, "cvssData.attackComplexity" ), - "userInteraction": metric["cvssData"].get( - "userInteraction" + "privilegesRequired": self.safe_get( + metric, "cvssData.privilegesRequired" ), - "scope": metric["cvssData"].get("scope"), + "userInteraction": self.safe_get( + metric, "cvssData.userInteraction" + ), + "scope": self.safe_get(metric, "cvssData.scope"), } ) elif cvss_key == "cvss2": entry.update( { - "authentication": metric["cvssData"].get( - "authentication" + "authentication": self.safe_get( + metric, "cvssData.authentication" + ), + "accessComplexity": self.safe_get( + metric, "cvssData.accessComplexity" ), - "accessComplexity": metric["cvssData"].get( - "accessComplexity" + "accessVector": self.safe_get( + metric, "cvssData.accessVector" ), - "accessVector": metric["cvssData"].get("accessVector"), - "confidentialityImpact": metric["cvssData"].get( - "confidentialityImpact" + "confidentialityImpact": self.safe_get( + metric, "cvssData.confidentialityImpact" ), - "integrityImpact": metric["cvssData"].get( - "integrityImpact" + "integrityImpact": self.safe_get( + metric, "cvssData.integrityImpact" ), - "availabilityImpact": metric["cvssData"].get( - "availabilityImpact" + "availabilityImpact": self.safe_get( + metric, "cvssData.availabilityImpact" ), } )