Skip to content

Commit fa5fdf6

Browse files
authored
feat(scheduler): exposing the deleted resource ttl as a CLI param (SeldonIO#5994)
* exposing the deleted resource ttl as a CLI param
* switching to seconds and adding a little test for the cleanup functions
* merge v2
* Revert "merge v2"
  This reverts commit 4cf8644.
1 parent 53a8f1f commit fa5fdf6

File tree

11 files changed

+107
-81
lines changed

11 files changed

+107
-81
lines changed

scheduler/cmd/scheduler/main.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,11 @@ var (
5555
tracingConfigPath string
5656
dbPath string
5757
nodeID string
58-
allowPlaintxt bool //scheduler server
58+
allowPlaintxt bool // scheduler server
5959
autoscalingDisabled bool
6060
kafkaConfigPath string
6161
schedulerReadyTimeoutSeconds uint
62+
deletedResourceTTLSeconds uint
6263
)
6364

6465
const (
@@ -115,6 +116,9 @@ func init() {
115116

116117
// Timeout for scheduler to be ready
117118
flag.UintVar(&schedulerReadyTimeoutSeconds, "scheduler-ready-timeout-seconds", 300, "Timeout for scheduler to be ready")
119+
120+
// TTL (in seconds) applied to deleted experiments and pipelines stored in Badger DB
121+
flag.UintVar(&deletedResourceTTLSeconds, "deleted-resource-ttl-seconds", 86400, "TTL for deleted experiments and pipelines (in seconds)")
118122
}
119123

120124
func getNamespace() string {
@@ -211,11 +215,11 @@ func main() {
211215
// Do here after other services created so eventHub events will be handled on pipeline/experiment load
212216
// If we start earlier events will be sent but not received by services that start listening "late" to eventHub
213217
if dbPath != "" {
214-
err := ps.InitialiseOrRestoreDB(dbPath)
218+
err := ps.InitialiseOrRestoreDB(dbPath, deletedResourceTTLSeconds)
215219
if err != nil {
216220
log.WithError(err).Fatalf("Failed to initialise pipeline db at %s", dbPath)
217221
}
218-
err = es.InitialiseOrRestoreDB(dbPath)
222+
err = es.InitialiseOrRestoreDB(dbPath, deletedResourceTTLSeconds)
219223
if err != nil {
220224
log.WithError(err).Fatalf("Failed to initialise experiment db at %s", dbPath)
221225
}

scheduler/pkg/server/server_test.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ func TestLoadModel(t *testing.T) {
127127
},
128128
},
129129
},
130-
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"},
130+
model: &pb.Model{
131+
Meta: &pb.MetaData{Name: "model1"},
131132
ModelSpec: &pb.ModelSpec{
132133
Uri: "gs://model",
133134
Requirements: []string{"sklearn"},
@@ -384,39 +385,54 @@ func TestUnloadModel(t *testing.T) {
384385
{
385386
name: "Simple",
386387
req: []*pba.AgentSubscribeRequest{
387-
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
388-
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
388+
{
389+
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
390+
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
391+
},
392+
},
389393
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 1}},
390394
code: codes.OK,
391395
modelState: store.ModelTerminated,
392396
},
393397
{
394398
name: "Multiple",
395399
req: []*pba.AgentSubscribeRequest{
396-
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
397-
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn", "xgboost"}}}},
400+
{
401+
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
402+
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn", "xgboost"}},
403+
},
404+
},
398405
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn", "xgboost"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 1}},
399406
code: codes.OK,
400407
modelState: store.ModelTerminated,
401408
},
402409
{
403410
name: "TwoReplicas",
404411
req: []*pba.AgentSubscribeRequest{
405-
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
406-
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}},
407-
{ServerName: "server1", ReplicaIdx: 1, Shared: true, AvailableMemoryBytes: 1000,
408-
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
412+
{
413+
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
414+
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
415+
},
416+
{
417+
ServerName: "server1", ReplicaIdx: 1, Shared: true, AvailableMemoryBytes: 1000,
418+
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
419+
},
420+
},
409421
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 2}},
410422
code: codes.OK,
411423
modelState: store.ModelTerminated,
412424
},
413425
{
414426
name: "NotExist",
415427
req: []*pba.AgentSubscribeRequest{
416-
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
417-
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
428+
{
429+
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
430+
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
431+
},
432+
},
418433
model: nil,
419-
code: codes.FailedPrecondition},
434+
code: codes.FailedPrecondition,
435+
},
420436
}
421437
for _, test := range tests {
422438
t.Run(test.name, func(t *testing.T) {
@@ -518,7 +534,6 @@ func TestLoadPipeline(t *testing.T) {
518534
}
519535
})
520536
}
521-
522537
}
523538

