forked from TrustAI/DeepConcolic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathengine.py
1534 lines (1175 loc) · 45.5 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from utils_io import *
from utils_funcs import *
from utils import *
from functools import reduce
from sklearn.model_selection import train_test_split
import yaml # for dumping record
import hashlib # for hashing test inputs
# ---

# Define an alias type for inputs.
# `Input` is a `NewType` over numpy arrays, used purely for static typing
# of concrete test inputs throughout this module.
Input = NewType("Input", np.ndarray)

class InputsDict (NPArrayDict):
    # Dictionary keyed by numpy-array inputs; all behavior is inherited
    # from `NPArrayDict` (imported from utils) -- presumably it hashes
    # array contents so arrays can be used as keys; TODO confirm.
    pass
# ---

class _InputsStatBasedInitializable:
    '''
    Interface for components whose initialization relies on statistics
    gathered from the raw training/test datasets.
    '''

    @abstractmethod
    def inputs_stat_initialize (self,
                                train_data: raw_datat = None,
                                test_data: raw_datat = None) -> None:
        '''
        Initializes the object from the given raw training and test data.
        '''
        # NOTE(review): prints `self` before raising -- looks like a
        # debugging leftover; confirm before removing.
        print (self)
        raise NotImplementedError
# ---
class _ActivationStatBasedInitializable:
def stat_based_basic_initializers(self):
"""
Stat-based initialization steps (non-batched).
Returns a list of dictionaries (or `None`) with the following
entries:
- name: short description of what's computed;
- layer_indexes: a list or set of indexes for layers whose
activations values are needed;
- once: a callable taking a mapping (as a dictionary) from each
layer index given in `layer_indexes` to activation values for
the corresponding layer; this is to be called only once during
initialization of the analyzer;
- print (optional): a function that prints a summary of results.
"""
return []
def stat_based_incremental_initializers(self):
"""
Stat-based incremental initialization steps.
Returns a list of dictionaries (or `None`) with the following
entries:
- name: short description of what's computed;
- layer_indexes: a list or set of indexes for layers whose
activations values are needed;
- accum: a callable taking batched activation values for every
layer and any accumulator that is (initially `None`), and
returns a new or updated accumulator. This is called at least
once.
- final: optional function that is called with the final
accumulator once all batched activations have been passed to
`accum`;
- print (optional): a function that prints a summary of results.
"""
return []
def stat_based_train_cv_initializers(self):
"""
Stat-based initialization steps with optional cross-validation
performed on training data.
Returns a list of dictionaries (or `None`) with the following
entries:
- name: short description of what's computed;
- layer_indexes: a list or set of indexes for layers whose
activations values are needed;
- test_size & train_size: (as in
`sklearn.model_selection.train_test_split`)
- train: a callable taking some training data as mapping (as a
dictionary) from each layer index given in `layer_indexes` to
activation values for the corresponding layer, and two keyword
arguments `true_labels` and `pred_labels` that hold the
corresponding true and predicted labels. Returns some arbitrary
object, and is to be called only once during initialization;
- test: a callable taking some extra training data and associated
labels as two separate mappings (as a dictionary) from each
layer index given in `layer_indexes` to activation values for
the corresponding layer.
Any function given as entries above is always called before
functions returned by `stat_based_test_cv_initializers`, but after
those retured by `stat_based_basic_initializers` and
`stat_based_incremental_initializers`.
"""
return []
def stat_based_test_cv_initializers(self):
"""
Stat-based initialization steps with optional cross-validation
performed on test data.
Returns a list of dictionaries (or `None`) with the following
entries:
- name: short description of what's computed;
- layer_indexes: a list or set of indexes for layers whose
activations values are needed;
- test_size & train_size: (as in
`sklearn.model_selection.train_test_split`)
- train: a callable taking some test data as mapping (as a
dictionary) from each layer index given in `layer_indexes` to
activation values for the corresponding layer, and two keyword
arguments `true_labels` and `pred_labels` that hold the
corresponding true and predicted labels. Returns some arbitrary
object, and is to be called only once during initialization;
- test: a callable taking some extra test data and associated
labels as two separate mappings (as a dictionary) from each
layer index given in `layer_indexes` to activation values for
the corresponding layer.
Any function given as entries above is always called last.
"""
# - accum_test: a callable that is called with the object returned
# by `train`, along with batched activation values for every layer
# on the test data, and returns a new or updated accumulator.
# This is called at least once.
#
# - final_test: optional function that is called with the final test
# accumulator once all batched test activations have been passed
# to `accum_test`.
return []
# ---

