Skip to content

Commit 9f68f74

Browse files
authored
Relay metrics (#938)
Signed-off-by: Cody Littley <[email protected]>
1 parent b8deee5 commit 9f68f74

30 files changed

+1045
-132
lines changed

common/metrics/config.go

Lines changed: 0 additions & 10 deletions
This file was deleted.

common/metrics/metrics.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package metrics
22

3-
import "time"
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"time"
6+
)
47

58
// Metrics provides a convenient interface for reporting metrics.
69
type Metrics interface {
@@ -62,6 +65,9 @@ type Metrics interface {
6265
pollPeriod time.Duration,
6366
source func() float64,
6467
label ...any) error
68+
69+
// RegisterExternalMetrics registers prometheus collectors created outside the metrics framework.
70+
RegisterExternalMetrics(collectors ...prometheus.Collector)
6571
}
6672

6773
// Metric represents a metric that can be reported.

common/metrics/metrics_server.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ type metrics struct {
2424
// logger is the logger used to log messages.
2525
logger logging.Logger
2626

27-
// config is the configuration for the metrics.
28-
config *Config
27+
// namespace is prepended to all metric names.
28+
namespace string
2929

3030
// registry is the prometheus registry used to report metrics.
3131
registry *prometheus.Registry
@@ -55,13 +55,13 @@ type metrics struct {
5555
}
5656

5757
// NewMetrics creates a new Metrics instance.
58-
func NewMetrics(logger logging.Logger, config *Config) Metrics {
58+
func NewMetrics(logger logging.Logger, namespace string, port int) Metrics {
5959
reg := prometheus.NewRegistry()
6060
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
6161
reg.MustRegister(collectors.NewGoCollector())
6262

63-
logger.Infof("Starting metrics server at port %d", config.HTTPPort)
64-
addr := fmt.Sprintf(":%d", config.HTTPPort)
63+
logger.Infof("Starting metrics server at port %d", port)
64+
addr := fmt.Sprintf(":%d", port)
6565
mux := http.NewServeMux()
6666
mux.Handle("/metrics", promhttp.HandlerFor(
6767
reg,
@@ -74,7 +74,7 @@ func NewMetrics(logger logging.Logger, config *Config) Metrics {
7474

7575
m := &metrics{
7676
logger: logger,
77-
config: config,
77+
namespace: namespace,
7878
registry: reg,
7979
metricMap: make(map[metricID]Metric),
8080
isAlive: atomic.Bool{},
@@ -198,7 +198,7 @@ func (m *metrics) NewLatencyMetric(
198198
metric, err := newLatencyMetric(
199199
m.logger,
200200
m.registry,
201-
m.config.Namespace,
201+
m.namespace,
202202
name,
203203
description,
204204
objectives,
@@ -238,7 +238,7 @@ func (m *metrics) NewCountMetric(
238238
metric, err := newCountMetric(
239239
m.logger,
240240
m.registry,
241-
m.config.Namespace,
241+
m.namespace,
242242
name, description,
243243
labelTemplate)
244244

@@ -287,7 +287,7 @@ func (m *metrics) newGaugeMetricUnsafe(
287287
metric, err := newGaugeMetric(
288288
m.logger,
289289
m.registry,
290-
m.config.Namespace,
290+
m.namespace,
291291
name,
292292
unit,
293293
description,
@@ -368,7 +368,7 @@ func (m *metrics) GenerateMetricsDocumentation() string {
368368
}
369369
slices.SortFunc(metricIDs, sortFunc)
370370

371-
sb.Write([]byte(fmt.Sprintf("# Metrics Documentation for namespace '%s'\n\n", m.config.Namespace)))
371+
sb.Write([]byte(fmt.Sprintf("# Metrics Documentation for namespace '%s'\n\n", m.namespace)))
372372
sb.Write([]byte(fmt.Sprintf("This documentation was automatically generated at time `%s`\n\n",
373373
time.Now().Format(time.RFC3339))))
374374

@@ -402,7 +402,7 @@ func (m *metrics) GenerateMetricsDocumentation() string {
402402
sb.Write([]byte(fmt.Sprintf("| **Quantiles** | %s |\n", m.quantilesMap[*id])))
403403
}
404404
sb.Write([]byte(fmt.Sprintf("| **Fully Qualified Name** | `%s_%s_%s` |\n",
405-
m.config.Namespace, id.name, id.unit)))
405+
m.namespace, id.name, id.unit)))
406406
}
407407

408408
return sb.String()
@@ -428,3 +428,7 @@ func (m *metrics) WriteMetricsDocumentation(fileName string) error {
428428

429429
return nil
430430
}
431+
432+
func (m *metrics) RegisterExternalMetrics(collectors ...prometheus.Collector) {
433+
m.registry.MustRegister(collectors...)
434+
}

common/metrics/mock_metrics.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package metrics
22

33
import (
4+
"github.com/prometheus/client_golang/prometheus"
45
"time"
56
)
67

@@ -62,6 +63,10 @@ func (m *mockMetrics) NewAutoGauge(
6263
return nil
6364
}
6465

66+
func (m *mockMetrics) RegisterExternalMetrics(collectors ...prometheus.Collector) {
67+
68+
}
69+
6570
var _ CountMetric = &mockCountMetric{}
6671

6772
type mockCountMetric struct {

common/metrics/test/main.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,12 @@ type LabelType2 struct {
2424
}
2525

2626
func main() {
27-
28-
metricsConfig := &metrics.Config{
29-
Namespace: "test",
30-
HTTPPort: 9101,
31-
}
32-
3327
logger, err := common.NewLogger(common.DefaultLoggerConfig())
3428
if err != nil {
3529
panic(err)
3630
}
3731

38-
metricsServer := metrics.NewMetrics(logger, metricsConfig)
32+
metricsServer := metrics.NewMetrics(logger, "test", 9101)
3933

4034
l1, err := metricsServer.NewLatencyMetric(
4135
"l1",

metrics.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# EigenDA Metrics Documentation
22

3-
- [churner](operators/churner/churner-metrics.md)
3+
- [churner](operators/churner/mdoc/churner-metrics.md)
4+
- [relay](relay/mdoc/relay-metrics.md)
45

File renamed without changes.

operators/churner/metrics.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,7 @@ func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) {
6565
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
6666
reg.MustRegister(collectors.NewGoCollector())
6767

68-
metricsServer := metrics.NewMetrics(logger, &metrics.Config{
69-
Namespace: "eigenda_churner",
70-
HTTPPort: httpPort,
71-
})
68+
metricsServer := metrics.NewMetrics(logger, "eigenda_churner", httpPort)
7269

7370
numRequests, err := metricsServer.NewCountMetric(
7471
"request",
@@ -100,7 +97,7 @@ func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) {
10097

10198
// WriteMetricsDocumentation writes the metrics for the churner to a markdown file.
10299
func (g *Metrics) WriteMetricsDocumentation() error {
103-
return g.metricsServer.WriteMetricsDocumentation("operators/churner/churner-metrics.md")
100+
return g.metricsServer.WriteMetricsDocumentation("operators/churner/mdoc/churner-metrics.md")
104101
}
105102

106103
// ObserveLatency observes the latency of a stage

relay/blob_provider.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ func newBlobProvider(
3333
blobStore *blobstore.BlobStore,
3434
blobCacheSize uint64,
3535
maxIOConcurrency int,
36-
fetchTimeout time.Duration) (*blobProvider, error) {
36+
fetchTimeout time.Duration,
37+
metrics *cache.CacheAccessorMetrics) (*blobProvider, error) {
3738

3839
server := &blobProvider{
3940
ctx: ctx,
@@ -42,9 +43,12 @@ func newBlobProvider(
4243
fetchTimeout: fetchTimeout,
4344
}
4445

45-
c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight)
46+
cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](
47+
cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight),
48+
maxIOConcurrency,
49+
server.fetchBlob,
50+
metrics)
4651

47-
cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob)
4852
if err != nil {
4953
return nil, fmt.Errorf("error creating blob cache: %w", err)
5054
}

relay/blob_provider_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func TestReadWrite(t *testing.T) {
4141
blobStore,
4242
1024*1024*32,
4343
32,
44-
10*time.Second)
44+
10*time.Second,
45+
nil)
4546
require.NoError(t, err)
4647

4748
// Read the blobs back.
@@ -78,7 +79,8 @@ func TestNonExistentBlob(t *testing.T) {
7879
blobStore,
7980
1024*1024*32,
8081
32,
81-
10*time.Second)
82+
10*time.Second,
83+
nil)
8284
require.NoError(t, err)
8385

8486
for i := 0; i < 10; i++ {

0 commit comments

Comments
 (0)