Skip to content

Commit 57a7b3b

Browse files
authored
feat: add metrics to payload disperser (#1441)
Signed-off-by: Cody Littley <[email protected]>
1 parent 1f48397 commit 57a7b3b

File tree

7 files changed

+211
-27
lines changed

7 files changed

+211
-27
lines changed

api/clients/v2/disperser_client.go

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/Layr-Labs/eigenda/api"
1010
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
11+
"github.com/Layr-Labs/eigenda/common"
1112
"github.com/Layr-Labs/eigenda/core"
1213
corev2 "github.com/Layr-Labs/eigenda/core/v2"
1314
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
@@ -23,13 +24,29 @@ type DisperserClientConfig struct {
2324
UseSecureGrpcFlag bool
2425
}
2526

27+
// DisperserClient manages communication with the disperser server.
2628
type DisperserClient interface {
29+
// Close closes the grpc connection to the disperser server.
2730
Close() error
28-
DisperseBlob(ctx context.Context, data []byte, blobVersion corev2.BlobVersion, quorums []core.QuorumID) (*dispv2.BlobStatus, corev2.BlobKey, error)
31+
// DisperseBlob disperses a blob with the given data, blob version, and quorums.
32+
DisperseBlob(
33+
ctx context.Context,
34+
data []byte,
35+
blobVersion corev2.BlobVersion,
36+
quorums []core.QuorumID) (*dispv2.BlobStatus, corev2.BlobKey, error)
37+
// DisperseBlobWithProbe is similar to DisperseBlob, but also takes a SequenceProbe to capture metrics.
38+
// If the probe is nil, no metrics are captured.
39+
DisperseBlobWithProbe(
40+
ctx context.Context,
41+
data []byte,
42+
blobVersion corev2.BlobVersion,
43+
quorums []core.QuorumID,
44+
probe *common.SequenceProbe) (*dispv2.BlobStatus, corev2.BlobKey, error)
45+
// GetBlobStatus returns the status of a blob with the given blob key.
2946
GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error)
47+
// GetBlobCommitment returns the blob commitment for a given blob payload.
3048
GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error)
3149
}
32-
3350
type disperserClient struct {
3451
config *DisperserClientConfig
3552
signer corev2.BlobRequestSigner
@@ -39,7 +56,7 @@ type disperserClient struct {
3956
client disperser_rpc.DisperserClient
4057
prover encoding.Prover
4158
accountant *Accountant
42-
requestMutex sync.Mutex // Mutex to ensure only one dispersal request is sent at a time
59+
accountantLock sync.Mutex
4360
}
4461

4562
var _ DisperserClient = &disperserClient{}
@@ -128,42 +145,68 @@ func (c *disperserClient) DisperseBlob(
128145
blobVersion corev2.BlobVersion,
129146
quorums []core.QuorumID,
130147
) (*dispv2.BlobStatus, corev2.BlobKey, error) {
131-
c.requestMutex.Lock()
132-
defer c.requestMutex.Unlock()
148+
return c.DisperseBlobWithProbe(ctx, data, blobVersion, quorums, nil)
149+
}
150+
151+
// DisperseBlobWithProbe disperses a blob with the given data, blob version, and quorums. If sequenceProbe is not nil,
152+
// the probe is used to capture metrics during the dispersal process.
153+
func (c *disperserClient) DisperseBlobWithProbe(
154+
ctx context.Context,
155+
data []byte,
156+
blobVersion corev2.BlobVersion,
157+
quorums []core.QuorumID,
158+
probe *common.SequenceProbe,
159+
) (*dispv2.BlobStatus, corev2.BlobKey, error) {
160+
161+
if len(quorums) == 0 {
162+
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
163+
}
164+
if c.signer == nil {
165+
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
166+
}
167+
for _, q := range quorums {
168+
if q > corev2.MaxQuorumID {
169+
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum number must be less than 256")
170+
}
171+
}
133172

134173
err := c.initOnceGrpcConnection()
135174
if err != nil {
136175
return nil, [32]byte{}, api.NewErrorFailover(err)
137176
}
177+
178+
probe.SetStage("acquire_accountant_lock")
179+
c.accountantLock.Lock()
180+
181+
probe.SetStage("prepare_for_dispersal")
182+
138183
err = c.initOncePopulateAccountant(ctx)
139184
if err != nil {
140185
return nil, [32]byte{}, api.NewErrorFailover(err)
141186
}
142187

143-
if c.signer == nil {
144-
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
145-
}
146-
147188
symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
148189
payment, err := c.accountant.AccountBlob(ctx, time.Now().UnixNano(), uint64(symbolLength), quorums)
149190
if err != nil {
191+
c.accountantLock.Unlock()
150192
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
151193
}
152194

153-
if len(quorums) == 0 {
154-
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
155-
}
156-
157-
for _, q := range quorums {
158-
if q > corev2.MaxQuorumID {
159-
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum number must be less than 256")
160-
}
195+
if payment.CumulativePayment == nil || payment.CumulativePayment.Sign() == 0 {
196+
// This request is using reserved bandwidth, no need to prevent parallel dispersal.
197+
c.accountantLock.Unlock()
198+
} else {
199+
// This request is using on-demand bandwidth, current implementation requires sequential dispersal.
200+
defer c.accountantLock.Unlock()
161201
}
162202

163203
// check every 32 bytes of data are within the valid range for a bn254 field element
164204
_, err = rs.ToFrArray(data)
165205
if err != nil {
166-
return nil, [32]byte{}, fmt.Errorf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617 %w", err)
206+
return nil, [32]byte{}, fmt.Errorf(
207+
"encountered an error to convert a 32-bytes into a valid field element, "+
208+
"please use the correct format where every 32bytes(big-endian) is less than "+
209+
"21888242871839275222246405745257275088548364400416034343698204186575808495617 %w", err)
167210
}
168211

169212
var blobCommitments encoding.BlobCommitments
@@ -220,6 +263,8 @@ func (c *disperserClient) DisperseBlob(
220263
BlobHeader: blobHeaderProto,
221264
}
222265

266+
probe.SetStage("send_to_disperser")
267+
223268
reply, err := c.client.DisperseBlob(ctx, request)
224269
if err != nil {
225270
return nil, [32]byte{}, fmt.Errorf("error while calling DisperseBlob: %w", err)
@@ -230,6 +275,8 @@ func (c *disperserClient) DisperseBlob(
230275
return nil, [32]byte{}, err
231276
}
232277

278+
probe.SetStage("verify_blob_key")
279+
233280
if verifyReceivedBlobKey(blobHeader, reply) != nil {
234281
return nil, [32]byte{}, fmt.Errorf("verify received blob key: %w", err)
235282
}
@@ -245,9 +292,9 @@ func (c *disperserClient) DisperseBlob(
245292
//
246293
// This function returns nil if the verification succeeds, and otherwise returns an error describing the failure
247294
func verifyReceivedBlobKey(
248-
// the blob header which was constructed locally and sent to the disperser
295+
// the blob header which was constructed locally and sent to the disperser
249296
blobHeader *corev2.BlobHeader,
250-
// the reply received back from the disperser
297+
// the reply received back from the disperser
251298
disperserReply *disperser_rpc.DisperseBlobReply,
252299
) error {
253300

api/clients/v2/payloaddispersal/payload_disperser.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"github.com/Layr-Labs/eigenda/api/clients/v2"
99
"github.com/Layr-Labs/eigenda/api/clients/v2/coretypes"
1010
dispgrpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
11+
"github.com/Layr-Labs/eigenda/common"
1112
core "github.com/Layr-Labs/eigenda/core/v2"
1213
"github.com/Layr-Labs/eigensdk-go/logging"
14+
"github.com/prometheus/client_golang/prometheus"
1315
)
1416

1517
// PayloadDisperser provides the ability to disperse payloads to EigenDA via a Disperser grpc service.
@@ -20,9 +22,11 @@ type PayloadDisperser struct {
2022
config PayloadDisperserConfig
2123
disperserClient clients.DisperserClient
2224
certVerifier clients.ICertVerifier
25+
stageTimer *common.StageTimer
2326
}
2427

2528
// NewPayloadDisperser creates a PayloadDisperser from subcomponents that have already been constructed and initialized.
29+
// If the registry is nil then no metrics will be collected.
2630
func NewPayloadDisperser(
2731
logger logging.Logger,
2832
payloadDisperserConfig PayloadDisperserConfig,
@@ -35,6 +39,8 @@ func NewPayloadDisperser(
3539
// disperser, but the disperser's commitments will be verifiable without needing a full-fledged prover
3640
disperserClient clients.DisperserClient,
3741
certVerifier clients.ICertVerifier,
42+
// if nil, then no metrics will be collected
43+
registry *prometheus.Registry,
3844
) (*PayloadDisperser, error) {
3945

4046
err := payloadDisperserConfig.checkAndSetDefaults()
@@ -47,6 +53,7 @@ func NewPayloadDisperser(
4753
config: payloadDisperserConfig,
4854
disperserClient: disperserClient,
4955
certVerifier: certVerifier,
56+
stageTimer: common.NewStageTimer(registry, "PayloadDisperser", "SendPayload"),
5057
}, nil
5158
}
5259

@@ -63,11 +70,17 @@ func (pd *PayloadDisperser) SendPayload(
6370
// payload is the raw data to be stored on eigenDA
6471
payload *coretypes.Payload,
6572
) (*coretypes.EigenDACert, error) {
73+
74+
probe := pd.stageTimer.NewSequence("convert_to_blob")
75+
defer probe.End()
76+
6677
blob, err := payload.ToBlob(pd.config.PayloadPolynomialForm)
6778
if err != nil {
6879
return nil, fmt.Errorf("convert payload to blob: %w", err)
6980
}
7081

82+
probe.SetStage("get_quorums")
83+
7184
timeoutCtx, cancel := context.WithTimeout(ctx, pd.config.ContractCallTimeout)
7285
defer cancel()
7386
requiredQuorums, err := pd.certVerifier.GetQuorumNumbersRequired(timeoutCtx)
@@ -82,31 +95,37 @@ func (pd *PayloadDisperser) SendPayload(
8295
// serialized bytes. The operations taking place in DisperseBlob require the bytes to be converted into field
8396
// elements anyway, so serializing the blob here is unnecessary work. This will be a larger change that affects
8497
// many areas of code, though.
85-
blobStatus, blobKey, err := pd.disperserClient.DisperseBlob(
98+
blobStatus, blobKey, err := pd.disperserClient.DisperseBlobWithProbe(
8699
timeoutCtx,
87100
blob.Serialize(),
88101
pd.config.BlobVersion,
89102
requiredQuorums,
90-
)
103+
probe)
91104
if err != nil {
92105
return nil, fmt.Errorf("disperse blob: %w", err)
93106
}
94107
pd.logger.Debug("Successful DisperseBlob", "blobStatus", blobStatus.String(), "blobKey", blobKey.Hex())
95108

109+
probe.SetStage("QUEUED")
110+
96111
timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.BlobCertifiedTimeout)
97112
defer cancel()
98-
blobStatusReply, err := pd.pollBlobStatusUntilCertified(timeoutCtx, blobKey, blobStatus.ToProfobuf())
113+
blobStatusReply, err := pd.pollBlobStatusUntilCertified(timeoutCtx, blobKey, blobStatus.ToProfobuf(), probe)
99114
if err != nil {
100115
return nil, fmt.Errorf("poll blob status until certified: %w", err)
101116
}
102117
pd.logger.Debug("Blob status CERTIFIED", "blobKey", blobKey.Hex())
103118

119+
probe.SetStage("build_cert")
120+
104121
eigenDACert, err := pd.buildEigenDACert(ctx, blobKey, blobStatusReply)
105122
if err != nil {
106123
// error returned from method is sufficiently descriptive
107124
return nil, err
108125
}
109126

127+
probe.SetStage("verify_cert")
128+
110129
timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.ContractCallTimeout)
111130
defer cancel()
112131
err = pd.certVerifier.VerifyCertV2(timeoutCtx, eigenDACert)
@@ -141,6 +160,7 @@ func (pd *PayloadDisperser) pollBlobStatusUntilCertified(
141160
ctx context.Context,
142161
blobKey core.BlobKey,
143162
initialStatus dispgrpc.BlobStatus,
163+
probe *common.SequenceProbe,
144164
) (*dispgrpc.BlobStatusReply, error) {
145165

146166
previousStatus := initialStatus
@@ -182,6 +202,9 @@ func (pd *PayloadDisperser) pollBlobStatusUntilCertified(
182202
case dispgrpc.BlobStatus_QUEUED, dispgrpc.BlobStatus_ENCODED, dispgrpc.BlobStatus_GATHERING_SIGNATURES:
183203
// TODO (litt): check signing percentage when we are gathering signatures, potentially break
184204
// out of this loop early if we have enough signatures
205+
206+
// Report all non-terminal statuses to the probe. Repeat reports are no-ops.
207+
probe.SetStage(newStatus.String())
185208
continue
186209
default:
187210
return nil, fmt.Errorf(

common/stage_timer.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package common
2+
3+
import (
4+
"time"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/promauto"
8+
)
9+
10+
// StageTimer encapsulates metrics to help track the time spent in each stage of the payload dispersal process.
11+
type StageTimer struct {
12+
stageCount *prometheus.GaugeVec
13+
stageLatency *prometheus.SummaryVec
14+
}
15+
16+
// SequenceProbe can be used to track the amount of time that a particular operation spends doing particular
17+
// sub-operations (i.e. stages). Multiple instances of a particular operation can be tracked concurrently by the same
18+
// StageTimer. For each operation, the StageTimer builds a SequenceProbe. Each SequenceProbe is responsible for
19+
// tracking the lifecycle of a single iteration of an operation.
20+
type SequenceProbe struct {
21+
stageTimer *StageTimer
22+
currentStage string
23+
currentStageStart time.Time
24+
}
25+
26+
// NewStageTimer creates a new stageTimer with the given prefix and name.
27+
func NewStageTimer(registry *prometheus.Registry, prefix, name string) *StageTimer {
28+
if registry == nil {
29+
return nil
30+
}
31+
32+
stageLatency := promauto.With(registry).NewSummaryVec(
33+
prometheus.SummaryOpts{
34+
Namespace: prefix,
35+
Name: name + "_stage_latency_ms",
36+
Help: "the latency of each type of operation",
37+
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
38+
},
39+
[]string{"stage"},
40+
)
41+
42+
stageCount := promauto.With(registry).NewGaugeVec(
43+
prometheus.GaugeOpts{
44+
Namespace: prefix,
45+
Name: name + "_stage_count",
46+
Help: "the number of operations with a specific stage",
47+
},
48+
[]string{"stage"},
49+
)
50+
51+
return &StageTimer{
52+
stageLatency: stageLatency,
53+
stageCount: stageCount,
54+
}
55+
}
56+
57+
// NewSequence creates a new sequenceProbe with the given initial stage.
58+
func (s *StageTimer) NewSequence(initialStage string) *SequenceProbe {
59+
if s == nil {
60+
return nil
61+
}
62+
63+
s.stageCount.WithLabelValues(initialStage).Inc()
64+
return &SequenceProbe{
65+
stageTimer: s,
66+
currentStage: initialStage,
67+
currentStageStart: time.Now(),
68+
}
69+
}
70+
71+
// SetStage updates the stage of the current sequence.
72+
func (p *SequenceProbe) SetStage(stage string) {
73+
if p == nil {
74+
return
75+
}
76+
if p.currentStage == stage {
77+
return
78+
}
79+
80+
now := time.Now()
81+
elapsed := now.Sub(p.currentStageStart)
82+
p.stageTimer.stageLatency.WithLabelValues(p.currentStage).Observe(ToMilliseconds(elapsed))
83+
p.currentStageStart = now
84+
85+
p.stageTimer.stageCount.WithLabelValues(p.currentStage).Dec()
86+
p.stageTimer.stageCount.WithLabelValues(stage).Inc()
87+
p.currentStage = stage
88+
}
89+
90+
// End completes the current sequence. It is important to call this before discarding the sequenceProbe.
91+
func (p *SequenceProbe) End() {
92+
if p == nil {
93+
return
94+
}
95+
96+
now := time.Now()
97+
elapsed := now.Sub(p.currentStageStart)
98+
p.stageTimer.stageLatency.WithLabelValues(p.currentStage).Observe(ToMilliseconds(elapsed))
99+
100+
p.stageTimer.stageCount.WithLabelValues(p.currentStage).Dec()
101+
}

disperser/common/v2/blobstore/dynamo_metadata_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1705,7 +1705,7 @@ func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) {
17051705
operatorIDStr := obj.OperatorID
17061706
if strings.HasPrefix(operatorIDStr, dispersalRequestSKPrefix) {
17071707
operatorIDStr = strings.TrimPrefix(operatorIDStr, dispersalRequestSKPrefix)
1708-
} else if strings.HasPrefix(operatorIDStr, dispersalResponseSKPrefix) {
1708+
} else {
17091709
operatorIDStr = strings.TrimPrefix(operatorIDStr, dispersalResponseSKPrefix)
17101710
}
17111711

0 commit comments

Comments
 (0)