Skip to content

Commit fda2662

Browse files
committed
reconnect websocket to whatsapp when it got closed
1 parent c310920 commit fda2662

File tree

1 file changed

+93
-17
lines changed

1 file changed

+93
-17
lines changed

conn.go

Lines changed: 93 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ import (
66
"crypto/sha256"
77
"encoding/json"
88
"fmt"
9-
"github.com/Rhymen/go-whatsapp/binary"
10-
"github.com/Rhymen/go-whatsapp/crypto/cbc"
11-
"github.com/gorilla/websocket"
129
"math/rand"
1310
"net/http"
1411
"os"
1512
"strconv"
1613
"strings"
1714
"sync"
1815
"time"
16+
17+
"github.com/Rhymen/go-whatsapp/binary"
18+
"github.com/Rhymen/go-whatsapp/crypto/cbc"
19+
"github.com/gorilla/websocket"
1920
)
2021

2122
type metric byte
@@ -79,6 +80,7 @@ It holds all necessary information to make the package work internally.
7980
*/
8081
type Conn struct {
8182
wsConn *websocket.Conn
83+
wsConnMutex sync.RWMutex
8284
session *Session
8385
listener map[string]chan string
8486
listenerMutex sync.RWMutex
@@ -104,20 +106,9 @@ Creates a new connection with a given timeout. The websocket connection to the W
104106
The goroutine for handling incoming messages is started
105107
*/
106108
func NewConn(timeout time.Duration) (*Conn, error) {
107-
dialer := &websocket.Dialer{
108-
ReadBufferSize: 25 * 1024 * 1024,
109-
WriteBufferSize: 10 * 1024 * 1024,
110-
HandshakeTimeout: timeout,
111-
}
112-
113-
headers := http.Header{"Origin": []string{"https://web.whatsapp.com"}}
114-
wsConn, _, err := dialer.Dial("wss://w3.web.whatsapp.com/ws", headers)
115-
if err != nil {
116-
return nil, fmt.Errorf("couldn't dial whatsapp web websocket: %v", err)
117-
}
118-
119109
wac := &Conn{
120-
wsConn: wsConn,
110+
wsConn: nil, // will be set in connect()
111+
wsConnMutex: sync.RWMutex{},
121112
listener: make(map[string]chan string),
122113
listenerMutex: sync.RWMutex{},
123114
writeChan: make(chan wsMsg),
@@ -130,13 +121,77 @@ func NewConn(timeout time.Duration) (*Conn, error) {
130121
shortClientName: "go-whatsapp",
131122
}
132123

124+
if err := wac.connect(); err != nil {
125+
return nil, err
126+
}
127+
133128
go wac.readPump()
134129
go wac.writePump()
135130
go wac.keepAlive(20000, 90000)
136131

137132
return wac, nil
138133
}
139134

135+
func (wac *Conn) isConnected() bool {
136+
wac.wsConnMutex.RLock()
137+
defer wac.wsConnMutex.RUnlock()
138+
return wac.wsConn != nil
139+
}
140+
141+
// connect should be guarded with wsConnMutex
142+
func (wac *Conn) connect() error {
143+
dialer := &websocket.Dialer{
144+
ReadBufferSize: 25 * 1024 * 1024,
145+
WriteBufferSize: 10 * 1024 * 1024,
146+
HandshakeTimeout: wac.msgTimeout,
147+
}
148+
149+
headers := http.Header{"Origin": []string{"https://web.whatsapp.com"}}
150+
wsConn, _, err := dialer.Dial("wss://w3.web.whatsapp.com/ws", headers)
151+
if err != nil {
152+
return fmt.Errorf("couldn't dial whatsapp web websocket: %v", err)
153+
}
154+
155+
wsConn.SetCloseHandler(func(code int, text string) error {
156+
fmt.Fprintf(os.Stderr, "websocket connection closed(%d, %s)\n", code, text)
157+
158+
// from default CloseHandler
159+
message := websocket.FormatCloseMessage(code, "")
160+
wsConn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
161+
162+
// our close handling
163+
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
164+
fmt.Println("Trigger reconnect")
165+
go wac.reconnect()
166+
}
167+
return nil
168+
})
169+
170+
wac.wsConn = wsConn
171+
return nil
172+
}
173+
174+
// reconnect should be run as go routine
175+
func (wac *Conn) reconnect() {
176+
wac.wsConnMutex.Lock()
177+
wac.wsConn = nil
178+
wac.wsConnMutex.Unlock()
179+
180+
// wait up to 60 seconds and then reconnect. As writePump should send immediately, it might
181+
// reconnect as well. So we check its existance before reconnecting
182+
for !wac.isConnected() {
183+
time.Sleep(time.Duration(rand.Intn(60)) * time.Second)
184+
185+
wac.wsConnMutex.Lock()
186+
if wac.wsConn == nil {
187+
if err := wac.connect(); err != nil {
188+
fmt.Fprintf(os.Stderr, "could not reconnect to websocket: %v\n", err)
189+
}
190+
}
191+
wac.wsConnMutex.Unlock()
192+
}
193+
}
194+
140195
func (wac *Conn) write(data []interface{}) (<-chan string, error) {
141196
d, err := json.Marshal(data)
142197
if err != nil {
@@ -199,6 +254,9 @@ func (wac *Conn) readPump() {
199254
defer wac.wsConn.Close()
200255

201256
for {
257+
for !wac.isConnected() {
258+
time.Sleep(1 * time.Second)
259+
}
202260
msgType, msg, err := wac.wsConn.ReadMessage()
203261
if err != nil {
204262
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
@@ -249,8 +307,26 @@ func (wac *Conn) readPump() {
249307

250308
func (wac *Conn) writePump() {
251309
for msg := range wac.writeChan {
310+
for !wac.isConnected() {
311+
// reconnect to send the message ASAP
312+
wac.wsConnMutex.Lock()
313+
if wac.wsConn == nil {
314+
if err := wac.connect(); err != nil {
315+
fmt.Fprintf(os.Stderr, "could not reconnect to websocket: %v\n", err)
316+
}
317+
}
318+
wac.wsConnMutex.Unlock()
319+
if !wac.isConnected() {
320+
// reconnecting failed. Sleep for a while and try again afterwards
321+
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
322+
}
323+
}
252324
if err := wac.wsConn.WriteMessage(msg.messageType, msg.data); err != nil {
253-
fmt.Fprintf(os.Stderr, "error writing to socket: %v", err)
325+
fmt.Fprintf(os.Stderr, "error writing to socket: %v\n", err)
326+
// add message to channel again to no loose it
327+
go func() {
328+
wac.writeChan <- msg
329+
}()
254330
}
255331
}
256332
}

0 commit comments

Comments
 (0)