class StaticFilter:
    '''
    Filter used to compare any concrete input against a pre-computed
    dataset.
    '''

    @abstractmethod
    def close_enough(self, x: Input) -> bool:
        '''
        Holds iff `x` is deemed close enough to the pre-computed dataset.
        '''
        raise NotImplementedError
# ---

class DynamicFilter:
    '''
    Filter used to compare any concrete input against a reference set
    supplied at call time.
    '''

    @abstractmethod
    def close_to(self, refs: Sequence[Input], x: Input) -> bool:
        '''
        Holds iff `x` is deemed close to the reference inputs `refs`.
        '''
        raise NotImplementedError
# ---

class Bounds:
    """
    Basic abstract class to represent any (element-wise) bounds on
    inputs.  (Mostly for typing arguments and sub-classing.)
    """

    # NOTE: the return annotations below were previously `np.array(float)`,
    # which is not a type -- it eagerly built a throwaway 0-d object array
    # each time the class was defined.  `np.ndarray` is the intended type.
    @property
    def low (self) -> np.ndarray:
        '''Array of lower bounds.'''
        raise NotImplementedError

    @property
    def up (self) -> np.ndarray:
        '''Array of upper bounds.'''
        raise NotImplementedError

    @abstractmethod
    def __getitem__ (self, _idx: Tuple[int, ...]) -> Tuple[float, float]:
        '''Returns the `(low, up)` pair of bounds at index `_idx`.'''
        raise NotImplementedError
# ---

class Coverage:
    """Basic helper class to manipulate and type-annotate coverage measures.

    Tracks a number of covered elements (`c`) out of a `total`.  The
    constructor accepts any combination of `covered`, `total`, and
    `non_covered`, and infers the missing quantities.

    NB: `None` checks now use `is not None` (PEP 8) instead of `!= None`;
    behavior is unchanged for the numeric arguments used here (0 compares
    unequal to `None` either way).
    """

    def __init__(self, covered = None, total = None, non_covered = None):
        # Infer the total from whichever arguments were supplied:
        if total is not None:
            self.total = total
        elif covered is not None and non_covered is not None:
            self.total = covered + non_covered
        elif covered is not None:
            self.total = covered
        elif non_covered is not None:
            self.total = non_covered
        else:
            self.total = 0
        # Infer the covered count:
        if covered is not None:
            self.c = covered
        elif non_covered is not None and self.total > 0:
            self.c = self.total - non_covered
        else:
            self.c = 0

    def __add__(self, x):
        '''Point-wise sum of two coverage measures.'''
        return Coverage (covered = self.c + x.c,
                         total = self.total + x.total)

    def __mul__(self, f: float):
        '''Scales the covered count by `f`, keeping the total unchanged.'''
        return Coverage (covered = float(self.c) * f,
                         total = self.total)

    @property
    def done(self) -> bool:
        '''Holds iff full coverage has been reached.'''
        return self.total == self.c

    @property
    def as_prop(self) -> float:
        '''Coverage as a proportion in [0, 1] (0.0 when the total is 0).'''
        return (((1.0 * self.c) / (1.0 * self.total))
                if self.total != 0 else 0.0)

    def __repr__(self):
        return str(self.as_prop)
# ---

