Skip to content

Commit cb5cfbd

Browse files
author
Sherif Akoush
authored
fix(ci): fix flaky test (#6022)
* fix flaky test * cap execution of test in case of failures
1 parent 520ba61 commit cb5cfbd

File tree

2 files changed

+30
-9
lines changed

2 files changed

+30
-9
lines changed

scheduler/pkg/agent/server_test.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package agent
1212
import (
1313
"context"
1414
"fmt"
15+
"sync"
1516
"testing"
1617
"time"
1718

@@ -1042,16 +1043,23 @@ func TestSubscribe(t *testing.T) {
10421043
}
10431044
time.Sleep(100 * time.Millisecond)
10441045

1046+
mu := sync.Mutex{}
10451047
streams := make([]*grpc.ClientConn, 0)
10461048
for _, a := range test.agents {
10471049
go func(id uint32) {
10481050
conn := getStream(id, context.Background(), port)
1051+
mu.Lock()
10491052
streams = append(streams, conn)
1053+
mu.Unlock()
10501054
}(a.id)
10511055
}
10521056

1053-
time.Sleep(500 * time.Millisecond)
1054-
1057+
maxCount := 10
1058+
count := 0
1059+
for len(server.agents) != test.expectedAgentsCount && count < maxCount {
1060+
time.Sleep(100 * time.Millisecond)
1061+
count++
1062+
}
10551063
g.Expect(len(server.agents)).To(Equal(test.expectedAgentsCount))
10561064

10571065
for idx, s := range streams {
@@ -1062,8 +1070,11 @@ func TestSubscribe(t *testing.T) {
10621070
}(idx, s)
10631071
}
10641072

1065-
time.Sleep(10 * time.Second)
1066-
1073+
count = 0
1074+
for len(server.agents) != test.expectedAgentsCountAfterClose && count < maxCount {
1075+
time.Sleep(100 * time.Millisecond)
1076+
count++
1077+
}
10671078
g.Expect(len(server.agents)).To(Equal(test.expectedAgentsCountAfterClose))
10681079

10691080
server.StopAgentStreams()

scheduler/pkg/kafka/dataflow/server_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"context"
1414
"fmt"
1515
"os"
16+
"sync"
1617
"testing"
1718
"time"
1819

@@ -639,7 +640,6 @@ func TestPipelineRebalance(t *testing.T) {
639640

640641
func TestPipelineSubscribe(t *testing.T) {
641642
g := NewGomegaWithT(t)
642-
643643
type ag struct {
644644
id uint32
645645
doClose bool
@@ -729,16 +729,23 @@ func TestPipelineSubscribe(t *testing.T) {
729729

730730
time.Sleep(100 * time.Millisecond)
731731

732+
mu := sync.Mutex{}
732733
streams := make([]*grpc.ClientConn, 0)
733734
for _, a := range test.agents {
734735
go func(id uint32) {
735736
conn := getStream(id, context.Background(), port)
737+
mu.Lock()
736738
streams = append(streams, conn)
739+
mu.Unlock()
737740
}(a.id)
738741
}
739742

740-
time.Sleep(700 * time.Millisecond)
741-
743+
maxCount := 10
744+
count := 0
745+
for len(s.streams) != test.expectedAgentsCount && count < maxCount {
746+
time.Sleep(100 * time.Millisecond)
747+
count++
748+
}
742749
g.Expect(len(s.streams)).To(Equal(test.expectedAgentsCount))
743750

744751
for idx, s := range streams {
@@ -749,8 +756,11 @@ func TestPipelineSubscribe(t *testing.T) {
749756
}(idx, s)
750757
}
751758

752-
time.Sleep(10 * time.Second)
753-
759+
count = 0
760+
for len(s.streams) != test.expectedAgentsCountAfterClose && count < maxCount {
761+
time.Sleep(100 * time.Millisecond)
762+
count++
763+
}
754764
g.Expect(len(s.streams)).To(Equal(test.expectedAgentsCountAfterClose))
755765

756766
s.StopSendPipelineEvents()

0 commit comments

Comments
 (0)