Commit afea3bd

beacon/engine, core/txpool, eth/catalyst: add engine_getBlobsV1 API (#30537)
1 parent e26468f commit afea3bd

8 files changed: +336, -41 lines


beacon/engine/types.go

Lines changed: 5 additions & 0 deletions
@@ -118,6 +118,11 @@ type BlobsBundleV1 struct {
     Blobs       []hexutil.Bytes `json:"blobs"`
 }
 
+type BlobAndProofV1 struct {
+    Blob  hexutil.Bytes `json:"blob"`
+    Proof hexutil.Bytes `json:"proof"`
+}
+
 // JSON type overrides for ExecutionPayloadEnvelope.
 type executionPayloadEnvelopeMarshaling struct {
     BlockValue *hexutil.Big
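Each hexutil.Bytes field marshals as a 0x-prefixed hex string, so a single entry of an engine_getBlobsV1 response is a small JSON object with "blob" and "proof" keys. A minimal illustrative sketch, using toy values (a real blob is 131072 bytes and a KZG proof 48 bytes):

package main

import (
    "encoding/json"
    "fmt"

    "github.com/ethereum/go-ethereum/beacon/engine"
    "github.com/ethereum/go-ethereum/common/hexutil"
)

func main() {
    // Toy payload only; real blobs/proofs are full-length byte arrays.
    entry := &engine.BlobAndProofV1{
        Blob:  hexutil.Bytes{0x01, 0x02},
        Proof: hexutil.Bytes{0x03, 0x04},
    }
    out, _ := json.Marshal(entry)
    fmt.Println(string(out)) // {"blob":"0x0102","proof":"0x0304"}
}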

core/txpool/blobpool/blobpool.go

Lines changed: 79 additions & 25 deletions
@@ -36,6 +36,7 @@ import (
     "github.com/ethereum/go-ethereum/core/state"
     "github.com/ethereum/go-ethereum/core/txpool"
     "github.com/ethereum/go-ethereum/core/types"
+    "github.com/ethereum/go-ethereum/crypto/kzg4844"
     "github.com/ethereum/go-ethereum/event"
     "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/metrics"
@@ -88,9 +89,11 @@ const (
 // bare minimum needed fields to keep the size down (and thus number of entries
 // larger with the same memory consumption).
 type blobTxMeta struct {
-    hash common.Hash // Transaction hash to maintain the lookup table
-    id   uint64      // Storage ID in the pool's persistent store
-    size uint32      // Byte size in the pool's persistent store
+    hash    common.Hash   // Transaction hash to maintain the lookup table
+    vhashes []common.Hash // Blob versioned hashes to maintain the lookup table
+
+    id   uint64 // Storage ID in the pool's persistent store
+    size uint32 // Byte size in the pool's persistent store
 
     nonce   uint64       // Needed to prioritize inclusion order within an account
     costCap *uint256.Int // Needed to validate cumulative balance sufficiency
@@ -113,6 +116,7 @@ type blobTxMeta struct {
 func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
     meta := &blobTxMeta{
         hash:    tx.Hash(),
+        vhashes: tx.BlobHashes(),
         id:      id,
         size:    size,
         nonce:   tx.Nonce(),
@@ -306,7 +310,7 @@ type BlobPool struct {
     state  *state.StateDB // Current state at the head of the chain
     gasTip *uint256.Int   // Currently accepted minimum gas tip
 
-    lookup map[common.Hash]uint64           // Lookup table mapping hashes to tx billy entries
+    lookup *lookup                          // Lookup table mapping blobs to txs and txs to billy entries
     index  map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
     spent  map[common.Address]*uint256.Int  // Expenditure tracking for individual accounts
     evict  *evictHeap                       // Heap of cheapest accounts for eviction when full
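The new *lookup type is defined in core/txpool/blobpool/lookup.go, one of the eight changed files but not included in this excerpt. Going only by the call sites visible in this diff (newLookup, exists, track, untrack, storeidOfTx, storeidOfBlob), a minimal sketch of such a two-way index could look like the following; the field names and internals are assumptions rather than the committed code, and the sketch assumes it sits in the blobpool package so common.Hash and the blobTxMeta struct from the hunk above resolve:

package blobpool

// Sketch only: a two-way index from tx hash -> billy store id and from blob
// versioned hash -> the store ids of transactions carrying that blob.

import "github.com/ethereum/go-ethereum/common"

type lookup struct {
    blobIndex map[common.Hash]map[uint64]struct{} // vhash -> store ids of txs carrying it (assumed layout)
    txIndex   map[common.Hash]uint64              // tx hash -> store id (assumed layout)
}

func newLookup() *lookup {
    return &lookup{
        blobIndex: make(map[common.Hash]map[uint64]struct{}),
        txIndex:   make(map[common.Hash]uint64),
    }
}

// exists reports whether a transaction is already tracked.
func (l *lookup) exists(txhash common.Hash) bool {
    _, ok := l.txIndex[txhash]
    return ok
}

// storeidOfTx returns the datastore id of a tracked transaction.
func (l *lookup) storeidOfTx(txhash common.Hash) (uint64, bool) {
    id, ok := l.txIndex[txhash]
    return id, ok
}

// storeidOfBlob returns the datastore id of any transaction carrying the blob.
func (l *lookup) storeidOfBlob(vhash common.Hash) (uint64, bool) {
    for id := range l.blobIndex[vhash] {
        return id, true
    }
    return 0, false
}

// track indexes a transaction by its hash and by each blob versioned hash.
func (l *lookup) track(tx *blobTxMeta) {
    for _, vhash := range tx.vhashes {
        if l.blobIndex[vhash] == nil {
            l.blobIndex[vhash] = make(map[uint64]struct{})
        }
        l.blobIndex[vhash][tx.id] = struct{}{}
    }
    l.txIndex[tx.hash] = tx.id
}

// untrack removes a transaction from both indices, pruning empty blob sets.
func (l *lookup) untrack(tx *blobTxMeta) {
    delete(l.txIndex, tx.hash)
    for _, vhash := range tx.vhashes {
        if ids := l.blobIndex[vhash]; ids != nil {
            delete(ids, tx.id)
            if len(ids) == 0 {
                delete(l.blobIndex, vhash)
            }
        }
    }
}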
@@ -328,7 +332,7 @@ func New(config Config, chain BlockChain) *BlobPool {
         config: config,
         signer: types.LatestSigner(chain.Config()),
         chain:  chain,
-        lookup: make(map[common.Hash]uint64),
+        lookup: newLookup(),
         index:  make(map[common.Address][]*blobTxMeta),
         spent:  make(map[common.Address]*uint256.Int),
     }
@@ -471,7 +475,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
     }
 
     meta := newBlobTxMeta(id, size, tx)
-    if _, exists := p.lookup[meta.hash]; exists {
+    if p.lookup.exists(meta.hash) {
         // This path is only possible after a crash, where deleted items are not
         // removed via the normal shutdown-startup procedure and thus may get
         // partially resurrected.
@@ -496,9 +500,8 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
     p.index[sender] = append(p.index[sender], meta)
     p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap)
 
-    p.lookup[meta.hash] = meta.id
+    p.lookup.track(meta)
     p.stored += uint64(meta.size)
-
     return nil
 }
 
@@ -531,7 +534,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
             nonces = append(nonces, txs[i].nonce)
 
             p.stored -= uint64(txs[i].size)
-            delete(p.lookup, txs[i].hash)
+            p.lookup.untrack(txs[i])
 
             // Included transactions blobs need to be moved to the limbo
             if filled && inclusions != nil {
@@ -572,7 +575,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
 
         p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[0].costCap)
         p.stored -= uint64(txs[0].size)
-        delete(p.lookup, txs[0].hash)
+        p.lookup.untrack(txs[0])
 
         // Included transactions blobs need to be moved to the limbo
         if inclusions != nil {
@@ -621,14 +624,14 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
             // crash would result in previously deleted entities being resurrected.
             // That could potentially cause a duplicate nonce to appear.
             if txs[i].nonce == txs[i-1].nonce {
-                id := p.lookup[txs[i].hash]
+                id, _ := p.lookup.storeidOfTx(txs[i].hash)
 
                 log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id)
                 dropRepeatedMeter.Mark(1)
 
                 p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
                 p.stored -= uint64(txs[i].size)
-                delete(p.lookup, txs[i].hash)
+                p.lookup.untrack(txs[i])
 
                 if err := p.store.Delete(id); err != nil {
                     log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -650,7 +653,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
 
             p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[j].costCap)
             p.stored -= uint64(txs[j].size)
-            delete(p.lookup, txs[j].hash)
+            p.lookup.untrack(txs[j])
         }
         txs = txs[:i]
 
@@ -688,7 +691,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
 
             p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
             p.stored -= uint64(last.size)
-            delete(p.lookup, last.hash)
+            p.lookup.untrack(last)
         }
         if len(txs) == 0 {
             delete(p.index, addr)
@@ -728,7 +731,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
 
             p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
             p.stored -= uint64(last.size)
-            delete(p.lookup, last.hash)
+            p.lookup.untrack(last)
         }
         p.index[addr] = txs
 
@@ -1006,7 +1009,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
         p.index[addr] = append(p.index[addr], meta)
         p.spent[addr] = new(uint256.Int).Add(p.spent[addr], meta.costCap)
     }
-    p.lookup[meta.hash] = meta.id
+    p.lookup.track(meta)
     p.stored += uint64(meta.size)
     return nil
 }
@@ -1033,7 +1036,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
                 )
                 p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
                 p.stored -= uint64(tx.size)
-                delete(p.lookup, tx.hash)
+                p.lookup.untrack(tx)
                 txs[i] = nil
 
                 // Drop everything afterwards, no gaps allowed
@@ -1043,7 +1046,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
 
                     p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap)
                     p.stored -= uint64(tx.size)
-                    delete(p.lookup, tx.hash)
+                    p.lookup.untrack(tx)
                     txs[i+1+j] = nil
                 }
                 // Clear out the dropped transactions from the index
@@ -1171,8 +1174,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
     p.lock.RLock()
     defer p.lock.RUnlock()
 
-    _, ok := p.lookup[hash]
-    return ok
+    return p.lookup.exists(hash)
 }
 
 // Get returns a transaction if it is contained in the pool, or nil otherwise.
@@ -1189,7 +1191,7 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
     }(time.Now())
 
     // Pull the blob from disk and return an assembled response
-    id, ok := p.lookup[hash]
+    id, ok := p.lookup.storeidOfTx(hash)
     if !ok {
         return nil
     }
@@ -1206,6 +1208,58 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
     return item
 }
 
+// GetBlobs returns a number of blobs and proofs for the given versioned hashes.
+// This is a utility method for the engine API, enabling consensus clients to
+// retrieve blobs from the pools directly instead of the network.
+func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) {
+    // Create a map of the blob hash to indices for faster fills
+    var (
+        blobs  = make([]*kzg4844.Blob, len(vhashes))
+        proofs = make([]*kzg4844.Proof, len(vhashes))
+    )
+    index := make(map[common.Hash]int)
+    for i, vhash := range vhashes {
+        index[vhash] = i
+    }
+    // Iterate over the blob hashes, pulling transactions that fill it. Take care
+    // to also fill anything else the transaction might include (probably will).
+    for i, vhash := range vhashes {
+        // If already filled by a previous fetch, skip
+        if blobs[i] != nil {
+            continue
+        }
+        // Unfilled, retrieve the datastore item (in a short lock)
+        p.lock.RLock()
+        id, exists := p.lookup.storeidOfBlob(vhash)
+        if !exists {
+            p.lock.RUnlock()
+            continue
+        }
+        data, err := p.store.Get(id)
+        p.lock.RUnlock()
+
+        // After releasing the lock, try to fill any blobs requested
+        if err != nil {
+            log.Error("Tracked blob transaction missing from store", "id", id, "err", err)
+            continue
+        }
+        item := new(types.Transaction)
+        if err = rlp.DecodeBytes(data, item); err != nil {
+            log.Error("Blobs corrupted for traced transaction", "id", id, "err", err)
+            continue
+        }
+        // Fill anything requested, not just the current versioned hash
+        sidecar := item.BlobTxSidecar()
+        for j, blobhash := range item.BlobHashes() {
+            if idx, ok := index[blobhash]; ok {
+                blobs[idx] = &sidecar.Blobs[j]
+                proofs[idx] = &sidecar.Proofs[j]
+            }
+        }
+    }
+    return blobs, proofs
+}
+
 // Add inserts a set of blob transactions into the pool if they pass validation (both
 // consensus validity and pool restrictions).
 func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
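The eth/catalyst handler that exposes this method as engine_getBlobsV1 is part of the commit but not shown in this excerpt. A rough sketch of how the pool's aligned (blobs, proofs) slices could be mapped onto BlobAndProofV1 wire objects is given below; the blobSource interface and getBlobsV1 name are assumptions for illustration, not the committed handler:

package sketch

import (
    "github.com/ethereum/go-ethereum/beacon/engine"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto/kzg4844"
)

// blobSource is a stand-in for the transaction pool; the name is an
// assumption for this sketch, not a go-ethereum interface.
type blobSource interface {
    GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof)
}

// getBlobsV1 copies pool hits into engine API objects; misses stay nil,
// which serializes to null for blobs the node does not have.
func getBlobsV1(pool blobSource, hashes []common.Hash) []*engine.BlobAndProofV1 {
    res := make([]*engine.BlobAndProofV1, len(hashes))

    blobs, proofs := pool.GetBlobs(hashes)
    for i := range blobs {
        if blobs[i] != nil && proofs[i] != nil {
            res[i] = &engine.BlobAndProofV1{
                Blob:  blobs[i][:], // kzg4844.Blob/Proof are fixed-size byte arrays
                Proof: proofs[i][:],
            }
        }
    }
    return res
}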
@@ -1319,8 +1373,8 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
         p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap)
         p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap)
 
-        delete(p.lookup, prev.hash)
-        p.lookup[meta.hash] = meta.id
+        p.lookup.untrack(prev)
+        p.lookup.track(meta)
         p.stored += uint64(meta.size) - uint64(prev.size)
     } else {
         // Transaction extends previously scheduled ones
@@ -1330,7 +1384,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
             newacc = true
         }
         p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap)
-        p.lookup[meta.hash] = meta.id
+        p.lookup.track(meta)
         p.stored += uint64(meta.size)
     }
     // Recompute the rolling eviction fields. In case of a replacement, this will
@@ -1419,7 +1473,7 @@ func (p *BlobPool) drop() {
         p.spent[from] = new(uint256.Int).Sub(p.spent[from], drop.costCap)
     }
     p.stored -= uint64(drop.size)
-    delete(p.lookup, drop.hash)
+    p.lookup.untrack(drop)
 
     // Remove the transaction from the pool's eviction heap:
     // - If the entire account was dropped, pop off the address
