Skip to content

Commit 46d1f85

Browse files
committed
Adding on snap block error behaviour
1 parent 464c53a commit 46d1f85

File tree

2 files changed: +103 -98 lines changed

rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java

Lines changed: 95 additions & 92 deletions
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,7 @@ public class SnapshotProcessor implements InternalService {
6262
public static final int BLOCK_NUMBER_CHECKPOINT = 5000;
6363
public static final int BLOCK_CHUNK_SIZE = 400;
6464
public static final int BLOCKS_REQUIRED = 6000;
65+
public static final long CHUNK_ITEM_SIZE = 1024L;
6566
private final Blockchain blockchain;
6667
private final TrieStore trieStore;
6768
private final BlockStore blockStore;
@@ -77,7 +78,6 @@ public class SnapshotProcessor implements InternalService {
7778

7879
private volatile Boolean isRunning;
7980
private final Thread thread;
80-
8181
public SnapshotProcessor(Blockchain blockchain,
8282
TrieStore trieStore,
8383
SnapshotPeersInformation peersInformation,
@@ -157,6 +157,7 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno
157157
logger.debug("SERVER - Processing snapshot status request.");
158158
long bestBlockNumber = blockchain.getBestBlock().getNumber();
159159
long checkpointBlockNumber = bestBlockNumber - (bestBlockNumber % BLOCK_NUMBER_CHECKPOINT);
160+
logger.debug("SERVER - checkpointBlockNumber: {}, bestBlockNumber: {}", checkpointBlockNumber, bestBlockNumber);
160161
List<Block> blocks = Lists.newArrayList();
161162
List<BlockDifficulty> difficulties = Lists.newArrayList();
162163
for (long i = checkpointBlockNumber - BLOCK_CHUNK_SIZE; i < checkpointBlockNumber; i++) {
@@ -165,8 +166,10 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno
165166
difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes()));
166167
}
167168

169+
logger.trace("SERVER - Sending snapshot status response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE);
168170
Block checkpointBlock = blockchain.getBlockByNumber(checkpointBlockNumber);
169171
blocks.add(checkpointBlock);
172+
logger.trace("SERVER - adding checkpoint block: {}", checkpointBlock.getNumber());
170173
difficulties.add(blockStore.getTotalDifficultyForHash(checkpointBlock.getHash().getBytes()));
171174
byte[] rootHash = checkpointBlock.getStateRoot();
172175
Optional<TrieDTO> opt = trieStore.retrieveDTO(rootHash);
@@ -196,7 +199,7 @@ public void processSnapStatusResponse(SnapSyncState state, Peer sender, SnapStat
196199
state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i)));
197200
}
198201
logger.debug("CLIENT - Processing snapshot status response - last blockNumber: {} triesize: {}", lastBlock.getNumber(), state.getRemoteTrieSize());
199-
logger.debug("Blocks included in the response: {} from {} to {}", blocksFromResponse.size(), blocksFromResponse.get(0).getNumber(),blocksFromResponse.get(blocksFromResponse.size()-1).getNumber());
202+
logger.debug("Blocks included in the response: {} from {} to {}", blocksFromResponse.size(), blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber());
200203
requestBlocksChunk(sender, blocksFromResponse.get(0).getNumber());
201204
generateChunkRequestTasks(state);
202205
startRequestingChunks(state);
@@ -255,7 +258,7 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc
255258
state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i)));
256259
}
257260
long nextChunk = blocksFromResponse.get(0).getNumber();
258-
logger.debug("CLIENT - SnapBlock - nexChunk : {} - lastRequired {}, missing {}", nextChunk, lastRequiredBlock, nextChunk-lastRequiredBlock);
261+
logger.debug("CLIENT - SnapBlock - nexChunk : {} - lastRequired {}, missing {}", nextChunk, lastRequiredBlock, nextChunk - lastRequiredBlock);
259262
if (nextChunk > lastRequiredBlock) {
260263
requestBlocksChunk(sender, nextChunk);
261264
} else {
@@ -267,10 +270,9 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc
267270
* STATE CHUNK
268271
*/
269272
private void requestStateChunk(Peer peer, long from, long blockNumber, int chunkSize) {
270-
logger.debug("CLIENT - Requesting state chunk to node {} - block {} - from {}", peer.getPeerNodeID(), blockNumber, from);
273+
logger.debug("CLIENT - Requesting state chunk to node {} - block {} - chunkNumber {}", peer.getPeerNodeID(), blockNumber, from / chunkSize);
271274
SnapStateChunkRequestMessage message = new SnapStateChunkRequestMessage(messageId++, blockNumber, from, chunkSize);
272275
peer.sendMessage(message);
273-
logger.debug("CLIENT - Request sent state chunk to node {} - block {} - from {}", peer.getPeerNodeID(), blockNumber, from);
274276
}
275277

276278
public void processStateChunkRequest(Peer sender, SnapStateChunkRequestMessage requestMessage) {
@@ -297,7 +299,7 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage
297299

298300
List<byte[]> trieEncoded = new ArrayList<>();
299301
Block block = blockchain.getBlockByNumber(request.getBlockNumber());
300-
final long to = request.getFrom() + (request.getChunkSize() * 1024);
302+
final long to = request.getFrom() + (request.getChunkSize() * CHUNK_ITEM_SIZE);
301303
logger.debug("SERVER - Processing state chunk request from node {}. From {} to calculated {} being chunksize {}", sender.getPeerNodeID(), request.getFrom(), to, request.getChunkSize());
302304
logger.debug("SERVER - Sending state chunk from {} to {}", request.getFrom(), to);
303305
TrieDTOInOrderIterator it = new TrieDTOInOrderIterator(trieStore, block.getStateRoot(), request.getFrom(), to);
@@ -319,7 +321,6 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage
319321
byte[] firstNodeLeftHash = RLP.encodeElement(first.getLeftHash());
320322
byte[] nodesBytes = RLP.encodeList(trieEncoded.toArray(new byte[0][0]));
321323
byte[] lastNodeHashes = last != null ? RLP.encodeList(RLP.encodeElement(getBytes(last.getLeftHash())), RLP.encodeElement(getBytes(last.getRightHash()))) : RLP.encodedEmptyList();
322-
323324
// Last we add the root nodes on the right of the last visited node. They are used to validate the chunk.
324325
List<byte[]> postRootNodes = it.getNodesLeftVisiting().stream().map((t) -> RLP.encodeList(RLP.encodeElement(t.getEncoded()), RLP.encodeElement(getBytes(t.getRightHash())))).collect(Collectors.toList());
325326
byte[] postRootNodesBytes = !postRootNodes.isEmpty() ? RLP.encodeList(postRootNodes.toArray(new byte[0][0])) : RLP.encodedEmptyList();
@@ -334,7 +335,7 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage
334335
}
335336

336337
public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage responseMessage) {
337-
logger.debug("CLIENT - State chunk received from {} to {} of {}", responseMessage.getFrom(), responseMessage.getTo(), state.getLastBlock());
338+
logger.debug("CLIENT - State chunk received chunkNumber {}. From {} to {} of total size {}", responseMessage.getFrom() / CHUNK_ITEM_SIZE, responseMessage.getFrom(), responseMessage.getTo(), state.getRemoteTrieSize());
338339

339340
PriorityQueue<SnapStateChunkResponseMessage> queue = state.getSnapStateChunkQueue();
340341
queue.add(responseMessage);
@@ -346,120 +347,122 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC
346347
if (nextMessage.getFrom() == nextExpectedFrom) {
347348
try {
348349
processOrderedStateChunkResponse(state, peer, queue.poll());
350+
state.setNextExpectedFrom(nextExpectedFrom + chunkSize * CHUNK_ITEM_SIZE);
349351
} catch (Exception e) {
350352
logger.error("Error while processing chunk response. {}", e.getMessage(), e);
353+
onStateChunkResponseError(peer, nextMessage);
351354
}
352-
state.setNextExpectedFrom(nextExpectedFrom + chunkSize * 1024L);
353355
} else {
354356
break;
355357
}
356358
}
357359

