Skip to content

Commit 0580238

Browse files
author
Sherif Akoush
authored
Merge pull request #5762 from SeldonIO/v2
ci: Merge change from v2 for release 2.8.3
2 parents 346fd1e + c99d281 commit 0580238

File tree

3 files changed

+132
-27
lines changed

3 files changed

+132
-27
lines changed

scheduler/pkg/agent/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ func (s *Server) Subscribe(request *pb.AgentSubscribeRequest, stream pb.AgentSer
390390
}
391391
s.mutex.Unlock()
392392

393-
err := s.syncMessage(request, stream)
393+
err := s.syncMessage(request)
394394
if err != nil {
395395
return err
396396
}
@@ -421,7 +421,7 @@ func (s *Server) StopAgentStreams() {
421421
}
422422
}
423423

424-
func (s *Server) syncMessage(request *pb.AgentSubscribeRequest, stream pb.AgentService_SubscribeServer) error {
424+
func (s *Server) syncMessage(request *pb.AgentSubscribeRequest) error {
425425
s.logger.Debugf("Add Server Replica %+v with config %+v", request, request.ReplicaConfig)
426426
err := s.store.AddServerReplica(request)
427427
if err != nil {

scheduler/pkg/server/server.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -431,29 +431,35 @@ func (s *SchedulerServer) ServerStatus(
431431
}
432432

433433
func createServerStatusResponse(s *store.ServerSnapshot) *pb.ServerStatusResponse {
434+
// note we don't count draining replicas in available replicas
435+
434436
resp := &pb.ServerStatusResponse{
435-
ServerName: s.Name,
436-
AvailableReplicas: int32(len(s.Replicas)),
437-
ExpectedReplicas: int32(s.ExpectedReplicas),
438-
KubernetesMeta: s.KubernetesMeta,
437+
ServerName: s.Name,
438+
ExpectedReplicas: int32(s.ExpectedReplicas),
439+
KubernetesMeta: s.KubernetesMeta,
439440
}
440441

441-
var totalModels int32
442+
totalModels := int32(0)
443+
numAvailableServerReplicas := int32(0)
442444
for _, replica := range s.Replicas {
443-
numLoadedModelsOnReplica := int32(replica.GetNumLoadedModels())
444-
resp.Resources = append(
445-
resp.Resources,
446-
&pb.ServerReplicaResources{
447-
ReplicaIdx: uint32(replica.GetReplicaIdx()),
448-
TotalMemoryBytes: replica.GetMemory(),
449-
AvailableMemoryBytes: replica.GetAvailableMemory(),
450-
NumLoadedModels: numLoadedModelsOnReplica,
451-
OverCommitPercentage: replica.GetOverCommitPercentage(),
452-
},
453-
)
454-
totalModels += numLoadedModelsOnReplica
445+
if !replica.GetIsDraining() {
446+
numLoadedModelsOnReplica := int32(replica.GetNumLoadedModels())
447+
resp.Resources = append(
448+
resp.Resources,
449+
&pb.ServerReplicaResources{
450+
ReplicaIdx: uint32(replica.GetReplicaIdx()),
451+
TotalMemoryBytes: replica.GetMemory(),
452+
AvailableMemoryBytes: replica.GetAvailableMemory(),
453+
NumLoadedModels: numLoadedModelsOnReplica,
454+
OverCommitPercentage: replica.GetOverCommitPercentage(),
455+
},
456+
)
457+
totalModels += numLoadedModelsOnReplica
458+
numAvailableServerReplicas++
459+
}
455460
}
456461
resp.NumLoadedModelReplicas = totalModels
462+
resp.AvailableReplicas = numAvailableServerReplicas
457463

458464
return resp
459465
}

scheduler/pkg/server/server_status_test.go

Lines changed: 107 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -172,19 +172,101 @@ func TestModelsStatusEvents(t *testing.T) {
172172
}
173173

174174
func TestServersStatusStream(t *testing.T) {
175+
type serverReplicaRequest struct {
176+
request *pba.AgentSubscribeRequest
177+
draining bool
178+
}
179+
175180
g := NewGomegaWithT(t)
176181
type test struct {
177182
name string
178-
loadReq *pba.AgentSubscribeRequest
183+
loadReq []serverReplicaRequest
179184
server *SchedulerServer
180185
err bool
181186
}
182187

183188
tests := []test{
184189
{
185-
name: "server ok",
186-
loadReq: &pba.AgentSubscribeRequest{
187-
ServerName: "foo",
190+
name: "server ok - 1 empty replica",
191+
loadReq: []serverReplicaRequest{
192+
{
193+
request: &pba.AgentSubscribeRequest{
194+
ServerName: "foo",
195+
},
196+
},
197+
},
198+
server: &SchedulerServer{
199+
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
200+
logger: log.New(),
201+
timeout: 10 * time.Millisecond,
202+
},
203+
},
204+
{
205+
name: "server ok - multiple replicas",
206+
loadReq: []serverReplicaRequest{
207+
{
208+
request: &pba.AgentSubscribeRequest{
209+
ServerName: "foo",
210+
ReplicaIdx: 0,
211+
LoadedModels: []*pba.ModelVersion{
212+
{
213+
Model: &pb.Model{
214+
Meta: &pb.MetaData{Name: "foo-model"},
215+
},
216+
},
217+
},
218+
},
219+
},
220+
{
221+
request: &pba.AgentSubscribeRequest{
222+
ServerName: "foo",
223+
ReplicaIdx: 1,
224+
LoadedModels: []*pba.ModelVersion{
225+
{
226+
Model: &pb.Model{
227+
Meta: &pb.MetaData{Name: "foo-model"},
228+
},
229+
},
230+
},
231+
},
232+
},
233+
},
234+
server: &SchedulerServer{
235+
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
236+
logger: log.New(),
237+
timeout: 10 * time.Millisecond,
238+
},
239+
},
240+
{
241+
name: "server ok - multiple replicas with draining",
242+
loadReq: []serverReplicaRequest{
243+
{
244+
request: &pba.AgentSubscribeRequest{
245+
ServerName: "foo",
246+
ReplicaIdx: 0,
247+
LoadedModels: []*pba.ModelVersion{
248+
{
249+
Model: &pb.Model{
250+
Meta: &pb.MetaData{Name: "foo-model"},
251+
},
252+
},
253+
},
254+
},
255+
},
256+
{
257+
request: &pba.AgentSubscribeRequest{
258+
ServerName: "foo",
259+
ReplicaIdx: 1,
260+
LoadedModels: []*pba.ModelVersion{
261+
{
262+
Model: &pb.Model{
263+
Meta: &pb.MetaData{Name: "foo-model"},
264+
},
265+
},
266+
},
267+
},
268+
draining: true,
269+
},
188270
},
189271
server: &SchedulerServer{
190272
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
@@ -194,8 +276,12 @@ func TestServersStatusStream(t *testing.T) {
194276
},
195277
{
196278
name: "timeout",
197-
loadReq: &pba.AgentSubscribeRequest{
198-
ServerName: "foo",
279+
loadReq: []serverReplicaRequest{
280+
{
281+
request: &pba.AgentSubscribeRequest{
282+
ServerName: "foo",
283+
},
284+
},
199285
},
200286
server: &SchedulerServer{
201287
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
@@ -208,9 +294,20 @@ func TestServersStatusStream(t *testing.T) {
208294

209295
for _, test := range tests {
210296
t.Run(test.name, func(t *testing.T) {
297+
expectedReplicas := int32(0)
298+
expectedNumLoadedModelReplicas := int32(0)
211299
if test.loadReq != nil {
212-
err := test.server.modelStore.AddServerReplica(test.loadReq)
213-
g.Expect(err).To(BeNil())
300+
for _, r := range test.loadReq {
301+
err := test.server.modelStore.AddServerReplica(r.request)
302+
g.Expect(err).To(BeNil())
303+
if !r.draining {
304+
expectedReplicas++
305+
expectedNumLoadedModelReplicas += int32(len(r.request.LoadedModels))
306+
} else {
307+
server, _ := test.server.modelStore.GetServer("foo", true, false)
308+
server.Replicas[int(r.request.ReplicaIdx)].SetIsDraining()
309+
}
310+
}
214311
}
215312

216313
stream := newStubServerStatusServer(1, 5*time.Millisecond)
@@ -230,6 +327,8 @@ func TestServersStatusStream(t *testing.T) {
230327

231328
g.Expect(ssr).ToNot(BeNil())
232329
g.Expect(ssr.ServerName).To(Equal("foo"))
330+
g.Expect(ssr.GetAvailableReplicas()).To(Equal(expectedReplicas))
331+
g.Expect(ssr.NumLoadedModelReplicas).To(Equal(expectedNumLoadedModelReplicas))
233332
}
234333
})
235334
}

0 commit comments

Comments (0)