Skip to content

Commit 1525eb1

Browse files
authored
Fix RetryClient context (#97)
1 parent cf48e81 commit 1525eb1

File tree

2 files changed

+69
-8
lines changed

2 files changed

+69
-8
lines changed

retryclient.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type RetryClient struct {
3030
mu sync.Mutex
3131
muQueue sync.Mutex
3232
handler Handler
33-
chTask chan func(cli Client)
33+
chTask chan func(ctx context.Context, cli Client)
3434
}
3535

3636
// Handle registers the message handler.
@@ -46,23 +46,23 @@ func (c *RetryClient) Handle(handler Handler) {
4646
// Publish tries to publish the message and immediately return nil.
4747
// If it is not acknowledged to be published, the message will be queued.
4848
func (c *RetryClient) Publish(ctx context.Context, message *Message) error {
49-
return c.pushTask(ctx, func(cli Client) {
49+
return c.pushTask(ctx, func(ctx context.Context, cli Client) {
5050
c.publish(ctx, false, cli, message)
5151
})
5252
}
5353

5454
// Subscribe tries to subscribe the topic and immediately return nil.
5555
// If it is not acknowledged to be subscribed, the request will be queued.
5656
func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) error {
57-
return c.pushTask(ctx, func(cli Client) {
57+
return c.pushTask(ctx, func(ctx context.Context, cli Client) {
5858
c.subscribe(ctx, false, cli, subs...)
5959
})
6060
}
6161

6262
// Unsubscribe tries to unsubscribe the topic and immediately return nil.
6363
// If it is not acknowledged to be unsubscribed, the request will be queued.
6464
func (c *RetryClient) Unsubscribe(ctx context.Context, topics ...string) error {
65-
return c.pushTask(ctx, func(cli Client) {
65+
return c.pushTask(ctx, func(ctx context.Context, cli Client) {
6666
c.unsubscribe(ctx, false, cli, topics...)
6767
})
6868
}
@@ -148,7 +148,7 @@ func (c *RetryClient) removeEstablished(topics ...string) {
148148

149149
// Disconnect from the broker.
150150
func (c *RetryClient) Disconnect(ctx context.Context) error {
151-
err := c.pushTask(ctx, func(cli Client) {
151+
err := c.pushTask(ctx, func(ctx context.Context, cli Client) {
152152
cli.Disconnect(ctx)
153153
})
154154
close(c.chTask)
@@ -174,18 +174,19 @@ func (c *RetryClient) SetClient(ctx context.Context, cli Client) {
174174
return
175175
}
176176

177-
c.chTask = make(chan func(cli Client))
177+
c.chTask = make(chan func(ctx context.Context, cli Client))
178178
go func() {
179+
ctx := context.Background()
179180
for task := range c.chTask {
180181
c.mu.Lock()
181182
cli := c.cli
182183
c.mu.Unlock()
183-
task(cli)
184+
task(ctx, cli)
184185
}
185186
}()
186187
}
187188

188-
func (c *RetryClient) pushTask(ctx context.Context, task func(cli Client)) error {
189+
func (c *RetryClient) pushTask(ctx context.Context, task func(ctx context.Context, cli Client)) error {
189190
c.mu.Lock()
190191
chTask := c.chTask
191192
c.mu.Unlock()

retryclient_integration_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,65 @@ func TestIntegration_RetryClient(t *testing.T) {
5454
}
5555
})
5656
}
57+
}
58+
59+
func TestIntegration_RetryClient_Cancel(t *testing.T) {
60+
cliBase, err := Dial(urls["MQTT"], WithTLSConfig(&tls.Config{InsecureSkipVerify: true}))
61+
if err != nil {
62+
t.Fatalf("Unexpected error: '%v'", err)
63+
}
64+
65+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
66+
defer cancel()
67+
68+
cliRecv, err := Dial(
69+
urls["MQTT"],
70+
WithTLSConfig(&tls.Config{InsecureSkipVerify: true}),
71+
)
72+
if err != nil {
73+
t.Fatalf("Unexpected error: '%v'", err)
74+
}
75+
if _, err = cliRecv.Connect(ctx,
76+
"RetryClientCancelRecv",
77+
); err != nil {
78+
t.Fatalf("Unexpected error: '%v'", err)
79+
}
80+
chRecv := make(chan *Message)
81+
cliRecv.Handle(HandlerFunc(func(msg *Message) {
82+
chRecv <- msg
83+
}))
84+
if err := cliRecv.Subscribe(ctx, Subscription{Topic: "testCancel", QoS: QoS2}); err != nil {
85+
t.Fatalf("Unexpected error: '%v'", err)
86+
}
87+
88+
var cli RetryClient
89+
cli.SetClient(ctx, cliBase)
5790

91+
if _, err := cli.Connect(ctx, "RetryClientCancel"); err != nil {
92+
t.Fatalf("Unexpected error: '%v'", err)
93+
}
94+
95+
ctx2, cancel2 := context.WithCancel(ctx)
96+
if err := cli.Publish(ctx2, &Message{
97+
Topic: "testCancel",
98+
QoS: QoS2,
99+
Payload: []byte("message"),
100+
}); err != nil {
101+
t.Fatalf("Unexpected error: '%v'", err)
102+
}
103+
// Job must be already queued now.
104+
cancel2()
105+
106+
select {
107+
case <-chRecv:
108+
case <-time.After(time.Second):
109+
t.Error("Timeout")
110+
}
111+
112+
if err := cli.Disconnect(ctx); err != nil {
113+
t.Fatalf("Unexpected error: '%v'", err)
114+
}
115+
if err := cliRecv.Disconnect(ctx); err != nil {
116+
t.Fatalf("Unexpected error: '%v'", err)
117+
}
58118
}

0 commit comments

Comments
 (0)