Skip to content

Commit 6825651

Browse files
committed
add context on socket calls
HAProxy Ingress shutdown occasionally hits a race during stop, which triggers the 25s controller manager timeout and delays the finalization of the controller pod. This happens when SIGINT/SIGTERM arrives just before a new haproxy reload is issued: the signal closes the reconciler context, which stops haproxy, so the reload check waits forever for haproxy to come back, until the controller manager times out. This update replaces the StopCh channel with a proper context, and uses that context on all the socket calls. Should be added to the v0.15 branch.
1 parent 936b39f commit 6825651

File tree

8 files changed

+65
-41
lines changed

8 files changed

+65
-41
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,9 @@ setup-envtest:
6464
test-integration: gotestsum setup-envtest
6565
@echo
6666
@echo "Running Kubernetes $(HAPROXY_INGRESS_ENVTEST)"
67+
haproxy -v
6768
KUBEBUILDER_ASSETS="$(shell $(LOCAL_SETUP_ENVTEST) use $(HAPROXY_INGRESS_ENVTEST) --bin-dir $(LOCALBIN) -i -p path)"\
68-
$(LOCAL_GOTESTSUM) --format=testname -- -count=1 -tags=cgo ./tests/integration/...
69+
$(LOCAL_GOTESTSUM) --format=testname --hide-summary=output -- -count=1 -tags=cgo ./tests/integration/...
6970

7071
.PHONY: linux-build
7172
linux-build:

pkg/controller/legacy/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (hc *HAProxyController) configController() {
145145
ReloadStrategy: hc.cfg.ReloadStrategy,
146146
MaxOldConfigFiles: hc.cfg.MaxOldConfigFiles,
147147
SortEndpointsBy: hc.cfg.SortEndpointsBy,
148-
StopCh: hc.stopCh,
148+
StopCtx: wait.ContextForChannel(hc.stopCh),
149149
TrackInstances: hc.cfg.TrackOldInstances,
150150
ValidateConfig: hc.cfg.ValidateConfig,
151151
}

