Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
NumRequestRetries: ctx.GlobalInt(flags.NumRequestRetriesFlag.Name),
MaxBatchSize: int32(ctx.GlobalInt(flags.MaxBatchSizeFlag.Name)),
SignificantSigningThresholdPercentage: uint8(ctx.GlobalUint(flags.SignificantSigningThresholdPercentageFlag.Name)),
SignificantSigningMetricsThresholds: ctx.GlobalStringSlice(flags.SignificantSigningMetricsThresholdsFlag.Name),
},
NumConcurrentEncodingRequests: ctx.GlobalInt(flags.NumConcurrentEncodingRequestsFlag.Name),
NumConcurrentDispersalRequests: ctx.GlobalInt(flags.NumConcurrentDispersalRequestsFlag.Name),
Expand Down
9 changes: 9 additions & 0 deletions disperser/cmd/controller/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,14 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "SIGNIFICANT_SIGNING_THRESHOLD_PERCENTAGE"),
Value: 55,
}
defaultSigningThresholds cli.StringSlice = []string{"0.55", "0.67"}
SignificantSigningMetricsThresholdsFlag = cli.StringSliceFlag{
Name: common.PrefixFlag(FlagPrefix, "significant-signing-thresholds"),
Usage: "Significant signing thresholds for metrics, each must be between 0.0 and 1.0",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "SIGNIFICANT_SIGNING_METRICS_THRESHOLDS"),
Value: &defaultSigningThresholds,
}
)

