Skip to content

Commit 1fd8fde

Browse files
authored
feat: support range query for per-account blobs at metadata store (#1438)
1 parent edb1e64 commit 1fd8fde

File tree

2 files changed

+300
-3
lines changed

2 files changed

+300
-3
lines changed

disperser/common/v2/blobstore/dynamo_metadata_store.go

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
2525
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
2626
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
27+
gethcommon "github.com/ethereum/go-ethereum/common"
2728
)
2829

2930
const (
@@ -32,6 +33,7 @@ const (
3233
OperatorResponseIndexName = "OperatorResponseIndex"
3334
RequestedAtIndexName = "RequestedAtIndex"
3435
AttestedAtIndexName = "AttestedAtAIndex"
36+
AccountBlobIndexName = "AccountBlobIndex"
3537

3638
blobKeyPrefix = "BlobKey#"
3739
dispersalKeyPrefix = "Dispersal#"
@@ -480,6 +482,83 @@ func (s *BlobMetadataStore) GetBlobMetadataByRequestedAtBackward(
480482
return result, lastProcessedCursor, nil
481483
}
482484

485+
// GetBlobMetadataByAccountID returns blobs (as BlobMetadata) within time range (start, end)
486+
// (in ns, both exclusive), retrieved and ordered by RequestedAt timestamp in specified order, for
487+
// a given account.
488+
//
489+
// If specified order is ascending (`ascending` is true), retrieve data from the oldest (`start`)
490+
// to the newest (`end`); otherwise retrieve by the opposite direction.
491+
//
492+
// If limit > 0, returns at most that many blobs. If limit <= 0, returns all results
493+
// in the time range.
494+
func (s *BlobMetadataStore) GetBlobMetadataByAccountID(
495+
ctx context.Context,
496+
accountId gethcommon.Address,
497+
start uint64,
498+
end uint64,
499+
limit int,
500+
ascending bool,
501+
) ([]*v2.BlobMetadata, error) {
502+
if start+1 > end-1 {
503+
return nil, fmt.Errorf("no time point in exclusive time range (%d, %d)", start, end)
504+
}
505+
506+
blobs := make([]*v2.BlobMetadata, 0)
507+
var lastEvaledKey map[string]types.AttributeValue
508+
adjustedStart, adjustedEnd := start+1, end-1
509+
510+
// Iteratively fetch results until we get desired number of items or exhaust the
511+
// available data.
512+
// This needs to be processed in a loop because DynamoDb has a limit on the response
513+
// size of a query (1MB) and we may have more data than that.
514+
for {
515+
remaining := math.MaxInt
516+
if limit > 0 {
517+
remaining = limit - len(blobs)
518+
}
519+
res, err := s.dynamoDBClient.QueryIndexWithPagination(
520+
ctx,
521+
s.tableName,
522+
AccountBlobIndexName,
523+
"AccountID = :pk AND RequestedAt BETWEEN :start AND :end",
524+
commondynamodb.ExpressionValues{
525+
":pk": &types.AttributeValueMemberS{Value: accountId.Hex()},
526+
":start": &types.AttributeValueMemberN{Value: strconv.FormatInt(int64(adjustedStart), 10)},
527+
":end": &types.AttributeValueMemberN{Value: strconv.FormatInt(int64(adjustedEnd), 10)},
528+
},
529+
int32(remaining),
530+
lastEvaledKey,
531+
ascending,
532+
)
533+
if err != nil {
534+
return nil, fmt.Errorf("query failed for accountId %s with time range (%d, %d): %w", accountId.Hex(), adjustedStart, adjustedEnd, err)
535+
}
536+
537+
// Collect results
538+
for _, item := range res.Items {
539+
it, err := UnmarshalBlobMetadata(item)
540+
if err != nil {
541+
return blobs, fmt.Errorf("failed to unmarshal blob metadata: %w", err)
542+
}
543+
blobs = append(blobs, it)
544+
545+
// Desired number of items collected
546+
if limit > 0 && len(blobs) >= limit {
547+
return blobs, nil
548+
}
549+
}
550+
551+
// Exhausted all items already
552+
if res.LastEvaluatedKey == nil {
553+
break
554+
}
555+
// For next iteration
556+
lastEvaledKey = res.LastEvaluatedKey
557+
}
558+
559+
return blobs, nil
560+
}
561+
483562
// queryBucketAttestation returns attestations within a single bucket of time range [start, end]. Results are ordered by AttestedAt in
484563
// ascending order.
485564
//
@@ -872,7 +951,7 @@ func (s *BlobMetadataStore) GetDispersalRequest(ctx context.Context, batchHeader
872951
// If specified order is ascending (`ascending` is true), retrieve data from the oldest (`start`)
873952
// to the newest (`end`); otherwise retrieve in the opposite direction.
874953
//
875-
// If limit > 0, returns at most that many attestations. If limit <= 0, returns all results
954+
// If limit > 0, returns at most that many dispersals. If limit <= 0, returns all results
876955
// in the time range.
877956
func (s *BlobMetadataStore) GetDispersalRequestByDispersedAt(
878957
ctx context.Context,
@@ -890,8 +969,8 @@ func (s *BlobMetadataStore) GetDispersalRequestByDispersedAt(
890969
var lastEvaledKey map[string]types.AttributeValue
891970
adjustedStart, adjustedEnd := start+1, end-1
892971

893-
// Iteratively fetch results from the bucket until we get desired number of items or
894-
// exhaust the available data.
972+
// Iteratively fetch results until we get desired number of items or exhaust the
973+
// available data.
895974
// This needs to be processed in a loop because DynamoDb has a limit on the response
896975
// size of a query (1MB) and we may have more data than that.
897976
for {
@@ -1319,6 +1398,14 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
13191398
AttributeName: aws.String("RespondedAt"),
13201399
AttributeType: types.ScalarAttributeTypeN,
13211400
},
1401+
{
1402+
AttributeName: aws.String("AccountID"),
1403+
AttributeType: types.ScalarAttributeTypeS,
1404+
},
1405+
{
1406+
AttributeName: aws.String("RequestedAt"),
1407+
AttributeType: types.ScalarAttributeTypeN,
1408+
},
13221409
{
13231410
AttributeName: aws.String("RequestedAtBucket"),
13241411
AttributeType: types.ScalarAttributeTypeS,
@@ -1408,6 +1495,26 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
14081495
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
14091496
},
14101497
},
1498+
{
1499+
IndexName: aws.String(AccountBlobIndexName),
1500+
KeySchema: []types.KeySchemaElement{
1501+
{
1502+
AttributeName: aws.String("AccountID"),
1503+
KeyType: types.KeyTypeHash,
1504+
},
1505+
{
1506+
AttributeName: aws.String("RequestedAt"),
1507+
KeyType: types.KeyTypeRange,
1508+
},
1509+
},
1510+
Projection: &types.Projection{
1511+
ProjectionType: types.ProjectionTypeAll,
1512+
},
1513+
ProvisionedThroughput: &types.ProvisionedThroughput{
1514+
ReadCapacityUnits: aws.Int64(readCapacityUnits),
1515+
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
1516+
},
1517+
},
14111518
{
14121519
IndexName: aws.String(RequestedAtIndexName),
14131520
KeySchema: []types.KeySchemaElement{
@@ -1471,6 +1578,8 @@ func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error)
14711578
fields["SK"] = &types.AttributeValueMemberS{Value: blobMetadataSK}
14721579
fields["RequestedAtBucket"] = &types.AttributeValueMemberS{Value: computeRequestedAtBucket(metadata.RequestedAt)}
14731580
fields["RequestedAtBlobKey"] = &types.AttributeValueMemberS{Value: encodeBlobFeedCursorKey(metadata.RequestedAt, &blobKey)}
1581+
fields["AccountID"] = &types.AttributeValueMemberS{Value: metadata.BlobHeader.PaymentMetadata.AccountID.Hex()}
1582+
14741583
return fields, nil
14751584
}
14761585

disperser/common/v2/blobstore/dynamo_metadata_store_test.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/rand"
66
"encoding/hex"
77
"errors"
8+
"fmt"
89
"math"
910
"math/big"
1011
"strings"
@@ -79,6 +80,28 @@ func checkDispersalsDesc(t *testing.T, items []*corev2.DispersalRequest) {
7980
}
8081
}
8182

83+
func checkBlobsAsc(t *testing.T, items []*v2.BlobMetadata) {
84+
if len(items) > 1 {
85+
for i := 1; i < len(items); i++ {
86+
assert.Less(t,
87+
items[i-1].RequestedAt, // previous should be less
88+
items[i].RequestedAt, // than current
89+
"blobs should be in ascending order",
90+
)
91+
}
92+
}
93+
}
94+
95+
func checkBlobsDesc(t *testing.T, items []*v2.BlobMetadata) {
96+
for i := 1; i < len(items); i++ {
97+
assert.Greater(t,
98+
items[i-1].RequestedAt, // previous should be greater
99+
items[i].RequestedAt, // than current
100+
"blobs should be in descending order",
101+
)
102+
}
103+
}
104+
82105
func TestBlobFeedCursor_Equal(t *testing.T) {
83106
bk1 := corev2.BlobKey([32]byte{1, 2, 3})
84107
bk2 := corev2.BlobKey([32]byte{2, 3, 4})
@@ -832,6 +855,171 @@ func TestBlobMetadataStoreGetBlobMetadataByRequestedAtBackward(t *testing.T) {
832855
})
833856
}
834857

858+
func TestBlobMetadataStoreGetBlobMetadataByAccountID(t *testing.T) {
859+
ctx := context.Background()
860+
861+
// Make all blobs happen in 12s
862+
numBlobs := 120
863+
nanoSecsPerBlob := uint64(1e8) // 10 blobs per second
864+
865+
now := uint64(time.Now().UnixNano())
866+
firstBlobTime := now - uint64(10*time.Minute.Nanoseconds())
867+
868+
accountId := gethcommon.HexToAddress(fmt.Sprintf("0x000000000000000000000000000000000000000%d", 5))
869+
870+
// Create blobs for testing
871+
keys := make([]corev2.BlobKey, numBlobs)
872+
requestedAt := make([]uint64, numBlobs)
873+
dynamoKeys := make([]commondynamodb.Key, numBlobs)
874+
for i := 0; i < numBlobs; i++ {
875+
_, blobHeader := newBlob(t)
876+
blobHeader.PaymentMetadata.AccountID = accountId
877+
blobKey, err := blobHeader.BlobKey()
878+
require.NoError(t, err)
879+
requestedAt[i] = firstBlobTime + nanoSecsPerBlob*uint64(i)
880+
now := time.Now()
881+
metadata := &v2.BlobMetadata{
882+
BlobHeader: blobHeader,
883+
Signature: []byte{1, 2, 3},
884+
BlobStatus: v2.Encoded,
885+
Expiry: uint64(now.Add(time.Hour).Unix()),
886+
NumRetries: 0,
887+
UpdatedAt: uint64(now.UnixNano()),
888+
RequestedAt: requestedAt[i],
889+
}
890+
err = blobMetadataStore.PutBlobMetadata(ctx, metadata)
891+
require.NoError(t, err)
892+
keys[i] = blobKey
893+
dynamoKeys[i] = commondynamodb.Key{
894+
"PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()},
895+
"SK": &types.AttributeValueMemberS{Value: "BlobMetadata"},
896+
}
897+
}
898+
defer deleteItems(t, dynamoKeys)
899+
900+
// Test empty range
901+
t.Run("empty range", func(t *testing.T) {
902+
// Test invalid time range
903+
_, err := blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, 1, 1, 0, true)
904+
require.Error(t, err)
905+
assert.Equal(t, "no time point in exclusive time range (1, 1)", err.Error())
906+
907+
_, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, 1, 2, 0, true)
908+
require.Error(t, err)
909+
assert.Equal(t, "no time point in exclusive time range (1, 2)", err.Error())
910+
911+
// Test empty range
912+
blobs, err := blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, now, now+1024, 0, true)
913+
require.NoError(t, err)
914+
assert.Equal(t, 0, len(blobs))
915+
})
916+
917+
// Test full range query
918+
t.Run("ascending full range", func(t *testing.T) {
919+
// Test without limit
920+
blobs, err := blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime-1, now, 0, true)
921+
require.NoError(t, err)
922+
require.Equal(t, numBlobs, len(blobs))
923+
checkBlobsAsc(t, blobs)
924+
925+
// Test with limit
926+
blobs, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime-1, now, 10, true)
927+
require.NoError(t, err)
928+
require.Equal(t, 10, len(blobs))
929+
checkBlobsAsc(t, blobs)
930+
931+
// Test min/max timestamp range
932+
blobs, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, 0, now, 0, true)
933+
require.NoError(t, err)
934+
require.Equal(t, numBlobs, len(blobs))
935+
checkBlobsAsc(t, blobs)
936+
blobs, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime-1, math.MaxInt64, 0, true)
937+
require.NoError(t, err)
938+
require.Equal(t, numBlobs, len(blobs))
939+
checkBlobsAsc(t, blobs)
940+
})
941+
942+
// Test full range query
943+
t.Run("descending full range", func(t *testing.T) {
944+
// Test without limit
945+
blobs, err := blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime-1, now, 0, false)
946+
require.NoError(t, err)
947+
require.Equal(t, numBlobs, len(blobs))
948+
checkBlobsDesc(t, blobs)
949+
950+
// Test with limit
951+
blobs, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime-1, now, 10, false)
952+
require.NoError(t, err)
953+
require.Equal(t, 10, len(blobs))
954+
checkBlobsDesc(t, blobs)
955+
956+
// Test min/max timestamp range
957+
blobs, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, 0, now, 0, false)
958+
require.NoError(t, err)
959+
require.Equal(t, numBlobs, len(blobs))
960+
checkBlobsDesc(t, blobs)
961+
blobs, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime-1, math.MaxInt64, 0, false)
962+
require.NoError(t, err)
963+
require.Equal(t, numBlobs, len(blobs))
964+
checkBlobsDesc(t, blobs)
965+
})
966+
967+
// Test range boundaries
968+
t.Run("ascending range boundaries", func(t *testing.T) {
969+
// Test exclusive start
970+
blobs, err := blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime, now, 0, true)
971+
require.NoError(t, err)
972+
require.Equal(t, numBlobs-1, len(blobs))
973+
assert.Equal(t, requestedAt[1], blobs[0].RequestedAt)
974+
assert.Equal(t, requestedAt[numBlobs-1], blobs[numBlobs-2].RequestedAt)
975+
checkBlobsAsc(t, blobs)
976+
977+
// Test exclusive end
978+
blobs, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime-1, requestedAt[4], 0, true)
979+
require.NoError(t, err)
980+
require.Equal(t, 4, len(blobs))
981+
assert.Equal(t, requestedAt[0], blobs[0].RequestedAt)
982+
assert.Equal(t, requestedAt[3], blobs[3].RequestedAt)
983+
checkBlobsAsc(t, blobs)
984+
})
985+
986+
// Test range boundaries
987+
t.Run("descending range boundaries", func(t *testing.T) {
988+
// Test exclusive start
989+
blobs, err := blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime, now, 0, false)
990+
require.NoError(t, err)
991+
require.Equal(t, numBlobs-1, len(blobs))
992+
assert.Equal(t, requestedAt[numBlobs-1], blobs[0].RequestedAt)
993+
assert.Equal(t, requestedAt[1], blobs[numBlobs-2].RequestedAt)
994+
checkBlobsDesc(t, blobs)
995+
996+
// Test exclusive end
997+
blobs, err = blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, firstBlobTime-1, requestedAt[4], 0, false)
998+
require.NoError(t, err)
999+
require.Equal(t, 4, len(blobs))
1000+
assert.Equal(t, requestedAt[3], blobs[0].RequestedAt)
1001+
assert.Equal(t, requestedAt[0], blobs[3].RequestedAt)
1002+
checkBlobsDesc(t, blobs)
1003+
})
1004+
1005+
// Test pagination
1006+
t.Run("pagination", func(t *testing.T) {
1007+
for i := 1; i < numBlobs; i++ {
1008+
blobs, err := blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, requestedAt[i-1], requestedAt[i]+1, 0, true)
1009+
require.NoError(t, err)
1010+
require.Equal(t, 1, len(blobs))
1011+
assert.Equal(t, requestedAt[i], blobs[0].RequestedAt)
1012+
}
1013+
1014+
for i := 1; i < numBlobs; i++ {
1015+
blobs, err := blobMetadataStore.GetBlobMetadataByAccountID(ctx, accountId, requestedAt[i-1], requestedAt[i]+1, 0, false)
1016+
require.NoError(t, err)
1017+
require.Equal(t, 1, len(blobs))
1018+
assert.Equal(t, requestedAt[i], blobs[0].RequestedAt)
1019+
}
1020+
})
1021+
}
1022+
8351023
func TestBlobMetadataStoreGetAttestationByAttestedAtForward(t *testing.T) {
8361024
ctx := context.Background()
8371025
numBatches := 72

0 commit comments

Comments
 (0)