Skip to content

Commit 6dd9450

Browse files
authored
Merge pull request #1267 from jcmoraisjr/jm-reload-ctx
add context on socket calls
2 parents 070f756 + 17c955b commit 6dd9450

File tree

6 files changed

+55
-37
lines changed

6 files changed

+55
-37
lines changed

pkg/controller/reconciler/reconciler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ type rparam struct {
5151

5252
// Reconcile ...
5353
func (r *IngressReconciler) Reconcile(ctx context.Context, req rparam) (ctrl.Result, error) {
54+
if err := ctx.Err(); err != nil {
55+
return ctrl.Result{}, err
56+
}
5457
changed := r.watchers.getChangedObjects()
5558
changed.NeedFullSync = req.fullsync
5659
err := r.Services.ReconcileIngress(ctx, changed)

pkg/controller/services/services.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (s *Services) setup(ctx context.Context) error {
139139
ReloadStrategy: cfg.ReloadStrategy,
140140
MaxOldConfigFiles: cfg.MaxOldConfigFiles,
141141
SortEndpointsBy: cfg.SortEndpointsBy,
142-
StopCh: ctx.Done(),
142+
StopCtx: ctx,
143143
TrackInstances: cfg.TrackOldInstances,
144144
ValidateConfig: cfg.ValidateConfig,
145145
AcmeSigner: acmeSigner,

pkg/haproxy/connections.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,25 @@ limitations under the License.
1717
package haproxy
1818

1919
import (
20+
"context"
2021
"strings"
2122
"sync"
2223
"time"
2324

2425
"github.com/jcmoraisjr/haproxy-ingress/pkg/haproxy/socket"
2526
)
2627

27-
func newConnections(masterSock, adminSock string) *connections {
28+
func newConnections(ctx context.Context, masterSock, adminSock string) *connections {
2829
return &connections{
30+
ctx: ctx,
2931
mutex: sync.Mutex{},
3032
masterSock: masterSock,
3133
adminSock: adminSock,
3234
}
3335
}
3436

3537
type connections struct {
38+
ctx context.Context
3639
mutex sync.Mutex
3740
masterSock string
3841
adminSock string
@@ -47,7 +50,7 @@ func (c *connections) TrackCurrentInstance(timeoutStopDur, closeSessDur time.Dur
4750
c.mutex.Lock()
4851
defer c.mutex.Unlock()
4952
c.shrinkConns()
50-
sock := socket.NewSocketConcurrent(c.adminSock, true)
53+
sock := socket.NewSocketConcurrent(c.ctx, c.adminSock, true)
5154
if err := sock.Unlistening(); err != nil {
5255
return err
5356
}
@@ -132,14 +135,14 @@ func shutdownSessionsSync(sock socket.HAProxySocket, duration time.Duration) {
132135

133136
func (c *connections) Admin() socket.HAProxySocket {
134137
if c.admin == nil {
135-
c.admin = socket.NewSocket(c.adminSock, false)
138+
c.admin = socket.NewSocket(c.ctx, c.adminSock, false)
136139
}
137140
return c.admin
138141
}
139142

140143
func (c *connections) Master() socket.HAProxySocket {
141144
if c.master == nil {
142-
c.master = socket.NewSocket(c.masterSock, false)
145+
c.master = socket.NewSocket(c.ctx, c.masterSock, false)
143146
}
144147
return c.master
145148
}
@@ -148,14 +151,14 @@ func (c *connections) DynUpdate() socket.HAProxySocket {
148151
if c.dynUpdate == nil {
149152
// using a non persistent connection (keep alive false)
150153
// to ensure that the current instance will be used
151-
c.dynUpdate = socket.NewSocket(c.adminSock, false)
154+
c.dynUpdate = socket.NewSocket(c.ctx, c.adminSock, false)
152155
}
153156
return c.dynUpdate
154157
}
155158

156159
func (c *connections) IdleChk() socket.HAProxySocket {
157160
if c.idleChk == nil {
158-
c.idleChk = socket.NewSocket(c.adminSock, false)
161+
c.idleChk = socket.NewSocket(c.ctx, c.adminSock, false)
159162
}
160163
return c.idleChk
161164
}

pkg/haproxy/instance.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package haproxy
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"os"
2223
"os/exec"
@@ -59,7 +60,7 @@ type InstanceOptions struct {
5960
ReloadQueue *workqueue.WorkQueue[any]
6061
ReloadStrategy string
6162
SortEndpointsBy string
62-
StopCh <-chan struct{}
63+
StopCtx context.Context
6364
TrackInstances bool
6465
ValidateConfig bool
6566
// TODO Fake is used to skip real haproxy calls. Use a mock instead.
@@ -84,7 +85,7 @@ func CreateInstance(logger types.Logger, options InstanceOptions) Instance {
8485
waitProc: make(chan struct{}),
8586
logger: logger,
8687
options: &options,
87-
conns: newConnections(options.MasterSocket, options.AdminSocket),
88+
conns: newConnections(options.StopCtx, options.MasterSocket, options.AdminSocket),
8889
metrics: options.Metrics,
8990
//
9091
haproxyTmpl: template.CreateConfig(),
@@ -638,7 +639,7 @@ func (i *instance) reloadEmbeddedDaemon() error {
638639
func (i *instance) reloadEmbeddedMasterWorker() error {
639640
i.embdStart.Do(func() {
640641
go func() {
641-
wait.Until(i.startHAProxySync, 4*time.Second, i.options.StopCh)
642+
wait.UntilWithContext(i.options.StopCtx, i.startHAProxySync, 4*time.Second)
642643
close(i.waitProc)
643644
}()
644645
})
@@ -655,7 +656,7 @@ func (i *instance) reloadEmbeddedMasterWorker() error {
655656
return i.waitWorker(prevReloads)
656657
}
657658

658-
func (i *instance) startHAProxySync() {
659+
func (i *instance) startHAProxySync(ctx context.Context) {
659660
cmd := exec.Command(
660661
"haproxy",
661662
"-W",
@@ -682,7 +683,7 @@ func (i *instance) startHAProxySync() {
682683
close(wait)
683684
}()
684685
select {
685-
case <-i.options.StopCh:
686+
case <-ctx.Done():
686687
i.logger.Info("stopping haproxy master process (pid: %d)", cmd.Process.Pid)
687688
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
688689
i.logger.Error("error stopping haproxy process: %v", err)
@@ -716,14 +717,14 @@ func (i *instance) waitMaster() error {
716717
errCh := make(chan error)
717718
masterSock := i.conns.Master()
718719
go func() {
719-
_, err := socket.HAProxyProcs(masterSock)
720+
_, err := socket.HAProxyProcs(i.options.StopCtx, masterSock)
720721
errCh <- err
721722
}()
722723
for {
723724
select {
724725
case err := <-errCh:
725726
return err
726-
case <-i.options.StopCh:
727+
case <-i.options.StopCtx.Done():
727728
return fmt.Errorf("received sigterm")
728729
case <-time.After(10 * time.Second):
729730
i.logger.Info("... still waiting for the master socket '%s'", masterSock.Address())
@@ -737,7 +738,7 @@ func (i *instance) reloadWorker() (int, error) {
737738
i.logger.Warn("failed to persist servers state before worker reload: %w", err)
738739
}
739740
}
740-
procs, err := socket.HAProxyProcs(i.conns.Master())
741+
procs, err := socket.HAProxyProcs(i.options.StopCtx, i.conns.Master())
741742
if err != nil {
742743
return 0, fmt.Errorf("error reading haproxy procs: %w", err)
743744
}
@@ -756,17 +757,17 @@ func (i *instance) reloadWorker() (int, error) {
756757
}
757758

758759
func (i *instance) waitWorker(prevReloads int) error {
759-
out, err := socket.HAProxyProcs(i.conns.Master())
760+
out, err := socket.HAProxyProcs(i.options.StopCtx, i.conns.Master())
760761
for err == nil && out.Master.Reloads <= prevReloads {
761762
// Continues to wait until we can see `Reloads` greater than the previous one.
762763
// This is needed on 2.6 and older, whose `reload` command runs async.
763764
select {
764765
case <-time.After(time.Second):
765-
case <-i.options.StopCh:
766+
case <-i.options.StopCtx.Done():
766767
return fmt.Errorf("received sigterm")
767768
}
768769
i.logger.Info("reconnecting master socket. current-reloads=%d prev-reloads=%d", out.Master.Reloads, prevReloads)
769-
out, err = socket.HAProxyProcs(i.conns.Master())
770+
out, err = socket.HAProxyProcs(i.options.StopCtx, i.conns.Master())
770771
}
771772
if err != nil {
772773
return fmt.Errorf("error reading procs from master socket: %w", err)

pkg/haproxy/socket/socket.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package socket
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"io"
@@ -33,19 +34,20 @@ import (
3334
)
3435

3536
// NewSocket ...
36-
func NewSocket(address string, keepalive bool) HAProxySocket {
37-
return newSocket(address, keepalive)
37+
func NewSocket(ctx context.Context, address string, keepalive bool) HAProxySocket {
38+
return newSocket(ctx, address, keepalive)
3839
}
3940

4041
// NewSocketConcurrent ...
41-
func NewSocketConcurrent(address string, keepalive bool) HAProxySocket {
42-
s := newSocket(address, keepalive)
42+
func NewSocketConcurrent(ctx context.Context, address string, keepalive bool) HAProxySocket {
43+
s := newSocket(ctx, address, keepalive)
4344
s.mutex = &sync.Mutex{}
4445
return s
4546
}
4647

47-
func newSocket(address string, keepalive bool) *sock {
48+
func newSocket(ctx context.Context, address string, keepalive bool) *sock {
4849
return &sock{
50+
ctx: ctx,
4951
address: address,
5052
listening: true,
5153
keepalive: keepalive,
@@ -63,6 +65,7 @@ type HAProxySocket interface {
6365
}
6466

6567
type sock struct {
68+
ctx context.Context
6669
mutex *sync.Mutex
6770
address string
6871
listening bool
@@ -200,7 +203,8 @@ func (s *sock) acquireConn() (net.Conn, error) {
200203
if !s.listening {
201204
return nil, fmt.Errorf("cannot connect to '%s': listening is down", s.address)
202205
}
203-
c, err := net.Dial("unix", s.address)
206+
var d net.Dialer
207+
c, err := d.DialContext(s.ctx, "unix", s.address)
204208
if err != nil {
205209
return nil, err
206210
}
@@ -244,17 +248,20 @@ type Proc struct {
244248
// instance. Waits for the reload to complete while master CLI is down and the
245249
// attempt to connect leads to a connection refused. Some context:
246250
// https://www.mail-archive.com/[email protected]/msg38415.html
247-
// The amount of time between attempts increases exponentially between 1ms and 64ms,
248-
// and arithmetically between 128ms and 1s in order to save CPU on long reload events
251+
// The amount of time between attempts increases exponentially between 1ms and 256ms,
252+
// and arithmetically between 256ms and 1s in order to save CPU on long reload events
249253
// and quit fast on the fastest ones. The whole processing time can be calculated by
250254
// the caller as the haproxy reload time.
251-
func HAProxyProcs(masterSocket HAProxySocket) (*ProcTable, error) {
252-
maxLogWait := 64 * time.Millisecond
255+
func HAProxyProcs(ctx context.Context, masterSocket HAProxySocket) (*ProcTable, error) {
256+
maxLogWait := 128 * time.Millisecond
253257
logFactor := 2
254258
maxArithWait := 1024 * time.Millisecond
255-
arithFactor := 32 * time.Millisecond
259+
arithFactor := 64 * time.Millisecond
256260
wait := time.Millisecond
257261
for {
262+
if err := ctx.Err(); err != nil {
263+
return nil, err
264+
}
258265
time.Sleep(wait)
259266
out, err := masterSocket.Send(nil, "show proc")
260267
if !waitHAProxy(masterSocket, err) {

pkg/haproxy/socket/socket_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package socket
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"reflect"
2223
"regexp"
@@ -129,13 +130,14 @@ func testSocket(t *testing.T, keepalive bool) {
129130
},
130131
},
131132
}
133+
ctx := context.Background()
132134
clientSocket := make([]HAProxySocket, len(testCases))
133135
clientSocketPos := make([]HAProxySocket, len(testCases))
134136
masterSocket := make([]HAProxySocket, len(testCases))
135137
for i, test := range testCases {
136-
clientSocket[i] = NewSocket(clisock, keepalive)
137-
clientSocketPos[i] = NewSocket(clisock, false)
138-
masterSocket[i] = NewSocket(mastersock, keepalive)
138+
clientSocket[i] = NewSocket(ctx, clisock, keepalive)
139+
clientSocketPos[i] = NewSocket(ctx, clisock, false)
140+
masterSocket[i] = NewSocket(ctx, mastersock, keepalive)
139141
time.Sleep(test.waitBefore)
140142
var sock, sockPos HAProxySocket
141143
if test.master {
@@ -406,14 +408,15 @@ func TestHAProxyProcs(t *testing.T) {
406408
},
407409
},
408410
}
411+
ctx := context.Background()
409412
for i, test := range testCases {
410413
c := setup(t)
411414
cli := &clientMock{
412415
cmdOutput: test.cmdOutput,
413416
cmdError: test.cmdError,
414417
hasSock: test.hasSock,
415418
}
416-
out, err := HAProxyProcs(cli)
419+
out, err := HAProxyProcs(ctx, cli)
417420
if !reflect.DeepEqual(out, test.expOutput) {
418421
t.Errorf("output differs on %d - expected: %+v, actual: %+v", i, test.expOutput, out)
419422
}
@@ -433,7 +436,7 @@ func TestHAProxyProcsLoop(t *testing.T) {
433436
// 0
434437
{
435438
reload: 0,
436-
minDelay: 1 * time.Millisecond,
439+
minDelay: 0 * time.Millisecond,
437440
maxCnt: 1,
438441
},
439442
// 1
@@ -444,19 +447,20 @@ func TestHAProxyProcsLoop(t *testing.T) {
444447
},
445448
// 2
446449
{
447-
reload: 450 * time.Millisecond,
448-
minDelay: (1 + 2 + 4 + 8 + 16 + 32 + 64 + 96 + 128 + 160) * time.Millisecond,
450+
reload: 650 * time.Millisecond,
451+
minDelay: (1 + 2 + 4 + 8 + 16 + 32 + 64 + 128 + 192 + 256) * time.Millisecond,
449452
maxCnt: 10,
450453
},
451454
}
455+
ctx := context.Background()
452456
for i, test := range testCases {
453457
c := setup(t)
454458
cli := &clientMock{
455459
cmdError: syscall.ECONNREFUSED,
456460
}
457461
time.AfterFunc(test.reload, func() { cli.cmdError = nil })
458462
start := time.Now()
459-
_, err := HAProxyProcs(cli)
463+
_, err := HAProxyProcs(ctx, cli)
460464
if err != nil {
461465
t.Errorf("%d should not return an error: %v", i, err)
462466
}

0 commit comments

Comments
 (0)