var requiredFlags = []cli.Flag{
Expand Down Expand Up @@ -261,6 +269,7 @@ var optionalFlags = []cli.Flag{
ControllerReadinessProbePathFlag,
ControllerHealthProbePathFlag,
SignificantSigningThresholdPercentageFlag,
SignificantSigningMetricsThresholdsFlag,
}

var Flags []cli.Flag
Expand Down
29 changes: 24 additions & 5 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/Layr-Labs/eigenda/common/healthcheck"
"math"
"slices"
"strconv"
"strings"
"time"

"github.com/Layr-Labs/eigenda/common/healthcheck"

"github.com/Layr-Labs/eigenda/api/clients/v2"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -44,9 +46,10 @@ type DispatcherConfig struct {
// SignificantSigningThresholdPercentage is a configurable "important" signing threshold. Right now, it's being
// used to track signing metrics, to understand system performance. If the value is 0, then special handling for
// the threshold is disabled.
// TODO (litt3): this might eventually be used to cause special case handling at an "important" threshold, e.g.
// "update the attestation as soon as the threshold is reached."
SignificantSigningThresholdPercentage uint8
// Important signing thresholds for metrics reporting.
// Values should be between 0.0 (0% signed) and 1.0 (100% signed).
SignificantSigningMetricsThresholds []string
}

type Dispatcher struct {
Expand Down Expand Up @@ -101,6 +104,22 @@ func NewDispatcher(
config.MaxBatchSize == 0 {
return nil, errors.New("invalid config")
}

// CLI library doesn't support float slices at current version, parsing must happen manually
significantThresholds := make([]float64, 0, len(config.SignificantSigningMetricsThresholds))
for _, threshold := range config.SignificantSigningMetricsThresholds {
significantThreshold, err := strconv.ParseFloat(threshold, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse significant threshold %s: %v", threshold, err)
}
significantThresholds = append(significantThresholds, significantThreshold)
}

metrics, err := newDispatcherMetrics(registry, significantThresholds)
if err != nil {
return nil, fmt.Errorf("failed to initialize metrics: %v", err)
}

return &Dispatcher{
DispatcherConfig: config,

Expand All @@ -110,7 +129,7 @@ func NewDispatcher(
aggregator: aggregator,
nodeClientManager: nodeClientManager,
logger: logger.With("component", "Dispatcher"),
metrics: newDispatcherMetrics(registry),
metrics: metrics,

cursor: nil,
beforeDispatch: beforeDispatch,
Expand Down Expand Up @@ -170,7 +189,7 @@ func (d *Dispatcher) HandleBatch(
) (chan core.SigningMessage, *batchData, error) {
// Signal Liveness to indicate no stall
healthcheck.SignalHeartbeat("dispatcher", d.controllerLivenessChan, d.logger)

batchProbe.SetStage("get_reference_block")
currentBlockNumber, err := d.chainState.GetCurrentBlockNumber(ctx)
if err != nil {
Expand Down
74 changes: 71 additions & 3 deletions disperser/controller/dispatcher_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"fmt"
"sort"
"time"

"github.com/Layr-Labs/eigenda/common"
Expand Down Expand Up @@ -31,10 +32,19 @@ type dispatcherMetrics struct {
blobSetSize *prometheus.GaugeVec
batchStageTimer *common.StageTimer
sendToValidatorStageTimer *common.StageTimer
importantSigningThresholds []float64
signatureThresholds *prometheus.CounterVec
}

// NewDispatcherMetrics sets up metrics for the dispatcher.
func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
//
// importantSigningThresholds is a list of meaningful thresholds. Thresholds should be between 0.0 and 1.0.
// A count of batches meeting each specified threshold is reported as a metric.
func newDispatcherMetrics(
registry *prometheus.Registry,
importantSigningThresholds []float64,
) (*dispatcherMetrics, error) {

objectives := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}

attestation := promauto.With(registry).NewGaugeVec(
Expand Down Expand Up @@ -181,6 +191,31 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
"send_to_validator",
false)

// Verify that thresholds are sane
for _, threshold := range importantSigningThresholds {
if threshold < 0 || threshold > 1 {
return nil, fmt.Errorf("threshold %f is not between 0.0 and 1.0", threshold)
}
}
sort.Float64s(importantSigningThresholds)

// Add thresholds for 0.0 and 1.0, if missing.
if len(importantSigningThresholds) == 0 || importantSigningThresholds[0] != 0.0 {
importantSigningThresholds = append([]float64{0.0}, importantSigningThresholds...)
}
if importantSigningThresholds[len(importantSigningThresholds)-1] != 1.0 {
importantSigningThresholds = append(importantSigningThresholds, 1.0)
}

batchSigningThresholdCount := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: dispatcherNamespace,
Name: "batch_signing_threshold_count",
Help: "A count of batches that have reached various signature thresholds.",
},
[]string{"quorum", "threshold"},
)

return &dispatcherMetrics{
sendChunksRetryCount: sendChunksRetryCount,
processSigningMessageLatency: processSigningMessageLatency,
Expand All @@ -198,7 +233,9 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
blobSetSize: blobSetSize,
batchStageTimer: batchStageTimer,
sendToValidatorStageTimer: sendToValidatorStageTimer,
}
importantSigningThresholds: importantSigningThresholds,
signatureThresholds: batchSigningThresholdCount,
}, nil
}

func (m *dispatcherMetrics) reportSendChunksRetryCount(retries float64) {
Expand Down Expand Up @@ -266,7 +303,11 @@ func (m *dispatcherMetrics) reportBlobSetSize(size int) {
m.blobSetSize.WithLabelValues().Set(float64(size))
}

func (m *dispatcherMetrics) reportAttestation(operatorCount map[core.QuorumID]int, signerCount map[core.QuorumID]int, quorumResults map[core.QuorumID]*core.QuorumResult) {
func (m *dispatcherMetrics) reportAttestation(
operatorCount map[core.QuorumID]int,
signerCount map[core.QuorumID]int,
quorumResults map[core.QuorumID]*core.QuorumResult) {

for quorumID, count := range operatorCount {
quorumStr := fmt.Sprintf("%d", quorumID)
signers, ok := signerCount[quorumID]
Expand All @@ -282,7 +323,34 @@ func (m *dispatcherMetrics) reportAttestation(operatorCount map[core.QuorumID]in
m.attestation.WithLabelValues("signers", quorumStr).Set(float64(signers))
m.attestation.WithLabelValues("non_signers", quorumStr).Set(float64(nonSigners))
m.attestation.WithLabelValues("percent_signed", quorumStr).Set(float64(quorumResult.PercentSigned))

m.reportSigningThreshold(quorumID, float64(quorumResult.PercentSigned)/100.0)
}
}

func (m *dispatcherMetrics) reportSigningThreshold(quorumID core.QuorumID, signingFraction float64) {
// First, determine the threshold to report. In order to be reported as threshold X, the signing fraction
// must be greater than or equal to X, but strictly less than the next highest threshold.
//
// For example, let's say important thresholds are [0, 0.55, 0.67, 0.80, 1.0]
// 0.55 signing -> threshold 0.55 (>= 0.55 but < 0.67)
// 0.56 signing -> threshold 0.55 (>= 0.55 but < 0.67)
// 0.66 signing -> threshold 0.55 (>= 0.55 but < 0.67)
// 0.67 signing -> threshold 0.67 (>= 0.67 but < 0.80)

var threshold float64
for i := len(m.importantSigningThresholds) - 1; i >= 0; i-- {
candidateThreshold := m.importantSigningThresholds[i]
if candidateThreshold <= signingFraction {
threshold = candidateThreshold
break
}
}

quorumString := fmt.Sprintf("%d", quorumID)
thresholdString := fmt.Sprintf("%f", threshold)

m.signatureThresholds.WithLabelValues(quorumString, thresholdString).Inc()
}

func (m *dispatcherMetrics) newBatchProbe() *common.SequenceProbe {
Expand Down
Loading