Skip to content

Commit fb36402

Browse files
committed
Add tmp trie store implementation in snapshot processor
1 parent d529b73 commit fb36402

File tree

8 files changed

+269
-45
lines changed

8 files changed

+269
-45
lines changed

rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.ethereum.core.Blockchain;
4343
import org.ethereum.core.TransactionPool;
4444
import org.ethereum.db.BlockStore;
45+
import org.ethereum.util.ByteUtil;
4546
import org.ethereum.util.RLP;
4647
import org.ethereum.util.RLPElement;
4748
import org.ethereum.util.RLPList;
@@ -53,6 +54,7 @@
5354
import java.util.*;
5455
import java.util.concurrent.BlockingQueue;
5556
import java.util.concurrent.LinkedBlockingQueue;
57+
import java.util.function.Function;
5658

5759
import static co.rsk.net.sync.SnapSyncRequestManager.PeerSelector;
5860

@@ -687,12 +689,23 @@ private void processOrderedStateChunkResponse(SnapSyncState state, SnapStateChun
687689
}
688690

689691
if (trieElements.size() > 0) {
692+
final var existingNodesSizeInBytes = state.getTmpSnapSyncKeyValueDataSource().get(ByteUtil.intToBytes(SnapSyncState.TMP_NODES_SIZE_KEY));
693+
var existingNodesSize = 0;
694+
695+
if(existingNodesSizeInBytes != null) {
696+
existingNodesSize = ByteUtil.byteArrayToInt(existingNodesSizeInBytes);
697+
}
698+
690699
for (int i = 0; i < trieElements.size(); i++) {
691700
final RLPElement trieElement = trieElements.get(i);
692701
byte[] value = trieElement.getRLPData();
693702
nodes.add(TrieDTO.decodeFromSync(value));
703+
704+
state.getTmpSnapSyncKeyValueDataSource().put(ByteUtil.intToBytes(existingNodesSize + i), value);
694705
}
695706
nodes.get(0).setLeftHash(firstNodeLeftHash);
707+
708+
state.getTmpSnapSyncKeyValueDataSource().put(ByteUtil.intToBytes(SnapSyncState.TMP_NODES_SIZE_KEY), ByteUtil.intToBytes(nodes.size() + existingNodesSize));
696709
}
697710

698711
if (lastNodeHashes.size() > 0) {
@@ -711,7 +724,6 @@ private void processOrderedStateChunkResponse(SnapSyncState state, SnapStateChun
711724
}
712725

713726
if (TrieDTOInOrderRecoverer.verifyChunk(state.getRemoteRootHash(), preRootNodes, nodes, postRootNodes)) {
714-
state.getAllNodes().addAll(nodes);
715727
state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size())));
716728
state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length)));
717729
if (message.isComplete()) {
@@ -732,29 +744,50 @@ private boolean blocksVerified(SnapSyncState state) {
732744
return lastVerifiedBlockHeader != null && blockStore.isBlockExist(lastVerifiedBlockHeader.getParentHash().getBytes());
733745
}
734746

747+
private void moveNodesToTrie(TrieDTO node, int index) {
748+
trieStore.saveDTO(node);
749+
}
750+
751+
Function<Integer, TrieDTO> getNodeFromTmpSnapSyncKeyValueDataSource(SnapSyncState state) {
752+
return (Integer index) -> {
753+
final var nodeInBytes = state.getTmpSnapSyncKeyValueDataSource().get(ByteUtil.intToBytes(index));
754+
return TrieDTO.decodeFromSync(nodeInBytes);
755+
};
756+
}
757+
735758
/**
736759
* Once state share is received, rebuild the trie, save it in db and save all the blocks.
737760
*/
738761
private boolean rebuildStateAndSave(SnapSyncState state) {
739762
logger.info("Recovering trie...");
740-
final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]);
741-
Optional<TrieDTO> result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, this.trieStore::saveDTO);
763+
final var existingNodesSizeInBytes = state.getTmpSnapSyncKeyValueDataSource().get(ByteUtil.intToBytes(SnapSyncState.TMP_NODES_SIZE_KEY));
764+
final var existingNodesSize = ByteUtil.byteArrayToInt(existingNodesSizeInBytes);
765+
final var response = TrieDTOInOrderRecoverer.recoverTrie(this.getNodeFromTmpSnapSyncKeyValueDataSource(state), existingNodesSize, this::moveNodesToTrie);
766+
final var result = response.node();
742767

743-
if (result.isPresent() && Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
744-
logger.info("State final validation OK!");
768+
if (result.isEmpty() || !Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
769+
logger.error("State final validation FAILED");
745770

746-
this.blockchain.removeBlocksByNumber(0);
747-
//genesis is removed so backwards sync will always start.
771+
return false;
772+
}
748773

749-
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
750-
state.connectBlocks(blockConnector);
751-
logger.info("Setting last block as best block...");
752-
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
753-
this.transactionPool.setBestBlock(state.getLastBlock());
754-
return true;
774+
for (int i = 0; i < existingNodesSize; i++) {
775+
state.getTmpSnapSyncKeyValueDataSource().delete(ByteUtil.intToBytes(i));
755776
}
756-
logger.error("State final validation FAILED");
757-
return false;
777+
778+
state.getTmpSnapSyncKeyValueDataSource().delete(ByteUtil.intToBytes(SnapSyncState.TMP_NODES_SIZE_KEY));
779+
780+
logger.info("State final validation OK!");
781+
782+
this.blockchain.removeBlocksByNumber(0);
783+
//genesis is removed so backwards sync will always start.
784+
785+
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
786+
state.connectBlocks(blockConnector);
787+
logger.info("Setting last block as best block...");
788+
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
789+
this.transactionPool.setBestBlock(state.getLastBlock());
790+
return true;
758791
}
759792

760793
private void generateChunkRequestTasks(SnapSyncState state) {

rskj-core/src/main/java/co/rsk/net/SyncProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ public void startBlockForwardSyncing(Peer peer) {
329329
@Override
330330
public void startSnapSync(Peer peer) {
331331
logger.info("Start Snap syncing with {}", peer.getPeerNodeID());
332-
setSyncState(new SnapSyncState(this, snapshotProcessor, syncConfiguration, tmpSnapSyncKeyValueDataSource, databaseDir));
332+
setSyncState(new SnapSyncState(this, snapshotProcessor, syncConfiguration, databaseDir));
333333
}
334334

335335
@Override

rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import co.rsk.net.Peer;
2424
import co.rsk.net.messages.*;
2525
import co.rsk.scoring.EventType;
26-
import co.rsk.trie.TrieDTO;
2726
import com.google.common.annotations.VisibleForTesting;
2827
import com.google.common.collect.Lists;
2928
import org.apache.commons.lang3.tuple.Pair;
@@ -74,7 +73,6 @@ public class SnapSyncState extends BaseSyncState {
7473
private BigInteger stateSize = BigInteger.ZERO;
7574
private BigInteger stateChunkSize = BigInteger.ZERO;
7675
private boolean stateFetched;
77-
private final List<TrieDTO> allNodes;
7876

7977
private long remoteTrieSize;
8078
private byte[] remoteRootHash;
@@ -93,21 +91,24 @@ public class SnapSyncState extends BaseSyncState {
9391

9492
private KeyValueDataSource tmpSnapSyncKeyValueDataSource;
9593
public static final String TMP_NODES_DIR_NAME = "snapSyncTmpNodes";
94+
// We are going to use a key / value store, to store all the nodes that we receive from snap server,
95+
// since we are using a map, we don't know how many nodes are stored there, which means that we need
96+
// to save the size of all the nodes added. This is why we have this key and it is -1, because the
97+
// map can't have a key less than 0. That way we make sure it will not be used by mistake.
9698
public static final int TMP_NODES_SIZE_KEY = -1;
9799

98-
public SnapSyncState(SyncEventsHandler syncEventsHandler, SnapProcessor snapshotProcessor, SyncConfiguration syncConfiguration, KeyValueDataSource tmpSnapSyncKeyValueDataSource, String databaseDir) {
99-
this(syncEventsHandler, snapshotProcessor, new SnapSyncRequestManager(syncConfiguration, syncEventsHandler), syncConfiguration, null, tmpSnapSyncKeyValueDataSource, databaseDir);
100+
public SnapSyncState(SyncEventsHandler syncEventsHandler, SnapProcessor snapshotProcessor, SyncConfiguration syncConfiguration, String databaseDir) {
101+
this(syncEventsHandler, snapshotProcessor, new SnapSyncRequestManager(syncConfiguration, syncEventsHandler), syncConfiguration, null, databaseDir);
100102
}
101103

102104
@VisibleForTesting
103105
SnapSyncState(SyncEventsHandler syncEventsHandler, SnapProcessor snapshotProcessor,
104106
SnapSyncRequestManager snapRequestManager, SyncConfiguration syncConfiguration,
105-
@Nullable SyncMessageHandler.Listener listener, KeyValueDataSource tmpSnapSyncKeyValueDataSource,
107+
@Nullable SyncMessageHandler.Listener listener,
106108
String databaseDir) {
107109
super(syncEventsHandler, syncConfiguration);
108110
this.snapshotProcessor = snapshotProcessor;
109111
this.snapRequestManager = snapRequestManager;
110-
this.allNodes = Lists.newArrayList();
111112
this.blocks = Lists.newArrayList();
112113
this.thread = new Thread(new SyncMessageHandler("SNAP/client", responseQueue, listener) {
113114

@@ -151,7 +152,7 @@ private synchronized KeyValueDataSource getTmpSnapSyncKeyValueDataSource(String
151152
}
152153

153154
final var currentDbKind = KeyValueDataSourceUtils.getDbKindValueFromDbKindFile(databaseDir);
154-
tmpSnapSyncKeyValueDataSource = KeyValueDataSourceUtils.makeDataSource(tmpDatabasePath, currentDbKind);;
155+
tmpSnapSyncKeyValueDataSource = KeyValueDataSourceUtils.makeDataSourceButNotInit(tmpDatabasePath, currentDbKind);
155156
}
156157

157158
return tmpSnapSyncKeyValueDataSource;
@@ -167,6 +168,11 @@ public void onEnter() {
167168
logger.warn(INVALID_STATE_IS_RUNNING_MSG, isRunning);
168169
return;
169170
}
171+
172+
if (tmpSnapSyncKeyValueDataSource != null) {
173+
tmpSnapSyncKeyValueDataSource.init();
174+
}
175+
170176
isRunning = Boolean.TRUE;
171177
thread.start();
172178
snapshotProcessor.startSyncing(this);
@@ -337,10 +343,6 @@ public void connectBlocks(BlockConnectorHelper blockConnectorHelper) {
337343
blockConnectorHelper.startConnecting(blocks);
338344
}
339345

340-
public List<TrieDTO> getAllNodes() {
341-
return allNodes;
342-
}
343-
344346
public BigInteger getStateSize() {
345347
return stateSize;
346348
}
@@ -382,6 +384,10 @@ public boolean isRunning() {
382384
}
383385

384386
public void finish() {
387+
if (tmpSnapSyncKeyValueDataSource != null) {
388+
tmpSnapSyncKeyValueDataSource.close();
389+
}
390+
385391
if (isRunning != Boolean.TRUE) {
386392
logger.warn(INVALID_STATE_IS_RUNNING_MSG, isRunning);
387393
return;
@@ -396,6 +402,10 @@ public void finish() {
396402
}
397403

398404
public void fail(Peer peer, EventType eventType, String message, Object... arguments) {
405+
if (tmpSnapSyncKeyValueDataSource != null) {
406+
tmpSnapSyncKeyValueDataSource.close();
407+
}
408+
399409
if (isRunning != Boolean.TRUE) {
400410
logger.warn(INVALID_STATE_IS_RUNNING_MSG, isRunning);
401411
return;

rskj-core/src/main/java/co/rsk/trie/TrieDTOInOrderRecoverer.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,35 +25,59 @@
2525
import org.slf4j.LoggerFactory;
2626

2727
import java.util.*;
28-
import java.util.function.Consumer;
28+
import java.util.function.BiConsumer;
29+
import java.util.function.Function;
2930

3031
public class TrieDTOInOrderRecoverer {
3132

33+
public record RecoverSubtreeResponse(Optional<TrieDTO> node, int index) {
34+
}
35+
3236
private static final Logger logger = LoggerFactory.getLogger(TrieDTOInOrderRecoverer.class);
3337

3438
private TrieDTOInOrderRecoverer() {
3539
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
3640
}
3741

38-
public static Optional<TrieDTO> recoverTrie(TrieDTO[] trieCollection, Consumer<? super TrieDTO> processTrieDTO) {
39-
Optional<TrieDTO> result = recoverSubtree(trieCollection, 0, trieCollection.length - 1, processTrieDTO);
40-
result.ifPresent(processTrieDTO);
41-
return result;
42+
/**
43+
* Recover a Trie structure given a list of nodes.
44+
*
45+
* @param getNodeFnByIndex will retrieve the node by index given any source
46+
* @param trieCollectionSize will be size of all the nodes to be recovered
47+
* @param processTrieDTO will update the nodes in the source. This is because some hashes will be calculated for validation purposes.
48+
* @return
49+
*/
50+
public static RecoverSubtreeResponse recoverTrie(Function<Integer, TrieDTO> getNodeFnByIndex, int trieCollectionSize, BiConsumer<? super TrieDTO, Integer> processTrieDTO) {
51+
final var response = recoverSubtree(getNodeFnByIndex, 0, trieCollectionSize - 1, processTrieDTO);
52+
final var result = response.node();
53+
result.ifPresent(node -> processTrieDTO.accept(node, response.index()));
54+
55+
return response;
4256
}
4357

44-
private static Optional<TrieDTO> recoverSubtree(TrieDTO[] trieCollection, int start, int end, Consumer<? super TrieDTO> processTrieDTO) {
58+
private static RecoverSubtreeResponse recoverSubtree(Function<Integer, TrieDTO> getNodeFnByIndex, int start, int end, BiConsumer<? super TrieDTO, Integer> processTrieDTO) {
4559
if (end - start < 0) {
46-
return Optional.empty();
60+
return new RecoverSubtreeResponse(Optional.empty(), -1);
4761
}
4862
if (end - start == 0) {
49-
return Optional.of(fromTrieDTO(trieCollection[start], Optional.empty(), Optional.empty()));
63+
final var recoveredNode = Optional.of(fromTrieDTO(getNodeFnByIndex.apply(start), Optional.empty(), Optional.empty()));
64+
65+
return new RecoverSubtreeResponse(recoveredNode, start);
5066
}
51-
int indexRoot = findRoot(trieCollection, start, end);
52-
Optional<TrieDTO> left = recoverSubtree(trieCollection, start, indexRoot - 1, processTrieDTO);
53-
left.ifPresent(processTrieDTO);
54-
Optional<TrieDTO> right = recoverSubtree(trieCollection, indexRoot + 1, end, processTrieDTO);
55-
right.ifPresent(processTrieDTO);
56-
return Optional.of(fromTrieDTO(trieCollection[indexRoot], left, right));
67+
68+
int indexRoot = findRoot(getNodeFnByIndex, start, end);
69+
70+
final var leftResponse = recoverSubtree(getNodeFnByIndex, start, indexRoot - 1, processTrieDTO);
71+
final var left = leftResponse.node();
72+
left.ifPresent(node -> processTrieDTO.accept(node, leftResponse.index()));
73+
74+
final var rightResponse = recoverSubtree(getNodeFnByIndex, indexRoot + 1, end, processTrieDTO);
75+
final var right = rightResponse.node();
76+
right.ifPresent(node -> processTrieDTO.accept(node, rightResponse.index()));
77+
78+
final var recoveredNode = Optional.of(fromTrieDTO(getNodeFnByIndex.apply(indexRoot), left, right));
79+
80+
return new RecoverSubtreeResponse(recoveredNode, indexRoot);
5781
}
5882

5983
public static boolean verifyChunk(byte[] remoteRootHash, List<TrieDTO> preRootNodes, List<TrieDTO> nodes, List<TrieDTO> postRootNodes) {
@@ -65,8 +89,9 @@ public static boolean verifyChunk(byte[] remoteRootHash, List<TrieDTO> preRootNo
6589
return false;
6690
}
6791
TrieDTO[] nodeArray = allNodes.toArray(new TrieDTO[0]);
68-
Optional<TrieDTO> result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, t -> {
92+
final var response = TrieDTOInOrderRecoverer.recoverTrie(index -> nodeArray[index], nodeArray.length, (node, index) -> {
6993
});
94+
final var result = response.node();
7095
if (!result.isPresent() || !Arrays.equals(remoteRootHash, result.get().calculateHash())) {
7196
logger.warn("Root hash does not match! Calculated is present: {}", result.isPresent());
7297
return false;
@@ -85,6 +110,25 @@ private static int findRoot(TrieDTO[] trieCollection, int start, int end) {
85110
return max;
86111
}
87112

113+
private static int findRoot(Function<Integer, TrieDTO> getNodeFnByIndex, int start, int end) {
114+
int max = start;
115+
TrieDTO maxNode = null;
116+
117+
for (int i = start; i <= end; i++) {
118+
final var node = getNodeFnByIndex.apply(i);
119+
120+
if(maxNode == null) {
121+
maxNode = node;
122+
}
123+
124+
if (getValue(node) > getValue(maxNode)) {
125+
max = i;
126+
maxNode = node;
127+
}
128+
}
129+
return max;
130+
}
131+
88132
private static TrieDTO fromTrieDTO(TrieDTO result,
89133
Optional<TrieDTO> left,
90134
Optional<TrieDTO> right) {

rskj-core/src/main/java/org/ethereum/datasource/KeyValueDataSourceUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class KeyValueDataSourceUtils {
1919
private KeyValueDataSourceUtils() { /* hidden */ }
2020

2121
@Nonnull
22-
public static KeyValueDataSource makeDataSource(@Nonnull Path datasourcePath, @Nonnull DbKind kind) {
22+
public static KeyValueDataSource makeDataSourceButNotInit(@Nonnull Path datasourcePath, @Nonnull DbKind kind) {
2323
String name = datasourcePath.getFileName().toString();
2424
String databaseDir = datasourcePath.getParent().toString();
2525

@@ -35,6 +35,13 @@ public static KeyValueDataSource makeDataSource(@Nonnull Path datasourcePath, @N
3535
throw new IllegalArgumentException("kind");
3636
}
3737

38+
return ds;
39+
}
40+
41+
@Nonnull
42+
public static KeyValueDataSource makeDataSource(@Nonnull Path datasourcePath, @Nonnull DbKind kind) {
43+
KeyValueDataSource ds = makeDataSourceButNotInit(datasourcePath, kind);
44+
3845
ds.init();
3946

4047
return ds;

0 commit comments

Comments
 (0)