@@ -77,6 +77,8 @@ public class Transport {
77
77
*/
78
78
@ JsonIgnore
79
79
private final List <TransportObserver > observers = new ArrayList <>();
80
+ @ Getter
81
+ private boolean isGlobalStabilizationTime = false ;
80
82
81
83
/**
82
84
* Adds an observer to the transport layer.
@@ -350,6 +352,11 @@ public synchronized void deliverEvent(long eventId) throws Exception {
350
352
* @param eventId The ID of the message to drop.
351
353
*/
352
354
public synchronized void dropEvent (long eventId ) {
355
+ // Check if it is GST - no more dropping
356
+ if (this .isGlobalStabilizationTime ) {
357
+ throw new IllegalStateException ("Cannot drop events during GST" );
358
+ }
359
+
353
360
// check if event is a message
354
361
Event e = events .get (eventId );
355
362
if (e instanceof TimeoutEvent ) {
@@ -359,6 +366,7 @@ public synchronized void dropEvent(long eventId) {
359
366
if (e .getStatus () != Event .Status .QUEUED ) {
360
367
throw new IllegalArgumentException ("Event not found or not in QUEUED state" );
361
368
}
369
+
362
370
e .setStatus (Event .Status .DROPPED );
363
371
this .observers .forEach (o -> o .onEventDropped (e ));
364
372
//log.info("Dropped: " + e);
@@ -404,6 +412,13 @@ public synchronized void applyMutation(long eventId, Fault fault) {
404
412
"Event %d is not a message - cannot mutate it." , eventId ));
405
413
}
406
414
415
+ // check if sender is faulty
416
+ if (!this .scenario .isFaultyReplica (m .getSenderId ())) {
417
+ throw new IllegalArgumentException (
418
+ String .format ("Cannot mutate message: sender %s is not marked as faulty" , m .getSenderId ())
419
+ );
420
+ }
421
+
407
422
// create input for the fault
408
423
FaultContext input = new FaultContext (this .scenario , e );
409
424
@@ -647,4 +662,32 @@ public synchronized List<Fault> getEnabledNetworkFaults() {
647
662
public synchronized Fault getNetworkFault (String faultId ) {
648
663
return this .networkFaults .get (faultId );
649
664
}
665
+
666
+ /**
667
+ * Simulates GST event, according to the partial-synchrony model:
668
+ * <ul>
669
+ * <li>All dropped messages are re-queued</li>
670
+ * <li>Prevents further dropping of messages</li>
671
+ * <li>All network partitions are healed</li>
672
+ * <li>Prevents further network partitions</li>
673
+ * </ul>
674
+ */
675
+ public void globalStabilizationTime () {
676
+ this .isGlobalStabilizationTime = true ;
677
+
678
+ // re-queue all dropped messages
679
+ this .events .values ().stream ()
680
+ .filter (e -> e .getStatus () == Event .Status .DROPPED )
681
+ .forEach (e -> {
682
+ e .setStatus (Event .Status .QUEUED );
683
+ this .observers .forEach (o -> o .onEventRequeued (e ));
684
+ });
685
+
686
+ // clear all network faults
687
+ // XXX: Is this the right thing to do?
688
+ this .networkFaults .clear ();
689
+
690
+ // heal all partitions
691
+ this .router .resetPartitions ();
692
+ }
650
693
}
0 commit comments