Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.ethereum.crypto.ECKey;
import org.ethereum.crypto.HashUtil;
import org.ethereum.listener.GasPriceCalculator;
import org.ethereum.net.client.Capability;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -410,6 +411,17 @@ public int getLongSyncLimit() {
public boolean isServerSnapshotSyncEnabled() { return configFromFiles.getBoolean("sync.snapshot.server.enabled");}
public boolean isClientSnapshotSyncEnabled() { return configFromFiles.getBoolean("sync.snapshot.client.enabled");}

@Override
public List<String> peerCapabilities() {
List<String> capabilities = super.peerCapabilities();

if (isServerSnapshotSyncEnabled()) {
capabilities.add(Capability.SNAP);
}

return capabilities;
}

public int getSnapshotChunkTimeout() {
return configFromFiles.getInt("sync.snapshot.client.chunkRequestTimeout");
}
Expand Down
1 change: 1 addition & 0 deletions rskj-core/src/main/java/co/rsk/net/Peer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ public interface Peer {

double score(long currentTime, MessageType type);
void imported(boolean best);
boolean isSnapCapable();
}
8 changes: 6 additions & 2 deletions rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Optional<Peer> getBestPeer() {
public Optional<Peer> getBestSnapPeer() {
return getBestPeer(
getBestCandidatesStream()
.filter(this::isSnapPeerCandidate)
.filter(this::isSnapPeerCandidateOrCapable)
);
}

Expand Down Expand Up @@ -199,7 +199,7 @@ public List<Peer> getBestPeerCandidates() {
@Override
public List<Peer> getBestSnapPeerCandidates() {
return getBestCandidatesStream()
.filter(this::isSnapPeerCandidate)
.filter(entry -> entry.getKey().isSnapCapable())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}
Expand Down Expand Up @@ -289,4 +289,8 @@ public void clearOldFailedPeers() {
private boolean isSnapPeerCandidate(Map.Entry<Peer, SyncPeerStatus> entry) {
return syncConfiguration.getNodeIdToSnapshotTrustedPeerMap().containsKey(entry.getKey().getPeerNodeID().toString());
}

private boolean isSnapPeerCandidateOrCapable(Map.Entry<Peer, SyncPeerStatus> entry) {
return isSnapPeerCandidate(entry) || entry.getKey().isSnapCapable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class Capability implements Comparable<Capability> {

public static final String P2P = "p2p";
public static final String RSK = "rsk";
public static final String SNAP = "snap";
public static final byte SNAP_VERSION = (byte) 1;

private final String name;
private final byte version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.ethereum.net.client;

import org.ethereum.config.SystemProperties;
import co.rsk.config.RskSystemProperties;
import org.ethereum.net.eth.EthVersion;
import org.ethereum.net.p2p.HelloMessage;

Expand All @@ -29,39 +29,47 @@
import java.util.TreeSet;

import static org.ethereum.net.client.Capability.RSK;
import static org.ethereum.net.client.Capability.SNAP;
import static org.ethereum.net.client.Capability.SNAP_VERSION;
import static org.ethereum.net.eth.EthVersion.fromCode;

/**
* Created by Anton Nashatyrev on 13.10.2015.
*/
public class ConfigCapabilitiesImpl implements ConfigCapabilities{

private final SystemProperties config;
private final RskSystemProperties config;

private SortedSet<Capability> allCaps = new TreeSet<>();
private SortedSet<Capability> allCapabilities = new TreeSet<>();

public ConfigCapabilitiesImpl(SystemProperties config) {
public ConfigCapabilitiesImpl(RskSystemProperties config) {
if (config.syncVersion() != null) {
EthVersion eth = fromCode(config.syncVersion());
if (eth != null) {
allCaps.add(new Capability(RSK, eth.getCode()));
allCapabilities.add(new Capability(RSK, eth.getCode()));
}
} else {
for (EthVersion v : EthVersion.supported()) {
allCaps.add(new Capability(RSK, v.getCode()));
allCapabilities.add(new Capability(RSK, v.getCode()));
}
}

if (config.isServerSnapshotSyncEnabled() && allCapabilities.stream().anyMatch(Capability::isRSK)) {
allCapabilities.add(new Capability(SNAP, SNAP_VERSION));
}

this.config = config;
}


/**
* Gets the capabilities listed in 'peer.capabilities' config property
* sorted by their names.
*/
public List<Capability> getConfigCapabilities() {
List<Capability> ret = new ArrayList<>();
List<String> caps = config.peerCapabilities();
for (Capability capability : allCaps) {
for (Capability capability : allCapabilities) {
if (caps.contains(capability.getName())) {
ret.add(capability);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,15 @@ private void processHelloMessage(ChannelHandlerContext ctx, HelloMessage helloMe
List<Capability> capInCommon = configCapabilities.getSupportedCapabilities(helloMessage);
channel.initMessageCodes(capInCommon);
for (Capability capability : capInCommon) {
// It seems that the only supported capability is RSK, and everything else is ignored.
// RSK Capability is the condition needed to be able to finish the Handshake successfully.
if (Capability.RSK.equals(capability.getName())) {
publicRLPxHandshakeFinished(ctx, helloMessage, fromCode(capability.getVersion()));
return;
}
}

// RSK must be present. If RSK is not found, the handshake process does not complete.
// Other capabilities, such as SNAP, cannot be considered if RSK is not supported.
throw new RuntimeException("The remote peer didn't support the RSK capability");
}

Expand Down
24 changes: 23 additions & 1 deletion rskj-core/src/main/java/org/ethereum/net/server/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import co.rsk.net.eth.RskWireProtocol;
import co.rsk.net.messages.Message;
import co.rsk.net.messages.MessageType;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.ethereum.net.MessageQueue;
Expand All @@ -50,6 +51,7 @@
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand All @@ -75,14 +77,16 @@ public class Channel implements Peer {
private final PeerStatistics peerStats = new PeerStatistics();

private Stats stats;
private final boolean isSnapCapable;

public Channel(MessageQueue msgQueue,
MessageCodec messageCodec,
NodeManager nodeManager,
RskWireProtocol.Factory rskWireProtocolFactory,
Eth62MessageFactory eth62MessageFactory,
StaticMessages staticMessages,
String remoteId) {
String remoteId,
List<Capability> capabilities) {
this.msgQueue = msgQueue;
this.messageCodec = messageCodec;
this.nodeManager = nodeManager;
Expand All @@ -91,6 +95,19 @@ public Channel(MessageQueue msgQueue,
this.staticMessages = staticMessages;
this.isActive = remoteId != null && !remoteId.isEmpty();
this.stats = new Stats();
this.isSnapCapable = capabilities.stream()
.anyMatch(capability -> Capability.SNAP.equals(capability.getName()));
}

@VisibleForTesting
public Channel(MessageQueue msgQueue,
MessageCodec messageCodec,
NodeManager nodeManager,
RskWireProtocol.Factory rskWireProtocolFactory,
Eth62MessageFactory eth62MessageFactory,
StaticMessages staticMessages,
String remoteId) {
this(msgQueue, messageCodec, nodeManager, rskWireProtocolFactory, eth62MessageFactory, staticMessages, remoteId, new ArrayList<>());
}

public void sendHelloMessage(ChannelHandlerContext ctx, FrameCodec frameCodec, String nodeId,
Expand Down Expand Up @@ -263,6 +280,11 @@ public void imported(boolean best) {
stats.imported(best);
}

@Override
public boolean isSnapCapable() {
return isSnapCapable;
}

@Override
public String toString() {
return String.format("%s | %s", getPeerId(), inetSocketAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void initChannel(NioSocketChannel ch) {
P2pHandler p2pHandler = new P2pHandler(ethereumListener, messageQueue, config.getPeerP2PPingInterval());
MessageCodec messageCodec = new MessageCodec(ethereumListener, config);
HandshakeHandler handshakeHandler = new HandshakeHandler(config, peerScoringManager, p2pHandler, messageCodec, configCapabilities);
Channel channel = new Channel(messageQueue, messageCodec, nodeManager, rskWireProtocolFactory, eth62MessageFactory, staticMessages, remoteId);
Channel channel = new Channel(messageQueue, messageCodec, nodeManager, rskWireProtocolFactory, eth62MessageFactory, staticMessages, remoteId, configCapabilities.getConfigCapabilities());

ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(config.peerChannelReadTimeout(), TimeUnit.SECONDS));
ch.pipeline().addLast("handshakeHandler", handshakeHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public double score(long currentTime, MessageType type) {
public void imported(boolean best) {
}

@Override
public boolean isSnapCapable() {
return false;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
5 changes: 5 additions & 0 deletions rskj-core/src/test/java/co/rsk/net/simples/SimplePeer.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public double score(long currentTime, MessageType type) {
public void imported(boolean best) {
}

@Override
public boolean isSnapCapable() {
return false;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
23 changes: 12 additions & 11 deletions rskj-core/src/test/java/co/rsk/net/sync/PeersInformationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ private PeersInformation setupTopBestScenario(double topBest) {

PeersInformation peersInformation = new PeersInformation(channelManager, syncConfiguration, blockchain, peerScoringManager, random);

Peer peer1 = setupPeer(peersInformation, null, "peer1", "peerHost1.COM", true, 1L, 1L, true );
Peer peer2 = setupPeer(peersInformation, null, "peer2", "peerHost2.COM", true, 2L, 2L, true );
Peer peer3 = setupPeer(peersInformation, null, "peer3", "peerHost3.COM", true, 3L, 3L, true );
Peer peer4 = setupPeer(peersInformation, null, "peer4", "peerHost4.COM", true, 4L, 4L, true );
Peer peer5 = setupPeer(peersInformation, null, "peer5", "peerHost5.COM", true, 5L, 5L, true );
Peer peer1 = setupPeer(peersInformation, null, "peer1", "peerHost1.COM", true, 1L, 1L, true, false );
Peer peer2 = setupPeer(peersInformation, null, "peer2", "peerHost2.COM", true, 2L, 2L, true, false );
Peer peer3 = setupPeer(peersInformation, null, "peer3", "peerHost3.COM", true, 3L, 3L, true, false );
Peer peer4 = setupPeer(peersInformation, null, "peer4", "peerHost4.COM", true, 4L, 4L, true, false );
Peer peer5 = setupPeer(peersInformation, null, "peer5", "peerHost5.COM", true, 5L, 5L, true, false );

Mockito.when(channelManager.getActivePeers()).thenReturn(Stream.of(
peer1, peer2, peer3, peer4, peer5
Expand All @@ -223,11 +223,11 @@ private PeersInformation setupTopBestSnapshotScenario(double topBest) {

PeersInformation peersInformation = new PeersInformation(channelManager, syncConfiguration, blockchain, peerScoringManager, random);

Peer snapPeer1 = setupPeer(peersInformation, trustedSnapPeersMap, "0x0FF", "snapPeerHost1.COM", true, 10L, 10L, true );
Peer snapPeer2 = setupPeer(peersInformation, trustedSnapPeersMap, "0xAFE", "snapPeerHost2.COM", true, 20L, 20L, true );
Peer snapPeer3 = setupPeer(peersInformation, trustedSnapPeersMap, "0xA8E", "snapPeerHost3.COM", true, 30L, 30L, false );
Peer snapPeer4 = setupPeer(peersInformation, trustedSnapPeersMap, "0xA9E", "snapPeerHost4.COM", false, 40L, 40L, true );
setupPeer(peersInformation, trustedSnapPeersMap, "0xA9E", "snapPeerHost5.COM", false, 50L, 50L, true );
Peer snapPeer1 = setupPeer(peersInformation, trustedSnapPeersMap, "0x0FF", "snapPeerHost1.COM", true, 10L, 10L, true, true );
Peer snapPeer2 = setupPeer(peersInformation, trustedSnapPeersMap, "0xAFE", "snapPeerHost2.COM", true, 20L, 20L, true, true );
Peer snapPeer3 = setupPeer(peersInformation, trustedSnapPeersMap, "0xA8E", "snapPeerHost3.COM", true, 30L, 30L, false, true );
Peer snapPeer4 = setupPeer(peersInformation, trustedSnapPeersMap, "0xA9E", "snapPeerHost4.COM", false, 40L, 40L, true, true );
setupPeer(peersInformation, trustedSnapPeersMap, "0xA9E", "snapPeerHost5.COM", false, 50L, 50L, true, true );

Mockito.when(channelManager.getActivePeers()).thenReturn(Stream.of(
snapPeer1, snapPeer2, snapPeer3, snapPeer4
Expand All @@ -238,9 +238,10 @@ private PeersInformation setupTopBestSnapshotScenario(double topBest) {
}
private Peer setupPeer(PeersInformation peersInformation, Map<String, Node> trustedSnapPeersMap, String nodeId, String nodeHost,
boolean hasGoodReputation, long bestBlockNumber, long blockDifficulty,
boolean hasLowerTotalDifficultyThan) {
boolean hasLowerTotalDifficultyThan, boolean isSnapCapable) {
Peer peer = Mockito.mock(Peer.class);

Mockito.when(peer.isSnapCapable()).thenReturn(isSnapCapable);
Mockito.when(peer.getPeerNodeID()).thenReturn(new NodeID(nodeId.getBytes()));

if(trustedSnapPeersMap!=null){
Expand Down
27 changes: 27 additions & 0 deletions rskj-core/src/test/java/org/ethereum/net/HelloMessageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class HelloMessageTest {

Expand Down Expand Up @@ -108,4 +109,30 @@ void test3() {
assertEquals(listenPort, helloMessage.getListenPort());
assertEquals(peerId, helloMessage.getPeerId());
}

@Test
void test5() {

//Init
byte version = 2;
String clientStr = "Ethereum(++)/v0.7.9/Release/Linux/g++";
List<Capability> capabilities = Arrays.asList(
new Capability(Capability.RSK, EthVersion.UPPER),
new Capability(Capability.SNAP, EthVersion.UPPER),
new Capability(Capability.P2P, P2pHandler.VERSION));
int listenPort = 992;
String peerId = "1fbf1e41f08078918c9f7b6734594ee56d7f538614f602c71194db0a1af5a";

HelloMessage helloMessage = new HelloMessage(version, clientStr, capabilities, listenPort, peerId);
logger.info(helloMessage.toString());

assertEquals(P2pMessageCodes.HELLO, helloMessage.getCommand());
assertEquals(version, helloMessage.getP2PVersion());
assertEquals(clientStr, helloMessage.getClientId());
assertEquals(3, helloMessage.getCapabilities().size());
assertTrue(helloMessage.getCapabilities().stream()
.anyMatch(cap -> Capability.SNAP.equals(cap.getName())));
assertEquals(listenPort, helloMessage.getListenPort());
assertEquals(peerId, helloMessage.getPeerId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import co.rsk.config.RskSystemProperties;
import co.rsk.config.TestSystemProperties;
import co.rsk.scoring.PeerScoringManager;
import com.typesafe.config.ConfigFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -37,10 +38,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.ethereum.net.client.Capability.RSK;
import static org.ethereum.net.client.Capability.SNAP;
import static org.ethereum.net.client.Capability.SNAP_VERSION;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
Expand All @@ -58,7 +62,12 @@ class HandshakeHandlerTest {

@BeforeEach
void setup() {
RskSystemProperties config = new TestSystemProperties();
RskSystemProperties config = new TestSystemProperties(rawConfig ->
ConfigFactory.parseString("{" +
"sync.snapshot.server.enabled = true," +
"peer.capabilities = [rsk, eth, shh, snap]" +
"}").withFallback(rawConfig));

hhKey = config.getMyKey();
handler = new HandshakeHandler(
config,
Expand Down Expand Up @@ -98,6 +107,20 @@ void shouldDisconnectIfRskCapabilityIsMissing() throws Exception {
assertFalse(ch.isOpen());
}

@Test
void shouldConnectWithSnapCapability() throws Exception {
simulateHandshakeStartedByPeer(Arrays.asList(new Capability(SNAP, SNAP_VERSION), new Capability(RSK, EthVersion.UPPER)));
// this will only happen when an exception is raised
assertTrue(ch.isOpen());
}

@Test
void shouldDisconnectWithSnapCapabilityIfRskCapabilityIsMissing() throws Exception {
simulateHandshakeStartedByPeer(Arrays.asList(new Capability(SNAP, SNAP_VERSION)));
// this will only happen when an exception is raised
assertFalse(ch.isOpen());
}

// This is sort of an integration test. It interacts with the handshake handler and multiple other objects to
// simulate a handshake initiated by a remote peer.
// In the future, the handshake classes should be rewritten to allow unit testing.
Expand Down
2 changes: 1 addition & 1 deletion rskj-core/src/test/resources/rskj.conf
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ peer {
]

# The protocols supported by peer
# can be: [eth, shh, bzz]
# can be: [eth, shh, bzz, snap]
capabilities = [eth]

# Peer for server to listen for incoming
Expand Down