Skip to content

Commit d62503f

Browse files
fix(dot/netwok): check for duplicate message earlier (#2435)
Check if we have seen the block announce notification message before any processing happens on it don't report if we fail to find if a protocol is supported
1 parent 2deb7dd commit d62503f

File tree

10 files changed

+110
-53
lines changed

10 files changed

+110
-53
lines changed

dot/network/block_announce.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,14 @@ func (bm *BlockAnnounceMessage) Decode(in []byte) error {
6969
}
7070

7171
// Hash returns the hash of the BlockAnnounceMessage
72-
func (bm *BlockAnnounceMessage) Hash() common.Hash {
72+
func (bm *BlockAnnounceMessage) Hash() (common.Hash, error) {
7373
// scale encode each extrinsic
74-
encMsg, _ := bm.Encode()
75-
hash, _ := common.Blake2bHash(encMsg)
76-
return hash
74+
encMsg, err := bm.Encode()
75+
if err != nil {
76+
return common.Hash{}, fmt.Errorf("cannot encode message: %w", err)
77+
}
78+
79+
return common.Blake2bHash(encMsg)
7780
}
7881

7982
// IsHandshake returns false
@@ -144,9 +147,15 @@ func (*BlockAnnounceHandshake) Type() byte {
144147
return 0
145148
}
146149

147-
// Hash ...
148-
func (*BlockAnnounceHandshake) Hash() common.Hash {
149-
return common.Hash{}
150+
// Hash returns blake2b hash of block announce handshake.
151+
func (hs *BlockAnnounceHandshake) Hash() (common.Hash, error) {
152+
// scale encode each extrinsic
153+
encMsg, err := hs.Encode()
154+
if err != nil {
155+
return common.Hash{}, fmt.Errorf("cannot encode handshake: %w", err)
156+
}
157+
158+
return common.Blake2bHash(encMsg)
150159
}
151160

152161
// IsHandshake returns true
@@ -174,7 +183,7 @@ func (s *Service) validateBlockAnnounceHandshake(from peer.ID, hs Handshake) err
174183
return errors.New("invalid handshake type")
175184
}
176185

177-
if bhs.GenesisHash != s.blockState.GenesisHash() {
186+
if !bhs.GenesisHash.Equal(s.blockState.GenesisHash()) {
178187
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
179188
Value: peerset.GenesisMismatch,
180189
Reason: peerset.GenesisMismatchReason,

dot/network/gossip.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package network
55

66
import (
7+
"fmt"
78
"sync"
89

910
"github.com/ChainSafe/gossamer/internal/log"
@@ -25,19 +26,25 @@ func newGossip() *gossip {
2526
}
2627
}
2728

28-
// hasSeen broadcasts messages that have not been seen
29-
func (g *gossip) hasSeen(msg NotificationsMessage) bool {
30-
// check if message has not been seen
31-
msgHash := msg.Hash()
29+
// hasSeen checks if we have seen the given message before.
30+
func (g *gossip) hasSeen(msg NotificationsMessage) (bool, error) {
31+
msgHash, err := msg.Hash()
32+
if err != nil {
33+
return false, fmt.Errorf("could not hash notification message: %w", err)
34+
}
35+
3236
g.seenMutex.Lock()
3337
defer g.seenMutex.Unlock()
3438

39+
// check if message has not been seen
3540
_, ok := g.seenMap[msgHash]
3641
if !ok {
3742
// set message to has been seen
38-
g.seenMap[msgHash] = struct{}{}
39-
return false
43+
if !msg.IsHandshake() {
44+
g.seenMap[msgHash] = struct{}{}
45+
}
46+
return false, nil
4047
}
4148

42-
return true
49+
return true, nil
4350
}

dot/network/gossip_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,15 @@ func TestGossip(t *testing.T) {
8989

9090
time.Sleep(TestMessageTimeout)
9191

92-
_, ok := nodeB.gossip.seenMap[announceMessage.Hash()]
92+
hash, err := announceMessage.Hash()
93+
require.NoError(t, err)
94+
95+
_, ok := nodeB.gossip.seenMap[hash]
9396
require.True(t, ok, "node B did not receive block request message from node A")
9497

95-
_, ok = nodeC.gossip.seenMap[announceMessage.Hash()]
98+
_, ok = nodeC.gossip.seenMap[hash]
9699
require.True(t, ok, "node C did not receive block request message from node B")
97100

98-
_, ok = nodeA.gossip.seenMap[announceMessage.Hash()]
101+
_, ok = nodeA.gossip.seenMap[hash]
99102
require.True(t, ok, "node A did not receive block request message from node C")
100103
}

dot/network/message.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type Message interface {
3636
type NotificationsMessage interface {
3737
Message
3838
Type() byte
39-
Hash() common.Hash
39+
Hash() (common.Hash, error)
4040
IsHandshake() bool
4141
}
4242

@@ -389,11 +389,13 @@ func (cm *ConsensusMessage) Decode(in []byte) error {
389389
}
390390

391391
// Hash returns the Hash of ConsensusMessage
392-
func (cm *ConsensusMessage) Hash() common.Hash {
392+
func (cm *ConsensusMessage) Hash() (common.Hash, error) {
393393
// scale encode each extrinsic
394-
encMsg, _ := cm.Encode()
395-
hash, _ := common.Blake2bHash(encMsg)
396-
return hash
394+
encMsg, err := cm.Encode()
395+
if err != nil {
396+
return common.Hash{}, fmt.Errorf("cannot encode message: %w", err)
397+
}
398+
return common.Blake2bHash(encMsg)
397399
}
398400

399401
// IsHandshake returns false

dot/network/message_cache.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package network
55

66
import (
77
"errors"
8+
"fmt"
89
"time"
910

1011
"github.com/ChainSafe/gossamer/lib/common"
@@ -55,6 +56,7 @@ func (m *messageCache) put(peer peer.ID, msg NotificationsMessage) (bool, error)
5556
func (m *messageCache) exists(peer peer.ID, msg NotificationsMessage) bool {
5657
key, err := generateCacheKey(peer, msg)
5758
if err != nil {
59+
logger.Errorf("could not generate cache key: %s", err)
5860
return false
5961
}
6062

@@ -67,7 +69,12 @@ func generateCacheKey(peer peer.ID, msg NotificationsMessage) ([]byte, error) {
6769
return nil, errors.New("cache does not support handshake messages")
6870
}
6971

70-
peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg.Hash().ToBytes()...))
72+
msgHash, err := msg.Hash()
73+
if err != nil {
74+
return nil, fmt.Errorf("cannot hash notification message: %w", err)
75+
}
76+
77+
peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msgHash.ToBytes()...))
7178
if err != nil {
7279
return nil, err
7380
}

dot/network/notifications.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,20 @@ func (s *Service) createNotificationsMessageHandler(
136136
return fmt.Errorf("%w: expected %T but got %T", errMessageTypeNotValid, (NotificationsMessage)(nil), msg)
137137
}
138138

139+
hasSeen, err := s.gossip.hasSeen(msg)
140+
if err != nil {
141+
return fmt.Errorf("could not check if message was seen before: %w", err)
142+
}
143+
144+
if hasSeen {
145+
// report peer if we get duplicate gossip message.
146+
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
147+
Value: peerset.DuplicateGossipValue,
148+
Reason: peerset.DuplicateGossipReason,
149+
}, peer)
150+
return nil
151+
}
152+
139153
if msg.IsHandshake() {
140154
logger.Tracef("received handshake on notifications sub-protocol %s from peer %s, message is: %s",
141155
info.protocolID, stream.Conn().RemotePeer(), msg)
@@ -207,16 +221,7 @@ func (s *Service) createNotificationsMessageHandler(
207221
return nil
208222
}
209223

210-
if !s.gossip.hasSeen(msg) {
211-
s.broadcastExcluding(info, peer, msg)
212-
return nil
213-
}
214-
215-
// report peer if we get duplicate gossip message.
216-
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
217-
Value: peerset.DuplicateGossipValue,
218-
Reason: peerset.DuplicateGossipReason,
219-
}, peer)
224+
s.broadcastExcluding(info, peer, msg)
220225
return nil
221226
}
222227
}
@@ -238,7 +243,13 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
238243
return
239244
}
240245

