Skip to content

Commit a309139

Browse files
authored
feat: offchain quorum period record getter (#1620)
* refactor: payment dynamo store period record getter * fix: specify reservation periods per quorum * fix: lint and err propagation * fix: period record array and unit tests * refactor: clean test context for all dynamodb metering store tests
1 parent e8a7170 commit a309139

File tree

5 files changed

+640
-392
lines changed

5 files changed

+640
-392
lines changed

core/meterer/dynamodb_metering_store.go

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"math"
78
"math/big"
89
"strconv"
10+
"strings"
911

1012
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
1113
commonaws "github.com/Layr-Labs/eigenda/common/aws"
1214
commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
1315
"github.com/Layr-Labs/eigenda/core"
1416
"github.com/Layr-Labs/eigensdk-go/logging"
15-
"github.com/aws/aws-sdk-go-v2/aws"
16-
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
1717
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
1818
gethcommon "github.com/ethereum/go-ethereum/common"
1919
)
@@ -65,7 +65,7 @@ func NewDynamoDBMeteringStore(
6565
}
6666

6767
// IncrementBinUsages updates the bin usage for each quorum in quorumNumbers for a specific account and reservation period.
68-
// The key is AccountIDAndQuorum, formatted as {AccountID}:{quorumNumber}.
68+
// The key AccountID is formatted as {AccountID}:{quorumNumber}.
6969
func (s *DynamoDBMeteringStore) IncrementBinUsages(ctx context.Context, accountID gethcommon.Address, quorumNumbers []core.QuorumID, reservationPeriods map[core.QuorumID]uint64, sizes map[core.QuorumID]uint64) (map[core.QuorumID]uint64, error) {
7070
binUsages := make(map[core.QuorumID]uint64)
7171

@@ -256,30 +256,53 @@ func (s *DynamoDBMeteringStore) RollbackOnDemandPayment(ctx context.Context, acc
256256
return nil
257257
}
258258

259-
func (s *DynamoDBMeteringStore) GetPeriodRecords(ctx context.Context, accountID gethcommon.Address, reservationPeriod uint64) ([MinNumBins]*pb.PeriodRecord, error) {
260-
// Fetch the 3 bins start from the current bin
261-
queryInput := &dynamodb.QueryInput{
262-
TableName: aws.String(s.reservationTableName),
263-
KeyConditionExpression: aws.String("AccountID = :account AND ReservationPeriod >= :reservationPeriod"),
264-
ExpressionAttributeValues: commondynamodb.ExpressionValues{
265-
":account": &types.AttributeValueMemberS{Value: accountID.Hex()},
266-
":reservationPeriod": &types.AttributeValueMemberN{Value: strconv.FormatUint(reservationPeriod, 10)},
267-
},
268-
ScanIndexForward: aws.Bool(true),
269-
Limit: aws.Int32(MinNumBins),
270-
}
271-
bins, err := s.dynamoClient.QueryWithInput(ctx, queryInput)
259+
// GetPeriodRecords retrieves period records for multiple quorums efficiently.
260+
// This function is optimized for retrieving period records for all quorums in a single database operation.
261+
// The records start from the first reservation period for each quorum to at most numBins in length into the future.
262+
// quorumNumbers and reservationPeriods must have the same length and the same ordering.
263+
// Returns an array of PeriodRecords up to numBins in length, with records for each requested quorum.
264+
func (s *DynamoDBMeteringStore) GetPeriodRecords(
265+
ctx context.Context,
266+
accountID gethcommon.Address,
267+
quorumNumbers []core.QuorumID,
268+
reservationPeriods []uint64,
269+
numBins uint32,
270+
) (map[core.QuorumID]*pb.PeriodRecords, error) {
271+
if len(quorumNumbers) == 0 || len(reservationPeriods) != len(quorumNumbers) {
272+
return nil, nil
273+
}
274+
275+
// Prepare all keys for batch get
276+
var keys []map[string]types.AttributeValue
277+
for i, quorum := range quorumNumbers {
278+
accountIDAndQuorum := accountID.Hex() + ":" + strconv.FormatUint(uint64(quorum), 10)
279+
for j := 0; j < int(numBins); j++ {
280+
key := map[string]types.AttributeValue{
281+
"AccountID": &types.AttributeValueMemberS{Value: accountIDAndQuorum},
282+
"ReservationPeriod": &types.AttributeValueMemberN{Value: strconv.FormatUint(reservationPeriods[i]+uint64(j), 10)},
283+
}
284+
keys = append(keys, key)
285+
}
286+
}
287+
288+
items, err := s.dynamoClient.GetItems(ctx, s.reservationTableName, keys, true)
272289
if err != nil {
273-
return [MinNumBins]*pb.PeriodRecord{}, fmt.Errorf("failed to query payments for account: %w", err)
290+
return nil, fmt.Errorf("failed to batch get period records for account: %w", err)
274291
}
275292

276-
records := [MinNumBins]*pb.PeriodRecord{}
277-
for i := 0; i < len(bins) && i < int(MinNumBins); i++ {
278-
periodRecord, err := parsePeriodRecord(bins[i])
293+
records := make(map[core.QuorumID]*pb.PeriodRecords)
294+
for _, item := range items {
295+
quorumNumber, periodRecord, err := parsePeriodRecord(item)
279296
if err != nil {
280-
return [MinNumBins]*pb.PeriodRecord{}, fmt.Errorf("failed to parse bin %d record: %w", i, err)
297+
return nil, fmt.Errorf("failed to parse period record: %w", err)
298+
}
299+
if existingRecords, exists := records[quorumNumber]; exists {
300+
existingRecords.Records = append(existingRecords.Records, periodRecord)
301+
} else {
302+
records[quorumNumber] = &pb.PeriodRecords{
303+
Records: []*pb.PeriodRecord{periodRecord},
304+
}
281305
}
282-
records[i] = periodRecord
283306
}
284307

285308
return records, nil
@@ -321,38 +344,60 @@ func (s *DynamoDBMeteringStore) GetLargestCumulativePayment(ctx context.Context,
321344
return payment, nil
322345
}
323346

324-
func parsePeriodRecord(bin map[string]types.AttributeValue) (*pb.PeriodRecord, error) {
347+
func parsePeriodRecord(bin map[string]types.AttributeValue) (core.QuorumID, *pb.PeriodRecord, error) {
325348
reservationPeriod, ok := bin["ReservationPeriod"]
326349
if !ok {
327-
return nil, errors.New("ReservationPeriod is not present in the response")
350+
return 0, nil, errors.New("ReservationPeriod is not present in the response")
328351
}
329352

330353
reservationPeriodAttr, ok := reservationPeriod.(*types.AttributeValueMemberN)
331354
if !ok {
332-
return nil, fmt.Errorf("unexpected type for ReservationPeriod: %T", reservationPeriod)
355+
return 0, nil, fmt.Errorf("unexpected type for ReservationPeriod: %T", reservationPeriod)
333356
}
334357

335358
reservationPeriodValue, err := strconv.ParseUint(reservationPeriodAttr.Value, 10, 32)
336359
if err != nil {
337-
return nil, fmt.Errorf("failed to parse ReservationPeriod: %w", err)
360+
return 0, nil, fmt.Errorf("failed to parse ReservationPeriod: %w", err)
338361
}
339362

340363
binUsage, ok := bin["BinUsage"]
341364
if !ok {
342-
return nil, errors.New("BinUsage is not present in the response")
365+
return 0, nil, errors.New("BinUsage is not present in the response")
343366
}
344367

345368
binUsageAttr, ok := binUsage.(*types.AttributeValueMemberN)
346369
if !ok {
347-
return nil, fmt.Errorf("unexpected type for BinUsage: %T", binUsage)
370+
return 0, nil, fmt.Errorf("unexpected type for BinUsage: %T", binUsage)
348371
}
349372

350373
binUsageValue, err := strconv.ParseUint(binUsageAttr.Value, 10, 32)
351374
if err != nil {
352-
return nil, fmt.Errorf("failed to parse BinUsage: %w", err)
375+
return 0, nil, fmt.Errorf("failed to parse BinUsage: %w", err)
376+
}
377+
accountIDAndQuorum, ok := bin["AccountID"]
378+
if !ok {
379+
return 0, nil, errors.New("AccountID is not present in the response")
380+
}
381+
382+
accountIDAndQuorumAttr, ok := accountIDAndQuorum.(*types.AttributeValueMemberS)
383+
if !ok {
384+
return 0, nil, fmt.Errorf("unexpected type for AccountID: %T", accountIDAndQuorum)
385+
}
386+
387+
parts := strings.Split(accountIDAndQuorumAttr.Value, ":")
388+
if len(parts) != 2 {
389+
return 0, nil, fmt.Errorf("invalid AccountID format: %s", accountIDAndQuorumAttr.Value)
390+
}
391+
392+
quorumNumber, err := strconv.ParseUint(parts[1], 10, 32)
393+
if err != nil {
394+
return 0, nil, fmt.Errorf("failed to parse QuorumNumber: %w", err)
395+
}
396+
if quorumNumber > math.MaxUint8 {
397+
return 0, nil, fmt.Errorf("QuorumNumber exceeds maximum value for uint8: %d", quorumNumber)
353398
}
354399

355-
return &pb.PeriodRecord{
400+
return core.QuorumID(quorumNumber), &pb.PeriodRecord{
356401
Index: uint32(reservationPeriodValue),
357402
Usage: uint64(binUsageValue),
358403
}, nil

0 commit comments

Comments
 (0)