class Metric (DynamicFilter):
    '''
    A metric over concrete inputs.  For now, every metric is assumed to
    double as a filter for comparing and assessing concrete inputs.
    '''

    def __init__(self, factor = 0.25, scale = 1, **kwds):
        '''
        `factor` determines closeness when the object is used as a
        filter (defaults to 1/4); `scale` is applied on involved scalar
        values (e.g. pixels) when computing distances.
        '''
        super().__init__(**kwds)
        self.factor = factor
        self.scale = scale

    @abstractmethod
    def distance(self, x, y):
        '''
        Distance between the two concrete inputs `x` and `y`.
        '''
        raise NotImplementedError

    @property
    def is_int(self):
        '''
        Holds iff the metric is Integer-valued.
        '''
        return False
# ---

class TestTarget:
    '''
    Base record of test targets.
    '''

    @abstractmethod
    def cover(self, acts) -> None:
        '''
        Registers that the given set of activations covers this target.
        '''
        raise NotImplementedError

    def log_repr(self) -> str:
        '''
        Single-line string representation of the target, suitable for
        logging.
        '''
        raise NotImplementedError
# ---

class Analyzer:
    '''
    Base class for any kind of analyzer that is able to construct new
    concrete inputs.
    '''

    def __init__(self,
                 analyzed_dnn = None,
                 input_bounds: Optional[Bounds] = None,
                 postproc_inputs: Callable[[Sequence[Input]], Sequence[Input]] = id,
                 **kwds):
        # :param analyzed_dnn: the DNN under analysis (required).
        # :param input_bounds: optional bounds enforced on generated inputs.
        # :param postproc_inputs: post-processing applied to generated inputs.
        #   NOTE(review): the default `id` only acts as an identity function
        #   if one of the star-imports (e.g. utils_funcs) rebinds `id`; the
        #   builtin `id` returns object addresses -- confirm.
        assert analyzed_dnn is not None
        assert input_bounds is None or isinstance (input_bounds, Bounds)
        self._analyzed_dnn = analyzed_dnn
        self._input_bounds = input_bounds
        self._postproc_inputs = postproc_inputs
        super().__init__(**kwds)

    # ---

    # TODO: `dnn` and the two methods below (previously in test_objectt)
    # would deserve to be on their own as they are not strictly speaking
    # analyzer-dependent. Yet they stay there for now as analyzers,
    # criteria, and engines rely on at least one of them.

    @property
    def dnn(self) -> keras.Model:
        '''
        The analyzed DNN.
        '''
        return self._analyzed_dnn

    def eval(self, i, **kwds):
        '''
        Returns the activations associated to a given input.
        '''
        # NOTE(review): calls a module-level `eval` helper (presumably from
        # a star-import) that shadows the builtin `eval` -- confirm.
        return eval (self.dnn, i, **kwds)

    def eval_batch(self, i, **kwds):
        '''
        Returns all activations associated to a given input batch.
        '''
        return eval_batch (self.dnn, i, **kwds)

    @abstractmethod
    def input_metric(self) -> Metric:
        '''
        Returns the metric used to compare concrete inputs.
        '''
        raise NotImplementedError

    @property
    def input_bounds(self) -> List[Bounds]:
        '''
        Returns the bounds on generated inputs, as a list that is empty
        when no bounds were given at construction time.
        '''
        return [self._input_bounds] if self._input_bounds is not None else []
# ---

class Analyzer4RootedSearch (Analyzer):
    '''
    Analyzers that are able to find new concrete inputs close to a given
    input should inherit this class.
    '''

    @abstractmethod
    def search_input_close_to(self, x, target: TestTarget) -> Optional[Tuple[float, Input]]:
        '''
        Generates a new concrete input close to `x`, that fulfills test
        target `target`.

        Returns a tuple `(d, y)`, that is a new concrete input `y` along
        with its distance `d` w.r.t the input metric, or `None` if
        unsuccessful.
        '''
        raise NotImplementedError
