Skip to content

Commit eba05f7

Browse files
committed
major refactor - differentiate events from actions, WIP to properly persist schedules
1 parent 053340c commit eba05f7

File tree

80 files changed

+1264
-843
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+1264
-843
lines changed

.idea/dataSources.xml

+19
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules/simulator/byzzbench.simulator.aot.iml

+15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

simulator/src/main/java/byzzbench/simulator/Client.java

+1-8
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ public class Client implements Serializable, Node {
4343
*/
4444
private final AtomicLong requestSequenceNumber = new AtomicLong(0);
4545

46-
/**
47-
* The maximum number of requests that can be sent by the client.
48-
*/
49-
private final long maxRequests = 1000;
50-
5146
/**
5247
* The replies received by the client.
5348
*/
@@ -85,9 +80,7 @@ public void sendRequest(String recipientId) {
8580
*/
8681
public void handleMessage(String senderId, MessagePayload reply) {
8782
this.replies.add(reply);
88-
if (this.requestSequenceNumber.get() < this.maxRequests) {
89-
this.sendRequest();
90-
}
83+
this.sendRequest();
9184
}
9285

9386
@Override

simulator/src/main/java/byzzbench/simulator/GenericTerminationPredicate.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package byzzbench.simulator;
22

3-
import byzzbench.simulator.transport.Action;
4-
import byzzbench.simulator.transport.MessageAction;
3+
import byzzbench.simulator.transport.Event;
4+
import byzzbench.simulator.transport.MessageEvent;
55
import byzzbench.simulator.transport.messages.MessageWithRound;
66

77
import java.util.List;
@@ -27,10 +27,10 @@ public boolean test(Scenario scenario) {
2727

2828
// Check if the maximum number of rounds has been reached
2929
if (maxRounds > 0) {
30-
List<MessageWithRound> queuedMessages = scenario.getTransport().getEventsInState(Action.Status.DELIVERED)
30+
List<MessageWithRound> queuedMessages = scenario.getTransport().getEventsInState(Event.Status.DELIVERED)
3131
.stream()
32-
.filter(e -> e instanceof MessageAction msgEvent)
33-
.map(e -> (MessageAction) e)
32+
.filter(e -> e instanceof MessageEvent msgEvent)
33+
.map(e -> (MessageEvent) e)
3434
.filter(e -> e.getPayload() instanceof MessageWithRound)
3535
.map(e -> (MessageWithRound) e.getPayload())
3636
.filter(msg -> msg.getRound() > maxRounds)

simulator/src/main/java/byzzbench/simulator/Scenario.java

+97-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package byzzbench.simulator;
22

3-
import byzzbench.simulator.domain.Schedule;
3+
import byzzbench.simulator.domain.*;
44
import byzzbench.simulator.faults.Fault;
55
import byzzbench.simulator.faults.ScenarioContext;
66
import byzzbench.simulator.faults.factories.ByzzFuzzScenarioFaultFactory;
@@ -11,6 +11,9 @@
1111
import byzzbench.simulator.state.AgreementPredicate;
1212
import byzzbench.simulator.state.LivenessPredicate;
1313
import byzzbench.simulator.state.adob.AdobDistributedState;
14+
import byzzbench.simulator.transport.Event;
15+
import byzzbench.simulator.transport.MessageEvent;
16+
import byzzbench.simulator.transport.TimeoutEvent;
1417
import byzzbench.simulator.transport.Transport;
1518
import com.fasterxml.jackson.annotation.JsonIgnore;
1619
import com.fasterxml.jackson.databind.JsonNode;
@@ -24,6 +27,7 @@
2427
import java.io.Serializable;
2528
import java.util.*;
2629
import java.util.stream.Collectors;
30+
import java.util.stream.Stream;
2731

2832
/**
2933
* Represents a runnable scenario.
@@ -251,6 +255,98 @@ public final void loadParameters(JsonNode parameters) {
251255
this.loadScenarioParameters(parameters);
252256
}
253257

258+
/**
259+
* Get the currently queued events of a specific type in the scenario.
260+
*
261+
* @param eventClass The class of the event to get.
262+
* @param <T> The type of the event.
263+
* @return The list of events.
264+
*/
265+
private <T extends Event> Stream<T> getQueuedEventsOfType(Class<T> eventClass) {
266+
return getTransport().getEventsInState(Event.Status.QUEUED)
267+
.stream()
268+
.filter(eventClass::isInstance)
269+
.map(eventClass::cast);
270+
}
271+
272+
/**
273+
* Get the available {@link DeliverMessageAction} in the scenario in the current state.
274+
*
275+
* @return The list of available {@link DeliverMessageAction}.
276+
*/
277+
private List<DeliverMessageAction> getAvailableDeliverMessageAction() {
278+
switch (scheduler.getConfig().getScheduler().getExecutionMode()) {
279+
// return the first queued message in each mailbox
280+
case SYNC -> {
281+
Set<String> recipientIdsSeen = new HashSet<>();
282+
return getQueuedEventsOfType(MessageEvent.class)
283+
.filter(event -> recipientIdsSeen.add(event.getRecipientId()))
284+
.map(DeliverMessageAction::fromEvent)
285+
.toList();
286+
}
287+
// return all queued message events in all mailboxes
288+
case ASYNC -> {
289+
return getQueuedEventsOfType(MessageEvent.class)
290+
.map(DeliverMessageAction::fromEvent)
291+
.toList();
292+
}
293+
default ->
294+
throw new IllegalStateException("Unknown execution mode: " + scheduler.getConfig().getScheduler().getExecutionMode());
295+
}
296+
}
297+
298+
/**
299+
* Get the available {@link TriggerTimeoutAction} in the scenario in the current state.
300+
*
301+
* @return The list of available {@link TriggerTimeoutAction}.
302+
*/
303+
private List<TriggerTimeoutAction> getAvailableTriggerTimeoutAction() {
304+
// get all time out events in order of expiration (earliest first)
305+
Set<String> recipientIdsSeen = new HashSet<>();
306+
Stream<TimeoutEvent> firstTimeoutForEachReplica = getQueuedEventsOfType(TimeoutEvent.class)
307+
.sorted(Comparator.comparing(TimeoutEvent::getExpiresAt))
308+
.filter(event -> recipientIdsSeen.add(event.getRecipientId()));
309+
310+
switch (this.getScheduler().getConfig().getScheduler().getExecutionMode()) {
311+
// return the first timeout for each replica without a message in their mailbox
312+
case SYNC -> {
313+
// return only timeouts for replicas without messages in their mailbox
314+
return firstTimeoutForEachReplica
315+
.filter(event -> !recipientIdsSeen.contains(event.getRecipientId()))
316+
.map(TriggerTimeoutAction::fromEvent)
317+
.toList();
318+
}
319+
// return the first timeout for each replica
320+
case ASYNC -> {
321+
return firstTimeoutForEachReplica
322+
.map(TriggerTimeoutAction::fromEvent)
323+
.toList();
324+
}
325+
default ->
326+
throw new IllegalStateException("Unknown execution mode: " + this.getScheduler().getConfig().getScheduler().getExecutionMode());
327+
}
328+
}
329+
330+
private List<FaultInjectionAction> getAvailableFaultInjectionAction() {
331+
throw new UnsupportedOperationException("Not implemented!");
332+
}
333+
334+
/**
335+
* Get the available actions in the scenario in the current state.
336+
*
337+
* @return The list of available actions.
338+
*/
339+
public List<? extends Action> getAvailableActions() {
340+
return Stream.of(
341+
getAvailableDeliverMessageAction().stream(),
342+
getAvailableTriggerTimeoutAction().stream())
343+
.reduce(Stream::concat)
344+
.orElseGet(Stream::empty)
345+
.toList();
346+
347+
// TODO faults
348+
}
349+
254350
/**
255351
* Loads the parameters for the scenario from a JSON object.
256352
*

simulator/src/main/java/byzzbench/simulator/SimulatorApplication.java

+7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package byzzbench.simulator;
22

3+
import byzzbench.simulator.config.ByzzBenchConfig;
34
import lombok.Getter;
5+
import lombok.RequiredArgsConstructor;
46
import org.springframework.boot.SpringApplication;
57
import org.springframework.boot.autoconfigure.SpringBootApplication;
68
import org.springframework.boot.context.event.ApplicationReadyEvent;
@@ -9,9 +11,11 @@
911
import java.time.Instant;
1012

1113
@SpringBootApplication
14+
@RequiredArgsConstructor
1215
public class SimulatorApplication {
1316
@Getter
1417
private static Instant startTime;
18+
private final ByzzBenchConfig config;
1519

1620
public static void main(String[] args) {
1721
SpringApplication.run(SimulatorApplication.class, args);
@@ -23,5 +27,8 @@ public static void main(String[] args) {
2327
@EventListener(ApplicationReadyEvent.class)
2428
public void onStartup() {
2529
startTime = Instant.now();
30+
31+
// print config
32+
System.out.println("Configuration: " + config);
2633
}
2734
}

simulator/src/main/java/byzzbench/simulator/Timekeeper.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package byzzbench.simulator;
22

33
import byzzbench.simulator.faults.Fault;
4-
import byzzbench.simulator.transport.Action;
4+
import byzzbench.simulator.transport.Event;
55
import byzzbench.simulator.transport.MutateMessageEventPayload;
6-
import byzzbench.simulator.transport.TimeoutAction;
6+
import byzzbench.simulator.transport.TimeoutEvent;
77
import byzzbench.simulator.transport.TransportObserver;
88
import com.fasterxml.jackson.annotation.JsonIgnore;
99

@@ -64,24 +64,24 @@ public Instant getTime(Node node) {
6464
}
6565

6666
@Override
67-
public void onEventAdded(Action Action) {
67+
public void onEventAdded(Event Event) {
6868
// nothing to do
6969
}
7070

7171
@Override
72-
public void onEventDropped(Action Action) {
72+
public void onEventDropped(Event Event) {
7373
// nothing to do
7474
}
7575

7676
@Override
77-
public void onEventRequeued(Action Action) {
77+
public void onEventRequeued(Event Event) {
7878
// nothing to do
7979
}
8080

8181
@Override
82-
public void onEventDelivered(Action Action) {
82+
public void onEventDelivered(Event Event) {
8383
// check if it was a timeout
84-
if (!(Action instanceof TimeoutAction timeoutEvent)) {
84+
if (!(Event instanceof TimeoutEvent timeoutEvent)) {
8585
return;
8686
}
8787

@@ -103,7 +103,7 @@ public void onFault(Fault fault) {
103103
}
104104

105105
@Override
106-
public void onTimeout(TimeoutAction event) {
106+
public void onTimeout(TimeoutEvent event) {
107107
// nothing to do
108108
}
109109
}

simulator/src/main/java/byzzbench/simulator/config/ByzzBenchConfig.java

+18-18
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package byzzbench.simulator.config;
22

3-
import byzzbench.simulator.SimulatorApplication;
43
import lombok.Data;
54
import org.springframework.boot.context.properties.ConfigurationProperties;
65
import org.springframework.stereotype.Component;
76
import org.springframework.validation.annotation.Validated;
87

9-
import java.nio.file.Path;
108
import java.util.ArrayList;
119
import java.util.HashMap;
1210
import java.util.List;
@@ -20,21 +18,20 @@
2018
@Data
2119
@Validated
2220
public class ByzzBenchConfig {
21+
22+
2323
/**
2424
* Whether to start generating scenarios on startup automatically.
2525
*/
2626
private boolean autostart = false;
27-
2827
/**
2928
* The number of scenarios to run at a time. Defaults to Integer.MAX_VALUE.
3029
*/
3130
private int numScenarios = Integer.MAX_VALUE;
32-
3331
/**
34-
* The path to the output directory. Defaults to "./output".
32+
* Policy for saving schedules in the database.
3533
*/
36-
private Path outputPath = Path.of("output");
37-
34+
private SaveScheduleMode saveSchedules = SaveScheduleMode.ALL;
3835
/**
3936
* Scheduler parameters.
4037
*/
@@ -44,13 +41,20 @@ public class ByzzBenchConfig {
4441
*/
4542
private ScenarioConfig scenario = new ScenarioConfig();
4643

47-
/**
48-
* Get the output path for this run, in the format "output/{start-time}".
49-
*
50-
* @return The output path for this run.
51-
*/
52-
public Path getOutputPathForThisRun() {
53-
return this.outputPath.resolve(String.valueOf(SimulatorApplication.getStartTime().getEpochSecond()));
44+
// execution should be either "async" or "sync". Here is the enum:
45+
public enum SaveScheduleMode {
46+
/**
47+
* Save all schedules in the database.
48+
*/
49+
ALL,
50+
/**
51+
* Save only schedules that did not terminate successfully.
52+
*/
53+
BUGGY,
54+
/**
55+
* Do not save any schedules.
56+
*/
57+
NONE,
5458
}
5559

5660
/**
@@ -103,10 +107,6 @@ public final class SchedulerConfig {
103107
* Weighted probability of delivering a message
104108
*/
105109
private int deliverMessageWeight = 99;
106-
/**
107-
* Weighted probability of delivering a request from a client
108-
*/
109-
private int deliverClientRequestWeight = 99;
110110
/**
111111
* Weighted probability of dropping a message.
112112
* The default is 0 (no messages dropped as a scheduler decision).

0 commit comments

Comments
 (0)