Skip to content

Commit 57027db

Browse files
authored
fix(lib/babe): fix timing for transition between epochs (ChainSafe#1636)
1 parent 3c18e47 commit 57027db

File tree

10 files changed

+468
-105
lines changed

10 files changed

+468
-105
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ require (
3838
github.com/libp2p/go-libp2p-core v0.7.0
3939
github.com/libp2p/go-libp2p-discovery v0.5.0
4040
github.com/libp2p/go-libp2p-kad-dht v0.11.1
41-
github.com/libp2p/go-libp2p-kbucket v0.4.7
4241
github.com/libp2p/go-libp2p-peerstore v0.2.6
4342
github.com/libp2p/go-libp2p-secio v0.2.2
4443
github.com/libp2p/go-sockaddr v0.1.0 // indirect

lib/babe/babe.go

Lines changed: 118 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ var logger log.Logger
3838
type Service struct {
3939
ctx context.Context
4040
cancel context.CancelFunc
41-
paused bool
4241
authority bool
4342
dev bool
4443

@@ -123,7 +122,7 @@ func NewService(cfg *ServiceConfig) (*Service, error) {
123122
rt: cfg.Runtime,
124123
transactionState: cfg.TransactionState,
125124
slotToProof: make(map[uint64]*VrfOutputAndProof),
126-
blockChan: make(chan types.Block),
125+
blockChan: make(chan types.Block, 16),
127126
pause: make(chan struct{}),
128127
authority: cfg.Authority,
129128
dev: cfg.IsDev,
@@ -146,7 +145,7 @@ func NewService(cfg *ServiceConfig) (*Service, error) {
146145
"epoch length (slots)", babeService.epochLength,
147146
"authorities", Authorities(babeService.epochData.authorities),
148147
"authority index", babeService.epochData.authorityIndex,
149-
"threshold", babeService.epochData.threshold.ToLEBytes(),
148+
"threshold", babeService.epochData.threshold,
150149
"randomness", babeService.epochData.randomness,
151150
)
152151
return babeService, nil
@@ -226,39 +225,49 @@ func (b *Service) EpochLength() uint64 {
226225

227226
// Pause pauses the service ie. halts block production
228227
func (b *Service) Pause() error {
229-
if b.paused {
230-
return errors.New("service already paused")
231-
}
232-
233228
b.Lock()
234229
defer b.Unlock()
235230

236-
b.pause <- struct{}{}
237-
b.paused = true
231+
if b.IsPaused() {
232+
return nil
233+
}
234+
235+
close(b.pause)
238236
return nil
239237
}
240238

241239
// Resume resumes the service ie. resumes block production
242240
func (b *Service) Resume() error {
243-
if !b.paused {
241+
b.Lock()
242+
defer b.Unlock()
243+
244+
if !b.IsPaused() {
244245
return nil
245246
}
246247

248+
b.pause = make(chan struct{})
249+
247250
epoch, err := b.epochState.GetCurrentEpoch()
248251
if err != nil {
249252
logger.Error("failed to get current epoch", "error", err)
250253
return err
251254
}
252255

253-
b.Lock()
254-
defer b.Unlock()
255-
256-
b.paused = false
257256
go b.initiate(epoch)
258257
logger.Info("service resumed", "epoch", epoch)
259258
return nil
260259
}
261260

261+
// IsPaused returns if the service is paused or not (ie. producing blocks)
262+
func (b *Service) IsPaused() bool {
263+
select {
264+
case <-b.pause:
265+
return true
266+
default:
267+
return false
268+
}
269+
}
270+
262271
// Stop stops the service. If stop is called, it cannot be resumed.
263272
func (b *Service) Stop() error {
264273
b.Lock()
@@ -301,13 +310,6 @@ func (b *Service) IsStopped() bool {
301310
return b.ctx.Err() != nil
302311
}
303312

304-
// IsPaused returns if the service is paused or not (ie. producing blocks)
305-
func (b *Service) IsPaused() bool {
306-
b.RLock()
307-
defer b.RUnlock()
308-
return b.paused
309-
}
310-
311313
func (b *Service) safeSend(msg types.Block) error {
312314
b.Lock()
313315
defer b.Unlock()
@@ -351,93 +353,115 @@ func (b *Service) initiate(epoch uint64) {
351353
return
352354
}
353355

354-
b.invokeBlockAuthoring(epoch)
356+
err := b.invokeBlockAuthoring(epoch)
357+
if err != nil {
358+
logger.Crit("block authoring error", "error", err)
359+
}
355360
}
356361

357-
func (b *Service) invokeBlockAuthoring(epoch uint64) {
358-
// calculate current slot
359-
startSlot := getCurrentSlot(b.slotDuration)
362+
func (b *Service) invokeBlockAuthoring(epoch uint64) error {
363+
for {
364+
// get start slot for current epoch
365+
epochStart, err := b.epochState.GetStartSlotForEpoch(epoch)
366+
if err != nil {
367+
logger.Error("failed to get start slot for current epoch", "epoch", epoch, "error", err)
368+
return err
369+
}
360370

361-
head, err := b.blockState.BestBlockHeader()
362-
if err != nil {
363-
logger.Error("failed to get best block header", "error", err)
364-
return
365-
}
371+
head, err := b.blockState.BestBlockHeader()
372+
if err != nil {
373+
logger.Error("failed to get best block header", "error", err)
374+
return err
375+
}
366376

367-
// if we're at genesis, set the first slot number for the network
368-
if head.Number.Cmp(big.NewInt(0)) == 0 {
369-
err = b.epochState.SetFirstSlot(startSlot)
377+
// if we're at genesis, set the first slot number for the network
378+
if head.Number.Cmp(big.NewInt(0)) == 0 {
379+
epochStart = getCurrentSlot(b.slotDuration)
380+
err = b.epochState.SetFirstSlot(epochStart)
381+
if err != nil {
382+
logger.Error("failed to set first slot number", "error", err)
383+
return err
384+
}
385+
}
386+
387+
logger.Info("initiating epoch", "number", epoch, "first slot of epoch", epochStart)
388+
err = b.initiateEpoch(epoch)
370389
if err != nil {
371-
logger.Error("failed to set first slot number", "error", err)
372-
return
390+
logger.Error("failed to initiate epoch", "epoch", epoch, "error", err)
391+
return err
373392
}
374-
}
375393

376-
logger.Info("initiating epoch", "number", epoch, "start slot", startSlot+b.epochLength)
377-
err = b.initiateEpoch(epoch)
378-
if err != nil {
379-
logger.Error("failed to initiate epoch", "epoch", epoch, "error", err)
380-
return
381-
}
394+
epochStartTime := getSlotStartTime(epochStart, b.slotDuration)
395+
logger.Debug("checking if epoch started", "epoch start", epochStartTime, "now", time.Now())
396+
397+
// check if it's time to start the epoch yet. if not, wait until it is
398+
if time.Since(epochStartTime) < 0 {
399+
logger.Debug("waiting for epoch to start")
400+
select {
401+
case <-time.After(time.Until(epochStartTime)):
402+
case <-b.ctx.Done():
403+
return nil
404+
case <-b.pause:
405+
return nil
406+
}
407+
}
382408

383-
// get start slot for current epoch
384-
epochStart, err := b.epochState.GetStartSlotForEpoch(0)
385-
if err != nil {
386-
logger.Error("failed to get start slot for current epoch", "epoch", epoch, "error", err)
387-
return
388-
}
409+
// calculate current slot
410+
startSlot := getCurrentSlot(b.slotDuration)
411+
intoEpoch := startSlot - epochStart
389412

390-
intoEpoch := startSlot - epochStart
391-
logger.Info("current epoch", "epoch", epoch, "slots into epoch", intoEpoch)
413+
// if the calculated amount of slots "into the epoch" is greater than the epoch length,
414+
// we've been offline for more than an epoch, and need to sync. pause BABE for now, syncer will
415+
// resume it when ready
416+
if b.epochLength <= intoEpoch && !b.dev {
417+
logger.Debug("pausing BABE, need to sync", "slots into epoch", intoEpoch, "startSlot", startSlot, "epochStart", epochStart)
418+
return b.Pause()
419+
}
392420

393-
// if the calculated amount of slots "into the epoch" is greater than the epoch length,
394-
// we've been offline for more than an epoch, and need to sync. pause BABE for now, syncer will
395-
// resume it when ready
396-
if b.epochLength <= intoEpoch && !b.dev {
397-
b.paused = true
398-
return
399-
}
421+
if b.dev {
422+
intoEpoch = intoEpoch % b.epochLength
423+
}
400424

401-
if b.dev {
402-
intoEpoch = intoEpoch % b.epochLength
403-
}
425+
logger.Info("current epoch", "epoch", epoch, "slots into epoch", intoEpoch)
404426

405-
slotDone := make([]<-chan time.Time, b.epochLength-intoEpoch)
406-
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
407-
slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
408-
}
427+
slotDone := make([]<-chan time.Time, b.epochLength-intoEpoch)
428+
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
429+
slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
430+
}
409431

410-
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
411-
select {
412-
case <-b.ctx.Done():
413-
return
414-
case <-b.pause:
415-
return
416-
case <-slotDone[i]:
417-
if !b.authority {
418-
continue
432+
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
433+
select {
434+
case <-b.ctx.Done():
435+
return nil
436+
case <-b.pause:
437+
return nil
438+
case <-slotDone[i]:
439+
if !b.authority {
440+
continue
441+
}
442+
443+
slotNum := startSlot + uint64(i)
444+
err = b.handleSlot(slotNum)
445+
if err == ErrNotAuthorized {
446+
logger.Debug("not authorized to produce a block in this slot", "slot", slotNum, "slots into epoch", i)
447+
continue
448+
} else if err != nil {
449+
logger.Warn("failed to handle slot", "slot", slotNum, "error", err)
450+
continue
451+
}
419452
}
453+
}
420454

421-
slotNum := startSlot + uint64(i)
422-
err = b.handleSlot(slotNum)
423-
if err == ErrNotAuthorized {
424-
logger.Debug("not authorized to produce a block in this slot", "slot", slotNum)
425-
continue
426-
} else if err != nil {
427-
logger.Warn("failed to handle slot", "slot", slotNum, "error", err)
428-
continue
429-
}
455+
// setup next epoch, re-invoke block authoring
456+
next, err := b.incrementEpoch()
457+
if err != nil {
458+
logger.Error("failed to increment epoch", "error", err)
459+
return err
430460
}
431-
}
432461

433-
// setup next epoch, re-invoke block authoring
434-
next, err := b.incrementEpoch()
435-
if err != nil {
436-
logger.Error("failed to increment epoch", "error", err)
437-
return
462+
logger.Info("epoch complete!", "completed epoch", epoch, "upcoming epoch", next)
463+
epoch = next
438464
}
439-
440-
b.invokeBlockAuthoring(next)
441465
}
442466

443467
func (b *Service) handleSlot(slotNum uint64) error {
@@ -466,8 +490,6 @@ func (b *Service) handleSlot(slotNum uint64) error {
466490
number: slotNum,
467491
}
468492

469-
logger.Debug("going to build block", "parent", parent)
470-
471493
// set runtime trie before building block
472494
// if block building is successful, store the resulting trie in the storage state
473495
ts, err := b.storageState.TrieState(&parent.StateRoot)
@@ -509,3 +531,7 @@ func (b *Service) handleSlot(slotNum uint64) error {
509531
func getCurrentSlot(slotDuration time.Duration) uint64 {
510532
return uint64(time.Now().UnixNano()) / uint64(slotDuration.Nanoseconds())
511533
}
534+
535+
func getSlotStartTime(slot uint64, slotDuration time.Duration) time.Time {
536+
return time.Unix(0, int64(slot)*slotDuration.Nanoseconds())
537+
}

lib/babe/babe_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ func createTestService(t *testing.T, cfg *ServiceConfig) *Service {
136136
cfg.Runtime = rt
137137
}
138138

139+
cfg.LogLvl = defaultTestLogLvl
139140
babeService, err := NewService(cfg)
140141
require.NoError(t, err)
141142
return babeService
@@ -252,3 +253,33 @@ func TestStartAndStop(t *testing.T) {
252253
err = bs.Stop()
253254
require.NoError(t, err)
254255
}
256+
257+
func TestService_PauseAndResume(t *testing.T) {
258+
bs := createTestService(t, &ServiceConfig{
259+
LogLvl: log.LvlCrit,
260+
})
261+
err := bs.Start()
262+
require.NoError(t, err)
263+
time.Sleep(time.Second)
264+
265+
go func() {
266+
_ = bs.Pause()
267+
}()
268+
269+
go func() {
270+
_ = bs.Pause()
271+
}()
272+
273+
go func() {
274+
err := bs.Resume() //nolint
275+
require.NoError(t, err)
276+
}()
277+
278+
go func() {
279+
err := bs.Resume() //nolint
280+
require.NoError(t, err)
281+
}()
282+
283+
err = bs.Stop()
284+
require.NoError(t, err)
285+
}

lib/babe/build.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func (b *BlockBuilder) addToQueue(txs []*transaction.ValidTransaction) {
322322
}
323323

324324
func hasSlotEnded(slot Slot) bool {
325-
slotEnd := slot.start.Add(slot.duration)
325+
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
326326
return time.Since(slotEnd) >= 0
327327
}
328328

lib/babe/epoch.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,15 @@ func (b *Service) initiateEpoch(epoch uint64) error {
117117
delete(b.slotToProof, i-b.epochLength) // clear data from previous epoch
118118
}
119119

120-
b.slotToProof[i], err = b.runLottery(i, epoch)
120+
proof, err := b.runLottery(i, epoch)
121121
if err != nil {
122122
return fmt.Errorf("error running slot lottery at slot %d: error %s", i, err)
123123
}
124+
125+
if proof != nil {
126+
b.slotToProof[i] = proof
127+
logger.Trace("claimed slot!", "slot", startSlot, "slots into epoch", i-startSlot)
128+
}
124129
}
125130

126131
return nil

0 commit comments

Comments
 (0)