@@ -18,6 +18,7 @@ package metrics
18
18
19
19
import (
20
20
"fmt"
21
+ "net"
21
22
"net/http"
22
23
"sync"
23
24
"time"
@@ -34,7 +35,7 @@ const (
34
35
labelOperationStatus = "operation_status"
35
36
snapshotController = "snapshot_controller"
36
37
operationLatencyMetricName = "operation_total_seconds"
37
- operationLatencyMetricHelpMsg = "Total number of seconds spent by the snapshot controller for an operation to complete end to end"
38
+ operationLatencyMetricHelpMsg = "Total number of seconds spent by the controller on an operation from end to end"
38
39
)
39
40
40
41
type OperationState string
@@ -45,8 +46,8 @@ const (
45
46
InterimFailure OperationState = "InterimFailure"
46
47
47
48
// TerminatingFailure is an operation state which means the controller has encountered
48
- // an error which it's not able to resume the operation and has marked a
49
- // permanent failure of the operation.
49
+ // an error which is not recoverable and the controller has marked a permanent
50
+ // failure of the operation.
50
51
TerminatingFailure OperationState = "TerminatingFailure"
51
52
52
53
// Success states that the controller has successfully executed the operation.
@@ -61,12 +62,16 @@ type MetricsManager interface {
61
62
// and serve HTTP requests received on addr/pattern
62
63
// if the "pattern" is empty (i.e., ""), no endpoint will be started. An error
63
64
// will be returned if there is any.
64
- StartServing (pattern , addr string , logger promhttp.Logger ) error
65
+ StartServing (pattern , addr string , logger promhttp.Logger , wg * sync. WaitGroup ) ( * http. Server , error )
65
66
66
67
// OperationStart takes in an operation and cache it's start time.
67
68
// if the operation already exists, it's an no-op.
68
69
OperationStart (op Operation )
69
70
71
+ // DropOperation removes an operation from cache.
72
+ // if the operation does not exist, it's an no-op.
73
+ DropOperation (op Operation )
74
+
70
75
// RecordMetrics records a metric point. Note that it will be an no-op if an
71
76
// operation has not been marked "Started" previously via invoking "OperationStart".
72
77
// op - the Operation which the metric is associated with.
@@ -89,11 +94,23 @@ type Operation struct {
89
94
ResourceID types.UID
90
95
}
91
96
97
+ type operationTs struct {
98
+ // startTime is the timestamp when an operation has been picked up by the
99
+ // controller for processing
100
+ startTime time.Time
101
+
102
+ // lastFailTime is the timestamp of the last interim failure on an operation
103
+ // if this field is not specified, i.e., lastFailTime.IsZero == true, it means
104
+ // the operation has never failed previously
105
+ lastFailTime time.Time
106
+ }
107
+
92
108
type operationMetricsManager struct {
93
109
// cache is a concurrent-safe map which stores start timestamps for all
94
110
// ongoing operations.
95
111
// key is an Operation
96
- // value is the Operation's start time
112
+ // value is the operationTs which records the start time and last failing time
113
+ // of the operation
97
114
cache sync.Map
98
115
99
116
// registry is a wrapper around Prometheus Registry
@@ -112,7 +129,14 @@ func NewMetricsManager() MetricsManager {
112
129
}
113
130
114
131
func (opMgr * operationMetricsManager ) OperationStart (op Operation ) {
115
- opMgr .cache .LoadOrStore (op , time .Now ())
132
+ opTs := operationTs {
133
+ startTime : time .Now (),
134
+ }
135
+ opMgr .cache .LoadOrStore (op , opTs )
136
+ }
137
+
138
+ func (opMgr * operationMetricsManager ) DropOperation (op Operation ) {
139
+ opMgr .cache .Delete (op )
116
140
}
117
141
118
142
func (opMgr * operationMetricsManager ) RecordMetrics (op Operation , state OperationState ) {
@@ -122,24 +146,34 @@ func (opMgr *operationMetricsManager) RecordMetrics(op Operation, state Operatio
122
146
// the operation has not been cached, return directly
123
147
return
124
148
}
125
- ts , ok := obj .(time. Time )
149
+ ts , ok := obj .(operationTs )
126
150
if ! ok {
127
151
// the cached item is not a time.Time, should NEVER happen, clean and return
128
152
klog .Errorf ("Invalid cache entry for key %v" , op )
129
153
opMgr .cache .Delete (op )
130
154
return
131
155
}
132
- duration := time .Since (ts ).Seconds ()
133
- opMgr .opLatencyMetrics .WithLabelValues (op .Driver , op .Name , string (state )).Observe (duration )
156
+ duration := time .Since (ts .startTime ).Seconds ()
134
157
switch state {
135
158
case Success , TerminatingFailure :
136
159
opMgr .cache .Delete (op )
137
160
case InterimFailure :
138
- // do nothing
161
+ if ! ts .lastFailTime .IsZero () {
162
+ // override duration
163
+ duration = time .Since (ts .lastFailTime ).Seconds ()
164
+ }
165
+ // override lastFailTime
166
+ newTs := operationTs {
167
+ startTime : ts .startTime ,
168
+ lastFailTime : time .Now (),
169
+ }
170
+ opMgr .cache .Store (op , newTs )
139
171
default :
140
172
// log error
141
173
klog .Errorf ("Not supported operation state %s" , state )
174
+ return
142
175
}
176
+ opMgr .opLatencyMetrics .WithLabelValues (op .Driver , op .Name , string (state )).Observe (duration )
143
177
}
144
178
145
179
func (opMgr * operationMetricsManager ) init () {
@@ -156,11 +190,16 @@ func (opMgr *operationMetricsManager) init() {
156
190
opMgr .registry .MustRegister (opMgr .opLatencyMetrics )
157
191
}
158
192
159
- func (opMgr * operationMetricsManager ) StartServing (pattern , addr string , logger promhttp.Logger ) error {
193
+ func (opMgr * operationMetricsManager ) StartServing (pattern , addr string , logger promhttp.Logger , wg * sync. WaitGroup ) ( * http. Server , error ) {
160
194
if addr == "" {
161
- return fmt .Errorf ("metrics endpoint will not be started as endpoint address is not specified" )
195
+ return nil , fmt .Errorf ("metrics endpoint will not be started as endpoint address is not specified" )
162
196
}
163
-
197
+ // start listening
198
+ l , err := net .Listen ("tcp" , addr )
199
+ if err != nil {
200
+ return nil , fmt .Errorf ("failed to listen on address[%s], error[%v]" , addr , err )
201
+ }
202
+ srv := & http.Server {Addr : addr }
164
203
http .Handle (pattern , k8smetrics .HandlerFor (
165
204
opMgr .registry ,
166
205
k8smetrics.HandlerOpts {
@@ -170,10 +209,10 @@ func (opMgr *operationMetricsManager) StartServing(pattern, addr string, logger
170
209
171
210
// start serving the endpoint
172
211
go func () {
173
- err := http . ListenAndServe ( addr , nil )
174
- if err != nil {
212
+ defer wg . Done ( )
213
+ if err := srv . Serve ( l ); err != http . ErrServerClosed {
175
214
klog .Fatalf ("failed to start endpoint at:%s/%s, error: %v" , addr , pattern , err )
176
215
}
177
216
}()
178
- return nil
217
+ return srv , nil
179
218
}
0 commit comments