
Commit 2d6aff7

corhere and xinfengliu committed
manager: fix task scheduler infinite loop
If the running tasks for a service are not well balanced across the placement-preference tree, the task scheduler could enter an infinite loop when scaling the service up.

The scheduleNTasksOnSubtree loop terminates when either all tasks have been scheduled onto nodes or the nodes in all subtrees are out of room to accept new tasks. The trouble is that the algorithm only considers a subtree to be out of room if an attempt was made to schedule tasks onto its nodes and not all of them could be scheduled. Subtrees that already have more tasks running than the desired number for a balanced tree are skipped without any attempt to assign tasks, so they never get a chance to be marked as out of room. The scheduler therefore enters a tight infinite loop whenever some node of the placement-preference tree has at least one subtree with more tasks running than desired while all of its other subtrees are out of room for more tasks.

It would be incorrect to consider a subtree out of room merely because it has more tasks running than desired at a particular iteration of the scheduling loop: the desired number of tasks to assign changes as the scheduler iteratively schedules tasks and other subtrees run out of room, so a subtree can become eligible again in a later iteration. Instead, add a third condition to the task scheduler loop: exit when no subtree is eligible for task scheduling, whether because it is out of room or because it has more tasks running than desired.

Co-authored-by: Xinfeng Liu <[email protected]>
Signed-off-by: Cory Snider <[email protected]>
1 parent 8c19597 commit 2d6aff7
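To make the stuck state concrete, here is a minimal, self-contained sketch of the loop's termination logic. It is a model, not swarmkit's scheduler: the branch names, task counts, the pre-seeded noRoom set, and the already-diverged tasksScheduled value are all invented so the loop starts directly in the situation described above, where the only subtree that still has room already holds more tasks than the per-branch target and every pass skips every subtree. Without the converging guard the loop would spin forever; with it, the loop gives up after one pass.

package main

import "fmt"

type branch struct {
	name  string
	tasks int // tasks already running in this subtree
}

func main() {
	branches := []branch{
		{name: "dc1", tasks: 5}, // holds more tasks than desired: skipped every pass
		{name: "dc2", tasks: 1}, // known to be out of room: skipped every pass
	}
	noRoom := map[string]bool{"dc2": true} // subtrees that failed a scheduling attempt

	n := 1                     // tasks this call was asked to place
	tasksScheduled := 2        // contrived: already differs from n and can no longer change
	tasksInUsableBranches := 5 // tasks in subtrees not yet marked out of room

	passes := 0
	converging := true
	// Mirrors the fixed loop condition. Dropping "&& converging" reproduces
	// the old behavior: nothing in the body changes any state, so the first
	// two conditions stay true forever.
	for tasksScheduled != n && len(noRoom) != len(branches) && converging {
		passes++
		desired := (tasksInUsableBranches + n - tasksScheduled) / (len(branches) - len(noRoom))
		remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(branches) - len(noRoom))
		converging = false
		for _, b := range branches {
			if noRoom[b.name] {
				continue // already known to be out of room
			}
			if b.tasks < desired || (b.tasks == desired && remainder > 0) {
				converging = true // an eligible subtree exists; keep iterating
				// ...scheduling onto the subtree would happen here...
			}
			// Over-full subtrees fall through: they are skipped without ever
			// being added to noRoom, which is what starved the old exit test.
		}
	}
	fmt.Printf("gave up after %d pass(es)\n", passes)
}

Run as-is this prints "gave up after 1 pass(es)": dc1 is skipped because 5 is not below the computed target of 4, dc2 is skipped because it is out of room, so converging stays false and the loop exits instead of spinning.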

File tree

3 files changed: +318 -1 lines changed

manager/scheduler/nodeset_test.go

Lines changed: 163 additions & 0 deletions
@@ -0,0 +1,163 @@
package scheduler

import (
	"testing"

	"github.com/moby/swarmkit/v2/api"
)

func TestTreeTaskCountConsistency(t *testing.T) {
	// Create a nodeSet with some test nodes
	ns := &nodeSet{nodes: make(map[string]NodeInfo)}

	// Add test nodes with different labels and task counts
	nodes := []NodeInfo{
		{
			Node: &api.Node{
				ID: "node1",
				Spec: api.NodeSpec{
					Annotations: api.Annotations{
						Labels: map[string]string{"datacenter": "dc1", "rack": "r1"},
					},
				},
			},
			ActiveTasksCountByService: map[string]int{"service1": 3},
		},
		{
			Node: &api.Node{
				ID: "node2",
				Spec: api.NodeSpec{
					Annotations: api.Annotations{
						Labels: map[string]string{"datacenter": "dc1", "rack": "r2"},
					},
				},
			},
			ActiveTasksCountByService: map[string]int{"service1": 2},
		},
		{
			Node: &api.Node{
				ID: "node3",
				Spec: api.NodeSpec{
					Annotations: api.Annotations{
						Labels: map[string]string{"datacenter": "dc2", "rack": "r2"},
					},
				},
			},
			ActiveTasksCountByService: map[string]int{"service1": 4},
		},
		{
			Node: &api.Node{
				ID: "node4",
				Spec: api.NodeSpec{
					Annotations: api.Annotations{
						Labels: map[string]string{}, // no label
					},
				},
			},
			ActiveTasksCountByService: map[string]int{"service1": 2},
		},
		{
			Node: &api.Node{
				ID: "node5",
				Spec: api.NodeSpec{
					Annotations: api.Annotations{
						Labels: map[string]string{}, // no label
					},
				},
			},
			ActiveTasksCountByService: map[string]int{"service1": 1},
		},
	}

	for _, node := range nodes {
		ns.addOrUpdateNode(node)
	}

	preferences := []*api.PlacementPreference{
		{
			Preference: &api.PlacementPreference_Spread{
				Spread: &api.SpreadOver{
					SpreadDescriptor: "node.labels.datacenter",
				},
			},
		},
		{
			Preference: &api.PlacementPreference_Spread{
				Spread: &api.SpreadOver{
					SpreadDescriptor: "node.labels.rack",
				},
			},
		},
	}

	// Create the tree
	tree := ns.tree("service1", preferences, 10,
		func(*NodeInfo) bool { return true },
		func(a, b *NodeInfo) bool { return true })

	// Helper function to verify task count consistency recursively
	var verifyTaskCounts func(*testing.T, *decisionTree) int
	verifyTaskCounts = func(t *testing.T, dt *decisionTree) int {
		if dt == nil {
			return 0
		}

		if dt.next == nil {
			return dt.tasks
		}

		// Calculate sum of children's tasks
		childrenSum := 0
		for _, child := range dt.next {
			childrenSum += verifyTaskCounts(t, child)
		}

		// Verify parent's task count equals sum of children
		if dt.tasks != childrenSum {
			t.Errorf("Parent task count (%d) does not equal sum of children (%d)",
				dt.tasks, childrenSum)
		}

		return dt.tasks
	}

	// Run the verification
	verifyTaskCounts(t, &tree)

	// Verify specific expected values
	if tree.tasks != 12 { // Total tasks: 3 + 2 + 4 + 2 + 1 = 12
		t.Errorf("Expected root to have 12 tasks, got %d", tree.tasks)
	}

	dc1Tasks := tree.next["dc1"].tasks
	if dc1Tasks != 5 { // dc1 tasks: 3 + 2 = 5
		t.Errorf("Expected dc1 to have 5 tasks, got %d", dc1Tasks)
	}
	dc1r1Tasks := tree.next["dc1"].next["r1"].tasks
	if dc1r1Tasks != 3 {
		t.Errorf("Expected dc1 r1 to have 3 tasks, got %d", dc1r1Tasks)
	}
	dc1r2Tasks := tree.next["dc1"].next["r2"].tasks
	if dc1r2Tasks != 2 {
		t.Errorf("Expected dc1 r2 to have 2 tasks, got %d", dc1r2Tasks)
	}

	dc2Tasks := tree.next["dc2"].tasks
	if dc2Tasks != 4 { // dc2 tasks: 4
		t.Errorf("Expected dc2 to have 4 tasks, got %d", dc2Tasks)
	}
	dc2r2Tasks := tree.next["dc2"].next["r2"].tasks
	if dc2r2Tasks != 4 {
		t.Errorf("Expected dc2 r2 to have 4 tasks, got %d", dc2r2Tasks)
	}

	otherTasks := tree.next[""].tasks
	if otherTasks != 3 {
		t.Errorf("Expected others to have 3 tasks, got %d", otherTasks)
	}
	subOtherTasks := tree.next[""].next[""].tasks
	if subOtherTasks != 3 {
		t.Errorf("Expected sub-others to have 3 tasks, got %d", subOtherTasks)
	}
}
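For orientation, the fixture above should yield a decision tree shaped like the following (worked out from the node labels and per-node task counts in the test; nodes without a datacenter or rack label are grouped under the empty-string branch):

root (12 tasks)
├── dc1 (5 tasks)
│   ├── r1 (3 tasks, node1)
│   └── r2 (2 tasks, node2)
├── dc2 (4 tasks)
│   └── r2 (4 tasks, node3)
└── "" (3 tasks)
    └── "" (3 tasks, node4 and node5)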

manager/scheduler/scheduler.go

Lines changed: 4 additions & 1 deletion
@@ -787,9 +787,11 @@ func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGrou

	// Try to make branches even until either all branches are
	// full, or all tasks have been scheduled.
-	for tasksScheduled != n && len(noRoom) != len(tree.next) {
+	converging := true
+	for tasksScheduled != n && len(noRoom) != len(tree.next) && converging {
		desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
		remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))
+		converging = false

		for _, subtree := range tree.next {
			if noRoom != nil {
@@ -799,6 +801,7 @@ func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGrou
			}
			subtreeTasks := subtree.tasks
			if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
+				converging = true
				tasksToAssign := desiredTasksPerBranch - subtreeTasks
				if remainder > 0 {
					tasksToAssign++

manager/scheduler/scheduler_test.go

Lines changed: 151 additions & 0 deletions
@@ -1110,6 +1110,156 @@ func TestMultiplePreferences(t *testing.T) {
	t.Run("useSpecVersion=true", func(t *testing.T) { testMultiplePreferences(t, true) })
}

// TestMultiplePreferencesScaleUp is a regression test for an infinite loop
// bug in the scheduler.
func TestMultiplePreferencesScaleUp(t *testing.T) {
	ctx := context.Background()
	initialNodeSet := []*api.Node{
		{
			ID: "id11",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "dc1",
						"rack": "r1",
					},
				},
			},
		},
		{
			ID: "id12",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "dc1",
						"rack": "r2",
					},
				},
			},
		},
		{
			ID: "id21",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "dc2",
						"rack": "r1",
					},
				},
			},
		},
	}

	taskTemplate1 := &api.Task{
		DesiredState: api.TaskStateRunning,
		ServiceID:    "service1",
		// The service needs to have a spec version to be scheduled as a
		// group, a necessary precondition for the scheduler
		// infinite-loop bug.
		SpecVersion: &api.Version{Index: 1},
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{
					Image: "v:1",
				},
			},
			Placement: &api.Placement{
				Preferences: []*api.PlacementPreference{
					{
						Preference: &api.PlacementPreference_Spread{
							Spread: &api.SpreadOver{
								SpreadDescriptor: "node.labels.az",
							},
						},
					},
					{
						Preference: &api.PlacementPreference_Spread{
							Spread: &api.SpreadOver{
								SpreadDescriptor: "node.labels.rack",
							},
						},
					},
				},
			},
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	t1Instances := 2

	err := s.Update(func(tx store.Tx) error {
		// Prepopulate nodes
		for _, n := range initialNodeSet {
			assert.NoError(t, store.CreateNode(tx, n))
		}

		// Prepopulate tasks from template 1
		for i := 0; i != t1Instances; i++ {
			taskTemplate1.ID = fmt.Sprintf("t1id%d", i)
			assert.NoError(t, store.CreateTask(tx, taskTemplate1))
		}

		// Populate some running tasks to simulate a service scaling scenario
		for node, tasks := range map[string]int{
			"id11": 3,
			"id12": 1,
			"id21": 3,
		} {
			for i := 0; i != tasks; i++ {
				taskTemplate1.ID = fmt.Sprintf("t1running-%s-%d", node, i)
				taskTemplate1.NodeID = node
				taskTemplate1.Status.State = api.TaskStateRunning
				assert.NoError(t, store.CreateTask(tx, taskTemplate1))
			}
		}
		return nil
	})
	assert.NoError(t, err)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	t1Assignments := make(map[string]int)
	totalAssignments := 0
	for i := 0; i != t1Instances; i++ {
		assignment := watchAssignment(t, watch)
		if !strings.HasPrefix(assignment.ID, "t1") {
			t.Fatal("got assignment for different kind of task")
		}
		t1Assignments[assignment.NodeID]++
		totalAssignments++
	}

	t.Logf("t1Assignments: %#v", t1Assignments)
	assert.Equal(t, t1Instances, totalAssignments)
	// It would be valid for the scheduler to either assign the tasks to
	// id12, which balances r1 and r2 of dc1, or to id21, which balances
	// dc1 and dc2.
	assert.Equal(t, 2, t1Assignments["id12"]+t1Assignments["id21"])
}

func TestSchedulerNoReadyNodes(t *testing.T) {
	ctx := context.Background()
	initialTask := &api.Task{
@@ -2698,6 +2848,7 @@ func watchAssignmentFailure(t *testing.T, watch chan events.Event) *api.Task {
}

func watchAssignment(t *testing.T, watch chan events.Event) *api.Task {
+	t.Helper()
	for {
		select {
		case event := <-watch: