Skip to content

Commit 02b53b6

Browse files
authored
chore(dot/network, lib/grandpa): update network.ConsensusMessage, add grandpa.NeighbourMessage and handle accordingly (ChainSafe#1519)
1 parent 0abea00 commit 02b53b6

File tree

14 files changed

+419
-102
lines changed

14 files changed

+419
-102
lines changed

dot/network/message.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -359,9 +359,6 @@ var _ NotificationsMessage = &ConsensusMessage{}
359359

360360
// ConsensusMessage is mostly opaque to us
361361
type ConsensusMessage struct {
362-
// Identifies consensus engine.
363-
ConsensusEngineID types.ConsensusEngineID
364-
// Message payload.
365362
Data []byte
366363
}
367364

@@ -377,23 +374,17 @@ func (cm *ConsensusMessage) Type() byte {
377374

378375
// String is the string
379376
func (cm *ConsensusMessage) String() string {
380-
return fmt.Sprintf("ConsensusMessage ConsensusEngineID=%d, DATA=%x", cm.ConsensusEngineID, cm.Data)
377+
return fmt.Sprintf("ConsensusMessage Data=%x", cm.Data)
381378
}
382379

383380
// Encode encodes a block response message using SCALE
384381
func (cm *ConsensusMessage) Encode() ([]byte, error) {
385-
encMsg := cm.ConsensusEngineID.ToBytes()
386-
return append(encMsg, cm.Data...), nil
382+
return cm.Data, nil
387383
}
388384

389385
// Decode the message into a ConsensusMessage
390386
func (cm *ConsensusMessage) Decode(in []byte) error {
391-
if len(in) < 5 {
392-
return errors.New("cannot decode ConsensusMessage: encoding is too short")
393-
}
394-
395-
cm.ConsensusEngineID = types.NewConsensusEngineID(in[:4])
396-
cm.Data = in[4:]
387+
cm.Data = in
397388
return nil
398389
}
399390

dot/network/message_test.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,8 @@ func TestDecodeTransactionMessageTwoExtrinsics(t *testing.T) {
340340
}
341341

342342
func TestDecodeConsensusMessage(t *testing.T) {
343-
ConsensusEngineID := types.BabeEngineID
344-
345-
testID := hex.EncodeToString(types.BabeEngineID.ToBytes())
346343
testData := "03100405"
347-
348-
msg := "0x" + testID + testData // 0x4241424503100405
344+
msg := "0x" + testData
349345

350346
encMsg, err := common.HexToBytes(msg)
351347
require.Nil(t, err)
@@ -358,8 +354,7 @@ func TestDecodeConsensusMessage(t *testing.T) {
358354
require.Nil(t, err)
359355

360356
expected := &ConsensusMessage{
361-
ConsensusEngineID: ConsensusEngineID,
362-
Data: out,
357+
Data: out,
363358
}
364359

365360
require.Equal(t, expected, m)

dot/network/sync.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"github.com/ChainSafe/gossamer/dot/types"
29+
"github.com/ChainSafe/gossamer/lib/blocktree"
2930
"github.com/ChainSafe/gossamer/lib/common"
3031
"github.com/ChainSafe/gossamer/lib/common/optional"
3132
"github.com/ChainSafe/gossamer/lib/common/variadic"
@@ -163,7 +164,6 @@ func newSyncQueue(s *Service) *syncQueue {
163164
func (q *syncQueue) start() {
164165
go q.handleResponseQueue()
165166
go q.syncAtHead()
166-
go q.finalizeAtHead()
167167

168168
go q.processBlockRequests()
169169
go q.processBlockResponses()
@@ -693,7 +693,7 @@ func (q *syncQueue) handleBlockJustification(data []*types.BlockData) {
693693
func (q *syncQueue) handleBlockData(data []*types.BlockData) {
694694
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0)
695695
if err != nil {
696-
panic(err) // TODO: don't panic but try again. seems blockState needs better concurrency handling
696+
panic(err) // this should never happen
697697
}
698698

699699
end := data[len(data)-1].Number().Int64()
@@ -738,13 +738,23 @@ func (q *syncQueue) handleBlockData(data []*types.BlockData) {
738738
func (q *syncQueue) handleBlockDataFailure(idx int, err error, data []*types.BlockData) {
739739
logger.Warn("failed to handle block data", "failed on block", q.currStart+int64(idx), "error", err)
740740

741-
if errors.Is(err, chaindb.ErrKeyNotFound) {
741+
if errors.Is(err, chaindb.ErrKeyNotFound) || errors.Is(err, blocktree.ErrParentNotFound) {
742+
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0)
743+
if err != nil {
744+
panic(err)
745+
}
746+
742747
header, err := types.NewHeaderFromOptional(data[idx].Header)
743748
if err != nil {
744749
logger.Debug("failed to get header from BlockData", "idx", idx, "error", err)
745750
return
746751
}
747752

753+
// don't request a chain that's been dropped
754+
if header.Number.Int64() <= finalized.Number.Int64() {
755+
return
756+
}
757+
748758
parentHash := header.ParentHash
749759
req := createBlockRequestWithHash(parentHash, 0)
750760

dot/state/block.go

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ const pruneKeyBufferSize = 1000
3838

3939
// BlockState defines fields for manipulating the state of blocks, such as BlockTree, BlockDB and Header
4040
type BlockState struct {
41-
bt *blocktree.BlockTree
42-
baseDB chaindb.Database
43-
db chaindb.Database
44-
lock sync.RWMutex
41+
bt *blocktree.BlockTree
42+
baseDB chaindb.Database
43+
db chaindb.Database
44+
sync.RWMutex
4545
genesisHash common.Hash
4646

4747
// block notifiers
@@ -268,7 +268,7 @@ func (bs *BlockState) GetHeader(hash common.Hash) (*types.Header, error) {
268268
func (bs *BlockState) GetHashByNumber(num *big.Int) (common.Hash, error) {
269269
bh, err := bs.db.Get(headerHashKey(num.Uint64()))
270270
if err != nil {
271-
return common.Hash{}, fmt.Errorf("cannot get block %d: %s", num, err)
271+
return common.Hash{}, fmt.Errorf("cannot get block %d: %w", num, err)
272272
}
273273

274274
return common.NewHash(bh), nil
@@ -278,7 +278,7 @@ func (bs *BlockState) GetHashByNumber(num *big.Int) (common.Hash, error) {
278278
func (bs *BlockState) GetHeaderByNumber(num *big.Int) (*types.Header, error) {
279279
bh, err := bs.db.Get(headerHashKey(num.Uint64()))
280280
if err != nil {
281-
return nil, fmt.Errorf("cannot get block %d: %s", num, err)
281+
return nil, fmt.Errorf("cannot get block %d: %w", num, err)
282282
}
283283

284284
hash := common.NewHash(bh)
@@ -304,7 +304,7 @@ func (bs *BlockState) GetBlockByNumber(num *big.Int) (*types.Block, error) {
304304
// First retrieve the block hash in a byte array based on the block number from the database
305305
byteHash, err := bs.db.Get(headerHashKey(num.Uint64()))
306306
if err != nil {
307-
return nil, fmt.Errorf("cannot get block %d: %s", num, err)
307+
return nil, fmt.Errorf("cannot get block %d: %w", num, err)
308308
}
309309

310310
// Then find the block based on the hash
@@ -322,17 +322,14 @@ func (bs *BlockState) GetBlockHash(blockNumber *big.Int) (*common.Hash, error) {
322322
// First retrieve the block hash in a byte array based on the block number from the database
323323
byteHash, err := bs.db.Get(headerHashKey(blockNumber.Uint64()))
324324
if err != nil {
325-
return nil, fmt.Errorf("cannot get block %d: %s", blockNumber, err)
325+
return nil, fmt.Errorf("cannot get block %d: %w", blockNumber, err)
326326
}
327327
hash := common.NewHash(byteHash)
328328
return &hash, nil
329329
}
330330

331331
// SetHeader will set the header into DB
332332
func (bs *BlockState) SetHeader(header *types.Header) error {
333-
bs.lock.Lock()
334-
defer bs.lock.Unlock()
335-
336333
hash := header.Hash()
337334

338335
// Write the encoded header
@@ -366,11 +363,7 @@ func (bs *BlockState) GetBlockBody(hash common.Hash) (*types.Body, error) {
366363

367364
// SetBlockBody will add a block body to the db
368365
func (bs *BlockState) SetBlockBody(hash common.Hash, body *types.Body) error {
369-
bs.lock.Lock()
370-
defer bs.lock.Unlock()
371-
372-
err := bs.db.Put(blockBodyKey(hash), body.AsOptional().Value())
373-
return err
366+
return bs.db.Put(blockBodyKey(hash), body.AsOptional().Value())
374367
}
375368

376369
// HasFinalizedBlock returns true if there is a finalized block for a given round and setID, false otherwise
@@ -427,6 +420,9 @@ func (bs *BlockState) GetFinalizedHash(round, setID uint64) (common.Hash, error)
427420

428421
// SetFinalizedHash sets the latest finalized block header
429422
func (bs *BlockState) SetFinalizedHash(hash common.Hash, round, setID uint64) error {
423+
bs.Lock()
424+
defer bs.Unlock()
425+
430426
go bs.notifyFinalized(hash)
431427
if round > 0 {
432428
err := bs.SetRound(round)
@@ -496,6 +492,8 @@ func (bs *BlockState) CompareAndSetBlockData(bd *types.BlockData) error {
496492

497493
// AddBlock adds a block to the blocktree and the DB with arrival time as current unix time
498494
func (bs *BlockState) AddBlock(block *types.Block) error {
495+
bs.Lock()
496+
defer bs.Unlock()
499497
return bs.AddBlockWithArrivalTime(block, time.Now())
500498
}
501499

@@ -506,6 +504,8 @@ func (bs *BlockState) AddBlockWithArrivalTime(block *types.Block, arrivalTime ti
506504
return err
507505
}
508506

507+
prevHead := bs.bt.DeepestBlockHash()
508+
509509
// add block to blocktree
510510
err = bs.bt.AddBlock(block.Header, uint64(arrivalTime.UnixNano()))
511511
if err != nil {
@@ -541,12 +541,58 @@ func (bs *BlockState) AddBlockWithArrivalTime(block *types.Block, arrivalTime ti
541541
return err
542542
}
543543

544+
// check if there was a re-org, if so, re-set the canonical number->hash mapping
545+
err = bs.handleAddedBlock(prevHead, bs.bt.DeepestBlockHash())
546+
if err != nil {
547+
return err
548+
}
549+
544550
go bs.notifyImported(block)
545551
return bs.baseDB.Flush()
546552
}
547553

554+
// handleAddedBlock re-sets the canonical number->hash mapping if there was a chain re-org.
555+
// prev is the previous best block hash before the new block was added to the blocktree.
556+
// curr is the current best blogetck hash.
557+
func (bs *BlockState) handleAddedBlock(prev, curr common.Hash) error {
558+
ancestor, err := bs.HighestCommonAncestor(prev, curr)
559+
if err != nil {
560+
return err
561+
}
562+
563+
// if the highest common ancestor of the previous chain head and current chain head is the previous chain head,
564+
// then the current chain head is the descendant of the previous and thus are on the same chain
565+
if ancestor == prev {
566+
return nil
567+
}
568+
569+
subchain, err := bs.SubChain(ancestor, curr)
570+
if err != nil {
571+
return err
572+
}
573+
574+
batch := bs.db.NewBatch()
575+
for _, hash := range subchain {
576+
// TODO: set number from ancestor.Number + i ?
577+
header, err := bs.GetHeader(hash)
578+
if err != nil {
579+
return fmt.Errorf("failed to get header in subchain: %w", err)
580+
}
581+
582+
err = batch.Put(headerHashKey(header.Number.Uint64()), hash.ToBytes())
583+
if err != nil {
584+
return err
585+
}
586+
}
587+
588+
return batch.Flush()
589+
}
590+
548591
// AddBlockToBlockTree adds the given block to the blocktree. It does not write it to the database.
549592
func (bs *BlockState) AddBlockToBlockTree(header *types.Header) error {
593+
bs.Lock()
594+
defer bs.Unlock()
595+
550596
arrivalTime, err := bs.GetArrivalTime(header.Hash())
551597
if err != nil {
552598
arrivalTime = time.Now()
@@ -567,7 +613,7 @@ func (bs *BlockState) isBlockOnCurrentChain(header *types.Header) (bool, error)
567613
}
568614

569615
// if the new block is ahead of our best block, then it is on our current chain.
570-
if header.Number.Cmp(bestBlock.Number) == 1 {
616+
if header.Number.Cmp(bestBlock.Number) > 0 {
571617
return true, nil
572618
}
573619

dot/state/block_data.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func (bs *BlockState) HasReceipt(hash common.Hash) (bool, error) {
3232

3333
// SetReceipt sets a Receipt in the database
3434
func (bs *BlockState) SetReceipt(hash common.Hash, data []byte) error {
35-
bs.lock.Lock()
36-
defer bs.lock.Unlock()
35+
bs.Lock()
36+
defer bs.Unlock()
3737

3838
err := bs.db.Put(prefixKey(hash, receiptPrefix), data)
3939
if err != nil {
@@ -60,8 +60,8 @@ func (bs *BlockState) HasMessageQueue(hash common.Hash) (bool, error) {
6060

6161
// SetMessageQueue sets a MessageQueue in the database
6262
func (bs *BlockState) SetMessageQueue(hash common.Hash, data []byte) error {
63-
bs.lock.Lock()
64-
defer bs.lock.Unlock()
63+
bs.Lock()
64+
defer bs.Unlock()
6565

6666
err := bs.db.Put(prefixKey(hash, messageQueuePrefix), data)
6767
if err != nil {
@@ -88,8 +88,8 @@ func (bs *BlockState) HasJustification(hash common.Hash) (bool, error) {
8888

8989
// SetJustification sets a Justification in the database
9090
func (bs *BlockState) SetJustification(hash common.Hash, data []byte) error {
91-
bs.lock.Lock()
92-
defer bs.lock.Unlock()
91+
bs.Lock()
92+
defer bs.Unlock()
9393

9494
err := bs.db.Put(prefixKey(hash, justificationPrefix), data)
9595
if err != nil {

0 commit comments

Comments
 (0)