241-
if support, err := s.host.supportsProtocol(peer, info.protocolID); err != nil || !support {
246+
support, err := s.host.supportsProtocol(peer, info.protocolID)
247+
if err != nil {
248+
logger.Errorf("could not check if protocol %s is supported by peer %s: %s", info.protocolID, peer, err)
249+
return
250+
}
251+
252+
if !support {
242253
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
243254
Value: peerset.BadProtocolValue,
244255
Reason: peerset.BadProtocolReason,
@@ -319,7 +330,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
319330
peer, info.protocolID, hs)
320331
stream, err := s.host.send(peer, info.protocolID, hs)
321332
if err != nil {
322-
logger.Tracef("failed to send message to peer %s: %s", peer, err)
333+
logger.Tracef("failed to send handshake to peer %s: %s", peer, err)
323334
// don't need to close the stream here, as it's nil!
324335
return nil, err
325336
}
@@ -345,7 +356,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
345356
}
346357

347358
if hsResponse.err != nil {
348-
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, err)
359+
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err)
349360
closeOutboundStream(info, peer, stream)
350361
return nil, hsResponse.err
351362
}

dot/network/notifications_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,9 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)
189189
Roles: 4,
190190
BestBlockNumber: 77,
191191
BestBlockHash: common.Hash{1},
192-
GenesisHash: common.Hash{2},
192+
// we are using a different genesis here, thus this
193+
// handshake would be validated to be incorrect.
194+
GenesisHash: common.Hash{2},
193195
}
194196

