Skip to content

feat(scheduler): set a ttl on deleted pipelines and experiments #5948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
22 changes: 17 additions & 5 deletions scheduler/pkg/store/experiment/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,20 @@ func (edb *ExperimentDBManager) save(experiment *Experiment) error {
if err != nil {
return err
}
return edb.db.Update(func(txn *badger.Txn) error {
err = txn.Set([]byte(experiment.Name), experimentBytes)
return err
})

if !experiment.Deleted {
return edb.db.Update(func(txn *badger.Txn) error {
err = txn.Set([]byte(experiment.Name), experimentBytes)
return err
})
} else {
ttl := utils.DeletedResourceTTL
return edb.db.Update(func(txn *badger.Txn) error {
e := badger.NewEntry([]byte(experiment.Name), experimentBytes).WithTTL(ttl)
err = txn.SetEntry(e)
return err
})
}
}

func (edb *ExperimentDBManager) saveVersion() error {
Expand All @@ -88,7 +98,8 @@ func (edb *ExperimentDBManager) delete(name string) error {
}

func (edb *ExperimentDBManager) restore(
startExperimentCb func(*Experiment) error, stopExperimentCb func(*Experiment) error) error {
startExperimentCb func(*Experiment) error, stopExperimentCb func(*Experiment) error,
) error {
return edb.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
Expand All @@ -108,6 +119,7 @@ func (edb *ExperimentDBManager) restore(
}
experiment := CreateExperimentFromSnapshot(&snapshot)
if experiment.Deleted {
experiment.DeletedAt = utils.GetDeletedAt(item)
err = stopExperimentCb(experiment)
} else {
// otherwise attempt to start the experiment
Expand Down
159 changes: 158 additions & 1 deletion scheduler/pkg/store/experiment/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package experiment
import (
"fmt"
"testing"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -71,6 +72,55 @@ func createExperimentProto(experiment *Experiment) *scheduler.Experiment {
}
}

func TestSaveWithTTL(t *testing.T) {
g := NewGomegaWithT(t)

experiment := &Experiment{
Name: "test1",
Candidates: []*Candidate{
{
Name: "model1",
Weight: 50,
},
{
Name: "model2",
Weight: 50,
},
},
Mirror: &Mirror{
Name: "model3",
Percent: 90,
},
Config: &Config{
StickySessions: true,
},
KubernetesMeta: &KubernetesMeta{
Namespace: "default",
Generation: 2,
},
Deleted: true,
}

path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
g.Expect(err).To(BeNil())
experiment.DeletedAt = time.Now()
err = db.save(experiment)
g.Expect(err).To(BeNil())

var item *badger.Item
err = db.db.View(func(txn *badger.Txn) error {
item, err = txn.Get(([]byte(experiment.Name)))
return err
})
g.Expect(err).To(BeNil())
g.Expect(item.ExpiresAt()).ToNot(BeZero())

err = db.Stop()
g.Expect(err).To(BeNil())
}

func TestSaveAndRestore(t *testing.T) {
g := NewGomegaWithT(t)
type test struct {
Expand Down Expand Up @@ -268,6 +318,102 @@ func TestSaveAndRestore(t *testing.T) {
}
}

func TestSaveAndRestoreDeletedExperiments(t *testing.T) {
g := NewGomegaWithT(t)
type test struct {
name string
experiment Experiment
withTTL bool
}

getStrPtr := func(val string) *string { return &val }

createDeletedExperiment := func(name string) Experiment {
return Experiment{
Name: name,
ResourceType: ModelResourceType,
Default: getStrPtr("model1"),
Candidates: []*Candidate{
{
Name: "model1",
Weight: 50,
},
{
Name: "model2",
Weight: 50,
},
},
KubernetesMeta: &KubernetesMeta{
Namespace: "default",
Generation: 2,
},
Deleted: true,
}
}

tests := []test{
{
name: "deleted experiment with ttl has deletedAt set",
experiment: createDeletedExperiment("with-ttl"),
withTTL: true,
},
{
name: "deleted experiment without ttl has deletedAt set after cleanup",
experiment: createDeletedExperiment("without-ttl"),
withTTL: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
g.Expect(test.experiment.Deleted).To(BeTrue(), "this is a test for deleted experiments")
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
g.Expect(err).To(BeNil())
if !test.withTTL {
err = saveWithOutTTL(&test.experiment, edb.db)
g.Expect(err).To(BeNil())
} else {
err := edb.save(&test.experiment)
g.Expect(err).To(BeNil())
}
err = edb.Stop()
g.Expect(err).To(BeNil())

es := NewExperimentServer(log.New(), nil, nil, nil)
err = es.InitialiseOrRestoreDB(path)
g.Expect(err).To(BeNil())

if !test.withTTL {
// check state before cleanup
var item *badger.Item
err = es.db.db.View(func(txn *badger.Txn) error {
item, err = txn.Get(([]byte(test.experiment.Name)))
return err
})
g.Expect(err).To(BeNil())
g.Expect(item.ExpiresAt()).To(BeZero())
g.Expect(es.experiments[test.experiment.Name]).ToNot(BeNil())
g.Expect(es.experiments[test.experiment.Name].DeletedAt.IsZero()).To(BeTrue())

// check state after cleanup
es.cleanupDeletedExperiments()
g.Expect(es.experiments[test.experiment.Name].DeletedAt.IsZero()).ToNot(BeTrue())
err = es.db.db.View(func(txn *badger.Txn) error {
item, err = txn.Get(([]byte(test.experiment.Name)))
return err
})
g.Expect(err).To(BeNil())
g.Expect(item.ExpiresAt()).ToNot(BeZero())

} else {
g.Expect(es.experiments[test.experiment.Name].DeletedAt.IsZero()).ToNot(BeTrue())
}
})
}
}

func TestGetExperimentFromDB(t *testing.T) {
g := NewGomegaWithT(t)
type test struct {
Expand Down Expand Up @@ -703,7 +849,18 @@ func TestMigrateFromV1ToV2(t *testing.T) {
err = es.InitialiseOrRestoreDB(path)
g.Expect(err).To(BeNil())
g.Expect(len(es.experiments)).To(Equal(0))

})
}
}

func saveWithOutTTL(experiment *Experiment, db *badger.DB) error {
experimentProto := CreateExperimentSnapshotProto(experiment)
experimentBytes, err := proto.Marshal(experimentProto)
if err != nil {
return err
}
return db.Update(func(txn *badger.Txn) error {
err = txn.Set([]byte(experiment.Name), experimentBytes)
return err
})
}
3 changes: 3 additions & 0 deletions scheduler/pkg/store/experiment/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ the Change License after the Change Date as each is defined in accordance with t

package experiment

import "time"

type ResourceType uint32

const (
Expand All @@ -20,6 +22,7 @@ type Experiment struct {
Name string
Active bool
Deleted bool
DeletedAt time.Time
Default *string
ResourceType ResourceType
Candidates []*Candidate
Expand Down
59 changes: 44 additions & 15 deletions scheduler/pkg/store/experiment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/mitchellh/copystructure"
"github.com/sirupsen/logrus"

"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/utils"
)

const (
pendingSyncsQueueSize int = 1000
experimentStartEventSource = "experiment.store.start"
experimentStopEventSource = "experiment.store.stop"
modelEventHandlerName = "experiment.store.models"
pipelineEventHandlerName = "experiment.store.pipelines"
experimentDbFolder = "experimentdb"
deletedExperimentTTL time.Duration = time.Duration(time.Hour * 24)
pendingSyncsQueueSize int = 1000
experimentStartEventSource = "experiment.store.start"
experimentStopEventSource = "experiment.store.stop"
modelEventHandlerName = "experiment.store.models"
pipelineEventHandlerName = "experiment.store.pipelines"
experimentDbFolder = "experimentdb"
)

type ExperimentServer interface {
Expand Down Expand Up @@ -57,7 +60,6 @@ type ExperimentStore struct {
}

func NewExperimentServer(logger logrus.FieldLogger, eventHub *coordinator.EventHub, store store.ModelStore, pipelineStore pipeline.PipelineHandler) *ExperimentStore {

es := &ExperimentStore{
logger: logger.WithField("source", "experimentServer"),
experiments: make(map[string]*Experiment),
Expand Down Expand Up @@ -95,7 +97,7 @@ func getExperimentDbFolder(basePath string) string {
// we just add a reference to the experiment in the memory store
// so that we can keep track of it in case we need to replay the event (to the controller)
// we do not trigger an event though as envoy has a clean state when the scheduler restarts
func (es *ExperimentStore) AddExperimentInMap(experiment *Experiment) error {
func (es *ExperimentStore) addExperimentInMap(experiment *Experiment) error {
es.mu.Lock()
defer es.mu.Unlock()
if _, ok := es.experiments[experiment.Name]; !ok {
Expand All @@ -120,10 +122,18 @@ func (es *ExperimentStore) InitialiseOrRestoreDB(path string) error {
}
es.db = db
// If database already existed we can restore else this is a noop
err = es.db.restore(es.StartExperiment, es.AddExperimentInMap)
err = es.db.restore(es.StartExperiment, es.addExperimentInMap)
if err != nil {
return err
}
go func() {
ticker := time.NewTicker(utils.DeletedResourceCleanupFrequency)
defer ticker.Stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

for range ticker.C {
es.cleanupDeletedExperiments()
}
}()

return nil
}

Expand Down Expand Up @@ -302,7 +312,7 @@ func (es *ExperimentStore) setStatusImpl(experimentName string, active bool, rea
if experiment, ok := es.experiments[experimentName]; !ok {
return nil, &ExperimentNotFound{experimentName: experimentName}
} else {
if !experiment.Deleted || !active { //can't reactivate a deleted experiment
if !experiment.Deleted || !active { // can't reactivate a deleted experiment
currentActive := experiment.Active
experiment.Active = active
experiment.StatusDescription = reason
Expand All @@ -321,7 +331,7 @@ func (es *ExperimentStore) StartExperiment(experiment *Experiment) error {
}
if es.eventHub != nil {
if modelEvt != nil {
es.eventHub.PublishModelEvent(experimentStateEventSource, *modelEvt)
es.eventHub.PublishModelEvent(experimentStartEventSource, *modelEvt)
}
if pipelineEvt != nil {
es.eventHub.PublishPipelineEvent(experimentStartEventSource, *pipelineEvt)
Expand Down Expand Up @@ -356,9 +366,8 @@ func (es *ExperimentStore) startExperimentImpl(experiment *Experiment) (*coordin
ModelName: *resourceName,
}
default:
return nil, nil, nil, fmt.Errorf("Unknown resource type %v", experiment.ResourceType)
return nil, nil, nil, fmt.Errorf("unknown resource type %v", experiment.ResourceType)
}

}
es.updateExperimentState(experiment)
if es.db != nil {
Expand All @@ -381,7 +390,7 @@ func (es *ExperimentStore) StopExperiment(experimentName string) error {
es.eventHub.PublishModelEvent(experimentStopEventSource, *modelEvt)
}
if pipelineEvt != nil {
es.eventHub.PublishPipelineEvent(experimentStartEventSource, *pipelineEvt)
es.eventHub.PublishPipelineEvent(experimentStopEventSource, *pipelineEvt)
}
if expEvt != nil {
es.eventHub.PublishExperimentEvent(experimentStopEventSource, *expEvt)
Expand All @@ -399,6 +408,7 @@ func (es *ExperimentStore) stopExperimentImpl(experimentName string) (*coordinat
var modelEvt *coordinator.ModelEventMsg
var pipelineEvt *coordinator.PipelineEventMsg
experiment.Deleted = true
experiment.DeletedAt = time.Now()
experiment.Active = false
es.cleanExperimentState(experiment)
if experiment.Default != nil {
Expand All @@ -413,7 +423,7 @@ func (es *ExperimentStore) stopExperimentImpl(experimentName string) (*coordinat
ModelName: *experiment.Default,
}
default:
return nil, nil, nil, fmt.Errorf("Unknown resource type %v", experiment.ResourceType)
return nil, nil, nil, fmt.Errorf("unknown resource type %v", experiment.ResourceType)
}
}
if es.db != nil {
Expand Down Expand Up @@ -461,3 +471,22 @@ func (es *ExperimentStore) GetExperiments() ([]*Experiment, error) {
}
return foundExperiments, nil
}

func (es *ExperimentStore) cleanupDeletedExperiments() {
es.logger.Info("cleaning up deleted experiments")
for _, experiment := range es.experiments {
if experiment.Deleted {
es.mu.Lock()
defer es.mu.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the lock should be done once before the for loop? otherwise using defer here is potentially going to cause a deadlock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned above I think that this logic can also delete from badger db and not rely on ttl as well. just to make it simpler?

if experiment.DeletedAt.IsZero() {
experiment.DeletedAt = time.Now()
err := es.db.save(experiment)
if err != nil {
es.logger.Warnf("could not update DB TTL for experiment: %s", experiment.Name)
}
} else if experiment.DeletedAt.Add(utils.DeletedResourceTTL).Before(time.Now()) {
es.experiments[experiment.Name] = nil
}
}
}
}
Loading
Loading