Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 65 additions & 19 deletions consensus/dbft/dbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ type DBFT struct {
// downloader events. Downloader events are used to track miner's state since
// miner work may be temporary suspended due to the node sync.
mux *event.TypeMux
// syncing indicates whether the node is still syncing.
// syncing indicates whether the node is still syncing. This variable is updated
// irrespectively from the engine activity, and thus, may be relied on even when
// dBFT engine is not started.
syncing atomic.Bool

config *params.DBFTConfig // Consensus engine configuration parameters
Expand Down Expand Up @@ -712,6 +714,47 @@ func (c *DBFT) WithRequestTxs(f func(hashed []common.Hash)) {
// the ongoing node sync process.
func (c *DBFT) WithMux(mux *event.TypeMux) {
c.mux = mux

go c.syncWatcher()
}

// syncWatcher is a standalone loop aimed to be active irrespectively of dBFT engine
// activity. It tracks the first chain sync attempt till its end.
func (c *DBFT) syncWatcher() {
downloaderEvents := c.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer func() {
if !downloaderEvents.Closed() {
downloaderEvents.Unsubscribe()
}
}()
dlEventCh := downloaderEvents.Chan()

events:
for {
select {
case <-c.quit:
break events
case ev := <-dlEventCh:
if ev == nil {
// Unsubscription done, stop listening.
dlEventCh = nil
break events
}
switch ev.Data.(type) {
case downloader.StartEvent:
c.syncing.Store(true)

case downloader.FailedEvent:
c.syncing.Store(false)

case downloader.DoneEvent:
c.syncing.Store(false)

// Stop reacting to downloader events.
downloaderEvents.Unsubscribe()
}
}
}
}

// WithTxPool initializes transaction pool API for DBFT interactions with memory pool
Expand Down Expand Up @@ -1244,7 +1287,7 @@ func (c *DBFT) eventLoop() {
// been broadcasted the events are unregistered and the loop is exited. This to
// prevent a major security vuln where external parties can DOS you with blocks
// and halt your dBFT operation for as long as the DOS continues.
downloaderEvents := c.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
downloaderEvents := c.mux.Subscribe(downloader.DoneEvent{}, downloader.FailedEvent{})
defer func() {
if !downloaderEvents.Closed() {
downloaderEvents.Unsubscribe()
Expand Down Expand Up @@ -1296,7 +1339,7 @@ events:
case tx := <-c.txs:
c.dbft.OnTransaction(&Transaction{Tx: tx})
case b := <-c.chainHeadEvents:
err := c.handleChainBlock(b.Block.Header())
err := c.handleChainBlock(b.Block.Header(), true)
if err != nil {
log.Warn("Failed to handle chain block",
"index", b.Block.NumberU64(),
Expand All @@ -1318,14 +1361,9 @@ events:
continue
}
switch ev.Data.(type) {
case downloader.StartEvent:
c.syncing.Store(true)

case downloader.FailedEvent:
c.syncing.Store(false)

latest := c.chain.CurrentHeader()
err := c.handleChainBlock(latest)
err := c.handleChainBlock(latest, false)
if err != nil {
log.Warn("Failed to handle latest chain block",
"index", latest.Number.Uint64(),
Expand All @@ -1334,13 +1372,11 @@ events:
}

case downloader.DoneEvent:
c.syncing.Store(false)

// Stop reacting to downloader events.
downloaderEvents.Unsubscribe()

latest := c.chain.CurrentHeader()
err := c.handleChainBlock(latest)
err := c.handleChainBlock(latest, false)
if err != nil {
log.Warn("Failed to handle latest chain block",
"index", latest.Number.Uint64(),
Expand All @@ -1361,7 +1397,7 @@ events:
}
}
if latestBlock.Block != nil {
err := c.handleChainBlock(latestBlock.Block.Header())
err := c.handleChainBlock(latestBlock.Block.Header(), true)
if err != nil {
log.Warn("Failed to handle latest chain block",
"index", latestBlock.Block.NumberU64(),
Expand Down Expand Up @@ -1410,6 +1446,10 @@ func (c *DBFT) OnPayload(cp *dbftproto.Message) error {
log.Debug("Skip dBFT payload handling: dbft is inactive or not started yet", "hash", cp.Hash())
return nil
}
if c.syncing.Load() {
log.Debug("Skip dBFT payload handling due to sync", "hash", cp.Hash())
return nil
}

p := payloadFromMessage(cp)
// decode payload data into message
Expand Down Expand Up @@ -1477,15 +1517,21 @@ func (c *DBFT) validatePayload(p *Payload) error {

// IsExtensibleAllowed determines if address is allowed to send extensible payloads
// (only consensus payloads for now) at the specified height.
func (c *DBFT) IsExtensibleAllowed(h uint64, u common.Address) bool {
func (c *DBFT) IsExtensibleAllowed(h uint64, u common.Address) error {
// Can't verify extensible sender if the node has an outdated state.
if c.syncing.Load() {
return dbftproto.ErrSyncing
}
// Only validators are included into extensible whitelist for now.
validators, err := c.getValidators(&h, nil, nil)
if err != nil {
return false
return fmt.Errorf("failed to get validators: %w", err)
}
n := sort.Search(len(validators), func(i int) bool { return validators[i].Cmp(u) >= 0 })
res := n < len(validators)
return res
if n >= len(validators) {
return fmt.Errorf("address is not a validator")
}
return nil
}

func (c *DBFT) newPayload(ctx *dbft.Context[common.Hash], t dbft.MessageType, msg any) dbft.ConsensusPayload[common.Hash] {
Expand All @@ -1503,11 +1549,11 @@ func (c *DBFT) newPayload(ctx *dbft.Context[common.Hash], t dbft.MessageType, ms
return cp
}

func (c *DBFT) handleChainBlock(h *types.Header) error {
func (c *DBFT) handleChainBlock(h *types.Header, checkForSync bool) error {
// A short path if miner is not active and the node is in the process of block
// sync. In this case dBFT can't react properly on the newcoming blocks since no
// sealing task is expected from miner.
if c.syncing.Load() {
if checkForSync && c.syncing.Load() {
log.Info("Skipping dBFT block callback due to sync",
"block index", h.Number.Int64(),
"dbft index", c.dbft.BlockIndex,
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
var (
bft *dbft.DBFT
onPayload func(*dbftproto.Message) error
isExtensibleAllowed func(uint64, common.Address) bool
isExtensibleAllowed func(uint64, common.Address) error
)
switch t := eth.engine.(type) {
case *dbft.DBFT:
Expand Down
4 changes: 4 additions & 0 deletions eth/protocols/dbft/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
maxMessageSize = 4 * 1024 * 1024
)

// ErrSyncing is returned when operation can't be performed due to the fact that
// the node is in the process of chain sync.
var ErrSyncing = errors.New("node is syncing")

var (
errMsgTooLarge = errors.New("message too long")
errDecode = errors.New("invalid message")
Expand Down
6 changes: 3 additions & 3 deletions eth/protocols/dbft/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

type ledger struct {
bc BlockChainAPI
isExtensibleAllowed func(height uint64, addr common.Address) bool
isExtensibleAllowed func(height uint64, addr common.Address) error
}

func newLedger(bc BlockChainAPI, isExtensibleAllowed func(uint64, common.Address) bool) *ledger {
func newLedger(bc BlockChainAPI, isExtensibleAllowed func(uint64, common.Address) error) *ledger {
return &ledger{
bc: bc,
isExtensibleAllowed: isExtensibleAllowed,
Expand All @@ -24,6 +24,6 @@ func (l *ledger) BlockHeight() uint64 {
return uint64(l.bc.BlockNumber())
}

func (l *ledger) IsAddressAllowed(addr common.Address) bool {
func (l *ledger) IsAddressAllowed(addr common.Address) error {
return l.isExtensibleAllowed(l.BlockHeight(), addr)
}
13 changes: 9 additions & 4 deletions eth/protocols/dbft/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// Ledger is enough of Blockchain to satisfy Pool.
type Ledger interface {
BlockHeight() uint64
IsAddressAllowed(common.Address) bool
IsAddressAllowed(common.Address) error
}

// Pool represents a pool of extensible payloads.
Expand Down Expand Up @@ -90,8 +90,13 @@ func (p *Pool) verify(m *Message) (bool, error) {
}
return false, errInvalidHeight
}
if !p.chain.IsAddressAllowed(m.Sender) {
return false, errDisallowedSender
err = p.chain.IsAddressAllowed(m.Sender)
if err != nil {
// There's no reliable way to check sender for syncing node.
if errors.Is(err, ErrSyncing) {
return false, nil
}
return false, err
}
return true, nil
}
Expand Down Expand Up @@ -120,7 +125,7 @@ func (p *Pool) RemoveStale(index uint64) {
old := elem
elem = elem.Next()

if m.ValidBlockEnd <= index || !p.chain.IsAddressAllowed(m.Sender) {
if m.ValidBlockEnd <= index || p.chain.IsAddressAllowed(m.Sender) != nil {
delete(p.verified, h)
lst.Remove(old)
continue
Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/dbft/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Service struct {
}

// New creates a new instance of [Service].
func New(bc BlockChainAPI, onPayload func(*Message) error, isExtensibleAllowed func(uint64, common.Address) bool) *Service {
func New(bc BlockChainAPI, onPayload func(*Message) error, isExtensibleAllowed func(uint64, common.Address) error) *Service {
poolLedger := newLedger(bc, isExtensibleAllowed)
return &Service{
bc: bc,
Expand Down