Skip to content

Commit 7bd7171

Browse files
committed
add basic metrics utility functions
relax latency check to be greater than instead of within threshold
1 parent 509aa5f commit 7bd7171

File tree

5 files changed

+739
-0
lines changed

5 files changed

+739
-0
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@ require (
1010
github.com/imdario/mergo v0.3.9 // indirect
1111
github.com/kubernetes-csi/csi-lib-utils v0.7.0
1212
github.com/kubernetes-csi/csi-test v2.0.0+incompatible
13+
github.com/prometheus/client_golang v1.0.0
14+
github.com/prometheus/client_model v0.2.0
15+
github.com/prometheus/common v0.4.1
1316
google.golang.org/grpc v1.28.0
1417
k8s.io/api v0.18.0
1518
k8s.io/apimachinery v0.18.0
1619
k8s.io/client-go v0.18.0
1720
k8s.io/code-generator v0.18.0
21+
k8s.io/component-base v0.18.0
1822
k8s.io/klog v1.0.0
1923
k8s.io/kubernetes v1.18.0
2024
)

pkg/metrics/metrics.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"fmt"
21+
"net"
22+
"net/http"
23+
"sync"
24+
"time"
25+
26+
"github.com/prometheus/client_golang/prometheus/promhttp"
27+
"k8s.io/apimachinery/pkg/types"
28+
k8smetrics "k8s.io/component-base/metrics"
29+
"k8s.io/klog"
30+
)
31+
32+
// Label names, naming components and defaults used by the operation latency
// histogram.
const (
	// opStatusUnknown is the status label value used by RecordMetrics when it
	// is handed a nil OperationStatus.
	opStatusUnknown = "Unknown"
	// labelDriverName is the label that carries Operation.Driver.
	labelDriverName = "driver_name"
	// labelOperationName is the label that carries Operation.Name.
	labelOperationName = "operation_name"
	// labelOperationStatus is the label that carries the operation status string.
	labelOperationStatus = "operation_status"
	// subSystem is the Subsystem set on the histogram options.
	subSystem = "snapshot_controller"
	// metricName is the Name set on the histogram options.
	metricName = "operation_total_seconds"
	// metricHelpMsg is the Help text set on the histogram options.
	metricHelpMsg = "Total number of seconds spent by the controller on an operation from end to end"
)
41+
42+
// OperationStatus is the interface type for representing an operation's
// execution status. A nil OperationStatus represents an "Unknown" status of
// the operation; any non-nil value is reported through its String method.
type OperationStatus interface {
	// String returns the text used as the operation status label value.
	String() string
}
47+
48+
// metricBuckets lists the histogram bucket boundaries handed to the operation
// latency histogram. Observed values are durations in seconds, so the
// boundaries span 0.1s to 600s.
var metricBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600}
49+
50+
type MetricsManager interface {
51+
// StartMetricsEndpoint starts the metrics endpoint at the specified addr/pattern for
52+
// metrics managed by this MetricsManager. It spawns a goroutine to listen to
53+
// and serve HTTP requests received on addr/pattern.
54+
// If the "pattern" is empty (i.e., ""), no endpoint will be started.
55+
// An error will be returned if there is any.
56+
StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error)
57+
58+
// OperationStart takes in an operation and caches its start time.
59+
// if the operation already exists, it's an no-op.
60+
OperationStart(op Operation)
61+
62+
// DropOperation removes an operation from cache.
63+
// if the operation does not exist, it's an no-op.
64+
DropOperation(op Operation)
65+
66+
// RecordMetrics records a metric point. Note that it will be an no-op if an
67+
// operation has NOT been marked "Started" previously via invoking "OperationStart".
68+
// Invoking of RecordMetrics effectively removes the cached entry.
69+
// op - the operation which the metric is associated with.
70+
// status - the operation status, if not specified, i.e., status == nil, an
71+
// "Unknown" status of the passed-in operation is assumed.
72+
RecordMetrics(op Operation, status OperationStatus)
73+
}
74+
75+
// Operation is a structure which holds information to identify a snapshot
76+
// related operation
77+
type Operation struct {
78+
// the name of the operation, for example: "CreateSnapshot", "DeleteSnapshot"
79+
Name string
80+
// the name of the driver which executes the operation
81+
Driver string
82+
// the resource UID to which the operation has been executed against
83+
ResourceID types.UID
84+
}
85+
86+
type operationMetricsManager struct {
87+
// cache is a concurrent-safe map which stores start timestamps for all
88+
// ongoing operations.
89+
// key is an Operation
90+
// value is the timestamp of the start time of the operation
91+
cache sync.Map
92+
93+
// registry is a wrapper around Prometheus Registry
94+
registry k8smetrics.KubeRegistry
95+
96+
// opLatencyMetrics is a Histogram metrics
97+
opLatencyMetrics *k8smetrics.HistogramVec
98+
}
99+
100+
func NewMetricsManager() MetricsManager {
101+
mgr := &operationMetricsManager{
102+
cache: sync.Map{},
103+
}
104+
mgr.init()
105+
return mgr
106+
}
107+
108+
func (opMgr *operationMetricsManager) OperationStart(op Operation) {
109+
opMgr.cache.LoadOrStore(op, time.Now())
110+
}
111+
112+
func (opMgr *operationMetricsManager) DropOperation(op Operation) {
113+
opMgr.cache.Delete(op)
114+
}
115+
116+
func (opMgr *operationMetricsManager) RecordMetrics(op Operation, status OperationStatus) {
117+
obj, exists := opMgr.cache.Load(op)
118+
if !exists {
119+
// the operation has not been cached, return directly
120+
return
121+
}
122+
ts, ok := obj.(time.Time)
123+
if !ok {
124+
// the cached item is not a time.Time, should NEVER happen, clean and return
125+
klog.Errorf("Invalid cache entry for key %v", op)
126+
opMgr.cache.Delete(op)
127+
return
128+
}
129+
strStatus := opStatusUnknown
130+
if status != nil {
131+
strStatus = status.String()
132+
}
133+
duration := time.Since(ts).Seconds()
134+
opMgr.opLatencyMetrics.WithLabelValues(op.Driver, op.Name, strStatus).Observe(duration)
135+
opMgr.cache.Delete(op)
136+
}
137+
138+
func (opMgr *operationMetricsManager) init() {
139+
opMgr.registry = k8smetrics.NewKubeRegistry()
140+
opMgr.opLatencyMetrics = k8smetrics.NewHistogramVec(
141+
&k8smetrics.HistogramOpts{
142+
Subsystem: subSystem,
143+
Name: metricName,
144+
Help: metricHelpMsg,
145+
Buckets: metricBuckets,
146+
},
147+
[]string{labelDriverName, labelOperationName, labelOperationStatus},
148+
)
149+
opMgr.registry.MustRegister(opMgr.opLatencyMetrics)
150+
}
151+
152+
func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) {
153+
if addr == "" {
154+
return nil, fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified")
155+
}
156+
// start listening
157+
l, err := net.Listen("tcp", addr)
158+
if err != nil {
159+
return nil, fmt.Errorf("failed to listen on address[%s], error[%v]", addr, err)
160+
}
161+
mux := http.NewServeMux()
162+
mux.Handle(pattern, k8smetrics.HandlerFor(
163+
opMgr.registry,
164+
k8smetrics.HandlerOpts{
165+
ErrorLog: logger,
166+
ErrorHandling: k8smetrics.ContinueOnError,
167+
}))
168+
srv := &http.Server{Addr: l.Addr().String(), Handler: mux}
169+
// start serving the endpoint
170+
go func() {
171+
defer wg.Done()
172+
if err := srv.Serve(l); err != http.ErrServerClosed {
173+
klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, pattern, err)
174+
}
175+
}()
176+
return srv, nil
177+
}

0 commit comments

Comments
 (0)