Skip to content

Commit 322ccf9

Browse files
authored
feat(lib/grandpa): send NeighbourMessage to peers (ChainSafe#1558)
1 parent e749a8d commit 322ccf9

File tree

16 files changed

+154
-39
lines changed

16 files changed

+154
-39
lines changed

dot/core/digest.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type DigestHandler struct {
4242
// block notification channels
4343
imported chan *types.Block
4444
importedID byte
45-
finalised chan *types.Header
45+
finalised chan *types.FinalisationInfo
4646
finalisedID byte
4747

4848
// GRANDPA changes
@@ -68,7 +68,7 @@ type resume struct {
6868
// NewDigestHandler returns a new DigestHandler
6969
func NewDigestHandler(blockState BlockState, epochState EpochState, grandpaState GrandpaState, babe BlockProducer, verifier Verifier) (*DigestHandler, error) {
7070
imported := make(chan *types.Block, 16)
71-
finalised := make(chan *types.Header, 16)
71+
finalised := make(chan *types.FinalisationInfo, 16)
7272
iid, err := blockState.RegisterImportedChannel(imported)
7373
if err != nil {
7474
return nil, err
@@ -195,12 +195,12 @@ func (h *DigestHandler) handleBlockImport(ctx context.Context) {
195195
func (h *DigestHandler) handleBlockFinalisation(ctx context.Context) {
196196
for {
197197
select {
198-
case header := <-h.finalised:
199-
if header == nil {
198+
case info := <-h.finalised:
199+
if info == nil || info.Header == nil {
200200
continue
201201
}
202202

203-
err := h.handleGrandpaChangesOnFinalization(header.Number)
203+
err := h.handleGrandpaChangesOnFinalization(info.Header.Number)
204204
if err != nil {
205205
logger.Error("failed to handle grandpa changes on block finalisation", "error", err)
206206
}

dot/core/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type BlockState interface {
4444
SetFinalizedHash(common.Hash, uint64, uint64) error
4545
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
4646
UnregisterImportedChannel(id byte)
47-
RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error)
47+
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
4848
UnregisterFinalizedChannel(id byte)
4949
HighestCommonAncestor(a, b common.Hash) (common.Hash, error)
5050
SubChain(start, end common.Hash) ([]common.Hash, error)

dot/rpc/modules/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type BlockAPI interface {
3333
GetJustification(hash common.Hash) ([]byte, error)
3434
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
3535
UnregisterImportedChannel(id byte)
36-
RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error)
36+
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
3737
UnregisterFinalizedChannel(id byte)
3838
SubChain(start, end common.Hash) ([]common.Hash, error)
3939
}

dot/rpc/subscription/listeners.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,19 @@ func (l *BlockListener) Listen() {
114114

115115
// BlockFinalizedListener to handle listening for finalised blocks
116116
type BlockFinalizedListener struct {
117-
channel chan *types.Header
117+
channel chan *types.FinalisationInfo
118118
wsconn WSConnAPI
119119
chanID byte
120120
subID uint
121121
}
122122

123123
// Listen implementation of Listen interface to listen for importedChan changes
124124
func (l *BlockFinalizedListener) Listen() {
125-
for header := range l.channel {
126-
if header == nil {
125+
for info := range l.channel {
126+
if info == nil || info.Header == nil {
127127
continue
128128
}
129-
head, err := modules.HeaderToJSON(*header)
129+
head, err := modules.HeaderToJSON(*info.Header)
130130
if err != nil {
131131
logger.Error("failed to convert header to JSON", "error", err)
132132
}
@@ -147,7 +147,7 @@ type ExtrinsicSubmitListener struct {
147147
importedChan chan *types.Block
148148
importedChanID byte
149149
importedHash common.Hash
150-
finalisedChan chan *types.Header
150+
finalisedChan chan *types.FinalisationInfo
151151
finalisedChanID byte
152152
}
153153

@@ -180,10 +180,10 @@ func (l *ExtrinsicSubmitListener) Listen() {
180180

181181
// listen for finalised headers
182182
go func() {
183-
for header := range l.finalisedChan {
184-
if reflect.DeepEqual(l.importedHash, header.Hash()) {
183+
for info := range l.finalisedChan {
184+
if reflect.DeepEqual(l.importedHash, info.Header.Hash()) {
185185
resM := make(map[string]interface{})
186-
resM["finalised"] = header.Hash().String()
186+
resM["finalised"] = info.Header.Hash().String()
187187
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
188188
}
189189
}

dot/rpc/subscription/listeners_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestBlockListener_Listen(t *testing.T) {
9595
}
9696

9797
func TestBlockFinalizedListener_Listen(t *testing.T) {
98-
notifyChan := make(chan *types.Header)
98+
notifyChan := make(chan *types.FinalisationInfo)
9999
mockConnection := &MockWSConnAPI{}
100100
bfl := BlockFinalizedListener{
101101
channel: notifyChan,
@@ -113,14 +113,16 @@ func TestBlockFinalizedListener_Listen(t *testing.T) {
113113

114114
go bfl.Listen()
115115

116-
notifyChan <- header
116+
notifyChan <- &types.FinalisationInfo{
117+
Header: header,
118+
}
117119
time.Sleep(time.Millisecond * 10)
118120
require.Equal(t, expectedResponse, mockConnection.lastMessage)
119121
}
120122

121123
func TestExtrinsicSubmitListener_Listen(t *testing.T) {
122124
notifyImportedChan := make(chan *types.Block)
123-
notifyFinalizedChan := make(chan *types.Header)
125+
notifyFinalizedChan := make(chan *types.FinalisationInfo)
124126

125127
mockConnection := &MockWSConnAPI{}
126128
esl := ExtrinsicSubmitListener{
@@ -149,7 +151,9 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {
149151
time.Sleep(time.Millisecond * 10)
150152
require.Equal(t, expectedImportedRespones, mockConnection.lastMessage)
151153

152-
notifyFinalizedChan <- header
154+
notifyFinalizedChan <- &types.FinalisationInfo{
155+
Header: header,
156+
}
153157
time.Sleep(time.Millisecond * 10)
154158
resFinalised := map[string]interface{}{"finalised": block.Header.Hash().String()}
155159
expectedFinalizedRespones := newSubscriptionResponse(AuthorExtrinsicUpdates, esl.subID, resFinalised)

dot/rpc/subscription/websocket.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (c *WSConn) initBlockListener(reqID float64) (uint, error) {
236236

237237
func (c *WSConn) initBlockFinalizedListener(reqID float64) (uint, error) {
238238
bfl := &BlockFinalizedListener{
239-
channel: make(chan *types.Header),
239+
channel: make(chan *types.FinalisationInfo),
240240
wsconn: c,
241241
}
242242

@@ -271,7 +271,7 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (uint, er
271271
importedChan: make(chan *types.Block),
272272
wsconn: c,
273273
extrinsic: types.Extrinsic(extBytes),
274-
finalisedChan: make(chan *types.Header),
274+
finalisedChan: make(chan *types.FinalisationInfo),
275275
}
276276

277277
if c.BlockAPI == nil {

dot/rpc/subscription/websocket_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func (m *MockBlockAPI) RegisterImportedChannel(ch chan<- *types.Block) (byte, er
223223
}
224224
func (m *MockBlockAPI) UnregisterImportedChannel(id byte) {
225225
}
226-
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
226+
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
227227
return 0, nil
228228
}
229229
func (m *MockBlockAPI) UnregisterFinalizedChannel(id byte) {}

dot/rpc/websocket_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (m *MockBlockAPI) RegisterImportedChannel(ch chan<- *types.Block) (byte, er
120120
}
121121
func (m *MockBlockAPI) UnregisterImportedChannel(id byte) {
122122
}
123-
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
123+
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
124124
return 0, nil
125125
}
126126
func (m *MockBlockAPI) UnregisterFinalizedChannel(id byte) {}

dot/state/block.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type BlockState struct {
4747

4848
// block notifiers
4949
imported map[byte]chan<- *types.Block
50-
finalised map[byte]chan<- *types.Header
50+
finalised map[byte]chan<- *types.FinalisationInfo
5151
importedLock sync.RWMutex
5252
finalisedLock sync.RWMutex
5353

@@ -65,7 +65,7 @@ func NewBlockState(db chaindb.Database, bt *blocktree.BlockTree) (*BlockState, e
6565
baseState: NewBaseState(db),
6666
db: chaindb.NewTable(db, blockPrefix),
6767
imported: make(map[byte]chan<- *types.Block),
68-
finalised: make(map[byte]chan<- *types.Header),
68+
finalised: make(map[byte]chan<- *types.FinalisationInfo),
6969
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
7070
}
7171

@@ -85,7 +85,7 @@ func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header) (*Block
8585
baseState: NewBaseState(db),
8686
db: chaindb.NewTable(db, blockPrefix),
8787
imported: make(map[byte]chan<- *types.Block),
88-
finalised: make(map[byte]chan<- *types.Header),
88+
finalised: make(map[byte]chan<- *types.FinalisationInfo),
8989
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
9090
}
9191

@@ -424,7 +424,7 @@ func (bs *BlockState) SetFinalizedHash(hash common.Hash, round, setID uint64) er
424424
bs.Lock()
425425
defer bs.Unlock()
426426

427-
go bs.notifyFinalized(hash)
427+
go bs.notifyFinalized(hash, round, setID)
428428
if round > 0 {
429429
err := bs.SetRound(round)
430430
if err != nil {

dot/state/block_notify.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, err
5151

5252
// RegisterFinalizedChannel registers a channel for block notification upon block finalisation.
5353
// It returns the channel ID (used for unregistering the channel)
54-
func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
54+
func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
5555
bs.finalisedLock.RLock()
5656

5757
if len(bs.finalised) == 256 {
@@ -111,7 +111,7 @@ func (bs *BlockState) notifyImported(block *types.Block) {
111111
}
112112
}
113113

114-
func (bs *BlockState) notifyFinalized(hash common.Hash) {
114+
func (bs *BlockState) notifyFinalized(hash common.Hash, round, setID uint64) {
115115
bs.finalisedLock.RLock()
116116
defer bs.finalisedLock.RUnlock()
117117

@@ -126,11 +126,16 @@ func (bs *BlockState) notifyFinalized(hash common.Hash) {
126126
}
127127

128128
logger.Debug("notifying finalised block chans...", "chans", bs.finalised)
129+
info := &types.FinalisationInfo{
130+
Header: header,
131+
Round: round,
132+
SetID: setID,
133+
}
129134

130135
for _, ch := range bs.finalised {
131-
go func(ch chan<- *types.Header) {
136+
go func(ch chan<- *types.FinalisationInfo) {
132137
select {
133-
case ch <- header:
138+
case ch <- info:
134139
default:
135140
}
136141
}(ch)

0 commit comments

Comments
 (0)