Skip to content

Commit 7574f10

Browse files
edwardmackarijitAD
andauthored
feat(rpc/subscription): implement state_unsubscribeStorage (#1574)
* implement state_unsubscribeStorage * add value checks, add tests * handle string parameter, add tests, use const for error messages * parse to uint * update type * update variable names (based on comments) Co-authored-by: Arijit Das <[email protected]>
1 parent 24e1516 commit 7574f10

File tree

3 files changed

+127
-6
lines changed

3 files changed

+127
-6
lines changed

dot/rpc/subscription/messages.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ type Params struct {
2828
SubscriptionID uint `json:"subscription"`
2929
}
3030

31+
// InvalidRequestCode error code returned for invalid request parameters, value derived from Substrate node output
32+
const InvalidRequestCode = -32600
33+
34+
// InvalidRequestMessage error message for invalid request parameters
35+
const InvalidRequestMessage = "Invalid request"
36+
3137
func newSubcriptionBaseResponseJSON() BaseResponseJSON {
3238
return BaseResponseJSON{
3339
Jsonrpc: "2.0",
@@ -52,10 +58,26 @@ type ResponseJSON struct {
5258
ID float64 `json:"id"`
5359
}
5460

55-
func newSubscriptionResponseJSON(subID uint, reqID float64) ResponseJSON {
61+
// NewSubscriptionResponseJSON builds a Response JSON object
62+
func NewSubscriptionResponseJSON(subID uint, reqID float64) ResponseJSON {
5663
return ResponseJSON{
5764
Jsonrpc: "2.0",
5865
Result: subID,
5966
ID: reqID,
6067
}
6168
}
69+
70+
// BooleanResponse for responses that return boolean values
71+
type BooleanResponse struct {
72+
JSONRPC string `json:"jsonrpc"`
73+
Result bool `json:"result"`
74+
ID float64 `json:"id"`
75+
}
76+
77+
func newBooleanResponseJSON(value bool, reqID float64) BooleanResponse {
78+
return BooleanResponse{
79+
JSONRPC: "2.0",
80+
Result: value,
81+
ID: reqID,
82+
}
83+
}

dot/rpc/subscription/websocket.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ import (
2323
"io/ioutil"
2424
"math/big"
2525
"net/http"
26+
"strconv"
2627
"strings"
2728
"sync"
2829

2930
"github.com/ChainSafe/gossamer/dot/rpc/modules"
31+
"github.com/ChainSafe/gossamer/dot/state"
3032
"github.com/ChainSafe/gossamer/dot/types"
3133
"github.com/ChainSafe/gossamer/lib/common"
3234
log "github.com/ChainSafe/log15"
@@ -106,6 +108,9 @@ func (c *WSConn) HandleComm() {
106108
continue
107109
}
108110
c.startListener(rvl)
111+
case "state_unsubscribeStorage":
112+
c.unsubscribeStorageListener(reqid, params)
113+
109114
}
110115
continue
111116
}
@@ -206,12 +211,51 @@ func (c *WSConn) initStorageChangeListener(reqID float64, params interface{}) (u
206211

207212
c.Subscriptions[myObs.id] = myObs
208213

209-
initRes := newSubscriptionResponseJSON(myObs.id, reqID)
214+
initRes := NewSubscriptionResponseJSON(myObs.id, reqID)
210215
c.safeSend(initRes)
211216

212217
return myObs.id, nil
213218
}
214219

220+
func (c *WSConn) unsubscribeStorageListener(reqID float64, params interface{}) {
221+
switch v := params.(type) {
222+
case []interface{}:
223+
if len(v) == 0 {
224+
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
225+
return
226+
}
227+
default:
228+
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
229+
return
230+
}
231+
232+
var id uint
233+
switch v := params.([]interface{})[0].(type) {
234+
case float64:
235+
id = uint(v)
236+
case string:
237+
i, err := strconv.ParseUint(v, 10, 32)
238+
if err != nil {
239+
c.safeSend(newBooleanResponseJSON(false, reqID))
240+
return
241+
}
242+
id = uint(i)
243+
default:
244+
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
245+
return
246+
}
247+
248+
observer, ok := c.Subscriptions[id].(state.Observer)
249+
if !ok {
250+
initRes := newBooleanResponseJSON(false, reqID)
251+
c.safeSend(initRes)
252+
return
253+
}
254+
255+
c.StorageAPI.UnregisterStorageObserver(observer)
256+
c.safeSend(newBooleanResponseJSON(true, reqID))
257+
}
258+
215259
func (c *WSConn) initBlockListener(reqID float64) (uint, error) {
216260
bl := &BlockListener{
217261
Channel: make(chan *types.Block),
@@ -231,7 +275,7 @@ func (c *WSConn) initBlockListener(reqID float64) (uint, error) {
231275
bl.subID = c.qtyListeners
232276
c.Subscriptions[bl.subID] = bl
233277
c.BlockSubChannels[bl.subID] = chanID
234-
initRes := newSubscriptionResponseJSON(bl.subID, reqID)
278+
initRes := NewSubscriptionResponseJSON(bl.subID, reqID)
235279
c.safeSend(initRes)
236280

237281
return bl.subID, nil
@@ -256,7 +300,7 @@ func (c *WSConn) initBlockFinalizedListener(reqID float64) (uint, error) {
256300
bfl.subID = c.qtyListeners
257301
c.Subscriptions[bfl.subID] = bfl
258302
c.BlockSubChannels[bfl.subID] = chanID
259-
initRes := newSubscriptionResponseJSON(bfl.subID, reqID)
303+
initRes := NewSubscriptionResponseJSON(bfl.subID, reqID)
260304
c.safeSend(initRes)
261305

262306
return bfl.subID, nil
@@ -299,7 +343,7 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (uint, er
299343
if err != nil {
300344
return 0, err
301345
}
302-
c.safeSend(newSubscriptionResponseJSON(esl.subID, reqID))
346+
c.safeSend(NewSubscriptionResponseJSON(esl.subID, reqID))
303347

304348
// TODO (ed) since HandleSubmittedExtrinsic has been called we assume the extrinsic is in the tx queue
305349
// should we add a channel to tx queue so we're notified when it's in the queue (See issue #1535)
@@ -322,7 +366,7 @@ func (c *WSConn) initRuntimeVersionListener(reqID float64) (uint, error) {
322366
c.qtyListeners++
323367
rvl.subID = c.qtyListeners
324368
c.Subscriptions[rvl.subID] = rvl
325-
initRes := newSubscriptionResponseJSON(rvl.subID, reqID)
369+
initRes := NewSubscriptionResponseJSON(rvl.subID, reqID)
326370
c.safeSend(initRes)
327371

328372
return rvl.subID, nil

dot/rpc/subscription/websocket_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,61 @@ func TestWSConn_HandleComm(t *testing.T) {
113113
require.NoError(t, err)
114114
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":4,"id":7}`+"\n"), msg)
115115

116+
// test state_unsubscribeStorage
117+
c.WriteMessage(websocket.TextMessage, []byte(`{
118+
"jsonrpc": "2.0",
119+
"method": "state_unsubscribeStorage",
120+
"params": "foo",
121+
"id": 7}`))
122+
_, msg, err = c.ReadMessage()
123+
require.NoError(t, err)
124+
require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":7}`+"\n"), msg)
125+
126+
c.WriteMessage(websocket.TextMessage, []byte(`{
127+
"jsonrpc": "2.0",
128+
"method": "state_unsubscribeStorage",
129+
"params": [],
130+
"id": 7}`))
131+
_, msg, err = c.ReadMessage()
132+
require.NoError(t, err)
133+
require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":7}`+"\n"), msg)
134+
135+
c.WriteMessage(websocket.TextMessage, []byte(`{
136+
"jsonrpc": "2.0",
137+
"method": "state_unsubscribeStorage",
138+
"params": ["6"],
139+
"id": 7}`))
140+
_, msg, err = c.ReadMessage()
141+
require.NoError(t, err)
142+
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":false,"id":7}`+"\n"), msg)
143+
144+
c.WriteMessage(websocket.TextMessage, []byte(`{
145+
"jsonrpc": "2.0",
146+
"method": "state_unsubscribeStorage",
147+
"params": ["4"],
148+
"id": 7}`))
149+
_, msg, err = c.ReadMessage()
150+
require.NoError(t, err)
151+
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":true,"id":7}`+"\n"), msg)
152+
153+
c.WriteMessage(websocket.TextMessage, []byte(`{
154+
"jsonrpc": "2.0",
155+
"method": "state_unsubscribeStorage",
156+
"params": [6],
157+
"id": 7}`))
158+
_, msg, err = c.ReadMessage()
159+
require.NoError(t, err)
160+
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":false,"id":7}`+"\n"), msg)
161+
162+
c.WriteMessage(websocket.TextMessage, []byte(`{
163+
"jsonrpc": "2.0",
164+
"method": "state_unsubscribeStorage",
165+
"params": [4],
166+
"id": 7}`))
167+
_, msg, err = c.ReadMessage()
168+
require.NoError(t, err)
169+
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":true,"id":7}`+"\n"), msg)
170+
116171
// test initBlockListener
117172
res, err = wsconn.initBlockListener(1)
118173
require.EqualError(t, err, "error BlockAPI not set")

0 commit comments

Comments
 (0)