Skip to content

Commit 8b0b81a

Browse files
committed
add lastFailTime into cache
1 parent aa58f76 commit 8b0b81a

File tree

2 files changed

+77
-25
lines changed

2 files changed

+77
-25
lines changed

pkg/metrics/metrics.go

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const (
3434
labelOperationStatus = "operation_status"
3535
snapshotController = "snapshot_controller"
3636
operationLatencyMetricName = "operation_total_seconds"
37-
operationLatencyMetricHelpMsg = "Total number of seconds spent by the snapshot controller for an operation to complete end to end"
37+
operationLatencyMetricHelpMsg = "Total number of seconds spent by the controller on an operation from end to end"
3838
)
3939

4040
type OperationState string
@@ -45,8 +45,8 @@ const (
4545
InterimFailure OperationState = "InterimFailure"
4646

4747
// TerminatingFailure is an operation state which means the controller has encountered
48-
// an error which it's not able to resume the operation and has marked a
49-
// permanent failure of the operation.
48+
// an error which is not recoverable and the controller has marked a permanent
49+
// failure of the operation.
5050
TerminatingFailure OperationState = "TerminatingFailure"
5151

5252
// Success states that the controller has successfully executed the operation.
@@ -61,12 +61,16 @@ type MetricsManager interface {
6161
// and serve HTTP requests received on addr/pattern
6262
// if the "pattern" is empty (i.e., ""), no endpoint will be started. An error
6363
// will be returned if there is any.
64-
StartServing(pattern, addr string, logger promhttp.Logger) error
64+
StartServing(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error)
6565

6666
// OperationStart takes in an operation and caches its start time.
6767
// If the operation already exists, it is a no-op.
6868
OperationStart(op Operation)
6969

70+
// DropOperation removes an operation from cache.
71+
// If the operation does not exist, it is a no-op.
72+
DropOperation(op Operation)
73+
7074
// RecordMetrics records a metric point. Note that it will be a no-op if an
7175
// operation has not been marked "Started" previously via invoking "OperationStart".
7276
// op - the Operation which the metric is associated with.
@@ -89,11 +93,23 @@ type Operation struct {
8993
ResourceID types.UID
9094
}
9195

96+
type operationTs struct {
97+
// startTime is the timestamp when an operation has been picked up by the
98+
// controller for processing
99+
startTime time.Time
100+
101+
// lastFailTime is the timestamp of the last interim failure on an operation
102+
// if this field is not set, i.e., lastFailTime.IsZero() == true, it means
103+
// the operation has never failed previously
104+
lastFailTime time.Time
105+
}
106+
92107
type operationMetricsManager struct {
93108
// cache is a concurrent-safe map which stores timestamps for all
94109
// ongoing operations.
95110
// key is an Operation
96-
// value is the Operation's start time
111+
// value is the operationTs which records the start time and last failing time
112+
// of the operation
97113
cache sync.Map
98114

99115
// registry is a wrapper around Prometheus Registry
@@ -112,7 +128,14 @@ func NewMetricsManager() MetricsManager {
112128
}
113129

114130
func (opMgr *operationMetricsManager) OperationStart(op Operation) {
115-
opMgr.cache.LoadOrStore(op, time.Now())
131+
opTs := operationTs{
132+
startTime: time.Now(),
133+
}
134+
opMgr.cache.LoadOrStore(op, opTs)
135+
}
136+
137+
func (opMgr *operationMetricsManager) DropOperation(op Operation) {
138+
opMgr.cache.Delete(op)
116139
}
117140

118141
func (opMgr *operationMetricsManager) RecordMetrics(op Operation, state OperationState) {
@@ -122,24 +145,34 @@ func (opMgr *operationMetricsManager) RecordMetrics(op Operation, state Operatio
122145
// the operation has not been cached, return directly
123146
return
124147
}
125-
ts, ok := obj.(time.Time)
148+
ts, ok := obj.(operationTs)
126149
if !ok {
127150
// the cached item is not an operationTs, should NEVER happen, clean and return
128151
klog.Errorf("Invalid cache entry for key %v", op)
129152
opMgr.cache.Delete(op)
130153
return
131154
}
132-
duration := time.Since(ts).Seconds()
133-
opMgr.opLatencyMetrics.WithLabelValues(op.Driver, op.Name, string(state)).Observe(duration)
155+
duration := time.Since(ts.startTime).Seconds()
134156
switch state {
135157
case Success, TerminatingFailure:
136158
opMgr.cache.Delete(op)
137159
case InterimFailure:
138-
// do nothing
160+
if !ts.lastFailTime.IsZero() {
161+
// override duration
162+
duration = time.Since(ts.lastFailTime).Seconds()
163+
}
164+
// override lastFailTime
165+
newTs := operationTs{
166+
startTime: ts.startTime,
167+
lastFailTime: time.Now(),
168+
}
169+
opMgr.cache.Store(op, newTs)
139170
default:
140171
// log error
141172
klog.Errorf("Not supported operation state %s", state)
173+
return
142174
}
175+
opMgr.opLatencyMetrics.WithLabelValues(op.Driver, op.Name, string(state)).Observe(duration)
143176
}
144177

145178
func (opMgr *operationMetricsManager) init() {
@@ -156,11 +189,11 @@ func (opMgr *operationMetricsManager) init() {
156189
opMgr.registry.MustRegister(opMgr.opLatencyMetrics)
157190
}
158191

159-
func (opMgr *operationMetricsManager) StartServing(pattern, addr string, logger promhttp.Logger) error {
192+
func (opMgr *operationMetricsManager) StartServing(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) {
160193
if addr == "" {
161-
return fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified")
194+
return nil, fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified")
162195
}
163-
196+
srv := &http.Server{Addr: addr}
164197
http.Handle(pattern, k8smetrics.HandlerFor(
165198
opMgr.registry,
166199
k8smetrics.HandlerOpts{
@@ -170,10 +203,10 @@ func (opMgr *operationMetricsManager) StartServing(pattern, addr string, logger
170203

171204
// start serving the endpoint
172205
go func() {
173-
err := http.ListenAndServe(addr, nil)
174-
if err != nil {
206+
defer wg.Done()
207+
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
175208
klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, pattern, err)
176209
}
177210
}()
178-
return nil
211+
return srv, nil
179212
}

pkg/metrics/metrics_test.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ limitations under the License.
1717
package metrics
1818

1919
import (
20+
"context"
2021
"io/ioutil"
22+
"log"
2123
"math"
2224
"net/http"
25+
"os"
2326
"reflect"
2427
"strings"
2528
"sync"
@@ -32,38 +35,54 @@ import (
3235
)
3336

3437
var (
35-
mgr MetricsManager
36-
once sync.Once
38+
mgr MetricsManager
39+
srv *http.Server
40+
wg *sync.WaitGroup
3741
)
3842

3943
const (
4044
httpPattern = "/metrics"
41-
addr = ":12120"
42-
httpAddr = "http://localhost:12120/metrics"
45+
addr = "localhost:12134"
46+
httpAddr = "http://" + addr + httpPattern
4347
)
4448

4549
func initMgr() {
50+
wg = &sync.WaitGroup{}
51+
wg.Add(1)
4652
mgr = NewMetricsManager()
47-
if err := mgr.StartServing(httpPattern, addr, nil); err != nil {
53+
var err error
54+
if srv, err = mgr.StartServing(httpPattern, addr, nil, wg); err != nil {
55+
log.Fatalf("failed to start serving%v", err)
4856
}
4957
}
5058

59+
func shutdown() {
60+
if err := srv.Shutdown(context.Background()); err != nil {
61+
panic(err)
62+
}
63+
wg.Wait()
64+
}
65+
66+
func TestMain(m *testing.M) {
67+
initMgr()
68+
c := m.Run()
69+
shutdown()
70+
os.Exit(c)
71+
}
72+
5173
func TestNew(t *testing.T) {
52-
once.Do(initMgr)
5374
if mgr == nil {
5475
t.Errorf("failed testing new")
5576
}
5677
}
5778

5879
func TestStartServing(t *testing.T) {
59-
once.Do(initMgr)
6080
if _, err := http.Get(httpAddr); err != nil {
6181
t.Errorf("failed to get response from server %v", err)
6282
}
6383
}
6484

6585
func TestOperationStart(t *testing.T) {
66-
once.Do(initMgr)
6786
// add an operation
6887
op := Operation{
6988
Name: "op1",
@@ -103,7 +122,7 @@ func TestOperationStart(t *testing.T) {
103122
}
104123

105124
expected :=
106-
`# HELP snapshot_controller_operation_total_seconds [ALPHA] Total number of seconds spent by the snapshot controller for an operation to complete end to end
125+
`# HELP snapshot_controller_operation_total_seconds [ALPHA] Total number of seconds spent by the controller on an operation from end to end
107126
# TYPE snapshot_controller_operation_total_seconds histogram
108127
snapshot_controller_operation_total_seconds_bucket{driver_name="driver1",operation_name="op1",operation_status="InterimFailure",le="0.1"} 0
109128
snapshot_controller_operation_total_seconds_bucket{driver_name="driver1",operation_name="op1",operation_status="InterimFailure",le="0.25"} 0

0 commit comments

Comments
 (0)