@@ -22,12 +22,21 @@ import (
22
22
"github.com/gorilla/websocket"
23
23
)
24
24
25
+ type websocketMessage struct {
26
+ ID float64 `json:"id"`
27
+ Method string `json:"method"`
28
+ Params any `json:"params"`
29
+ }
30
+
25
31
type httpclient interface {
26
32
Do (* http.Request ) (* http.Response , error )
27
33
}
28
34
29
- var errCannotReadFromWebsocket = errors .New ("cannot read message from websocket" )
30
- var errCannotUnmarshalMessage = errors .New ("cannot unmarshal webasocket message data" )
35
+ var (
36
+ errCannotReadFromWebsocket = errors .New ("cannot read message from websocket" )
37
+ errEmptyMethod = errors .New ("empty method" )
38
+ )
39
+
31
40
var logger = log .NewFromGlobal (log .AddContext ("pkg" , "rpc/subscription" ))
32
41
33
42
// WSConn struct to hold WebSocket Connection references
@@ -46,87 +55,82 @@ type WSConn struct {
46
55
}
47
56
48
57
// readWebsocketMessage will read and parse the message data to a string->interface{} data
49
- func (c * WSConn ) readWebsocketMessage () ([]byte , map [ string ] interface {}, error ) {
50
- _ , mbytes , err : = c .Wsconn .ReadMessage ()
58
+ func (c * WSConn ) readWebsocketMessage () (rawBytes []byte , wsMessage * websocketMessage , err error ) {
59
+ _ , rawBytes , err = c .Wsconn .ReadMessage ()
51
60
if err != nil {
52
- logger .Debugf ("websocket failed to read message: %s" , err )
53
- return nil , nil , errCannotReadFromWebsocket
61
+ return nil , nil , fmt .Errorf ("%w: %s" , errCannotReadFromWebsocket , err .Error ())
54
62
}
55
63
56
- logger .Tracef ("websocket message received: %s" , string (mbytes ))
57
-
58
- // determine if request is for subscribe method type
59
- var msg map [string ]interface {}
60
- err = json .Unmarshal (mbytes , & msg )
61
-
64
+ wsMessage = new (websocketMessage )
65
+ err = json .Unmarshal (rawBytes , wsMessage )
62
66
if err != nil {
63
- logger .Debugf ("websocket failed to unmarshal request message: %s" , err )
64
- return nil , nil , errCannotUnmarshalMessage
67
+ return nil , nil , err
65
68
}
66
69
67
- return mbytes , msg , nil
70
+ if wsMessage .Method == "" {
71
+ return nil , nil , errEmptyMethod
72
+ }
73
+
74
+ return rawBytes , wsMessage , nil
68
75
}
69
76
70
- //HandleComm handles messages received on websocket connections
71
- func (c * WSConn ) HandleComm () {
77
+ // HandleConn handles messages received on websocket connections
78
+ func (c * WSConn ) HandleConn () {
72
79
for {
73
- mbytes , msg , err := c .readWebsocketMessage ()
74
- if errors .Is (err , errCannotReadFromWebsocket ) {
75
- return
76
- }
80
+ rawBytes , wsMessage , err := c .readWebsocketMessage ()
81
+ if err != nil {
82
+ logger .Debugf ("websocket failed to read message: %s" , err )
83
+ if errors .Is (err , errCannotReadFromWebsocket ) {
84
+ return
85
+ }
77
86
78
- if errors .Is (err , errCannotUnmarshalMessage ) {
79
87
c .safeSendError (0 , big .NewInt (InvalidRequestCode ), InvalidRequestMessage )
80
88
continue
81
89
}
82
90
83
- params := msg ["params" ]
84
- reqid := msg ["id" ].(float64 )
85
- method := msg ["method" ].(string )
86
-
87
- logger .Debugf ("ws method %s called with params %v" , method , params )
91
+ logger .Tracef ("websocket message received: %s" , string (rawBytes ))
92
+ logger .Debugf ("ws method %s called with params %v" , wsMessage .Method , wsMessage .Params )
88
93
89
- if ! strings .Contains (method , "_unsubscribe" ) && ! strings .Contains (method , "_unwatch" ) {
90
- setupListener := c .getSetupListener (method )
94
+ if ! strings .Contains (wsMessage . Method , "_unsubscribe" ) && ! strings .Contains (wsMessage . Method , "_unwatch" ) {
95
+ setupListener := c .getSetupListener (wsMessage . Method )
91
96
92
97
if setupListener == nil {
93
- c .executeRPCCall (mbytes )
98
+ c .executeRPCCall (rawBytes )
94
99
continue
95
100
}
96
101
97
- listener , err := setupListener (reqid , params )
102
+ listener , err := setupListener (wsMessage . ID , wsMessage . Params )
98
103
if err != nil {
99
- logger .Warnf ("failed to create listener (method=%s): %s" , method , err )
104
+ logger .Warnf ("failed to create listener (method=%s): %s" , wsMessage . Method , err )
100
105
continue
101
106
}
102
107
103
108
listener .Listen ()
104
109
continue
105
110
}
106
111
107
- listener , err := c .getUnsubListener (params )
108
-
112
+ listener , err := c .getUnsubListener (wsMessage .Params )
109
113
if err != nil {
110
- logger .Warnf ("failed to get unsubscriber (method=%s): %s" , method , err )
114
+ logger .Warnf ("failed to get unsubscriber (method=%s): %s" , wsMessage . Method , err )
111
115
112
116
if errors .Is (err , errUknownParamSubscribeID ) || errors .Is (err , errCannotFindUnsubsriber ) {
113
- c .safeSendError (reqid , big .NewInt (InvalidRequestCode ), InvalidRequestMessage )
117
+ c .safeSendError (wsMessage . ID , big .NewInt (InvalidRequestCode ), InvalidRequestMessage )
114
118
continue
115
119
}
116
120
117
121
if errors .Is (err , errCannotParseID ) || errors .Is (err , errCannotFindListener ) {
118
- c .safeSend (newBooleanResponseJSON (false , reqid ))
122
+ c .safeSend (newBooleanResponseJSON (false , wsMessage . ID ))
119
123
continue
120
124
}
121
125
}
122
126
123
127
err = listener .Stop ()
124
128
if err != nil {
125
- logger .Warnf ("failed to stop listener goroutine (method=%s): %s" , method , err )
126
- c .safeSend (newBooleanResponseJSON (false , reqid ))
129
+ logger .Warnf ("failed to stop listener goroutine (method=%s): %s" , wsMessage . Method , err )
130
+ c .safeSend (newBooleanResponseJSON (false , wsMessage . ID ))
127
131
}
128
132
129
- c .safeSend (newBooleanResponseJSON (true , reqid ))
133
+ c .safeSend (newBooleanResponseJSON (true , wsMessage . ID ))
130
134
continue
131
135
}
132
136
}
0 commit comments