1
1
package byzzbench .simulator ;
2
2
3
- import byzzbench .simulator .protocols .hbft .message .PanicMessage ;
4
- import byzzbench .simulator .protocols .hbft .message . RequestMessage ;
3
+ import byzzbench .simulator .protocols .hbft .message .* ;
4
+ import byzzbench .simulator .protocols .hbft .pojo . ClientReplyKey ;
5
5
import byzzbench .simulator .transport .MessagePayload ;
6
-
7
6
import com .fasterxml .jackson .annotation .JsonIgnore ;
8
-
9
7
import lombok .Getter ;
10
8
import lombok .experimental .SuperBuilder ;
11
9
12
10
import java .io .Serializable ;
13
11
import java .security .MessageDigest ;
14
12
import java .security .NoSuchAlgorithmException ;
15
13
import java .time .Duration ;
16
- import java .util .ArrayList ;
17
- import java .util .Collection ;
18
- import java .util .HashSet ;
19
- import java .util .Set ;
20
- import java .util .SortedMap ;
21
- import java .util .TreeMap ;
22
-
23
- import byzzbench .simulator .protocols .hbft .message .ReplyMessage ;
24
- import byzzbench .simulator .protocols .hbft .pojo .ClientReplyKey ;
14
+ import java .util .*;
25
15
26
16
27
17
/**
@@ -82,11 +72,11 @@ public class HbftClient extends Client {
82
72
@ Override
83
73
public void sendRequest () {
84
74
String requestId = String .format ("%s/%d" , super .id , super .requestSequenceNumber .incrementAndGet ());
85
- long timestamp = System . currentTimeMillis ();
75
+ long timestamp = this . getCurrentTime (). toEpochMilli ();
86
76
RequestMessage request = new RequestMessage (requestId , timestamp , super .id );
87
77
this .sentRequests .put (super .requestSequenceNumber .get (), request );
88
78
this .sentRequestsByTimestamp .put (timestamp , requestId );
89
- super . getScenario (). getTransport (). multicastClientRequest ( super . id , timestamp , requestId , super . scenario . getTransport (). getNodeIds () );
79
+ this . broadcastRequest ( timestamp , requestId );
90
80
91
81
// Set timeout
92
82
Long timeoutId = this .setTimeout ("REQUEST" , this ::retransmitOrPanic , this .timeout );
@@ -100,30 +90,37 @@ public void retransmitOrPanic() {
100
90
// Based on hBFT 4.1 it uses the identical request
101
91
// TODO: It probably should not be the same timestamp
102
92
long timestamp = this .sentRequests .get (super .requestSequenceNumber .get ()).getTimestamp ();
103
- super . scenario . getTransport (). multicastClientRequest ( super . id , timestamp , requestId , super . scenario . getTransport (). getNodeIds () );
93
+ this . broadcastRequest ( timestamp , requestId );
104
94
} else if (this .shouldPanic (tolerance )) {
105
95
RequestMessage message = this .sentRequests .get (super .requestSequenceNumber .get ());
106
- PanicMessage panic = new PanicMessage (this .digest (message ), System . currentTimeMillis (), super .id );
96
+ PanicMessage panic = new PanicMessage (this .digest (message ), this . getCurrentTime (). toEpochMilli (), super .id );
107
97
super .scenario .getTransport ().multicast (this , super .scenario .getTransport ().getNodeIds (), panic );
108
98
}
109
99
this .clearTimeout (timeouts .get (super .requestSequenceNumber .get ()));
110
100
Long timeoutId = this .setTimeout ("REQUEST" , this ::retransmitOrPanic , this .timeout );
111
101
timeouts .put (super .requestSequenceNumber .get (), timeoutId );
112
102
}
113
103
104
+ private void broadcastRequest (long timestamp , String requestId ) {
105
+ MessagePayload payload = new ClientRequestMessage (timestamp , requestId );
106
+ SortedSet <String > replicaIds = super .scenario .getTransport ().getNodeIds ();
107
+ getScenario ().getTransport ().multicast (this , replicaIds , payload );
108
+ }
109
+
114
110
/**
115
111
* Handles a reply received by the client.
112
+ *
116
113
* @param senderId The ID of the sender of the reply.
117
- * @param reply The reply received by the client.
118
- * @param tolerance the tolerance of the protocol (used for hbft)
114
+ * @param payload The payload received by the client.
119
115
*/
120
- public void handleReply (String senderId , MessagePayload reply , long tolerance , long seqNumber ) {
121
- if (!(reply instanceof ReplyMessage )) {
116
+ public void handleMessage (String senderId , MessagePayload payload ) {
117
+ if (!(payload instanceof ClientReplyMessage clientReplyMessage )) {
122
118
return ;
123
119
}
124
- ClientReplyKey key = new ClientReplyKey (((ReplyMessage ) reply ).getResult ().toString (), seqNumber );
120
+ ReplyMessage reply = clientReplyMessage .getReply ();
121
+ ClientReplyKey key = new ClientReplyKey (reply .getResult ().toString (), reply .getSequenceNumber ());
125
122
// Default is for testing only
126
- String currRequest = this .sentRequestsByTimestamp .getOrDefault ((( ReplyMessage ) reply ) .getTimestamp (), "C/0" );
123
+ String currRequest = this .sentRequestsByTimestamp .getOrDefault (reply .getTimestamp (), "C/0" );
127
124
this .hbftreplies .putIfAbsent (currRequest , new TreeMap <>());
128
125
this .hbftreplies .get (currRequest ).putIfAbsent (key , new ArrayList <>());
129
126
this .hbftreplies .get (currRequest ).get (key ).add (reply );
@@ -132,30 +129,15 @@ public void handleReply(String senderId, MessagePayload reply, long tolerance, l
132
129
* If the client received 2f + 1 correct replies,
133
130
* and the request has not been completed yet.
134
131
*/
135
- if (this .completedReplies (tolerance )
136
- && !this .completedRequests .contains (key )
137
- && super .requestSequenceNumber .get () <= this .maxRequests ) {
138
- this .completedRequests .add (key );
139
- this .clearTimeout (this .timeouts .get (super .requestSequenceNumber .get ()));
140
- this .sendRequest ();
132
+ if (this .completedReplies (clientReplyMessage . getTolerance ())
133
+ && !this .completedRequests .contains (key )
134
+ && super .requestSequenceNumber .get () <= this .maxRequests ) {
135
+ this .completedRequests .add (key );
136
+ this .clearTimeout (this .timeouts .get (super .requestSequenceNumber .get ()));
137
+ this .sendRequest ();
141
138
}
142
139
}
143
140
144
- /**
145
- * Handles a reply received by the client.
146
- *
147
- * @param senderId The ID of the sender of the reply.
148
- * @param reply The reply received by the client.
149
- */
150
- @ Override
151
- public void handleMessage (String senderId , MessagePayload reply ) {
152
- // this.replies.putIfAbsent(super.requestSequenceNumber.get(), new ArrayList<>());
153
- // this.replies.get(super.requestSequenceNumber.get()).add(reply);
154
- // if (super.requestSequenceNumber.get() < this.maxRequests) {
155
- // this.sendRequest();
156
- // }
157
- }
158
-
159
141
/**
160
142
* Set a timeout for this replica.
161
143
*
@@ -166,22 +148,7 @@ public void handleMessage(String senderId, MessagePayload reply) {
166
148
*/
167
149
public long setTimeout (String name , Runnable r , long timeout ) {
168
150
Duration duration = Duration .ofSeconds (timeout );
169
- return super .scenario .getTransport ().setTimeout (this , r , duration );
170
- }
171
-
172
- /**
173
- * Set a timeout for this client.
174
- *
175
- * @param r the runnable to execute when the timeout occurs
176
- * @param timeout the timeout in milliseconds
177
- * @return the timeout ID
178
- */
179
- public long setTimeout (Runnable r , long timeout ) {
180
- Runnable wrapper = () -> {
181
- r .run ();
182
- };
183
- Duration duration = Duration .ofSeconds (timeout );
184
- return super .scenario .getTransport ().setClientTimeout (super .id , wrapper , duration );
151
+ return super .scenario .getTransport ().setTimeout (this , r , duration , name );
185
152
}
186
153
187
154
/**
@@ -206,8 +173,8 @@ public boolean shouldRetransmit(long tolerance) {
206
173
public boolean shouldPanic (long tolerance ) {
207
174
String currRequest = String .format ("%s/%d" , super .id , super .requestSequenceNumber .get ());
208
175
for (ClientReplyKey key : hbftreplies .get (currRequest ).keySet ()) {
209
- return this .hbftreplies .get (currRequest ).get (key ).size () >= tolerance + 1
210
- && this .hbftreplies .get (currRequest ).get (key ).size () < tolerance * 2 + 1 ;
176
+ return this .hbftreplies .get (currRequest ).get (key ).size () >= tolerance + 1
177
+ && this .hbftreplies .get (currRequest ).get (key ).size () < tolerance * 2 + 1 ;
211
178
}
212
179
return false ;
213
180
}
0 commit comments