Skip to content

Commit ed89588

Browse files
authored
feat: GetPaymentStateForAllQuorums api impl (#1664)
* feat: GetPaymentStateForAllQuorums impl * refactor: protobuf to native payment type conversions * refactor: revert GetPaymentState in server, update log level * refactor: accountant comments and happy path update * fix: accountant unit test; lint * chore: comment * refactor: validate reservation timestamps consistency, deprecate comment * refactor: zero value warn log levels * refactor: rm ValidateReservations parameter comment * chore: fmt * refactor: GetPaymentStateForAllQuorums fail at error in chain read
1 parent 5433572 commit ed89588

File tree

10 files changed

+1278
-1620
lines changed

10 files changed

+1278
-1620
lines changed

api/clients/v2/accountant.go

Lines changed: 101 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -5,115 +5,105 @@ import (
55
"fmt"
66
"math/big"
77
"sync"
8+
"time"
89

9-
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
1010
"github.com/Layr-Labs/eigenda/core"
1111
"github.com/Layr-Labs/eigenda/core/meterer"
1212
gethcommon "github.com/ethereum/go-ethereum/common"
1313
)
1414

15-
var requiredQuorums = []core.QuorumID{0, 1}
16-
1715
type Accountant struct {
1816
// on-chain states
19-
accountID gethcommon.Address
20-
reservation *core.ReservedPayment
21-
onDemand *core.OnDemandPayment
22-
reservationWindow uint64
23-
pricePerSymbol uint64
24-
minNumSymbols uint64
17+
accountID gethcommon.Address
18+
reservations map[core.QuorumID]*core.ReservedPayment
19+
// OnDemand is initially only enabled on quorum 0. Accountant must be updated to be quorum specific
20+
// after the protocol decides to support onDemand on custom quorums and decentralized ratelimiting.
21+
onDemand *core.OnDemandPayment
22+
23+
paymentVaultParams *meterer.PaymentVaultParams
2524

2625
// local accounting
27-
// contains an array of period records, with length of max(MinNumBins, numBins)
28-
// numBins can be arbitrarily bigger than MinNumBins if the client wants to track more history in the cache
29-
periodRecords []PeriodRecord
26+
// contains a fixed meterer.MinNumBins bins per quorum
27+
periodRecords meterer.QuorumPeriodRecords
3028
cumulativePayment *big.Int
3129

32-
// locks for concurrent access to period records and on-demand payment
30+
// locks for concurrent access to period records
3331
periodRecordsLock sync.Mutex
34-
onDemandLock sync.Mutex
32+
// lock for concurrent access to on-demand payment
33+
onDemandLock sync.Mutex
3534
}
3635

37-
// PeriodRecord contains the index of the reservation period and the usage of the period
38-
type PeriodRecord struct {
39-
// Index is start timestamp of the period in seconds; it is always a multiple of the reservation window
40-
Index uint32
41-
// Usage is the usage of the period in symbols
42-
Usage uint64
43-
}
44-
45-
func NewAccountant(accountID gethcommon.Address, reservation *core.ReservedPayment, onDemand *core.OnDemandPayment, reservationWindow uint64, pricePerSymbol uint64, minNumSymbols uint64, numBins uint32) *Accountant {
46-
periodRecords := make([]PeriodRecord, max(numBins, uint32(meterer.MinNumBins)))
47-
for i := range periodRecords {
48-
periodRecords[i] = PeriodRecord{Index: uint32(i), Usage: 0}
49-
}
50-
a := Accountant{
51-
accountID: accountID,
52-
reservation: reservation,
53-
onDemand: onDemand,
54-
reservationWindow: reservationWindow,
55-
pricePerSymbol: pricePerSymbol,
56-
minNumSymbols: minNumSymbols,
36+
// NewAccountant initializes an accountant with the given account ID. The accountant must call SetPaymentState to populate the state.
37+
func NewAccountant(accountID gethcommon.Address) *Accountant {
38+
reservations := make(map[core.QuorumID]*core.ReservedPayment)
39+
onDemand := &core.OnDemandPayment{
40+
CumulativePayment: big.NewInt(0),
41+
}
42+
periodRecords := make(meterer.QuorumPeriodRecords)
43+
return &Accountant{
44+
accountID: accountID,
45+
reservations: reservations,
46+
onDemand: onDemand,
47+
paymentVaultParams: &meterer.PaymentVaultParams{
48+
QuorumPaymentConfigs: make(map[uint8]*core.PaymentQuorumConfig),
49+
QuorumProtocolConfigs: make(map[uint8]*core.PaymentQuorumProtocolConfig),
50+
OnDemandQuorumNumbers: make([]uint8, 0),
51+
},
5752
periodRecords: periodRecords,
5853
cumulativePayment: big.NewInt(0),
5954
}
60-
// TODO: add a routine to refresh the on-chain state occasionally?
61-
return &a
6255
}
6356

64-
// reservationUsage attempts to use the reservation for the given request.
65-
func (a *Accountant) reservationUsage(
66-
symbolUsage uint64,
67-
quorumNumbers []uint8,
68-
timestamp int64) error {
69-
if err := meterer.ValidateQuorum(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
57+
// ReservationUsage attempts to use the reservation for the requested quorums; if any quorum fails to use the reservation, the entire operation is rolled back.
58+
func (a *Accountant) reservationUsage(numSymbols uint64, quorumNumbers []core.QuorumID, paymentHeaderTimestampNs int64) error {
59+
if err := meterer.ValidateReservations(a.reservations, a.paymentVaultParams.QuorumProtocolConfigs, quorumNumbers, paymentHeaderTimestampNs, time.Now().UnixNano()); err != nil {
7060
return err
7161
}
72-
if !a.reservation.IsActiveByNanosecond(timestamp) {
73-
return fmt.Errorf("reservation is not active at timestamp %d", timestamp)
74-
}
75-
76-
reservationWindow := a.reservationWindow
77-
currentReservationPeriod := meterer.GetReservationPeriodByNanosecond(timestamp, reservationWindow)
7862

7963
a.periodRecordsLock.Lock()
8064
defer a.periodRecordsLock.Unlock()
81-
relativePeriodRecord := a.getOrRefreshRelativePeriodRecord(currentReservationPeriod, reservationWindow)
82-
relativePeriodRecord.Usage += symbolUsage
8365

84-
// Check if we can use the reservation within the bin limit
85-
binLimit := meterer.GetReservationBinLimit(a.reservation, a.reservationWindow)
86-
if relativePeriodRecord.Usage <= binLimit {
87-
return nil
88-
}
66+
periodRecordsCopy := a.periodRecords.DeepCopy()
8967

90-
overflowPeriodRecord := a.getOrRefreshRelativePeriodRecord(meterer.GetOverflowPeriod(currentReservationPeriod, reservationWindow), reservationWindow)
91-
// Allow one overflow when the overflow bin is empty, the current usage and new length are both less than the limit
92-
if overflowPeriodRecord.Usage == 0 && relativePeriodRecord.Usage-symbolUsage < binLimit && symbolUsage <= binLimit {
93-
overflowPeriodRecord.Usage += relativePeriodRecord.Usage - binLimit
94-
return nil
68+
for _, quorumNumber := range quorumNumbers {
69+
reservation, exists := a.reservations[quorumNumber]
70+
if !exists {
71+
// this case should never happen because ValidateReservations should have already checked this; handle it just in case
72+
return fmt.Errorf("reservation not found for quorum %d", quorumNumber)
73+
}
74+
_, protocolConfig, err := a.paymentVaultParams.GetQuorumConfigs(quorumNumber)
75+
if err != nil {
76+
return err
77+
}
78+
if err := periodRecordsCopy.UpdateUsage(quorumNumber, paymentHeaderTimestampNs, numSymbols, reservation, protocolConfig); err != nil {
79+
return err
80+
}
9581
}
9682

97-
// Reservation not sufficient for the request, rollback the usage
98-
relativePeriodRecord.Usage -= symbolUsage
99-
return errors.New("insufficient reservation")
83+
a.periodRecords = periodRecordsCopy
84+
return nil
10085
}
10186

10287
// onDemandUsage attempts to use on-demand payment for the given request.
10388
// Returns the cumulative payment if successful, or an error if on-demand cannot be used.
104-
func (a *Accountant) onDemandUsage(symbolUsage uint64, quorumNumbers []uint8) (*big.Int, error) {
105-
if err := meterer.ValidateQuorum(quorumNumbers, requiredQuorums); err != nil {
89+
func (a *Accountant) onDemandUsage(numSymbols uint64, quorumNumbers []core.QuorumID) (*big.Int, error) {
90+
if err := meterer.ValidateQuorum(quorumNumbers, a.paymentVaultParams.OnDemandQuorumNumbers); err != nil {
10691
return nil, err
10792
}
10893

94+
paymentQuorumConfig, protocolConfig, err := a.paymentVaultParams.GetQuorumConfigs(meterer.OnDemandQuorumID)
95+
if err != nil {
96+
return nil, err
97+
}
98+
symbolsCharged := meterer.SymbolsCharged(numSymbols, protocolConfig.MinNumSymbols)
99+
paymentCharged := meterer.PaymentCharged(symbolsCharged, paymentQuorumConfig.OnDemandPricePerSymbol)
100+
109101
a.onDemandLock.Lock()
110102
defer a.onDemandLock.Unlock()
111-
112-
incrementRequired := meterer.PaymentCharged(symbolUsage, a.pricePerSymbol)
113-
resultingPayment := new(big.Int).Add(a.cumulativePayment, incrementRequired)
114-
103+
// calculate the increment required to add to the cumulative payment
104+
resultingPayment := new(big.Int).Add(a.cumulativePayment, paymentCharged)
115105
if resultingPayment.Cmp(a.onDemand.CumulativePayment) <= 0 {
116-
a.cumulativePayment.Add(a.cumulativePayment, incrementRequired)
106+
a.cumulativePayment.Add(a.cumulativePayment, paymentCharged)
117107
return a.cumulativePayment, nil
118108
}
119109

@@ -130,12 +120,16 @@ func (a *Accountant) AccountBlob(
130120
timestamp int64,
131121
numSymbols uint64,
132122
quorums []uint8) (*core.PaymentMetadata, error) {
133-
134-
symbolUsage := meterer.SymbolsCharged(numSymbols, a.minNumSymbols)
123+
if len(quorums) == 0 {
124+
return nil, fmt.Errorf("no quorums provided")
125+
}
126+
if numSymbols == 0 {
127+
return nil, fmt.Errorf("zero symbols requested")
128+
}
135129

136130
// Always try to use reservation first
137-
err := a.reservationUsage(symbolUsage, quorums, timestamp)
138-
if err == nil {
131+
rezErr := a.reservationUsage(numSymbols, quorums, timestamp)
132+
if rezErr == nil {
139133
return &core.PaymentMetadata{
140134
AccountID: a.accountID,
141135
Timestamp: timestamp,
@@ -144,9 +138,9 @@ func (a *Accountant) AccountBlob(
144138
}
145139

146140
// Fall back to on-demand payment if reservation fails
147-
cumulativePayment, err := a.onDemandUsage(symbolUsage, quorums)
148-
if err != nil {
149-
return nil, fmt.Errorf("cannot create payment information for reservation or on-demand. Consider depositing more eth to the PaymentVault contract for your account. For more details, see https://docs.eigenda.xyz/core-concepts/payments#disperser-client-requirements. Account: %s, Error: %w", a.accountID.Hex(), err)
141+
cumulativePayment, onDemandErr := a.onDemandUsage(numSymbols, quorums)
142+
if onDemandErr != nil {
143+
return nil, fmt.Errorf("cannot create payment information for reservation or on-demand. Consider depositing more eth to the PaymentVault contract for your account. For more details, see https://docs.eigenda.xyz/core-concepts/payments#disperser-client-requirements. Account: %s, Reservation Error: %w, On-demand Error: %w", a.accountID.Hex(), rezErr, onDemandErr)
150144
}
151145

152146
pm := &core.PaymentMetadata{
@@ -158,93 +152,51 @@ func (a *Accountant) AccountBlob(
158152
return pm, nil
159153
}
160154

161-
// getOrRefreshRelativePeriodRecord returns the period record for the given index (which is in seconds and is the multiple of the reservation window),
162-
// wrapping around the circular buffer and clearing the record if the index is greater than the number of bins
163-
func (a *Accountant) getOrRefreshRelativePeriodRecord(index uint64, reservationWindow uint64) *PeriodRecord {
164-
relativeIndex := uint32((index / reservationWindow) % uint64(len(a.periodRecords)))
165-
if relativeIndex >= uint32(len(a.periodRecords)) {
166-
panic(fmt.Sprintf("relativeIndex %d is greater than the number of bins %d cached", relativeIndex, len(a.periodRecords)))
167-
}
168-
if a.periodRecords[relativeIndex].Index < uint32(index) {
169-
a.periodRecords[relativeIndex] = PeriodRecord{
170-
Index: uint32(index),
171-
Usage: 0,
172-
}
173-
}
174-
175-
return &a.periodRecords[relativeIndex]
176-
}
177-
178-
// SetPaymentState sets the accountant's state from the disperser's response
179-
// We require disperser to return a valid set of global parameters, but optional
180-
// account level on/off-chain state. If on-chain fields are not present, we use
181-
// dummy values that disable accountant from using the corresponding payment method.
182-
// If off-chain fields are not present, we assume the account has no payment history
183-
// and set accountant state to use initial values.
184-
func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply) error {
185-
if paymentState == nil {
186-
return fmt.Errorf("payment state cannot be nil")
187-
} else if paymentState.GetPaymentGlobalParams() == nil {
188-
return fmt.Errorf("payment global params cannot be nil")
189-
}
190-
191-
a.minNumSymbols = paymentState.GetPaymentGlobalParams().GetMinNumSymbols()
192-
a.pricePerSymbol = paymentState.GetPaymentGlobalParams().GetPricePerSymbol()
193-
a.reservationWindow = paymentState.GetPaymentGlobalParams().GetReservationWindow()
194-
195-
if paymentState.GetOnchainCumulativePayment() == nil {
155+
// SetPaymentState sets the accountant's state, requiring valid payment vault parameters, but
156+
// optional account level on/off-chain state. If on-chain fields are not present, we use dummy
157+
// values that disable accountant from using the corresponding payment method. If off-chain
158+
// fields are not present, we assume the account has no payment history and set accountant state
159+
// to use initial values.
160+
func (a *Accountant) SetPaymentState(
161+
paymentVaultParams *meterer.PaymentVaultParams,
162+
reservations map[core.QuorumID]*core.ReservedPayment,
163+
cumulativePayment *big.Int,
164+
onchainCumulativePayment *big.Int,
165+
periodRecords meterer.QuorumPeriodRecords,
166+
) error {
167+
if paymentVaultParams == nil {
168+
return fmt.Errorf("payment vault params cannot be nil")
169+
}
170+
171+
a.paymentVaultParams = paymentVaultParams
172+
173+
if onchainCumulativePayment == nil {
196174
a.onDemand = &core.OnDemandPayment{
197175
CumulativePayment: big.NewInt(0),
198176
}
199177
} else {
200178
a.onDemand = &core.OnDemandPayment{
201-
CumulativePayment: new(big.Int).SetBytes(paymentState.GetOnchainCumulativePayment()),
179+
CumulativePayment: new(big.Int).Set(onchainCumulativePayment),
202180
}
203181
}
204182

205-
if paymentState.GetCumulativePayment() == nil {
183+
if cumulativePayment == nil {
206184
a.cumulativePayment = big.NewInt(0)
207185
} else {
208-
a.cumulativePayment = new(big.Int).SetBytes(paymentState.GetCumulativePayment())
186+
a.cumulativePayment = new(big.Int).Set(cumulativePayment)
209187
}
210188

211-
if paymentState.GetReservation() == nil {
212-
a.reservation = &core.ReservedPayment{
213-
SymbolsPerSecond: 0,
214-
StartTimestamp: 0,
215-
EndTimestamp: 0,
216-
QuorumNumbers: []uint8{},
217-
QuorumSplits: []byte{},
218-
}
189+
if reservations == nil {
190+
a.reservations = make(map[core.QuorumID]*core.ReservedPayment)
191+
a.periodRecords = make(meterer.QuorumPeriodRecords)
219192
} else {
220-
quorumNumbers := make([]uint8, len(paymentState.GetReservation().GetQuorumNumbers()))
221-
for i, quorum := range paymentState.GetReservation().GetQuorumNumbers() {
222-
quorumNumbers[i] = uint8(quorum)
223-
}
224-
quorumSplits := make([]uint8, len(paymentState.GetReservation().GetQuorumSplits()))
225-
for i, quorum := range paymentState.GetReservation().GetQuorumSplits() {
226-
quorumSplits[i] = uint8(quorum)
227-
}
228-
a.reservation = &core.ReservedPayment{
229-
SymbolsPerSecond: uint64(paymentState.GetReservation().GetSymbolsPerSecond()),
230-
StartTimestamp: uint64(paymentState.GetReservation().GetStartTimestamp()),
231-
EndTimestamp: uint64(paymentState.GetReservation().GetEndTimestamp()),
232-
QuorumNumbers: quorumNumbers,
233-
QuorumSplits: quorumSplits,
234-
}
235-
}
236-
237-
periodRecords := make([]PeriodRecord, len(paymentState.GetPeriodRecords()))
238-
for i, record := range paymentState.GetPeriodRecords() {
239-
if record == nil {
240-
periodRecords[i] = PeriodRecord{Index: 0, Usage: 0}
193+
a.reservations = reservations
194+
if periodRecords == nil {
195+
a.periodRecords = make(meterer.QuorumPeriodRecords)
241196
} else {
242-
periodRecords[i] = PeriodRecord{
243-
Index: record.Index,
244-
Usage: record.Usage,
245-
}
197+
a.periodRecords = periodRecords
246198
}
247199
}
248-
a.periodRecords = periodRecords
200+
249201
return nil
250202
}

0 commit comments

Comments
 (0)