Skip to content

Commit 8482d82

Browse files
committed
pandas tests, cve_dset, cpe_dset unify json_path approach
1 parent 14c369b commit 8482d82

13 files changed

+163
-91
lines changed

notebooks/examples/common_criteria.ipynb

+1-1
Original file line number | Diff line number | Diff line change
@@ -194,7 +194,7 @@
194194
"outputs": [],
195195
"source": [
196196
"# Automatically match CPEs and CVEs\n",
197-
"_, cpe_dset, _ = dset.compute_cpe_heuristics()\n",
197+
"dset.compute_cpe_heuristics()\n",
198198
"dset.compute_related_cves()"
199199
]
200200
},

sec_certs/dataset/common_criteria.py

+1
Original file line number | Diff line number | Diff line change
@@ -188,6 +188,7 @@ def from_web_latest(cls) -> "CCDataset":
188188
return cls.from_web(config.cc_latest_snapshot, "Downloading CC Dataset", "cc_latest_dataset.json")
189189

190190
def _set_local_paths(self):
191+
super()._set_local_paths()
191192
for cert in self:
192193
cert.set_local_paths(self.reports_pdf_dir, self.targets_pdf_dir, self.reports_txt_dir, self.targets_txt_dir)
193194

sec_certs/dataset/cpe.py

+44-39
Original file line number | Diff line number | Diff line change
@@ -4,9 +4,8 @@
44
import tempfile
55
import xml.etree.ElementTree as ET
66
import zipfile
7-
from dataclasses import InitVar, dataclass, field
87
from pathlib import Path
9-
from typing import Any, ClassVar, Dict, Iterator, List, Set, Tuple, Union, cast
8+
from typing import Any, ClassVar, Dict, Iterator, List, Optional, Set, Tuple, Union, cast
109

1110
import pandas as pd
1211

@@ -19,29 +18,39 @@
1918
logger = logging.getLogger(__name__)
2019

2120

22-
@dataclass
2321
class CPEDataset(ComplexSerializableType):
2422
"""
2523
Dataset of CPE records. Includes look-up dictionaries for fast search.
2624
"""
2725

28-
was_enhanced_with_vuln_cpes: bool
29-
json_path: Path
30-
cpes: Dict[str, CPE]
31-
vendor_to_versions: Dict[str, Set[str]] = field(
32-
init=False, default_factory=dict
33-
) # Look-up dict cpe_vendor: list of viable versions
34-
vendor_version_to_cpe: Dict[Tuple[str, str], Set[CPE]] = field(
35-
init=False, default_factory=dict
36-
) # Look-up dict (cpe_vendor, cpe_version): List of viable cpe items
37-
title_to_cpes: Dict[str, Set[CPE]] = field(
38-
init=False, default_factory=dict
39-
) # Look-up dict title: List of cert items
40-
vendors: Set[str] = field(init=False, default_factory=set)
41-
42-
init_lookup_dicts: InitVar[bool] = True
43-
cpe_xml_basename: ClassVar[str] = "official-cpe-dictionary_v2.3.xml"
44-
cpe_url: ClassVar[str] = "https://nvd.nist.gov/feeds/xml/cpe/dictionary/" + cpe_xml_basename + ".zip"
26+
CPE_XML_BASENAME: ClassVar[str] = "official-cpe-dictionary_v2.3.xml"
27+
CPE_URL: ClassVar[str] = "https://nvd.nist.gov/feeds/xml/cpe/dictionary/" + CPE_XML_BASENAME + ".zip"
28+
29+
def __init__(
30+
self,
31+
was_enhanced_with_vuln_cpes: bool,
32+
cpes: Dict[str, CPE],
33+
json_path: Optional[Union[str, Path]] = None,
34+
):
35+
self.was_enhanced_with_vuln_cpes = was_enhanced_with_vuln_cpes
36+
self.cpes = cpes
37+
self._json_path = Path(json_path) if json_path else Path.cwd() / (type(self).__name__).lower()
38+
39+
self.vendor_to_versions: Dict[str, Set[str]] = dict()
40+
self.vendor_version_to_cpe: Dict[Tuple[str, str], Set[CPE]] = dict()
41+
self.title_to_cpes: Dict[str, Set[CPE]] = dict()
42+
self.vendors: Set[str] = set()
43+
44+
self.build_lookup_dicts()
45+
46+
@property
47+
def json_path(self) -> Path:
48+
return self._json_path
49+
50+
@json_path.setter
51+
def json_path(self, new_json_path: Union[str, Path]) -> None:
52+
self._json_path = Path(new_json_path)
53+
self.to_json()
4554

4655
def __iter__(self) -> Iterator[CPE]:
4756
yield from self.cpes.values()
@@ -65,11 +74,7 @@ def __eq__(self, other: object) -> bool:
6574

6675
@property
6776
def serialized_attributes(self) -> List[str]:
68-
return ["was_enhanced_with_vuln_cpes", "json_path", "cpes"]
69-
70-
def __post_init__(self, init_lookup_dicts: bool):
71-
if init_lookup_dicts:
72-
self.build_lookup_dicts()
77+
return ["was_enhanced_with_vuln_cpes", "cpes"]
7378

7479
def build_lookup_dicts(self) -> None:
7580
"""
@@ -94,28 +99,25 @@ def build_lookup_dicts(self) -> None:
9499
self.title_to_cpes[cpe.title].add(cpe)
95100

96101
@classmethod
97-
def from_web(cls, json_path: Union[str, Path], init_lookup_dicts: bool = True) -> "CPEDataset":
102+
def from_web(cls, json_path: Optional[Union[str, Path]] = None) -> "CPEDataset":
98103
"""
99104
Creates CPEDataset from NIST resources published on-line
100105
101106
:param Union[str, Path] json_path: Path to store the dataset to
102-
:param bool init_lookup_dicts: If dictionaries for fast matching should be computed, defaults to True
103107
:return CPEDataset: The resulting dataset
104108
"""
105109
with tempfile.TemporaryDirectory() as tmp_dir:
106-
xml_path = Path(tmp_dir) / cls.cpe_xml_basename
107-
zip_path = Path(tmp_dir) / (cls.cpe_xml_basename + ".zip")
108-
helpers.download_file(cls.cpe_url, zip_path)
110+
xml_path = Path(tmp_dir) / cls.CPE_XML_BASENAME
111+
zip_path = Path(tmp_dir) / (cls.CPE_XML_BASENAME + ".zip")
112+
helpers.download_file(cls.CPE_URL, zip_path)
109113

110114
with zipfile.ZipFile(zip_path, "r") as zip_ref:
111115
zip_ref.extractall(tmp_dir)
112116

113-
return cls._from_xml(xml_path, json_path, init_lookup_dicts)
117+
return cls._from_xml(xml_path, json_path)
114118

115119
@classmethod
116-
def _from_xml(
117-
cls, xml_path: Union[str, Path], json_path: Union[str, Path], init_lookup_dicts: bool = True
118-
) -> "CPEDataset":
120+
def _from_xml(cls, xml_path: Union[str, Path], json_path: Optional[Union[str, Path]] = None) -> "CPEDataset":
119121
logger.info("Loading CPE dataset from XML.")
120122
root = ET.parse(xml_path).getroot()
121123
dct = {}
@@ -136,7 +138,7 @@ def _from_xml(
136138

137139
dct[cpe_uri] = cached_cpe(cpe_uri, title)
138140

139-
return cls(False, Path(json_path), dct, init_lookup_dicts)
141+
return cls(False, dct, json_path)
140142

141143
@classmethod
142144
def from_json(cls, input_path: Union[str, Path]) -> "CPEDataset":
@@ -147,19 +149,22 @@ def from_json(cls, input_path: Union[str, Path]) -> "CPEDataset":
147149
:return CPEDataset: the resulting dataset.
148150
"""
149151
dset = cast("CPEDataset", ComplexSerializableType.from_json(input_path))
150-
dset.json_path = Path(input_path)
152+
dset._json_path = Path(input_path)
151153
return dset
152154

