Skip to content

Commit 818eea0

Browse files
committed
Fix #133
1 parent 63e3eab commit 818eea0

File tree

7 files changed

+112
-15
lines changed

7 files changed

+112
-15
lines changed

simulator/src/main/java/byzzbench/simulator/Client.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void handleMessage(String senderId, MessagePayload reply) {
8282

8383
@Override
8484
public Instant getCurrentTime() {
85-
return this.scenario.getTimekeeper().getTime(this);
85+
return this.scenario.getTimekeeper().incrementAndGetTime(this);
8686
}
8787

8888
/**

simulator/src/main/java/byzzbench/simulator/Replica.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public void notifyObserversTimeout() {
269269

270270
@JsonIgnore
271271
public Instant getCurrentTime() {
272-
return this.scenario.getTimekeeper().getTime(this);
272+
return this.scenario.getTimekeeper().incrementAndGetTime(this);
273273
}
274274

275275
/**

simulator/src/main/java/byzzbench/simulator/Scenario.java

+8
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@ public interface Scenario extends Serializable {
130130
*/
131131
List<ScenarioObserver> getObservers();
132132

133+
134+
/**
135+
* Add an observer to the scenario.
136+
*
137+
* @param observer The observer to add.
138+
*/
139+
void addObserver(ScenarioObserver observer);
140+
133141
/**
134142
* Load the parameters for the scenario.
135143
*
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,104 @@
11
package byzzbench.simulator;
22

3+
import byzzbench.simulator.faults.Fault;
4+
import byzzbench.simulator.transport.Event;
5+
import byzzbench.simulator.transport.MutateMessageEventPayload;
6+
import byzzbench.simulator.transport.TimeoutEvent;
7+
import byzzbench.simulator.transport.TransportObserver;
38
import com.fasterxml.jackson.annotation.JsonIgnore;
4-
import lombok.RequiredArgsConstructor;
59

610
import java.io.Serializable;
11+
import java.time.Duration;
712
import java.time.Instant;
8-
import java.util.concurrent.atomic.AtomicLong;
13+
import java.util.SortedMap;
14+
import java.util.TreeMap;
915

1016
/**
11-
* A timekeeper that provides timestamps to the replicas in the simulator.
17+
* A timekeeper that provides timestamps to the nodes in the simulator.
1218
*/
13-
@RequiredArgsConstructor
14-
public class Timekeeper implements Serializable {
19+
public class Timekeeper implements Serializable, TransportObserver {
20+
/**
21+
* The increment to use each time a timestamp is requested.
22+
*/
23+
public static final Duration INCREMENT = Duration.ofMillis(1);
24+
1525
@JsonIgnore
16-
private final transient Scenario scenario;
17-
private final AtomicLong counter = new AtomicLong(0);
26+
private final Scenario scenario;
27+
28+
/**
29+
* The current time for each node
30+
*/
31+
private final SortedMap<String, Instant> times = new TreeMap<>();
32+
33+
/**
34+
* Create a new timekeeper for the given scenario.
35+
*
36+
* @param scenario the scenario to create the timekeeper for
37+
*/
38+
public Timekeeper(Scenario scenario) {
39+
this.scenario = scenario;
40+
this.scenario.getTransport().addObserver(this);
41+
}
42+
43+
/**
44+
* Advance the time for the given node and return the new time.
45+
*
46+
* @param node the node to advance the time for
47+
* @return the new time for the node
48+
*/
49+
public Instant incrementAndGetTime(Node node) {
50+
Instant current = getTime(node);
51+
Instant next = current.plus(INCREMENT);
52+
times.put(node.getId(), next);
53+
return next;
54+
}
1855

56+
/**
57+
* Get the current time for the given node.
58+
*
59+
* @param node the node to get the time for
60+
* @return the current time for the node
61+
*/
1962
public Instant getTime(Node node) {
20-
return Instant.ofEpochMilli(counter.incrementAndGet());
63+
return times.computeIfAbsent(node.getId(), k -> Instant.ofEpochMilli(0));
64+
}
65+
66+
@Override
67+
public void onEventAdded(Event event) {
68+
// nothing to do
69+
}
70+
71+
@Override
72+
public void onEventDropped(Event event) {
73+
// nothing to do
74+
}
75+
76+
@Override
77+
public void onEventDelivered(Event event) {
78+
// check if it was a timeout
79+
if (!(event instanceof TimeoutEvent timeoutEvent)) {
80+
return;
81+
}
82+
83+
// set counter to max of its current value and the expiration time
84+
String nodeId = timeoutEvent.getNodeId();
85+
Instant expiration = timeoutEvent.getExpiresAt();
86+
Instant current = times.getOrDefault(nodeId, Instant.ofEpochMilli(0));
87+
times.put(nodeId, current.isAfter(expiration) ? current : expiration);
88+
}
89+
90+
@Override
91+
public void onMessageMutation(MutateMessageEventPayload payload) {
92+
// nothing to do
93+
}
94+
95+
@Override
96+
public void onFault(Fault fault) {
97+
// nothing to do
98+
}
99+
100+
@Override
101+
public void onTimeout(TimeoutEvent event) {
102+
// nothing to do
21103
}
22104
}

simulator/src/main/java/byzzbench/simulator/protocols/pbft/Principal.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public void set_out_key(PbftReplica r, int[] k, long t) {
165165
*/
166166

167167
this.tstamp = t;
168-
my_tstamp = r.getScenario().getTimekeeper().getTime(r);
168+
my_tstamp = r.getCurrentTime();
169169
}
170170
}
171171

simulator/src/main/java/byzzbench/simulator/transport/Transport.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ public synchronized void deliverEvent(long eventId) throws Exception {
248248
this.scenario.getSchedule().appendEvent(e);
249249
e.setStatus(Event.Status.DELIVERED);
250250

251+
// For timeouts, this should be called before, so the Replica time is updated
252+
this.observers.forEach(o -> o.onEventDelivered(e));
253+
251254
switch (e) {
252255
case ClientRequestEvent c -> {
253256
this.scenario.getNodes().get(c.getRecipientId()).handleMessage(c.getSenderId(), c.getPayload());
@@ -263,8 +266,6 @@ public synchronized void deliverEvent(long eventId) throws Exception {
263266
}
264267
}
265268

266-
this.observers.forEach(o -> o.onEventDelivered(e));
267-
268269
log.info("Delivered " + e);
269270
}
270271

simulator/src/main/java/byzzbench/simulator/transport/TransportObserver.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,43 @@
55
public interface TransportObserver {
66
/**
77
* Called when an event is added to the transport layer.
8+
*
89
* @param event The event that was added.
910
*/
1011
void onEventAdded(Event event);
1112

1213
/**
1314
* Called when the status of an event changes to {@link Event.Status#DROPPED}.
15+
*
1416
* @param event The event that was dropped.
1517
*/
1618
void onEventDropped(Event event);
1719

1820
/**
1921
* Called when the status of an event changes to {@link Event.Status#DELIVERED}.
22+
*
2023
* @param event The event that was delivered.
2124
*/
2225
void onEventDelivered(Event event);
2326

2427
/**
2528
* Called when a message is mutated.
29+
*
2630
* @param payload The payload of the mutation.
2731
*/
2832
void onMessageMutation(MutateMessageEventPayload payload);
2933

3034
/**
3135
* Called when a fault is injected.
36+
*
3237
* @param fault The fault that was injected.
3338
*/
3439
void onFault(Fault fault);
3540

3641
/**
37-
* Called when a timeout event is triggered.
38-
* @param event The timeout event that was triggered.
42+
* Called when a timeout event is created.
43+
*
44+
* @param event The timeout event that was created.
3945
*/
4046
void onTimeout(TimeoutEvent event);
4147
}

0 commit comments

Comments
 (0)