15
15
import java .util .List ;
16
16
import java .util .Optional ;
17
17
import java .util .Random ;
18
- import java .util .stream . Collectors ;
18
+ import java .util .SortedSet ;
19
19
20
20
/**
21
21
* A scheduler that randomly selects events to deliver, drop, mutate or timeout.
25
25
public class RandomScheduler extends BaseScheduler {
26
26
private final Random random = new Random ();
27
27
28
-
29
28
public RandomScheduler (ByzzBenchConfig config , MessageMutatorService messageMutatorService ) {
30
29
super (config , messageMutatorService );
31
30
}
32
31
32
+ public <T > T getRandomElement (List <T > list ) {
33
+ return list .get (random .nextInt (list .size ()));
34
+ }
35
+
33
36
@ Override
34
37
public String getId () {
35
38
return "Random" ;
@@ -52,21 +55,24 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
52
55
}
53
56
54
57
List <TimeoutEvent > timeoutEvents = this .getQueuedTimeoutEvents (scenario );
55
- List <Event > clientRequestEvents = availableEvents .stream ().filter (ClientRequestEvent .class ::isInstance ).collect (Collectors .toList ());
56
- List <Event > messageEvents = availableEvents .stream ().filter (MessageEvent .class ::isInstance ).collect (Collectors .toList ());
58
+ List <Event > clientRequestEvents = availableEvents .stream ().filter (ClientRequestEvent .class ::isInstance ).toList ();
59
+ List <MessageEvent > messageEvents = availableEvents .stream ().filter (MessageEvent .class ::isInstance ).map (MessageEvent .class ::cast ).toList ();
60
+
61
+ SortedSet <String > faultyReplicaIds = scenario .getFaultyReplicaIds ();
62
+ List <MessageEvent > mutateableMessageEvents = messageEvents .stream ().filter (msg -> faultyReplicaIds .contains (msg .getSenderId ())).toList ();
57
63
58
64
int timeoutWeight = timeoutEvents .size () * this .deliverTimeoutWeight ();
59
65
int deliverMessageWeight = messageEvents .size () * this .deliverMessageWeight ();
60
66
int deliverClientRequestWeight = clientRequestEvents .size () * this .deliverClientRequestWeight ();
61
67
int dropMessageWeight = (messageEvents .size () * this .dropMessageWeight (scenario ));
62
- int mutateMessageWeight = (messageEvents .size () * this .mutateMessageWeight (scenario ));
68
+ int mutateMessageWeight = (mutateableMessageEvents .size () * this .mutateMessageWeight (scenario ));
63
69
int dieRoll = random .nextInt (timeoutWeight + deliverMessageWeight
64
70
+ deliverClientRequestWeight + dropMessageWeight + mutateMessageWeight );
65
71
66
72
// check if we should trigger a timeout
67
73
dieRoll -= timeoutWeight ;
68
74
if (dieRoll < 0 ) {
69
- Event timeout = timeoutEvents . get ( random . nextInt ( timeoutEvents . size ()) );
75
+ Event timeout = getRandomElement ( timeoutEvents );
70
76
scenario .getTransport ().deliverEvent (timeout .getEventId ());
71
77
EventDecision decision = new EventDecision (EventDecision .DecisionType .DELIVERED , timeout .getEventId ());
72
78
return Optional .of (decision );
@@ -84,7 +90,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
84
90
// check if we should target delivering a request from a client to a replica
85
91
dieRoll -= deliverClientRequestWeight ;
86
92
if (dieRoll < 0 ) {
87
- Event request = clientRequestEvents . get ( random . nextInt ( clientRequestEvents . size ()) );
93
+ Event request = getRandomElement ( clientRequestEvents );
88
94
scenario .getTransport ().deliverEvent (request .getEventId ());
89
95
EventDecision decision = new EventDecision (EventDecision .DecisionType .DELIVERED , request .getEventId ());
90
96
return Optional .of (decision );
@@ -93,7 +99,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
93
99
// check if we should drop a message sent between nodes
94
100
dieRoll -= dropMessageWeight ;
95
101
if (dieRoll < 0 ) {
96
- Event message = messageEvents . get ( random . nextInt ( messageEvents . size ()) );
102
+ Event message = getRandomElement ( messageEvents );
97
103
scenario .getTransport ().dropEvent (message .getEventId ());
98
104
EventDecision decision = new EventDecision (EventDecision .DecisionType .DROPPED , message .getEventId ());
99
105
return Optional .of (decision );
@@ -102,7 +108,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
102
108
// check if we should mutate-and-deliver a message sent between nodes
103
109
dieRoll -= mutateMessageWeight ;
104
110
if (dieRoll < 0 ) {
105
- Event message = messageEvents . get ( random . nextInt ( messageEvents . size ()) );
111
+ Event message = getRandomElement ( mutateableMessageEvents );
106
112
List <MessageMutationFault > mutators = this .getMessageMutatorService ().getMutatorsForEvent (message );
107
113
108
114
if (mutators .isEmpty ()) {
@@ -112,7 +118,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
112
118
}
113
119
scenario .getTransport ().applyMutation (
114
120
message .getEventId (),
115
- mutators . get ( random . nextInt ( mutators . size ()) ));
121
+ getRandomElement ( mutators ));
116
122
scenario .getTransport ().deliverEvent (message .getEventId ());
117
123
118
124
EventDecision decision = new EventDecision (EventDecision .DecisionType .MUTATED_AND_DELIVERED , message .getEventId ());
0 commit comments