@@ -36,6 +36,7 @@ import (
3636 "github.com/ethereum/go-ethereum/core/state"
3737 "github.com/ethereum/go-ethereum/core/txpool"
3838 "github.com/ethereum/go-ethereum/core/types"
39+ "github.com/ethereum/go-ethereum/crypto/kzg4844"
3940 "github.com/ethereum/go-ethereum/event"
4041 "github.com/ethereum/go-ethereum/log"
4142 "github.com/ethereum/go-ethereum/metrics"
@@ -88,9 +89,11 @@ const (
8889// bare minimum needed fields to keep the size down (and thus number of entries
8990// larger with the same memory consumption).
9091type blobTxMeta struct {
91- hash common.Hash // Transaction hash to maintain the lookup table
92- id uint64 // Storage ID in the pool's persistent store
93- size uint32 // Byte size in the pool's persistent store
92+ hash common.Hash // Transaction hash to maintain the lookup table
93+ vhashes []common.Hash // Blob versioned hashes to maintain the lookup table
94+
95+ id uint64 // Storage ID in the pool's persistent store
96+ size uint32 // Byte size in the pool's persistent store
9497
9598 nonce uint64 // Needed to prioritize inclusion order within an account
9699 costCap * uint256.Int // Needed to validate cumulative balance sufficiency
@@ -113,6 +116,7 @@ type blobTxMeta struct {
113116func newBlobTxMeta (id uint64 , size uint32 , tx * types.Transaction ) * blobTxMeta {
114117 meta := & blobTxMeta {
115118 hash : tx .Hash (),
119+ vhashes : tx .BlobHashes (),
116120 id : id ,
117121 size : size ,
118122 nonce : tx .Nonce (),
@@ -306,7 +310,7 @@ type BlobPool struct {
306310 state * state.StateDB // Current state at the head of the chain
307311 gasTip * uint256.Int // Currently accepted minimum gas tip
308312
309- lookup map [common. Hash ] uint64 // Lookup table mapping hashes to tx billy entries
313+ lookup * lookup // Lookup table mapping blobs to txs and txs to billy entries
310314 index map [common.Address ][]* blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
311315 spent map [common.Address ]* uint256.Int // Expenditure tracking for individual accounts
312316 evict * evictHeap // Heap of cheapest accounts for eviction when full
@@ -328,7 +332,7 @@ func New(config Config, chain BlockChain) *BlobPool {
328332 config : config ,
329333 signer : types .LatestSigner (chain .Config ()),
330334 chain : chain ,
331- lookup : make ( map [common. Hash ] uint64 ),
335+ lookup : newLookup ( ),
332336 index : make (map [common.Address ][]* blobTxMeta ),
333337 spent : make (map [common.Address ]* uint256.Int ),
334338 }
@@ -471,7 +475,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
471475 }
472476
473477 meta := newBlobTxMeta (id , size , tx )
474- if _ , exists := p .lookup [ meta .hash ]; exists {
478+ if p .lookup . exists ( meta .hash ) {
475479 // This path is only possible after a crash, where deleted items are not
476480 // removed via the normal shutdown-startup procedure and thus may get
477481 // partially resurrected.
@@ -496,9 +500,8 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
496500 p .index [sender ] = append (p .index [sender ], meta )
497501 p .spent [sender ] = new (uint256.Int ).Add (p .spent [sender ], meta .costCap )
498502
499- p .lookup [ meta . hash ] = meta . id
503+ p .lookup . track ( meta )
500504 p .stored += uint64 (meta .size )
501-
502505 return nil
503506}
504507
@@ -531,7 +534,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
531534 nonces = append (nonces , txs [i ].nonce )
532535
533536 p .stored -= uint64 (txs [i ].size )
534- delete ( p .lookup , txs [i ]. hash )
537+ p .lookup . untrack ( txs [i ])
535538
536539 // Included transactions blobs need to be moved to the limbo
537540 if filled && inclusions != nil {
@@ -572,7 +575,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
572575
573576 p .spent [addr ] = new (uint256.Int ).Sub (p .spent [addr ], txs [0 ].costCap )
574577 p .stored -= uint64 (txs [0 ].size )
575- delete ( p .lookup , txs [0 ]. hash )
578+ p .lookup . untrack ( txs [0 ])
576579
577580 // Included transactions blobs need to be moved to the limbo
578581 if inclusions != nil {
@@ -621,14 +624,14 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
621624 // crash would result in previously deleted entities being resurrected.
622625 // That could potentially cause a duplicate nonce to appear.
623626 if txs [i ].nonce == txs [i - 1 ].nonce {
624- id := p .lookup [ txs [i ].hash ]
627+ id , _ := p .lookup . storeidOfTx ( txs [i ].hash )
625628
626629 log .Error ("Dropping repeat nonce blob transaction" , "from" , addr , "nonce" , txs [i ].nonce , "id" , id )
627630 dropRepeatedMeter .Mark (1 )
628631
629632 p .spent [addr ] = new (uint256.Int ).Sub (p .spent [addr ], txs [i ].costCap )
630633 p .stored -= uint64 (txs [i ].size )
631- delete ( p .lookup , txs [i ]. hash )
634+ p .lookup . untrack ( txs [i ])
632635
633636 if err := p .store .Delete (id ); err != nil {
634637 log .Error ("Failed to delete blob transaction" , "from" , addr , "id" , id , "err" , err )
@@ -650,7 +653,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
650653
651654 p .spent [addr ] = new (uint256.Int ).Sub (p .spent [addr ], txs [j ].costCap )
652655 p .stored -= uint64 (txs [j ].size )
653- delete ( p .lookup , txs [j ]. hash )
656+ p .lookup . untrack ( txs [j ])
654657 }
655658 txs = txs [:i ]
656659
@@ -688,7 +691,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
688691
689692 p .spent [addr ] = new (uint256.Int ).Sub (p .spent [addr ], last .costCap )
690693 p .stored -= uint64 (last .size )
691- delete ( p .lookup , last . hash )
694+ p .lookup . untrack ( last )
692695 }
693696 if len (txs ) == 0 {
694697 delete (p .index , addr )
@@ -728,7 +731,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
728731
729732 p .spent [addr ] = new (uint256.Int ).Sub (p .spent [addr ], last .costCap )
730733 p .stored -= uint64 (last .size )
731- delete ( p .lookup , last . hash )
734+ p .lookup . untrack ( last )
732735 }
733736 p .index [addr ] = txs
734737
@@ -1006,7 +1009,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
10061009 p .index [addr ] = append (p .index [addr ], meta )
10071010 p .spent [addr ] = new (uint256.Int ).Add (p .spent [addr ], meta .costCap )
10081011 }
1009- p .lookup [ meta . hash ] = meta . id
1012+ p .lookup . track ( meta )
10101013 p .stored += uint64 (meta .size )
10111014 return nil
10121015}
@@ -1033,7 +1036,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
10331036 )
10341037 p .spent [addr ] = new (uint256.Int ).Sub (p .spent [addr ], txs [i ].costCap )
10351038 p .stored -= uint64 (tx .size )
1036- delete ( p .lookup , tx . hash )
1039+ p .lookup . untrack ( tx )
10371040 txs [i ] = nil
10381041
10391042 // Drop everything afterwards, no gaps allowed
@@ -1043,7 +1046,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
10431046
10441047 p .spent [addr ] = new (uint256.Int ).Sub (p .spent [addr ], tx .costCap )
10451048 p .stored -= uint64 (tx .size )
1046- delete ( p .lookup , tx . hash )
1049+ p .lookup . untrack ( tx )
10471050 txs [i + 1 + j ] = nil
10481051 }
10491052 // Clear out the dropped transactions from the index
@@ -1171,8 +1174,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
11711174 p .lock .RLock ()
11721175 defer p .lock .RUnlock ()
11731176
1174- _ , ok := p .lookup [hash ]
1175- return ok
1177+ return p .lookup .exists (hash )
11761178}
11771179
11781180// Get returns a transaction if it is contained in the pool, or nil otherwise.
@@ -1189,7 +1191,7 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
11891191 }(time .Now ())
11901192
11911193 // Pull the blob from disk and return an assembled response
1192- id , ok := p .lookup [ hash ]
1194+ id , ok := p .lookup . storeidOfTx ( hash )
11931195 if ! ok {
11941196 return nil
11951197 }
@@ -1206,6 +1208,58 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
12061208 return item
12071209}
12081210
1211+ // GetBlobs returns a number of blobs are proofs for the given versioned hashes.
1212+ // This is a utility method for the engine API, enabling consensus clients to
1213+ // retrieve blobs from the pools directly instead of the network.
1214+ func (p * BlobPool ) GetBlobs (vhashes []common.Hash ) ([]* kzg4844.Blob , []* kzg4844.Proof ) {
1215+ // Create a map of the blob hash to indices for faster fills
1216+ var (
1217+ blobs = make ([]* kzg4844.Blob , len (vhashes ))
1218+ proofs = make ([]* kzg4844.Proof , len (vhashes ))
1219+ )
1220+ index := make (map [common.Hash ]int )
1221+ for i , vhash := range vhashes {
1222+ index [vhash ] = i
1223+ }
1224+ // Iterate over the blob hashes, pulling transactions that fill it. Take care
1225+ // to also fill anything else the transaction might include (probably will).
1226+ for i , vhash := range vhashes {
1227+ // If already filled by a previous fetch, skip
1228+ if blobs [i ] != nil {
1229+ continue
1230+ }
1231+ // Unfilled, retrieve the datastore item (in a short lock)
1232+ p .lock .RLock ()
1233+ id , exists := p .lookup .storeidOfBlob (vhash )
1234+ if ! exists {
1235+ p .lock .RUnlock ()
1236+ continue
1237+ }
1238+ data , err := p .store .Get (id )
1239+ p .lock .RUnlock ()
1240+
1241+ // After releasing the lock, try to fill any blobs requested
1242+ if err != nil {
1243+ log .Error ("Tracked blob transaction missing from store" , "id" , id , "err" , err )
1244+ continue
1245+ }
1246+ item := new (types.Transaction )
1247+ if err = rlp .DecodeBytes (data , item ); err != nil {
1248+ log .Error ("Blobs corrupted for traced transaction" , "id" , id , "err" , err )
1249+ continue
1250+ }
1251+ // Fill anything requested, not just the current versioned hash
1252+ sidecar := item .BlobTxSidecar ()
1253+ for j , blobhash := range item .BlobHashes () {
1254+ if idx , ok := index [blobhash ]; ok {
1255+ blobs [idx ] = & sidecar .Blobs [j ]
1256+ proofs [idx ] = & sidecar .Proofs [j ]
1257+ }
1258+ }
1259+ }
1260+ return blobs , proofs
1261+ }
1262+
12091263// Add inserts a set of blob transactions into the pool if they pass validation (both
12101264// consensus validity and pool restrictions).
12111265func (p * BlobPool ) Add (txs []* types.Transaction , local bool , sync bool ) []error {
@@ -1319,8 +1373,8 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
13191373 p .spent [from ] = new (uint256.Int ).Sub (p .spent [from ], prev .costCap )
13201374 p .spent [from ] = new (uint256.Int ).Add (p .spent [from ], meta .costCap )
13211375
1322- delete ( p .lookup , prev . hash )
1323- p .lookup [ meta . hash ] = meta . id
1376+ p .lookup . untrack ( prev )
1377+ p .lookup . track ( meta )
13241378 p .stored += uint64 (meta .size ) - uint64 (prev .size )
13251379 } else {
13261380 // Transaction extends previously scheduled ones
@@ -1330,7 +1384,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
13301384 newacc = true
13311385 }
13321386 p .spent [from ] = new (uint256.Int ).Add (p .spent [from ], meta .costCap )
1333- p .lookup [ meta . hash ] = meta . id
1387+ p .lookup . track ( meta )
13341388 p .stored += uint64 (meta .size )
13351389 }
13361390 // Recompute the rolling eviction fields. In case of a replacement, this will
@@ -1419,7 +1473,7 @@ func (p *BlobPool) drop() {
14191473 p .spent [from ] = new (uint256.Int ).Sub (p .spent [from ], drop .costCap )
14201474 }
14211475 p .stored -= uint64 (drop .size )
1422- delete ( p .lookup , drop . hash )
1476+ p .lookup . untrack ( drop )
14231477
14241478 // Remove the transaction from the pool's eviction heap:
14251479 // - If the entire account was dropped, pop off the address
0 commit comments