pkg/controller/reconciler/reconciler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ type rparam struct {
5151

5252
// Reconcile ...
5353
func (r *IngressReconciler) Reconcile(ctx context.Context, req rparam) (ctrl.Result, error) {
54+
if err := ctx.Err(); err != nil {
55+
return ctrl.Result{}, err
56+
}
5457
changed := r.watchers.getChangedObjects()
5558
changed.NeedFullSync = req.fullsync
5659
err := r.Services.ReconcileIngress(ctx, changed)

pkg/controller/services/services.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (s *Services) setup(ctx context.Context) error {
142142
ReloadStrategy: cfg.ReloadStrategy,
143143
MaxOldConfigFiles: cfg.MaxOldConfigFiles,
144144
SortEndpointsBy: cfg.SortEndpointsBy,
145-
StopCh: ctx.Done(),
145+
StopCtx: ctx,
146146
TrackInstances: cfg.TrackOldInstances,
147147
ValidateConfig: cfg.ValidateConfig,
148148
AcmeSigner: acmeSigner,

pkg/haproxy/connections.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,25 @@ limitations under the License.
1717
package haproxy
1818

1919
import (
20+
"context"
2021
"strings"
2122
"sync"
2223
"time"
2324

2425
"github.com/jcmoraisjr/haproxy-ingress/pkg/haproxy/socket"
2526
)
2627

27-
func newConnections(masterSock, adminSock string) *connections {
28+
func newConnections(ctx context.Context, masterSock, adminSock string) *connections {
2829
return &connections{
30+
ctx: ctx,
2931
mutex: sync.Mutex{},
3032
masterSock: masterSock,
3133
adminSock: adminSock,
3234
}
3335
}
3436

3537
type connections struct {
38+
ctx context.Context
3639
mutex sync.Mutex
3740
masterSock string
3841
adminSock string
@@ -47,7 +50,7 @@ func (c *connections) TrackCurrentInstance(timeoutStopDur, closeSessDur time.Dur
4750
c.mutex.Lock()
4851
defer c.mutex.Unlock()
4952
c.shrinkConns()
50-
sock := socket.NewSocketConcurrent(c.adminSock, true)
53+
sock := socket.NewSocketConcurrent(c.ctx, c.adminSock, true)
5154
if err := sock.Unlistening(); err != nil {
5255
return err
5356
}
@@ -132,14 +135,14 @@ func shutdownSessionsSync(sock socket.HAProxySocket, duration time.Duration) {
132135

133136
func (c *connections) Admin() socket.HAProxySocket {
134137
if c.admin == nil {
135-
c.admin = socket.NewSocket(c.adminSock, false)
138+
c.admin = socket.NewSocket(c.ctx, c.adminSock, false)
136139
}
137140
return c.admin
138141
}
139142

140143
func (c *connections) Master() socket.HAProxySocket {
141144
if c.master == nil {
142-
c.master = socket.NewSocket(c.masterSock, false)
145+
c.master = socket.NewSocket(c.ctx, c.masterSock, false)
143146
}
144147
return c.master
145148
}
@@ -148,14 +151,14 @@ func (c *connections) DynUpdate() socket.HAProxySocket {
148151
if c.dynUpdate == nil {
149152
// using a non persistent connection (keep alive false)
150153
// to ensure that the current instance will be used
151-
c.dynUpdate = socket.NewSocket(c.adminSock, false)
154+
c.dynUpdate = socket.NewSocket(c.ctx, c.adminSock, false)
152155
}
153156
return c.dynUpdate
154157
}
155158

156159
func (c *connections) IdleChk() socket.HAProxySocket {
157160
if c.idleChk == nil {
158-
c.idleChk = socket.NewSocket(c.adminSock, false)
161+
c.idleChk = socket.NewSocket(c.ctx, c.adminSock, false)
159162
}
160163
return c.idleChk
161164
}

pkg/haproxy/instance.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package haproxy
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"os"
2223
"os/exec"
@@ -58,7 +59,7 @@ type InstanceOptions struct {
5859
ReloadQueue utils.QueueFacade
5960
ReloadStrategy string
6061
SortEndpointsBy string
61-
StopCh <-chan struct{}
62+
StopCtx context.Context
6263
TrackInstances bool
6364
ValidateConfig bool
6465
// TODO Fake is used to skip real haproxy calls. Use a mock instead.
@@ -83,7 +84,7 @@ func CreateInstance(logger types.Logger, options InstanceOptions) Instance {
8384
waitProc: make(chan struct{}),
8485
logger: logger,
8586
options: &options,
86-
conns: newConnections(options.MasterSocket, options.AdminSocket),
87+
conns: newConnections(options.StopCtx, options.MasterSocket, options.AdminSocket),
8788
metrics: options.Metrics,
8889
//
8990
haproxyTmpl: template.CreateConfig(),
@@ -648,7 +649,7 @@ func (i *instance) reloadEmbeddedDaemon() error {
648649
func (i *instance) reloadEmbeddedMasterWorker() error {
649650
i.embdStart.Do(func() {
650651
go func() {
651-
wait.Until(i.startHAProxySync, 4*time.Second, i.options.StopCh)
652+
wait.UntilWithContext(i.options.StopCtx, i.startHAProxySync, 4*time.Second)
652653
close(i.waitProc)
653654
}()
654655
})
@@ -665,7 +666,7 @@ func (i *instance) reloadEmbeddedMasterWorker() error {
665666
return i.waitWorker(prevReloads)
666667
}
667668

668-
func (i *instance) startHAProxySync() {
669+
func (i *instance) startHAProxySync(ctx context.Context) {
669670
cmd := exec.Command(
670671
"haproxy",
671672
"-W",
@@ -692,7 +693,7 @@ func (i *instance) startHAProxySync() {
692693
close(wait)
693694
}()
694695
select {
695-
case <-i.options.StopCh:
696+
case <-ctx.Done():
696697
i.logger.Info("stopping haproxy master process (pid: %d)", cmd.Process.Pid)
697698
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
698699
i.logger.Error("error stopping haproxy process: %v", err)
@@ -726,14 +727,14 @@ func (i *instance) waitMaster() error {
726727
errCh := make(chan error)
727728
masterSock := i.conns.Master()
728729
go func() {
729-
_, err := socket.HAProxyProcs(masterSock)
730+
_, err := socket.HAProxyProcs(i.options.StopCtx, masterSock)
730731
errCh <- err
731732
}()
732733
for {
733734
select {
734735
case err := <-errCh:
735736
return err
736-
case <-i.options.StopCh:
737+
case <-i.options.StopCtx.Done():
737738
return fmt.Errorf("received sigterm")
738739
case <-time.After(10 * time.Second):
739740
i.logger.Info("... still waiting for the master socket '%s'", masterSock.Address())
@@ -747,7 +748,7 @@ func (i *instance) reloadWorker() (int, error) {
747748
i.logger.Warn("failed to persist servers state before worker reload: %w", err)
748749
}
749750
}
750-
procs, err := socket.HAProxyProcs(i.conns.Master())
751+
procs, err := socket.HAProxyProcs(i.options.StopCtx, i.conns.Master())
751752
if err != nil {
752753
return 0, fmt.Errorf("error reading haproxy procs: %w", err)
753754
}
@@ -758,12 +759,17 @@ func (i *instance) reloadWorker() (int, error) {
758759
}
759760

760761
func (i *instance) waitWorker(prevReloads int) error {
761-
out, err := socket.HAProxyProcs(i.conns.Master())
762+
out, err := socket.HAProxyProcs(i.options.StopCtx, i.conns.Master())
762763
for err == nil && out.Master.Reloads <= prevReloads {
763-
// continues to wait until we can see `reloads` greater than the previous one.
764-
time.Sleep(time.Second)
765-
i.logger.Info("reconnecting to master socket. current-reloads=%d prev-reloads=%d", out.Master.Reloads, prevReloads)
766-
out, err = socket.HAProxyProcs(i.conns.Master())
764+
// Continues to wait until we can see `Reloads` greater than the previous one.
765+
// This is needed on 2.6 and older, whose `reload` command runs async.
766+
select {
767+
case <-time.After(time.Second):
768+
case <-i.options.StopCtx.Done():
769+
return fmt.Errorf("received sigterm")
770+
}
771+
i.logger.Info("reconnecting master socket. current-reloads=%d prev-reloads=%d", out.Master.Reloads, prevReloads)
772+
out, err = socket.HAProxyProcs(i.options.StopCtx, i.conns.Master())
767773
}
768774
if err != nil {
769775
return fmt.Errorf("error reading procs from master socket: %w", err)

pkg/haproxy/socket/socket.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package socket
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"io"
@@ -33,19 +34,20 @@ import (
3334
)
3435

3536
// NewSocket ...
36-
func NewSocket(address string, keepalive bool) HAProxySocket {
37-
return newSocket(address, keepalive)
37+
func NewSocket(ctx context.Context, address string, keepalive bool) HAProxySocket {
38+
return newSocket(ctx, address, keepalive)
3839
}
3940

4041
// NewSocketConcurrent ...
41-
func NewSocketConcurrent(address string, keepalive bool) HAProxySocket {
42-
s := newSocket(address, keepalive)
42+
func NewSocketConcurrent(ctx context.Context, address string, keepalive bool) HAProxySocket {
43+
s := newSocket(ctx, address, keepalive)
4344
s.mutex = &sync.Mutex{}
4445
return s
4546
}
4647

47-
func newSocket(address string, keepalive bool) *sock {
48+
func newSocket(ctx context.Context, address string, keepalive bool) *sock {
4849
return &sock{
50+
ctx: ctx,
4951
address: address,
5052
listening: true,
5153
keepalive: keepalive,
@@ -63,6 +65,7 @@ type HAProxySocket interface {
6365
}
6466

6567
type sock struct {
68+
ctx context.Context
6669
mutex *sync.Mutex
6770
address string
6871
listening bool
@@ -200,7 +203,8 @@ func (s *sock) acquireConn() (net.Conn, error) {
200203
if !s.listening {
201204
return nil, fmt.Errorf("cannot connect to '%s': listening is down", s.address)
202205
}
203-
c, err := net.Dial("unix", s.address)
206+
var d net.Dialer
207+
c, err := d.DialContext(s.ctx, "unix", s.address)
204208
if err != nil {
205209
return nil, err
206210
}
@@ -244,17 +248,20 @@ type Proc struct {
244248
// instance. Waits for the reload to complete while master CLI is down and the
245249
// attempt to connect leads to a connection refused. Some context:
246250
// https://www.mail-archive.com/[email protected]/msg38415.html
247-
// The amount of time between attempts increases exponentially between 1ms and 64ms,
248-
// and arithmetically between 128ms and 1s in order to save CPU on long reload events
251+
// The amount of time between attempts increases exponentially between 1ms and 128ms,
252+
// and arithmetically between 128ms and 1s in order to save CPU on long reload events
249253
// and quit fast on the fastest ones. The whole processing time can be calculated by
250254
// the caller as the haproxy reload time.
251-
func HAProxyProcs(masterSocket HAProxySocket) (*ProcTable, error) {
252-
maxLogWait := 64 * time.Millisecond
255+
func HAProxyProcs(ctx context.Context, masterSocket HAProxySocket) (*ProcTable, error) {
256+
maxLogWait := 128 * time.Millisecond
253257
logFactor := 2
254258
maxArithWait := 1024 * time.Millisecond
255-
arithFactor := 32 * time.Millisecond
259+
arithFactor := 64 * time.Millisecond
256260
wait := time.Millisecond
257261
for {
262+
if err := ctx.Err(); err != nil {
263+
return nil, err
264+
}
258265
time.Sleep(wait)
259266
out, err := masterSocket.Send(nil, "show proc")
260267
if !waitHAProxy(masterSocket, err) {

pkg/haproxy/socket/socket_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package socket
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"reflect"
2223
"regexp"
@@ -129,13 +130,14 @@ func testSocket(t *testing.T, keepalive bool) {
129130
},
130131
},
131132
}
133+
ctx := context.Background()
132134
clientSocket := make([]HAProxySocket, len(testCases))
133135
clientSocketPos := make([]HAProxySocket, len(testCases))
134136
masterSocket := make([]HAProxySocket, len(testCases))
135137
for i, test := range testCases {
136-
clientSocket[i] = NewSocket(clisock, keepalive)
137-
clientSocketPos[i] = NewSocket(clisock, false)
138-
masterSocket[i] = NewSocket(mastersock, keepalive)
138+
clientSocket[i] = NewSocket(ctx, clisock, keepalive)
139+
clientSocketPos[i] = NewSocket(ctx, clisock, false)
140+
masterSocket[i] = NewSocket(ctx, mastersock, keepalive)
139141
time.Sleep(test.waitBefore)
140142
var sock, sockPos HAProxySocket
141143
if test.master {
@@ -406,14 +408,15 @@ func TestHAProxyProcs(t *testing.T) {
406408
},
407409
},
408410
}
411+
ctx := context.Background()
409412
for i, test := range testCases {
410413
c := setup(t)
411414
cli := &clientMock{
412415
cmdOutput: test.cmdOutput,
413416
cmdError: test.cmdError,
414417
hasSock: test.hasSock,
415418
}
416-
out, err := HAProxyProcs(cli)
419+
out, err := HAProxyProcs(ctx, cli)
417420
if !reflect.DeepEqual(out, test.expOutput) {
418421
t.Errorf("output differs on %d - expected: %+v, actual: %+v", i, test.expOutput, out)
419422
}
@@ -433,7 +436,7 @@ func TestHAProxyProcsLoop(t *testing.T) {
433436
// 0
434437
{
435438
reload: 0,
436-
minDelay: 1 * time.Millisecond,
439+
minDelay: 0 * time.Millisecond,
437440
maxCnt: 1,
438441
},
439442
// 1
@@ -444,19 +447,20 @@ func TestHAProxyProcsLoop(t *testing.T) {
444447
},
445448
// 2
446449
{
447-
reload: 450 * time.Millisecond,
448-
minDelay: (1 + 2 + 4 + 8 + 16 + 32 + 64 + 96 + 128 + 160) * time.Millisecond,
450+
reload: 650 * time.Millisecond,
451+
minDelay: (1 + 2 + 4 + 8 + 16 + 32 + 64 + 128 + 192 + 256) * time.Millisecond,
449452
maxCnt: 10,
450453
},
451454
}
455+
ctx := context.Background()
452456
for i, test := range testCases {
453457
c := setup(t)
454458
cli := &clientMock{
455459
cmdError: syscall.ECONNREFUSED,
456460
}
457461
time.AfterFunc(test.reload, func() { cli.cmdError = nil })
458462
start := time.Now()
459-
_, err := HAProxyProcs(cli)
463+
_, err := HAProxyProcs(ctx, cli)
460464
if err != nil {
461465
t.Errorf("%d should not return an error: %v", i, err)
462466
}

0 commit comments

Comments
 (0)