Skip to content

Commit 36a3d33

Browse files
authored
refactor: payment common functions (#1653)
* refactor: accountant internal functions * refactor: shared functions between meterer and accountant * refactor: formats, comments, simplfy function sig * refactor: feedbacks * refactor: common fn to get the overflow period * refactor: remove prints * fix: inclusive time bound
1 parent da8095c commit 36a3d33

File tree

6 files changed

+198
-201
lines changed

6 files changed

+198
-201
lines changed

api/clients/v2/accountant.go

Lines changed: 63 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package clients
22

33
import (
4+
"errors"
45
"fmt"
56
"math/big"
6-
"slices"
77
"sync"
88

99
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
@@ -27,8 +27,11 @@ type Accountant struct {
2727
// contains an array of period records, with length of max(MinNumBins, numBins)
2828
// numBins can be arbitrarily bigger than MinNumBins if the client wants to track more history in the cache
2929
periodRecords []PeriodRecord
30-
usageLock sync.Mutex
3130
cumulativePayment *big.Int
31+
32+
// locks for concurrent access to period records and on-demand payment
33+
periodRecordsLock sync.Mutex
34+
onDemandLock sync.Mutex
3235
}
3336

3437
// PeriodRecord contains the index of the reservation period and the usage of the period
@@ -58,75 +61,92 @@ func NewAccountant(accountID gethcommon.Address, reservation *core.ReservedPayme
5861
return &a
5962
}
6063

61-
// blobPaymentInfo calculates and records payment information. The accountant
62-
// will attempt to use the active reservation first and check for quorum settings,
63-
// then on-demand if the reservation is not available. It takes in a timestamp at
64-
// the current UNIX time in nanoseconds, and returns a cumulative payment for on-
65-
// demand payments in units of wei. Both timestamp and cumulative payment are used
66-
// to create the payment header and signature, with non-zero cumulative payment
67-
// indicating on-demand payment.
68-
// These generated values are used to create the payment header and signature, as specified in
69-
// api/proto/common/v2/common_v2.proto
70-
func (a *Accountant) blobPaymentInfo(
71-
numSymbols uint64,
64+
// reservationUsage attempts to use the reservation for the given request.
65+
func (a *Accountant) reservationUsage(
66+
symbolUsage uint64,
7267
quorumNumbers []uint8,
73-
timestamp int64) (*big.Int, error) {
68+
timestamp int64) error {
69+
if err := meterer.ValidateQuorum(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
70+
return err
71+
}
72+
if !a.reservation.IsActiveByNanosecond(timestamp) {
73+
return fmt.Errorf("reservation is not active at timestamp %d", timestamp)
74+
}
75+
7476
reservationWindow := a.reservationWindow
7577
currentReservationPeriod := meterer.GetReservationPeriodByNanosecond(timestamp, reservationWindow)
76-
symbolUsage := a.symbolsCharged(numSymbols)
7778

78-
a.usageLock.Lock()
79-
defer a.usageLock.Unlock()
79+
a.periodRecordsLock.Lock()
80+
defer a.periodRecordsLock.Unlock()
8081
relativePeriodRecord := a.getOrRefreshRelativePeriodRecord(currentReservationPeriod, reservationWindow)
8182
relativePeriodRecord.Usage += symbolUsage
8283

83-
// first attempt to use the active reservation
84-
binLimit := a.reservation.SymbolsPerSecond * uint64(a.reservationWindow)
84+
// Check if we can use the reservation within the bin limit
85+
binLimit := meterer.GetReservationBinLimit(a.reservation, a.reservationWindow)
8586
if relativePeriodRecord.Usage <= binLimit {
86-
if err := QuorumCheck(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
87-
return big.NewInt(0), err
88-
}
89-
return big.NewInt(0), nil
87+
return nil
9088
}
9189

92-
overflowPeriodRecord := a.getOrRefreshRelativePeriodRecord(currentReservationPeriod+2*reservationWindow, reservationWindow)
90+
overflowPeriodRecord := a.getOrRefreshRelativePeriodRecord(meterer.GetOverflowPeriod(currentReservationPeriod, reservationWindow), reservationWindow)
9391
// Allow one overflow when the overflow bin is empty, the current usage and new length are both less than the limit
9492
if overflowPeriodRecord.Usage == 0 && relativePeriodRecord.Usage-symbolUsage < binLimit && symbolUsage <= binLimit {
95-
if err := QuorumCheck(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
96-
return big.NewInt(0), err
97-
}
9893
overflowPeriodRecord.Usage += relativePeriodRecord.Usage - binLimit
99-
return big.NewInt(0), nil
94+
return nil
10095
}
10196

102-
// reservation not available, rollback reservation records, attempt on-demand
103-
//todo: rollback on-demand if disperser respond with some type of rejection?
97+
// Reservation not sufficient for the request, rollback the usage
10498
relativePeriodRecord.Usage -= symbolUsage
105-
incrementRequired := big.NewInt(int64(a.paymentCharged(numSymbols)))
99+
return errors.New("insufficient reservation")
100+
}
101+
102+
// onDemandUsage attempts to use on-demand payment for the given request.
103+
// Returns the cumulative payment if successful, or an error if on-demand cannot be used.
104+
func (a *Accountant) onDemandUsage(symbolUsage uint64, quorumNumbers []uint8) (*big.Int, error) {
105+
if err := meterer.ValidateQuorum(quorumNumbers, requiredQuorums); err != nil {
106+
return nil, err
107+
}
108+
109+
a.onDemandLock.Lock()
110+
defer a.onDemandLock.Unlock()
111+
112+
incrementRequired := meterer.PaymentCharged(symbolUsage, a.pricePerSymbol)
113+
resultingPayment := new(big.Int).Add(a.cumulativePayment, incrementRequired)
106114

107-
resultingPayment := big.NewInt(0)
108-
resultingPayment.Add(a.cumulativePayment, incrementRequired)
109115
if resultingPayment.Cmp(a.onDemand.CumulativePayment) <= 0 {
110-
if err := QuorumCheck(quorumNumbers, requiredQuorums); err != nil {
111-
return big.NewInt(0), err
112-
}
113116
a.cumulativePayment.Add(a.cumulativePayment, incrementRequired)
114117
return a.cumulativePayment, nil
115118
}
116-
return big.NewInt(0), fmt.Errorf(
117-
"invalid payments: no available bandwidth reservation found for account %s, and current cumulativePayment balance insufficient "+
118-
"to make an on-demand dispersal. Consider increasing reservation or cumulative payment on-chain. For more details, see https://docs.eigenda.xyz/core-concepts/payments#disperser-client-requirements", a.accountID.Hex())
119+
120+
return nil, errors.New("insufficient ondemand payment")
119121
}
120122

121-
// AccountBlob accountant provides and records payment information
123+
// AccountBlob accountant generates payment information for a request. The accountant
124+
// takes in a timestamp at the current UNIX time in nanoseconds, number of symbols of the request,
125+
// and the quorums to disperse the request to. It will attempt to use the active reservation first
126+
// and then on-demand if the reservation is not available or insufficient for the request.
127+
// It returns a payment metadata object that will be used to create the payment header and signature,
128+
// as specified in api/proto/common/v2/common_v2.proto
122129
func (a *Accountant) AccountBlob(
123130
timestamp int64,
124131
numSymbols uint64,
125132
quorums []uint8) (*core.PaymentMetadata, error) {
126133

127-
cumulativePayment, err := a.blobPaymentInfo(numSymbols, quorums, timestamp)
134+
symbolUsage := meterer.SymbolsCharged(numSymbols, a.minNumSymbols)
135+
136+
// Always try to use reservation first
137+
err := a.reservationUsage(symbolUsage, quorums, timestamp)
138+
if err == nil {
139+
return &core.PaymentMetadata{
140+
AccountID: a.accountID,
141+
Timestamp: timestamp,
142+
CumulativePayment: big.NewInt(0),
143+
}, nil
144+
}
145+
146+
// Fall back to on-demand payment if reservation fails
147+
cumulativePayment, err := a.onDemandUsage(symbolUsage, quorums)
128148
if err != nil {
129-
return nil, err
149+
return nil, fmt.Errorf("cannot create payment information for reservation or on-demand. Consider depositing more eth to the PaymentVault contract for your account. For more details, see https://docs.eigenda.xyz/core-concepts/payments#disperser-client-requirements. Account: %s, Error: %w", a.accountID.Hex(), err)
130150
}
131151

132152
pm := &core.PaymentMetadata{
@@ -138,22 +158,6 @@ func (a *Accountant) AccountBlob(
138158
return pm, nil
139159
}
140160

141-
// TODO: paymentCharged and symbolsCharged copied from meterer, should be refactored
142-
// paymentCharged returns the chargeable price for a given data length
143-
func (a *Accountant) paymentCharged(numSymbols uint64) uint64 {
144-
return a.symbolsCharged(numSymbols) * a.pricePerSymbol
145-
}
146-
147-
// symbolsCharged returns the number of symbols charged for a given data length
148-
// being at least MinNumSymbols or the nearest rounded-up multiple of MinNumSymbols.
149-
func (a *Accountant) symbolsCharged(numSymbols uint64) uint64 {
150-
if numSymbols <= a.minNumSymbols {
151-
return a.minNumSymbols
152-
}
153-
// Round up to the nearest multiple of MinNumSymbols
154-
return core.RoundUpDivide(numSymbols, a.minNumSymbols) * a.minNumSymbols
155-
}
156-
157161
// getOrRefreshRelativePeriodRecord returns the period record for the given index (which is in seconds and is the multiple of the reservation window),
158162
// wrapping around the circular buffer and clearing the record if the index is greater than the number of bins
159163
func (a *Accountant) getOrRefreshRelativePeriodRecord(index uint64, reservationWindow uint64) *PeriodRecord {
@@ -176,7 +180,7 @@ func (a *Accountant) getOrRefreshRelativePeriodRecord(index uint64, reservationW
176180
// account level on/off-chain state. If on-chain fields are not present, we use
177181
// dummy values that disable accountant from using the corresponding payment method.
178182
// If off-chain fields are not present, we assume the account has no payment history
179-
// and set accoutant state to use initial values.
183+
// and set accountant state to use initial values.
180184
func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply) error {
181185
if paymentState == nil {
182186
return fmt.Errorf("payment state cannot be nil")
@@ -244,16 +248,3 @@ func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentState
244248
a.periodRecords = periodRecords
245249
return nil
246250
}
247-
248-
// QuorumCheck eagerly returns error if the check finds a quorum number not an element of the allowed quorum numbers
249-
func QuorumCheck(quorumNumbers []uint8, allowedNumbers []uint8) error {
250-
if len(quorumNumbers) == 0 {
251-
return fmt.Errorf("no quorum numbers provided")
252-
}
253-
for _, quorum := range quorumNumbers {
254-
if !slices.Contains(allowedNumbers, quorum) {
255-
return fmt.Errorf("provided quorum number %v not allowed", quorum)
256-
}
257-
}
258-
return nil
259-
}

0 commit comments

Comments
 (0)