358360
if (!responseMessage.isComplete()) {
359-
logger.debug("CLIENT - State chunk response not complete. Requesting next chunk." );
361+
logger.debug("CLIENT - State chunk response not complete. Requesting next chunk.");
360362
executeNextChunkRequestTask(state, peer);
361363
}
362364
}
363365

366+
private void onStateChunkResponseError(Peer peer, SnapStateChunkResponseMessage responseMessage) {
367+
logger.error("Error while processing chunk response from {} of peer {}. Asking for chunk again.", responseMessage.getFrom(), peer.getPeerNodeID());
368+
Peer alternativePeer = peersInformation.getBestSnapPeerCandidates().stream()
369+
.filter(listedPeer -> !listedPeer.getPeerNodeID().equals(peer.getPeerNodeID()))
370+
.findFirst()
371+
.orElse(peer);
372+
logger.debug("Requesting state chunk \"from\" {} to peer {}", responseMessage.getFrom(), peer.getPeerNodeID());
373+
requestStateChunk(alternativePeer, responseMessage.getFrom(), responseMessage.getBlockNumber(), chunkSize);
374+
}
364375

365-
private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) {
366-
try {
367-
logger.debug("CLIENT - Processing State chunk received from {} to {}", message.getFrom(), message.getTo());
368-
peersInformation.getOrRegisterPeer(peer);
369-
state.onNewChunk();
370-
371-
RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue());
372-
final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData());
373-
final RLPList trieElements = RLP.decodeList(nodeLists.get(1).getRLPData());
374-
byte[] firstNodeLeftHash = nodeLists.get(2).getRLPData();
375-
final RLPList lastNodeHashes = RLP.decodeList(nodeLists.get(3).getRLPData());
376-
final RLPList postRootElements = RLP.decodeList(nodeLists.get(4).getRLPData());
377-
logger.debug(
378-
"CLIENT - Received state chunk of {} elements ({} bytes).",
379-
trieElements.size(),
380-
message.getChunkOfTrieKeyValue().length
381-
);
382-
List<TrieDTO> preRootNodes = new ArrayList<>();
383-
List<TrieDTO> nodes = new ArrayList<>();
384-
List<TrieDTO> postRootNodes = new ArrayList<>();
385-
386-
387-
for (int i = 0; i < preRootElements.size(); i++) {
388-
final RLPList trieElement = (RLPList) preRootElements.get(i);
389-
final byte[] value = trieElement.get(0).getRLPData();
390-
final byte[] leftHash = trieElement.get(1).getRLPData();
391-
TrieDTO node = TrieDTO.decodeFromSync(value);
392-
node.setLeftHash(leftHash);
393-
preRootNodes.add(node);
394-
}
395376

