@@ -75,6 +75,8 @@ public class Transport {
75
75
*/
76
76
@ JsonIgnore
77
77
private final List <TransportObserver > observers = new ArrayList <>();
78
+ @ Getter
79
+ private boolean isGlobalStabilizationTime = false ;
78
80
79
81
/**
80
82
* Adds an observer to the transport layer.
@@ -275,12 +277,18 @@ public synchronized void deliverEvent(long eventId) throws Exception {
275
277
* @param eventId The ID of the message to drop.
276
278
*/
277
279
public synchronized void dropEvent (long eventId ) {
280
+ // Check if it is GST - no more dropping
281
+ if (this .isGlobalStabilizationTime ) {
282
+ throw new IllegalStateException ("Cannot drop events during GST" );
283
+ }
284
+
278
285
// check if event is a message
279
286
Event e = events .get (eventId );
280
287
281
288
if (e .getStatus () != Event .Status .QUEUED ) {
282
289
throw new IllegalArgumentException ("Event not found or not in QUEUED state" );
283
290
}
291
+
284
292
e .setStatus (Event .Status .DROPPED );
285
293
this .observers .forEach (o -> o .onEventDropped (e ));
286
294
log .info ("Dropped: " + e );
@@ -491,4 +499,32 @@ public synchronized List<Fault> getEnabledNetworkFaults() {
491
499
public synchronized Fault getNetworkFault (String faultId ) {
492
500
return this .networkFaults .get (faultId );
493
501
}
502
+
503
+ /**
504
+ * Simulates GST event, according to the partial-synchrony model:
505
+ * <ul>
506
+ * <li>All dropped messages are re-queued</li>
507
+ * <li>Prevents further dropping of messages</li>
508
+ * <li>All network partitions are healed</li>
509
+ * <li>Prevents further network partitions</li>
510
+ * </ul>
511
+ */
512
+ public void globalStabilizationTime () {
513
+ this .isGlobalStabilizationTime = true ;
514
+
515
+ // re-queue all dropped messages
516
+ this .events .values ().stream ()
517
+ .filter (e -> e .getStatus () == Event .Status .DROPPED )
518
+ .forEach (e -> {
519
+ e .setStatus (Event .Status .QUEUED );
520
+ this .observers .forEach (o -> o .onEventRequeued (e ));
521
+ });
522
+
523
+ // clear all network faults
524
+ // XXX: Is this the right thing to do?
525
+ this .networkFaults .clear ();
526
+
527
+ // heal all partitions
528
+ this .router .resetPartitions ();
529
+ }
494
530
}
0 commit comments