-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtraverser.py
92 lines (81 loc) · 4.01 KB
/
traverser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python
import logging
logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
logger.setLevel(logging.INFO)
class JsonTraverse:
"""
Traverse a json dict.
Usage:
>> json_dict = json.load(json_file)
>> traverser = JsonTraverse(data)
value = traverser(['interface_speed', 'current', 'string'], default="UNKNOWN", starting_default_index=2)
"""
def __init__(self, json_dict):
self.json_dict = json_dict
def __call__(self, nested_keys, default, attribute=None, starting_default_index=None):
return self._inner_deep_dict_traverse(collection=self.json_dict, nested_keys=nested_keys,
attribute=attribute, default=default,
starting_default_index=starting_default_index,
depth=0)
@staticmethod
def _inner_deep_dict_traverse(collection, nested_keys, default, attribute, starting_default_index, depth):
starting_default_index = starting_default_index if starting_default_index is not None else len(nested_keys)
for idx, cur_key in enumerate(nested_keys):
if isinstance(collection, dict):
# go deep
depth += 1
try:
collection = collection[cur_key]
except KeyError as e:
# Key is not found in dict: wrong path
if default and idx >= starting_default_index:
return default
# No default or index is below starting default index
JsonTraverse._raise_exception(nested_keys, attribute, e)
elif isinstance(collection, list):
# go wide
for item_list in collection:
if not isinstance(item_list, (dict, list)):
# Ignore all non dict\list
continue
try:
# recursion with dict\list and the next nested_keys
# fix starting default index to the new structure
collection = JsonTraverse._inner_deep_dict_traverse(
collection=item_list,
nested_keys=[cur_key] + nested_keys[idx + 1:],
attribute=attribute,
default=default,
starting_default_index=starting_default_index - depth,
depth=depth
)
return collection
except KeyError as e:
# Key was not found in list - continue with next list item
continue
else:
# No list item fit nested keys: wrong path
JsonTraverse._raise_exception(nested_keys, attribute)
else:
raise AssertionError(f"Type not supported (yet): {type(collection)}")
# Finished looking - return the last collection or value (str, int, float, etc)
return JsonTraverse._check_attribute(collection, attribute, nested_keys, default)
@staticmethod
def _check_attribute(collection, attribute, nested_keys, default):
if not attribute:
return collection
try:
return next(col for key, value in attribute.items() for col in collection if col[key] == value)
except StopIteration as e:
if not isinstance(default, Exception):
logger.info(f"{nested_keys} was not found, using default={default}")
return default
raise JsonTraverse._raise_exception(nested_keys, attribute, e)
@staticmethod
def _raise_exception(path, attribute, e=None):
# do something with exception
output = '->'.join(path)
if attribute:
output += f" [{attribute}]"
raise KeyError(f"Nested path was not found in json: {output}")