Skip to content

Commit ae7012b

Browse files
authored
feat(dot/network): request block justifications when near head (#1499)
1 parent a6e0bd0 commit ae7012b

18 files changed

+455
-94
lines changed

dot/network/block_announce.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -212,14 +212,6 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err
212212
return errors.New("genesis hash mismatch")
213213
}
214214

215-
// if peer has higher best block than us, begin syncing
216-
latestHeader, err := s.blockState.BestBlockHeader()
217-
if err != nil {
218-
return err
219-
}
220-
221-
bestBlockNum := big.NewInt(int64(bhs.BestBlockNumber))
222-
223215
np, ok := s.notificationsProtocols[BlockAnnounceMsgType]
224216
if !ok {
225217
// this should never happen.
@@ -239,6 +231,14 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err
239231

240232
data.handshake = hs
241233

234+
// if peer has higher best block than us, begin syncing
235+
latestHeader, err := s.blockState.BestBlockHeader()
236+
if err != nil {
237+
return err
238+
}
239+
240+
bestBlockNum := big.NewInt(int64(bhs.BestBlockNumber))
241+
242242
// check if peer block number is greater than host block number
243243
if latestHeader.Number.Cmp(bestBlockNum) >= 0 {
244244
return nil

dot/network/host.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func (h *host) bootstrap() {
237237
// peer (gets the already opened outbound message stream or opens a new one).
238238
func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (err error) {
239239
// get outbound stream for given peer
240-
s := h.getStream(p, pid)
240+
s := h.getOutboundStream(p, pid)
241241

242242
// check if stream needs to be opened
243243
if s == nil {
@@ -286,10 +286,10 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {
286286
return err
287287
}
288288

289-
// getStream returns the outbound message stream for the given peer or returns
289+
// getOutboundStream returns the outbound message stream for the given peer or returns
290290
// nil if no outbound message stream exists. For each peer, each host opens an
291291
// outbound message stream and writes to the same stream until closed or reset.
292-
func (h *host) getStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Stream) {
292+
func (h *host) getOutboundStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Stream) {
293293
conns := h.h.Network().ConnsToPeer(p)
294294

295295
// loop through connections (only one for now)
@@ -310,7 +310,7 @@ func (h *host) getStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Strea
310310

311311
// closeStream closes a stream open to the peer with the given sub-protocol, if it exists.
312312
func (h *host) closeStream(p peer.ID, pid protocol.ID) {
313-
stream := h.getStream(p, pid)
313+
stream := h.getOutboundStream(p, pid)
314314
if stream != nil {
315315
_ = stream.Close()
316316
}

dot/network/host_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func TestExistingStream(t *testing.T) {
269269
}
270270
require.NoError(t, err)
271271

272-
stream := nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
272+
stream := nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
273273
require.Nil(t, stream, "node A should not have an outbound stream")
274274

275275
// node A opens the stream to send the first message
@@ -279,15 +279,15 @@ func TestExistingStream(t *testing.T) {
279279
time.Sleep(TestMessageTimeout)
280280
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A")
281281

282-
stream = nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
282+
stream = nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
283283
require.NotNil(t, stream, "node A should have an outbound stream")
284284

285285
// node A uses the stream to send a second message
286286
err = nodeA.host.send(addrInfosB[0].ID, nodeB.host.protocolID, testBlockRequestMessage)
287287
require.NoError(t, err)
288288
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A")
289289

290-
stream = nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
290+
stream = nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
291291
require.NotNil(t, stream, "node B should have an outbound stream")
292292

293293
// node B opens the stream to send the first message
@@ -297,15 +297,15 @@ func TestExistingStream(t *testing.T) {
297297
time.Sleep(TestMessageTimeout)
298298
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B")
299299

300-
stream = nodeB.host.getStream(nodeA.host.id(), nodeB.host.protocolID)
300+
stream = nodeB.host.getOutboundStream(nodeA.host.id(), nodeB.host.protocolID)
301301
require.NotNil(t, stream, "node B should have an outbound stream")
302302

303303
// node B uses the stream to send a second message
304304
err = nodeB.host.send(addrInfosA[0].ID, nodeB.host.protocolID, testBlockRequestMessage)
305305
require.NoError(t, err)
306306
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B")
307307

308-
stream = nodeB.host.getStream(nodeA.host.id(), nodeB.host.protocolID)
308+
stream = nodeB.host.getOutboundStream(nodeA.host.id(), nodeB.host.protocolID)
309309
require.NotNil(t, stream, "node B should have an outbound stream")
310310
}
311311

dot/network/notifications.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,12 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
106106
return errors.New("message is not NotificationsMessage")
107107
}
108108

109-
logger.Trace("received message on notifications sub-protocol", "protocol", info.protocolID,
110-
"message", msg,
111-
"peer", stream.Conn().RemotePeer(),
112-
)
113-
114109
if msg.IsHandshake() {
110+
logger.Trace("received handshake on notifications sub-protocol", "protocol", info.protocolID,
111+
"message", msg,
112+
"peer", stream.Conn().RemotePeer(),
113+
)
114+
115115
hs, ok := msg.(Handshake)
116116
if !ok {
117117
return errors.New("failed to convert message to Handshake")
@@ -186,6 +186,11 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
186186
return nil
187187
}
188188

189+
logger.Debug("received message on notifications sub-protocol", "protocol", info.protocolID,
190+
"message", msg,
191+
"peer", stream.Conn().RemotePeer(),
192+
)
193+
189194
err := messageHandler(peer, msg)
190195
if err != nil {
191196
return err

dot/network/service.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) {
304304
defer info.mapMu.RUnlock()
305305

306306
peer := conn.RemotePeer()
307-
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received {
307+
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { //nolint
308308
info.handshakeData.Store(peer, &handshakeData{
309309
validated: false,
310310
})
@@ -428,6 +428,9 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID,
428428

429429
info := s.notificationsProtocols[messageID]
430430

431+
decoder := createDecoder(info, handshakeDecoder, messageDecoder)
432+
handlerWithValidate := s.createNotificationsMessageHandler(info, handshakeValidator, messageHandler)
433+
431434
s.host.registerStreamHandlerWithOverwrite(sub, overwriteProtocol, func(stream libp2pnetwork.Stream) {
432435
logger.Trace("received stream", "sub-protocol", sub)
433436
conn := stream.Conn()
@@ -437,10 +440,6 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID,
437440
}
438441

439442
p := conn.RemotePeer()
440-
441-
decoder := createDecoder(info, handshakeDecoder, messageDecoder)
442-
handlerWithValidate := s.createNotificationsMessageHandler(info, handshakeValidator, messageHandler)
443-
444443
s.readStream(stream, p, decoder, handlerWithValidate)
445444
})
446445

@@ -537,7 +536,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
537536
// decode message based on message type
538537
msg, err := decoder(msgBytes[:tot], peer)
539538
if err != nil {
540-
logger.Trace("Failed to decode message from peer", "peer", peer, "err", err)
539+
logger.Trace("failed to decode message from peer", "protocol", stream.Protocol(), "err", err)
541540
continue
542541
}
543542

dot/network/state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type BlockState interface {
3030
GenesisHash() common.Hash
3131
HasBlockBody(common.Hash) (bool, error)
3232
GetFinalizedHeader(round, setID uint64) (*types.Header, error)
33+
GetHashByNumber(num *big.Int) (common.Hash, error)
3334
}
3435

3536
// Syncer is implemented by the syncing service

dot/network/state_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,7 @@ func (mbs *MockBlockState) HasBlockBody(common.Hash) (bool, error) {
7575
func (mbs *MockBlockState) GetFinalizedHeader(_, _ uint64) (*types.Header, error) {
7676
return mbs.BestBlockHeader()
7777
}
78+
79+
func (mbs *MockBlockState) GetHashByNumber(_ *big.Int) (common.Hash, error) {
80+
return common.Hash{}, nil
81+
}

0 commit comments

Comments
 (0)