Skip to content

Commit 7ac688d

Browse files
authored
refactor: quorum specific metrics for metered bytes (#1668)
* feat: quorum specific metrics for metered bytes * fix: unit test mock calls
1 parent 0ac8cb9 commit 7ac688d

File tree

4 files changed

+221
-61
lines changed

4 files changed

+221
-61
lines changed

core/meterer/meterer.go

Lines changed: 45 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -73,41 +73,35 @@ func (m *Meterer) Start(ctx context.Context) {
7373

7474
// MeterRequest validates a blob header and adds it to the meterer's state
7575
// TODO: return error if there's a rejection (with reasoning) or internal error (should be very rare)
76-
func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint64, quorumNumbers []uint8, receivedAt time.Time) (uint64, error) {
76+
func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint64, quorumNumbers []uint8, receivedAt time.Time) (map[core.QuorumID]uint64, error) {
7777
m.logger.Info("Validating incoming request's payment metadata", "paymentMetadata", header, "numSymbols", numSymbols, "quorumNumbers", quorumNumbers)
7878

7979
params, err := m.ChainPaymentState.GetPaymentGlobalParams()
8080
if err != nil {
81-
return 0, fmt.Errorf("failed to get payment global params: %w", err)
81+
return nil, fmt.Errorf("failed to get payment global params: %w", err)
8282
}
8383
// Validate against the payment method
8484
if !payment_logic.IsOnDemandPayment(&header) {
8585
reservations, err := m.ChainPaymentState.GetReservedPaymentByAccountAndQuorums(ctx, header.AccountID, quorumNumbers)
8686
if err != nil {
87-
return 0, fmt.Errorf("failed to get active reservation by account: %w", err)
87+
return nil, fmt.Errorf("failed to get active reservation by account: %w", err)
8888
}
89-
if err := m.serveReservationRequest(ctx, params, header, reservations, numSymbols, quorumNumbers, receivedAt); err != nil {
90-
return 0, fmt.Errorf("invalid reservation request: %w", err)
91-
}
92-
} else {
93-
onDemandPayment, err := m.ChainPaymentState.GetOnDemandPaymentByAccount(ctx, header.AccountID)
89+
usage, err := m.serveReservationRequest(ctx, params, header, reservations, numSymbols, quorumNumbers, receivedAt)
9490
if err != nil {
95-
return 0, fmt.Errorf("failed to get on-demand payment by account: %w", err)
96-
}
97-
if err := m.serveOnDemandRequest(ctx, params, header, onDemandPayment, numSymbols, quorumNumbers, receivedAt); err != nil {
98-
return 0, fmt.Errorf("invalid on-demand request: %w", err)
91+
return nil, fmt.Errorf("invalid reservation request: %w", err)
9992
}
93+
return usage, nil
10094
}
10195

102-
// TODO(hopeyen): each quorum can have different min num symbols; the returned symbolsCharged is only for used for metrics.
103-
// for now we simply return the charge for quorum 0, as quorums are likely to share the same min num symbols
104-
// we can make this more granular by adding metrics to the meterer later on
105-
_, protocolConfig, err := params.GetQuorumConfigs(OnDemandQuorumID)
96+
onDemandPayment, err := m.ChainPaymentState.GetOnDemandPaymentByAccount(ctx, header.AccountID)
10697
if err != nil {
107-
return 0, fmt.Errorf("failed to get on-demand quorum config: %w", err)
98+
return nil, fmt.Errorf("failed to get on-demand payment by account: %w", err)
10899
}
109-
symbolsCharged := payment_logic.SymbolsCharged(numSymbols, protocolConfig.MinNumSymbols)
110-
return symbolsCharged, nil
100+
usage, err := m.serveOnDemandRequest(ctx, params, header, onDemandPayment, numSymbols, quorumNumbers, receivedAt)
101+
if err != nil {
102+
return nil, fmt.Errorf("invalid on-demand request: %w", err)
103+
}
104+
return usage, nil
111105
}
112106

113107
// serveReservationRequest handles the rate limiting logic for incoming requests
@@ -119,17 +113,18 @@ func (m *Meterer) serveReservationRequest(
119113
numSymbols uint64,
120114
quorumNumbers []uint8,
121115
receivedAt time.Time,
122-
) error {
116+
) (map[core.QuorumID]uint64, error) {
123117
m.logger.Debug("Recording and validating reservation usage", "header", header, "reservation", reservations)
124118
if err := payment_logic.ValidateReservations(reservations, globalParams.QuorumProtocolConfigs, quorumNumbers, header.Timestamp, receivedAt.UnixNano()); err != nil {
125-
return fmt.Errorf("invalid reservation: %w", err)
119+
return nil, fmt.Errorf("invalid reservation: %w", err)
126120
}
127121

128122
// Make atomic batched updates over all reservations identified by the same account and quorum
129-
if err := m.incrementBinUsage(ctx, header, reservations, globalParams, numSymbols); err != nil {
130-
return fmt.Errorf("failed to increment bin usages: %w", err)
123+
usage, err := m.incrementBinUsage(ctx, header, reservations, globalParams, numSymbols)
124+
if err != nil {
125+
return nil, fmt.Errorf("failed to increment bin usages: %w", err)
131126
}
132-
return nil
127+
return usage, nil
133128
}
134129

135130
// incrementBinUsage increments the bin usage atomically and checks for overflow
@@ -138,15 +133,15 @@ func (m *Meterer) incrementBinUsage(
138133
reservations map[core.QuorumID]*core.ReservedPayment,
139134
globalParams *PaymentVaultParams,
140135
numSymbols uint64,
141-
) error {
136+
) (map[core.QuorumID]uint64, error) {
142137
charges := make(map[core.QuorumID]uint64)
143138
quorumNumbers := make([]core.QuorumID, 0, len(reservations))
144139
reservationWindows := make(map[core.QuorumID]uint64, len(reservations))
145140
requestReservationPeriods := make(map[core.QuorumID]uint64, len(reservations))
146141
for quorumID := range reservations {
147142
_, protocolConfig, err := globalParams.GetQuorumConfigs(quorumID)
148143
if err != nil {
149-
return fmt.Errorf("failed to get quorum config for quorum %d: %w", quorumID, err)
144+
return nil, fmt.Errorf("failed to get quorum config for quorum %d: %w", quorumID, err)
150145
}
151146
charges[quorumID] = payment_logic.SymbolsCharged(numSymbols, protocolConfig.MinNumSymbols)
152147
quorumNumbers = append(quorumNumbers, quorumID)
@@ -155,13 +150,9 @@ func (m *Meterer) incrementBinUsage(
155150
}
156151
// Batch increment all quorums for the current quorums' reservation period
157152
// For each quorum, increment by its specific symbolsCharged value
158-
updatedUsages := make(map[core.QuorumID]uint64)
159153
usage, err := m.MeteringStore.IncrementBinUsages(ctx, header.AccountID, quorumNumbers, requestReservationPeriods, charges)
160154
if err != nil {
161-
return err
162-
}
163-
for _, quorumID := range quorumNumbers {
164-
updatedUsages[quorumID] = usage[quorumID]
155+
return nil, err
165156
}
166157
overflowAmounts := make(map[core.QuorumID]uint64)
167158
overflowPeriods := make(map[core.QuorumID]uint64)
@@ -170,28 +161,28 @@ func (m *Meterer) incrementBinUsage(
170161
reservationWindow := reservationWindows[quorumID]
171162
requestReservationPeriod := requestReservationPeriods[quorumID]
172163
usageLimit := payment_logic.GetBinLimit(reservation.SymbolsPerSecond, reservationWindow)
173-
newUsage, ok := updatedUsages[quorumID]
164+
newUsage, ok := usage[quorumID]
174165
if !ok {
175-
return fmt.Errorf("failed to get updated usage for quorum %d", quorumID)
166+
return nil, fmt.Errorf("failed to get updated usage for quorum %d", quorumID)
176167
}
177168
prevUsage := newUsage - charges[quorumID]
178169
if newUsage <= usageLimit {
179170
continue
180171
} else if prevUsage >= usageLimit {
181172
// Bin was already filled before this increment
182-
return fmt.Errorf("bin has already been filled for quorum %d", quorumID)
173+
return nil, fmt.Errorf("bin has already been filled for quorum %d", quorumID)
183174
}
184175
overflowPeriod := payment_logic.GetOverflowPeriod(requestReservationPeriod, reservationWindow)
185176
if charges[quorumID] <= usageLimit && overflowPeriod <= payment_logic.GetReservationPeriod(int64(reservation.EndTimestamp), reservationWindow) {
186177
// Needs to go to overflow bin
187178
overflowAmounts[quorumID] = newUsage - usageLimit
188179
overflowPeriods[quorumID] = overflowPeriod
189180
} else {
190-
return fmt.Errorf("overflow usage exceeds bin limit for quorum %d", quorumID)
181+
return nil, fmt.Errorf("overflow usage exceeds bin limit for quorum %d", quorumID)
191182
}
192183
}
193184
if len(overflowAmounts) != len(overflowPeriods) {
194-
return fmt.Errorf("overflow amount and period mismatch")
185+
return nil, fmt.Errorf("overflow amount and period mismatch")
195186
}
196187
// Batch increment overflow bins for all overflown reservation candidates
197188
if len(overflowAmounts) > 0 {
@@ -205,55 +196,60 @@ func (m *Meterer) incrementBinUsage(
205196
// Rollback the increments for the current periods
206197
rollbackErr := m.MeteringStore.DecrementBinUsages(ctx, header.AccountID, quorumNumbers, requestReservationPeriods, charges)
207198
if rollbackErr != nil {
208-
return fmt.Errorf("failed to increment overflow bins: %w; rollback also failed: %v", err, rollbackErr)
199+
return nil, fmt.Errorf("failed to increment overflow bins: %w; rollback also failed: %v", err, rollbackErr)
209200
}
210-
return fmt.Errorf("failed to increment overflow bins: %w; successfully rolled back increments", err)
201+
return nil, fmt.Errorf("failed to increment overflow bins: %w; successfully rolled back increments", err)
211202
}
212203
}
213204

214-
return nil
205+
return charges, nil
215206
}
216207

217208
// serveOnDemandRequest handles the rate limiting logic for incoming requests
218209
// On-demand requests doesn't have additional quorum settings and should only be
219210
// allowed by ETH and EIGEN quorums
220-
func (m *Meterer) serveOnDemandRequest(ctx context.Context, globalParams *PaymentVaultParams, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, symbolsCharged uint64, headerQuorums []uint8, receivedAt time.Time) error {
211+
func (m *Meterer) serveOnDemandRequest(ctx context.Context, globalParams *PaymentVaultParams, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, numSymbols uint64, headerQuorums []uint8, receivedAt time.Time) (map[core.QuorumID]uint64, error) {
221212
m.logger.Debug("Recording and validating on-demand usage", "header", header, "onDemandPayment", onDemandPayment)
222213

223214
if err := payment_logic.ValidateQuorum(headerQuorums, globalParams.OnDemandQuorumNumbers); err != nil {
224-
return fmt.Errorf("invalid quorum for On-Demand Request: %w", err)
215+
return nil, fmt.Errorf("invalid quorum for On-Demand Request: %w", err)
225216
}
226217

227218
// Verify that the claimed cumulative payment doesn't exceed the on-chain deposit
228219
if header.CumulativePayment.Cmp(onDemandPayment.CumulativePayment) > 0 {
229-
return fmt.Errorf("request claims a cumulative payment greater than the on-chain deposit")
220+
return nil, fmt.Errorf("request claims a cumulative payment greater than the on-chain deposit")
230221
}
231222

232223
paymentConfig, protocolConfig, err := globalParams.GetQuorumConfigs(OnDemandQuorumID)
233224
if err != nil {
234-
return fmt.Errorf("failed to get payment config for on-demand quorum: %w", err)
225+
return nil, fmt.Errorf("failed to get payment config for on-demand quorum: %w", err)
235226
}
236227

237-
symbolsCharged = payment_logic.SymbolsCharged(symbolsCharged, protocolConfig.MinNumSymbols)
228+
symbolsCharged := payment_logic.SymbolsCharged(numSymbols, protocolConfig.MinNumSymbols)
238229
paymentCharged := payment_logic.PaymentCharged(symbolsCharged, paymentConfig.OnDemandPricePerSymbol)
239230
oldPayment, err := m.MeteringStore.AddOnDemandPayment(ctx, header, paymentCharged)
240231
if err != nil {
241-
return fmt.Errorf("failed to update cumulative payment: %w", err)
232+
return nil, fmt.Errorf("failed to update cumulative payment: %w", err)
242233
}
243234

244235
// Update bin usage atomically and check against bin capacity
245-
if err := m.incrementGlobalBinUsage(ctx, globalParams, uint64(symbolsCharged), receivedAt); err != nil {
236+
if err := m.incrementGlobalBinUsage(ctx, globalParams, symbolsCharged, receivedAt); err != nil {
246237
// If global bin usage update fails, roll back the payment to its previous value
247238
// The rollback will only happen if the current payment value still matches what we just wrote
248239
// This ensures we don't accidentally roll back a newer payment that might have been processed
249240
dbErr := m.MeteringStore.RollbackOnDemandPayment(ctx, header.AccountID, header.CumulativePayment, oldPayment)
250241
if dbErr != nil {
251-
return dbErr
242+
return nil, dbErr
252243
}
253-
return fmt.Errorf("failed global rate limiting: %w", err)
244+
return nil, fmt.Errorf("failed global rate limiting: %w", err)
254245
}
255246

256-
return nil
247+
// charges is applied to the header quorums
248+
charges := make(map[core.QuorumID]uint64, len(headerQuorums))
249+
for _, quorumID := range headerQuorums {
250+
charges[core.QuorumID(quorumID)] = symbolsCharged
251+
}
252+
return charges, nil
257253
}
258254

259255
// IncrementGlobalBinUsage increments the bin usage atomically and checks for overflow

0 commit comments

Comments
 (0)