Skip to content

Commit df6ba4c

Browse files
authored
Start ReconnectClient connection on Connect (#60)
1 parent c1d1f13 commit df6ba4c

File tree

6 files changed

+147
-73
lines changed

6 files changed

+147
-73
lines changed

examples/mqtts-client-cert/main.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func main() {
5050

5151
println("Connecting to", host)
5252

53-
cli, err := mqtt.NewReconnectClient(ctx,
53+
cli, err := mqtt.NewReconnectClient(
5454
// Dialer to connect/reconnect to the server.
5555
mqtt.DialerFunc(func() (mqtt.ClientCloser, error) {
5656
cli, err := mqtt.Dial(
@@ -74,17 +74,6 @@ func main() {
7474
// mqtt.WithTLSConfig(tlsConfig),
7575
// },
7676
// },
77-
"sample", // Client ID
78-
mqtt.WithConnectOption(
79-
mqtt.WithKeepAlive(30),
80-
mqtt.WithWill(
81-
&mqtt.Message{
82-
Topic: "test",
83-
QoS: mqtt.QoS1,
84-
Payload: []byte("{\"message\": \"Bye\"}"),
85-
},
86-
),
87-
),
8877
mqtt.WithPingInterval(10*time.Second),
8978
mqtt.WithTimeout(5*time.Second),
9079
mqtt.WithReconnectWait(1*time.Second, 15*time.Second),
@@ -93,6 +82,21 @@ func main() {
9382
fmt.Printf("Error: %v\n", err)
9483
os.Exit(1)
9584
}
85+
_, err = cli.Connect(ctx,
86+
"sample", // Client ID
87+
mqtt.WithKeepAlive(30),
88+
mqtt.WithWill(
89+
&mqtt.Message{
90+
Topic: "test",
91+
QoS: mqtt.QoS1,
92+
Payload: []byte("{\"message\": \"Bye\"}"),
93+
},
94+
),
95+
)
96+
if err != nil {
97+
fmt.Printf("Error: %v\n", err)
98+
os.Exit(1)
99+
}
96100

97101
mux := &mqtt.ServeMux{}
98102
cli.Handle(mux) // Register muxer as a low level handler.

examples/wss-presign-url/main.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func main() {
3636

3737
println("Connecting to", host)
3838

39-
cli, err := mqtt.NewReconnectClient(ctx,
39+
cli, err := mqtt.NewReconnectClient(
4040
// Dialer to connect/reconnect to the server.
4141
mqtt.DialerFunc(func() (mqtt.ClientCloser, error) {
4242
// Presign URL here.
@@ -56,10 +56,6 @@ func main() {
5656
}
5757
return cli, nil
5858
}),
59-
"sample", // Client ID
60-
mqtt.WithConnectOption(
61-
mqtt.WithKeepAlive(30),
62-
),
6359
mqtt.WithPingInterval(10*time.Second),
6460
mqtt.WithTimeout(5*time.Second),
6561
mqtt.WithReconnectWait(1*time.Second, 15*time.Second),
@@ -68,6 +64,14 @@ func main() {
6864
fmt.Printf("Error: %v\n", err)
6965
os.Exit(1)
7066
}
67+
_, err = cli.Connect(ctx,
68+
"sample", // Client ID
69+
mqtt.WithKeepAlive(30),
70+
)
71+
if err != nil {
72+
fmt.Printf("Error: %v\n", err)
73+
os.Exit(1)
74+
}
7175

7276
mux := &mqtt.ServeMux{}
7377
cli.Handle(mux) // Register muxer as a low level handler.

paho/paho.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (c *pahoWrapper) connectRetry(opts []mqtt.ConnectOption) paho.Token {
9999
go func() {
100100
pingInterval := time.Duration(c.pahoConfig.KeepAlive) * time.Second
101101

102-
cli, err := mqtt.NewReconnectClient(context.Background(),
102+
cli, err := mqtt.NewReconnectClient(
103103
mqtt.DialerFunc(func() (mqtt.ClientCloser, error) {
104104
cb, err := mqtt.Dial(c.pahoConfig.Servers[0].String(),
105105
mqtt.WithTLSConfig(c.pahoConfig.TLSConfig),
@@ -124,15 +124,18 @@ func (c *pahoWrapper) connectRetry(opts []mqtt.ConnectOption) paho.Token {
124124
c.mu.Unlock()
125125
return cb, err
126126
}),
127-
c.pahoConfig.ClientID,
128-
mqtt.WithConnectOption(opts...),
129127
mqtt.WithPingInterval(pingInterval),
130128
mqtt.WithTimeout(c.pahoConfig.PingTimeout),
131129
mqtt.WithReconnectWait(
132130
time.Second, // c.pahoConfig.ConnectRetryInterval,
133131
10*time.Second, // c.pahoConfig.MaxReconnectInterval,
134132
),
135133
)
134+
if err != nil {
135+
token.err = err
136+
token.release()
137+
}
138+
_, err = cli.Connect(context.Background(), c.pahoConfig.ClientID, opts...)
136139
if err != nil {
137140
token.err = err
138141
token.release()

reconnclient.go

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,13 @@ import (
2222

2323
type reconnectClient struct {
2424
*RetryClient
25-
done chan struct{}
25+
done chan struct{}
26+
options *ReconnectOptions
27+
dialer Dialer
2628
}
2729

2830
// NewReconnectClient creates a MQTT client with re-connect/re-publish/re-subscribe features.
29-
func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opts ...ReconnectOption) (Client, error) {
30-
rc := &RetryClient{}
31-
reconnCli := &reconnectClient{
32-
RetryClient: rc,
33-
done: make(chan struct{}),
34-
}
35-
31+
func NewReconnectClient(dialer Dialer, opts ...ReconnectOption) (Client, error) {
3632
options := &ReconnectOptions{
3733
ReconnectWaitBase: time.Second,
3834
ReconnectWaitMax: 10 * time.Second,
@@ -42,60 +38,71 @@ func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opt
4238
return nil, err
4339
}
4440
}
41+
return &reconnectClient{
42+
RetryClient: &RetryClient{},
43+
done: make(chan struct{}),
44+
options: options,
45+
dialer: dialer,
46+
}, nil
47+
}
48+
49+
// Connect starts connection retry loop.
50+
func (c *reconnectClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (bool, error) {
4551
connOptions := &ConnectOptions{
4652
CleanSession: true,
4753
}
48-
for _, opt := range options.ConnectOptions {
54+
for _, opt := range opts {
4955
if err := opt(connOptions); err != nil {
50-
return nil, err
56+
return false, err
5157
}
5258
}
53-
if options.PingInterval == time.Duration(0) {
54-
options.PingInterval = time.Duration(connOptions.KeepAlive) * time.Second
59+
if c.options.PingInterval == time.Duration(0) {
60+
c.options.PingInterval = time.Duration(connOptions.KeepAlive) * time.Second
5561
}
56-
if options.Timeout == time.Duration(0) {
57-
options.Timeout = options.PingInterval
62+
if c.options.Timeout == time.Duration(0) {
63+
c.options.Timeout = c.options.PingInterval
5864
}
5965

6066
done := make(chan struct{})
6167
var doneOnce sync.Once
68+
var sessionPresent bool
6269
go func() {
6370
defer func() {
64-
close(reconnCli.done)
71+
close(c.done)
6572
}()
6673
clean := connOptions.CleanSession
67-
reconnWait := options.ReconnectWaitBase
74+
reconnWait := c.options.ReconnectWaitBase
6875
for {
69-
if c, err := dialer.Dial(); err == nil {
70-
optsCurr := append([]ConnectOption{}, options.ConnectOptions...)
76+
if baseCli, err := c.dialer.Dial(); err == nil {
77+
optsCurr := append([]ConnectOption{}, opts...)
7178
optsCurr = append(optsCurr, WithCleanSession(clean))
7279
clean = false // Clean only first time.
73-
rc.SetClient(ctx, c)
80+
c.RetryClient.SetClient(ctx, baseCli)
7481

75-
ctxTimeout, cancel := context.WithTimeout(ctx, options.Timeout)
76-
if present, err := rc.Connect(ctxTimeout, clientID, optsCurr...); err == nil {
82+
ctxTimeout, cancel := context.WithTimeout(ctx, c.options.Timeout)
83+
if sessionPresent, err := c.RetryClient.Connect(ctxTimeout, clientID, optsCurr...); err == nil {
7784
cancel()
7885

79-
if !present {
80-
rc.Resubscribe(ctx)
86+
if !sessionPresent {
87+
c.RetryClient.Resubscribe(ctx)
8188
}
82-
rc.Retry(ctx)
89+
c.RetryClient.Retry(ctx)
8390

84-
if options.PingInterval > time.Duration(0) {
91+
if c.options.PingInterval > time.Duration(0) {
8592
// Start keep alive.
8693
go func() {
8794
_ = KeepAlive(
88-
ctx, c,
89-
options.PingInterval,
90-
options.Timeout,
95+
ctx, baseCli,
96+
c.options.PingInterval,
97+
c.options.Timeout,
9198
)
9299
}()
93100
}
94-
reconnWait = options.ReconnectWaitBase // Reset reconnect wait.
101+
reconnWait = c.options.ReconnectWaitBase // Reset reconnect wait.
95102
doneOnce.Do(func() { close(done) })
96103
select {
97-
case <-c.Done():
98-
if err := c.Err(); err == nil {
104+
case <-baseCli.Done():
105+
if err := baseCli.Err(); err == nil {
99106
// Disconnected as expected; don't restart.
100107
return
101108
}
@@ -113,19 +120,20 @@ func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opt
113120
return
114121
}
115122
reconnWait *= 2
116-
if reconnWait > options.ReconnectWaitMax {
117-
reconnWait = options.ReconnectWaitMax
123+
if reconnWait > c.options.ReconnectWaitMax {
124+
reconnWait = c.options.ReconnectWaitMax
118125
}
119126
}
120127
}()
121128
select {
122129
case <-done:
123130
case <-ctx.Done():
124-
return nil, ctx.Err()
131+
return false, ctx.Err()
125132
}
126-
return reconnCli, nil
133+
return sessionPresent, nil
127134
}
128135

136+
// Disconnect from the broker.
129137
func (c *reconnectClient) Disconnect(ctx context.Context) error {
130138
err := c.RetryClient.Disconnect(ctx)
131139
select {
@@ -148,14 +156,6 @@ type ReconnectOptions struct {
148156
// ReconnectOption sets option for Connect.
149157
type ReconnectOption func(*ReconnectOptions) error
150158

151-
// WithConnectOption sets ConnectOption to ReconnectClient.
152-
func WithConnectOption(connOpts ...ConnectOption) ReconnectOption {
153-
return func(o *ReconnectOptions) error {
154-
o.ConnectOptions = connOpts
155-
return nil
156-
}
157-
}
158-
159159
// WithTimeout sets timeout duration of server response.
160160
// Default value is PingInterval.
161161
func WithTimeout(timeout time.Duration) ReconnectOption {

reconnclient_integration_test.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,28 @@ func TestIntegration_ReconnectClient(t *testing.T) {
3535

3636
chReceived := make(chan *Message, 100)
3737
cli, err := NewReconnectClient(
38-
ctx,
3938
&URLDialer{
4039
URL: url,
4140
Options: []DialOption{
4241
WithTLSConfig(&tls.Config{InsecureSkipVerify: true}),
4342
},
4443
},
45-
"ReconnectClient"+name,
46-
WithConnectOption(
47-
WithKeepAlive(10),
48-
WithCleanSession(true),
49-
),
5044
WithPingInterval(time.Second),
5145
WithTimeout(time.Second),
5246
WithReconnectWait(time.Second, 10*time.Second),
5347
)
5448
if err != nil {
5549
t.Fatalf("Unexpected error: '%v'", err)
5650
}
51+
_, err = cli.Connect(
52+
ctx,
53+
"ReconnectClient"+name,
54+
WithKeepAlive(10),
55+
WithCleanSession(true),
56+
)
57+
if err != nil {
58+
t.Fatalf("Unexpected error: '%v'", err)
59+
}
5760
cli.Handle(HandlerFunc(func(msg *Message) {
5861
chReceived <- msg
5962
}))
@@ -148,7 +151,6 @@ func TestIntegration_ReconnectClient_Resubscribe(t *testing.T) {
148151

149152
chReceived := make(chan *Message, 100)
150153
cli, err := NewReconnectClient(
151-
ctx,
152154
DialerFunc(func() (ClientCloser, error) {
153155
cli, err := Dial(url,
154156
WithTLSConfig(&tls.Config{InsecureSkipVerify: true}),
@@ -165,14 +167,20 @@ func TestIntegration_ReconnectClient_Resubscribe(t *testing.T) {
165167
cli.Transport = cb
166168
return cli, nil
167169
}),
168-
"ReconnectClient"+name+pktName,
169170
WithPingInterval(250*time.Millisecond),
170171
WithTimeout(250*time.Millisecond),
171172
WithReconnectWait(200*time.Millisecond, time.Second),
172173
)
173174
if err != nil {
174175
t.Fatalf("Unexpected error: '%v'", err)
175176
}
177+
_, err = cli.Connect(
178+
ctx,
179+
"ReconnectClient"+name+pktName,
180+
)
181+
if err != nil {
182+
t.Fatalf("Unexpected error: '%v'", err)
183+
}
176184
cli.Handle(HandlerFunc(func(msg *Message) {
177185
chReceived <- msg
178186
}))
@@ -260,7 +268,6 @@ func TestIntegration_ReconnectClient_Retry(t *testing.T) {
260268
chConnected := make(chan struct{}, 1)
261269

262270
cli, err := NewReconnectClient(
263-
ctx,
264271
DialerFunc(func() (ClientCloser, error) {
265272
cli, err := Dial(url,
266273
WithTLSConfig(&tls.Config{InsecureSkipVerify: true}),
@@ -281,14 +288,20 @@ func TestIntegration_ReconnectClient_Retry(t *testing.T) {
281288
}
282289
return cli, nil
283290
}),
284-
"RetryClient"+name,
285291
WithPingInterval(250*time.Millisecond),
286292
WithTimeout(250*time.Millisecond),
287293
WithReconnectWait(200*time.Millisecond, time.Second),
288294
)
289295
if err != nil {
290296
t.Fatalf("Unexpected error: '%v'", err)
291297
}
298+
cli.Connect(
299+
ctx,
300+
"RetryClient"+name,
301+
)
302+
if err != nil {
303+
t.Fatalf("Unexpected error: '%v'", err)
304+
}
292305

293306
select {
294307
case <-ctx.Done():

0 commit comments

Comments
 (0)