# ---

class Analyzer4FreeSearch (Analyzer):
    '''
    Analyzers that are able to find new concrete inputs close to any
    input from a given set of test cases.
    '''

    # NOTE: the last component of the return annotation previously read
    # `input` (the builtin), which was clearly a typo for the `Input` alias.
    @abstractmethod
    def search_close_inputs(self, target: TestTarget) -> Optional[Tuple[float, Input, Input]]:
        '''
        Generates a new concrete input that fulfills test target `target`.

        Returns a tuple `(d, base, new)` where `base` is a concrete
        element from a set given on initialization (typically for now, raw
        data from `test_object`) and `new` is a new concrete input at
        distance `d` from `base`, or `None` if unsuccessful.
        '''
        raise NotImplementedError
# ---

class Report:
    '''
    A simple class to take reporting stuff out from the engine.

    Tracks generated tests and adversarial examples, and appends progress
    lines into a report file located under `outdir`.
    '''

    def __init__(self,
                 base_name = '',
                 outdir: OutputDir = None,
                 save_new_tests = False,
                 adv_dist_period = 100,
                 save_input_func = None,
                 amplify_diffs = False,
                 inp_up = 1, # XXX: ??? unused.
                 **kwds):
        # :param base_name: prefix used for all produced file names.
        # :param outdir: output directory (a fresh `OutputDir` by default).
        # :param save_new_tests: whether every passed test is saved on disk.
        # :param adv_dist_period: period (in number of recorded adversarial
        #   examples) at which the distance distribution is dumped.
        # :param save_input_func: callable used to save a single input;
        #   called as `save_input_func (im, name, directory, log)`.
        # :param amplify_diffs: whether difference images are rescaled for
        #   visualization.
        self.adversarials = []
        self.base_name = base_name
        self.save_new_tests = save_new_tests
        self.adv_dist_period = adv_dist_period
        self.outdir = outdir or OutputDir ()
        assert isinstance (self.outdir, OutputDir)
        self.base = self.outdir.stamped_filename (self.base_name)
        self.report_file = self.outdir.filepath (self.base + '_report.txt')
        self.save_input_func = save_input_func
        self.amplify_diffs = amplify_diffs
        p1 ('Reporting into: {0}'.format (self.report_file))
        self.ntests = 0
        self.nsteps = 0

    def _save_input(self, im, name, log = None):
        # Delegates to `save_input_func` when one was provided.
        if self.save_input_func != None:
            self.save_input_func (im, name, self.outdir.path, log)

    def _save_derived_input(self, new, origin, diff = None, log = None):
        # Saves a (new, origin[, diff]) triple of `(image, name)` pairs.
        self._save_input (new[0], new[1], log)
        self._save_input (origin[0], origin[1], log)
        if diff is not None:
            self._save_input (diff[0], diff[1], log)

    def save_input(self, i, suff):
        '''Saves input `i` under the report base name suffixed with `suff`.'''
        self._save_input (i, self.base + '_' + suff)

    def new_test(self, new = (), orig = (), dist = None, is_int = None):
        '''
        Records one new passed test case `new` (an `(input, name)` pair)
        derived from `orig`; saves it along with the difference with its
        origin if `save_new_tests` holds.
        '''
        if self.save_new_tests:
            if self.amplify_diffs:
                # Rescale the absolute difference so its maximum is 0.5
                # (for visualization):
                diff = np.abs(new[0] - orig[0])
                diff *= 0.5 / np.max (diff)
            else:
                diff = new[0] - orig[0]
            self._save_derived_input ((new[0], '{0.ntests}-ok-{1}'.format (self, new[1])),
                                      (orig[0], '{0.ntests}-original-{1}'.format (self, orig[1])),
                                      (diff, '{0.ntests}-diff-{1}'.format (self, orig[1])))
        self.ntests += 1

    @property
    def num_steps(self):
        # Number of (non-dry) report steps so far.
        return self.nsteps

    @property
    def num_tests(self):
        # Number of tests recorded so far (passed + adversarial).
        return self.ntests

    @property
    def num_adversarials(self):
        # Number of adversarial examples recorded so far.
        return len(self.adversarials)

    def new_adversarial(self, new = (), orig = (), dist = None, is_int = None):
        '''
        Records (and saves) one new adversarial example `new` derived
        from `orig` at distance `dist`; periodically dumps the
        distribution of adversarial distances.
        '''
        self.adversarials.append ((orig, new, dist))
        self._save_derived_input ((new[0], '{0.ntests}-adv-{1}'.format (self, new[1])),
                                  (orig[0], '{0.ntests}-original-{1}'.format (self, orig[1])),
                                  (np.abs(new[0] - orig[0]), '{0.ntests}-diff-{1}'.format (self, orig[1])))
        if self.num_adversarials % self.adv_dist_period == 0:
            print_adversarial_distribution (
                [ d for o, n, d in self.adversarials ],
                self.outdir.filepath (self.base + '_adversarial-distribution.txt'),
                int_flag = is_int)
        self.ntests += 1

    def step(self, *args, dry = False) -> None:
        '''
        Prints a single report line.

        Do not count as new step if `dry` holds.
        '''
        append_in_file (self.report_file, *args, '\n')
        if not dry:
            self.nsteps += 1

    def record(self, test_cases, record, **kwds) -> None:
        """
        Outputs a record about all initial and generated test cases.

        The record essentially encodes the tree that enables one to trace
        the origins of all generated tests.
        """
        # MD5 digests identify each input in the dumped YAML record.
        tests = [ dict (**record[x],
                        md5 = hashlib.md5 (x).hexdigest ())
                  for x in test_cases ]
        advrs = [ dict (**record[n[0]],
                        md5 = hashlib.md5 (n[0]).hexdigest ())
                  for _o, n, _d in self.adversarials ]
        data = dict (passed_tests = tests,
                     adversarials = advrs,
                     **kwds)
        path = self.outdir.stamped_filepath ('record', suff = '.yml')
        with open(path, 'w') as f:
            yaml.dump (data, f)
