Skip to content

Commit 58bdff6

Browse files
committed
add metric to common controller - basic metrics utilities
use a different port, use a longer delta
1 parent 2890f4a commit 58bdff6

File tree

5 files changed

+449
-0
lines changed

5 files changed

+449
-0
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@ require (
1010
github.com/imdario/mergo v0.3.7 // indirect
1111
github.com/kubernetes-csi/csi-lib-utils v0.7.0
1212
github.com/kubernetes-csi/csi-test v2.0.0+incompatible
13+
github.com/prometheus/client_golang v1.0.0
14+
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
15+
github.com/prometheus/common v0.4.1
1316
google.golang.org/grpc v1.26.0
1417
k8s.io/api v0.17.0
1518
k8s.io/apimachinery v0.17.1-beta.0
1619
k8s.io/client-go v0.17.0
1720
k8s.io/code-generator v0.0.0-20191121015212-c4c8f8345c7e
21+
k8s.io/component-base v0.17.0
1822
k8s.io/klog v1.0.0
1923
k8s.io/kubernetes v1.14.0
2024
)

pkg/metrics/metrics.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"fmt"
21+
"net"
22+
"net/http"
23+
"sync"
24+
"time"
25+
26+
"github.com/prometheus/client_golang/prometheus/promhttp"
27+
"k8s.io/apimachinery/pkg/types"
28+
k8smetrics "k8s.io/component-base/metrics"
29+
"k8s.io/klog"
30+
)
31+
32+
// Names used to declare the operation latency histogram: the label keys
// (driver, operation, status), the metric subsystem, the metric name and the
// metric help text.
const (
33+
labelDriverName = "driver_name"
34+
labelOperationName = "operation_name"
35+
labelOperationStatus = "operation_status"
36+
subSystem = "snapshot_controller"
37+
metricName = "operation_total_seconds"
38+
metricHelpMsg = "Total number of seconds spent by the controller on an operation from end to end"
39+
)
40+
41+
// OperationState is the outcome of an operation. Its string value is used as
// the operation status label value when a metric is recorded.
type OperationState string
42+
43+
const (
44+
// Failure is an operation state which means the controller has encountered
45+
// an error which is not recoverable and the controller has marked a permanent
46+
// failure of the operation.
47+
Failure OperationState = "Failure"
48+
49+
// Success states that the controller has successfully executed the operation.
50+
Success OperationState = "Success"
51+
)
52+
53+
// metricBuckets holds the bucket boundaries, in seconds, of the operation
// latency histogram (from 0.1s up to 600s).
var metricBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600}
54+
55+
// MetricsManager caches operation start times, records operation latency
// metrics, and can expose those metrics over an HTTP endpoint.
type MetricsManager interface {
56+
// StartServing starts the metrics endpoint at the specified addr/pattern for
57+
// metrics managed by this MetricsManager. It spawns a goroutine to serve
58+
// HTTP requests received on addr/pattern and returns the HTTP server.
59+
// An error is returned if addr is empty or if listening on addr fails.
60+
// wg.Done is called when the serving goroutine exits.
// NOTE(review): the caller presumably has to call wg.Add(1) beforehand — confirm.
61+
StartServing(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error)
62+
63+
// OperationStart takes in an operation and caches its start time.
64+
// If the operation already exists, it is a no-op.
65+
OperationStart(op Operation)
66+
67+
// DropOperation removes an operation from the cache.
68+
// If the operation does not exist, it is a no-op.
69+
DropOperation(op Operation)
70+
71+
// RecordMetrics records a metric point. Note that it is a no-op if the
72+
// operation has not been marked "Started" previously via invoking "OperationStart".
73+
// Invoking RecordMetrics effectively removes the cached entry.
74+
// op - the operation which the metric is associated with.
75+
// state - the operation state
76+
// Failure: record a failure metric
77+
// Success: record a success metric
78+
RecordMetrics(op Operation, state OperationState)
79+
}
80+
81+
// Operation is a structure which holds information to identify a snapshot
82+
// related operation. Operation values are used as keys of the start-time cache.
83+
type Operation struct {
84+
// Name is the name of the operation, for example: "CreateSnapshot", "DeleteSnapshot".
85+
Name string
86+
// Driver is the name of the driver which executes the operation.
87+
Driver string
88+
// ResourceID is the UID of the resource the operation is executed against.
89+
ResourceID types.UID
90+
}
91+
92+
// operationMetricsManager is the MetricsManager implementation in this file.
// It keeps the start time of ongoing operations in a cache and reports their
// duration to a latency histogram registered in its own registry.
type operationMetricsManager struct {
93+
// cache is a concurrent-safe map which stores start timestamps for all
94+
// ongoing operations.
95+
// key is an Operation
96+
// value is the timestamp of the start time of the operation
97+
cache sync.Map
98+
99+
// registry is a wrapper around Prometheus Registry; the latency histogram
// is registered in it and it backs the HTTP metrics handler.
100+
registry k8smetrics.KubeRegistry
101+
102+
// opLatencyMetrics is the histogram vector which records the duration of
// operations in seconds, labeled by driver, operation name and status.
103+
opLatencyMetrics *k8smetrics.HistogramVec
104+
}
105+
106+
func NewMetricsManager() MetricsManager {
107+
mgr := &operationMetricsManager{
108+
cache: sync.Map{},
109+
}
110+
mgr.init()
111+
return mgr
112+
}
113+
114+
func (opMgr *operationMetricsManager) OperationStart(op Operation) {
115+
opMgr.cache.LoadOrStore(op, time.Now())
116+
}
117+
118+
func (opMgr *operationMetricsManager) DropOperation(op Operation) {
119+
opMgr.cache.Delete(op)
120+
}
121+
122+
func (opMgr *operationMetricsManager) RecordMetrics(op Operation, state OperationState) {
123+
obj, exists := opMgr.cache.Load(op)
124+
if !exists {
125+
// the operation has not been cached, return directly
126+
return
127+
}
128+
ts, ok := obj.(time.Time)
129+
if !ok {
130+
// the cached item is not a time.Time, should NEVER happen, clean and return
131+
klog.Errorf("Invalid cache entry for key %v", op)
132+
opMgr.cache.Delete(op)
133+
return
134+
}
135+
duration := time.Since(ts).Seconds()
136+
opMgr.opLatencyMetrics.WithLabelValues(op.Driver, op.Name, string(state)).Observe(duration)
137+
opMgr.cache.Delete(op)
138+
}
139+
140+
func (opMgr *operationMetricsManager) init() {
141+
opMgr.registry = k8smetrics.NewKubeRegistry()
142+
opMgr.opLatencyMetrics = k8smetrics.NewHistogramVec(
143+
&k8smetrics.HistogramOpts{
144+
Subsystem: subSystem,
145+
Name: metricName,
146+
Help: metricHelpMsg,
147+
Buckets: metricBuckets,
148+
},
149+
[]string{labelDriverName, labelOperationName, labelOperationStatus},
150+
)
151+
opMgr.registry.MustRegister(opMgr.opLatencyMetrics)
152+
}
153+
154+
func (opMgr *operationMetricsManager) StartServing(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) {
155+
if addr == "" {
156+
return nil, fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified")
157+
}
158+
// start listening
159+
l, err := net.Listen("tcp", addr)
160+
if err != nil {
161+
return nil, fmt.Errorf("failed to listen on address[%s], error[%v]", addr, err)
162+
}
163+
srv := &http.Server{Addr: addr}
164+
http.Handle(pattern, k8smetrics.HandlerFor(
165+
opMgr.registry,
166+
k8smetrics.HandlerOpts{
167+
ErrorLog: logger,
168+
ErrorHandling: k8smetrics.ContinueOnError,
169+
}))
170+
171+
// start serving the endpoint
172+
go func() {
173+
defer wg.Done()
174+
if err := srv.Serve(l); err != http.ErrServerClosed {
175+
klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, pattern, err)
176+
}
177+
}()
178+
return srv, nil
179+
}

0 commit comments

Comments
 (0)