Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 88 additions & 134 deletions api/clients/v2/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,115 +5,106 @@ import (
"fmt"
"math/big"
"sync"
"time"

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
gethcommon "github.com/ethereum/go-ethereum/common"
)

var requiredQuorums = []core.QuorumID{0, 1}

type Accountant struct {
// on-chain states
accountID gethcommon.Address
reservation *core.ReservedPayment
onDemand *core.OnDemandPayment
reservationWindow uint64
pricePerSymbol uint64
minNumSymbols uint64
accountID gethcommon.Address
reservations map[core.QuorumID]*core.ReservedPayment
onDemand *core.OnDemandPayment

paymentVaultParams *meterer.PaymentVaultParams

// local accounting
// contains an array of period records, with length of max(MinNumBins, numBins)
// numBins can be arbitrarily bigger than MinNumBins if the client wants to track more history in the cache
periodRecords []PeriodRecord
// contains 3 bins; circular wrapping of indices
periodRecords meterer.QuorumPeriodRecords
cumulativePayment *big.Int

// locks for concurrent access to period records and on-demand payment
periodRecordsLock sync.Mutex
onDemandLock sync.Mutex
}

// PeriodRecord contains the index of the reservation period and the usage of the period
type PeriodRecord struct {
// Index is start timestamp of the period in seconds; it is always a multiple of the reservation window
Index uint32
// Usage is the usage of the period in symbols
Usage uint64
}

func NewAccountant(accountID gethcommon.Address, reservation *core.ReservedPayment, onDemand *core.OnDemandPayment, reservationWindow uint64, pricePerSymbol uint64, minNumSymbols uint64, numBins uint32) *Accountant {
periodRecords := make([]PeriodRecord, max(numBins, uint32(meterer.MinNumBins)))
for i := range periodRecords {
periodRecords[i] = PeriodRecord{Index: uint32(i), Usage: 0}
}
a := Accountant{
accountID: accountID,
reservation: reservation,
onDemand: onDemand,
reservationWindow: reservationWindow,
pricePerSymbol: pricePerSymbol,
minNumSymbols: minNumSymbols,
// NewAccountant initializes an accountant with the given account ID. The accountant must call SetPaymentState to populate the state.
func NewAccountant(accountID gethcommon.Address) *Accountant {
reservations := make(map[core.QuorumID]*core.ReservedPayment)
onDemand := &core.OnDemandPayment{
CumulativePayment: big.NewInt(0),
}
periodRecords := make(meterer.QuorumPeriodRecords)
return &Accountant{
accountID: accountID,
reservations: reservations,
onDemand: onDemand,
paymentVaultParams: &meterer.PaymentVaultParams{
QuorumPaymentConfigs: make(map[uint8]*core.PaymentQuorumConfig),
QuorumProtocolConfigs: make(map[uint8]*core.PaymentQuorumProtocolConfig),
OnDemandQuorumNumbers: make([]uint8, 0),
},
periodRecords: periodRecords,
cumulativePayment: big.NewInt(0),
}
// TODO: add a routine to refresh the on-chain state occasionally?
return &a
}

// reservationUsage attempts to use the reservation for the given request.
func (a *Accountant) reservationUsage(
symbolUsage uint64,
quorumNumbers []uint8,
timestamp int64) error {
if err := meterer.ValidateQuorum(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
// ReservationUsage attempts to use the reservation for the requested quorums; if any quorum fails to use the reservation, the entire operation is rolled back.
func (a *Accountant) reservationUsage(numSymbols uint64, quorumNumbers []core.QuorumID, timestamp int64) error {
// The two timestamps are the same for the accountant client for validating the reservation period; the second timestamp is the received at time for the server
if err := meterer.ValidateReservations(a.reservations, a.paymentVaultParams.QuorumProtocolConfigs, quorumNumbers, timestamp, time.Unix(0, timestamp)); err != nil {
return err
}
if !a.reservation.IsActiveByNanosecond(timestamp) {
return fmt.Errorf("reservation is not active at timestamp %d", timestamp)
}

reservationWindow := a.reservationWindow
currentReservationPeriod := meterer.GetReservationPeriodByNanosecond(timestamp, reservationWindow)

a.periodRecordsLock.Lock()
defer a.periodRecordsLock.Unlock()
relativePeriodRecord := a.getOrRefreshRelativePeriodRecord(currentReservationPeriod, reservationWindow)
relativePeriodRecord.Usage += symbolUsage

// Check if we can use the reservation within the bin limit
binLimit := meterer.GetReservationBinLimit(a.reservation, a.reservationWindow)
if relativePeriodRecord.Usage <= binLimit {
return nil
}

overflowPeriodRecord := a.getOrRefreshRelativePeriodRecord(meterer.GetOverflowPeriod(currentReservationPeriod, reservationWindow), reservationWindow)
// Allow one overflow when the overflow bin is empty, the current usage and new length are both less than the limit
if overflowPeriodRecord.Usage == 0 && relativePeriodRecord.Usage-symbolUsage < binLimit && symbolUsage <= binLimit {
overflowPeriodRecord.Usage += relativePeriodRecord.Usage - binLimit
return nil
// deep copy of periodRecords for rollback in case of errors
originalPeriodRecords := a.periodRecords.DeepCopy()

for _, quorumNumber := range quorumNumbers {
reservation, exists := a.reservations[quorumNumber]
if !exists {
// this case should never happen because ValidateReservations should have already checked this; handle it just in case
a.periodRecords = originalPeriodRecords
return fmt.Errorf("reservation not found for quorum %d", quorumNumber)
}
_, protocolConfig, err := a.paymentVaultParams.GetQuorumConfigs(quorumNumber)
if err != nil {
a.periodRecords = originalPeriodRecords
return err
}
if err := a.periodRecords.UpdateUsage(quorumNumber, timestamp, numSymbols, reservation, protocolConfig); err != nil {
a.periodRecords = originalPeriodRecords
return err
}
}

// Reservation not sufficient for the request, rollback the usage
relativePeriodRecord.Usage -= symbolUsage
return errors.New("insufficient reservation")
return nil
}

// onDemandUsage attempts to use on-demand payment for the given request.
// Returns the cumulative payment if successful, or an error if on-demand cannot be used.
func (a *Accountant) onDemandUsage(symbolUsage uint64, quorumNumbers []uint8) (*big.Int, error) {
if err := meterer.ValidateQuorum(quorumNumbers, requiredQuorums); err != nil {
func (a *Accountant) onDemandUsage(numSymbols uint64, quorumNumbers []core.QuorumID) (*big.Int, error) {
if err := meterer.ValidateQuorum(quorumNumbers, a.paymentVaultParams.OnDemandQuorumNumbers); err != nil {
return nil, err
}

paymentQuorumConfig, protocolConfig, err := a.paymentVaultParams.GetQuorumConfigs(meterer.OnDemandQuorumID)
if err != nil {
return nil, err
}
symbolsCharged := meterer.SymbolsCharged(numSymbols, protocolConfig.MinNumSymbols)
paymentCharged := meterer.PaymentCharged(symbolsCharged, paymentQuorumConfig.OnDemandPricePerSymbol)

a.onDemandLock.Lock()
defer a.onDemandLock.Unlock()

incrementRequired := meterer.PaymentCharged(symbolUsage, a.pricePerSymbol)
resultingPayment := new(big.Int).Add(a.cumulativePayment, incrementRequired)

// calculate the increment required to add to the cumulative payment
resultingPayment := new(big.Int).Add(a.cumulativePayment, paymentCharged)
if resultingPayment.Cmp(a.onDemand.CumulativePayment) <= 0 {
a.cumulativePayment.Add(a.cumulativePayment, incrementRequired)
a.cumulativePayment.Add(a.cumulativePayment, paymentCharged)
return a.cumulativePayment, nil
}

Expand All @@ -130,12 +121,16 @@ func (a *Accountant) AccountBlob(
timestamp int64,
numSymbols uint64,
quorums []uint8) (*core.PaymentMetadata, error) {

symbolUsage := meterer.SymbolsCharged(numSymbols, a.minNumSymbols)
if len(quorums) == 0 {
return nil, fmt.Errorf("no quorums provided")
}
if numSymbols == 0 {
return nil, fmt.Errorf("zero symbols requested")
}

// Always try to use reservation first
err := a.reservationUsage(symbolUsage, quorums, timestamp)
if err == nil {
rezErr := a.reservationUsage(numSymbols, quorums, timestamp)
if rezErr == nil {
return &core.PaymentMetadata{
AccountID: a.accountID,
Timestamp: timestamp,
Expand All @@ -144,9 +139,9 @@ func (a *Accountant) AccountBlob(
}

// Fall back to on-demand payment if reservation fails
cumulativePayment, err := a.onDemandUsage(symbolUsage, quorums)
if err != nil {
return nil, fmt.Errorf("cannot create payment information for reservation or on-demand. Consider depositing more eth to the PaymentVault contract for your account. For more details, see https://docs.eigenda.xyz/core-concepts/payments#disperser-client-requirements. Account: %s, Error: %w", a.accountID.Hex(), err)
cumulativePayment, onDemandErr := a.onDemandUsage(numSymbols, quorums)
if onDemandErr != nil {
return nil, fmt.Errorf("cannot create payment information for reservation or on-demand. Consider depositing more eth to the PaymentVault contract for your account. For more details, see https://docs.eigenda.xyz/core-concepts/payments#disperser-client-requirements. Account: %s, Reservation Error: %w, On-demand Error: %w", a.accountID.Hex(), rezErr, onDemandErr)
}

pm := &core.PaymentMetadata{
Expand All @@ -158,39 +153,21 @@ func (a *Accountant) AccountBlob(
return pm, nil
}

// getOrRefreshRelativePeriodRecord returns the period record for the given index (which is in seconds and is the multiple of the reservation window),
// wrapping around the circular buffer and clearing the record if the index is greater than the number of bins
func (a *Accountant) getOrRefreshRelativePeriodRecord(index uint64, reservationWindow uint64) *PeriodRecord {
relativeIndex := uint32((index / reservationWindow) % uint64(len(a.periodRecords)))
if relativeIndex >= uint32(len(a.periodRecords)) {
panic(fmt.Sprintf("relativeIndex %d is greater than the number of bins %d cached", relativeIndex, len(a.periodRecords)))
}
if a.periodRecords[relativeIndex].Index < uint32(index) {
a.periodRecords[relativeIndex] = PeriodRecord{
Index: uint32(index),
Usage: 0,
}
}

return &a.periodRecords[relativeIndex]
}

// SetPaymentState sets the accountant's state from the disperser's response
// We require disperser to return a valid set of global parameters, but optional
// account level on/off-chain state. If on-chain fields are not present, we use
// dummy values that disable accountant from using the corresponding payment method.
// If off-chain fields are not present, we assume the account has no payment history
// and set accountant state to use initial values.
func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply) error {
func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateForAllQuorumsReply) error {
if paymentState == nil {
return fmt.Errorf("payment state cannot be nil")
} else if paymentState.GetPaymentGlobalParams() == nil {
return fmt.Errorf("payment global params cannot be nil")
}

a.minNumSymbols = paymentState.GetPaymentGlobalParams().GetMinNumSymbols()
a.pricePerSymbol = paymentState.GetPaymentGlobalParams().GetPricePerSymbol()
a.reservationWindow = paymentState.GetPaymentGlobalParams().GetReservationWindow()
paymentVaultParams, err := meterer.PaymentVaultParamsFromProtobuf(paymentState.GetPaymentVaultParams())
if err != nil {
return err
}
a.paymentVaultParams = paymentVaultParams

if paymentState.GetOnchainCumulativePayment() == nil {
a.onDemand = &core.OnDemandPayment{
Expand All @@ -208,43 +185,20 @@ func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentState
a.cumulativePayment = new(big.Int).SetBytes(paymentState.GetCumulativePayment())
}

if paymentState.GetReservation() == nil {
a.reservation = &core.ReservedPayment{
SymbolsPerSecond: 0,
StartTimestamp: 0,
EndTimestamp: 0,
QuorumNumbers: []uint8{},
QuorumSplits: []byte{},
}
if paymentState.GetReservations() == nil {
a.reservations = make(map[core.QuorumID]*core.ReservedPayment)
} else {
quorumNumbers := make([]uint8, len(paymentState.GetReservation().GetQuorumNumbers()))
for i, quorum := range paymentState.GetReservation().GetQuorumNumbers() {
quorumNumbers[i] = uint8(quorum)
}
quorumSplits := make([]uint8, len(paymentState.GetReservation().GetQuorumSplits()))
for i, quorum := range paymentState.GetReservation().GetQuorumSplits() {
quorumSplits[i] = uint8(quorum)
}
a.reservation = &core.ReservedPayment{
SymbolsPerSecond: uint64(paymentState.GetReservation().GetSymbolsPerSecond()),
StartTimestamp: uint64(paymentState.GetReservation().GetStartTimestamp()),
EndTimestamp: uint64(paymentState.GetReservation().GetEndTimestamp()),
QuorumNumbers: quorumNumbers,
QuorumSplits: quorumSplits,
}
}

periodRecords := make([]PeriodRecord, len(paymentState.GetPeriodRecords()))
for i, record := range paymentState.GetPeriodRecords() {
if record == nil {
periodRecords[i] = PeriodRecord{Index: 0, Usage: 0}
} else {
periodRecords[i] = PeriodRecord{
Index: record.Index,
Usage: record.Usage,
a.reservations = make(map[core.QuorumID]*core.ReservedPayment)
for quorumNumber, reservation := range paymentState.GetReservations() {
quorumID := core.QuorumID(quorumNumber)
a.reservations[quorumID] = &core.ReservedPayment{
SymbolsPerSecond: reservation.GetSymbolsPerSecond(),
StartTimestamp: uint64(reservation.GetStartTimestamp()),
EndTimestamp: uint64(reservation.GetEndTimestamp()),
}
}
a.periodRecords = meterer.FromProtoRecords(paymentState.GetPeriodRecords())
}
a.periodRecords = periodRecords

return nil
}
Loading
Loading