Skip to content

Commit f3fa8f1

Browse files
committed
Add tmp trie store implementation in snapshot processor
1 parent 81df262 commit f3fa8f1

File tree

5 files changed

+236
-38
lines changed

5 files changed

+236
-38
lines changed

rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.ethereum.core.TransactionPool;
4444
import org.ethereum.datasource.KeyValueDataSource;
4545
import org.ethereum.db.BlockStore;
46+
import org.ethereum.util.ByteUtil;
4647
import org.ethereum.util.RLP;
4748
import org.ethereum.util.RLPElement;
4849
import org.ethereum.util.RLPList;
@@ -78,7 +79,7 @@ public class SnapshotProcessor implements InternalService {
7879
private final Blockchain blockchain;
7980
private final TrieStore trieStore;
8081
private final KeyValueDataSource tmpSnapSyncKeyValueDataSource;
81-
public static final int TMP_NODES_SIZE_KEY = -1;
82+
private static final int TMP_NODES_SIZE_KEY = -1;
8283
private final BlockStore blockStore;
8384
private final int chunkSize;
8485
private final int checkpointDistance;
@@ -649,12 +650,23 @@ private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, Sn
649650
}
650651

651652
if (trieElements.size() > 0) {
653+
final var existingNodesSizeInBytes = tmpSnapSyncKeyValueDataSource.get(ByteUtil.intToBytes(TMP_NODES_SIZE_KEY));
654+
var existingNodesSize = 0;
655+
656+
if(existingNodesSizeInBytes != null) {
657+
existingNodesSize = ByteUtil.byteArrayToInt(existingNodesSizeInBytes);
658+
}
659+
652660
for (int i = 0; i < trieElements.size(); i++) {
653661
final RLPElement trieElement = trieElements.get(i);
654662
byte[] value = trieElement.getRLPData();
655663
nodes.add(TrieDTO.decodeFromSync(value));
664+
665+
tmpSnapSyncKeyValueDataSource.put(ByteUtil.intToBytes(existingNodesSize + i), value);
656666
}
657667
nodes.get(0).setLeftHash(firstNodeLeftHash);
668+
669+
tmpSnapSyncKeyValueDataSource.put(ByteUtil.intToBytes(TMP_NODES_SIZE_KEY), ByteUtil.intToBytes(nodes.size() + existingNodesSize));
658670
}
659671

660672
if (lastNodeHashes.size() > 0) {
@@ -673,7 +685,6 @@ private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, Sn
673685
}
674686

675687
if (TrieDTOInOrderRecoverer.verifyChunk(state.getRemoteRootHash(), preRootNodes, nodes, postRootNodes)) {
676-
state.getAllNodes().addAll(nodes);
677688
state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size())));
678689
state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length)));
679690
if (message.isComplete()) {
@@ -694,29 +705,48 @@ private boolean blocksVerified(SnapSyncState state) {
694705
return lastVerifiedBlockHeader != null && blockStore.isBlockExist(lastVerifiedBlockHeader.getParentHash().getBytes());
695706
}
696707

708+
private void moveNodesToTrie(TrieDTO node, int index) {
709+
trieStore.saveDTO(node);
710+
}
711+
712+
TrieDTO getNodeFromTmpSnapSyncKeyValueDataSource(int index) {
713+
final var nodeInBytes = tmpSnapSyncKeyValueDataSource.get(ByteUtil.intToBytes(index));
714+
return TrieDTO.decodeFromSync(nodeInBytes);
715+
}
716+
697717
/**
698718
* Once state share is received, rebuild the trie, save it in db and save all the blocks.
699719
*/
700720
private boolean rebuildStateAndSave(SnapSyncState state) {
701721
logger.info("Recovering trie...");
702-
final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]);
703-
Optional<TrieDTO> result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, this.trieStore::saveDTO);
722+
final var existingNodesSizeInBytes = tmpSnapSyncKeyValueDataSource.get(ByteUtil.intToBytes(TMP_NODES_SIZE_KEY));
723+
final var existingNodesSize = ByteUtil.byteArrayToInt(existingNodesSizeInBytes);
724+
final var response = TrieDTOInOrderRecoverer.recoverTrie(this::getNodeFromTmpSnapSyncKeyValueDataSource, existingNodesSize, this::moveNodesToTrie);
725+
final var result = response.node();
704726

705-
if (result.isPresent() && Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
706-
logger.info("State final validation OK!");
727+
if (result.isEmpty() || !Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
728+
logger.error("State final validation FAILED");
707729

708-
this.blockchain.removeBlocksByNumber(0);
709-
//genesis is removed so backwards sync will always start.
730+
return false;
731+
}
710732

711-
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
712-
state.connectBlocks(blockConnector);
713-
logger.info("Setting last block as best block...");
714-
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
715-
this.transactionPool.setBestBlock(state.getLastBlock());
716-
return true;
733+
for (int i = 0; i < existingNodesSize; i++) {
734+
tmpSnapSyncKeyValueDataSource.delete(ByteUtil.intToBytes(i));
717735
}
718-
logger.error("State final validation FAILED");
719-
return false;
736+
737+
tmpSnapSyncKeyValueDataSource.delete(ByteUtil.intToBytes(TMP_NODES_SIZE_KEY));
738+
739+
logger.info("State final validation OK!");
740+
741+
this.blockchain.removeBlocksByNumber(0);
742+
//genesis is removed so backwards sync will always start.
743+
744+
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
745+
state.connectBlocks(blockConnector);
746+
logger.info("Setting last block as best block...");
747+
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
748+
this.transactionPool.setBestBlock(state.getLastBlock());
749+
return true;
720750
}
721751