# ---

class EarlyTermination (Exception):
    '''
    Raised by criteria whenever no new test target can be found.
    '''
    pass
# ---

class CoverableLayer:
    '''
    Base class for any layer based on which coverability criteria are
    defined.
    '''

    def __init__(self, layer = None, layer_index = None,
                 prev: int = None, succ: int = None):
        self.layer = layer
        self.layer_index = layer_index
        self.prev_layer_index = prev
        self.succ_layer_index = succ
        self.is_conv = is_conv_layer (layer)

    def get_info (self):
        '''Returns a serializable summary of this layer's indexing info.'''
        return { 'layer_name': self.layer.name,
                 'layer_index': self.layer_index,
                 'prev_layer_index': self.prev_layer_index,
                 'succ_layer_index': self.succ_layer_index }

    def set_info (self, dnn,
                  layer_name = None,
                  layer_index = None,
                  prev_layer_index = None,
                  succ_layer_index = None):
        '''Re-binds this object to the layer named `layer_name` in `dnn`.'''
        layer = dnn.get_layer (name = layer_name)
        self.layer = layer
        self.is_conv = is_conv_layer (layer)
        self.layer_index = layer_index
        self.prev_layer_index = prev_layer_index
        self.succ_layer_index = succ_layer_index

    def __repr__(self):
        return self.layer.name
# ---

# Verbosity level at (and above) which target selections are logged:
_log_target_selection_level = 1