396-
if (trieElements.size() > 0) {
397-
for (int i = 0; i < trieElements.size(); i++) {
398-
final RLPElement trieElement = trieElements.get(i);
399-
byte[] value = trieElement.getRLPData();
400-
nodes.add(TrieDTO.decodeFromSync(value));
401-
}
402-
nodes.get(0).setLeftHash(firstNodeLeftHash);
403-
}
377+
private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) throws Exception {
378+
logger.debug("CLIENT - Processing State chunk received from {} to {}", message.getFrom(), message.getTo());
379+
peersInformation.getOrRegisterPeer(peer);
380+
state.onNewChunk();
381+
382+
RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue());
383+
final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData());
384+
final RLPList trieElements = RLP.decodeList(nodeLists.get(1).getRLPData());
385+
byte[] firstNodeLeftHash = nodeLists.get(2).getRLPData();
386+
final RLPList lastNodeHashes = RLP.decodeList(nodeLists.get(3).getRLPData());
387+
final RLPList postRootElements = RLP.decodeList(nodeLists.get(4).getRLPData());
388+
List<TrieDTO> preRootNodes = new ArrayList<>();
389+
List<TrieDTO> nodes = new ArrayList<>();
390+
List<TrieDTO> postRootNodes = new ArrayList<>();
391+
392+
393+
for (int i = 0; i < preRootElements.size(); i++) {
394+
final RLPList trieElement = (RLPList) preRootElements.get(i);
395+
final byte[] value = trieElement.get(0).getRLPData();
396+
final byte[] leftHash = trieElement.get(1).getRLPData();
397+
TrieDTO node = TrieDTO.decodeFromSync(value);
398+
node.setLeftHash(leftHash);
399+
preRootNodes.add(node);
400+
}
404401

405-
if (lastNodeHashes.size() > 0) {
406-
TrieDTO lastNode = nodes.get(nodes.size() - 1);
407-
lastNode.setLeftHash(lastNodeHashes.get(0).getRLPData());
408-
lastNode.setRightHash(lastNodeHashes.get(1).getRLPData());
402+
if (trieElements.size() > 0) {
403+
for (int i = 0; i < trieElements.size(); i++) {
404+
final RLPElement trieElement = trieElements.get(i);
405+
byte[] value = trieElement.getRLPData();
406+
nodes.add(TrieDTO.decodeFromSync(value));
409407
}
408+
nodes.get(0).setLeftHash(firstNodeLeftHash);
409+
}
410410

411-
for (int i = 0; i < postRootElements.size(); i++) {
412-
final RLPList trieElement = (RLPList) postRootElements.get(i);
413-
final byte[] value = trieElement.get(0).getRLPData();
414-
final byte[] rightHash = trieElement.get(1).getRLPData();
415-
TrieDTO node = TrieDTO.decodeFromSync(value);
416-
node.setRightHash(rightHash);
417-
postRootNodes.add(node);
418-
}
411+
if (lastNodeHashes.size() > 0) {
412+
TrieDTO lastNode = nodes.get(nodes.size() - 1);
413+
lastNode.setLeftHash(lastNodeHashes.get(0).getRLPData());
414+
lastNode.setRightHash(lastNodeHashes.get(1).getRLPData());
415+
}
419416

420-
if (TrieDTOInOrderRecoverer.verifyChunk(state.getRemoteRootHash(), preRootNodes, nodes, postRootNodes)) {
421-
state.getAllNodes().addAll(nodes);
422-
state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size())));
423-
state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length)));
424-
logger.debug("CLIENT - State progress: {} chunks ({} bytes)", state.getStateSize(), state.getStateChunkSize());
425-
if (!message.isComplete()) {
426-
executeNextChunkRequestTask(state, peer);
427-
} else {
428-
rebuildStateAndSave(state);
429-
logger.info("CLIENT - Snapshot sync finished!");
430-
stopSyncing(state);
431-
}
417+
for (int i = 0; i < postRootElements.size(); i++) {
418+
final RLPList trieElement = (RLPList) postRootElements.get(i);
419+
final byte[] value = trieElement.get(0).getRLPData();
420+
final byte[] rightHash = trieElement.get(1).getRLPData();
421+
TrieDTO node = TrieDTO.decodeFromSync(value);
422+
node.setRightHash(rightHash);
423+
postRootNodes.add(node);
424+
}
425+
426+
if (TrieDTOInOrderRecoverer.verifyChunk(state.getRemoteRootHash(), preRootNodes, nodes, postRootNodes)) {
427+
state.getAllNodes().addAll(nodes);
428+
state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size())));
429+
state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length)));
430+
if (!message.isComplete()) {
431+
executeNextChunkRequestTask(state, peer);
432432
} else {
433-
logger.error("Error while verifying chunk response: {}", message);
434-
throw new Exception("Error verifying chunk.");
433+
boolean result = rebuildStateAndSave(state);
434+
logger.info("CLIENT - Snapshot sync finished {}! ", result ? "successfully" : "with errors");
435+
stopSyncing(state);
435436
}
436-
} catch (Exception e) {
437-
logger.error("Error while processing chunk response.", e);
437+
} else {
438+
logger.error("Error while verifying chunk response: {}", message);
439+
throw new Exception("Error verifying chunk.");
438440
}
439441
}
440442

441443
/**
442444
* Once state share is received, rebuild the trie, save it in db and save all the blocks.
443445
*/
444-
private void rebuildStateAndSave(SnapSyncState state) {
445-
logger.info("CLIENT - State Completed! {} chunks ({} bytes) - chunk size = {}",
446-
state.getStateSize(), state.getStateChunkSize(), this.chunkSize);
447-
final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]);
446+
private boolean rebuildStateAndSave(SnapSyncState state) {
448447
logger.info("CLIENT - Recovering trie...");
448+
final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]);
449449
Optional<TrieDTO> result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, this.trieStore::saveDTO);
450-
if (!result.isPresent() || !Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
451-
logger.error("CLIENT - State final validation FAILED");
452-
} else {
450+
451+
if (result.isPresent() && Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
453452
logger.info("CLIENT - State final validation OK!");
454-
}
455453

456-
logger.info("CLIENT - Saving previous blocks...");
457-
this.blockchain.removeBlocksByNumber(0);
458-
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
459-
state.connectBlocks(blockConnector);
460-
logger.info("CLIENT - Setting last block as best block...");
461-
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
462-
this.transactionPool.setBestBlock(state.getLastBlock());
454+
this.blockchain.removeBlocksByNumber(0);
455+
//genesis is removed so backwards sync will always start.
456+
457+
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
458+
state.connectBlocks(blockConnector);
459+
logger.info("CLIENT - Setting last block as best block...");
460+
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
461+
this.transactionPool.setBestBlock(state.getLastBlock());
462+
return true;
463+
}
464+
logger.error("CLIENT - State final validation FAILED");
465+
return false;
463466
}
464467

465468
private void generateChunkRequestTasks(SnapSyncState state) {
@@ -468,7 +471,7 @@ private void generateChunkRequestTasks(SnapSyncState state) {
468471
while (from < state.getRemoteTrieSize()) {
469472
ChunkTask task = new ChunkTask(state.getLastBlock().getNumber(), from);
470473
state.getChunkTaskQueue().add(task);
471-
from += chunkSize * 1024L;
474+
from += chunkSize * CHUNK_ITEM_SIZE;
472475
}
473476
}
474477

0 commit comments

Comments
 (0)