@@ -88,18 +88,6 @@ public ZyzzyvaReplica(String replicaId,
88
88
);
89
89
// we set this as a null commit certificate for view changes etc. this is the first instance the system is stable
90
90
this .getMessageLog ().setMaxCC (startCC );
91
-
92
- this .setRequestTimeoutId (this .setTimeout (
93
- "requestTimeout" ,
94
- () -> {
95
- log .warning ("Replica " + this .getId () + " didn't receive next request in time, init view change" );
96
- IHateThePrimaryMessage ihtpm = new IHateThePrimaryMessage (this .getViewNumber ());
97
- ihtpm .sign (this .getId ());
98
- this .broadcastMessage (ihtpm );
99
- this .handleIHateThePrimaryMessage (this .getId (), ihtpm );
100
- },
101
- Duration .ofSeconds (300 )
102
- ));
103
91
}
104
92
105
93
@ Override
@@ -272,7 +260,7 @@ public void handleClientRequest(String clientId, Serializable request) {
272
260
this .calculateHistory (this .getHighestSequenceNumber () + 1 , digest ),
273
261
// digest
274
262
digest );
275
- log .info ("Replica " + this .getId () + " ordering request with sequence number " + orm .getSequenceNumber ());
263
+ log .info ("Replica " + this .getId () + " ordering request with sequence number " + orm .getSequenceNumber () + " and operation " + requestMessage . getOperation () );
276
264
orm .sign (this .getId ());
277
265
OrderedRequestMessageWrapper ormw = new OrderedRequestMessageWrapper (orm , requestMessage );
278
266
this .broadcastMessage (ormw );
@@ -366,12 +354,19 @@ public void handleOrderedRequestMessageWrapper(String sender, OrderedRequestMess
366
354
log .warning ("Failed to clear forward to primary timeout, possibly because it's been triggered" );
367
355
} catch (NullPointerException ignored ) {
368
356
}
369
- SpeculativeResponseWrapper response = this .executeOrderedRequest (ormw );
357
+ SpeculativeResponseWrapper srw = this .executeOrderedRequest (ormw );
370
358
if (ormw .getRequestMessage ().getClientId ().equals ("Noop" )) {
371
359
log .info ("Received a noop" );
372
360
return ;
373
361
}
374
- this .sendReplyToClient (ormw .getRequestMessage ().getClientId (), response );
362
+ this .sendReplyToClient (ormw .getRequestMessage ().getClientId (), srw );
363
+
364
+ // checkpointing
365
+ if (ormw .getOrderedRequest ().getSequenceNumber () % this .getCP_INTERVAL () == 0 ) {
366
+ log .info ("Replica " + this .getId () + " going to checkpoint" );
367
+ this .broadcastMessage (srw .getSpecResponse ());
368
+ this .handleSpeculativeResponse (this .getId (), srw .getSpecResponse ());
369
+ }
375
370
}
376
371
377
372
/**
@@ -394,13 +389,6 @@ public SpeculativeResponseWrapper executeOrderedRequest(OrderedRequestMessageWra
394
389
// updates the request cache
395
390
this .getMessageLog ().putResponseCache (clientId , ormw .getRequestMessage (), srw );
396
391
397
- // checkpointing
398
- if (ormw .getOrderedRequest ().getSequenceNumber () % this .getCP_INTERVAL () == 0 ) {
399
- log .info ("Checkpointing" );
400
- this .broadcastMessage (srw .getSpecResponse ());
401
- this .handleSpeculativeResponse (this .getId (), srw .getSpecResponse ());
402
- }
403
-
404
392
return srw ;
405
393
}
406
394
@@ -756,6 +744,7 @@ private void handleCommitMessage(String sender, CommitMessage commitMessage) {
756
744
log .warning ("Received invalid commit certificate from " + sender );
757
745
return ;
758
746
}
747
+ log .info ("Received a commit certificate from " + sender + " with sequence number " + cc .getSequenceNumber ());
759
748
// commit the operations
760
749
this .handleCommitCertificate (cc );
761
750
LocalCommitMessage lcm = new LocalCommitMessage (
@@ -764,6 +753,7 @@ private void handleCommitMessage(String sender, CommitMessage commitMessage) {
764
753
cc .getHistory (),
765
754
this .getId (),
766
755
sender );
756
+ log .info ("Replica " + this .getId () + " sending a local commit message to " + sender );
767
757
lcm .sign (this .getId ());
768
758
this .sendMessage (lcm , sender );
769
759
}
@@ -801,6 +791,7 @@ private void handleCommitCertificate(CommitCertificate cc) {
801
791
this .getMessageLog ().setMaxCC (cc );
802
792
if (this .getCommitLog ().getHighestSequenceNumber () > this .getMessageLog ().getMaxCC ().getSequenceNumber ()) {
803
793
log .warning ("Replica " + this .getId () + " has a higher sequence number in the commit log than the maxCC" );
794
+ throw new IllegalStateException ("Replica " + this .getId () + " has a higher sequence number in the commit log than the maxCC" );
804
795
}
805
796
}
806
797
@@ -849,7 +840,7 @@ private boolean isValidCommitCertificate(CommitCertificate cc) {
849
840
850
841
// if the currentCC is not null and the sequence number is the same, we return true, this means we commit again?
851
842
if (currentCC != null && cc .getSequenceNumber () == currentCC .getSequenceNumber ()) {
852
- log .warning ("Replica " + this .getId () +" received a commit certificate with the same sequence number" );
843
+ log .warning ("Replica " + this .getId () + " received a commit certificate with the same sequence number " );
853
844
return false ;
854
845
}
855
846
@@ -994,21 +985,6 @@ private void commitToViewChange() {
994
985
log .warning ("MaxCC is null" );
995
986
}
996
987
997
-
998
- // creates the CCs
999
- // if (this.getMessageLog().getMaxCC() != null && this.getMessageLog().getMaxCC().getViewNumber() == this.getViewNumber()) {
1000
- // cc = this.getMessageLog().getMaxCC();
1001
- // }
1002
- // } else if (this.getMessageLog()
1003
- // .getViewConfirmMessages()
1004
- // .getOrDefault(this.getViewNumber(), new ArrayList<>())
1005
- // .size() >= this.faultsTolerated + 1) {
1006
- // cc = new ArrayList<>(this.getMessageLog().getViewConfirmMessages().get(this.getViewNumber()));
1007
- // } else {
1008
- // if(this.getMessageLog().getNewViewMessages().isEmpty()) {log.warning("New view messages is empty"); return;}
1009
- // cc = this.getMessageLog().getNewViewMessages().sequencedValues().getLast();
1010
- // }
1011
-
1012
988
if (this .getMessageLog ().getCheckpointMessages ().get (this .getMessageLog ().getLastCheckpoint ()) == null ) {
1013
989
log .warning ("Checkpoint messages is null" );
1014
990
}
@@ -1032,7 +1008,7 @@ private void commitToViewChange() {
1032
1008
this .broadcastMessage (vcmw );
1033
1009
this .handleViewChangeMessageWrapper (this .getId (), vcmw );
1034
1010
}
1035
-
1011
+ log . info ( "Replica " + this . getId () + " committed to a view change with maxCC " + cc . getSequenceNumber () + ", last checkpoint " + this . getMessageLog (). getLastCheckpoint () + " and highest sequence number " + this . getHighestSequenceNumber ());
1036
1012
this .setDisgruntled (true );
1037
1013
}
1038
1014
@@ -1450,31 +1426,7 @@ public void handleViewConfirmMessage(String sender, ViewConfirmMessage vcm) {
1450
1426
private void reconcileLocalHistoryViewChange (Collection <ViewChangeMessage > viewChangeMessages , SortedMap <Long , OrderedRequestMessageWrapper > calculatedHistory ) {
1451
1427
long latestStableCheckpoint = viewChangeMessages .stream ().map (ViewChangeMessage ::getStableCheckpoint ).max (Long ::compareTo ).orElse (-1L );
1452
1428
CommitCertificate maxCC = viewChangeMessages .stream ().map (ViewChangeMessage ::getCommitCertificate ).max (Comparator .comparingLong (CommitCertificate ::getSequenceNumber )).get ();
1453
- // // when we perform a view change directly after a checkpoint
1454
- // // our maxCC is equal to this
1455
- // if (calculatedHistory.isEmpty()) {
1456
- // log.info("Calculated history is empty probably because we committed right before the view change");
1457
- // if (maxCC.getSequenceNumber() != latestStableCheckpoint) {
1458
- // throw new IllegalStateException("MaxCC sequence number is not equal to the latest stable checkpoint when calculated history is empty");
1459
- // }
1460
- // // we are up to speed, nothing to reconcile
1461
- // if (latestStableCheckpoint == this.getHighestSequenceNumber()) return;
1462
- // // to create a CC, we need 2f + 1 replicas or more to agree
1463
- // // to create new view message we need 2f + 1 replicas, so our maxCC will always equal the last cc.
1464
- // // we therefore roll back to the last CC, since we've already commmitted
1465
- // /// TODO: change this then
1466
- // if (latestStableCheckpoint < this.getHighestSequenceNumber() && this.getMessageLog().getLastCheckpoint() == latestStableCheckpoint) {
1467
- // rollbackToCheckpoint(latestStableCheckpoint, maxCC);
1468
- // return;
1469
- // }
1470
- // // we catch up to the checkpoint
1471
- // if (latestStableCheckpoint > this.getHighestSequenceNumber()) {
1472
- // this.catchUpToCheckpoint(latestStableCheckpoint, calculatedHistory, maxCC);
1473
- // return;
1474
- // }
1475
- // throw new IllegalStateException("Something went wrong in empty reconciliation");
1476
- // }
1477
-
1429
+ log .info ("Replica " + this .getId () + " reconciling local history with maxCC " + maxCC .getSequenceNumber () + ", latest stable checkpoint " + latestStableCheckpoint + " and highest sequence number " + this .getHighestSequenceNumber ());
1478
1430
if (calculatedHistory .isEmpty ()) {
1479
1431
// nothing to reconcile
1480
1432
if (latestStableCheckpoint == this .getHighestSequenceNumber ()) return ;
@@ -1509,24 +1461,49 @@ else if (this.getHighestSequenceNumber() == latestStableCheckpoint) {
1509
1461
// if we have a higher commit certificate, we set it
1510
1462
this .handleCommitCertificate (maxCC );
1511
1463
}
1512
- // max-l > min-s and histories diverge
1464
+ // max-l > min-s
1513
1465
else {
1514
- long lastHistoryKey = this .getHistory ().getLastKey ();
1466
+ long lastHistoryKey ;
1467
+ try {
1468
+ lastHistoryKey = this .getHistory ().getLastKey ();
1469
+ } catch (NoSuchElementException e ) {
1470
+ log .warning ("No last history key found" );
1471
+ return ;
1472
+ }
1473
+
1515
1474
long lastHistory = this .getHistory ().get (lastHistoryKey );
1516
1475
if (calculatedHistory .get (lastHistoryKey ).getOrderedRequest ().getHistory () == lastHistory ) {
1517
1476
// handle the last commit certificate (make sure everything is committed)
1518
1477
// execute from max-l + 1
1478
+
1519
1479
for (long i = this .getHighestSequenceNumber () + 1 ; i <= calculatedHistory .sequencedKeySet ().getLast (); i ++) {
1520
1480
this .executeOrderedRequest (calculatedHistory .get (i ));
1521
1481
}
1522
1482
if (this .getMessageLog ().getLastCheckpoint () < latestStableCheckpoint ) {
1523
1483
this .getMessageLog ().setLastCheckpoint (latestStableCheckpoint );
1524
1484
}
1525
- this .handleCommitCertificate (maxCC );
1485
+
1486
+ if (this .getMessageLog ().getMaxCC ().getSequenceNumber () < maxCC .getSequenceNumber ()) {
1487
+ this .handleCommitCertificate (maxCC );
1488
+ }
1526
1489
}
1527
1490
// histories diverge and we roll back
1528
1491
else {
1529
- this .rollbackToCheckpoint (latestStableCheckpoint , calculatedHistory , maxCC );
1492
+ if (this .getMessageLog ().getMaxCC ().getSequenceNumber () > maxCC .getSequenceNumber ()) {
1493
+ log .info ("Diverging histories, rolling back to checkpoint" );
1494
+ this .getMessageLog ().getOrderedMessages ().clear ();
1495
+ this .getHistory ().clear ();
1496
+ this .getMessageLog ().getRequestCache ().clear ();
1497
+ for (OrderedRequestMessageWrapper ormw : calculatedHistory .sequencedValues ()) {
1498
+ if (ormw .getOrderedRequest ().getSequenceNumber () <= this .getMessageLog ().getMaxCC ().getSequenceNumber ()) {
1499
+ continue ;
1500
+ }
1501
+ this .executeOrderedRequest (ormw );
1502
+ }
1503
+
1504
+ } else {
1505
+ this .rollbackToCheckpoint (latestStableCheckpoint , calculatedHistory , maxCC );
1506
+ }
1530
1507
}
1531
1508
}
1532
1509
@@ -1544,6 +1521,7 @@ else if (this.getHighestSequenceNumber() == latestStableCheckpoint) {
1544
1521
* @param maxCC - the maxCC received from the view change messages
1545
1522
*/
1546
1523
private void catchUpToCheckpoint (long latestStableCheckpoint , SortedMap <Long , OrderedRequestMessageWrapper > calculatedHistory , CommitCertificate maxCC ) {
1524
+ log .info ("Replica " + this .getId () + " catching up to checkpoint " + latestStableCheckpoint );
1547
1525
// set the latest checkpoint
1548
1526
this .getMessageLog ().setLastCheckpoint (latestStableCheckpoint );
1549
1527
// clear the history
@@ -1570,23 +1548,7 @@ private void catchUpToCheckpoint(long latestStableCheckpoint, SortedMap<Long, Or
1570
1548
* @param maxCC - the maxCC received from the view change messages
1571
1549
*/
1572
1550
private void rollbackToCheckpoint (long latestStableCheckpoint , SortedMap <Long , OrderedRequestMessageWrapper > calculatedHistory , CommitCertificate maxCC ) {
1573
- // // removes the checkpoint responses
1574
- // this.getMessageLog().getSpeculativeResponsesCheckpoint().clear();
1575
- // // puts the orm corresponding to the latest stable checkpoint back into the ordered messages
1576
- // OrderedRequestMessageWrapper ccRequest = this.getMessageLog().getOrderedMessages().get(latestStableCheckpoint);
1577
- // if (ccRequest == null) {
1578
- // log.warning("Couldn't find the checkpoint request");
1579
- // }
1580
- // this.getMessageLog().getOrderedMessages().clear();
1581
- // this.getMessageLog().putOrderedRequestMessageWrapper(ccRequest);
1582
- // this.getHistory().clear();
1583
- // if (maxCC.getSequenceNumber() != latestStableCheckpoint) {
1584
- // /// TODO: this doesn't seem right
1585
- // throw new IllegalStateException("MaxCC sequence number is not equal to the latest stable checkpoint when calculated history is empty");
1586
- // }
1587
- // this.getHistory().add(latestStableCheckpoint, maxCC.getHistory());
1588
- // this.setHighestSequenceNumber(latestStableCheckpoint);
1589
-
1551
+ log .info ("Replica " + this .getId () + " rolling back to checkpoint " + latestStableCheckpoint );
1590
1552
// remove everything in the message logs
1591
1553
this .getMessageLog ().getSpeculativeResponsesCheckpoint ().clear ();
1592
1554
this .getMessageLog ().getOrderedMessages ().clear ();
@@ -1665,6 +1627,10 @@ private void handleNewViewMessage(String sender, NewViewMessage nvm) {
1665
1627
if (!isValidNewViewMessage (nvm )) {
1666
1628
return ;
1667
1629
}
1630
+ if (this .getMessageLog ().getNewViewMessages ().containsKey (nvm .getFutureViewNumber ())) {
1631
+ log .info ("Received a new view message for a view number that we've already received" );
1632
+ return ;
1633
+ }
1668
1634
1669
1635
// add the new view message to the message log
1670
1636
this .getMessageLog ().putNewViewMessage (nvm );
@@ -1724,11 +1690,6 @@ private void beginNewView(ViewConfirmMessage vcm) {
1724
1690
this .getMessageLog ().getIHateThePrimaries ().getOrDefault (this .getViewNumber (), new TreeMap <>()).clear ();
1725
1691
this .getMessageLog ().getFillHoleMessages ().clear ();
1726
1692
1727
- // this.getHistory().clear();
1728
- // this.getHistory().add(vcm.getLastKnownSequenceNumber(), vcm.getHistory());
1729
- /// TODO: See if this is higher than what we have so far, because it might mess with the ordering and cause a replica to skip.
1730
- // this.setHighestSequenceNumber(vcm.getLastKnownSequenceNumber());
1731
-
1732
1693
// sets the view number and primary
1733
1694
log .info ("Replica " +
1734
1695
this .getId () +
@@ -1813,6 +1774,7 @@ private void checkIfCommitCheckpoint(long sequenceNumber) {
1813
1774
* @param sequenceNumber - the sequence number to create the checkpoint for
1814
1775
*/
1815
1776
private void createCheckpoint (long sequenceNumber ) {
1777
+ log .info ("Replica " + this .getId () + " created stable checkpoint for sequence number " + sequenceNumber );
1816
1778
// set the last checkpoint
1817
1779
this .getMessageLog ().setLastCheckpoint (sequenceNumber );
1818
1780
@@ -1914,6 +1876,7 @@ private void checkIfCheckpoint(long sequenceNumber) {
1914
1876
);
1915
1877
cm .sign (this .getId ());
1916
1878
this .broadcastMessage (cm );
1879
+ log .info ("Replica " + this .getId () + " sent a checkpoint message for sequence number " + sequenceNumber );
1917
1880
this .handleCheckpointMessage (this .getId (), cm );
1918
1881
}
1919
1882
}
0 commit comments