Skip to content

Commit 100eb3e

Browse files
authored
[v2] disperser client payments api (#928)
1 parent 9f68f74 commit 100eb3e

File tree

4 files changed

+136
-58
lines changed

4 files changed

+136
-58
lines changed

api/clients/accountant.go

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,14 @@ import (
88
"sync"
99
"time"
1010

11-
commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
11+
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
1212
"github.com/Layr-Labs/eigenda/core"
1313
"github.com/Layr-Labs/eigenda/core/meterer"
1414
)
1515

1616
var requiredQuorums = []uint8{0, 1}
1717

18-
type Accountant interface {
19-
AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, error)
20-
}
21-
22-
var _ Accountant = &accountant{}
23-
24-
type accountant struct {
18+
type Accountant struct {
2519
// on-chain states
2620
accountID string
2721
reservation *core.ActiveReservation
@@ -45,15 +39,15 @@ type BinRecord struct {
4539
Usage uint64
4640
}
4741

48-
func NewAccountant(accountID string, reservation *core.ActiveReservation, onDemand *core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, numBins uint32) *accountant {
42+
func NewAccountant(accountID string, reservation *core.ActiveReservation, onDemand *core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, numBins uint32) *Accountant {
4943
//TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense
5044
// Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic
5145
// There's a subsequent PR that handles populating the accountant with on-chain state from the disperser
5246
binRecords := make([]BinRecord, numBins)
5347
for i := range binRecords {
5448
binRecords[i] = BinRecord{Index: uint32(i), Usage: 0}
5549
}
56-
a := accountant{
50+
a := Accountant{
5751
accountID: accountID,
5852
reservation: reservation,
5953
onDemand: onDemand,
@@ -73,7 +67,7 @@ func NewAccountant(accountID string, reservation *core.ActiveReservation, onDema
7367
// then on-demand if the reservation is not available. The returned values are
7468
// bin index for reservation payments and cumulative payment for on-demand payments,
7569
// and both fields are used to create the payment header and signature
76-
func (a *accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) {
70+
func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) {
7771
now := time.Now().Unix()
7872
currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow)
7973

@@ -116,7 +110,7 @@ func (a *accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quo
116110
}
117111

118112
// AccountBlob accountant provides and records payment information
119-
func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, error) {
113+
func (a *Accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*core.PaymentMetadata, error) {
120114
binIndex, cumulativePayment, err := a.BlobPaymentInfo(ctx, numSymbols, quorums)
121115
if err != nil {
122116
return nil, err
@@ -127,28 +121,27 @@ func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums
127121
BinIndex: binIndex,
128122
CumulativePayment: cumulativePayment,
129123
}
130-
protoPaymentHeader := pm.ConvertToProtoPaymentHeader()
131124

132-
return protoPaymentHeader, nil
125+
return pm, nil
133126
}
134127

135128
// TODO: PaymentCharged and SymbolsCharged copied from meterer, should be refactored
136129
// PaymentCharged returns the chargeable price for a given data length
137-
func (a *accountant) PaymentCharged(numSymbols uint) uint64 {
130+
func (a *Accountant) PaymentCharged(numSymbols uint) uint64 {
138131
return uint64(a.SymbolsCharged(numSymbols)) * uint64(a.pricePerSymbol)
139132
}
140133

141134
// SymbolsCharged returns the number of symbols charged for a given data length
142135
// being at least MinNumSymbols or the nearest rounded-up multiple of MinNumSymbols.
143-
func (a *accountant) SymbolsCharged(numSymbols uint) uint32 {
136+
func (a *Accountant) SymbolsCharged(numSymbols uint) uint32 {
144137
if numSymbols <= uint(a.minNumSymbols) {
145138
return a.minNumSymbols
146139
}
147140
// Round up to the nearest multiple of MinNumSymbols
148141
return uint32(core.RoundUpDivide(uint(numSymbols), uint(a.minNumSymbols))) * a.minNumSymbols
149142
}
150143

151-
func (a *accountant) GetRelativeBinRecord(index uint32) *BinRecord {
144+
func (a *Accountant) GetRelativeBinRecord(index uint32) *BinRecord {
152145
relativeIndex := index % a.numBins
153146
if a.binRecords[relativeIndex].Index != uint32(index) {
154147
a.binRecords[relativeIndex] = BinRecord{
@@ -160,6 +153,59 @@ func (a *accountant) GetRelativeBinRecord(index uint32) *BinRecord {
160153
return &a.binRecords[relativeIndex]
161154
}
162155

156+
func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply) error {
157+
if paymentState == nil {
158+
return fmt.Errorf("payment state cannot be nil")
159+
} else if paymentState.GetPaymentGlobalParams() == nil {
160+
return fmt.Errorf("payment global params cannot be nil")
161+
} else if paymentState.GetOnchainCumulativePayment() == nil {
162+
return fmt.Errorf("onchain cumulative payment cannot be nil")
163+
} else if paymentState.GetCumulativePayment() == nil {
164+
return fmt.Errorf("cumulative payment cannot be nil")
165+
} else if paymentState.GetReservation() == nil {
166+
return fmt.Errorf("reservation cannot be nil")
167+
} else if paymentState.GetReservation().GetQuorumNumbers() == nil {
168+
return fmt.Errorf("reservation quorum numbers cannot be nil")
169+
} else if paymentState.GetReservation().GetQuorumSplit() == nil {
170+
return fmt.Errorf("reservation quorum split cannot be nil")
171+
} else if paymentState.GetBinRecords() == nil {
172+
return fmt.Errorf("bin records cannot be nil")
173+
}
174+
175+
a.minNumSymbols = uint32(paymentState.PaymentGlobalParams.MinNumSymbols)
176+
a.onDemand.CumulativePayment = new(big.Int).SetBytes(paymentState.OnchainCumulativePayment)
177+
a.cumulativePayment = new(big.Int).SetBytes(paymentState.CumulativePayment)
178+
a.pricePerSymbol = uint32(paymentState.PaymentGlobalParams.PricePerSymbol)
179+
180+
a.reservation.SymbolsPerSec = uint64(paymentState.PaymentGlobalParams.GlobalSymbolsPerSecond)
181+
a.reservation.StartTimestamp = uint64(paymentState.Reservation.StartTimestamp)
182+
a.reservation.EndTimestamp = uint64(paymentState.Reservation.EndTimestamp)
183+
a.reservationWindow = uint32(paymentState.PaymentGlobalParams.ReservationWindow)
184+
185+
quorumNumbers := make([]uint8, len(paymentState.Reservation.QuorumNumbers))
186+
for i, quorum := range paymentState.Reservation.QuorumNumbers {
187+
quorumNumbers[i] = uint8(quorum)
188+
}
189+
a.reservation.QuorumNumbers = quorumNumbers
190+
191+
quorumSplit := make([]uint8, len(paymentState.Reservation.QuorumSplit))
192+
for i, quorum := range paymentState.Reservation.QuorumSplit {
193+
quorumSplit[i] = uint8(quorum)
194+
}
195+
a.reservation.QuorumSplit = quorumSplit
196+
197+
binRecords := make([]BinRecord, len(paymentState.BinRecords))
198+
for i, record := range paymentState.BinRecords {
199+
binRecords[i] = BinRecord{
200+
Index: record.Index,
201+
Usage: record.Usage,
202+
}
203+
}
204+
a.binRecords = binRecords
205+
206+
return nil
207+
}
208+
163209
// QuorumCheck eagerly returns error if the check finds a quorum number not an element of the allowed quorum numbers
164210
func QuorumCheck(quorumNumbers []uint8, allowedNumbers []uint8) error {
165211
if len(quorumNumbers) == 0 {

api/clients/accountant_test.go

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,30 +71,27 @@ func TestAccountBlob_Reservation(t *testing.T) {
7171
quorums := []uint8{0, 1}
7272

7373
header, err := accountant.AccountBlob(ctx, symbolLength, quorums)
74-
metadata := core.ConvertPaymentHeader(header)
7574

7675
assert.NoError(t, err)
7776
assert.Equal(t, meterer.GetBinIndex(uint64(time.Now().Unix()), reservationWindow), header.BinIndex)
78-
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
77+
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
7978
assert.Equal(t, isRotation([]uint64{500, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
8079

8180
symbolLength = uint64(700)
8281

8382
header, err = accountant.AccountBlob(ctx, symbolLength, quorums)
84-
metadata = core.ConvertPaymentHeader(header)
8583

8684
assert.NoError(t, err)
8785
assert.NotEqual(t, 0, header.BinIndex)
88-
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
86+
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
8987
assert.Equal(t, isRotation([]uint64{1200, 0, 200}, mapRecordUsage(accountant.binRecords)), true)
9088

9189
// Second call should use on-demand payment
9290
header, err = accountant.AccountBlob(ctx, 300, quorums)
93-
metadata = core.ConvertPaymentHeader(header)
9491

9592
assert.NoError(t, err)
9693
assert.Equal(t, uint32(0), header.BinIndex)
97-
assert.Equal(t, big.NewInt(300), metadata.CumulativePayment)
94+
assert.Equal(t, big.NewInt(300), header.CumulativePayment)
9895
}
9996

10097
func TestAccountBlob_OnDemand(t *testing.T) {
@@ -124,10 +121,9 @@ func TestAccountBlob_OnDemand(t *testing.T) {
124121
header, err := accountant.AccountBlob(ctx, numSymbols, quorums)
125122
assert.NoError(t, err)
126123

127-
metadata := core.ConvertPaymentHeader(header)
128124
expectedPayment := big.NewInt(int64(numSymbols * uint64(pricePerSymbol)))
129125
assert.Equal(t, uint32(0), header.BinIndex)
130-
assert.Equal(t, expectedPayment, metadata.CumulativePayment)
126+
assert.Equal(t, expectedPayment, header.CumulativePayment)
131127
assert.Equal(t, isRotation([]uint64{0, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
132128
assert.Equal(t, expectedPayment, accountant.cumulativePayment)
133129
}
@@ -180,24 +176,21 @@ func TestAccountBlobCallSeries(t *testing.T) {
180176

181177
// First call: Use reservation
182178
header, err := accountant.AccountBlob(ctx, 800, quorums)
183-
metadata := core.ConvertPaymentHeader(header)
184179
assert.NoError(t, err)
185180
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
186-
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
181+
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
187182

188183
// Second call: Use remaining reservation + overflow
189184
header, err = accountant.AccountBlob(ctx, 300, quorums)
190-
metadata = core.ConvertPaymentHeader(header)
191185
assert.NoError(t, err)
192186
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
193-
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
187+
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
194188

195189
// Third call: Use on-demand
196190
header, err = accountant.AccountBlob(ctx, 500, quorums)
197-
metadata = core.ConvertPaymentHeader(header)
198191
assert.NoError(t, err)
199192
assert.Equal(t, uint32(0), header.BinIndex)
200-
assert.Equal(t, big.NewInt(500), metadata.CumulativePayment)
193+
assert.Equal(t, big.NewInt(500), header.CumulativePayment)
201194

202195
// Fourth call: Insufficient on-demand
203196
_, err = accountant.AccountBlob(ctx, 600, quorums)
@@ -321,23 +314,20 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) {
321314
header, err := accountant.AccountBlob(ctx, 800, quorums)
322315
assert.NoError(t, err)
323316
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
324-
metadata := core.ConvertPaymentHeader(header)
325-
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
317+
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
326318
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
327319

328320
// Second call: Allow one overflow
329321
header, err = accountant.AccountBlob(ctx, 500, quorums)
330322
assert.NoError(t, err)
331-
metadata = core.ConvertPaymentHeader(header)
332-
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
323+
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
333324
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)
334325

335326
// Third call: Should use on-demand payment
336327
header, err = accountant.AccountBlob(ctx, 200, quorums)
337328
assert.NoError(t, err)
338329
assert.Equal(t, uint32(0), header.BinIndex)
339-
metadata = core.ConvertPaymentHeader(header)
340-
assert.Equal(t, big.NewInt(200), metadata.CumulativePayment)
330+
assert.Equal(t, big.NewInt(200), header.CumulativePayment)
341331
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)
342332
}
343333

@@ -373,8 +363,7 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
373363
header, err := accountant.AccountBlob(ctx, 500, quorums)
374364
assert.NoError(t, err)
375365
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
376-
metadata := core.ConvertPaymentHeader(header)
377-
assert.Equal(t, big.NewInt(500), metadata.CumulativePayment)
366+
assert.Equal(t, big.NewInt(500), header.CumulativePayment)
378367

379368
// Wait for next reservation duration
380369
time.Sleep(time.Duration(reservationWindow) * time.Second)

api/clients/disperser_client_v2.go

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package clients
33
import (
44
"context"
55
"fmt"
6-
"math/big"
76
"sync"
87

98
"github.com/Layr-Labs/eigenda/api"
@@ -30,12 +29,13 @@ type DisperserClientV2 interface {
3029
}
3130

3231
type disperserClientV2 struct {
33-
config *DisperserClientV2Config
34-
signer corev2.BlobRequestSigner
35-
initOnce sync.Once
36-
conn *grpc.ClientConn
37-
client disperser_rpc.DisperserClient
38-
prover encoding.Prover
32+
config *DisperserClientV2Config
33+
signer corev2.BlobRequestSigner
34+
initOnce sync.Once
35+
conn *grpc.ClientConn
36+
client disperser_rpc.DisperserClient
37+
prover encoding.Prover
38+
accountant *Accountant
3939
}
4040

4141
var _ DisperserClientV2 = &disperserClientV2{}
@@ -60,7 +60,7 @@ var _ DisperserClientV2 = &disperserClientV2{}
6060
//
6161
// // Subsequent calls will use the existing connection
6262
// status2, blobKey2, err := client.DisperseBlob(ctx, data, blobHeader)
63-
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover) (*disperserClientV2, error) {
63+
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant *Accountant) (*disperserClientV2, error) {
6464
if config == nil {
6565
return nil, api.NewErrorInvalidArg("config must be provided")
6666
}
@@ -75,13 +75,28 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
7575
}
7676

7777
return &disperserClientV2{
78-
config: config,
79-
signer: signer,
80-
prover: prover,
78+
config: config,
79+
signer: signer,
80+
prover: prover,
81+
accountant: accountant,
8182
// conn and client are initialized lazily
8283
}, nil
8384
}
8485

86+
// PopulateAccountant populates the accountant with the payment state from the disperser.
87+
func (c *disperserClientV2) PopulateAccountant(ctx context.Context) error {
88+
paymentState, err := c.GetPaymentState(ctx)
89+
if err != nil {
90+
return fmt.Errorf("error getting payment state for initializing accountant: %w", err)
91+
}
92+
93+
err = c.accountant.SetPaymentState(paymentState)
94+
if err != nil {
95+
return fmt.Errorf("error setting payment state for accountant: %w", err)
96+
}
97+
return nil
98+
}
99+
85100
// Close closes the grpc connection to the disperser server.
86101
// It is thread safe and can be called multiple times.
87102
func (c *disperserClientV2) Close() error {
@@ -108,16 +123,15 @@ func (c *disperserClientV2) DisperseBlob(
108123
if c.signer == nil {
109124
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
110125
}
126+
if c.accountant == nil {
127+
return nil, [32]byte{}, api.NewErrorInternal("uninitialized accountant for paid dispersal; make sure to call PopulateAccountant after creating the client")
128+
}
111129

112-
var payment core.PaymentMetadata
113-
accountId, err := c.signer.GetAccountID()
130+
symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
131+
payment, err := c.accountant.AccountBlob(ctx, uint64(symbolLength), quorums)
114132
if err != nil {
115-
return nil, [32]byte{}, api.NewErrorInvalidArg(fmt.Sprintf("please configure signer key if you want to use authenticated endpoint %v", err))
133+
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
116134
}
117-
payment.AccountID = accountId
118-
// TODO: add payment metadata
119-
payment.BinIndex = 0
120-
payment.CumulativePayment = big.NewInt(0)
121135

122136
if len(quorums) == 0 {
123137
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
@@ -160,7 +174,7 @@ func (c *disperserClientV2) DisperseBlob(
160174
BlobVersion: blobVersion,
161175
BlobCommitments: blobCommitments,
162176
QuorumNumbers: quorums,
163-
PaymentMetadata: payment,
177+
PaymentMetadata: *payment,
164178
}
165179
sig, err := c.signer.SignBlobRequest(blobHeader)
166180
if err != nil {
@@ -202,6 +216,30 @@ func (c *disperserClientV2) GetBlobStatus(ctx context.Context, blobKey corev2.Bl
202216
return c.client.GetBlobStatus(ctx, request)
203217
}
204218

219+
// GetPaymentState returns the payment state of the disperser client
220+
func (c *disperserClientV2) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
221+
err := c.initOnceGrpcConnection()
222+
if err != nil {
223+
return nil, api.NewErrorInternal(err.Error())
224+
}
225+
226+
accountID, err := c.signer.GetAccountID()
227+
if err != nil {
228+
return nil, fmt.Errorf("error getting signer's account ID: %w", err)
229+
}
230+
231+
signature, err := c.signer.SignPaymentStateRequest()
232+
if err != nil {
233+
return nil, fmt.Errorf("error signing payment state request: %w", err)
234+
}
235+
236+
request := &disperser_rpc.GetPaymentStateRequest{
237+
AccountId: accountID,
238+
Signature: signature,
239+
}
240+
return c.client.GetPaymentState(ctx, request)
241+
}
242+
205243
// GetBlobCommitment is a utility method that calculates commitment for a blob payload.
206244
// While the blob commitment can be calculated by anyone, it requires SRS points to
207245
// be loaded. For service that does not have access to SRS points, this method can be

0 commit comments

Comments
 (0)