16
16
import java .util .*;
17
17
import java .util .concurrent .atomic .AtomicLong ;
18
18
import java .util .stream .Collectors ;
19
+ import java .util .stream .Stream ;
19
20
20
21
/**
21
22
* Transport layer for the simulator.
@@ -74,6 +75,7 @@ public class Transport {
74
75
* List of observers for the transport layer.
75
76
*/
76
77
@ JsonIgnore
78
+ @ Getter (onMethod_ = {@ Synchronized })
77
79
private final List <TransportObserver > observers = new ArrayList <>();
78
80
@ Getter
79
81
private boolean isGlobalStabilizationTime = false ;
@@ -84,7 +86,7 @@ public class Transport {
84
86
* @param observer The observer to add.
85
87
*/
86
88
public synchronized void addObserver (TransportObserver observer ) {
87
- this .observers .add (observer );
89
+ this .getObservers () .add (observer );
88
90
}
89
91
90
92
/**
@@ -93,7 +95,7 @@ public synchronized void addObserver(TransportObserver observer) {
93
95
* @param observer The observer to remove.
94
96
*/
95
97
public synchronized void removeObserver (TransportObserver observer ) {
96
- this .observers .remove (observer );
98
+ this .getObservers () .remove (observer );
97
99
}
98
100
99
101
/**
@@ -177,7 +179,7 @@ public synchronized void sendMessage(Node sender, MessagePayload message,
177
179
* @return A list of events in the given state
178
180
*/
179
181
public synchronized List <Event > getEventsInState (Event .Status status ) {
180
- return this .events .values ()
182
+ return this .getEvents () .values ()
181
183
.stream ()
182
184
.filter (m -> m .getStatus () == status )
183
185
.toList ();
@@ -190,14 +192,14 @@ public synchronized List<Event> getEventsInState(Event.Status status) {
190
192
*/
191
193
private synchronized void appendEvent (Event event ) {
192
194
// add the event to the map
193
- this .events .put (event .getEventId (), event );
195
+ this .getEvents () .put (event .getEventId (), event );
194
196
195
197
// apply automatic faults
196
198
this .automaticFaults .values ()
197
199
.forEach (f -> f .testAndAccept (new FaultContext (this .scenario , event )));
198
200
199
201
// notify observers
200
- this .observers .forEach (o -> o .onEventAdded (event ));
202
+ this .getObservers () .forEach (o -> o .onEventAdded (event ));
201
203
}
202
204
203
205
/**
@@ -221,13 +223,16 @@ public synchronized void multicast(Node sender, SortedSet<String> recipients,
221
223
222
224
// if they don't have connectivity, drop it directly
223
225
if (!router .haveConnectivity (sender .getId (), recipient )) {
226
+ System .out .println ("Dropped: " + sender .getId () + "->" + recipient + ": " + payload );
227
+ // print partition IDs for both nodes
228
+ System .out .println ("Partition IDs: " + router .getPartitions ().get (sender .getId ()) + " => " + router .getPartitions ().get (recipient ));
224
229
this .dropEvent (messageId );
225
230
}
226
231
}
227
232
}
228
233
229
234
public synchronized void deliverEvent (long eventId ) throws Exception {
230
- Event e = events .get (eventId );
235
+ Event e = this . getEvents () .get (eventId );
231
236
232
237
// check if null
233
238
if (e == null ) {
@@ -251,7 +256,7 @@ public synchronized void deliverEvent(long eventId) throws Exception {
251
256
e .setStatus (Event .Status .DELIVERED );
252
257
253
258
// For timeouts, this should be called before, so the Replica time is updated
254
- this .observers .forEach (o -> o .onEventDelivered (e ));
259
+ this .getObservers () .forEach (o -> o .onEventDelivered (e ));
255
260
256
261
switch (e ) {
257
262
case ClientRequestEvent c -> {
@@ -283,14 +288,14 @@ public synchronized void dropEvent(long eventId) {
283
288
}
284
289
285
290
// check if event is a message
286
- Event e = events .get (eventId );
291
+ Event e = this . getEvents () .get (eventId );
287
292
288
293
if (e .getStatus () != Event .Status .QUEUED ) {
289
294
throw new IllegalArgumentException ("Event not found or not in QUEUED state" );
290
295
}
291
296
292
297
e .setStatus (Event .Status .DROPPED );
293
- this .observers .forEach (o -> o .onEventDropped (e ));
298
+ this .getObservers () .forEach (o -> o .onEventDropped (e ));
294
299
log .info ("Dropped: " + e );
295
300
}
296
301
@@ -301,7 +306,7 @@ public synchronized void dropEvent(long eventId) {
301
306
* @return The event with the given ID.
302
307
*/
303
308
public synchronized Event getEvent (long eventId ) {
304
- return events .get (eventId );
309
+ return this . getEvents () .get (eventId );
305
310
}
306
311
307
312
/**
@@ -311,7 +316,7 @@ public synchronized Event getEvent(long eventId) {
311
316
* @param fault The fault to apply.
312
317
*/
313
318
public synchronized void applyMutation (long eventId , Fault fault ) {
314
- Event e = events .get (eventId );
319
+ Event e = this . getEvents () .get (eventId );
315
320
316
321
// check if event does not exist
317
322
if (e == null ) {
@@ -360,7 +365,7 @@ public synchronized void applyMutation(long eventId, Fault fault) {
360
365
// append the event to the schedule
361
366
mutateMessageEvent .setStatus (Event .Status .DELIVERED );
362
367
this .scenario .getSchedule ().appendEvent (mutateMessageEvent );
363
- this .observers .forEach (o -> o .onMessageMutation (mutateMessageEvent .getPayload ()));
368
+ this .getObservers () .forEach (o -> o .onMessageMutation (mutateMessageEvent .getPayload ()));
364
369
365
370
log .info ("Mutated: " + m );
366
371
}
@@ -392,7 +397,7 @@ public synchronized void applyFault(Fault fault) {
392
397
.build ();
393
398
faultEvent .setStatus (Event .Status .DELIVERED );
394
399
this .scenario .getSchedule ().appendEvent (faultEvent );
395
- this .observers .forEach (o -> o .onFault (fault ));
400
+ this .getObservers () .forEach (o -> o .onFault (fault ));
396
401
}
397
402
398
403
/**
@@ -414,7 +419,7 @@ public synchronized long setTimeout(Node node, Runnable runnable,
414
419
.task (runnable )
415
420
.build ();
416
421
this .appendEvent (timeoutEvent );
417
- this .observers .forEach (o -> o .onTimeout (timeoutEvent ));
422
+ this .getObservers () .forEach (o -> o .onTimeout (timeoutEvent ));
418
423
log .info ("Timeout set for " + node .getId () + " in " + timeout + "ms: " + timeoutEvent );
419
424
return timeoutEvent .getEventId ();
420
425
}
@@ -425,7 +430,7 @@ public synchronized long setTimeout(Node node, Runnable runnable,
425
430
* @param eventId The ID of the event to clear.
426
431
*/
427
432
public synchronized void clearTimeout (Node node , long eventId ) {
428
- Event e = events .get (eventId );
433
+ Event e = this . getEvents () .get (eventId );
429
434
430
435
if (e == null ) {
431
436
throw new IllegalArgumentException ("Event not found: " + eventId );
@@ -440,7 +445,7 @@ public synchronized void clearTimeout(Node node, long eventId) {
440
445
}
441
446
442
447
timeoutEvent .setStatus (Event .Status .DROPPED );
443
- this .observers .forEach (o -> o .onEventDropped (timeoutEvent ));
448
+ this .getObservers () .forEach (o -> o .onEventDropped (timeoutEvent ));
444
449
}
445
450
446
451
/**
@@ -450,7 +455,7 @@ public synchronized void clearTimeout(Node node, long eventId) {
450
455
* @return A list of event IDs of queued timeouts.
451
456
*/
452
457
public synchronized List <Long > getQueuedTimeouts (Node node ) {
453
- return this .events .values ()
458
+ return this .getEvents () .values ()
454
459
.stream ()
455
460
.filter (e -> e instanceof TimeoutEvent t &&
456
461
t .getNodeId ().equals (node .getId ()) &&
@@ -470,9 +475,9 @@ public synchronized void clearReplicaTimeouts(Node node) {
470
475
471
476
// remove all event IDs
472
477
for (Long eventId : eventIds ) {
473
- Event e = events .get (eventId );
478
+ Event e = this . getEvents () .get (eventId );
474
479
e .setStatus (Event .Status .DROPPED );
475
- this .observers .forEach (o -> o .onEventDropped (e ));
480
+ this .getObservers () .forEach (o -> o .onEventDropped (e ));
476
481
}
477
482
}
478
483
@@ -510,21 +515,26 @@ public synchronized Fault getNetworkFault(String faultId) {
510
515
* </ul>
511
516
*/
512
517
public void globalStabilizationTime () {
513
- this .isGlobalStabilizationTime = true ;
518
+ // clear all network faults
519
+ // XXX: Is this the right thing to do?
520
+ this .networkFaults .clear ();
521
+
522
+ // heal all partitions
523
+ this .router .resetPartitions ();
514
524
515
525
// re-queue all dropped messages
516
- this .events .values ().stream ()
526
+ Stream <Event > droppedEvents = this .getEvents ().values ().stream ()
527
+ .filter (e -> e .getStatus () == Event .Status .DROPPED );
528
+
529
+ long numDroppedEvents = droppedEvents .count ();
530
+ System .out .println ("Events dropped that will be requeued: " + numDroppedEvents );
531
+ this .getEvents ().values ().stream ()
517
532
.filter (e -> e .getStatus () == Event .Status .DROPPED )
518
533
.forEach (e -> {
519
534
e .setStatus (Event .Status .QUEUED );
520
- this .observers .forEach (o -> o .onEventRequeued (e ));
535
+ this .getObservers () .forEach (o -> o .onEventRequeued (e ));
521
536
});
522
537
523
- // clear all network faults
524
- // XXX: Is this the right thing to do?
525
- this .networkFaults .clear ();
526
-
527
- // heal all partitions
528
- this .router .resetPartitions ();
538
+ this .isGlobalStabilizationTime = true ;
529
539
}
530
540
}
0 commit comments