Skip to content

Commit 8c2993d

Browse files
authored
fix(dot/network): fix bugs in notifications protocol handlers; add metrics for inbound/outbound streams (#2010)
1 parent 84ec792 commit 8c2993d

17 files changed

+443
-301
lines changed

dot/network/block_announce_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func TestValidateBlockAnnounceHandshake(t *testing.T) {
155155
inboundHandshakeData: new(sync.Map),
156156
}
157157
testPeerID := peer.ID("noot")
158-
nodeA.notificationsProtocols[BlockAnnounceMsgType].inboundHandshakeData.Store(testPeerID, handshakeData{})
158+
nodeA.notificationsProtocols[BlockAnnounceMsgType].inboundHandshakeData.Store(testPeerID, &handshakeData{})
159159

160160
err := nodeA.validateBlockAnnounceHandshake(testPeerID, &BlockAnnounceHandshake{
161161
BestBlockNumber: 100,

dot/network/connmgr.go

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/libp2p/go-libp2p-core/connmgr"
1313
"github.com/libp2p/go-libp2p-core/network"
1414
"github.com/libp2p/go-libp2p-core/peer"
15-
"github.com/libp2p/go-libp2p-core/protocol"
1615
ma "github.com/multiformats/go-multiaddr"
1716

1817
"github.com/ChainSafe/gossamer/dot/peerset"
@@ -23,11 +22,9 @@ type ConnManager struct {
2322
sync.Mutex
2423
host *host
2524
min, max int
25+
connectHandler func(peer.ID)
2626
disconnectHandler func(peer.ID)
2727

28-
// closeHandlerMap contains close handler corresponding to a protocol.
29-
closeHandlerMap map[protocol.ID]func(peerID peer.ID)
30-
3128
// protectedPeers contains a list of peers that are protected from pruning
3229
// when we reach the maximum numbers of peers.
3330
protectedPeers *sync.Map // map[peer.ID]struct{}
@@ -47,7 +44,6 @@ func newConnManager(min, max int, peerSetCfg *peerset.ConfigSet) (*ConnManager,
4744
return &ConnManager{
4845
min: min,
4946
max: max,
50-
closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)),
5147
protectedPeers: new(sync.Map),
5248
persistentPeers: new(sync.Map),
5349
peerSetHandler: psh,
@@ -68,19 +64,19 @@ func (cm *ConnManager) Notifee() network.Notifiee {
6864
return nb
6965
}
7066

71-
// TagPeer peer
67+
// TagPeer is unimplemented
7268
func (*ConnManager) TagPeer(peer.ID, string, int) {}
7369

74-
// UntagPeer peer
70+
// UntagPeer is unimplemented
7571
func (*ConnManager) UntagPeer(peer.ID, string) {}
7672

77-
// UpsertTag peer
73+
// UpsertTag is unimplemented
7874
func (*ConnManager) UpsertTag(peer.ID, string, func(int) int) {}
7975

80-
// GetTagInfo peer
76+
// GetTagInfo is unimplemented
8177
func (*ConnManager) GetTagInfo(peer.ID) *connmgr.TagInfo { return &connmgr.TagInfo{} }
8278

83-
// TrimOpenConns peer
79+
// TrimOpenConns is unimplemented
8480
func (*ConnManager) TrimOpenConns(context.Context) {}
8581

8682
// Protect peer will add the given peer to the protectedPeerMap which will
@@ -97,7 +93,7 @@ func (cm *ConnManager) Unprotect(id peer.ID, _ string) bool {
9793
return wasDeleted
9894
}
9995

100-
// Close peer
96+
// Close is unimplemented
10197
func (*ConnManager) Close() error { return nil }
10298

10399
// IsProtected returns whether the given peer is protected from pruning or not.
@@ -134,6 +130,7 @@ func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID {
134130
func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
135131
logger.Tracef(
136132
"Host %s connected to peer %s", n.LocalPeer(), c.RemotePeer())
133+
cm.connectHandler(c.RemotePeer())
137134

138135
cm.Lock()
139136
defer cm.Unlock()
@@ -143,7 +140,9 @@ func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
143140
return
144141
}
145142

143+
// TODO: peer scoring doesn't seem to prevent us from going over the max.
146144
// if over the max peer count, disconnect from (total_peers - maximum) peers
145+
// (#2039)
147146
for i := 0; i < over; i++ {
148147
unprotPeers := cm.unprotectedPeers(n.Peers())
149148
if len(unprotPeers) == 0 {
@@ -170,31 +169,19 @@ func (cm *ConnManager) Disconnected(_ network.Network, c network.Conn) {
170169
logger.Tracef("Host %s disconnected from peer %s", c.LocalPeer(), c.RemotePeer())
171170

172171
cm.Unprotect(c.RemotePeer(), "")
173-
if cm.disconnectHandler != nil {
174-
cm.disconnectHandler(c.RemotePeer())
175-
}
172+
cm.disconnectHandler(c.RemotePeer())
176173
}
177174

178-
// OpenedStream is called when a stream opened
175+
// OpenedStream is called when a stream is opened
179176
func (cm *ConnManager) OpenedStream(_ network.Network, s network.Stream) {
180177
logger.Tracef("Stream opened with peer %s using protocol %s",
181178
s.Conn().RemotePeer(), s.Protocol())
182179
}
183180

184-
func (cm *ConnManager) registerCloseHandler(protocolID protocol.ID, cb func(id peer.ID)) {
185-
cm.closeHandlerMap[protocolID] = cb
186-
}
187-
188-
// ClosedStream is called when a stream closed
181+
// ClosedStream is called when a stream is closed
189182
func (cm *ConnManager) ClosedStream(_ network.Network, s network.Stream) {
190183
logger.Tracef("Stream closed with peer %s using protocol %s",
191184
s.Conn().RemotePeer(), s.Protocol())
192-
193-
cm.Lock()
194-
defer cm.Unlock()
195-
if closeCB, ok := cm.closeHandlerMap[s.Protocol()]; ok {
196-
closeCB(s.Conn().RemotePeer())
197-
}
198185
}
199186

200187
func (cm *ConnManager) isPersistent(p peer.ID) bool {

dot/network/discovery.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,21 @@ func (d *discovery) findPeers(ctx context.Context) {
197197

198198
logger.Tracef("found new peer %s via DHT", peer.ID)
199199

200+
// TODO: this isn't working on the devnet (#2026)
201+
// can remove the code block below which directly connects
202+
// once that's fixed
200203
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
201204
d.handler.AddPeer(0, peer.ID)
205+
206+
// found a peer, try to connect if we need more peers
207+
if len(d.h.Network().Peers()) >= d.maxPeers {
208+
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
209+
return
210+
}
211+
212+
if err = d.h.Connect(d.ctx, peer); err != nil {
213+
logger.Tracef("failed to connect to discovered peer %s: %s", peer.ID, err)
214+
}
202215
}
203216
}
204217
}

dot/network/errors.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright 2021 ChainSafe Systems (ON)
2+
// SPDX-License-Identifier: LGPL-3.0-only
3+
4+
package network
5+
6+
import (
7+
"errors"
8+
)
9+
10+
// Sentinel error values for the network package. Each message states the
// failure the value represents; compare against them with errors.Is.
var (
	// handshake validation
	errCannotValidateHandshake = errors.New("failed to validate handshake")
	errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake")
	errHandshakeTimeout        = errors.New("handshake timeout reached")
	errMissingHandshakeMutex   = errors.New("outboundHandshakeMutex does not exist")

	// message decoding
	errMessageTypeNotValid   = errors.New("message type is not valid")
	errMessageIsNotHandshake = errors.New("failed to convert message to Handshake")
)

dot/network/host_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ func TestStreamCloseMetadataCleanup(t *testing.T) {
313313
info := nodeA.notificationsProtocols[BlockAnnounceMsgType]
314314

315315
// Set handshake data to received
316-
info.inboundHandshakeData.Store(nodeB.host.id(), handshakeData{
316+
info.inboundHandshakeData.Store(nodeB.host.id(), &handshakeData{
317317
received: true,
318318
validated: true,
319319
})

dot/network/inbound.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2021 ChainSafe Systems (ON)
2+
// SPDX-License-Identifier: LGPL-3.0-only
3+
4+
package network
5+
6+
import (
7+
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
8+
)
9+
10+
func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) {
11+
// we NEED to reset the stream if we ever return from this function, as if we return,
12+
// the stream will never again be read by us, so we need to tell the remote side we're
13+
// done with this stream, and they should also forget about it.
14+
defer s.resetInboundStream(stream)
15+
s.streamManager.logNewStream(stream)
16+
17+
peer := stream.Conn().RemotePeer()
18+
msgBytes := s.bufPool.get()
19+
defer s.bufPool.put(msgBytes)
20+
21+
for {
22+
n, err := readStream(stream, msgBytes[:])
23+
if err != nil {
24+
logger.Tracef(
25+
"failed to read from stream id %s of peer %s using protocol %s: %s",
26+
stream.ID(), stream.Conn().RemotePeer(), stream.Protocol(), err)
27+
return
28+
}
29+
30+
s.streamManager.logMessageReceived(stream.ID())
31+
32+
// decode message based on message type
33+
msg, err := decoder(msgBytes[:n], peer, isInbound(stream)) // stream should always be inbound if it passes through service.readStream
34+
if err != nil {
35+
logger.Tracef("failed to decode message from stream id %s using protocol %s: %s",
36+
stream.ID(), stream.Protocol(), err)
37+
continue
38+
}
39+
40+
logger.Tracef(
41+
"host %s received message from peer %s: %s",
42+
s.host.id(), peer, msg)
43+
44+
if err = handler(stream, msg); err != nil {
45+
logger.Tracef("failed to handle message %s from stream id %s: %s", msg, stream.ID(), err)
46+
return
47+
}
48+
49+
s.host.bwc.LogRecvMessage(int64(n))
50+
}
51+
}
52+
53+
func (s *Service) resetInboundStream(stream libp2pnetwork.Stream) {
54+
protocolID := stream.Protocol()
55+
peerID := stream.Conn().RemotePeer()
56+
57+
s.notificationsMu.Lock()
58+
defer s.notificationsMu.Unlock()
59+
60+
for _, prtl := range s.notificationsProtocols {
61+
if prtl.protocolID != protocolID {
62+
continue
63+
}
64+
65+
prtl.inboundHandshakeData.Delete(peerID)
66+
break
67+
}
68+
69+
logger.Debugf(
70+
"cleaning up inbound handshake data for protocol=%s, peer=%s",
71+
stream.Protocol(),
72+
peerID,
73+
)
74+
75+
_ = stream.Reset()
76+
}

dot/network/light.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,71 @@ import (
99
"github.com/ChainSafe/gossamer/dot/types"
1010
"github.com/ChainSafe/gossamer/lib/common"
1111
"github.com/ChainSafe/gossamer/pkg/scale"
12+
13+
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
14+
"github.com/libp2p/go-libp2p-core/peer"
1215
)
1316

17+
// handleLightStream handles streams with the <protocol-id>/light/2 protocol ID
18+
func (s *Service) handleLightStream(stream libp2pnetwork.Stream) {
19+
s.readStream(stream, s.decodeLightMessage, s.handleLightMsg)
20+
}
21+
22+
func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message, error) {
23+
s.lightRequestMu.RLock()
24+
defer s.lightRequestMu.RUnlock()
25+
26+
// check if we are the requester
27+
if _, ok := s.lightRequest[peer]; ok {
28+
// if we are, decode the bytes as a LightResponse
29+
return newLightResponseFromBytes(in)
30+
}
31+
32+
// otherwise, decode bytes as LightRequest
33+
return newLightRequestFromBytes(in)
34+
}
35+
36+
func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) {
37+
defer func() {
38+
_ = stream.Close()
39+
}()
40+
41+
lr, ok := msg.(*LightRequest)
42+
if !ok {
43+
return nil
44+
}
45+
46+
resp := NewLightResponse()
47+
switch {
48+
case lr.RemoteCallRequest != nil:
49+
resp.RemoteCallResponse, err = remoteCallResp(lr.RemoteCallRequest)
50+
case lr.RemoteHeaderRequest != nil:
51+
resp.RemoteHeaderResponse, err = remoteHeaderResp(lr.RemoteHeaderRequest)
52+
case lr.RemoteChangesRequest != nil:
53+
resp.RemoteChangesResponse, err = remoteChangeResp(lr.RemoteChangesRequest)
54+
case lr.RemoteReadRequest != nil:
55+
resp.RemoteReadResponse, err = remoteReadResp(lr.RemoteReadRequest)
56+
case lr.RemoteReadChildRequest != nil:
57+
resp.RemoteReadResponse, err = remoteReadChildResp(lr.RemoteReadChildRequest)
58+
default:
59+
logger.Warn("ignoring LightRequest without request data")
60+
return nil
61+
}
62+
63+
if err != nil {
64+
return err
65+
}
66+
67+
// TODO(arijit): Remove once we implement the internal APIs. Added to increase code coverage. (#1856)
68+
logger.Debugf("LightResponse message: %s", resp)
69+
70+
err = s.host.writeToStream(stream, resp)
71+
if err != nil {
72+
logger.Warnf("failed to send LightResponse message to peer %s: %s", stream.Conn().RemotePeer(), err)
73+
}
74+
return err
75+
}
76+
1477
// Pair is a pair of arbitrary bytes.
1578
type Pair struct {
1679
First []byte
@@ -46,6 +109,12 @@ func NewLightRequest() *LightRequest {
46109
}
47110
}
48111

112+
func newLightRequestFromBytes(in []byte) (msg *LightRequest, err error) {
113+
msg = NewLightRequest()
114+
err = msg.Decode(in)
115+
return msg, err
116+
}
117+
49118
func newRequest() *request {
50119
return &request{
51120
RemoteCallRequest: *newRemoteCallRequest(),
@@ -122,6 +191,12 @@ func NewLightResponse() *LightResponse {
122191
}
123192
}
124193

194+
func newLightResponseFromBytes(in []byte) (msg *LightResponse, err error) {
195+
msg = NewLightResponse()
196+
err = msg.Decode(in)
197+
return msg, err
198+
}
199+
125200
func newResponse() *response {
126201
return &response{
127202
RemoteCallResponse: *newRemoteCallResponse(),

0 commit comments

Comments
 (0)