Skip to content

Commit 7fedc98

Browse files
committed
reset cursors and deduplicate blobs in controller
1 parent d646501 commit 7fedc98

25 files changed

+528
-96
lines changed

api/docs/disperser_v2.html

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/docs/disperser_v2.md

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/docs/eigenda-protos.html

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/docs/eigenda-protos.md

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/grpc/disperser/v2/disperser_v2.pb.go

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/proto/disperser/v2/disperser_v2.proto

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,6 @@ enum BlobStatus {
157157
// and in doing so requesting that the validators sign to acknowledge receipt of the blob.
158158
// Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser,
159159
// after which the BlobStatus becomes COMPLETE.
160-
//
161-
// Note: this status is not currently implemented, and is a placeholder for future functionality.
162160
GATHERING_SIGNATURES = 3;
163161

164162
// COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed.

common/aws/dynamodb/client.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type Client interface {
5656
UpdateItemWithCondition(ctx context.Context, tableName string, key Key, item Item, condition expression.ConditionBuilder) (Item, error)
5757
IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error)
5858
GetItem(ctx context.Context, tableName string, key Key) (Item, error)
59-
GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error)
59+
GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error)
6060
QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
6161
Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
6262
QueryWithInput(ctx context.Context, input *dynamodb.QueryInput) ([]Item, error)
@@ -276,8 +276,8 @@ func (c *client) GetItem(ctx context.Context, tableName string, key Key) (Item,
276276

277277
// GetItems returns the items for the given keys
278278
// Note: ordering of items is not guaranteed
279-
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
280-
items, err := c.readItems(ctx, tableName, keys)
279+
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error) {
280+
items, err := c.readItems(ctx, tableName, keys, consistentRead)
281281
if err != nil {
282282
return nil, err
283283
}
@@ -444,7 +444,12 @@ func (c *client) writeItems(ctx context.Context, tableName string, requestItems
444444
return failedItems, nil
445445
}
446446

447-
func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
447+
func (c *client) readItems(
448+
ctx context.Context,
449+
tableName string,
450+
keys []Key,
451+
consistentRead bool,
452+
) ([]Item, error) {
448453
startIndex := 0
449454
items := make([]Item, 0)
450455
for startIndex < len(keys) {
@@ -454,7 +459,8 @@ func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([
454459
output, err := c.dynamoClient.BatchGetItem(ctx, &dynamodb.BatchGetItemInput{
455460
RequestItems: map[string]types.KeysAndAttributes{
456461
tableName: {
457-
Keys: keysBatch,
462+
Keys: keysBatch,
463+
ConsistentRead: aws.Bool(consistentRead),
458464
},
459465
},
460466
})

common/aws/dynamodb/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ func TestBatchOperations(t *testing.T) {
296296
}
297297
}
298298

299-
fetchedItems, err := dynamoClient.GetItems(ctx, tableName, keys)
299+
fetchedItems, err := dynamoClient.GetItems(ctx, tableName, keys, true)
300300
assert.NoError(t, err)
301301
assert.Len(t, fetchedItems, numItems)
302302
blobKeys := make([]string, numItems)

common/aws/mock/dynamodb_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (c *MockDynamoDBClient) GetItem(ctx context.Context, tableName string, key
5959
return args.Get(0).(dynamodb.Item), args.Error(1)
6060
}
6161

62-
func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key) ([]dynamodb.Item, error) {
62+
func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key, consistentRead bool) ([]dynamodb.Item, error) {
6363
args := c.Called()
6464
return args.Get(0).([]dynamodb.Item), args.Error(1)
6565
}

core/v2/types.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -350,14 +350,22 @@ func (a *Attestation) ToProtobuf() (*disperserpb.Attestation, error) {
350350
quorumResults[i] = a.QuorumResults[q]
351351
}
352352

353-
apkG2Bytes := a.APKG2.Bytes()
354-
sigmaBytes := a.Sigma.Bytes()
353+
var apkG2Bytes []byte
354+
var sigmaBytes []byte
355+
if a.APKG2 != nil {
356+
b := a.APKG2.Bytes()
357+
apkG2Bytes = b[:]
358+
}
359+
if a.Sigma != nil {
360+
b := a.Sigma.Bytes()
361+
sigmaBytes = b[:]
362+
}
355363

356364
return &disperserpb.Attestation{
357365
NonSignerPubkeys: nonSignerPubKeys,
358-
ApkG2: apkG2Bytes[:],
366+
ApkG2: apkG2Bytes,
359367
QuorumApks: quorumAPKs,
360-
Sigma: sigmaBytes[:],
368+
Sigma: sigmaBytes,
361369
QuorumNumbers: quorumNumbers,
362370
QuorumSignedPercentages: quorumResults,
363371
}, nil

0 commit comments

Comments
 (0)