Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 52 additions & 14 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,9 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac
// minimums will need to be done only starting at the swapped in/out nonce
// and leading up to the first no-change.
type BlobPool struct {
config Config // Pool configuration
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
config Config // Pool configuration
reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools
hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth

store billy.Database // Persistent data store for the tx metadata and blobs
stored uint64 // Useful data size of all transactions on disk
Expand Down Expand Up @@ -329,13 +330,14 @@ type BlobPool struct {

// New creates a new blob transaction pool to gather, sort and filter inbound
// blob transactions from the network.
func New(config Config, chain BlockChain) *BlobPool {
func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bool) *BlobPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()

// Create the transaction pool with its initial settings
return &BlobPool{
config: config,
hasPendingAuth: hasPendingAuth,
signer: types.LatestSigner(chain.Config()),
chain: chain,
lookup: newLookup(),
Expand All @@ -353,8 +355,8 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool {
// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings.
func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
p.reserve = reserve
func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error {
p.reserver = reserver

var (
queuedir string
Expand Down Expand Up @@ -499,7 +501,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
return err
}
if _, ok := p.index[sender]; !ok {
if err := p.reserve(sender, true); err != nil {
if err := p.reserver.Hold(sender); err != nil {
return err
}
p.index[sender] = []*blobTxMeta{}
Expand Down Expand Up @@ -554,7 +556,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if inclusions != nil { // only during reorgs will the heap be initialized
heap.Remove(p.evict, p.evict.index[addr])
}
p.reserve(addr, false)
p.reserver.Release(addr)

if gapped {
log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids)
Expand Down Expand Up @@ -707,7 +709,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if inclusions != nil { // only during reorgs will the heap be initialized
heap.Remove(p.evict, p.evict.index[addr])
}
p.reserve(addr, false)
p.reserver.Release(addr)
} else {
p.index[addr] = txs
}
Expand Down Expand Up @@ -1006,7 +1008,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Update the indices and metrics
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil {
if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
return err
}
Expand Down Expand Up @@ -1066,7 +1068,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
delete(p.spent, addr)

heap.Remove(p.evict, p.evict.index[addr])
p.reserve(addr, false)
p.reserver.Release(addr)
}
// Clear out the transactions from the data store
log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids)
Expand Down Expand Up @@ -1101,6 +1103,39 @@ func (p *BlobPool) ValidateTxBasics(tx *types.Transaction) error {
return txpool.ValidateTransaction(tx, p.head, p.signer, opts)
}

// checkDelegationLimit determines if the tx sender is delegated or has a
// pending delegation, and if so, ensures they have at most one in-flight
// **executable** transaction, e.g. disallow stacked and gapped transactions
// from the account.
func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error {
from, _ := types.Sender(p.signer, tx) // validated

// Short circuit if the sender has neither delegation nor pending delegation.
if p.state.GetCodeHash(from) == types.EmptyCodeHash {
// Because there is no exclusive lock held between different subpools
// when processing transactions, a blob transaction may be accepted
// while other SetCode transactions with pending authorities from the
// same address are also accepted simultaneously.
//
// This scenario is considered acceptable, as the rule primarily ensures
// that attackers cannot easily and endlessly stack blob transactions
// with a delegated or pending delegated sender.
if p.hasPendingAuth == nil || !p.hasPendingAuth(from) {
return nil
}
}
// Allow a single in-flight pending transaction.
pending := p.index[from]
if len(pending) == 0 {
return nil
}
// If account already has a pending transaction, allow replacement only.
if len(pending) == 1 && pending[0].nonce == tx.Nonce() {
return nil
}
return txpool.ErrInflightTxLimitReached
}

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (p *BlobPool) validateTx(tx *types.Transaction) error {
Expand Down Expand Up @@ -1141,6 +1176,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error {
if err := txpool.ValidateTransactionWithState(tx, p.signer, stateOpts); err != nil {
return err
}
if err := p.checkDelegationLimit(tx); err != nil {
return err
}
// If the transaction replaces an existing one, ensure that price bumps are
// adhered to.
var (
Expand Down Expand Up @@ -1369,7 +1407,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
// only by this subpool until all transactions are evicted
from, _ := types.Sender(p.signer, tx) // already validated above
if _, ok := p.index[from]; !ok {
if err := p.reserve(from, true); err != nil {
if err := p.reserver.Hold(from); err != nil {
addNonExclusiveMeter.Mark(1)
return err
}
Expand All @@ -1381,7 +1419,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
// by a return statement before running deferred methods. Take care with
// removing or subscoping err as it will break this clause.
if err != nil {
p.reserve(from, false)
p.reserver.Release(from)
}
}()
}
Expand Down Expand Up @@ -1513,7 +1551,7 @@ func (p *BlobPool) drop() {
if last {
delete(p.index, from)
delete(p.spent, from)
p.reserve(from, false)
p.reserver.Release(from)
} else {
txs[len(txs)-1] = nil
txs = txs[:len(txs)-1]
Expand Down Expand Up @@ -1789,7 +1827,7 @@ func (p *BlobPool) Clear() {
// can't happen until Clear releases the reservation lock. Clear cannot
// acquire the subpool lock until the transaction addition is completed.
for acct := range p.index {
p.reserve(acct, false)
p.reserver.Release(acct)
}
p.lookup = newLookup()
p.index = make(map[common.Address][]*blobTxMeta)
Expand Down
60 changes: 18 additions & 42 deletions core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"math/big"
"os"
"path/filepath"
"sync"
"testing"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -168,33 +167,6 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
return bc.statedb, nil
}

// makeAddressReserver is a utility method to sanity check that accounts are
// properly reserved by the blobpool (no duplicate reserves or unreserves).
func makeAddressReserver() txpool.AddressReserver {
var (
reserved = make(map[common.Address]struct{})
lock sync.Mutex
)
return func(addr common.Address, reserve bool) error {
lock.Lock()
defer lock.Unlock()

_, exists := reserved[addr]
if reserve {
if exists {
panic("already reserved")
}
reserved[addr] = struct{}{}
return nil
}
if !exists {
panic("not reserved")
}
delete(reserved, addr)
return nil
}
}

// makeTx is a utility method to construct a random blob transaction and sign it
// with a valid key, only setting the interesting fields from the perspective of
// the blob pool.
Expand Down Expand Up @@ -433,6 +405,10 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
}
}

func newReserver() *txpool.Reserver {
return txpool.NewReservationTracker().NewHandle(42)
}

// Tests that transactions can be loaded from disk on startup and that they are
// correctly discarded if invalid.
//
Expand Down Expand Up @@ -699,8 +675,8 @@ func TestOpenDrops(t *testing.T) {
blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice),
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
defer pool.Close()
Expand Down Expand Up @@ -817,8 +793,8 @@ func TestOpenIndex(t *testing.T) {
blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice),
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
defer pool.Close()
Expand Down Expand Up @@ -918,8 +894,8 @@ func TestOpenHeap(t *testing.T) {
blobfee: uint256.NewInt(105),
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
defer pool.Close()
Expand Down Expand Up @@ -997,8 +973,8 @@ func TestOpenCap(t *testing.T) {
blobfee: uint256.NewInt(105),
statedb: statedb,
}
pool := New(Config{Datadir: storage, Datacap: datacap}, chain)
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
pool := New(Config{Datadir: storage, Datacap: datacap}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
// Verify that enough transactions have been dropped to get the pool's size
Expand Down Expand Up @@ -1098,8 +1074,8 @@ func TestChangingSlotterSize(t *testing.T) {
blobfee: uint256.NewInt(105),
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}

Expand Down Expand Up @@ -1541,8 +1517,8 @@ func TestAdd(t *testing.T) {
blobfee: uint256.NewInt(105),
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("test %d: failed to create blob pool: %v", i, err)
}
verifyPoolInternals(t, pool)
Expand Down Expand Up @@ -1638,10 +1614,10 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
blobfee: uint256.NewInt(blobfee),
statedb: statedb,
}
pool = New(Config{Datadir: ""}, chain)
pool = New(Config{Datadir: ""}, chain, nil)
)

if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
b.Fatalf("failed to create blob pool: %v", err)
}
// Make the pool not use disk (just drop everything). This test never reads
Expand Down
4 changes: 4 additions & 0 deletions core/txpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,8 @@ var (
// input transaction of non-blob type when a blob transaction from this sender
// remains pending (and vice-versa).
ErrAlreadyReserved = errors.New("address already reserved")

// ErrInflightTxLimitReached is returned when the maximum number of in-flight
// transactions is reached for specific accounts.
ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts")
)
Loading