class Criterion (_ActivationStatBasedInitializable):
    '''
    Base class for test criteria.

    Note that a criterion MUST inherit either (or both)
    :class:`Criterion4FreeSearch` or :class:`Criterion4RootedSearch`.
    '''

    def __init__(self,
                 clayers: Sequence[CoverableLayer],
                 *args,
                 analyzer: Analyzer = None,
                 prefer_rooted_search: bool = None,
                 verbose: int = 1,
                 **kwds):
        '''
        A criterion operates based on an `analyzer` to find new concrete
        inputs.

        Flag `prefer_rooted_search` can be used in case both the criterion
        and the analyzer support the two kinds of search; the default
        behavior is to select rooted search.
        '''
        assert isinstance (analyzer, Analyzer)
        super().__init__(*args, **kwds)
        self.cover_layers = clayers
        self.analyzer = analyzer
        self.test_cases = []
        self.verbose = some (verbose, 1)
        self.rooted_search = self._rooted_search (prefer_rooted_search)
        # True for rooted search, False for free search

    def _rooted_search(self, prefer_rooted_search = None):
        '''
        Holds if rooted-search mode is selected and the criterion and
        analyzer pair supports it.

        Parameters
        ----------
        prefer_rooted_search: bool, optional

        Returns
        -------
        whether rooted search mode is selected.
        '''
        rooted_ok = (isinstance (self.analyzer, Analyzer4RootedSearch) and
                     isinstance (self, Criterion4RootedSearch))
        free_ok = (isinstance (self.analyzer, Analyzer4FreeSearch) and
                   isinstance (self, Criterion4FreeSearch))
        if not (free_ok or rooted_ok):
            sys.exit ('Incompatible pair criterion/analyzer')
        if free_ok and rooted_ok and prefer_rooted_search is None:
            p1 ('Arbitrarily selecting rooted search against free search.')
        # Rooted search is used whenever supported, unless free search is
        # explicitly preferred:
        return rooted_ok and (prefer_rooted_search is None or prefer_rooted_search)

    # ---

    @abstractmethod
    def __repr__(self):
        raise NotImplementedError

    @abstractmethod
    def finalize_setup(self) -> None:
        """
        Called once after any stat-based initialization (see, e.g.,
        :meth:`stat_based_basic_initializers`), and before any call to
        :meth:`add_new_test_cases`, :meth:`coverage`, and
        :meth:`search_next`.
        """
        raise NotImplementedError
        # self.analyzer.finalize_setup ()

    def terminate(self) -> None:
        """
        Called once upon termination of the test case generation.

        Use this to dump some files or reports.
        """
        pass

    def reset(self):
        '''
        Empties the set of test cases.
        '''
        self.test_cases = []

    @abstractmethod
    def coverage(self) -> Coverage:
        '''
        Returns a measure of the current coverage.
        '''
        raise NotImplementedError

    @property
    def metric(self) -> Metric:
        '''
        Returns the metric used by the analyzer to compare concrete
        inputs.
        '''
        return self.analyzer.input_metric ()

    @property
    def num_test_cases(self) -> int:
        '''
        Returns the number of test cases.
        '''
        return len(self.test_cases)

    def pop_test(self) -> None:
        '''
        Removes the last registered test (while keeping its coverage).
        '''
        self.test_cases.pop ()

    @abstractmethod
    def covered_layer_indexes (self) -> List[int]:
        # NOTE(review): decorated with `@abstractmethod` yet provides a
        # default implementation that :meth:`add_new_test_cases` relies on
        # -- presumably intended as overridable rather than strictly
        # abstract; confirm.
        return [ cl.layer_index for cl in self.cover_layers ]

    # final as well
    def add_new_test_cases(self, tl: Sequence[Input],
                           covered_target: TestTarget = None) -> None:
        """
        As its name says, this method adds a given series of inputs into
        the set of test cases. It then calls :meth:`register_new_activations`.
        """
        tp1 ('Adding {} test case{}'.format (*s_(len (tl))))
        self.test_cases.extend (tl)
        layer_indexes = self.covered_layer_indexes ()
        for acts in self._batched_activations (tl, allow_input_layer = False,
                                               layer_indexes = layer_indexes):
            if covered_target is not None:
                covered_target.cover (acts)
            self.register_new_activations (acts)

    def _batched_activations(self, tl: Sequence[Input], **kwds) -> range:
        # Generator yielding activations evaluated on batches of roughly
        # 100 inputs each.  NOTE(review): the `-> range` annotation is
        # inaccurate -- this is a generator of activation mappings.
        batches = np.array_split (tl, len (tl) // 100 + 1)
        for batch in batches:
            acts = self.analyzer.eval_batch (batch, **kwds)
            yield (acts)
            # Release potentially large activation data eagerly:
            del acts

    @abstractmethod
    def register_new_activations(self, acts) -> None:
        """
        Method called whenever new test cases are registered. Overload
        this method to update coverage.
        """
        raise NotImplementedError

    def search_next(self) -> Tuple[Union[Tuple[Input, Input, float], None], TestTarget]:
        '''
        Selects a new test target based (see
        :class:`Criterion4RootedSearch` and
        :class:`Criterion4FreeSearch`), and then uses the analyzer to find
        a new concrete input.

        Returns a pair of:

        - either `None` in case of failure of the analyzer, or a triple
          `(x0, x1, d)`, `x1` being the new concrete input generated by
          the analyzer;

        - the test target considered.
        '''
        if self.rooted_search:
            # Rooted search: pick a (base input, target) pair, then search
            # for a new input close to the base:
            x0, target = self.find_next_rooted_test_target ()
            if self.verbose >= _log_target_selection_level:
                p1 (f'| Targeting {target}')
            x1_attempt = self.analyzer.search_input_close_to (x0, target)
            if x1_attempt == None:
                return None, target
            else:
                d, x1 = x1_attempt
                return (x0, x1, d), target
        else:
            # Free search: pick a target only; the analyzer also selects
            # the base input:
            target = self.find_next_test_target ()
            if self.verbose >= _log_target_selection_level:
                p1 (f'| Targeting {target}')
            attempt = self.analyzer.search_close_inputs (target)
            if attempt == None:
                return None, target
            else:
                d, x0, x1 = attempt
                return (x0, x1, d), target

    # ---

# ---
class Criterion4RootedSearch (Criterion):
    '''
    Criteria that can propose a pair of a base test case and a test
    target to fulfill from it should inherit this class.
    '''

    @abstractmethod
    def find_next_rooted_test_target(self) -> Tuple[Input, TestTarget]:
        '''
        Seeks a new test target associated with an existing test input
        taken from the set of recorded test cases.

        Note this method MUST perform enough bookkeeping so that two
        successive calls that are not interleaved with any call to
        :meth:`Criterion.add_new_test_cases` return different results.
        This property is to enforce progress upon unsuccessful search of
        concrete inputs.
        '''
        raise NotImplementedError
# ---

class Criterion4FreeSearch (Criterion):
    '''
    Criteria that can select a test target without relying on activation
    data or previously inserted test cases should inherit this class.
    '''

    @abstractmethod
    def find_next_test_target(self) -> TestTarget:
        '''
        Seeks and returns a new test target.
        '''
        raise NotImplementedError
# ---

def setup_basic_report(criterion: Criterion, **kwds) -> Report:
    '''
    Builds a very basic report object whose output files get base names
    constructed from the provided criterion.

    Extra keyword arguments are passed on to the constructor of
    :class:`Report`.
    '''
    base = '{0}_{0.metric}'.format (criterion)
    return Report (base_name = base, **kwds)
# ---
class Engine:
'''
Core Deepconcolic engine.
'''
def __init__(self, test_data, train_data,
criterion: Criterion,
custom_filters: Sequence[Union[StaticFilter, DynamicFilter]] = [],
**kwds):
"""
Builds a test engine with the given DNN, reference data, and test
criterion. Uses the input metric provided by the
criterion-specific analyzer as filter for assessing legitimacy of
new test inputs, unless `custom_filters` is not `None`.
"""
self.ref_data = test_data
self.train_data = train_data
self.criterion = criterion
fltrs = [criterion.metric]
fltrs += custom_filters \
if isinstance (custom_filters, list) \
else [custom_filters]
# NB: note some filters may belong to both lists:
self.static_filters = [ f for f in fltrs if isinstance (f, StaticFilter) ]
self.dynamic_filters = [ f for f in fltrs if isinstance (f, DynamicFilter) ]
super().__init__(**kwds)
self._stat_based_inits ()
self._initialized = False
def __repr__(self):
return 'criterion {0} with norm {0.metric}'.format (self.criterion)
def _run_test(self, x):
return prediction (self.criterion.analyzer.dnn, x)
def _run_tests(self, xl):
return predictions (self.criterion.analyzer.dnn, xl)
def _initialize_search (self, report: Report, initial_test_cases = None):
'''
Method called once at the beginning of search.
'''
xl = []
if initial_test_cases is not None and initial_test_cases > 0:
x = self.ref_data.data
if self.ref_data.labels is not None:
x = x[self._run_tests (x) == self.ref_data.labels]
x = np.random.default_rng().choice (a = x, axis = 0,
size = min (initial_test_cases, len (x)))
if self.ref_data.labels is not None:
p1 ('Initializing with {} randomly selected test case{} that {} correctly classified.'
.format(*s_(len (x)), is_are_(len (x))[1]))
else:
p1 ('Initializing with {} randomly selected test case{}.'
.format(*s_(len (x))))
self.criterion.add_new_test_cases (x)
elif initial_test_cases is None and self.criterion.rooted_search:
p1 ('Randomly selecting an input from test data.')
x = np.random.default_rng().choice (a = self.ref_data.data, axis = 0)
report.save_input (x, 'seed-input')
self.criterion.add_new_test_cases ([x])
def run(self,
report: Union[Report, Callable[[Criterion], Report]] = setup_basic_report,
initial_test_cases = None,
check_root_only: bool = True,
max_iterations = -1,
**kwds) -> Report:
'''
Uses `report` to construct a helper for outputing logs and new
test cases, and then starts the engine for either: up to
`max_iterations` iterations (i.e. number of runs of the analyzer)
if `max_iterations >= 0`, or else until full coverage is reached,
or the criterion is fulfilled (whichever happens first).
Set `check_root_only` to `False` to ensure every new generated
test case that is close enough to any reference test data is
kept. Leaving it to `True` speeds the oracle check by only
comparing new tests agains the original reference version.
'''
criterion = self.criterion
if not self._initialized:
criterion.finalize_setup ()
p1 ('Starting tests for {}{}.'
.format (self, '' if max_iterations < 0 else
' ({} max iterations)'.format (max_iterations)))
self._initialized = True
else:
p1 ('Continuing tests for {}{}.'
.format (self, '' if max_iterations < 0 else
' ({} max iterations)'.format (max_iterations)))
initial_test_cases = initial_test_cases or 0
report = report if isinstance (report, Report) else \
report (criterion, **kwds)
# Initialize search to add new test cases in every call to run:
self._initialize_search (report, initial_test_cases)
coverage = criterion.coverage ()
p1 ('#0 {}: {.as_prop:10.8%}'.format(criterion, coverage))
report.step ('{0}-cover: {1} #test cases: {0.num_test_cases} '
.format(criterion, coverage),
'#adversarial examples: 0',
dry = True)
iteration = 1
init_tests = report.num_tests
init_adversarials = report.num_adversarials
# Note some test cases might be inserted multiple times: in such a
# case only the max index will be remembered as origin:
record = InputsDict ([(x, dict (root_index = i,
index = i,
label = int (self._run_test (x))))
for i, x in enumerate (criterion.test_cases)]) \
if criterion.rooted_search else InputsDict ()
check_root_only &= criterion.rooted_search
try: