Skip to content

Commit 1ef09d5

Browse files
ian-shim authored and jianoaix committed
Update metrics (#32)
1 parent f4fdae8 commit 1ef09d5

File tree

7 files changed

+44
-47
lines changed

7 files changed

+44
-47
lines changed

core/aggregation.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(state *IndexedOperatorState
9292
socket = op.Socket
9393
}
9494
if r.Err != nil {
95-
a.Logger.Warn("Error returned from messageChan", "operator", operatorIDHex, "socket", socket, "err", r.Err)
95+
a.Logger.Warn("[AggregateSignatures] error returned from messageChan", "operator", operatorIDHex, "socket", socket, "err", r.Err)
9696
continue
9797
}
9898

@@ -110,6 +110,8 @@ func (a *StdSignatureAggregator) AggregateSignatures(state *IndexedOperatorState
110110
continue
111111
}
112112

113+
a.Logger.Info("[AggregateSignatures] received signature from operator", "operator", operatorIDHex, "socket", socket)
114+
113115
for ind, id := range quorumIDs {
114116

115117
// Get stake amounts for operator

disperser/batcher/batcher.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -187,10 +187,9 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
187187
// Append the error
188188
result = multierror.Append(result, err)
189189
}
190+
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Failed)
190191
}
191192

192-
b.Metrics.UpdateFailedBatchAndBlobs(len(blobMetadatas))
193-
194193
// Return the error(s)
195194
return result.ErrorOrNil()
196195
}
@@ -318,8 +317,10 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
318317

319318
if status == disperser.Confirmed {
320319
_, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo)
320+
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed)
321321
} else if status == disperser.InsufficientSignatures {
322322
_, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo)
323+
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
323324
} else {
324325
updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String())
325326
}
@@ -340,8 +341,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
340341

341342
log.Trace("[batcher] Update confirmation info took", "duration", time.Since(stageTimer))
342343
b.Metrics.ObserveLatency("UpdateConfirmationInfo", float64(time.Since(stageTimer).Milliseconds()))
343-
344-
b.Metrics.UpdateCompletedBatchAndBlobs(batch.BlobMetadata, passed)
344+
b.Metrics.IncrementBatchCount(len(batch.BlobMetadata))
345345
return nil
346346
}
347347

disperser/batcher/batcher_test.go

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/Layr-Labs/eigenda/common"
13+
"github.com/Layr-Labs/eigenda/common/logging"
1314
cmock "github.com/Layr-Labs/eigenda/common/mock"
1415
"github.com/Layr-Labs/eigenda/core"
1516
"github.com/Layr-Labs/eigenda/core/encoding"
@@ -63,14 +64,15 @@ func makeTestBlob(securityParams []*core.SecurityParam) core.Blob {
6364

6465
func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) {
6566
// Common Components
66-
logger := &cmock.Logger{}
67+
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
68+
assert.NoError(t, err)
6769

6870
// Core Components
6971
cst, err := coremock.NewChainDataMock(10)
7072
assert.NoError(t, err)
7173
cst.On("GetCurrentBlockNumber").Return(uint(10), nil)
7274
asgn := &core.StdAssignmentCoordinator{}
73-
agg := &core.StdSignatureAggregator{}
75+
agg := core.NewStdSignatureAggregator(logger)
7476
enc, err := makeTestEncoder()
7577
assert.NoError(t, err)
7678

disperser/batcher/encoded_blob_store.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -102,7 +102,6 @@ func (e *encodedBlobStore) PutEncodingResult(result *EncodingResult) error {
102102
}
103103
e.encoded[requestID] = result
104104
delete(e.requested, requestID)
105-
e.logger.Trace("[PutEncodingResult]", "referenceBlockNumber", result.ReferenceBlockNumber, "requestID", requestID, "encodedSize", e.encodedResultSize)
106105

107106
return nil
108107
}

disperser/batcher/metrics.go

Lines changed: 24 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
5252
Name: "batches_total",
5353
Help: "the number and size of total dispersal batch",
5454
},
55-
[]string{"state", "data"},
55+
[]string{"data"},
5656
),
5757
BatchProcLatency: promauto.With(reg).NewSummaryVec(
5858
prometheus.SummaryOpts{
@@ -85,43 +85,34 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
8585
return metrics
8686
}
8787

88-
func (g *Metrics) UpdateAttestation(signers, nonSigners int) {
89-
g.Attestation.WithLabelValues("signers").Set(float64(signers))
90-
g.Attestation.WithLabelValues("non_signers").Set(float64(nonSigners))
88+
func (g *Metrics) UpdateAttestation(operatorCount, nonSignerCount int) {
89+
g.Attestation.WithLabelValues("signers").Set(float64(operatorCount - nonSignerCount))
90+
g.Attestation.WithLabelValues("non_signers").Set(float64(nonSignerCount))
9191
}
9292

93-
// UpdateFailedBatchAndBlobs updates failed a batch and number of blob within it, it only
94-
// counts the number of blob and batches
95-
func (g *Metrics) UpdateFailedBatchAndBlobs(numBlob int) {
96-
g.Blob.WithLabelValues("failed", "number").Add(float64(numBlob))
97-
g.Batch.WithLabelValues("failed", "number").Inc()
98-
}
99-
100-
// UpdateCompletedBatchAndBlobs updates whenever there is a completed batch. it updates both the
101-
// number for batch and blob, and it updates size of data blob. Moreover, it updates the
102-
// time it takes to process the entire batch from "getting the blobs" to "marking as finished"
103-
func (g *Metrics) UpdateCompletedBatchAndBlobs(blobsInBatch []*disperser.BlobMetadata, succeeded []bool) {
104-
totalBlobSucceeded := 0
105-
totalBlobFailed := 0
106-
totalBlobSize := 0
107-
108-
for ind, metadata := range blobsInBatch {
109-
if succeeded[ind] {
110-
totalBlobSucceeded += 1
111-
totalBlobSize += int(metadata.RequestMetadata.BlobSize)
112-
} else {
113-
totalBlobFailed += 1
114-
}
93+
// UpdateCompletedBlob increments the number and updates size of processed blobs.
94+
func (g *Metrics) UpdateCompletedBlob(size int, status disperser.BlobStatus) {
95+
switch status {
96+
case disperser.Confirmed:
97+
g.Blob.WithLabelValues("confirmed", "number").Inc()
98+
g.Blob.WithLabelValues("confirmed", "size").Add(float64(size))
99+
case disperser.Failed:
100+
g.Blob.WithLabelValues("failed", "number").Inc()
101+
g.Blob.WithLabelValues("failed", "size").Add(float64(size))
102+
case disperser.InsufficientSignatures:
103+
g.Blob.WithLabelValues("insufficient_signature", "number").Inc()
104+
g.Blob.WithLabelValues("insufficient_signature", "size").Add(float64(size))
105+
default:
106+
return
115107
}
116108

117-
// Failed blob
118-
g.UpdateFailedBatchAndBlobs(totalBlobFailed)
109+
g.Blob.WithLabelValues("total", "number").Inc()
110+
g.Blob.WithLabelValues("total", "size").Add(float64(size))
111+
}
119112

120-
// Blob
121-
g.Blob.WithLabelValues("completed", "number").Add(float64(totalBlobSucceeded))
122-
g.Blob.WithLabelValues("completed", "size").Add(float64(totalBlobSize))
123-
// Batch
124-
g.Batch.WithLabelValues("completed", "number").Inc()
113+
func (g *Metrics) IncrementBatchCount(size int) {
114+
g.Batch.WithLabelValues("number").Inc()
115+
g.Batch.WithLabelValues("size").Add(float64(size))
125116
}
126117

127118
func (g *Metrics) ObserveLatency(stage string, latencyMs float64) {

disperser/dispatcher/dispatcher.go

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -89,12 +89,13 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
8989
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
9090
defer cancel()
9191

92-
request, err := GetStoreChunksRequest(blobs, header)
92+
request, totalSize, err := GetStoreChunksRequest(blobs, header)
9393
if err != nil {
9494
return nil, err
9595
}
9696

9797
opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 1024)
98+
c.logger.Debug("sending chunks to operator", "operator", op.Socket, "size", totalSize)
9899
reply, err := gc.StoreChunks(ctx, request, opt)
99100

100101
if err != nil {
@@ -106,23 +107,24 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
106107
return sig, nil
107108
}
108109

109-
func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, error) {
110-
110+
func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, int, error) {
111111
blobs := make([]*node.Blob, len(blobMessages))
112+
totalSize := 0
112113
for i, blob := range blobMessages {
113114
var err error
114115
blobs[i], err = getBlobMessage(blob)
115116
if err != nil {
116-
return nil, err
117+
return nil, 0, err
117118
}
119+
totalSize += blob.BlobHeader.EncodedSizeAllQuorums()
118120
}
119121

120122
request := &node.StoreChunksRequest{
121123
BatchHeader: getBatchHeaderMessage(header),
122124
Blobs: blobs,
123125
}
124126

125-
return request, nil
127+
return request, totalSize, nil
126128
}
127129

128130
func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {

node/grpc/server_load_test.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -93,8 +93,9 @@ func TestStoreChunks(t *testing.T) {
9393
numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0])
9494
}
9595
t.Logf("Batch numTotalChunks: %d", numTotalChunks)
96-
req, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
96+
req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
9797
assert.NoError(t, err)
98+
assert.Equal(t, 50790400, totalSize)
9899

99100
timer := time.Now()
100101
reply, err := server.StoreChunks(context.Background(), req)

0 commit comments

Comments
 (0)