Skip to content

Commit 1f3bef2

Browse files
committed
add metric to common controller - basic metrics utilities
use a different port, use a longer delta
1 parent 2890f4a commit 1f3bef2

File tree

5 files changed

+398
-0
lines changed

5 files changed

+398
-0
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@ require (
1010
github.com/imdario/mergo v0.3.7 // indirect
1111
github.com/kubernetes-csi/csi-lib-utils v0.7.0
1212
github.com/kubernetes-csi/csi-test v2.0.0+incompatible
13+
github.com/prometheus/client_golang v1.0.0
14+
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
15+
github.com/prometheus/common v0.4.1
1316
google.golang.org/grpc v1.26.0
1417
k8s.io/api v0.17.0
1518
k8s.io/apimachinery v0.17.1-beta.0
1619
k8s.io/client-go v0.17.0
1720
k8s.io/code-generator v0.0.0-20191121015212-c4c8f8345c7e
21+
k8s.io/component-base v0.17.0
1822
k8s.io/klog v1.0.0
1923
k8s.io/kubernetes v1.14.0
2024
)

pkg/metrics/metrics.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"fmt"
21+
"net/http"
22+
"sync"
23+
"time"
24+
25+
"github.com/prometheus/client_golang/prometheus/promhttp"
26+
"k8s.io/apimachinery/pkg/types"
27+
k8smetrics "k8s.io/component-base/metrics"
28+
"k8s.io/klog"
29+
)
30+
31+
const (
32+
labelDriverName = "driver_name"
33+
labelOperationName = "operation_name"
34+
labelOperationStatus = "operation_status"
35+
snapshotController = "snapshot_controller"
36+
operationLatencyMetricName = "operation_total_seconds"
37+
operationLatencyMetricHelpMsg = "Total number of seconds spent by the snapshot controller for an operation to complete end to end"
38+
)
39+
40+
type OperationState string
41+
42+
const (
43+
// InterimFailure is an operation state which means the controller will keep
44+
// on retrying the operation upon encountering this type of error.
45+
InterimFailure OperationState = "InterimFailure"
46+
47+
// TerminatingFailure is an operation state which means the controller has encountered
48+
// an error which it's not able to resume the operation and has marked the
49+
// permanently failure of the operation.
50+
TerminatingFailure OperationState = "TerminatingFailure"
51+
52+
// Success states that the controller has successfully executed the operation.
53+
Success OperationState = "Success"
54+
)
55+
56+
var operationLatencyMetricBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600}
57+
58+
type MetricsManager interface {
59+
// StartServing starts the metrics endpoint at the specified addr/pattern for
60+
// metrics managed by this MetricsManager. It spawn a goroutine to listen to
61+
// and serve HTTP requests received on addr/pattern
62+
// if the "pattern" is empty (i.e., ""), no endpoint will be started. An error
63+
// will be returned if there is any.
64+
StartServing(pattern, addr string, logger promhttp.Logger) error
65+
66+
// OperationStart takes in an operation and cache it's start time.
67+
// if the operation already exists, it's an no-op.
68+
OperationStart(op Operation)
69+
70+
// RecordMetrics records a metric point. Note that it will be an no-op if an
71+
// operation has not been marked "Started" previously via invoking "OperationStart".
72+
// op - the Operation which the metric is associated with.
73+
// state - metrics manager operations differently to the cache based on the state
74+
// of the Operation:
75+
// InterimFailure: record a failure metric however keep operation's start time in cache
76+
// TerminatingFailure: record a failure metric and remove operation's start time from cache
77+
// Success: record a success metric and remove the operation's start time from cache
78+
RecordMetrics(op Operation, state OperationState)
79+
}
80+
81+
// Operation is a structure which holds information to identifier a snapshot
82+
// related operation
83+
type Operation struct {
84+
// the name of the operation, for example: "CreateSnapshot", "DeleteSnapshot"
85+
Name string
86+
// the name of the driver which executes the operation
87+
Driver string
88+
// the resource UID to which the operation has been executed against
89+
ResourceID types.UID
90+
}
91+
92+
func (op *Operation) key() string {
93+
return fmt.Sprintf("%s-%s-%s", op.Name, op.Driver, op.ResourceID)
94+
}
95+
96+
type operationTs struct {
97+
operation Operation
98+
startTime time.Time
99+
}
100+
101+
type operationMetricsManager struct {
102+
// cache is a concurrent-safe map which stores start timestamps for all
103+
// ongoing operations.
104+
// key is
105+
// value is
106+
cache sync.Map
107+
108+
registry k8smetrics.KubeRegistry
109+
110+
opLatencyMetric *k8smetrics.HistogramVec
111+
}
112+
113+
func NewMetricsManager() MetricsManager {
114+
mgr := &operationMetricsManager{
115+
cache: sync.Map{},
116+
}
117+
mgr.init()
118+
return mgr
119+
}
120+
121+
func (opMgr *operationMetricsManager) OperationStart(op Operation) {
122+
opTs := operationTs{
123+
operation: op,
124+
startTime: time.Now(),
125+
}
126+
opMgr.cache.LoadOrStore(op.key(), &opTs)
127+
}
128+
129+
func (opMgr *operationMetricsManager) RecordMetrics(op Operation, state OperationState) {
130+
key := op.key()
131+
obj, exists := opMgr.cache.Load(key)
132+
if !exists {
133+
// the operation has not been cached, return directly
134+
return
135+
}
136+
ts, ok := obj.(*operationTs)
137+
if !ok {
138+
// the cached item is not an operationTs, should NEVER happen, clean and return
139+
klog.Errorf("Invalid cache entry for key %s", key)
140+
opMgr.cache.Delete(key)
141+
return
142+
}
143+
duration := time.Since(ts.startTime).Seconds()
144+
opMgr.opLatencyMetric.WithLabelValues(op.Driver, op.Name, string(state)).Observe(duration)
145+
switch state {
146+
case Success, TerminatingFailure:
147+
opMgr.cache.Delete(key)
148+
case InterimFailure:
149+
// do nothing
150+
default:
151+
// log error
152+
klog.Errorf("Not supported operation state %s", state)
153+
}
154+
}
155+
156+
func (opMgr *operationMetricsManager) init() {
157+
opMgr.registry = k8smetrics.NewKubeRegistry()
158+
opMgr.opLatencyMetric = k8smetrics.NewHistogramVec(
159+
&k8smetrics.HistogramOpts{
160+
Subsystem: snapshotController,
161+
Name: operationLatencyMetricName,
162+
Help: operationLatencyMetricHelpMsg,
163+
Buckets: operationLatencyMetricBuckets,
164+
},
165+
[]string{labelDriverName, labelOperationName, labelOperationStatus},
166+
)
167+
opMgr.registry.MustRegister(opMgr.opLatencyMetric)
168+
}
169+
170+
func (opMgr *operationMetricsManager) StartServing(pattern, addr string, logger promhttp.Logger) error {
171+
if addr == "" {
172+
return fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified")
173+
}
174+
175+
http.Handle(pattern, k8smetrics.HandlerFor(
176+
opMgr.registry,
177+
k8smetrics.HandlerOpts{
178+
ErrorLog: logger,
179+
ErrorHandling: k8smetrics.ContinueOnError,
180+
}))
181+
182+
// start serving the endpoint
183+
go func() {
184+
err := http.ListenAndServe(addr, nil)
185+
if err != nil {
186+
klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, pattern, err)
187+
}
188+
}()
189+
return nil
190+
}

0 commit comments

Comments
 (0)