195197
err = handler(stream, testHandshake)
@@ -367,43 +369,43 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
367369
require.Len(t, txnBatch, 1)
368370

369371
msg = &TransactionMessage{
370-
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
372+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}},
371373
}
372374
err = handler(stream, msg)
373375
require.NoError(t, err)
374376
require.Len(t, txnBatch, 2)
375377

376378
msg = &TransactionMessage{
377-
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
379+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
378380
}
379381
err = handler(stream, msg)
380382
require.NoError(t, err)
381383
require.Len(t, txnBatch, 3)
382384

383385
msg = &TransactionMessage{
384-
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
386+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
385387
}
386388
err = handler(stream, msg)
387389
require.NoError(t, err)
388390
require.Len(t, txnBatch, 4)
389391

390392
msg = &TransactionMessage{
391-
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
393+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}},
392394
}
393395
err = handler(stream, msg)
394396
require.NoError(t, err)
395397
require.Len(t, txnBatch, 5)
396398

397399
// reached batch size limit, below transaction will not be included in batch.
398400
msg = &TransactionMessage{
399-
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
401+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}},
400402
}
401403
err = handler(stream, msg)
402404
require.NoError(t, err)
403405
require.Len(t, txnBatch, 5)
404406

405407
msg = &TransactionMessage{
406-
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
408+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {8, 8}},
407409
}
408410
// wait for transaction batch channel to process.
409411
time.Sleep(1300 * time.Millisecond)

dot/network/transaction.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,12 @@ func (tm *TransactionMessage) Decode(in []byte) error {
5454
}
5555

5656
// Hash returns the hash of the TransactionMessage
57-
func (tm *TransactionMessage) Hash() common.Hash {
58-
encMsg, _ := tm.Encode()
59-
hash, _ := common.Blake2bHash(encMsg)
60-
return hash
57+
func (tm *TransactionMessage) Hash() (common.Hash, error) {
58+
encMsg, err := tm.Encode()
59+
if err != nil {
60+
return common.Hash{}, fmt.Errorf("could not encode message: %w", err)
61+
}
62+
return common.Blake2bHash(encMsg)
6163
}
6264

6365
// IsHandshake returns false
@@ -93,8 +95,8 @@ func (*transactionHandshake) Type() byte {
9395
}
9496

9597
// Hash ...
96-
func (*transactionHandshake) Hash() common.Hash {
97-
return common.Hash{}
98+
func (*transactionHandshake) Hash() (common.Hash, error) {
99+
return common.Hash{}, nil
98100
}
99101

100102
// IsHandshake returns true
@@ -129,6 +131,7 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur
129131
case txnMsg := <-txnBatchCh:
130132
propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
131133
if err != nil {
134+
logger.Warnf("could not handle transaction message: %s", err)
132135
s.host.closeProtocolStream(protocolID, txnMsg.peer)
133136
continue
134137
}
@@ -137,7 +140,16 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur
137140
continue
138141
}
139142

140-
if !s.gossip.hasSeen(txnMsg.msg) {
143+
// TODO: Check if s.gossip.hasSeen should be moved before handleTransactionMessage. #2445
144+
// That we could avoid handling the transactions again, which we would have already seen.
145+
146+
hasSeen, err := s.gossip.hasSeen(txnMsg.msg)
147+
if err != nil {
148+
s.host.closeProtocolStream(protocolID, txnMsg.peer)
149+
logger.Debugf("could not check if message was seen before: %s", err)
150+
continue
151+
}
152+
if !hasSeen {
141153
s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg)
142154
}
143155
}

dot/peerset/handler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ func (h *Handler) RemovePeer(setID int, peers ...peer.ID) {
7777

7878
// ReportPeer reports ReputationChange according to the peer behaviour.
7979
func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) {
80+
for _, pid := range peers {
81+
logger.Debugf("reporting reputation change of %d to peer %s, reason: %s", rep.Value, pid, rep.Reason)
82+
}
83+
8084
h.actionQueue <- action{
8185
actionCall: reportPeer,
8286
reputation: rep,

lib/grandpa/network.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ func (*GrandpaHandshake) Type() byte {
6464
}
6565

6666
// Hash ...
67-
func (*GrandpaHandshake) Hash() common.Hash {
68-
return common.Hash{}
67+
func (*GrandpaHandshake) Hash() (common.Hash, error) {
68+
return common.Hash{}, nil
6969
}
7070

7171
// IsHandshake returns true

0 commit comments

Comments
 (0)