Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc/subscription): implement state_unsubscribeStorage #1574

Merged
merged 13 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion dot/rpc/subscription/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ type Params struct {
SubscriptionID uint `json:"subscription"`
}

// InvalidRequestCode error code returned for invalid request parameters, value derived from Substrate node output
const InvalidRequestCode = -32600

// InvalidRequestMessage error message for invalid request parameters
const InvalidRequestMessage = "Invalid request"

func newSubcriptionBaseResponseJSON() BaseResponseJSON {
return BaseResponseJSON{
Jsonrpc: "2.0",
Expand All @@ -52,10 +58,26 @@ type ResponseJSON struct {
ID float64 `json:"id"`
}

func newSubscriptionResponseJSON(subID uint, reqID float64) ResponseJSON {
// NewSubscriptionResponseJSON builds a Response JSON object
func NewSubscriptionResponseJSON(subID uint, reqID float64) ResponseJSON {
return ResponseJSON{
Jsonrpc: "2.0",
Result: subID,
ID: reqID,
}
}

// BooleanResponse for responses that return boolean values
type BooleanResponse struct {
JSONRPC string `json:"jsonrpc"`
Result bool `json:"result"`
ID float64 `json:"id"`
}

func newBooleanResponseJSON(value bool, reqID float64) BooleanResponse {
return BooleanResponse{
JSONRPC: "2.0",
Result: value,
ID: reqID,
}
}
54 changes: 49 additions & 5 deletions dot/rpc/subscription/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"io/ioutil"
"math/big"
"net/http"
"strconv"
"strings"
"sync"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
log "github.com/ChainSafe/log15"
Expand Down Expand Up @@ -106,6 +108,9 @@ func (c *WSConn) HandleComm() {
continue
}
c.startListener(rvl)
case "state_unsubscribeStorage":
c.unsubscribeStorageListener(reqid, params)

}
continue
}
Expand Down Expand Up @@ -206,12 +211,51 @@ func (c *WSConn) initStorageChangeListener(reqID float64, params interface{}) (u

c.Subscriptions[myObs.id] = myObs

initRes := newSubscriptionResponseJSON(myObs.id, reqID)
initRes := NewSubscriptionResponseJSON(myObs.id, reqID)
c.safeSend(initRes)

return myObs.id, nil
}

func (c *WSConn) unsubscribeStorageListener(reqID float64, params interface{}) {
switch v := params.(type) {
case []interface{}:
if len(v) == 0 {
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
return
}
default:
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
return
}

var id uint
switch v := params.([]interface{})[0].(type) {
case float64:
id = uint(v)
case string:
i, err := strconv.ParseUint(v, 10, 32)
if err != nil {
c.safeSend(newBooleanResponseJSON(false, reqID))
return
}
id = uint(i)
default:
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
return
}

observer, ok := c.Subscriptions[id].(state.Observer)
if !ok {
initRes := newBooleanResponseJSON(false, reqID)
c.safeSend(initRes)
return
}

c.StorageAPI.UnregisterStorageObserver(observer)
c.safeSend(newBooleanResponseJSON(true, reqID))
}

func (c *WSConn) initBlockListener(reqID float64) (uint, error) {
bl := &BlockListener{
Channel: make(chan *types.Block),
Expand All @@ -231,7 +275,7 @@ func (c *WSConn) initBlockListener(reqID float64) (uint, error) {
bl.subID = c.qtyListeners
c.Subscriptions[bl.subID] = bl
c.BlockSubChannels[bl.subID] = chanID
initRes := newSubscriptionResponseJSON(bl.subID, reqID)
initRes := NewSubscriptionResponseJSON(bl.subID, reqID)
c.safeSend(initRes)

return bl.subID, nil
Expand All @@ -256,7 +300,7 @@ func (c *WSConn) initBlockFinalizedListener(reqID float64) (uint, error) {
bfl.subID = c.qtyListeners
c.Subscriptions[bfl.subID] = bfl
c.BlockSubChannels[bfl.subID] = chanID
initRes := newSubscriptionResponseJSON(bfl.subID, reqID)
initRes := NewSubscriptionResponseJSON(bfl.subID, reqID)
c.safeSend(initRes)

return bfl.subID, nil
Expand Down Expand Up @@ -299,7 +343,7 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (uint, er
if err != nil {
return 0, err
}
c.safeSend(newSubscriptionResponseJSON(esl.subID, reqID))
c.safeSend(NewSubscriptionResponseJSON(esl.subID, reqID))

// TODO (ed) since HandleSubmittedExtrinsic has been called we assume the extrinsic is in the tx queue
// should we add a channel to tx queue so we're notified when it's in the queue (See issue #1535)
Expand All @@ -322,7 +366,7 @@ func (c *WSConn) initRuntimeVersionListener(reqID float64) (uint, error) {
c.qtyListeners++
rvl.subID = c.qtyListeners
c.Subscriptions[rvl.subID] = rvl
initRes := newSubscriptionResponseJSON(rvl.subID, reqID)
initRes := NewSubscriptionResponseJSON(rvl.subID, reqID)
c.safeSend(initRes)

return rvl.subID, nil
Expand Down
55 changes: 55 additions & 0 deletions dot/rpc/subscription/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,61 @@ func TestWSConn_HandleComm(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":4,"id":7}`+"\n"), msg)

// test state_unsubscribeStorage
c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": "foo",
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": [],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to spec, this shouldn't fail.
subscriber ID is either string or U32
https://github.com/w3f/PSPs/blob/psp-rpc-api/PSPs/drafts/psp-6.md#11116-state_unsubscribestorage-pubsub
Any reason we have subscriber ID as float?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I've updated so that it can handle strings as well. I've got params as a interface{}, so that it can accept any type, then when I parse the types it seems that all number are treated as float64 (I think this is happens when the json is unmashalled). I'm now taking the float64 or string and casting into uint. Let me know if there is a better approach to handling these.

"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": ["6"],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":false,"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": ["4"],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":true,"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": [6],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":false,"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": [4],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":true,"id":7}`+"\n"), msg)

// test initBlockListener
res, err = wsconn.initBlockListener(1)
require.EqualError(t, err, "error BlockAPI not set")
Expand Down