Skip to content

Commit 261fcbd

Browse files
authored
Support order option in paho wrapper (#88)
1 parent 4374428 commit 261fcbd

File tree

2 files changed

+66
-54
lines changed

2 files changed

+66
-54
lines changed

paho/paho.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,11 @@ func (c *pahoWrapper) connectWithRetry(opts []mqtt.ConnectOption) paho.Token {
158158
token.release()
159159
return
160160
}
161-
cli.Handle(c.serveMux)
161+
if c.pahoConfig.Order {
162+
cli.Handle(c.serveMux)
163+
} else {
164+
cli.Handle(&mqtt.ServeAsync{Handler: c.serveMux})
165+
}
162166
c.mu.Lock()
163167
c.cli = cli
164168
c.mu.Unlock()
@@ -197,7 +201,11 @@ func (c *pahoWrapper) connectOnce(opts []mqtt.ConnectOption) paho.Token {
197201
}
198202
}
199203
}
200-
cli.Handle(c.serveMux)
204+
if c.pahoConfig.Order {
205+
cli.Handle(c.serveMux)
206+
} else {
207+
cli.Handle(&mqtt.ServeAsync{Handler: c.serveMux})
208+
}
201209
c.mu.Lock()
202210
c.cli = cli
203211
c.cliCloser = cli

paho/paho_integration_test.go

Lines changed: 56 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -28,64 +28,68 @@ import (
2828
)
2929

3030
func TestIntegration_PublishSubscribe(t *testing.T) {
31-
for name, recon := range map[string]bool{"Reconnect": true, "NoReconnect": false} {
32-
t.Run(name, func(t *testing.T) {
33-
opts := paho.NewClientOptions()
34-
server, err := url.Parse("mqtt://localhost:1883")
35-
if err != nil {
36-
t.Fatalf("Unexpected error: '%v'", err)
37-
}
38-
opts.Servers = []*url.URL{server}
39-
opts.AutoReconnect = recon
40-
opts.ClientID = "PahoWrapper" + name
41-
opts.KeepAlive = 0
31+
for name0, recon := range map[string]bool{"Reconnect": true, "NoReconnect": false} {
32+
for name1, order := range map[string]bool{"Sync": true, "Async": false} {
33+
name := name0 + name1
34+
t.Run(name, func(t *testing.T) {
35+
opts := paho.NewClientOptions()
36+
server, err := url.Parse("mqtt://localhost:1883")
37+
if err != nil {
38+
t.Fatalf("Unexpected error: '%v'", err)
39+
}
40+
opts.Servers = []*url.URL{server}
41+
opts.AutoReconnect = recon
42+
opts.ClientID = "PahoWrapper" + name
43+
opts.KeepAlive = 0
44+
opts.Order = order
45+
46+
cli := NewClient(opts)
47+
token := cli.Connect()
48+
if !token.WaitTimeout(5 * time.Second) {
49+
t.Fatal("Connect timeout")
50+
}
4251

43-
cli := NewClient(opts)
44-
token := cli.Connect()
45-
if !token.WaitTimeout(5 * time.Second) {
46-
t.Fatal("Connect timeout")
47-
}
52+
msg := make(chan paho.Message, 100)
53+
token = cli.Subscribe("paho"+name, 1, func(c paho.Client, m paho.Message) {
54+
msg <- m
55+
})
56+
if !token.WaitTimeout(5 * time.Second) {
57+
t.Fatal("Subscribe timeout")
58+
}
59+
token = cli.Publish("paho"+name, 1, false, []byte{0x12})
60+
if !token.WaitTimeout(5 * time.Second) {
61+
t.Fatal("Publish timeout")
62+
}
4863

49-
msg := make(chan paho.Message, 100)
50-
token = cli.Subscribe("paho"+name, 1, func(c paho.Client, m paho.Message) {
51-
msg <- m
52-
})
53-
if !token.WaitTimeout(5 * time.Second) {
54-
t.Fatal("Subscribe timeout")
55-
}
56-
token = cli.Publish("paho"+name, 1, false, []byte{0x12})
57-
if !token.WaitTimeout(5 * time.Second) {
58-
t.Fatal("Publish timeout")
59-
}
64+
if !cli.IsConnected() {
65+
t.Error("Not connected")
66+
}
67+
if !cli.IsConnectionOpen() {
68+
t.Error("Not connection open")
69+
}
6070

61-
if !cli.IsConnected() {
62-
t.Error("Not connected")
63-
}
64-
if !cli.IsConnectionOpen() {
65-
t.Error("Not connection open")
66-
}
71+
select {
72+
case m := <-msg:
73+
if m.Topic() != "paho"+name {
74+
t.Errorf("Expected topic: 'topic%s', got: %s", name, m.Topic())
75+
}
76+
if !bytes.Equal(m.Payload(), []byte{0x12}) {
77+
t.Errorf("Expected payload: [18], got: %v", m.Payload())
78+
}
79+
case <-time.After(5 * time.Second):
80+
t.Errorf("Message timeout")
81+
}
82+
cli.Disconnect(10)
83+
time.Sleep(time.Second)
6784

68-
select {
69-
case m := <-msg:
70-
if m.Topic() != "paho"+name {
71-
t.Errorf("Expected topic: 'topic%s', got: %s", name, m.Topic())
85+
if cli.IsConnected() {
86+
t.Error("Connected after disconnect")
7287
}
73-
if !bytes.Equal(m.Payload(), []byte{0x12}) {
74-
t.Errorf("Expected payload: [18], got: %v", m.Payload())
88+
if cli.IsConnectionOpen() {
89+
t.Error("Connection open after disconnect")
7590
}
76-
case <-time.After(5 * time.Second):
77-
t.Errorf("Message timeout")
78-
}
79-
cli.Disconnect(10)
80-
time.Sleep(time.Second)
81-
82-
if cli.IsConnected() {
83-
t.Error("Connected after disconnect")
84-
}
85-
if cli.IsConnectionOpen() {
86-
t.Error("Connection open after disconnect")
87-
}
88-
})
91+
})
92+
}
8993
}
9094
}
9195

0 commit comments

Comments
 (0)