Skip to content

Commit 88685cc

Browse files
authored
maintenance(dot/sync): add clearing of pending blocks set (#2055)
1 parent dd08424 commit 88685cc

File tree

6 files changed

+136
-35
lines changed

6 files changed

+136
-35
lines changed

dot/sync/chain_sync.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ type chainSync struct {
129129
// disjoint set of blocks which are known but not ready to be processed
130130
// ie. we only know the hash, number, or the parent block is unknown, or the body is unknown
131131
// note: the block may have empty fields, as some data about it may be unknown
132-
pendingBlocks DisjointBlockSet
132+
pendingBlocks DisjointBlockSet
133+
pendingBlockDoneCh chan<- struct{}
133134

134135
// bootstrap or tip (near-head)
135136
state chainSyncState
@@ -192,11 +193,17 @@ func (cs *chainSync) start() {
192193
time.Sleep(time.Millisecond * 100)
193194
}
194195

196+
pendingBlockDoneCh := make(chan struct{})
197+
cs.pendingBlockDoneCh = pendingBlockDoneCh
198+
go cs.pendingBlocks.run(pendingBlockDoneCh)
195199
go cs.sync()
196200
go cs.logSyncSpeed()
197201
}
198202

199203
func (cs *chainSync) stop() {
204+
if cs.pendingBlockDoneCh != nil {
205+
close(cs.pendingBlockDoneCh)
206+
}
200207
cs.cancel()
201208
}
202209

@@ -439,6 +446,13 @@ func (cs *chainSync) sync() {
439446
logger.Debugf(
440447
"discarding worker id %d: maximum retry count reached",
441448
worker.id)
449+
450+
// if this worker was triggered due to a block in the pending blocks set,
451+
// we want to remove it from the set, as we asked all our peers for it
452+
// and none replied with the info we need.
453+
if worker.pendingBlock != nil {
454+
cs.pendingBlocks.removeBlock(worker.pendingBlock.hash)
455+
}
442456
continue
443457
}
444458

dot/sync/disjoint_block_set.go

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,18 @@ import (
77
"errors"
88
"math/big"
99
"sync"
10+
"time"
1011

1112
"github.com/ChainSafe/gossamer/dot/types"
1213
"github.com/ChainSafe/gossamer/lib/common"
1314
)
1415

16+
const (
17+
// ttl is the time that a block can stay in this set before being cleared.
18+
ttl = 10 * time.Minute
19+
clearBlocksInterval = time.Minute
20+
)
21+
1522
var (
1623
errUnknownBlock = errors.New("cannot add justification for unknown block")
1724
errSetAtLimit = errors.New("cannot add block; set is at capacity")
@@ -20,6 +27,7 @@ var (
2027
// DisjointBlockSet represents a set of incomplete blocks, or blocks
2128
// with an unknown parent. it is implemented by *disjointBlockSet
2229
type DisjointBlockSet interface {
30+
run(done <-chan struct{})
2331
addHashAndNumber(common.Hash, *big.Int) error
2432
addHeader(*types.Header) error
2533
addBlock(*types.Block) error
@@ -44,6 +52,21 @@ type pendingBlock struct {
4452
header *types.Header
4553
body *types.Body
4654
justification []byte
55+
56+
// the time when this block should be cleared from the set.
57+
// if the block is re-added to the set, this time get updated.
58+
clearAt time.Time
59+
}
60+
61+
func newPendingBlock(hash common.Hash, number *big.Int,
62+
header *types.Header, body *types.Body, clearAt time.Time) *pendingBlock {
63+
return &pendingBlock{
64+
hash: hash,
65+
number: number,
66+
header: header,
67+
body: body,
68+
clearAt: clearAt,
69+
}
4770
}
4871

4972
func (b *pendingBlock) toBlockData() *types.BlockData {
@@ -80,13 +103,41 @@ type disjointBlockSet struct {
80103

81104
// map of parent hash -> child hashes
82105
parentToChildren map[common.Hash]map[common.Hash]struct{}
106+
107+
timeNow func() time.Time
83108
}
84109

85110
func newDisjointBlockSet(limit int) *disjointBlockSet {
86111
return &disjointBlockSet{
87112
blocks: make(map[common.Hash]*pendingBlock),
88113
parentToChildren: make(map[common.Hash]map[common.Hash]struct{}),
89114
limit: limit,
115+
timeNow: time.Now,
116+
}
117+
}
118+
119+
func (s *disjointBlockSet) run(done <-chan struct{}) {
120+
ticker := time.NewTicker(clearBlocksInterval)
121+
defer ticker.Stop()
122+
123+
for {
124+
select {
125+
case <-ticker.C:
126+
s.clearBlocks()
127+
case <-done:
128+
return
129+
}
130+
}
131+
}
132+
133+
func (s *disjointBlockSet) clearBlocks() {
134+
s.Lock()
135+
defer s.Unlock()
136+
137+
for _, block := range s.blocks {
138+
if s.timeNow().Sub(block.clearAt) > 0 {
139+
s.removeBlockInner(block.hash)
140+
}
90141
}
91142
}
92143

@@ -104,19 +155,16 @@ func (s *disjointBlockSet) addHashAndNumber(hash common.Hash, number *big.Int) e
104155
s.Lock()
105156
defer s.Unlock()
106157

107-
if _, has := s.blocks[hash]; has {
158+
if b, has := s.blocks[hash]; has {
159+
b.clearAt = s.timeNow().Add(ttl)
108160
return nil
109161
}
110162

111163
if len(s.blocks) == s.limit {
112164
return errSetAtLimit
113165
}
114166

115-
s.blocks[hash] = &pendingBlock{
116-
hash: hash,
117-
number: number,
118-
}
119-
167+
s.blocks[hash] = newPendingBlock(hash, number, nil, nil, s.timeNow().Add(ttl))
120168
return nil
121169
}
122170

@@ -125,21 +173,17 @@ func (s *disjointBlockSet) addHeader(header *types.Header) error {
125173
defer s.Unlock()
126174

127175
hash := header.Hash()
128-
b, has := s.blocks[hash]
129-
if has {
176+
if b, has := s.blocks[hash]; has {
130177
b.header = header
178+
b.clearAt = s.timeNow().Add(ttl)
131179
return nil
132180
}
133181

134182
if len(s.blocks) == s.limit {
135183
return errSetAtLimit
136184
}
137185

138-
s.blocks[hash] = &pendingBlock{
139-
hash: hash,
140-
number: header.Number,
141-
header: header,
142-
}
186+
s.blocks[hash] = newPendingBlock(hash, header.Number, header, nil, s.timeNow().Add(ttl))
143187
s.addToParentMap(header.ParentHash, hash)
144188
return nil
145189
}
@@ -149,23 +193,18 @@ func (s *disjointBlockSet) addBlock(block *types.Block) error {
149193
defer s.Unlock()
150194

151195
hash := block.Header.Hash()
152-
b, has := s.blocks[hash]
153-
if has {
196+
if b, has := s.blocks[hash]; has {
154197
b.header = &block.Header
155198
b.body = &block.Body
199+
b.clearAt = s.timeNow().Add(ttl)
156200
return nil
157201
}
158202

159203
if len(s.blocks) == s.limit {
160204
return errSetAtLimit
161205
}
162206

163-
s.blocks[hash] = &pendingBlock{
164-
hash: hash,
165-
number: block.Header.Number,
166-
header: &block.Header,
167-
body: &block.Body,
168-
}
207+
s.blocks[hash] = newPendingBlock(hash, block.Header.Number, &block.Header, &block.Body, s.timeNow().Add(ttl))
169208
s.addToParentMap(block.Header.ParentHash, hash)
170209
return nil
171210
}
@@ -177,6 +216,7 @@ func (s *disjointBlockSet) addJustification(hash common.Hash, just []byte) error
177216
b, has := s.blocks[hash]
178217
if has {
179218
b.justification = just
219+
b.clearAt = time.Now().Add(ttl)
180220
return nil
181221
}
182222

@@ -187,6 +227,12 @@ func (s *disjointBlockSet) addJustification(hash common.Hash, just []byte) error
187227
func (s *disjointBlockSet) removeBlock(hash common.Hash) {
188228
s.Lock()
189229
defer s.Unlock()
230+
s.removeBlockInner(hash)
231+
}
232+
233+
// this function does not lock!!
234+
// it should only be called by other functions in this file that lock the set beforehand.
235+
func (s *disjointBlockSet) removeBlockInner(hash common.Hash) {
190236
block, has := s.blocks[hash]
191237
if !has {
192238
return

dot/sync/disjoint_block_set_test.go

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package sync
66
import (
77
"math/big"
88
"testing"
9+
"time"
910

1011
"github.com/ChainSafe/gossamer/dot/types"
1112
"github.com/ChainSafe/gossamer/lib/common"
@@ -15,6 +16,9 @@ import (
1516

1617
func TestDisjointBlockSet(t *testing.T) {
1718
s := newDisjointBlockSet(pendingBlocksLimit)
19+
s.timeNow = func() time.Time {
20+
return time.Time{}
21+
}
1822

1923
hash := common.Hash{0xa, 0xb}
2024
number := big.NewInt(100)
@@ -23,8 +27,9 @@ func TestDisjointBlockSet(t *testing.T) {
2327
require.Equal(t, 1, s.size())
2428

2529
expected := &pendingBlock{
26-
hash: hash,
27-
number: number,
30+
hash: hash,
31+
number: number,
32+
clearAt: time.Time{}.Add(ttl),
2833
}
2934
blocks := s.getBlocks()
3035
require.Equal(t, 1, len(blocks))
@@ -36,10 +41,12 @@ func TestDisjointBlockSet(t *testing.T) {
3641
s.addHeader(header)
3742
require.True(t, s.hasBlock(header.Hash()))
3843
require.Equal(t, 2, s.size())
44+
3945
expected = &pendingBlock{
40-
hash: header.Hash(),
41-
number: header.Number,
42-
header: header,
46+
hash: header.Hash(),
47+
number: header.Number,
48+
header: header,
49+
clearAt: time.Time{}.Add(ttl),
4350
}
4451
require.Equal(t, expected, s.getBlock(header.Hash()))
4552

@@ -51,9 +58,10 @@ func TestDisjointBlockSet(t *testing.T) {
5158
s.addHeader(header2)
5259
require.Equal(t, 3, s.size())
5360
expected = &pendingBlock{
54-
hash: header2.Hash(),
55-
number: header2.Number,
56-
header: header2,
61+
hash: header2.Hash(),
62+
number: header2.Number,
63+
header: header2,
64+
clearAt: time.Time{}.Add(ttl),
5765
}
5866
require.Equal(t, expected, s.getBlock(header2.Hash()))
5967

@@ -64,10 +72,11 @@ func TestDisjointBlockSet(t *testing.T) {
6472
s.addBlock(block)
6573
require.Equal(t, 3, s.size())
6674
expected = &pendingBlock{
67-
hash: header2.Hash(),
68-
number: header2.Number,
69-
header: header2,
70-
body: &block.Body,
75+
hash: header2.Hash(),
76+
number: header2.Number,
77+
header: header2,
78+
body: &block.Body,
79+
clearAt: time.Time{}.Add(ttl),
7180
}
7281
require.Equal(t, expected, s.getBlock(header2.Hash()))
7382

@@ -195,3 +204,24 @@ func TestDisjointBlockSet_getReadyDescendants_blockNotComplete(t *testing.T) {
195204
require.Equal(t, block1.ToBlockData(), ready[0])
196205
require.Equal(t, block2.ToBlockData(), ready[1])
197206
}
207+
208+
func TestDisjointBlockSet_ClearBlocks(t *testing.T) {
209+
s := newDisjointBlockSet(pendingBlocksLimit)
210+
211+
testHashA := common.Hash{0}
212+
testHashB := common.Hash{1}
213+
214+
s.blocks[testHashA] = &pendingBlock{
215+
hash: testHashA,
216+
clearAt: time.Unix(1000, 0),
217+
}
218+
s.blocks[testHashB] = &pendingBlock{
219+
hash: testHashB,
220+
clearAt: time.Now().Add(ttl * 2),
221+
}
222+
223+
s.clearBlocks()
224+
require.Equal(t, 1, len(s.blocks))
225+
_, has := s.blocks[testHashB]
226+
require.True(t, has)
227+
}

dot/sync/tip_syncer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ func (s *tipSyncer) handleTick() ([]*worker, error) {
181181
targetNumber: fin.Number,
182182
direction: network.Descending,
183183
requestData: bootstrapRequestData,
184+
pendingBlock: block,
184185
})
185186
continue
186187
}
@@ -193,6 +194,7 @@ func (s *tipSyncer) handleTick() ([]*worker, error) {
193194
targetHash: block.hash,
194195
targetNumber: block.number,
195196
requestData: network.RequestedDataBody + network.RequestedDataJustification,
197+
pendingBlock: block,
196198
})
197199
continue
198200
}
@@ -217,6 +219,7 @@ func (s *tipSyncer) handleTick() ([]*worker, error) {
217219
targetNumber: fin.Number,
218220
direction: network.Descending,
219221
requestData: bootstrapRequestData,
222+
pendingBlock: block,
220223
})
221224
}
222225

dot/sync/tip_syncer_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func TestTipSyncer_handleTick_case1(t *testing.T) {
158158

159159
fin, _ := s.blockState.GetHighestFinalisedHeader()
160160

161-
// add pending blocks w/ only hash and number, lower than finalised should be removed
161+
// add pending blocks w/ only hash and number, equal or lower than finalised should be removed
162162
s.pendingBlocks.addHashAndNumber(common.Hash{0xa}, fin.Number)
163163
s.pendingBlocks.addHashAndNumber(common.Hash{0xb}, big.NewInt(0).Add(fin.Number, big.NewInt(1)))
164164

@@ -170,8 +170,10 @@ func TestTipSyncer_handleTick_case1(t *testing.T) {
170170
targetNumber: fin.Number,
171171
direction: network.Descending,
172172
requestData: bootstrapRequestData,
173+
pendingBlock: s.pendingBlocks.getBlock(common.Hash{0xb}),
173174
},
174175
}
176+
175177
w, err = s.handleTick()
176178
require.NoError(t, err)
177179
require.Equal(t, expected, w)
@@ -198,13 +200,15 @@ func TestTipSyncer_handleTick_case2(t *testing.T) {
198200
targetNumber: header.Number,
199201
direction: network.Ascending,
200202
requestData: network.RequestedDataBody + network.RequestedDataJustification,
203+
pendingBlock: s.pendingBlocks.getBlock(header.Hash()),
201204
},
202205
}
203206
w, err := s.handleTick()
204207
require.NoError(t, err)
205208
require.Equal(t, expected, w)
206209
require.True(t, s.pendingBlocks.hasBlock(header.Hash()))
207210
}
211+
208212
func TestTipSyncer_handleTick_case3(t *testing.T) {
209213
s := newTestTipSyncer(t)
210214

@@ -248,6 +252,7 @@ func TestTipSyncer_handleTick_case3(t *testing.T) {
248252
targetNumber: fin.Number,
249253
direction: network.Descending,
250254
requestData: bootstrapRequestData,
255+
pendingBlock: s.pendingBlocks.getBlock(header.Hash()),
251256
},
252257
}
253258

0 commit comments

Comments
 (0)