@@ -287,6 +287,87 @@ def produce_from_str(self, json_str: str, output_file: str = None) -> bool:
287
287
return False
288
288
return self .produce_from_json (data , output_file )
289
289
290
+ def _normalize_vulnerability_id (self , vuln : dict ) -> tuple [str , str ]:
291
+ """
292
+ Normalize vulnerability ID and CVE from different possible field names.
293
+ Returns tuple of (vuln_id, vuln_cve).
294
+ """
295
+ vuln_id = vuln .get ('ID' , '' ) or vuln .get ('id' , '' )
296
+ vuln_cve = vuln .get ('CVE' , '' ) or vuln .get ('cve' , '' )
297
+
298
+ # Skip CPE entries, use CVE if available
299
+ if vuln_id .upper ().startswith ('CPE:' ) and vuln_cve :
300
+ vuln_id = vuln_cve
301
+
302
+ return vuln_id , vuln_cve
303
+
304
+ def _create_vulnerability_entry (self , vuln_id : str , vuln : dict , vuln_cve : str , purl : str ) -> dict :
305
+ """
306
+ Create a new vulnerability entry for CycloneDX format.
307
+ """
308
+ vuln_source = vuln .get ('source' , '' ).lower ()
309
+ return {
310
+ 'id' : vuln_id ,
311
+ 'source' : {
312
+ 'name' : 'NVD' if vuln_source == 'nvd' else 'GitHub Advisories' ,
313
+ 'url' : f'https://nvd.nist.gov/vuln/detail/{ vuln_cve } '
314
+ if vuln_source == 'nvd'
315
+ else f'https://github.com/advisories/{ vuln_id } '
316
+ },
317
+ 'ratings' : [{'severity' : self ._sev_lookup (vuln .get ('severity' , 'unknown' ).lower ())}],
318
+ 'affects' : [{'ref' : purl }]
319
+ }
320
+
321
+ def append_vulnerabilities (self , cdx_dict : dict , vulnerabilities_data : dict , purl : str ) -> dict :
322
+ """
323
+ Append vulnerabilities to an existing CycloneDX dictionary
324
+
325
+ Args:
326
+ cdx_dict (dict): The existing CycloneDX dictionary
327
+ vulnerabilities_data (dict): The vulnerabilities data from get_vulnerabilities_json
328
+ purl (str): The PURL of the component these vulnerabilities affect
329
+
330
+ Returns:
331
+ dict: The updated CycloneDX dictionary with vulnerabilities appended
332
+ """
333
+ if not cdx_dict or not vulnerabilities_data :
334
+ return cdx_dict
335
+
336
+ if 'vulnerabilities' not in cdx_dict :
337
+ cdx_dict ['vulnerabilities' ] = []
338
+
339
+ # Extract vulnerabilities from the response
340
+ vulns_list = vulnerabilities_data .get ('purls' , [])
341
+ if not vulns_list :
342
+ return cdx_dict
343
+
344
+ vuln_items = vulns_list [0 ].get ('vulnerabilities' , [])
345
+
346
+ for vuln in vuln_items :
347
+ vuln_id , vuln_cve = self ._normalize_vulnerability_id (vuln )
348
+
349
+ # Skip empty IDs or CPE-only entries
350
+ if not vuln_id or vuln_id .upper ().startswith ('CPE:' ):
351
+ continue
352
+
353
+ # Check if vulnerability already exists
354
+ existing_vuln = next (
355
+ (v for v in cdx_dict ['vulnerabilities' ] if v .get ('id' ) == vuln_id ),
356
+ None
357
+ )
358
+
359
+ if existing_vuln :
360
+ # Add this PURL to the affects list if not already present
361
+ if not any (ref .get ('ref' ) == purl for ref in existing_vuln .get ('affects' , [])):
362
+ existing_vuln ['affects' ].append ({'ref' : purl })
363
+ else :
364
+ # Create new vulnerability entry
365
+ cdx_dict ['vulnerabilities' ].append (
366
+ self ._create_vulnerability_entry (vuln_id , vuln , vuln_cve , purl )
367
+ )
368
+
369
+ return cdx_dict
370
+
290
371
@staticmethod
291
372
def _sev_lookup (value : str ):
292
373
"""
0 commit comments