@@ -13,6 +13,7 @@ func (c *BaseClient) serve() error {
13
13
close (c .connClosed )
14
14
}()
15
15
r := c .Transport
16
+ subBuffer := make (map [uint16 ]* Message )
16
17
for {
17
18
pktTypeBytes := make ([]byte , 1 )
18
19
if _ , err := io .ReadFull (r , pktTypeBytes ); err != nil {
@@ -45,18 +46,23 @@ func (c *BaseClient) serve() error {
45
46
}
46
47
case packetPublish :
47
48
publish := (& pktPublish {}).parse (pktFlag , contents )
48
- if c .Handler != nil {
49
- c .Handler .Serve (& publish .Message )
50
- }
51
49
switch publish .Message .QoS {
50
+ case QoS0 :
51
+ if c .Handler != nil {
52
+ c .Handler .Serve (& publish .Message )
53
+ }
52
54
case QoS1 :
55
+ // Ownership of the message is now transferred to the receiver.
53
56
pktPubAck := pack (
54
57
packetPubAck .b ()| packetFromClient .b (),
55
58
packUint16 (publish .Message .ID ),
56
59
)
57
60
if err := c .write (pktPubAck ); err != nil {
58
61
return err
59
62
}
63
+ if c .Handler != nil {
64
+ c .Handler .Serve (& publish .Message )
65
+ }
60
66
case QoS2 :
61
67
pktPubRec := pack (
62
68
packetPubRec .b ()| packetFromClient .b (),
@@ -65,6 +71,7 @@ func (c *BaseClient) serve() error {
65
71
if err := c .write (pktPubRec ); err != nil {
66
72
return err
67
73
}
74
+ subBuffer [publish .Message .ID ] = & publish .Message
68
75
}
69
76
case packetPubAck :
70
77
pubAck := (& pktPubAck {}).parse (pktFlag , contents )
@@ -84,6 +91,14 @@ func (c *BaseClient) serve() error {
84
91
}
85
92
case packetPubRel :
86
93
pubRel := (& pktPubRel {}).parse (pktFlag , contents )
94
+ if msg , ok := subBuffer [pubRel .ID ]; ok {
95
+ // Ownership of the message is now transferred to the receiver.
96
+ if c .Handler != nil {
97
+ c .Handler .Serve (msg )
98
+ }
99
+ delete (subBuffer , pubRel .ID )
100
+ }
101
+
87
102
pktPubComp := pack (
88
103
packetPubComp .b ()| packetFromClient .b (),
89
104
packUint16 (pubRel .ID ),
0 commit comments