Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions pkg/pipelinerunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pipelinerunmetrics

import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
Expand All @@ -31,6 +32,7 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"golang.org/x/crypto/blake2b"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -118,6 +120,8 @@ type Recorder struct {
pipelinerun string) []tag.Mutator

ReportingPeriod time.Duration

hash string
}

// We cannot register the view multiple times, so NewRecorder lazily
Expand Down Expand Up @@ -292,7 +296,10 @@ func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string,
logger.Error("Failed to do type insertion for extracting metrics config")
return
}
r.updateConfig(cfg)
updated := r.updateConfig(cfg)
if !updated {
return
}
// Update metrics according to configuration
viewUnregister()
err := viewRegister(cfg)
Expand Down Expand Up @@ -337,11 +344,24 @@ func getPipelineTagName(pr *v1.PipelineRun) string {
return pipelineName
}

func (r *Recorder) updateConfig(cfg *config.Metrics) {
func (r *Recorder) updateConfig(cfg *config.Metrics) bool {
r.mutex.Lock()
defer r.mutex.Unlock()
var hash string
if cfg != nil {
s := fmt.Sprintf("%v", *cfg)
sum := blake2b.Sum256([]byte(s))
hash = hex.EncodeToString(sum[:])
}

if r.hash == hash {
return false
}

r.cfg = cfg
r.hash = hash

return true
}

// DurationAndCount logs the duration of PipelineRun execution and
Expand Down
115 changes: 89 additions & 26 deletions pkg/pipelinerunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,44 +89,107 @@ func TestUninitializedMetrics(t *testing.T) {
}

func TestOnStore(t *testing.T) {
log := zap.NewExample()
defer log.Sync()
logger := log.Sugar()
unregisterMetrics()
log := zap.NewExample().Sugar()

ctx := getConfigContext(false)
metrics, err := NewRecorder(ctx)
// 1. Initial state
initialCfg := &config.Config{Metrics: &config.Metrics{
PipelinerunLevel: config.PipelinerunLevelAtPipelinerun,
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
}}
ctx := config.ToContext(t.Context(), initialCfg)
r, err := NewRecorder(ctx)
if err != nil {
t.Fatalf("NewRecorder: %v", err)
t.Fatalf("NewRecorder failed: %v", err)
}
onStoreCallback := OnStore(log, r)

// We check that there's no change when incorrect config is passed
OnStore(logger, metrics)(config.GetMetricsConfigName(), &config.Store{})
// Comparing function assign to struct with the one which should yield same value
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Fatal("metrics recorder shouldn't change during this OnStore call")
// Check initial state
if reflect.ValueOf(r.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Fatalf("Initial insertTag function is incorrect")
}
initialHash := r.hash

// Test when incorrect value in configmap is pass
cfg := &config.Metrics{
TaskrunLevel: "foo",
PipelinerunLevel: "bar",
DurationTaskrunType: config.DurationTaskrunTypeHistogram,
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
// 2. Call with wrong name - should not change anything
onStoreCallback("wrong-name", &config.Metrics{PipelinerunLevel: config.PipelinerunLevelAtNS})
if r.hash != initialHash {
t.Errorf("Hash changed after call with wrong name")
}
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Fatal("metrics recorder shouldn't change during this OnStore call")
if reflect.ValueOf(r.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Errorf("insertTag changed after call with wrong name")
}

cfg = &config.Metrics{
TaskrunLevel: config.TaskrunLevelAtNS,
// 3. Call with wrong type - should log an error and not change anything
onStoreCallback(config.GetMetricsConfigName(), &config.Store{})
if r.hash != initialHash {
t.Errorf("Hash changed after call with wrong type")
}
if reflect.ValueOf(r.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Errorf("insertTag changed after call with wrong type")
}

// 4. Call with a valid new config - should change
newCfg := &config.Metrics{
PipelinerunLevel: config.PipelinerunLevelAtNS,
DurationTaskrunType: config.DurationTaskrunTypeHistogram,
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
}
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() {
t.Fatal("metrics recorder didn't change during OnStore call")
onStoreCallback(config.GetMetricsConfigName(), newCfg)
if r.hash == initialHash {
t.Errorf("Hash did not change after valid config update")
}
if reflect.ValueOf(r.insertTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() {
t.Errorf("insertTag did not change after valid config update")
}
newHash := r.hash

// 5. Call with the same config again - should not change
onStoreCallback(config.GetMetricsConfigName(), newCfg)
if r.hash != newHash {
t.Errorf("Hash changed after second call with same config")
}
if reflect.ValueOf(r.insertTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() {
t.Errorf("insertTag changed after second call with same config")
}

// 6. Call with an invalid config - should update hash but not insertTag
invalidCfg := &config.Metrics{PipelinerunLevel: "invalid-level"}
onStoreCallback(config.GetMetricsConfigName(), invalidCfg)
if r.hash == newHash {
t.Errorf("Hash did not change after invalid config update")
}
// Because viewRegister fails, the insertTag function should not be updated and should remain `nilInsertTag` from the previous step.
if reflect.ValueOf(r.insertTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() {
t.Errorf("insertTag changed after invalid config update")
}
}

func TestUpdateConfig(t *testing.T) {
// Test that the config is updated when it changes, and not when it doesn't.
ctx := getConfigContext(false)
r, err := NewRecorder(ctx)
if err != nil {
t.Fatalf("NewRecorder: %v", err)
}

// First, update with a new config.
newConfig := &config.Metrics{
PipelinerunLevel: config.PipelinerunLevelAtPipeline,
}
if !r.updateConfig(newConfig) {
t.Error("updateConfig should have returned true, but returned false")
}

// Then, update with the same config.
if r.updateConfig(newConfig) {
t.Error("updateConfig should have returned false, but returned true")
}

// Finally, update with a different config.
differentConfig := &config.Metrics{
PipelinerunLevel: config.PipelinerunLevelAtNS,
}
if !r.updateConfig(differentConfig) {
t.Error("updateConfig should have returned true, but returned false")
}
}

Expand Down
25 changes: 23 additions & 2 deletions pkg/taskrunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package taskrunmetrics

import (
"context"
"encoding/hex"
"fmt"
"sync"
"time"
Expand All @@ -32,6 +33,7 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"golang.org/x/crypto/blake2b"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -130,6 +132,8 @@ type Recorder struct {

insertPipelineTag func(pipeline,
pipelinerun string) []tag.Mutator

hash string
}

// We cannot register the view multiple times, so NewRecorder lazily
Expand Down Expand Up @@ -335,7 +339,10 @@ func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string, value int
logger.Error("Failed to do type insertion for extracting metrics config")
return
}
r.updateConfig(cfg)
updated := r.updateConfig(cfg)
if !updated {
return
}
// Update metrics according to the configuration
viewUnregister()
err := viewRegister(cfg)
Expand Down Expand Up @@ -395,11 +402,25 @@ func getTaskTagName(tr *v1.TaskRun) string {
return taskName
}

func (r *Recorder) updateConfig(cfg *config.Metrics) {
func (r *Recorder) updateConfig(cfg *config.Metrics) bool {
r.mutex.Lock()
defer r.mutex.Unlock()

var hash string
if cfg != nil {
s := fmt.Sprintf("%v", *cfg)
sum := blake2b.Sum256([]byte(s))
hash = hex.EncodeToString(sum[:])
}

if r.hash == hash {
return false
}

r.cfg = cfg
r.hash = hash

return true
}

// DurationAndCount logs the duration of TaskRun execution and
Expand Down
Loading
Loading