153155
@classmethod
154-
def from_dict(cls, dct: Dict[str, Any], init_lookup_dicts: bool = True) -> "CPEDataset":
156+
def from_dict(cls, dct: Dict[str, Any]) -> "CPEDataset":
155157
"""
156158
Loads dataset from dictionary.
157159
158160
:param Dict[str, Any] dct: Dictionary that holds the dataset
159-
:param bool init_lookup_dicts: Whether look-up dicts should be computed as a part of initialization, defaults to True
160161
:return CPEDataset: the resulting dataset.
161162
"""
162-
return cls(dct["was_enhanced_with_vuln_cpes"], Path("../"), dct["cpes"], init_lookup_dicts)
163+
return cls(
164+
dct["was_enhanced_with_vuln_cpes"],
165+
dct["cpes"],
166+
Path("../"),
167+
)
163168

164169
def to_pandas(self) -> pd.DataFrame:
165170
"""

sec_certs/dataset/cve.py

+29-21
Original file line number | Diff line number | Diff line change
@@ -6,9 +6,8 @@
66
import shutil
77
import tempfile
88
import zipfile
9-
from dataclasses import dataclass, field
109
from pathlib import Path
11-
from typing import Dict, Final, List, Optional, Set, Union
10+
from typing import ClassVar, Dict, List, Optional, Set, Union
1211

1312
import numpy as np
1413
import pandas as pd
@@ -18,18 +17,29 @@
1817
from sec_certs.config.configuration import config
1918
from sec_certs.sample.cpe import CPE, cached_cpe
2019
from sec_certs.sample.cve import CVE
21-
from sec_certs.serialization.json import ComplexSerializableType, CustomJSONDecoder, CustomJSONEncoder
20+
from sec_certs.serialization.json import ComplexSerializableType, CustomJSONDecoder
2221
from sec_certs.utils.parallel_processing import process_parallel
2322

2423
logger = logging.getLogger(__name__)
2524

2625

27-
@dataclass
2826
class CVEDataset(ComplexSerializableType):
29-
cves: Dict[str, CVE]
30-
cpe_to_cve_ids_lookup: Dict[str, Set[str]] = field(init=False)
31-
cve_url: Final[str] = "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-"
32-
cpe_match_feed_url: Final[str] = "https://nvd.nist.gov/feeds/json/cpematch/1.0/nvdcpematch-1.0.json.zip"
27+
CVE_URL: ClassVar[str] = "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-"
28+
CPE_MATCH_FEED_URL: ClassVar[str] = "https://nvd.nist.gov/feeds/json/cpematch/1.0/nvdcpematch-1.0.json.zip"
29+
30+
def __init__(self, cves: Dict[str, CVE], json_path: Optional[Union[str, Path]] = None):
31+
self.cves = cves
32+
self._json_path = Path(json_path) if json_path else Path.cwd() / (type(self).__name__).lower()
33+
self.cpe_to_cve_ids_lookup: Dict[str, Set[str]] = dict()
34+
35+
@property
36+
def json_path(self) -> Path:
37+
return self._json_path
38+
39+
@json_path.setter
40+
def json_path(self, new_json_path: Union[str, Path]) -> None:
41+
self._json_path = Path(new_json_path)
42+
self.to_json()
3343

3444
@property
3545
def serialized_attributes(self) -> List[str]:
@@ -89,7 +99,7 @@ def download_cves(cls, output_path_str: str, start_year: int, end_year: int):
8999
if not output_path.exists:
90100
output_path.mkdir()
91101

92-
urls = [cls.cve_url + str(x) + ".json.zip" for x in range(start_year, end_year + 1)]
102+
urls = [cls.CVE_URL + str(x) + ".json.zip" for x in range(start_year, end_year + 1)]
93103

94104
logger.info(f"Identified {len(urls)} CVE files to fetch from nist.gov. Downloading them into {output_path}")
95105
with tempfile.TemporaryDirectory() as tmp_dir:
@@ -113,7 +123,12 @@ def from_nist_json(cls, input_path: str) -> "CVEDataset":
113123
return cls({x.cve_id: x for x in cves})
114124

115125
@classmethod
116-
def from_web(cls, start_year: int = 2002, end_year: int = datetime.datetime.now().year):
126+
def from_web(
127+
cls,
128+
start_year: int = 2002,
129+
end_year: int = datetime.datetime.now().year,
130+
json_path: Optional[Union[str, Path]] = None,
131+
):
117132
logger.info("Building CVE dataset from nist.gov website.")
118133
with tempfile.TemporaryDirectory() as tmp_dir:
119134
cls.download_cves(tmp_dir, start_year, end_year)
@@ -131,20 +146,13 @@ def from_web(cls, start_year: int = 2002, end_year: int = datetime.datetime.now(
131146
for r in results:
132147
all_cves.update(r.cves)
133148

134-
return cls(all_cves)
135-
136-
def to_json(self, output_path: Optional[Union[str, Path]] = None):
137-
if output_path is None:
138-
raise RuntimeError(
139-
f"You tried to serialize an object ({type(self)}) that does not have implicit json path. Please provide json_path."
140-
)
141-
with Path(output_path).open("w") as handle:
142-
json.dump(self, handle, indent=4, cls=CustomJSONEncoder, ensure_ascii=False)
149+
return cls(all_cves, json_path=json_path)
143150

144151
@classmethod
145152
def from_json(cls, input_path: Union[str, Path]):
146153
with Path(input_path).open("r") as handle:
147154
dset = json.load(handle, cls=CustomJSONDecoder)
155+
dset._json_path = input_path
148156
return dset
149157

150158
def get_cve_ids_for_cpe_uri(self, cpe_uri: str) -> Optional[Set[str]]:
@@ -204,10 +212,10 @@ def parse_values_cpe(field: Dict) -> List[CPE]:
204212
if not input_filepath or not input_filepath.is_file():
205213
logger.debug("NIST mapping file not available, going to download.")
206214
with tempfile.TemporaryDirectory() as tmp_dir:
207-
filename = Path(self.cpe_match_feed_url).name
215+
filename = Path(self.CPE_MATCH_FEED_URL).name
208216
download_path = Path(tmp_dir) / filename
209217
unzipped_path = Path(tmp_dir) / filename.rstrip(".zip")
210-
helpers.download_file(self.cpe_match_feed_url, download_path)
218+
helpers.download_file(self.CPE_MATCH_FEED_URL, download_path)
211219

212220
with zipfile.ZipFile(download_path, "r") as zip_handle:
213221
zip_handle.extractall(tmp_dir)

sec_certs/dataset/dataset.py

+12-9
Original file line number | Diff line number | Diff line change
@@ -182,7 +182,10 @@ def from_json(cls: Type[DatasetSubType], input_path: Union[str, Path]) -> Datase
182182
return dset
183183

184184
def _set_local_paths(self) -> None:
185-
raise NotImplementedError("Not meant to be implemented by the base class.")
185+
if self.auxillary_datasets.cpe_dset:
186+
self.auxillary_datasets.cpe_dset.json_path = self.cpe_dataset_path
187+
if self.auxillary_datasets.cve_dset:
188+
self.auxillary_datasets.cve_dset.json_path = self.cve_dataset_path
186189

187190
# Workaround from https://peps.python.org/pep-0673/ applied.
188191
def _copy_dataset_contents(self: DatasetSubType, old_dset: DatasetSubType) -> None:
@@ -246,14 +249,14 @@ def _download_parallel(urls: Collection[str], paths: Collection[Path], prune_cor
246249
logger.error(f"Corrupted file at: {p}")
247250
p.unlink()
248251

249-
def _prepare_cpe_dataset(self, download_fresh_cpes: bool = False, init_lookup_dicts: bool = True) -> CPEDataset:
252+
def _prepare_cpe_dataset(self, download_fresh_cpes: bool = False) -> CPEDataset:
250253
logger.info("Preparing CPE dataset.")
251254
if not self.auxillary_datasets_dir.exists():
252255
self.auxillary_datasets_dir.mkdir(parents=True)
253256

254257
if not self.cpe_dataset_path.exists() or download_fresh_cpes is True:
255-
cpe_dataset = CPEDataset.from_web(self.cpe_dataset_path, init_lookup_dicts)
256-
cpe_dataset.to_json(str(self.cpe_dataset_path))
258+
cpe_dataset = CPEDataset.from_web(self.cpe_dataset_path)
259+
cpe_dataset.to_json()
257260
else:
258261
cpe_dataset = CPEDataset.from_json(str(self.cpe_dataset_path))
259262

@@ -267,10 +270,10 @@ def _prepare_cve_dataset(
267270
self.auxillary_datasets_dir.mkdir(parents=True)
268271

269272
if not self.cve_dataset_path.exists() or download_fresh_cves is True:
270-
cve_dataset = CVEDataset.from_web()
271-
cve_dataset.to_json(str(self.cve_dataset_path))
273+
cve_dataset = CVEDataset.from_web(json_path=self.cve_dataset_path)
274+
cve_dataset.to_json()
272275
else:
273-
cve_dataset = CVEDataset.from_json(str(self.cve_dataset_path))
276+
cve_dataset = CVEDataset.from_json(self.cve_dataset_path)
274277

275278
cve_dataset.build_lookup_dict(use_nist_cpe_matching_dict, self.nist_cve_cpe_matching_dset_path)
276279
return cve_dataset
@@ -307,8 +310,8 @@ def filter_condition(cpe: CPE) -> bool:
307310
return True
308311

309312
logger.info("Computing heuristics: Finding CPE matches for certificates")
310-
self.auxillary_datasets.cpe_dset = self._prepare_cpe_dataset(download_fresh_cpes, init_lookup_dicts=False)
311-
self.auxillary_datasets.cpe_dset.build_lookup_dicts()
313+
if not self.auxillary_datasets.cpe_dset or download_fresh_cpes:
314+
self.auxillary_datasets.cpe_dset = self._prepare_cpe_dataset(download_fresh_cpes)
312315

313316
# Temporarily disabled, see: https://github.com/crocs-muni/sec-certs/issues/173
314317
# if not cpe_dset.was_enhanced_with_vuln_cpes:

sec_certs/sample/cpe.py

-2
Original file line number | Diff line number | Diff line change
@@ -26,8 +26,6 @@ class CPE(PandasSerializableType, ComplexSerializableType):
2626
"item_name",
2727
"version",
2828
"title",
29-
"start_version",
30-
"end_version",
3129
]
3230

3331
def __init__(

tests/cc/test_cc_analysis.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,7 @@ def cpes(cpe_single_sign_on: CPE) -> Set[CPE]:
5050

5151
@pytest.fixture(scope="module")
5252
def cpe_dset(cpes: Set[CPE]) -> CPEDataset:
53-
return CPEDataset(True, Path("../"), {x.uri: x for x in cpes})
53+
return CPEDataset(False, {x.uri: x for x in cpes}, Path("../"))
5454

5555

5656
@pytest.fixture(scope="module")

tests/cc/test_cc_dataset.py

+7
Original file line number | Diff line number | Diff line change
@@ -164,3 +164,10 @@ def test_download_csv_html_files():
164164
for x in dset.active_csv_tuples:
165165
assert x[1].exists()
166166
assert x[1].stat().st_size >= constants.MIN_CC_CSV_SIZE
167+
168+
169+
def test_to_pandas(toy_dataset: CCDataset):
170+
df = toy_dataset.to_pandas()
171+
assert df.shape == (len(toy_dataset), len(CommonCriteriaCert.pandas_columns))
172+
assert df.index.name == "dgst"
173+
assert set(df.columns) == (set(CommonCriteriaCert.pandas_columns).union({"year_from"})) - {"dgst"}
+30
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,30 @@
1+
def test_methods_not_meant_to_be_implemented():
2+
pass
3+
4+
5+
def test_download_artifacts():
6+
pass
7+
8+
9+
def test_convert_artifacts():
10+
pass
11+
12+
13+
def test_extract_data():
14+
pass
15+
16+
17+
def test_to_json():
18+
pass
19+
20+
21+
def test_from_json():
22+
pass
23+
24+
25+
def test_to_pandas():
26+
pass
27+
28+
29+
def test_from_web():
30+
pass

0 commit comments

Comments
 (0)