Skip to content

Commit afbef90

Browse files
authored
refactor: core meterer period record module (#1661)
* refactor: period record module in core/meterer * refactor: fmt and docs
1 parent f545b16 commit afbef90

File tree

3 files changed

+779
-1
lines changed

3 files changed

+779
-1
lines changed

core/meterer/meterer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,10 @@ func GetOverflowPeriod(reservationPeriod uint64, reservationWindow uint64) uint6
284284

285285
// PaymentCharged returns the chargeable price for a given number of symbols
286286
func PaymentCharged(numSymbols, pricePerSymbol uint64) *big.Int {
287-
return new(big.Int).Mul(big.NewInt(int64(numSymbols)), big.NewInt(int64(pricePerSymbol)))
287+
// directly convert to uint64 to avoid overflow
288+
numSymbolsInt := new(big.Int).SetUint64(numSymbols)
289+
pricePerSymbolInt := new(big.Int).SetUint64(pricePerSymbol)
290+
return new(big.Int).Mul(numSymbolsInt, pricePerSymbolInt)
288291
}
289292

290293
// SymbolsCharged returns the number of symbols charged for a given data length
@@ -293,6 +296,9 @@ func SymbolsCharged(numSymbols uint64, minSymbols uint64) uint64 {
293296
if numSymbols <= minSymbols {
294297
return minSymbols
295298
}
299+
if minSymbols == 0 {
300+
return numSymbols
301+
}
296302
// Round up to the nearest multiple of MinNumSymbols
297303
roundedUp := core.RoundUpDivide(numSymbols, minSymbols) * minSymbols
298304
// Check for overflow; this case should never happen

core/meterer/period_record.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package meterer
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
8+
"github.com/Layr-Labs/eigenda/core"
9+
)
10+
11+
// QuorumPeriodRecords is a map of quorum number to a slice of period records
12+
type QuorumPeriodRecords map[core.QuorumID][]*PeriodRecord
13+
14+
// PeriodRecord contains the index of the reservation period and the usage of the period
15+
type PeriodRecord struct {
16+
// Index is start timestamp of the period in seconds; it is always a multiple of the reservation window
17+
Index uint32
18+
// Usage is the usage of the period in symbols
19+
Usage uint64
20+
}
21+
22+
// updateRecord tracks a successful update for rollback purposes
23+
type UpdateRecord struct {
24+
QuorumNumber core.QuorumID
25+
Period uint64
26+
Usage uint64
27+
}
28+
29+
// GetRelativePeriodRecord returns the period record for the given index and quorum number; if the record does not exist, it is initialized to 0
30+
func (pr QuorumPeriodRecords) GetRelativePeriodRecord(index uint64, quorumNumber core.QuorumID) *PeriodRecord {
31+
if _, exists := pr[quorumNumber]; !exists {
32+
pr[quorumNumber] = make([]*PeriodRecord, MinNumBins)
33+
}
34+
relativeIndex := uint32(index % uint64(MinNumBins))
35+
if pr[quorumNumber][relativeIndex] == nil {
36+
pr[quorumNumber][relativeIndex] = &PeriodRecord{
37+
Index: uint32(index),
38+
Usage: 0,
39+
}
40+
}
41+
return pr[quorumNumber][relativeIndex]
42+
}
43+
44+
// UpdateUsage attempts to update the usage for a quorum's reservation period
45+
// Returns error if the update would exceed the bin limit and cannot use overflow bin
46+
//
47+
// The function maintains a fixed-size circular buffer of numBins slots
48+
// to track usage across an unbounded sequence of time periods by mapping each
49+
// "absolute" period index onto a "relative" buffer index via modular arithmetic.
50+
//
51+
// Incoming timestamps are first bucketed into discrete reservation periods of
52+
// length reservationWindow, yielding an integer period index. When a request
53+
// for period p arrives, the system computes its buffer slot i = p mod numBins;
54+
// if the stored period at slot i differs from p, the slot is reset (index
55+
// updated, usage cleared) before accumulating usage.
56+
//
57+
// Controlled overflow allows unused capacity in a full bin to spill into future
58+
// bins under strict conditions, and a sliding valid-period window ensures only
59+
// recent periods are accepted.
60+
func (pr QuorumPeriodRecords) UpdateUsage(
61+
quorumNumber core.QuorumID,
62+
timestamp int64,
63+
numSymbols uint64,
64+
reservation *core.ReservedPayment,
65+
protocolConfig *core.PaymentQuorumProtocolConfig,
66+
) error {
67+
if reservation == nil {
68+
return errors.New("reservation cannot be nil")
69+
}
70+
if protocolConfig == nil {
71+
return errors.New("protocolConfig cannot be nil")
72+
}
73+
74+
symbolUsage := SymbolsCharged(numSymbols, protocolConfig.MinNumSymbols)
75+
binLimit := GetReservationBinLimit(reservation, protocolConfig.ReservationRateLimitWindow)
76+
77+
if symbolUsage > binLimit {
78+
return errors.New("symbol usage exceeds bin limit")
79+
}
80+
81+
currentPeriod := GetReservationPeriodByNanosecond(timestamp, protocolConfig.ReservationRateLimitWindow)
82+
relativePeriodRecord := pr.GetRelativePeriodRecord(currentPeriod, quorumNumber)
83+
oldUsage := relativePeriodRecord.Usage
84+
relativePeriodRecord.Usage += symbolUsage
85+
86+
// within the bin limit
87+
if relativePeriodRecord.Usage <= binLimit {
88+
return nil
89+
}
90+
91+
if oldUsage >= binLimit {
92+
return fmt.Errorf("reservation limit exceeded for quorum %d", quorumNumber)
93+
}
94+
95+
// overflow bin if we're over the limit
96+
overflowPeriod := GetOverflowPeriod(currentPeriod, protocolConfig.ReservationRateLimitWindow)
97+
overflowPeriodRecord := pr.GetRelativePeriodRecord(overflowPeriod, quorumNumber)
98+
if overflowPeriodRecord.Usage == 0 {
99+
overflowPeriodRecord.Usage += relativePeriodRecord.Usage - binLimit
100+
relativePeriodRecord.Usage = binLimit
101+
return nil
102+
}
103+
104+
return fmt.Errorf("reservation limit exceeded for quorum %d", quorumNumber)
105+
}
106+
107+
// Make a deep copy of the period records
108+
func (pr QuorumPeriodRecords) DeepCopy() QuorumPeriodRecords {
109+
copied := make(QuorumPeriodRecords)
110+
for quorumNumber, records := range pr {
111+
copied[quorumNumber] = make([]*PeriodRecord, len(records))
112+
for i, record := range records {
113+
if record != nil {
114+
// Create a new PeriodRecord with the same values
115+
copied[quorumNumber][i] = &PeriodRecord{
116+
Index: record.Index,
117+
Usage: record.Usage,
118+
}
119+
}
120+
}
121+
}
122+
return copied
123+
}
124+
125+
// FromProtoRecords converts protobuf period records to QuorumPeriodRecords
126+
func FromProtoRecords(protoRecords map[uint32]*disperser_rpc.PeriodRecords) QuorumPeriodRecords {
127+
records := make(QuorumPeriodRecords)
128+
for quorumNumber, protoRecord := range protoRecords {
129+
records[core.QuorumID(quorumNumber)] = make([]*PeriodRecord, MinNumBins)
130+
// Initialize all records to 0
131+
for i := range records[core.QuorumID(quorumNumber)] {
132+
records[core.QuorumID(quorumNumber)][i] = &PeriodRecord{
133+
Index: uint32(i),
134+
Usage: 0,
135+
}
136+
}
137+
// Populate with values from server
138+
for _, record := range protoRecord.GetRecords() {
139+
idx := record.Index % uint32(MinNumBins)
140+
records[core.QuorumID(quorumNumber)][idx] = &PeriodRecord{
141+
Index: record.Index,
142+
Usage: record.Usage,
143+
}
144+
}
145+
}
146+
return records
147+
}

0 commit comments

Comments
 (0)