Skip to content

Commit 6a7a24b

Browse files
authored
feat(payments): Implement leaky bucket algorithm (#1853)
* Implement leaky bucket algorithm Signed-off-by: litt3 <[email protected]> * Improve docs and clean up Signed-off-by: litt3 <[email protected]> * Simplify leaky bucket by using basic math ops Signed-off-by: litt3 <[email protected]> * Delete unnecessary utils Signed-off-by: litt3 <[email protected]> * Use better int types Signed-off-by: litt3 <[email protected]> * Specify capacity directly Signed-off-by: litt3 <[email protected]> * Use time.Second instead of 1e9 Signed-off-by: litt3 <[email protected]> * Add bool to return signature of Fill Signed-off-by: litt3 <[email protected]> * Reorganize comment diagrams Signed-off-by: litt3 <[email protected]> * Reorganize packages, and add package docs Signed-off-by: litt3 <[email protected]> * Add doc about leak rate Signed-off-by: litt3 <[email protected]> * Simplify computeFullSecondLeakage signature Signed-off-by: litt3 <[email protected]> * Augment previousPartialSecondLeakage doc Signed-off-by: litt3 <[email protected]> * Make time.Second less opaque Signed-off-by: litt3 <[email protected]> * Revert change to accept capacity in LeakyBucket constructor Signed-off-by: litt3 <[email protected]> * Add doc to bias field Signed-off-by: litt3 <[email protected]> * Panic on unknown enum val Signed-off-by: litt3 <[email protected]> --------- Signed-off-by: litt3 <[email protected]>
1 parent 11659e8 commit 6a7a24b

File tree

7 files changed

+620
-0
lines changed

7 files changed

+620
-0
lines changed

core/payments/reservation/CLAUDE.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Reservation Payments
2+
3+
The reservation package implements accounting logic for reservation-based EigenDA usage.
4+
5+
## Key Implementation Details
6+
7+
- Reservation accounting is performed with a LeakyBucket algorithm.
8+
- Each instance of the LeakyBucket algorithm is configured with a BiasBehavior, to determine whether to err
9+
on the side of permitting more or less throughput.
10+
- Each instance of the LeakyBucket algorithm is configured with an OverfillBehavior, which governs behavior when bucket
11+
capacity is exceeded.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package reservation
2+
3+
// BiasBehavior selects which way the leaky bucket leans at the points in the implementation where a decision
// could go either way: toward permitting *more* throughput, or toward permitting *less*.
//
// The appropriate choice depends on who is operating the bucket:
//   - Validator nodes should lean toward *more* throughput. Processing a little extra data isn't a big deal, but
//     denying usage that a user is entitled to is something to be avoided at all costs.
//   - Clients should lean toward *less* throughput. They should do their best to use the full capacity of the
//     reservation they're entitled to, but should prefer slight under-use.
type BiasBehavior string

const (
	// BiasPermitMore resolves doubt in favor of permitting *more* throughput.
	//
	// This is what a validator node should use.
	BiasPermitMore BiasBehavior = "permitMore"

	// BiasPermitLess resolves doubt in favor of permitting *less* throughput.
	//
	// This is what a client should use.
	BiasPermitLess BiasBehavior = "permitLess"
)

core/payments/reservation/doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package reservation implements accounting logic for reservation-based EigenDA usage.
2+
package reservation

core/payments/reservation/errors.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package reservation
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
// TimeMovedBackwardError reports that an observed timestamp lies before a timestamp that was observed earlier.
//
// This should not normally happen, but clock drift and NTP adjustments can occasionally make a system clock jump
// backward. Having a dedicated error type lets the system handle such cases gracefully instead of failing fatally.
type TimeMovedBackwardError struct {
	// CurrentTime is the timestamp that was just provided.
	CurrentTime time.Time
	// PreviousTime is the previously observed timestamp, which is after CurrentTime.
	PreviousTime time.Time
}

// Error implements the error interface.
//
// The message contains both timestamps in RFC 3339 format with nanosecond precision, along with the size of the
// backward jump (PreviousTime - CurrentTime).
func (e *TimeMovedBackwardError) Error() string {
	const layout = time.RFC3339Nano

	current := e.CurrentTime.Format(layout)
	previous := e.PreviousTime.Format(layout)
	backwardJump := e.PreviousTime.Sub(e.CurrentTime)

	return fmt.Sprintf(
		"time moved backward: current time %s is before previous time %s (delta: %v)",
		current, previous, backwardJump)
}
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
package reservation
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"time"
7+
)
8+
9+
// This struct implements the [leaky bucket](https://en.wikipedia.org/wiki/Leaky_bucket) algorithm as a meter.
10+
//
11+
// Symbols "leak out" of the bucket at a constant rate, creating capacity for new symbols. The bucket can be "filled"
12+
// with additional symbols if there is enough available capacity.
13+
//
14+
// The standard golang golang.org/x/time/rate.Limiter is not suitable for our use-case, for the following reasons:
15+
//
16+
// 1. The Limiter doesn't support the concept of overfilling the bucket. We require the concept of overfill, for cases
17+
// where a bucket size might be too small to fit the largest permissible blob size. We don't want to prevent users
18+
// with a small reservation size from submitting large blobs.
19+
// 2. The Limiter uses floating point math. Though it would *probably* be ok to use floats, it makes the distributed
20+
// system harder to reason about. What level of error accumulation would we see with frequent updates? Under
21+
// what conditions would it be possible for the client and server representations of a given leaky bucket to
22+
// diverge, and what impact would that have on our assumptions? These questions can be avoided entirely by using
23+
// an integer based implementation.
24+
//
25+
// NOTE: This struct doesn't do any synchronization! The caller is responsible for making sure that only one goroutine
26+
// is using it at a time.
27+
type LeakyBucket struct {
28+
// Defines whether we should err on the side of permitting more or less throughput
29+
//
30+
// Practically, this value is used to determine whether the bucket is initialized to full or empty, as well as
31+
// rounding direction when leaking.
32+
biasBehavior BiasBehavior
33+
34+
// Defines different ways that overfilling the bucket should be handled
35+
overfillBehavior OverfillBehavior
36+
37+
// The total number of symbols that fit in the bucket
38+
bucketCapacity uint64
39+
40+
// The number of symbols that leak out of the bucket each second, as determined by the reservation.
41+
symbolsPerSecondLeakRate uint64
42+
43+
// The number of symbols currently in the bucket
44+
currentFillLevel uint64
45+
46+
// The time at which the previous leak calculation was made
47+
previousLeakTime time.Time
48+
49+
// The number of symbols which leaked in the "partial second" of the previous leak calculation.
50+
//
51+
// To understand the logic of how this value is used, see the inline documentation of the `leak()` method.
52+
//
53+
// Since the leaky bucket uses integers instead of floats, leak math isn't straight forward. It's easy to calculate
54+
// the number of symbols that leak in a full second, since leak rate is defined in terms of symbols / second. But
55+
// determining how many symbols leak in a number of nanoseconds requires making a rounding choice. Leak calculation
56+
// N needs to take the partialSecondLeakage of calculation N-1 into account, so that the precisely correct number
57+
// of symbols are leaked for each full second.
58+
previousPartialSecondLeakage uint64
59+
}
60+
61+
// Creates a new instance of the leaky bucket algorithm
62+
func NewLeakyBucket(
63+
// how fast symbols leak out of the bucket
64+
symbolsPerSecondLeakRate uint64,
65+
// bucketCapacityDuration * symbolsPerSecondLeakRate becomes the bucket capacity
66+
bucketCapacityDuration time.Duration,
67+
// whether to err on the side of permitting more or less throughput
68+
biasBehavior BiasBehavior,
69+
// how to handle overfilling the bucket
70+
overfillBehavior OverfillBehavior,
71+
// the current time, when this is being constructed
72+
now time.Time,
73+
) (*LeakyBucket, error) {
74+
if symbolsPerSecondLeakRate == 0 {
75+
return nil, errors.New("symbolsPerSecondLeakRate must be > 0")
76+
}
77+
78+
if bucketCapacityDuration <= 0 {
79+
return nil, fmt.Errorf("bucketCapacityDuration must be > 0, got %s", bucketCapacityDuration)
80+
}
81+
82+
// 1e9
83+
nanosecondsPerSecond := uint64(time.Second)
84+
bucketCapacity := symbolsPerSecondLeakRate * uint64(bucketCapacityDuration.Nanoseconds()) / nanosecondsPerSecond
85+
86+
if bucketCapacity == 0 {
87+
return nil, fmt.Errorf("bucket capacity must be > 0 (from leak rate %d symbols/sec * duration %s)",
88+
symbolsPerSecondLeakRate, bucketCapacityDuration)
89+
}
90+
91+
var currentFillLevel uint64
92+
switch biasBehavior {
93+
case BiasPermitMore:
94+
// starting with a fill level of 0 means the bucket starts out with available capacity
95+
currentFillLevel = 0
96+
case BiasPermitLess:
97+
// starting with a full bucket means some time must elapse to allow leakage before the bucket can be used
98+
currentFillLevel = bucketCapacity
99+
default:
100+
panic(fmt.Sprintf("unknown bias behavior %s", biasBehavior))
101+
}
102+
103+
return &LeakyBucket{
104+
biasBehavior: biasBehavior,
105+
overfillBehavior: overfillBehavior,
106+
bucketCapacity: bucketCapacity,
107+
symbolsPerSecondLeakRate: symbolsPerSecondLeakRate,
108+
currentFillLevel: currentFillLevel,
109+
previousLeakTime: now,
110+
previousPartialSecondLeakage: 0,
111+
}, nil
112+
}
113+
114+
// Fill the bucket with a number of symbols.
115+
//
116+
// - Returns (true, nil) if the leaky bucket has enough capacity to accept the fill.
117+
// - Returns (false, nil) if bucket lacks capacity to permit the fill.
118+
// - Returns (false, error) for actual errors:
119+
// - TimeMovedBackwardError if input time is before previous leak time.
120+
// - Generic error for all other modes of failure.
121+
//
122+
// If the bucket doesn't have enough capacity to accommodate the fill, symbolCount IS NOT added to the bucket, i.e. a
123+
// failed fill doesn't count against the meter.
124+
func (lb *LeakyBucket) Fill(now time.Time, symbolCount uint32) (bool, error) {
125+
if symbolCount == 0 {
126+
return false, errors.New("symbolCount must be > 0")
127+
}
128+
129+
err := lb.leak(now)
130+
if err != nil {
131+
return false, fmt.Errorf("leak: %w", err)
132+
}
133+
134+
// this is how full the bucket would be, if the fill were to be accepted
135+
newFillLevel := lb.currentFillLevel + uint64(symbolCount)
136+
137+
// if newFillLevel is <= the total bucket capacity, no further checks are required
138+
if newFillLevel <= lb.bucketCapacity {
139+
lb.currentFillLevel = newFillLevel
140+
return true, nil
141+
}
142+
143+
// this fill would result in the bucket being overfilled, so we check the overfill behavior to decide what to do
144+
switch lb.overfillBehavior {
145+
case OverfillNotPermitted:
146+
return false, nil
147+
case OverfillOncePermitted:
148+
zeroCapacityAvailable := lb.currentFillLevel >= lb.bucketCapacity
149+
150+
// if there is no available capacity whatsoever, dispersal is never permitted, no matter the overfill behavior
151+
if zeroCapacityAvailable {
152+
return false, nil
153+
}
154+
155+
lb.currentFillLevel = newFillLevel
156+
return true, nil
157+
default:
158+
panic(fmt.Sprintf("unknown overfill behavior %s", lb.overfillBehavior))
159+
}
160+
}
161+
162+
// Reverts a previous fill, i.e. removes the number of symbols that got added to the bucket
163+
//
164+
// - Returns a TimeMovedBackwardError if input time is before previous leak time.
165+
// - Returns a generic error for all other modes of failure.
166+
//
167+
// The input time should be the most up-to-date time, NOT the time of the original fill.
168+
func (lb *LeakyBucket) RevertFill(now time.Time, symbolCount uint32) error {
169+
if symbolCount == 0 {
170+
return errors.New("symbolCount must be > 0")
171+
}
172+
173+
err := lb.leak(now)
174+
if err != nil {
175+
return fmt.Errorf("leak: %w", err)
176+
}
177+
178+
if lb.currentFillLevel <= uint64(symbolCount) {
179+
lb.currentFillLevel = 0
180+
return nil
181+
}
182+
183+
lb.currentFillLevel = lb.currentFillLevel - uint64(symbolCount)
184+
return nil
185+
}
186+
187+
// Lets the correct number of symbols leak out of the bucket, based on when we last leaked
188+
//
189+
// Returns a TimeMovedBackwardError if input time is before previous leak time.
190+
func (lb *LeakyBucket) leak(now time.Time) error {
191+
if now.Before(lb.previousLeakTime) {
192+
return &TimeMovedBackwardError{PreviousTime: lb.previousLeakTime, CurrentTime: now}
193+
}
194+
195+
defer func() {
196+
lb.previousLeakTime = now
197+
}()
198+
199+
// Previous leak (N-1) Current Leak (N)
200+
// ↓ ↓
201+
// |----*----------|----------------|----------*-----|
202+
// ↑________________________________↑
203+
// fullSecondLeakage
204+
fullSecondLeakage := lb.computeFullSecondLeakage(uint64(now.Unix()))
205+
206+
// We need to correct the full-second leakage value: the previous leak calculation already let some symbols from a
207+
// partial second period leak out, and those symbols shouldn't leak twice
208+
//
209+
// This value can be negative if the previous leak calculation was within the same second as this calculation,
210+
// since in that case fullSecondLeakage would be 0.
211+
//
212+
// Previous leak (N-1) Current Leak (N)
213+
// ↓ ↓
214+
// |----*----------|----------------|----------*-----|
215+
// ↑____↑
216+
// previousPartialSecondLeakage
217+
//
218+
// Previous leak (N-1) Current Leak (N)
219+
// ↓ ↓
220+
// |----*----------|----------------|----------*-----|
221+
// ↑___________________________↑
222+
// correctedFullSecondLeakage
223+
correctedFullSecondLeakage := fullSecondLeakage - lb.previousPartialSecondLeakage
224+
225+
// Previous leak (N-1) Current Leak (N)
226+
// ↓ ↓
227+
// |----*----------|----------------|----------*-----|
228+
// ↑__________↑
229+
// partialSecondLeakage
230+
partialSecondLeakage := lb.computePartialSecondLeakage(uint64(now.Nanosecond()))
231+
lb.previousPartialSecondLeakage = partialSecondLeakage
232+
233+
// Previous leak (N-1) Current Leak (N)
234+
// ↓ ↓
235+
// |----*----------|----------------|----------*-----|
236+
// ↑______________________________________↑
237+
// actualLeakage
238+
actualLeakage := correctedFullSecondLeakage + partialSecondLeakage
239+
240+
if lb.currentFillLevel <= actualLeakage {
241+
lb.currentFillLevel = 0
242+
return nil
243+
}
244+
245+
lb.currentFillLevel = lb.currentFillLevel - actualLeakage
246+
return nil
247+
}
248+
249+
// Accepts the current number of seconds since epoch. Returns the number of symbols that should leak from the bucket,
250+
// based on when we last leaked.
251+
//
252+
// Since this method only takes full seconds into consideration, the returned value must be used carefully. See leak()
253+
// for details.
254+
func (lb *LeakyBucket) computeFullSecondLeakage(epochSeconds uint64) uint64 {
255+
secondsSinceLastUpdate := epochSeconds - uint64(lb.previousLeakTime.Unix())
256+
fullSecondLeakage := secondsSinceLastUpdate * lb.symbolsPerSecondLeakRate
257+
return fullSecondLeakage
258+
}
259+
260+
// Accepts a number of nanoseconds, which represent a fraction of a single second.
261+
//
262+
// Computes the number of symbols which leak out in the given fractional second. Since this deals with integers,
263+
// the configured bias determines which direction we round in.
264+
func (lb *LeakyBucket) computePartialSecondLeakage(nanos uint64) uint64 {
265+
// 1e9
266+
nanosecondsPerSecond := uint64(time.Second)
267+
268+
switch lb.biasBehavior {
269+
case BiasPermitMore:
270+
// Round up, to permit more (more leakage = more capacity freed up)
271+
// Add (1e9 - 1) before dividing to round up
272+
return (nanos*lb.symbolsPerSecondLeakRate + nanosecondsPerSecond - 1) / nanosecondsPerSecond
273+
case BiasPermitLess:
274+
// Round down, to permit less (less leakage = less capacity freed up)
275+
return nanos * lb.symbolsPerSecondLeakRate / nanosecondsPerSecond
276+
default:
277+
panic(fmt.Sprintf("unknown bias: %s", lb.biasBehavior))
278+
}
279+
}

0 commit comments

Comments
 (0)