|
15 | 15 | import java.util.List;
|
16 | 16 | import java.util.Optional;
|
17 | 17 | import java.util.Random;
|
18 |
| -import java.util.stream.Collectors; |
| 18 | +import java.util.SortedSet; |
19 | 19 |
|
20 | 20 | /**
|
21 | 21 | * A scheduler that randomly selects events to deliver, drop, mutate or timeout.
|
@@ -52,14 +52,17 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
|
52 | 52 | }
|
53 | 53 |
|
54 | 54 | List<TimeoutEvent> timeoutEvents = this.getQueuedTimeoutEvents(scenario);
|
55 |
| - List<Event> clientRequestEvents = availableEvents.stream().filter(ClientRequestEvent.class::isInstance).collect(Collectors.toList()); |
56 |
| - List<Event> messageEvents = availableEvents.stream().filter(MessageEvent.class::isInstance).collect(Collectors.toList()); |
| 55 | + List<Event> clientRequestEvents = availableEvents.stream().filter(ClientRequestEvent.class::isInstance).toList(); |
| 56 | + List<MessageEvent> messageEvents = availableEvents.stream().filter(MessageEvent.class::isInstance).map(MessageEvent.class::cast).toList(); |
| 57 | + |
| 58 | + SortedSet<String> faultyReplicaIds = scenario.getFaultyReplicaIds(); |
| 59 | + List<MessageEvent> mutateableMessageEvents = messageEvents.stream().filter(msg -> faultyReplicaIds.contains(msg.getSenderId())).toList(); |
57 | 60 |
|
58 | 61 | int timeoutWeight = timeoutEvents.size() * this.deliverTimeoutWeight();
|
59 | 62 | int deliverMessageWeight = messageEvents.size() * this.deliverMessageWeight();
|
60 | 63 | int deliverClientRequestWeight = clientRequestEvents.size() * this.deliverClientRequestWeight();
|
61 | 64 | int dropMessageWeight = (messageEvents.size() * this.dropMessageWeight(scenario));
|
62 |
| - int mutateMessageWeight = (messageEvents.size() * this.mutateMessageWeight(scenario)); |
| 65 | + int mutateMessageWeight = (mutateableMessageEvents.size() * this.mutateMessageWeight(scenario)); |
63 | 66 | int dieRoll = random.nextInt(timeoutWeight + deliverMessageWeight
|
64 | 67 | + deliverClientRequestWeight + dropMessageWeight + mutateMessageWeight);
|
65 | 68 |
|
@@ -102,7 +105,7 @@ public synchronized Optional<EventDecision> scheduleNext(Scenario scenario) thro
|
102 | 105 | // check if we should mutate-and-deliver a message sent between nodes
|
103 | 106 | dieRoll -= mutateMessageWeight;
|
104 | 107 | if (dieRoll < 0) {
|
105 |
| - Event message = messageEvents.get(random.nextInt(messageEvents.size())); |
| 108 | + Event message = mutateableMessageEvents.get(random.nextInt(mutateableMessageEvents.size())); |
106 | 109 | List<MessageMutationFault> mutators = this.getMessageMutatorService().getMutatorsForEvent(message);
|
107 | 110 |
|
108 | 111 | if (mutators.isEmpty()) {
|
|
0 commit comments