Skip to content

Commit 54a44b3

Browse files
authored
refactor: use metadata store to check for blob existence (#1380)
1 parent 9b19b60 commit 54a44b3

File tree

6 files changed

+132
-36
lines changed

6 files changed

+132
-36
lines changed

common/aws/dynamodb/client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type Client interface {
5555
UpdateItemWithCondition(ctx context.Context, tableName string, key Key, item Item, condition expression.ConditionBuilder) (Item, error)
5656
IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error)
5757
GetItem(ctx context.Context, tableName string, key Key) (Item, error)
58+
GetItemWithInput(ctx context.Context, input *dynamodb.GetItemInput) (Item, error)
5859
GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error)
5960
QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
6061
Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
@@ -269,6 +270,16 @@ func (c *client) GetItem(ctx context.Context, tableName string, key Key) (Item,
269270
return resp.Item, nil
270271
}
271272

273+
// GetItemWithInput is a wrapper for the GetItem function that allows for a custom GetItemInput
274+
func (c *client) GetItemWithInput(ctx context.Context, input *dynamodb.GetItemInput) (Item, error) {
275+
resp, err := c.dynamoClient.GetItem(ctx, input)
276+
if err != nil {
277+
return nil, err
278+
}
279+
280+
return resp.Item, nil
281+
}
282+
272283
// GetItems returns the items for the given keys
273284
// Note: ordering of items is not guaranteed
274285
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error) {

common/aws/mock/dynamodb_client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ func (c *MockDynamoDBClient) GetItem(ctx context.Context, tableName string, key
5959
return args.Get(0).(dynamodb.Item), args.Error(1)
6060
}
6161

62+
func (c *MockDynamoDBClient) GetItemWithInput(ctx context.Context, input *awsdynamodb.GetItemInput) (dynamodb.Item, error) {
63+
args := c.Called()
64+
return args.Get(0).(dynamodb.Item), args.Error(1)
65+
}
66+
6267
func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key, consistentRead bool) ([]dynamodb.Item, error) {
6368
args := c.Called()
6469
return args.Get(0).([]dynamodb.Item), args.Error(1)

disperser/apiserver/disperse_blob_v2.go

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
2929
if onchainState == nil {
3030
return nil, api.NewErrorInternal("onchain state is nil")
3131
}
32-
if err := s.validateDispersalRequest(ctx, req, onchainState); err != nil {
32+
blobHeader, err := s.validateDispersalRequest(req, onchainState)
33+
if err != nil {
3334
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to validate the request: %v", err))
3435
}
3536

37+
if err := s.checkBlobExistence(ctx, blobHeader); err != nil {
38+
return nil, err
39+
}
40+
3641
// Check against payment meter to make sure there is quota remaining
3742
if err := s.checkPaymentMeter(ctx, req, start); err != nil {
3843
return nil, err
@@ -43,10 +48,6 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
4348

4449
blob := req.GetBlob()
4550
s.metrics.reportDisperseBlobSize(len(blob))
46-
blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
47-
if err != nil {
48-
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to parse the blob header proto: %v", err))
49-
}
5051
s.logger.Debug("received a new blob dispersal request", "blobSizeBytes", len(blob), "quorums", req.GetBlobHeader().GetQuorumNumbers())
5152

5253
blobKey, err := s.StoreBlob(ctx, blob, blobHeader, req.GetSignature(), time.Now(), onchainState.TTL)
@@ -133,100 +134,110 @@ func (s *DispersalServerV2) checkPaymentMeter(ctx context.Context, req *pb.Dispe
133134
}
134135

135136
func (s *DispersalServerV2) validateDispersalRequest(
136-
ctx context.Context,
137137
req *pb.DisperseBlobRequest,
138-
onchainState *OnchainState) error {
138+
onchainState *OnchainState) (*corev2.BlobHeader, error) {
139139

140140
signature := req.GetSignature()
141141
if len(signature) != 65 {
142-
return fmt.Errorf("signature is expected to be 65 bytes, but got %d bytes", len(signature))
142+
return nil, fmt.Errorf("signature is expected to be 65 bytes, but got %d bytes", len(signature))
143143
}
144144
blob := req.GetBlob()
145145
blobSize := len(blob)
146146
if blobSize == 0 {
147-
return errors.New("blob size must be greater than 0")
147+
return nil, errors.New("blob size must be greater than 0")
148148
}
149149
blobLength := encoding.GetBlobLengthPowerOf2(uint(blobSize))
150150
if blobLength > uint(s.maxNumSymbolsPerBlob) {
151-
return errors.New("blob size too big")
151+
return nil, errors.New("blob size too big")
152152
}
153153

154154
blobHeaderProto := req.GetBlobHeader()
155155
if blobHeaderProto.GetCommitment() == nil {
156-
return errors.New("blob header must contain commitments")
156+
return nil, errors.New("blob header must contain commitments")
157157
}
158158

159159
if blobHeaderProto.GetCommitment() == nil {
160-
return errors.New("blob header must contain a commitment")
160+
return nil, errors.New("blob header must contain a commitment")
161161
}
162162
commitedBlobLength := blobHeaderProto.GetCommitment().GetLength()
163163
if commitedBlobLength == 0 || commitedBlobLength != encoding.NextPowerOf2(commitedBlobLength) {
164-
return errors.New("invalid commitment length, must be a power of 2")
164+
return nil, errors.New("invalid commitment length, must be a power of 2")
165165
}
166166
lengthPowerOf2 := encoding.GetBlobLengthPowerOf2(uint(blobSize))
167167
if lengthPowerOf2 > uint(commitedBlobLength) {
168-
return fmt.Errorf("commitment length %d is less than blob length %d", commitedBlobLength, lengthPowerOf2)
168+
return nil, fmt.Errorf("commitment length %d is less than blob length %d", commitedBlobLength, lengthPowerOf2)
169169
}
170170

171171
blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto)
172172
if err != nil {
173-
return fmt.Errorf("invalid blob header: %w", err)
173+
return nil, fmt.Errorf("invalid blob header: %w", err)
174174
}
175175

176176
if blobHeader.PaymentMetadata == (core.PaymentMetadata{}) {
177-
return errors.New("payment metadata is required")
177+
return nil, errors.New("payment metadata is required")
178178
}
179179

180180
timestampIsNegative := blobHeader.PaymentMetadata.Timestamp < 0
181181
paymentIsNegative := blobHeader.PaymentMetadata.CumulativePayment.Cmp(big.NewInt(0)) == -1
182182
timestampIsZeroAndPaymentIsZero := blobHeader.PaymentMetadata.Timestamp == 0 && blobHeader.PaymentMetadata.CumulativePayment.Cmp(big.NewInt(0)) == 0
183183
if timestampIsNegative || paymentIsNegative || timestampIsZeroAndPaymentIsZero {
184-
return errors.New("invalid payment metadata")
184+
return nil, errors.New("invalid payment metadata")
185185
}
186186

187187
if len(blobHeaderProto.GetQuorumNumbers()) == 0 {
188-
return errors.New("blob header must contain at least one quorum number")
188+
return nil, errors.New("blob header must contain at least one quorum number")
189189
}
190190

191191
if len(blobHeaderProto.GetQuorumNumbers()) > int(onchainState.QuorumCount) {
192-
return fmt.Errorf("too many quorum numbers specified: maximum is %d", onchainState.QuorumCount)
192+
return nil, fmt.Errorf("too many quorum numbers specified: maximum is %d", onchainState.QuorumCount)
193193
}
194194

195195
for _, quorum := range blobHeaderProto.GetQuorumNumbers() {
196196
if quorum > corev2.MaxQuorumID || uint8(quorum) >= onchainState.QuorumCount {
197-
return fmt.Errorf("invalid quorum number %d; maximum is %d", quorum, onchainState.QuorumCount)
197+
return nil, fmt.Errorf("invalid quorum number %d; maximum is %d", quorum, onchainState.QuorumCount)
198198
}
199199
}
200200

201201
// validate every 32 bytes is a valid field element
202202
_, err = rs.ToFrArray(blob)
203203
if err != nil {
204204
s.logger.Error("failed to convert a 32bytes as a field element", "err", err)
205-
return errors.New("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617")
205+
return nil, errors.New("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617")
206206
}
207207

208208
if _, ok := onchainState.BlobVersionParameters.Get(corev2.BlobVersion(blobHeaderProto.GetVersion())); !ok {
209-
return fmt.Errorf("invalid blob version %d; valid blob versions are: %v", blobHeaderProto.GetVersion(), onchainState.BlobVersionParameters.Keys())
209+
return nil, fmt.Errorf("invalid blob version %d; valid blob versions are: %v", blobHeaderProto.GetVersion(), onchainState.BlobVersionParameters.Keys())
210210
}
211211

212212
if err = s.authenticator.AuthenticateBlobRequest(blobHeader, signature); err != nil {
213-
return fmt.Errorf("authentication failed: %w", err)
213+
return nil, fmt.Errorf("authentication failed: %w", err)
214214
}
215215

216216
commitments, err := s.prover.GetCommitmentsForPaddedLength(blob)
217217
if err != nil {
218-
return fmt.Errorf("failed to get commitments: %w", err)
218+
return nil, fmt.Errorf("failed to get commitments: %w", err)
219219
}
220220
if !commitments.Equal(&blobHeader.BlobCommitments) {
221-
return errors.New("invalid blob commitment")
221+
return nil, errors.New("invalid blob commitment")
222222
}
223223

224+
return blobHeader, nil
225+
}
226+
227+
func (s *DispersalServerV2) checkBlobExistence(ctx context.Context, blobHeader *corev2.BlobHeader) error {
224228
blobKey, err := blobHeader.BlobKey()
225229
if err != nil {
226-
return fmt.Errorf("failed to get blob key: %w", err)
230+
return api.NewErrorInvalidArg(fmt.Sprintf("failed to parse the blob header: %v", err))
231+
}
232+
233+
// check if blob already exists
234+
exists, err := s.blobMetadataStore.CheckBlobExists(ctx, blobKey)
235+
if err != nil {
236+
return api.NewErrorInternal(fmt.Sprintf("failed to check blob existence: %v", err))
227237
}
228-
if s.blobStore.CheckBlobExists(ctx, blobKey) {
229-
return fmt.Errorf("blob already exists: %s", blobKey.Hex())
238+
239+
if exists {
240+
return api.NewErrorAlreadyExists(fmt.Sprintf("blob already exists: %s", blobKey.Hex()))
230241
}
231242

232243
return nil

disperser/common/v2/blobstore/dynamo_metadata_store.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,30 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey corev2.
265265
return metadata, nil
266266
}
267267

268+
// CheckBlobExists checks if a blob exists without fetching the entire metadata.
269+
func (s *BlobMetadataStore) CheckBlobExists(ctx context.Context, blobKey corev2.BlobKey) (bool, error) {
270+
input := &dynamodb.GetItemInput{
271+
TableName: aws.String(s.tableName),
272+
Key: map[string]types.AttributeValue{
273+
"PK": &types.AttributeValueMemberS{
274+
Value: blobKeyPrefix + blobKey.Hex(),
275+
},
276+
"SK": &types.AttributeValueMemberS{
277+
Value: blobMetadataSK,
278+
},
279+
},
280+
ProjectionExpression: aws.String("PK"), // Only fetch the PK attribute
281+
}
282+
283+
item, err := s.dynamoDBClient.GetItemWithInput(ctx, input)
284+
if err != nil {
285+
return false, fmt.Errorf("failed to check blob existence: %w", err)
286+
}
287+
288+
// If the item is not nil, the blob exists
289+
return item != nil, nil
290+
}
291+
268292
// GetBlobMetadataByStatus returns all the metadata with the given status that were updated after lastUpdatedAt
269293
// Because this function scans the entire index, it should only be used for status with a limited number of items.
270294
// Results are ordered by UpdatedAt in ascending order.

disperser/common/v2/blobstore/dynamo_metadata_store_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1934,3 +1934,53 @@ func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
19341934
require.NoError(t, err)
19351935
return bk, bh
19361936
}
1937+
1938+
func TestCheckBlobExists(t *testing.T) {
1939+
ctx := context.Background()
1940+
// Create a test blob
1941+
blobKey, blobHeader := newBlob(t)
1942+
1943+
// Check that the blob does not exist initially
1944+
exists, err := blobMetadataStore.CheckBlobExists(ctx, blobKey)
1945+
require.NoError(t, err)
1946+
require.False(t, exists, "Blob should not exist before being added")
1947+
1948+
// Create blob metadata
1949+
blobMetadata := &v2.BlobMetadata{
1950+
BlobHeader: blobHeader,
1951+
Signature: []byte("test-signature"),
1952+
BlobStatus: v2.Queued,
1953+
Expiry: uint64(time.Now().Add(time.Hour).Unix()),
1954+
NumRetries: 0,
1955+
BlobSize: 1024,
1956+
RequestedAt: uint64(time.Now().UnixNano()),
1957+
UpdatedAt: uint64(time.Now().UnixNano()),
1958+
}
1959+
1960+
// Store the blob metadata
1961+
err = blobMetadataStore.PutBlobMetadata(ctx, blobMetadata)
1962+
require.NoError(t, err)
1963+
1964+
// Check that the blob now exists
1965+
exists, err = blobMetadataStore.CheckBlobExists(ctx, blobKey)
1966+
require.NoError(t, err)
1967+
require.True(t, exists, "Blob should exist after being added")
1968+
1969+
// Delete the blob metadata
1970+
err = blobMetadataStore.DeleteBlobMetadata(ctx, blobKey)
1971+
require.NoError(t, err)
1972+
1973+
// Check that the blob no longer exists
1974+
exists, err = blobMetadataStore.CheckBlobExists(ctx, blobKey)
1975+
require.NoError(t, err)
1976+
require.False(t, exists, "Blob should not exist after being deleted")
1977+
1978+
// Test with non-existent blob key
1979+
randomKey := corev2.BlobKey{}
1980+
_, err = rand.Read(randomKey[:])
1981+
require.NoError(t, err)
1982+
1983+
exists, err = blobMetadataStore.CheckBlobExists(ctx, randomKey)
1984+
require.NoError(t, err)
1985+
require.False(t, exists, "Random blob key should not exist")
1986+
}

disperser/common/v2/blobstore/s3_blob_store.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,20 @@ func NewBlobStore(s3BucketName string, s3Client s3.Client, logger logging.Logger
2626

2727
// StoreBlob adds a blob to the blob store
2828
func (b *BlobStore) StoreBlob(ctx context.Context, key corev2.BlobKey, data []byte) error {
29-
if b.CheckBlobExists(ctx, key) {
29+
_, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
30+
if err == nil {
3031
b.logger.Warnf("blob already exists in bucket %s: %s", b.bucketName, key)
3132
return common.ErrAlreadyExists
3233
}
3334

34-
err := b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data)
35+
err = b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data)
3536
if err != nil {
3637
b.logger.Errorf("failed to upload blob in bucket %s: %v", b.bucketName, err)
3738
return err
3839
}
3940
return nil
4041
}
4142

42-
// CheckBlobExists checks if a blob exists in the blob store
43-
func (b *BlobStore) CheckBlobExists(ctx context.Context, key corev2.BlobKey) bool {
44-
_, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
45-
return err == nil
46-
}
47-
4843
// GetBlob retrieves a blob from the blob store
4944
func (b *BlobStore) GetBlob(ctx context.Context, key corev2.BlobKey) ([]byte, error) {
5045
data, err := b.s3Client.DownloadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))

0 commit comments

Comments
 (0)