Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 80 additions & 65 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ var (
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)

blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)

errInsertionInterrupted = errors.New("insertion is interrupted")
errChainStopped = errors.New("blockchain is stopped")
Expand Down Expand Up @@ -1759,18 +1761,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
bc.reportBlock(block, nil, err)
return nil, it.index, err
}
// No validation errors for the first block (or chain prefix skipped)
var activeState *state.StateDB
defer func() {
// The chain importer is starting and stopping trie prefetchers. If a bad
// block or other error is hit however, an early return may not properly
// terminate the background threads. This defer ensures that we clean up
// and dangling prefetcher, without deferring each and holding on live refs.
if activeState != nil {
activeState.StopPrefetcher()
}
}()

// Track the singleton witness from this chain insertion (if any)
var witness *stateless.Witness

Expand Down Expand Up @@ -1826,63 +1816,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
continue
}
// Retrieve the parent block and it's state to execute on top
start := time.Now()
parent := it.previous()
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.statedb)
if err != nil {
return nil, it.index, err
}

// If we are past Byzantium, enable prefetching to pull in trie node paths
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
if bc.chainConfig.IsByzantium(block.Number()) {
// Generate witnesses either if we're self-testing, or if it's the
// only block being inserted. A bit crude, but witnesses are huge,
// so we refuse to make an entire chain of them.
if bc.vmConfig.StatelessSelfValidation || (makeWitness && len(chain) == 1) {
witness, err = stateless.NewWitness(block.Header(), bc)
if err != nil {
return nil, it.index, err
}
}
statedb.StartPrefetcher("chain", witness)
}
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt atomic.Bool
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.statedb)

go func(start time.Time, followup *types.Block, throwaway *state.StateDB) {
// Disable tracing for prefetcher executions.
vmCfg := bc.vmConfig
vmCfg.Tracer = nil
bc.prefetcher.Prefetch(followup, throwaway, vmCfg, &followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), followup, throwaway)
}
}

// The traced section of block import.
res, err := bc.processBlock(block, statedb, start, setHead)
followupInterrupt.Store(true)
start := time.Now()
res, err := bc.processBlock(parent.Root, block, setHead, makeWitness && len(chain) == 1)
if err != nil {
return nil, it.index, err
}
// Report the import stats before returning the various results
stats.processed++
stats.usedGas += res.usedGas
witness = res.witness

var snapDiffItems, snapBufItems common.StorageSize
if bc.snaps != nil {
Expand Down Expand Up @@ -1938,11 +1885,74 @@ type blockProcessingResult struct {
usedGas uint64
procTime time.Duration
status WriteStatus
witness *stateless.Witness
}

// processBlock executes and validates the given block. If there was no error
// it writes the block and associated state to database.
func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, start time.Time, setHead bool) (_ *blockProcessingResult, blockEndErr error) {
func (bc *BlockChain) processBlock(parentRoot common.Hash, block *types.Block, setHead bool, makeWitness bool) (_ *blockProcessingResult, blockEndErr error) {
var (
err error
startTime = time.Now()
statedb *state.StateDB
interrupt atomic.Bool
)
defer interrupt.Store(true) // terminate the prefetch at the end

if bc.cacheConfig.TrieCleanNoPrefetch {
statedb, err = state.New(parentRoot, bc.statedb)
if err != nil {
return nil, err
}
} else {
// If prefetching is enabled, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
//
// Note: the main processor and prefetcher share the same reader with a local
// cache for mitigating the overhead of state access.
reader, err := bc.statedb.ReaderWithCache(parentRoot)
if err != nil {
return nil, err
}
throwaway, err := state.NewWithReader(parentRoot, bc.statedb, reader)
if err != nil {
return nil, err
}
statedb, err = state.NewWithReader(parentRoot, bc.statedb, reader)
if err != nil {
return nil, err
}
go func(start time.Time, throwaway *state.StateDB, block *types.Block) {
// Disable tracing for prefetcher executions.
vmCfg := bc.vmConfig
vmCfg.Tracer = nil
bc.prefetcher.Prefetch(block, throwaway, vmCfg, &interrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if interrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), throwaway, block)
}

// If we are past Byzantium, enable prefetching to pull in trie node paths
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
var witness *stateless.Witness
if bc.chainConfig.IsByzantium(block.Number()) {
// Generate witnesses either if we're self-testing, or if it's the
// only block being inserted. A bit crude, but witnesses are huge,
// so we refuse to make an entire chain of them.
if bc.vmConfig.StatelessSelfValidation || makeWitness {
witness, err = stateless.NewWitness(block.Header(), bc)
if err != nil {
return nil, err
}
}
statedb.StartPrefetcher("chain", witness)
defer statedb.StopPrefetcher()
}

if bc.logger != nil && bc.logger.OnBlockStart != nil {
bc.logger.OnBlockStart(tracing.BlockEvent{
Block: block,
Expand Down Expand Up @@ -2001,7 +2011,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
}
}
xvtime := time.Since(xvstart)
proctime := time.Since(start) // processing + validation + cross validation
proctime := time.Since(startTime) // processing + validation + cross validation

// Update the metrics touched during block processing and validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
Expand Down Expand Up @@ -2042,9 +2052,14 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them

blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockInsertTimer.UpdateSince(start)

return &blockProcessingResult{usedGas: res.GasUsed, procTime: proctime, status: status}, nil
blockInsertTimer.UpdateSince(startTime)

return &blockProcessingResult{
usedGas: res.GasUsed,
procTime: proctime,
status: status,
witness: witness,
}, nil
}

// insertSideChain is called when an import batch hits upon a pruned ancestor
Expand Down
1 change: 1 addition & 0 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (it *insertIterator) next() (*types.Block, error) {
//
// Both header and body validation errors (nil too) is cached into the iterator
// to avoid duplicating work on the following next() call.
// nolint:unused
func (it *insertIterator) peek() (*types.Block, error) {
// If we reached the end of the chain, abort
if it.index+1 >= len(it.chain) {
Expand Down
13 changes: 11 additions & 2 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (

const (
// Number of codehash->size associations to keep.
codeSizeCacheSize = 100000
codeSizeCacheSize = 1_000_000 // 4 megabytes in total

// Cache size granted for caching clean code.
codeCacheSize = 64 * 1024 * 1024
codeCacheSize = 256 * 1024 * 1024

// Number of address->curve point associations to keep.
pointCacheSize = 4096
Expand Down Expand Up @@ -208,6 +208,15 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
return newReader(newCachingCodeReader(db.disk, db.codeCache, db.codeSizeCache), combined), nil
}

// ReaderWithCache creates a state reader with internal local cache.
func (db *CachingDB) ReaderWithCache(stateRoot common.Hash) (Reader, error) {
reader, err := db.Reader(stateRoot)
if err != nil {
return nil, err
}
return newReaderWithCache(reader), nil
}

// OpenTrie opens the main account trie at a specific root hash.
func (db *CachingDB) OpenTrie(root common.Hash) (Trie, error) {
if db.triedb.IsVerkle() {
Expand Down
Loading