Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func RunController(ctx *cli.Context) error {
sigAgg,
nodeClientManager,
logger,
metricsRegistry,
)
if err != nil {
return fmt.Errorf("failed to create dispatcher: %v", err)
Expand Down
61 changes: 59 additions & 2 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math"
"time"

Expand Down Expand Up @@ -38,6 +39,7 @@ type Dispatcher struct {
aggregator core.SignatureAggregator
nodeClientManager NodeClientManager
logger logging.Logger
metrics *dispatcherMetrics

cursor *blobstore.StatusIndexCursor
}
Expand All @@ -57,6 +59,7 @@ func NewDispatcher(
aggregator core.SignatureAggregator,
nodeClientManager NodeClientManager,
logger logging.Logger,
registry *prometheus.Registry,
) (*Dispatcher, error) {
if config == nil {
return nil, errors.New("config is required")
Expand All @@ -73,6 +76,7 @@ func NewDispatcher(
aggregator: aggregator,
nodeClientManager: nodeClientManager,
logger: logger.With("component", "Dispatcher"),
metrics: newDispatcherMetrics(registry),

cursor: nil,
}, nil
Expand Down Expand Up @@ -101,7 +105,6 @@ func (d *Dispatcher) Start(ctx context.Context) error {
}
continue
}

go func() {
err := d.HandleSignatures(ctx, batchData, sigChan)
if err != nil {
Expand All @@ -118,6 +121,11 @@ func (d *Dispatcher) Start(ctx context.Context) error {
}

func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, *batchData, error) {
start := time.Now()
defer func() {
d.metrics.reportHandleBatchLatency(time.Since(start))
}()

currentBlockNumber, err := d.chainState.GetCurrentBlockNumber()
if err != nil {
return nil, nil, fmt.Errorf("failed to get current block number: %w", err)
Expand Down Expand Up @@ -148,6 +156,8 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
continue
}

submissionStart := time.Now()

d.pool.Submit(func() {

req := &corev2.DispersalRequest{
Expand All @@ -158,14 +168,21 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
DispersedAt: uint64(time.Now().UnixNano()),
BatchHeader: *batch.BatchHeader,
}
putDispersalRequestStart := time.Now()
err := d.blobMetadataStore.PutDispersalRequest(ctx, req)
if err != nil {
d.logger.Error("failed to put dispersal request", "err", err)
return
}

for i := 0; i < d.NumRequestRetries+1; i++ {
d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart))

var i int
for i = 0; i < d.NumRequestRetries+1; i++ {
sendChunksStart := time.Now()
sig, err := d.sendChunks(ctx, client, batch)
sendChunksFinished := time.Now()
d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart))
if err == nil {
storeErr := d.blobMetadataStore.PutDispersalResponse(ctx, &corev2.DispersalResponse{
DispersalRequest: req,
Expand All @@ -177,6 +194,8 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
d.logger.Error("failed to put dispersal response", "err", storeErr)
}

d.metrics.reportPutDispersalResponseLatency(time.Since(sendChunksFinished))

sigChan <- core.SigningMessage{
Signature: sig,
Operator: opID,
Expand All @@ -191,28 +210,42 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
d.logger.Warn("failed to send chunks", "operator", opID, "NumAttempts", i, "err", err)
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
}
d.metrics.reportSendChunksRetryCount(float64(i))
})

d.metrics.reportPoolSubmissionLatency(time.Since(submissionStart))
}

return sigChan, batchData, nil
}

// HandleSignatures receives signatures from operators, validates, and aggregates them
func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error {
handleSignaturesStart := time.Now()
defer func() {
d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart))
}()

quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan)
if err != nil {
return fmt.Errorf("failed to receive and validate signatures: %w", err)
}
receiveSignaturesFinished := time.Now()
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))

quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults))
i := 0
for quorumID := range quorumAttestation.QuorumResults {
quorums[i] = quorumID
i++
}
aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums)
aggregateSignaturesFinished := time.Now()
d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to aggregate signatures: %w", err)
}

err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
BatchHeader: batchData.Batch.BatchHeader,
AttestedAt: uint64(time.Now().UnixNano()),
Expand All @@ -222,11 +255,15 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
Sigma: aggSig.AggSignature,
QuorumNumbers: quorums,
})
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation: %w", err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified: %w", err)
}
Expand All @@ -237,7 +274,14 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
// NewBatch creates a batch of blobs to dispatch
// Warning: This function is not thread-safe
func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) (*batchData, error) {
newBatchStart := time.Now()
defer func() {
d.metrics.reportNewBatchLatency(time.Since(newBatchStart))
}()

blobMetadatas, cursor, err := d.blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Encoded, d.cursor, d.MaxBatchSize)
getBlobMetadataFinished := time.Now()
d.metrics.reportGetBlobMetadataLatency(getBlobMetadataFinished.Sub(newBatchStart))
if err != nil {
return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
}
Expand All @@ -247,6 +291,8 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}

state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber)
getOperatorStateFinished := time.Now()
d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished))
if err != nil {
return nil, fmt.Errorf("failed to get operator state: %w", err)
}
Expand All @@ -264,6 +310,8 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}

certs, _, err := d.blobMetadataStore.GetBlobCertificates(ctx, keys)
getBlobCertificatesFinished := time.Now()
d.metrics.reportGetBlobCertificatesLatency(getBlobCertificatesFinished.Sub(getOperatorStateFinished))
if err != nil {
return nil, fmt.Errorf("failed to get blob certificates: %w", err)
}
Expand Down Expand Up @@ -304,11 +352,15 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
copy(batchHeader.BatchRoot[:], tree.Root())

batchHeaderHash, err := batchHeader.Hash()
buildMerkleTreeFinished := time.Now()
d.metrics.reportBuildMerkleTreeLatency(buildMerkleTreeFinished.Sub(getBlobCertificatesFinished))
if err != nil {
return nil, fmt.Errorf("failed to hash batch header: %w", err)
}

err = d.blobMetadataStore.PutBatchHeader(ctx, batchHeader)
putBatchHeaderFinished := time.Now()
d.metrics.reportPutBatchHeaderLatency(putBatchHeaderFinished.Sub(buildMerkleTreeFinished))
if err != nil {
return nil, fmt.Errorf("failed to put batch header: %w", err)
}
Expand Down Expand Up @@ -338,13 +390,18 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}
}

proofGenerationFinished := time.Now()
d.metrics.reportProofLatency(proofGenerationFinished.Sub(putBatchHeaderFinished))

verificationInfos := make([]*corev2.BlobVerificationInfo, len(verificationInfoMap))
i := 0
for _, v := range verificationInfoMap {
verificationInfos[i] = v
i++
}
err = d.blobMetadataStore.PutBlobVerificationInfos(ctx, verificationInfos)
putBlobVerificationInfosFinished := time.Now()
d.metrics.reportPutVerificationInfosLatency(putBlobVerificationInfosFinished.Sub(proofGenerationFinished))
if err != nil {
return nil, fmt.Errorf("failed to put blob verification infos: %w", err)
}
Expand Down
Loading
Loading