86 changes: 67 additions & 19 deletions api/clients/v2/disperser_client.go
@@ -8,6 +8,7 @@ import (

"github.com/Layr-Labs/eigenda/api"
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
@@ -23,13 +24,29 @@ type DisperserClientConfig struct {
UseSecureGrpcFlag bool
}

// DisperserClient manages communication with the disperser server.
type DisperserClient interface {
// Close closes the grpc connection to the disperser server.
Close() error
DisperseBlob(ctx context.Context, data []byte, blobVersion corev2.BlobVersion, quorums []core.QuorumID) (*dispv2.BlobStatus, corev2.BlobKey, error)
// DisperseBlob disperses a blob with the given data, blob version, and quorums.
DisperseBlob(
ctx context.Context,
data []byte,
blobVersion corev2.BlobVersion,
quorums []core.QuorumID) (*dispv2.BlobStatus, corev2.BlobKey, error)
// DisperseBlobWithProbe is similar to DisperseBlob, but also takes a SequenceProbe to capture metrics.
// If the probe is nil, no metrics are captured.
DisperseBlobWithProbe(
ctx context.Context,
data []byte,
blobVersion corev2.BlobVersion,
quorums []core.QuorumID,
probe *common.SequenceProbe) (*dispv2.BlobStatus, corev2.BlobKey, error)
// GetBlobStatus returns the status of a blob with the given blob key.
GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error)
// GetBlobCommitment returns the blob commitment for a given blob payload.
GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error)
}

type disperserClient struct {
config *DisperserClientConfig
signer corev2.BlobRequestSigner
@@ -39,7 +56,7 @@ type disperserClient struct {
client disperser_rpc.DisperserClient
prover encoding.Prover
accountant *Accountant
requestMutex sync.Mutex // Mutex to ensure only one dispersal request is sent at a time
accountantLock sync.Mutex
}

var _ DisperserClient = &disperserClient{}
@@ -128,10 +145,44 @@ func (c *disperserClient) DisperseBlob(
blobVersion corev2.BlobVersion,
quorums []core.QuorumID,
) (*dispv2.BlobStatus, corev2.BlobKey, error) {
c.requestMutex.Lock()
defer c.requestMutex.Unlock()
return c.DisperseBlobWithProbe(ctx, data, blobVersion, quorums, nil)
}

err := c.initOnceGrpcConnection()
// DisperseBlobWithProbe disperses a blob with the given data, blob version, and quorums. If sequenceProbe is not nil,
// the probe is used to capture metrics during the dispersal process.
func (c *disperserClient) DisperseBlobWithProbe(
ctx context.Context,
data []byte,
blobVersion corev2.BlobVersion,
quorums []core.QuorumID,
probe *common.SequenceProbe,
) (*dispv2.BlobStatus, corev2.BlobKey, error) {

if len(quorums) == 0 {
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
}

probe.SetStage("acquire_accountant_lock")
c.accountantLock.Lock()

symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
payment, err := c.accountant.AccountBlob(ctx, time.Now().UnixNano(), uint64(symbolLength), quorums)
if err != nil {
c.accountantLock.Unlock()
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
}

if payment.CumulativePayment.Sign() == 0 {
// This request is using reserved bandwidth, no need to prevent parallel dispersal.
c.accountantLock.Unlock()
} else {
// This request is using on-demand bandwidth, current implementation requires sequential dispersal.
defer c.accountantLock.Unlock()
}

probe.SetStage("prepare_for_dispersal")

err = c.initOnceGrpcConnection()
if err != nil {
return nil, [32]byte{}, api.NewErrorFailover(err)
}
@@ -144,16 +195,6 @@ func (c *disperserClient) DisperseBlob(
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
}

symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
payment, err := c.accountant.AccountBlob(ctx, time.Now().UnixNano(), uint64(symbolLength), quorums)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
}

if len(quorums) == 0 {
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
}

for _, q := range quorums {
if q > corev2.MaxQuorumID {
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum number must be less than 256")
@@ -163,7 +204,10 @@
// check every 32 bytes of data are within the valid range for a bn254 field element
_, err = rs.ToFrArray(data)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617 %w", err)
return nil, [32]byte{}, fmt.Errorf(
"encountered an error to convert a 32-bytes into a valid field element, "+
"please use the correct format where every 32bytes(big-endian) is less than "+
"21888242871839275222246405745257275088548364400416034343698204186575808495617 %w", err)
}

var blobCommitments encoding.BlobCommitments
@@ -220,6 +264,8 @@ func (c *disperserClient) DisperseBlob(
BlobHeader: blobHeaderProto,
}

probe.SetStage("send_to_disperser")

reply, err := c.client.DisperseBlob(ctx, request)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error while calling DisperseBlob: %w", err)
@@ -230,6 +276,8 @@ func (c *disperserClient) DisperseBlob(
return nil, [32]byte{}, err
}

probe.SetStage("verify_blob_key")

if verifyReceivedBlobKey(blobHeader, reply) != nil {
return nil, [32]byte{}, fmt.Errorf("verify received blob key: %w", err)
}
@@ -245,9 +293,9 @@ func (c *disperserClient) DisperseBlob(
//
// This function returns nil if the verification succeeds, and otherwise returns an error describing the failure
func verifyReceivedBlobKey(
// the blob header which was constructed locally and sent to the disperser
// the blob header which was constructed locally and sent to the disperser
blobHeader *corev2.BlobHeader,
// the reply received back from the disperser
// the reply received back from the disperser
disperserReply *disperser_rpc.DisperseBlobReply,
) error {

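A hedged caller-side sketch of the new entry point (not part of this PR; the function and parameter values below are illustrative). DisperseBlob now simply delegates to DisperseBlobWithProbe with a nil probe, so existing callers keep working unchanged, while metric-aware callers pass a *common.SequenceProbe.

// disperseWithMetrics is a hypothetical caller, not part of this PR.
// Assumed imports: context, fmt, and the eigenda clients/core/corev2/common packages shown above.
func disperseWithMetrics(
	ctx context.Context,
	client clients.DisperserClient,
	data []byte,
	probe *common.SequenceProbe, // may be nil; then no metrics are captured
) error {
	status, blobKey, err := client.DisperseBlobWithProbe(
		ctx,
		data,
		corev2.BlobVersion(0), // illustrative blob version
		[]core.QuorumID{0, 1}, // must be non-empty
		probe,
	)
	if err != nil {
		return fmt.Errorf("disperse blob: %w", err)
	}
	_ = status
	_ = blobKey
	return nil
}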
44 changes: 33 additions & 11 deletions api/clients/v2/payloaddispersal/payload_disperser.go
@@ -8,8 +8,10 @@ import (
"github.com/Layr-Labs/eigenda/api/clients/v2"
"github.com/Layr-Labs/eigenda/api/clients/v2/coretypes"
dispgrpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/common"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
)

// PayloadDisperser provides the ability to disperse payloads to EigenDA via a Disperser grpc service.
@@ -20,21 +22,24 @@ type PayloadDisperser struct {
config PayloadDisperserConfig
disperserClient clients.DisperserClient
certVerifier clients.ICertVerifier
stageTimer *common.StageTimer
}

// NewPayloadDisperser creates a PayloadDisperser from subcomponents that have already been constructed and initialized.
// If the registry is nil then no metrics will be collected.
func NewPayloadDisperser(
logger logging.Logger,
payloadDisperserConfig PayloadDisperserConfig,
// IMPORTANT: it is permissible for the disperserClient to be configured without a prover, but operating with this
// configuration puts a trust assumption on the disperser. With a nil prover, the disperser is responsible for computing
// the commitments to a blob, and the PayloadDisperser doesn't have a mechanism to verify these commitments.
//
// TODO: In the future, an optimized method of commitment verification using fiat shamir transformation will
// be implemented. This feature will allow a PayloadDisperser to offload commitment generation onto the
// disperser, but the disperser's commitments will be verifiable without needing a full-fledged prover
// IMPORTANT: it is permissible for the disperserClient to be configured without a prover, but operating with this
// configuration puts a trust assumption on the disperser. With a nil prover, the disperser is responsible for computing
// the commitments to a blob, and the PayloadDisperser doesn't have a mechanism to verify these commitments.
//
// TODO: In the future, an optimized method of commitment verification using fiat shamir transformation will
// be implemented. This feature will allow a PayloadDisperser to offload commitment generation onto the
// disperser, but the disperser's commitments will be verifiable without needing a full-fledged prover
disperserClient clients.DisperserClient,
certVerifier clients.ICertVerifier,
registry *prometheus.Registry,
) (*PayloadDisperser, error) {

err := payloadDisperserConfig.checkAndSetDefaults()
@@ -47,6 +52,7 @@ func NewPayloadDisperser(
config: payloadDisperserConfig,
disperserClient: disperserClient,
certVerifier: certVerifier,
stageTimer: common.NewStageTimer(registry, "PayloadDisperser", "SendPayload"),
}, nil
}

@@ -60,14 +66,20 @@ func NewPayloadDisperser(
// 6. Return the valid cert
func (pd *PayloadDisperser) SendPayload(
ctx context.Context,
// payload is the raw data to be stored on eigenDA
// payload is the raw data to be stored on eigenDA
payload *coretypes.Payload,
) (*coretypes.EigenDACert, error) {

probe := pd.stageTimer.NewSequence("convert_to_blob")
defer probe.End()

blob, err := payload.ToBlob(pd.config.PayloadPolynomialForm)
if err != nil {
return nil, fmt.Errorf("convert payload to blob: %w", err)
}

probe.SetStage("get_quorums")

timeoutCtx, cancel := context.WithTimeout(ctx, pd.config.ContractCallTimeout)
defer cancel()
requiredQuorums, err := pd.certVerifier.GetQuorumNumbersRequired(timeoutCtx)
@@ -82,31 +94,37 @@ func (pd *PayloadDisperser) SendPayload(
// serialized bytes. The operations taking place in DisperseBlob require the bytes to be converted into field
// elements anyway, so serializing the blob here is unnecessary work. This will be a larger change that affects
// many areas of code, though.
blobStatus, blobKey, err := pd.disperserClient.DisperseBlob(
blobStatus, blobKey, err := pd.disperserClient.DisperseBlobWithProbe(
timeoutCtx,
blob.Serialize(),
pd.config.BlobVersion,
requiredQuorums,
)
probe)
if err != nil {
return nil, fmt.Errorf("disperse blob: %w", err)
}
pd.logger.Debug("Successful DisperseBlob", "blobStatus", blobStatus.String(), "blobKey", blobKey.Hex())

probe.SetStage(blobStatus.String())

timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.BlobCertifiedTimeout)
defer cancel()
blobStatusReply, err := pd.pollBlobStatusUntilCertified(timeoutCtx, blobKey, blobStatus.ToProfobuf())
blobStatusReply, err := pd.pollBlobStatusUntilCertified(timeoutCtx, blobKey, blobStatus.ToProfobuf(), probe)
if err != nil {
return nil, fmt.Errorf("poll blob status until certified: %w", err)
}
pd.logger.Debug("Blob status CERTIFIED", "blobKey", blobKey.Hex())

probe.SetStage("build_cert")

eigenDACert, err := pd.buildEigenDACert(ctx, blobKey, blobStatusReply)
if err != nil {
// error returned from method is sufficiently descriptive
return nil, err
}

probe.SetStage("verify_cert")

timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.ContractCallTimeout)
defer cancel()
err = pd.certVerifier.VerifyCertV2(timeoutCtx, eigenDACert)
@@ -141,6 +159,7 @@ func (pd *PayloadDisperser) pollBlobStatusUntilCertified(
ctx context.Context,
blobKey core.BlobKey,
initialStatus dispgrpc.BlobStatus,
probe *common.SequenceProbe,
) (*dispgrpc.BlobStatusReply, error) {

previousStatus := initialStatus
@@ -182,6 +201,9 @@ func (pd *PayloadDisperser) pollBlobStatusUntilCertified(
case dispgrpc.BlobStatus_QUEUED, dispgrpc.BlobStatus_ENCODED, dispgrpc.BlobStatus_GATHERING_SIGNATURES:
// TODO (litt): check signing percentage when we are gathering signatures, potentially break
// out of this loop early if we have enough signatures

// Report all non-terminal statuses to the probe. Repeat reports are no-ops.
probe.SetStage(newStatus.String())
continue
default:
return nil, fmt.Errorf(
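For context, a hedged wiring sketch of the updated constructor (not part of this PR; everything other than the signatures shown in the diff is assumed). Passing a nil registry disables stage metrics, since NewStageTimer returns nil and every probe method is a no-op on a nil receiver.

// sendWithMetrics is a hypothetical wiring example, not part of this PR.
// Assumed: logger, cfg, disperserClient, certVerifier and payload are constructed elsewhere.
func sendWithMetrics(
	ctx context.Context,
	logger logging.Logger,
	cfg payloaddispersal.PayloadDisperserConfig,
	disperserClient clients.DisperserClient,
	certVerifier clients.ICertVerifier,
	payload *coretypes.Payload,
) (*coretypes.EigenDACert, error) {
	// Use prometheus.NewRegistry() to collect stage metrics, or pass nil to disable them.
	registry := prometheus.NewRegistry()

	pd, err := payloaddispersal.NewPayloadDisperser(logger, cfg, disperserClient, certVerifier, registry)
	if err != nil {
		return nil, fmt.Errorf("new payload disperser: %w", err)
	}

	return pd.SendPayload(ctx, payload)
}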
100 changes: 100 additions & 0 deletions common/stage_timer.go
@@ -0,0 +1,100 @@
package common

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// It may be helpful to generalize this utility to be used in other parts of the codebase. For a future PR, perhaps.

// StageTimer encapsulates metrics to help track the time spent in each stage of the payload dispersal process.
type StageTimer struct {
stageCount *prometheus.GaugeVec
stageLatency *prometheus.SummaryVec
}

// SequenceProbe tracks the timing of a single sequence of operations. Multiple sequences can be tracked concurrently.
type SequenceProbe struct {
stageTimer *StageTimer
currentStage string
currentStageStart time.Time
}

// NewStageTimer creates a new stageTimer with the given prefix and name.
func NewStageTimer(registry *prometheus.Registry, prefix, name string) *StageTimer {
if registry == nil {
return nil
}

statusLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: prefix,
Name: name + "_stage_latency_ms",
Help: "the latency of each type of operation",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"stage"},
)

statusCount := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: prefix,
Name: name + "_stage_count",
Help: "the number of operations with a specific status",
},
[]string{"stage"},
)

return &StageTimer{
stageLatency: statusLatency,
stageCount: statusCount,
}
}

// NewSequence creates a new sequenceProbe with the given initial status.
func (s *StageTimer) NewSequence(initialStatus string) *SequenceProbe {
if s == nil {
return nil
}

s.stageCount.WithLabelValues(initialStatus).Inc()
return &SequenceProbe{
stageTimer: s,
currentStage: initialStatus,
currentStageStart: time.Now(),
}
}

// SetStage updates the status of the current sequence.
func (p *SequenceProbe) SetStage(stage string) {
if p == nil {
return
}
if p.currentStage == stage {
return
}

now := time.Now()
elapsed := now.Sub(p.currentStageStart)
p.stageTimer.stageLatency.WithLabelValues(p.currentStage).Observe(ToMilliseconds(elapsed))
p.currentStageStart = now

p.stageTimer.stageCount.WithLabelValues(p.currentStage).Dec()
p.stageTimer.stageCount.WithLabelValues(stage).Inc()
p.currentStage = stage
}

// End completes the current sequence. It is important to call this before discarding the sequenceProbe.
func (p *SequenceProbe) End() {
if p == nil {
return
}

now := time.Now()
elapsed := now.Sub(p.currentStageStart)
p.stageTimer.stageLatency.WithLabelValues(p.currentStage).Observe(ToMilliseconds(elapsed))

p.stageTimer.stageCount.WithLabelValues(p.currentStage).Dec()
}
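A standalone usage sketch of the new utility (illustrative only; the prefix, name, and stage strings are made up). A nil registry yields a nil StageTimer, and a nil StageTimer yields a nil SequenceProbe, so all of the probe calls below degrade to no-ops when metrics are disabled.

// doWorkWithMetrics is a hypothetical example, not part of this PR.
// Assumed imports: "github.com/Layr-Labs/eigenda/common" and
// "github.com/prometheus/client_golang/prometheus".
func doWorkWithMetrics(registry *prometheus.Registry) {
	// With a nil registry, NewStageTimer returns nil and every call below is a no-op.
	timer := common.NewStageTimer(registry, "example", "do_work")

	probe := timer.NewSequence("prepare") // nil timer => nil probe, still safe to use
	defer probe.End()                     // records the latency of the final stage

	// ... preparation work ...

	probe.SetStage("execute") // records "prepare" latency, moves the gauge to "execute"

	// ... main work ...
}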
2 changes: 1 addition & 1 deletion disperser/common/v2/blobstore/dynamo_metadata_store.go
@@ -1596,7 +1596,7 @@ func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) {
operatorIDStr := obj.OperatorID
if strings.HasPrefix(operatorIDStr, dispersalRequestSKPrefix) {
operatorIDStr = strings.TrimPrefix(operatorIDStr, dispersalRequestSKPrefix)
} else if strings.HasPrefix(operatorIDStr, dispersalResponseSKPrefix) {
} else {
operatorIDStr = strings.TrimPrefix(operatorIDStr, dispersalResponseSKPrefix)
}

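A small, hedged illustration of why the unconditional else above is safe: strings.TrimPrefix returns its input unchanged when the prefix is absent (the prefix value shown here is made up, not the real dispersalResponseSKPrefix).

// strings.TrimPrefix is a no-op when the prefix does not match, so trimming the
// response prefix in the plain else branch cannot mangle other operator IDs.
id := strings.TrimPrefix("abc123", "response#") // hypothetical prefix; id == "abc123"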