524539
func TestUnloadPipeline(t *testing.T) {
@@ -562,7 +577,7 @@ func TestUnloadPipeline(t *testing.T) {
562577
for _, test := range tests {
563578
t.Run(test.name, func(t *testing.T) {
564579
path := fmt.Sprintf("%s/db", t.TempDir())
565-
_ = test.server.pipelineHandler.(*pipeline.PipelineStore).InitialiseOrRestoreDB(path)
580+
_ = test.server.pipelineHandler.(*pipeline.PipelineStore).InitialiseOrRestoreDB(path, 10)
566581
if test.loadReq != nil {
567582
err := test.server.pipelineHandler.AddPipeline(test.loadReq.Pipeline)
568583
g.Expect(err).To(BeNil())
@@ -575,7 +590,6 @@ func TestUnloadPipeline(t *testing.T) {
575590
}
576591
})
577592
}
578-
579593
}
580594

581595
func TestPipelineStatus(t *testing.T) {

scheduler/pkg/store/experiment/db.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ the Change License after the Change Date as each is defined in accordance with t
1010
package experiment
1111

1212
import (
13+
"time"
14+
1315
"github.com/dgraph-io/badger/v3"
1416
"github.com/sirupsen/logrus"
1517
"google.golang.org/protobuf/proto"
@@ -25,19 +27,21 @@ const (
2527
)
2628

2729
type ExperimentDBManager struct {
28-
db *badger.DB
29-
logger logrus.FieldLogger
30+
db *badger.DB
31+
logger logrus.FieldLogger
32+
deletedResourceTTL time.Duration
3033
}
3134

32-
func newExperimentDbManager(path string, logger logrus.FieldLogger) (*ExperimentDBManager, error) {
35+
func newExperimentDbManager(path string, logger logrus.FieldLogger, deletedResourceTTL uint) (*ExperimentDBManager, error) {
3336
db, err := utils.Open(path, logger, "experimentDb")
3437
if err != nil {
3538
return nil, err
3639
}
3740

3841
edb := &ExperimentDBManager{
39-
db: db,
40-
logger: logger,
42+
db: db,
43+
logger: logger,
44+
deletedResourceTTL: time.Duration(deletedResourceTTL * uint(time.Second)),
4145
}
4246

4347
version, err := edb.getVersion()
@@ -73,9 +77,8 @@ func (edb *ExperimentDBManager) save(experiment *Experiment) error {
7377
return err
7478
})
7579
} else {
76-
ttl := utils.DeletedResourceTTL
7780
return edb.db.Update(func(txn *badger.Txn) error {
78-
e := badger.NewEntry([]byte(experiment.Name), experimentBytes).WithTTL(ttl)
81+
e := badger.NewEntry([]byte(experiment.Name), experimentBytes).WithTTL(edb.deletedResourceTTL)
7982
err = txn.SetEntry(e)
8083
return err
8184
})
@@ -119,7 +122,7 @@ func (edb *ExperimentDBManager) restore(
119122
}
120123
experiment := CreateExperimentFromSnapshot(&snapshot)
121124
if experiment.Deleted {
122-
experiment.DeletedAt = utils.GetDeletedAt(item)
125+
experiment.DeletedAt = utils.GetDeletedAt(item, edb.deletedResourceTTL)
123126
err = stopExperimentCb(experiment)
124127
} else {
125128
// otherwise attempt to start the experiment

scheduler/pkg/store/experiment/db_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func TestSaveWithTTL(t *testing.T) {
102102

103103
path := fmt.Sprintf("%s/db", t.TempDir())
104104
logger := log.New()
105-
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
105+
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
106106
g.Expect(err).To(BeNil())
107107
err = db.save(experiment)
108108
g.Expect(err).To(BeNil())
@@ -307,7 +307,7 @@ func TestSaveAndRestore(t *testing.T) {
307307
t.Run(test.name, func(t *testing.T) {
308308
path := fmt.Sprintf("%s/db", t.TempDir())
309309
logger := log.New()
310-
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
310+
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
311311
g.Expect(err).To(BeNil())
312312
for _, p := range test.experiments {
313313
err := db.save(p)
@@ -317,7 +317,7 @@ func TestSaveAndRestore(t *testing.T) {
317317
g.Expect(err).To(BeNil())
318318

319319
es := NewExperimentServer(log.New(), nil, nil, nil)
320-
err = es.InitialiseOrRestoreDB(path)
320+
err = es.InitialiseOrRestoreDB(path, 10)
321321
g.Expect(err).To(BeNil())
322322
for idx, p := range test.experiments {
323323
if !test.errors[idx] {
@@ -379,7 +379,7 @@ func TestSaveAndRestoreDeletedExperiments(t *testing.T) {
379379
g.Expect(test.experiment.Deleted).To(BeTrue(), "this is a test for deleted experiments")
380380
path := fmt.Sprintf("%s/db", t.TempDir())
381381
logger := log.New()
382-
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
382+
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
383383
g.Expect(err).To(BeNil())
384384
if !test.withTTL {
385385
err = saveWithOutTTL(&test.experiment, edb.db)
@@ -392,7 +392,7 @@ func TestSaveAndRestoreDeletedExperiments(t *testing.T) {
392392
g.Expect(err).To(BeNil())
393393

394394
es := NewExperimentServer(log.New(), nil, nil, nil)
395-
err = es.InitialiseOrRestoreDB(path)
395+
err = es.InitialiseOrRestoreDB(path, 10)
396396
g.Expect(err).To(BeNil())
397397

398398
if !test.withTTL {
@@ -538,7 +538,7 @@ func TestGetExperimentFromDB(t *testing.T) {
538538
t.Run(test.name, func(t *testing.T) {
539539
path := fmt.Sprintf("%s/db", t.TempDir())
540540
logger := log.New()
541-
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
541+
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
542542
g.Expect(err).To(BeNil())
543543
for _, p := range test.experiments {
544544
err := db.save(p)
@@ -678,7 +678,7 @@ func TestDeleteExperimentFromDB(t *testing.T) {
678678
t.Run(test.name, func(t *testing.T) {
679679
path := fmt.Sprintf("%s/db", t.TempDir())
680680
logger := log.New()
681-
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
681+
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
682682
g.Expect(err).To(BeNil())
683683
for _, p := range test.experiments {
684684
err := db.save(p)
@@ -844,7 +844,7 @@ func TestMigrateFromV1ToV2(t *testing.T) {
844844
_ = db.Close()
845845

846846
// migrate
847-
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
847+
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
848848
g.Expect(err).To(BeNil())
849849

850850
g.Expect(err).To(BeNil())
@@ -856,7 +856,7 @@ func TestMigrateFromV1ToV2(t *testing.T) {
856856

857857
// check that we have no experiments in the db format
858858
es := NewExperimentServer(log.New(), nil, nil, nil)
859-
err = es.InitialiseOrRestoreDB(path)
859+
err = es.InitialiseOrRestoreDB(path, 10)
860860
g.Expect(err).To(BeNil())
861861
g.Expect(len(es.experiments)).To(Equal(0))
862862
})

scheduler/pkg/store/experiment/store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,15 @@ func (es *ExperimentStore) addExperimentInMap(experiment *Experiment) error {
108108
}
109109
}
110110

111-
func (es *ExperimentStore) InitialiseOrRestoreDB(path string) error {
111+
func (es *ExperimentStore) InitialiseOrRestoreDB(path string, deletedResourceTTL uint) error {
112112
logger := es.logger.WithField("func", "initialiseDB")
113113
experimentDbPath := getExperimentDbFolder(path)
114114
logger.Infof("Initialise DB at %s", experimentDbPath)
115115
err := os.MkdirAll(experimentDbPath, os.ModePerm)
116116
if err != nil {
117117
return err
118118
}
119-
db, err := newExperimentDbManager(experimentDbPath, es.logger)
119+
db, err := newExperimentDbManager(experimentDbPath, es.logger, deletedResourceTTL)
120120
if err != nil {
121121
return err
122122
}
@@ -484,7 +484,7 @@ func (es *ExperimentStore) cleanupDeletedExperiments() {
484484
es.logger.Warnf("could not update DB TTL for experiment: %s", experiment.Name)
485485
}
486486
}
487-
} else if experiment.DeletedAt.Add(utils.DeletedResourceTTL).Before(time.Now()) {
487+
} else if experiment.DeletedAt.Add(es.db.deletedResourceTTL).Before(time.Now()) {
488488
delete(es.experiments, experiment.Name)
489489
es.logger.Info("cleaning up deleted experiment: %s", experiment.Name)
490490
}

scheduler/pkg/store/experiment/store_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func TestStartExperiment(t *testing.T) {
195195
g.Expect(err).To(BeNil())
196196
server := NewExperimentServer(logger, eventHub, fakeModelStore{}, fakePipelineStore{})
197197
// init db
198-
_ = server.InitialiseOrRestoreDB(path)
198+
_ = server.InitialiseOrRestoreDB(path, 10)
199199
for _, ea := range test.experiments {
200200
err := server.StartExperiment(ea.experiment)
201201
if ea.fail {
@@ -262,7 +262,7 @@ func TestStopExperiment(t *testing.T) {
262262
path := fmt.Sprintf("%s/db", t.TempDir())
263263

264264
// init db
265-
err := test.store.InitialiseOrRestoreDB(path)
265+
err := test.store.InitialiseOrRestoreDB(path, 1)
266266
g.Expect(err).To(BeNil())
267267
for _, p := range test.store.experiments {
268268
err := test.store.db.save(p)
@@ -288,6 +288,10 @@ func TestStopExperiment(t *testing.T) {
288288
// check db
289289
experimentFromDB, _ := test.store.db.get(test.experimentName)
290290
g.Expect(experimentFromDB.Deleted).To(BeTrue())
291+
292+
time.Sleep(1 * time.Second)
293+
test.store.cleanupDeletedExperiments()
294+
g.Expect(test.store.experiments[test.experimentName]).To(BeNil())
291295
}
292296
})
293297
}
@@ -413,7 +417,7 @@ func TestRestoreExperiments(t *testing.T) {
413417
experiments: make(map[string]*Experiment),
414418
}
415419
// init db
416-
err := store.InitialiseOrRestoreDB(path)
420+
err := store.InitialiseOrRestoreDB(path, 10)
417421
g.Expect(err).To(BeNil())
418422
for _, p := range test.experiments {
419423
err := store.db.save(p)
@@ -422,7 +426,7 @@ func TestRestoreExperiments(t *testing.T) {
422426
_ = store.db.Stop()
423427

424428
// restore from db now that we have state on disk
425-
_ = store.InitialiseOrRestoreDB(path)
429+
_ = store.InitialiseOrRestoreDB(path, 10)
426430

427431
for _, p := range test.experiments {
428432
experimentFromDB, _ := store.db.get(p.Name)

0 commit comments

Comments
 (0)