-
Notifications
You must be signed in to change notification settings - Fork 237
feat: periodically return attestation with updated signatures #1497
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
c05347d
bc85930
0bd99d5
3e4ba55
03b794e
5d3ea11
2df5562
07674e2
56f1c05
a5d535c
aec015e
b026783
045310c
097ba51
1cd8155
4c9d0a2
2b64d53
2a0b2a2
ee6c2be
77b37d5
6351dc2
67d8e3c
3332ba2
c2f6759
838bcdb
92e2a26
4cea944
0f163eb
626c2ce
6cfe7cc
29a7d6e
4cb3883
ddad1a1
afa7304
f3b5f28
a164bf7
780efba
8ed2f53
70042aa
235efc4
7404ea9
de6e086
4aa8441
7639906
84315df
46c76cb
7b0103d
bb60720
8755300
ac4c35c
58b1779
37e185d
4c14b86
318f860
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ type DispatcherConfig struct { | |
AttestationTimeout time.Duration | ||
// The maximum time permitted to wait for all nodes to provide signatures for a batch. | ||
BatchAttestationTimeout time.Duration | ||
// SignatureTickInterval is the interval at which new Attestations will be submitted to the blobMetadataStore, | ||
// SignatureTickInterval is the interval at which Attestations will be updated in the blobMetadataStore, | ||
// as signature gathering progresses. | ||
SignatureTickInterval time.Duration | ||
NumRequestRetries int | ||
|
@@ -320,10 +320,12 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, | |
return sigChan, batchData, nil | ||
} | ||
|
||
// HandleSignatures receives signatures from operators, validates, and aggregates them. | ||
// HandleSignatures receives SigningMessages from operators for a given batch through the input sigChan. The signatures | ||
// are validated, aggregated, and used to put an Attestation for the batch into the blobMetadataStore. The Attestation | ||
// is periodically updated as additional signatures are gathered. | ||
// | ||
// This method submits Attestations to the blobMetadataStore, containing signing data from the SigningMessages received | ||
// through the sigChan. It periodically submits Attestations, as signatures are gathered. | ||
// This method will continue gathering signatures until a SigningMessage has been received from every operator, or until | ||
// the global attestationCtx times out. | ||
func (d *Dispatcher) HandleSignatures( | ||
ctx context.Context, | ||
attestationCtx context.Context, | ||
|
@@ -350,26 +352,7 @@ func (d *Dispatcher) HandleSignatures( | |
} | ||
} | ||
|
||
// submit an empty attestation before starting to gather signatures. | ||
// a new attestation will be periodically resubmitted as signatures are gathered. | ||
attestation := &corev2.Attestation{ | ||
BatchHeader: batchData.Batch.BatchHeader, | ||
AttestedAt: uint64(time.Now().UnixNano()), | ||
NonSignerPubKeys: nil, | ||
APKG2: nil, | ||
QuorumAPKs: nil, | ||
Sigma: nil, | ||
QuorumNumbers: nil, | ||
QuorumResults: nil, | ||
} | ||
err := d.blobMetadataStore.PutAttestation(ctx, attestation) | ||
if err != nil { | ||
// this error isn't fatal: a subsequent PutAttestation attempt might succeed | ||
// TODO: this used to cause the HandleSignatures method to fail entirely. Is it ok to continue trying here? | ||
d.logger.Error("error calling PutAttestation", | ||
"err", err, | ||
"batchHeaderHash", batchHeaderHash) | ||
} | ||
// TODO: I removed the initial empty attestation update. Is that ok? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The empty attestation is created here so that an attestation is available for query "immediately" after the blob is marked as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I updated the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After struggling with CI for a while, I decided to just keep the empty attestation |
||
|
||
// This channel will remain open until the attestationTimeout triggers, or until signatures from all validators | ||
// have been received and processed. It will periodically yield QuorumAttestations with the latest set of received | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. received and processed ? I assume because they're part of the same function if not we could have quit just after receiving ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
@@ -400,9 +383,9 @@ func (d *Dispatcher) HandleSignatures( | |
finalAttestation := &core.QuorumAttestation{} | ||
// continue receiving attestations from the channel until it's closed | ||
for receivedQuorumAttestation := range attestationChan { | ||
err := d.submitAttestation(ctx, batchData, receivedQuorumAttestation) | ||
err := d.updateAttestation(ctx, batchData, receivedQuorumAttestation) | ||
if err != nil { | ||
d.logger.Warnf("error submitting attestation for batch %s: %v", batchHeaderHash, err) | ||
d.logger.Warnf("error updating attestation for batch %s: %v", batchHeaderHash, err) | ||
continue | ||
} | ||
|
||
|
@@ -412,7 +395,7 @@ func (d *Dispatcher) HandleSignatures( | |
d.metrics.reportReceiveSignaturesLatency(time.Since(handleSignaturesStart)) | ||
|
||
updateBatchStatusStartTime := time.Now() | ||
_, quorumPercentages := d.parseAndLogQuorumPercentages(batchHeaderHash, finalAttestation.QuorumResults) | ||
_, quorumPercentages := d.parseQuorumPercentages(finalAttestation.QuorumResults) | ||
err = d.updateBatchStatus(ctx, batchData, quorumPercentages) | ||
d.metrics.reportUpdateBatchStatusLatency(time.Since(updateBatchStatusStartTime)) | ||
if err != nil { | ||
|
@@ -438,15 +421,13 @@ func (d *Dispatcher) HandleSignatures( | |
return nil | ||
} | ||
|
||
// submitAttestation submits a QuorumAttestation to the blobMetadataStore | ||
func (d *Dispatcher) submitAttestation( | ||
// updateAttestation updates the QuorumAttestation in the blobMetadataStore | ||
func (d *Dispatcher) updateAttestation( | ||
ctx context.Context, | ||
batchData *batchData, | ||
quorumAttestation *core.QuorumAttestation, | ||
) error { | ||
sortedNonZeroQuorums, quorumPercentages := d.parseAndLogQuorumPercentages( | ||
hex.EncodeToString(batchData.BatchHeaderHash[:]), | ||
quorumAttestation.QuorumResults) | ||
sortedNonZeroQuorums, quorumPercentages := d.parseQuorumPercentages(quorumAttestation.QuorumResults) | ||
if len(sortedNonZeroQuorums) == 0 { | ||
return errors.New("all quorums received no attestation for batch") | ||
} | ||
|
@@ -481,39 +462,47 @@ func (d *Dispatcher) submitAttestation( | |
return fmt.Errorf("put attestation: %w", err) | ||
litt3 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
d.logAttestationUpdate(hex.EncodeToString(batchData.BatchHeaderHash[:]), quorumAttestation.QuorumResults) | ||
|
||
return nil | ||
} | ||
|
||
// parseAndLogQuorumPercentages iterates over the map of QuorumResults, and logs the signing percentages of each quorum. | ||
// | ||
// This method returns a sorted slice of nonZeroQuorums (quorums with >0 signing percentage), and a map from QuorumID to | ||
// signing percentage. | ||
func (d *Dispatcher) parseAndLogQuorumPercentages( | ||
batchHeaderHash string, | ||
// parseQuorumPercentages iterates over the map of QuorumResults, and returns a sorted slice of nonZeroQuorums | ||
// (quorums with >0 signing percentage), and a map from QuorumID to signing percentage. | ||
func (d *Dispatcher) parseQuorumPercentages( | ||
quorumResults map[core.QuorumID]*core.QuorumResult, | ||
) ([]core.QuorumID, map[core.QuorumID]uint8) { | ||
nonZeroQuorums := make([]core.QuorumID, 0) | ||
quorumPercentages := make(map[core.QuorumID]uint8) | ||
|
||
messageBuilder := strings.Builder{} | ||
messageBuilder.WriteString(fmt.Sprintf("batchHeaderHash: %s (quorumID, percentSigned)", batchHeaderHash)) | ||
|
||
for quorumID, quorumResult := range quorumResults { | ||
messageBuilder.WriteString(fmt.Sprintf("\n%d, %d%%", quorumID, quorumResult.PercentSigned)) | ||
|
||
if quorumResult.PercentSigned > 0 { | ||
nonZeroQuorums = append(nonZeroQuorums, quorumID) | ||
quorumPercentages[quorumID] = quorumResult.PercentSigned | ||
} | ||
} | ||
|
||
d.logger.Debug(messageBuilder.String()) | ||
|
||
slices.Sort(nonZeroQuorums) | ||
|
||
return nonZeroQuorums, quorumPercentages | ||
} | ||
|
||
// logAttestationUpdate logs the attestation details, including batch header hash and quorum signing percentages | ||
func (d *Dispatcher) logAttestationUpdate(batchHeaderHash string, quorumResults map[core.QuorumID]*core.QuorumResult) { | ||
quorumPercentagesBuilder := strings.Builder{} | ||
quorumPercentagesBuilder.WriteString("(") | ||
|
||
for quorumID, quorumResult := range quorumResults { | ||
quorumPercentagesBuilder.WriteString( | ||
fmt.Sprintf("quorum_%d: %d%%, ", quorumID, quorumResult.PercentSigned)) | ||
} | ||
quorumPercentagesBuilder.WriteString(")") | ||
|
||
d.logger.Debug("attestation updated", | ||
"batchHeaderHash", batchHeaderHash, | ||
"quorumPercentages", quorumPercentagesBuilder.String()) | ||
} | ||
|
||
func (d *Dispatcher) dedupBlobs(blobs []*v2.BlobMetadata) []*v2.BlobMetadata { | ||
dedupedBlobs := make([]*v2.BlobMetadata, 0) | ||
for _, blob := range blobs { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reminder to resolve TODO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed