Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dot/rpc implement state_subscribeStorage RPC WebSocket method #983

Merged
merged 29 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8dcd114
stub functions for storage change
edwardmack Jul 6, 2020
1839679
create channels
edwardmack Jul 6, 2020
e17115c
added websocket subscription filter
edwardmack Jul 6, 2020
cb68030
implement rpc state_subscribeStorage
edwardmack Jul 6, 2020
a4a06d1
make lint
edwardmack Jul 6, 2020
4e92819
add tests
edwardmack Jul 6, 2020
4dd45e3
add tests
edwardmack Jul 6, 2020
452a7a1
update comments
edwardmack Jul 6, 2020
d822106
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 8, 2020
631f942
move channel listener logic to individual go routines
edwardmack Jul 10, 2020
2696608
update logging to use chainsafe logger
edwardmack Jul 10, 2020
b7ff1ce
refactor websocket connection handling storage
edwardmack Jul 11, 2020
5bdbe0e
refactor block listener
edwardmack Jul 11, 2020
9389b5b
handle closing listening channels and websocket connections
edwardmack Jul 11, 2020
884c523
lint issues
edwardmack Jul 11, 2020
0d62122
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 11, 2020
ce4837c
added notification on clear storage for storage change listener
edwardmack Jul 11, 2020
8d533ca
handle closing channes and subscriptions
edwardmack Jul 11, 2020
46db2f6
implement close functions for block listener
edwardmack Jul 11, 2020
0d19256
fix test
edwardmack Jul 11, 2020
005f4e6
move websocket_test to service_test to fix cyclic import
edwardmack Jul 13, 2020
cb3c502
fix lint issues
edwardmack Jul 13, 2020
f9d65f0
add tests for websocket
edwardmack Jul 13, 2020
85b6602
update test
edwardmack Jul 13, 2020
bd6e9d9
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 15, 2020
b0be8f1
code cleanup
edwardmack Jul 15, 2020
cf9cc48
add mock BlockAPI and mock StorageAPI to tests
edwardmack Jul 15, 2020
31f6b5a
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 15, 2020
bde2c68
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/gorilla/mux"
"github.com/gorilla/rpc/v2"
Expand All @@ -32,11 +33,13 @@ import (

// HTTPServer gateway for RPC server
type HTTPServer struct {
logger log.Logger
rpcServer *rpc.Server // Actual RPC call handler
serverConfig *HTTPServerConfig
blockChan chan *types.Block
chanID byte // channel ID
logger log.Logger
rpcServer *rpc.Server // Actual RPC call handler
serverConfig *HTTPServerConfig
blockChan chan *types.Block
chanID byte // channel ID
storageChan chan *state.KeyValue
storageChanID byte // storage channel ID
}

// HTTPServerConfig configures the HTTPServer
Expand All @@ -63,6 +66,7 @@ type HTTPServerConfig struct {
type WebSocketSubscription struct {
WSConnection *websocket.Conn
SubscriptionType int
Filter map[string]bool
}

// NewHTTPServer creates a new http server and registers an associated rpc server
Expand Down Expand Up @@ -162,6 +166,17 @@ func (h *HTTPServer) Start() error {
go h.blockReceivedListener()
}

// init and start storage change listener routine
if h.serverConfig.StorageAPI != nil {
var err error
h.storageChan = make(chan *state.KeyValue)
h.storageChanID, err = h.serverConfig.StorageAPI.RegisterStorageChangeChannel(h.storageChan)
if err != nil {
return err
}
go h.storageChangeListener()
}

return nil
}

Expand All @@ -170,6 +185,9 @@ func (h *HTTPServer) Stop() error {
if h.serverConfig.WSEnabled {
h.serverConfig.BlockAPI.UnregisterImportedChannel(h.chanID)
close(h.blockChan)

h.serverConfig.StorageAPI.UnregisterStorageChangeChannel(h.storageChanID)
close(h.storageChan)
}
return nil
}
3 changes: 3 additions & 0 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package modules
import (
"math/big"

"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto"
Expand All @@ -14,6 +15,8 @@ import (
type StorageAPI interface {
GetStorage(key []byte) ([]byte, error)
Entries() map[string][]byte
RegisterStorageChangeChannel(ch chan<- *state.KeyValue) (byte, error)
UnregisterStorageChangeChannel(id byte)
}

// BlockAPI is the interface for the block state
Expand Down
8 changes: 5 additions & 3 deletions dot/rpc/modules/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,11 @@ func (sm *StateModule) SubscribeRuntimeVersion(r *http.Request, req *StateStorag
return sm.GetRuntimeVersion(r, nil, res)
}

// SubscribeStorage isn't implemented properly yet.
func (sm *StateModule) SubscribeStorage(r *http.Request, req *StateStorageQueryRangeRequest, res *StorageChangeSetResponse) {
// TODO implement change storage trie so that block hash parameter works (See issue #834)
// SubscribeStorage Storage subscription. If storage keys are specified, it creates a message for each block which
// changes the specified storage keys. If none are specified, then it creates a message for every block.
// This endpoint communicates over the Websockt protocol, but this func should remain here so it's added to rpc_methods list
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*websocket

func (sm *StateModule) SubscribeStorage(r *http.Request, req *StateStorageQueryRangeRequest, res *StorageChangeSetResponse) error {
Copy link
Contributor

@noot noot Jul 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also wondering, is there an unsubscribe method corresponding to the subscribe methods? if so, it would be nice to return the subscription ID

return nil
}

func convertAPIs(in []*runtime.API_Item) []interface{} {
Expand Down
50 changes: 45 additions & 5 deletions dot/rpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"strings"

"github.com/ChainSafe/gossamer/dot/rpc/modules"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/websocket"
)
Expand Down Expand Up @@ -128,15 +128,16 @@ func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch method {
case "chain_subscribeNewHeads", "chain_subscribeNewHead":
subType = SUB_NEW_HEAD
case "chain_subscribeStorage":
case "state_subscribeStorage":
subType = SUB_STORAGE
case "chain_subscribeFinalizedHeads":
subType = SUB_FINALIZED_HEAD
}

params := msg["params"]
var e1 error
_, e1 = h.registerSubscription(ws, mid, subType)
_, e1 = h.registerSubscription(ws, mid, subType, params)
if e1 != nil {
// todo send error message to client
log.Error("[rpc] failed to register subscription", "error", err)
}
continue
Expand Down Expand Up @@ -191,12 +192,18 @@ func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

}

func (h *HTTPServer) registerSubscription(conn *websocket.Conn, reqID float64, subscriptionType int) (uint32, error) {
func (h *HTTPServer) registerSubscription(conn *websocket.Conn, reqID float64, subscriptionType int, params interface{}) (uint32, error) {
wssub := h.serverConfig.WSSubscriptions
sub := uint32(len(wssub)) + 1
pA := params.([]interface{})
filter := make(map[string]bool)
for _, param := range pA {
filter[param.(string)] = true
}
wss := &WebSocketSubscription{
WSConnection: conn,
SubscriptionType: subscriptionType,
Filter: filter,
}
wssub[sub] = wss
h.serverConfig.WSSubscriptions = wssub
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the WSSubscriptions should be stored in the HTTPServer struct (or in a separate wsServer struct) not in the config

Expand Down Expand Up @@ -233,3 +240,36 @@ func (h *HTTPServer) blockReceivedListener() {
}
}
}

func (h *HTTPServer) storageChangeListener() {
if h.serverConfig.StorageAPI == nil {
return
}

for change := range h.storageChan {

if change != nil {
for i, sub := range h.serverConfig.WSSubscriptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it would be simpler to start a goroutine for each subscription, what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've refactor websockets to better handle this.

if sub.SubscriptionType == SUB_STORAGE {
// check it change key is in subscription filter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if*

cKey := common.BytesToHex(change.Key)
if len(sub.Filter) > 0 && !sub.Filter[cKey] {
continue
}

changeM := make(map[string]interface{})
changeM["result"] = []string{cKey, common.BytesToHex(change.Value)}
res := newSubcriptionBaseResponseJSON(i)
res.Method = "state_storage"
res.Params = changeM
if sub.WSConnection != nil {
err := sub.WSConnection.WriteJSON(res)
if err != nil {
log.Error("[rpc] error writing response", "error", err)
}
}
}
}
}
}
}
5 changes: 3 additions & 2 deletions dot/rpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ var testCalls = []struct {
expected []byte
}{
{[]byte(`{"jsonrpc":"2.0","method":"system_name","params":[],"id":1}`), []byte(`{"id":1,"jsonrpc":"2.0","result":"gossamer"}` + "\n")}, // working request
{[]byte(`{"jsonrpc":"2.0","method":"unknown","params":[],"id":1}`), []byte(`{"error":{"code":-32000,"data":null,"message":"rpc error method unknown not found"},"id":1,"jsonrpc":"2.0"}` + "\n")}, // unknown method
{[]byte(`{"jsonrpc":"2.0","method":"unknown","params":[],"id":2}`), []byte(`{"error":{"code":-32000,"data":null,"message":"rpc error method unknown not found"},"id":2,"jsonrpc":"2.0"}` + "\n")}, // unknown method
{[]byte{}, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":null}` + "\n")}, // empty request
{[]byte(`{"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":[],"id":1}`), []byte(`{"jsonrpc":"2.0","result":1,"id":1}` + "\n")},
{[]byte(`{"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":[],"id":3}`), []byte(`{"jsonrpc":"2.0","result":1,"id":3}` + "\n")},
{[]byte(`{"jsonrpc":"2.0","method":"state_subscribeStorage","params":[],"id":4}`), []byte(`{"jsonrpc":"2.0","result":2,"id":4}` + "\n")},
}

func TestNewWebSocketServer(t *testing.T) {
Expand Down
22 changes: 19 additions & 3 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type StorageState struct {
trie *trie.Trie
db *StorageDB
lock sync.RWMutex

// change notifiers
changed map[byte]chan<- *KeyValue
changedLock sync.RWMutex
}

// NewStorageDB instantiates badgerDB instance for storing trie structure
Expand All @@ -72,8 +76,9 @@ func NewStorageState(db chaindb.Database, t *trie.Trie) (*StorageState, error) {
}

return &StorageState{
trie: t,
db: NewStorageDB(db),
trie: t,
db: NewStorageDB(db),
changed: make(map[byte]chan<- *KeyValue),
}, nil
}

Expand Down Expand Up @@ -120,7 +125,18 @@ func (s *StorageState) EnumeratedTrieRoot(values [][]byte) {
func (s *StorageState) SetStorage(key []byte, value []byte) error {
s.lock.Lock()
defer s.lock.Unlock()
return s.trie.Put(key, value)
kv := &KeyValue{
Key: key,
Value: value,
}
//fmt.Printf("SetStorage Key: %x Value: %x\n", key, value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

// TODO ed add channel notify here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this done?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also be added to ClearStorage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

err := s.trie.Put(key, value)
if err != nil {
return err
}
s.notifyChanged(kv)
return nil
}

// ClearPrefix not implemented
Expand Down
76 changes: 76 additions & 0 deletions dot/state/storage_notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2020 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.
package state

import (
"errors"
)

// KeyValue struct to hold key value pairs
type KeyValue struct {
Key []byte
Value []byte
}

// RegisterStorageChangeChannel function to register storage change channels
func (s *StorageState) RegisterStorageChangeChannel(ch chan<- *KeyValue) (byte, error) {
s.changedLock.RLock()

if len(s.changed) == 256 {
return 0, errors.New("channel limit reached")
}

var id byte
for {
id = generateID()
if s.changed[id] == nil {
break
}
}

s.changedLock.RUnlock()

s.changedLock.Lock()
s.changed[id] = ch
s.changedLock.Unlock()
return id, nil
}

// UnregisterStorageChangeChannel removes the storage change notification channel with the given ID.
// A channel must be unregistered before closing it.
func (s *StorageState) UnregisterStorageChangeChannel(id byte) {
s.changedLock.Lock()
defer s.changedLock.Unlock()

delete(s.changed, id)
}

func (s *StorageState) notifyChanged(change *KeyValue) {
s.changedLock.RLock()
defer s.changedLock.RUnlock()

if len(s.changed) == 0 {
return
}

logger.Trace("notifying changed storage chans...", "chans", s.changed)

for _, ch := range s.changed {
go func(ch chan<- *KeyValue) {
ch <- change
}(ch)
}
}
Loading