722752
private void generateChunkRequestTasks(SnapSyncState state) {

rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import co.rsk.net.SnapshotProcessor;
2525
import co.rsk.net.messages.*;
2626
import co.rsk.scoring.EventType;
27-
import co.rsk.trie.TrieDTO;
2827
import com.google.common.annotations.VisibleForTesting;
2928
import com.google.common.collect.Lists;
3029
import org.apache.commons.lang3.tuple.Pair;
@@ -66,7 +65,6 @@ public class SnapSyncState extends BaseSyncState {
6665
private BigInteger stateSize = BigInteger.ZERO;
6766
private BigInteger stateChunkSize = BigInteger.ZERO;
6867
private boolean stateFetched;
69-
private final List<TrieDTO> allNodes;
7068

7169
private long remoteTrieSize;
7270
private byte[] remoteRootHash;
@@ -94,7 +92,6 @@ public SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snap
9492
super(syncEventsHandler, syncConfiguration);
9593
this.snapshotProcessor = snapshotProcessor;
9694
this.snapRequestManager = snapRequestManager;
97-
this.allNodes = Lists.newArrayList();
9895
this.blocks = Lists.newArrayList();
9996
this.thread = new Thread(new SyncMessageHandler("SNAP/client", responseQueue, listener) {
10097

@@ -281,10 +278,6 @@ public void connectBlocks(BlockConnectorHelper blockConnectorHelper) {
281278
blockConnectorHelper.startConnecting(blocks);
282279
}
283280

284-
public List<TrieDTO> getAllNodes() {
285-
return allNodes;
286-
}
287-
288281
public BigInteger getStateSize() {
289282
return stateSize;
290283
}

rskj-core/src/main/java/co/rsk/trie/TrieDTOInOrderRecoverer.java

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,35 +25,51 @@
2525
import org.slf4j.LoggerFactory;
2626

2727
import java.util.*;
28-
import java.util.function.Consumer;
28+
import java.util.function.BiConsumer;
29+
import java.util.function.Function;
2930

3031
public class TrieDTOInOrderRecoverer {
3132

33+
public record RecoverSubtreeResponse(Optional<TrieDTO> node, int index) {
34+
}
35+
3236
private static final Logger logger = LoggerFactory.getLogger(TrieDTOInOrderRecoverer.class);
3337

3438
private TrieDTOInOrderRecoverer() {
3539
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
3640
}
3741

38-
public static Optional<TrieDTO> recoverTrie(TrieDTO[] trieCollection, Consumer<? super TrieDTO> processTrieDTO) {
39-
Optional<TrieDTO> result = recoverSubtree(trieCollection, 0, trieCollection.length - 1, processTrieDTO);
40-
result.ifPresent(processTrieDTO);
41-
return result;
42+
public static RecoverSubtreeResponse recoverTrie(Function<Integer, TrieDTO> getNodeFn, int trieCollectionSize, BiConsumer<? super TrieDTO, Integer> processTrieDTO) {
43+
final var response = recoverSubtree(getNodeFn, 0, trieCollectionSize - 1, processTrieDTO);
44+
final var result = response.node();
45+
result.ifPresent(node -> processTrieDTO.accept(node, response.index()));
46+
47+
return response;
4248
}
4349

44-
private static Optional<TrieDTO> recoverSubtree(TrieDTO[] trieCollection, int start, int end, Consumer<? super TrieDTO> processTrieDTO) {
50+
private static RecoverSubtreeResponse recoverSubtree(Function<Integer, TrieDTO> getNodeFn, int start, int end, BiConsumer<? super TrieDTO, Integer> processTrieDTO) {
4551
if (end - start < 0) {
46-
return Optional.empty();
52+
return new RecoverSubtreeResponse(Optional.empty(), -1);
4753
}
4854
if (end - start == 0) {
49-
return Optional.of(fromTrieDTO(trieCollection[start], Optional.empty(), Optional.empty()));
55+
final var recoveredNode = Optional.of(fromTrieDTO(getNodeFn.apply(start), Optional.empty(), Optional.empty()));
56+
57+
return new RecoverSubtreeResponse(recoveredNode, start);
5058
}
51-
int indexRoot = findRoot(trieCollection, start, end);
52-
Optional<TrieDTO> left = recoverSubtree(trieCollection, start, indexRoot - 1, processTrieDTO);
53-
left.ifPresent(processTrieDTO);
54-
Optional<TrieDTO> right = recoverSubtree(trieCollection, indexRoot + 1, end, processTrieDTO);
55-
right.ifPresent(processTrieDTO);
56-
return Optional.of(fromTrieDTO(trieCollection[indexRoot], left, right));
59+
60+
int indexRoot = findRoot(getNodeFn, start, end);
61+
62+
final var leftResponse = recoverSubtree(getNodeFn, start, indexRoot - 1, processTrieDTO);
63+
final var left = leftResponse.node();
64+
left.ifPresent(node -> processTrieDTO.accept(node, leftResponse.index()));
65+
66+
final var rightResponse = recoverSubtree(getNodeFn, indexRoot + 1, end, processTrieDTO);
67+
final var right = rightResponse.node();
68+
right.ifPresent(node -> processTrieDTO.accept(node, rightResponse.index()));
69+
70+
final var recoveredNode = Optional.of(fromTrieDTO(getNodeFn.apply(indexRoot), left, right));
71+
72+
return new RecoverSubtreeResponse(recoveredNode, indexRoot);
5773
}
5874

5975
public static boolean verifyChunk(byte[] remoteRootHash, List<TrieDTO> preRootNodes, List<TrieDTO> nodes, List<TrieDTO> postRootNodes) {
@@ -65,8 +81,9 @@ public static boolean verifyChunk(byte[] remoteRootHash, List<TrieDTO> preRootNo
6581
return false;
6682
}
6783
TrieDTO[] nodeArray = allNodes.toArray(new TrieDTO[0]);
68-
Optional<TrieDTO> result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, t -> {
84+
final var response = TrieDTOInOrderRecoverer.recoverTrie(index -> nodeArray[index], nodeArray.length, (node, index) -> {
6985
});
86+
final var result = response.node();
7087
if (!result.isPresent() || !Arrays.equals(remoteRootHash, result.get().calculateHash())) {
7188
logger.warn("Root hash does not match! Calculated is present: {}", result.isPresent());
7289
return false;
@@ -85,6 +102,25 @@ private static int findRoot(TrieDTO[] trieCollection, int start, int end) {
85102
return max;
86103
}
87104

105+
private static int findRoot(Function<Integer, TrieDTO> getNodeFn, int start, int end) {
106+
int max = start;
107+
TrieDTO maxNode = null;
108+
109+
for (int i = start; i <= end; i++) {
110+
final var node = getNodeFn.apply(i);
111+
112+
if(maxNode == null) {
113+
maxNode = node;
114+
}
115+
116+
if (getValue(node) > getValue(maxNode)) {
117+
max = i;
118+
maxNode = node;
119+
}
120+
}
121+
return max;
122+
}
123+
88124
private static TrieDTO fromTrieDTO(TrieDTO result,
89125
Optional<TrieDTO> left,
90126
Optional<TrieDTO> right) {

0 commit comments

Comments
 (0)