Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data/transactions/verify/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func txnGroup(stxs []transactions.SignedTxn, contextHdr *bookkeeping.BlockHeader
}

if cache != nil {
cache.Add(stxs, groupCtx)
cache.Add(groupCtx)
}

return
Expand Down Expand Up @@ -531,7 +531,7 @@ func PaysetGroups(ctx context.Context, payset [][]transactions.SignedTxn, blkHea
if verifyErr != nil {
return verifyErr
}
cache.AddPayset(txnGroups, groupCtxs)
cache.AddPayset(groupCtxs)
return nil
}, nextWorkset, worksDoneCh)
if err1 != nil {
Expand Down
6 changes: 2 additions & 4 deletions data/transactions/verify/txnBatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,10 @@ func (tbp *txnSigBatchProcessor) postProcessVerifiedJobs(ctx interface{}, failed
for i := range bl.txnGroups {
tbp.sendResult(bl.txnGroups[i], bl.backlogMessage[i], nil)
}
tbp.cache.AddPayset(bl.txnGroups, bl.groupCtxs)
tbp.cache.AddPayset(bl.groupCtxs)
return
}

verifiedTxnGroups := make([][]transactions.SignedTxn, 0, len(bl.txnGroups))
verifiedGroupCtxs := make([]*GroupContext, 0, len(bl.groupCtxs))
failedSigIdx := 0
for txgIdx := range bl.txnGroups {
Expand All @@ -280,13 +279,12 @@ func (tbp *txnSigBatchProcessor) postProcessVerifiedJobs(ctx interface{}, failed
}
var result error
if !txGroupSigFailed {
verifiedTxnGroups = append(verifiedTxnGroups, bl.txnGroups[txgIdx])
verifiedGroupCtxs = append(verifiedGroupCtxs, bl.groupCtxs[txgIdx])
} else {
result = err
}
tbp.sendResult(bl.txnGroups[txgIdx], bl.backlogMessage[txgIdx], result)
}
// loading them all at once by locking the cache once
tbp.cache.AddPayset(verifiedTxnGroups, verifiedGroupCtxs)
tbp.cache.AddPayset(verifiedGroupCtxs)
}
24 changes: 12 additions & 12 deletions data/transactions/verify/verifiedTxnCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ var errMissingPinnedEntry = &VerifiedTxnCacheError{errors.New("Missing pinned en
type VerifiedTransactionCache interface {
// Add adds a given transaction group and its associated group context to the cache. If any of the transactions already appear
// in the cache, the new entry overrides the old one.
Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext)
Add(groupCtx *GroupContext)
// AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts.
AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext)
AddPayset(groupCtxs []*GroupContext)
// GetUnverifiedTransactionGroups compares the provided payset against the currently cached transactions and figure which transaction groups aren't fully cached.
GetUnverifiedTransactionGroups(payset [][]transactions.SignedTxn, CurrSpecAddrs transactions.SpecialAddresses, CurrProto protocol.ConsensusVersion) [][]transactions.SignedTxn
// UpdatePinned replaces the pinned entries with the one provided in the pinnedTxns map. This is typically expected to be a subset of the
Expand Down Expand Up @@ -98,18 +98,18 @@ func MakeVerifiedTransactionCache(cacheSize int) VerifiedTransactionCache {

// Add adds a given transaction group and it's associated group context to the cache. If any of the transactions already appear
// in the cache, the new entry overrides the old one.
func (v *verifiedTransactionCache) Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext) {
func (v *verifiedTransactionCache) Add(groupCtx *GroupContext) {
v.bucketsLock.Lock()
defer v.bucketsLock.Unlock()
v.add(txgroup, groupCtx)
v.add(groupCtx)
}

// AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts.
func (v *verifiedTransactionCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) {
func (v *verifiedTransactionCache) AddPayset(groupCtxs []*GroupContext) {
v.bucketsLock.Lock()
defer v.bucketsLock.Unlock()
for i := range txgroup {
v.add(txgroup[i], groupCtxs[i])
for _, groupCtx := range groupCtxs {
v.add(groupCtx)
}
}

Expand Down Expand Up @@ -242,14 +242,14 @@ func (v *verifiedTransactionCache) Pin(txgroup []transactions.SignedTxn) (err er
}

// add is the internal implementation of Add/AddPayset which adds a transaction group to the buffer.
func (v *verifiedTransactionCache) add(txgroup []transactions.SignedTxn, groupCtx *GroupContext) {
if len(v.buckets[v.base])+len(txgroup) > v.entriesPerBucket {
func (v *verifiedTransactionCache) add(groupCtx *GroupContext) {
if len(v.buckets[v.base])+len(groupCtx.signedGroupTxns) > v.entriesPerBucket {
// move to the next bucket while deleting the content of the next bucket.
v.base = (v.base + 1) % len(v.buckets)
v.buckets[v.base] = make(map[transactions.Txid]*GroupContext, v.entriesPerBucket)
}
currentBucket := v.buckets[v.base]
for _, txn := range txgroup {
for _, txn := range groupCtx.signedGroupTxns {
currentBucket[txn.ID()] = groupCtx
}
}
Expand All @@ -261,10 +261,10 @@ type mockedCache struct {
alwaysVerified bool
}

func (v *mockedCache) Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext) {
func (v *mockedCache) Add(groupCtx *GroupContext) {
}

func (v *mockedCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) {
func (v *mockedCache) AddPayset(groupCtxs []*GroupContext) {
}

func (v *mockedCache) GetUnverifiedTransactionGroups(txnGroups [][]transactions.SignedTxn, currSpecAddrs transactions.SpecialAddresses, currProto protocol.ConsensusVersion) (unverifiedGroups [][]transactions.SignedTxn) {
Expand Down
22 changes: 13 additions & 9 deletions data/transactions/verify/verifiedTxnCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestAddingToCache(t *testing.T) {
txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs)
groupCtx, err := PrepareGroupContext(txnGroups[0], blockHeader, nil, nil)
require.NoError(t, err)
impl.Add(txnGroups[0], groupCtx)
impl.Add(groupCtx)
// make it was added.
for _, txn := range txnGroups[0] {
ctx, has := impl.buckets[impl.base][txn.ID()]
Expand All @@ -55,12 +55,13 @@ func TestBucketCycling(t *testing.T) {
_, signedTxn, _, _ := generateTestObjects(entriesPerBucket*bucketCount*2, bucketCount, 0, 0)

require.Equal(t, entriesPerBucket*bucketCount*2, len(signedTxn))
groupCtx, err := PrepareGroupContext([]transactions.SignedTxn{signedTxn[0]}, blockHeader, nil, nil)
require.NoError(t, err)

// fill up the cache with entries.
for i := 0; i < entriesPerBucket*bucketCount; i++ {
impl.Add([]transactions.SignedTxn{signedTxn[i]}, groupCtx)
txnGroup := []transactions.SignedTxn{signedTxn[i]}
groupCtx, err := PrepareGroupContext(txnGroup, blockHeader, nil, nil)
require.NoError(t, err)
impl.Add(groupCtx)
// test to see that the base is sliding when bucket get filled up.
require.Equal(t, i/entriesPerBucket, impl.base)
}
Expand All @@ -71,7 +72,10 @@ func TestBucketCycling(t *testing.T) {

// -- all buckets are full at this point --
// add one additional item which would flush the bottom bucket.
impl.Add([]transactions.SignedTxn{signedTxn[len(signedTxn)-1]}, groupCtx)
txnGroup := []transactions.SignedTxn{signedTxn[len(signedTxn)-1]}
groupCtx, err := PrepareGroupContext(txnGroup, blockHeader, nil, nil)
require.NoError(t, err)
impl.Add(groupCtx)
require.Equal(t, 0, impl.base)
require.Equal(t, 1, len(impl.buckets[0]))
}
Expand All @@ -93,7 +97,7 @@ func TestGetUnverifiedTransactionGroups50(t *testing.T) {
expectedUnverifiedGroups = append(expectedUnverifiedGroups, txnGroups[i])
} else {
groupCtx, _ := PrepareGroupContext(txnGroups[i], blockHeader, nil, nil)
impl.Add(txnGroups[i], groupCtx)
impl.Add(groupCtx)
}
}

Expand All @@ -117,7 +121,7 @@ func BenchmarkGetUnverifiedTransactionGroups50(b *testing.B) {
queryTxnGroups = append(queryTxnGroups, txnGroups[i])
} else {
groupCtx, _ := PrepareGroupContext(txnGroups[i], blockHeader, nil, nil)
impl.Add(txnGroups[i], groupCtx)
impl.Add(groupCtx)
}
}

Expand Down Expand Up @@ -146,7 +150,7 @@ func TestUpdatePinned(t *testing.T) {
// insert some entries.
for i := 0; i < len(txnGroups); i++ {
groupCtx, _ := PrepareGroupContext(txnGroups[i], blockHeader, nil, nil)
impl.Add(txnGroups[i], groupCtx)
impl.Add(groupCtx)
}

// pin the first half.
Expand Down Expand Up @@ -175,7 +179,7 @@ func TestPinningTransactions(t *testing.T) {
// insert half of the entries.
for i := 0; i < len(txnGroups)/2; i++ {
groupCtx, _ := PrepareGroupContext(txnGroups[i], blockHeader, nil, nil)
impl.Add(txnGroups[i], groupCtx)
impl.Add(groupCtx)
}

// try to pin a previously added entry.
Expand Down
4 changes: 2 additions & 2 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func BenchmarkTxHandlerProcessing(b *testing.B) {
// vtCache is a noop VerifiedTransactionCache
type vtCache struct{}

func (vtCache) Add(txgroup []transactions.SignedTxn, groupCtx *verify.GroupContext) {}
func (vtCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*verify.GroupContext) {
func (vtCache) Add(groupCtx *verify.GroupContext) {}
func (vtCache) AddPayset(groupCtxs []*verify.GroupContext) {
return
}
func (vtCache) GetUnverifiedTransactionGroups(payset [][]transactions.SignedTxn, CurrSpecAddrs transactions.SpecialAddresses, CurrProto protocol.ConsensusVersion) [][]transactions.SignedTxn {
Expand Down
Loading