Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NVD OS vs App Tagging using Custom Field #198

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions Export_Check_NVD.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
import requests
import csv
import time
import json
import gzip
import io
from collections import defaultdict
import sys

token_variable = 'API_KEY' # replace with your actual token

def request_data_export(token_variable):
url = "https://api.kennasecurity.com/data_exports"
headers = {
'X-Risk-Token': token_variable,
'accept': 'application/json',
'content-type': 'application/json'
}
data = {
"export_settings": {
"format": "json",
"model": "vulnerability",
"slim": False,
"fields": [
"cve_id"
]
},
"status": [
"open",
"risk accepted",
"false positive"
]
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
return response.json()['search_id']
else:
print(f"Failed to send POST request. Status Code: {response.status_code}. Response Text: {response.text}")
return None

def wait_for_data_export(search_id, token_variable, max_wait_time=1200, sleep_time=10):
start_time = time.time()
status_url = f"https://api.kennasecurity.com/data_exports/status?search_id={search_id}"
headers = {
'X-Risk-Token': token_variable,
'accept': 'application/json'
}
while True:
status_response = requests.get(status_url, headers=headers)
if status_response.status_code == 200 and status_response.json().get('message') == "Export ready for download":
url = f"https://api.kennasecurity.com/data_exports?search_id={search_id}"
headers = {
'X-Risk-Token': token_variable,
'accept': 'application/gzip'
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
decompressed_file = gzip.GzipFile(fileobj=io.BytesIO(response.content))
data = json.load(decompressed_file)
return data
else:
print(f"Failed to fetch data. Status Code: {response.status_code}. Response Text: {response.text}")
return None
elif time.time() - start_time > max_wait_time:
print(f"Timed out after waiting for {max_wait_time} seconds.")
return None
else:
print(f"Data export is still in progress. Waiting for {sleep_time} seconds before trying again.")
time.sleep(sleep_time)

custom_field_id = 4 # replace with the custom field from your environmnet
def send_bulk_updates(ids, app_or_os, custom_field_id, token_variable):
url = "https://api.kennasecurity.com/vulnerabilities/bulk"
headers = {
'X-Risk-Token': token_variable,
'accept': 'application/json',
'content-type': 'application/json'
}
payload = {
"vulnerability": {
"custom_fields": {
str(custom_field_id): app_or_os
}
},
"vulnerability_ids": ids
}
response = requests.put(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"POST request successfully for IDs: {ids}")
else:
print(f"Failed to send POST request for IDs: {ids}. Response Status Code: {response.status_code}. Response Text: {response.text}")

# Request data export and get the search_id
search_id = request_data_export(token_variable)

if search_id:
# Use the search_id to get the data export
vulns_data = wait_for_data_export(search_id, token_variable)
if vulns_data is None:
print("Failed to fetch data export.")
sys.exit(1)


if vulns_data:
vulns_cves = set(vuln['cve_id'] for vuln in vulns_data['vulnerabilities'] if vuln['cve_id'].startswith('CVE-'))

# Initialize a dictionary to count the number of CVEs for each type
type_counts = {'Application': 0, 'OS': 0, 'Hardware': 0, 'Network': 0}

# Initialize a dictionary to count the number of unique IDs for each type
id_counts = defaultdict(set)

# Create a dictionary where each CVE ID maps to a list of IDs
vulns_id = defaultdict(list)
for vuln in vulns_data['vulnerabilities']:
if isinstance(vuln, dict) and isinstance(vuln.get('cve_id'), str) and vuln['cve_id'].startswith('CVE-'):
vulns_id[vuln['cve_id']].append(vuln['id'])

# Map of product types
product_type_map = {'a': 'Application', 'o': 'OS', 'h': 'Hardware', 'n': 'Network'}

url = 'https://services.nvd.nist.gov/rest/json/cves/2.0/'
parameters = {'resultsPerPage': 2000, 'startIndex': 0}

# Get the start time
start_time = time.time()

# Create an empty dictionary to hold CVE data
cve_data = {}
fetch_failed = False # Initialize fetch_failed

while True:
response = requests.get(url, params=parameters)

if response.status_code == 200:
data = response.json()

for cve in data['vulnerabilities']:
cve_id = cve['cve']['id']
# Check if CVE is in both NVD_API code and export_vulns_cve.json file
if cve_id in vulns_cves:
# Check if 'configurations' field is present
if 'configurations' in cve['cve']:
types = [] # Create a list to hold the product types
for config in cve['cve']['configurations']:
for node in config['nodes']:
if 'cpeMatch' in node:
for match in node['cpeMatch']:
product_type = product_type_map.get(match['criteria'].split(':')[2], '')
if product_type and product_type not in types:
types.append(product_type)

# Check for the specific combinations and update types accordingly
if ('Application' in types and 'Hardware' in types) or \
('OS' in types and 'Hardware' in types) or \
('OS' in types and 'Network' in types) or \
('Application' in types and 'Network' in types):
types = [t for t in types if t not in ['Hardware', 'Network']]

if 'Application' in types and 'OS' in types:
types = [types[0]] # Keep only the first product type

# Initialize the dictionary for this cve_id if it doesn't exist
if cve_id not in cve_data:
cve_data[cve_id] = {'CVE ID': cve_id, 'Type': [], 'id': set()}
# Add the type and id to the dictionary for this cve_id
cve_data[cve_id]['Type'].extend(types)
cve_data[cve_id]['id'].update(vulns_id[cve_id])
else:
print('Failed to fetch data from API')
print('Status code:', response.status_code)
print('Response text:', response.text)
fetch_failed = True # Set fetch_failed to True if response status is not 200

if parameters['startIndex'] + parameters['resultsPerPage'] < data['totalResults']:
parameters['startIndex'] += parameters['resultsPerPage']
# Print out progress update
progress = (parameters['startIndex'] / data['totalResults']) * 100
print(f'Progress: {round(progress, 1)}%')
else:
break

# Add a delay between each request
time.sleep(6)

if fetch_failed:
print('Exiting due to API fetch failure')

# Open the CSV file
with open('output_nvd_cve_vuln_ids.csv', 'w', newline='') as csvfile:
fieldnames = ['CVE ID', 'Type', 'id']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()

# Write the data to the CSV
for cve_id in cve_data:
writer.writerow({
'CVE ID': cve_id,
'Type': ', '.join(cve_data[cve_id]['Type']),
'id': ', '.join(map(str, cve_data[cve_id]['id']))
})
# Increment the count for each type in this row
for product_type in cve_data[cve_id]['Type']:
type_counts[product_type] += 1
id_counts[product_type].update(cve_data[cve_id]['id'])

# Calculate and print the time taken
end_time = time.time()
time_taken = end_time - start_time
hours, remainder = divmod(time_taken, 3600)
minutes, seconds = divmod(remainder, 60)
print(f'Time taken: {int(hours)} hours, {int(minutes)} minutes, {round(seconds, 1)} seconds')

# Print the number of CVEs and IDs for each type
for type_name in type_counts.keys():
print(f'{type_name}: {type_counts[type_name]} CVEs, {len(id_counts[type_name])} IDs')

# Create sets for app, os, and hardware
app_set = set()
os_set = set()
hardware_set = set()
network_set = set()

# Go through the cve_data and add IDs to the appropriate sets
for cve_id in cve_data:
if 'Application' in cve_data[cve_id]['Type']:
app_set.update(cve_data[cve_id]['id'])
if 'OS' in cve_data[cve_id]['Type']:
os_set.update(cve_data[cve_id]['id'])
if 'Hardware' in cve_data[cve_id]['Type']:
hardware_set.update(cve_data[cve_id]['id'])
if 'Network' in cve_data[cve_id]['Type']:
network_set.update(cve_data[cve_id]['id'])

# Print the application, os & hardware set results
#print('Application set:')
#print(app_set)
#print('\nOS set:')
#print(os_set)
#print('\nHardware set:')
#print(hardware_set)


# Set your threshold number for grouping IDs
thresh_num = 25000 # Threshold for how many IDs you want to send in each request. Max possible is 30k as per API docs

# The sets with the IDs
sets = {'application': list(app_set), 'os': list(os_set), 'hardware': list(hardware_set), 'network': list(network_set)}

for set_type, ids in sets.items():
while len(ids) > 0:
batch = ids[:thresh_num]
custom_field_value = set_type
send_bulk_updates(batch, custom_field_value, custom_field_id, token_variable)
time.sleep(0.2) # Add a delay of 0.2 seconds between each request
ids = ids[thresh_num:]

41 changes: 41 additions & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Create CSV with CVEs, their Type (OS, Application, Hardware or Network based on NVD) and Vulnerability IDs from teh customer's environment and also tag the custom field with the type

## Introduction
Our customers wanted the ability to identify & filter vulnerabilities by their classification like OS & Application.
Full working process on how this can be done using the custom field option can be found documented in the attached Vulnerability Type Use case Word Document.

This script handles the step#2 listed on the document, where a CSV is required as an input with the CVEs and their classification.
NVD database was referenced to get this information by following the steps below:

1. Access the CVE API of NVD to get the details - https://nvd.nist.gov/developers/vulnerabilities
2. For each CVE entry, check the 'criteria' field in the 'configurations' section. This field contains a URI that identifies the affected product and version. (explained below in CPE section)
3. The 'criteria' URI is composed of several components, each separated by a colon. The second component indicates the product type - an application ('a') or an operating system ('o').
4. By examining this component, you can categorize the CVE either as application-related or OS-related.

Types of product type:
1. a: (Application): This is used to denote that the component is an application. An example would be cpe:2.3:a:microsoft:internet_explorer:8.0.7600.16385:*:*:*:*:*:*:*.
2. o: (Operating System): This is used to denote that the component is an operating system. An example would be cpe:2.3:o:microsoft:windows_7:-:*:*:*:*:*:*:*.
3. h: (Hardware): This is used to denote that the component is a piece of hardware. An example would be cpe:2.3:h:dell:poweredge_2950:-:*:*:*:*:*:*:*.
4. n: (Network): This is used to denote that the component is network. An example would be cpe:2.3:n:tls:example_tls:-:*:*:*:*:*:*:*.


## Usage
python Export_Check_NVD.py


## Updates/Edits needed to execute the script

### 1: Update the URL in the script to use your endpoint w.r.t customer's environment. By default, https://api.kennasecurity.com/ is being used.

### 2: Update the *token_variable = 'API_KEY'* with the API KEY from customer's environment

### 3: Update *custom_field_id = 4* with the custom field id from your customer's environment

### 4: By default the script waits for maximum time of 20 minutes to get the export from the customer's environment, in case your export is big and needs more time,
please update the *max_wait_time=1200* parameter (in seconds) to accomodate your export.
Note: The scipt was tested with 1200 seconds (20 minutes) with record count of ~8M and it executed successfully.

## Requirements
* python
* json
* csv
Binary file added Vulnerability Typing Use Case.docx
Binary file not shown.
Loading