16
16
import java .util .List ;
17
17
import java .util .Optional ;
18
18
import java .util .Random ;
19
- import java .util .stream . Collectors ;
19
+ import java .util .SortedSet ;
20
20
21
21
/**
22
22
* A scheduler that randomly selects events to deliver, drop, mutate or timeout.
26
26
public class RandomScheduler extends BaseScheduler {
27
27
private final Random random = new Random ();
28
28
29
-
30
29
public RandomScheduler (ByzzBenchConfig config , MessageMutatorService messageMutatorService ) {
31
30
super (config , messageMutatorService );
32
31
}
33
32
33
+ public <T > T getRandomElement (List <T > list ) {
34
+ return list .get (random .nextInt (list .size ()));
35
+ }
36
+
34
37
@ Override
35
38
public String getId () {
36
39
return "Random" ;
@@ -53,21 +56,24 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
53
56
}
54
57
55
58
List <TimeoutEvent > timeoutEvents = this .getQueuedTimeoutEvents (scenario );
56
- List <Event > clientRequestEvents = availableEvents .stream ().filter (ClientRequestEvent .class ::isInstance ).collect (Collectors .toList ());
57
- List <Event > messageEvents = availableEvents .stream ().filter (MessageEvent .class ::isInstance ).collect (Collectors .toList ());
59
+ List <Event > clientRequestEvents = availableEvents .stream ().filter (ClientRequestEvent .class ::isInstance ).toList ();
60
+ List <MessageEvent > messageEvents = availableEvents .stream ().filter (MessageEvent .class ::isInstance ).map (MessageEvent .class ::cast ).toList ();
61
+
62
+ SortedSet <String > faultyReplicaIds = scenario .getFaultyReplicaIds ();
63
+ List <MessageEvent > mutateableMessageEvents = messageEvents .stream ().filter (msg -> faultyReplicaIds .contains (msg .getSenderId ())).toList ();
58
64
59
65
int timeoutWeight = timeoutEvents .size () * this .deliverTimeoutWeight ();
60
66
int deliverMessageWeight = messageEvents .size () * this .deliverMessageWeight ();
61
67
int deliverClientRequestWeight = clientRequestEvents .size () * this .deliverClientRequestWeight ();
62
68
int dropMessageWeight = (messageEvents .size () * this .dropMessageWeight (scenario ));
63
- int mutateMessageWeight = (messageEvents .size () * this .mutateMessageWeight (scenario ));
69
+ int mutateMessageWeight = (mutateableMessageEvents .size () * this .mutateMessageWeight (scenario ));
64
70
int dieRoll = random .nextInt (timeoutWeight + deliverMessageWeight
65
71
+ deliverClientRequestWeight + dropMessageWeight + mutateMessageWeight );
66
72
67
73
// check if we should trigger a timeout
68
74
dieRoll -= timeoutWeight ;
69
75
if (dieRoll < 0 ) {
70
- Event timeout = timeoutEvents . get ( random . nextInt ( timeoutEvents . size ()) );
76
+ Event timeout = getRandomElement ( timeoutEvents );
71
77
scenario .getTransport ().deliverEvent (timeout .getEventId ());
72
78
EventDecision decision = new EventDecision (EventDecision .DecisionType .DELIVERED , timeout .getEventId ());
73
79
return Optional .of (decision );
@@ -85,7 +91,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
85
91
// check if we should target delivering a request from a client to a replica
86
92
dieRoll -= deliverClientRequestWeight ;
87
93
if (dieRoll < 0 ) {
88
- Event request = clientRequestEvents . get ( random . nextInt ( clientRequestEvents . size ()) );
94
+ Event request = getRandomElement ( clientRequestEvents );
89
95
scenario .getTransport ().deliverEvent (request .getEventId ());
90
96
EventDecision decision = new EventDecision (EventDecision .DecisionType .DELIVERED , request .getEventId ());
91
97
return Optional .of (decision );
@@ -94,7 +100,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
94
100
// check if we should drop a message sent between nodes
95
101
dieRoll -= dropMessageWeight ;
96
102
if (dieRoll < 0 ) {
97
- Event message = messageEvents . get ( random . nextInt ( messageEvents . size ()) );
103
+ Event message = getRandomElement ( messageEvents );
98
104
scenario .getTransport ().dropEvent (message .getEventId ());
99
105
if (!(message instanceof GossipMessage )) {
100
106
EventDecision decision = new EventDecision (EventDecision .DecisionType .DROPPED , message .getEventId ());
@@ -105,7 +111,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
105
111
// check if we should mutate-and-deliver a message sent between nodes
106
112
dieRoll -= mutateMessageWeight ;
107
113
if (dieRoll < 0 ) {
108
- Event message = messageEvents . get ( random . nextInt ( messageEvents . size ()) );
114
+ Event message = getRandomElement ( mutateableMessageEvents );
109
115
List <MessageMutationFault > mutators = this .getMessageMutatorService ().getMutatorsForEvent (message );
110
116
111
117
if (mutators .isEmpty ()) {
@@ -117,7 +123,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
117
123
}
118
124
scenario .getTransport ().applyMutation (
119
125
message .getEventId (),
120
- mutators . get ( random . nextInt ( mutators . size ()) ));
126
+ getRandomElement ( mutators ));
121
127
scenario .getTransport ().deliverEvent (message .getEventId ());
122
128
123
129
EventDecision decision = new EventDecision (EventDecision .DecisionType .MUTATED_AND_DELIVERED , message .getEventId ());
0 commit comments