-
Notifications
You must be signed in to change notification settings - Fork 131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dot/rpc implement state_subscribeStorage RPC WebSocket method #983
Changes from 8 commits
8dcd114
1839679
e17115c
cb68030
a4a06d1
4e92819
4dd45e3
452a7a1
d822106
631f942
2696608
b7ff1ce
5bdbe0e
9389b5b
884c523
0d62122
ce4837c
8d533ca
46db2f6
0d19256
005f4e6
cb3c502
f9d65f0
85b6602
bd6e9d9
b0be8f1
cf9cc48
31f6b5a
bde2c68
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -285,9 +285,11 @@ func (sm *StateModule) SubscribeRuntimeVersion(r *http.Request, req *StateStorag | |
return sm.GetRuntimeVersion(r, nil, res) | ||
} | ||
|
||
// SubscribeStorage isn't implemented properly yet. | ||
func (sm *StateModule) SubscribeStorage(r *http.Request, req *StateStorageQueryRangeRequest, res *StorageChangeSetResponse) { | ||
// TODO implement change storage trie so that block hash parameter works (See issue #834) | ||
// SubscribeStorage Storage subscription. If storage keys are specified, it creates a message for each block which | ||
// changes the specified storage keys. If none are specified, then it creates a message for every block. | ||
// This endpoint communicates over the Websockt protocol, but this func should remain here so it's added to rpc_methods list | ||
func (sm *StateModule) SubscribeStorage(r *http.Request, req *StateStorageQueryRangeRequest, res *StorageChangeSetResponse) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm also wondering, is there an unsubscribe method corresponding to the subscribe methods? if so, it would be nice to return the subscription ID |
||
return nil | ||
} | ||
|
||
func convertAPIs(in []*runtime.API_Item) []interface{} { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,7 @@ import ( | |
"strings" | ||
|
||
"github.com/ChainSafe/gossamer/dot/rpc/modules" | ||
|
||
"github.com/ChainSafe/gossamer/lib/common" | ||
"github.com/ethereum/go-ethereum/log" | ||
"github.com/gorilla/websocket" | ||
) | ||
|
@@ -128,15 +128,16 @@ func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
switch method { | ||
case "chain_subscribeNewHeads", "chain_subscribeNewHead": | ||
subType = SUB_NEW_HEAD | ||
case "chain_subscribeStorage": | ||
case "state_subscribeStorage": | ||
subType = SUB_STORAGE | ||
case "chain_subscribeFinalizedHeads": | ||
subType = SUB_FINALIZED_HEAD | ||
} | ||
|
||
params := msg["params"] | ||
var e1 error | ||
_, e1 = h.registerSubscription(ws, mid, subType) | ||
_, e1 = h.registerSubscription(ws, mid, subType, params) | ||
if e1 != nil { | ||
// todo send error message to client | ||
log.Error("[rpc] failed to register subscription", "error", err) | ||
} | ||
continue | ||
|
@@ -191,12 +192,18 @@ func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
|
||
} | ||
|
||
func (h *HTTPServer) registerSubscription(conn *websocket.Conn, reqID float64, subscriptionType int) (uint32, error) { | ||
func (h *HTTPServer) registerSubscription(conn *websocket.Conn, reqID float64, subscriptionType int, params interface{}) (uint32, error) { | ||
wssub := h.serverConfig.WSSubscriptions | ||
sub := uint32(len(wssub)) + 1 | ||
pA := params.([]interface{}) | ||
filter := make(map[string]bool) | ||
for _, param := range pA { | ||
filter[param.(string)] = true | ||
} | ||
wss := &WebSocketSubscription{ | ||
WSConnection: conn, | ||
SubscriptionType: subscriptionType, | ||
Filter: filter, | ||
} | ||
wssub[sub] = wss | ||
h.serverConfig.WSSubscriptions = wssub | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the WSSubscriptions should be stored in the HTTPServer struct (or in a separate wsServer struct) not in the config |
||
|
@@ -233,3 +240,36 @@ func (h *HTTPServer) blockReceivedListener() { | |
} | ||
} | ||
} | ||
|
||
func (h *HTTPServer) storageChangeListener() { | ||
if h.serverConfig.StorageAPI == nil { | ||
return | ||
} | ||
|
||
for change := range h.storageChan { | ||
|
||
if change != nil { | ||
for i, sub := range h.serverConfig.WSSubscriptions { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm wondering if it would be simpler to start a goroutine for each subscription, what do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I've refactor websockets to better handle this. |
||
if sub.SubscriptionType == SUB_STORAGE { | ||
// check it change key is in subscription filter | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. check if* |
||
cKey := common.BytesToHex(change.Key) | ||
if len(sub.Filter) > 0 && !sub.Filter[cKey] { | ||
continue | ||
} | ||
|
||
changeM := make(map[string]interface{}) | ||
changeM["result"] = []string{cKey, common.BytesToHex(change.Value)} | ||
res := newSubcriptionBaseResponseJSON(i) | ||
res.Method = "state_storage" | ||
res.Params = changeM | ||
if sub.WSConnection != nil { | ||
err := sub.WSConnection.WriteJSON(res) | ||
if err != nil { | ||
log.Error("[rpc] error writing response", "error", err) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,10 @@ type StorageState struct { | |
trie *trie.Trie | ||
db *StorageDB | ||
lock sync.RWMutex | ||
|
||
// change notifiers | ||
changed map[byte]chan<- *KeyValue | ||
changedLock sync.RWMutex | ||
} | ||
|
||
// NewStorageDB instantiates badgerDB instance for storing trie structure | ||
|
@@ -72,8 +76,9 @@ func NewStorageState(db chaindb.Database, t *trie.Trie) (*StorageState, error) { | |
} | ||
|
||
return &StorageState{ | ||
trie: t, | ||
db: NewStorageDB(db), | ||
trie: t, | ||
db: NewStorageDB(db), | ||
changed: make(map[byte]chan<- *KeyValue), | ||
}, nil | ||
} | ||
|
||
|
@@ -120,7 +125,18 @@ func (s *StorageState) EnumeratedTrieRoot(values [][]byte) { | |
func (s *StorageState) SetStorage(key []byte, value []byte) error { | ||
s.lock.Lock() | ||
defer s.lock.Unlock() | ||
return s.trie.Put(key, value) | ||
kv := &KeyValue{ | ||
Key: key, | ||
Value: value, | ||
} | ||
//fmt.Printf("SetStorage Key: %x Value: %x\n", key, value) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove? |
||
// TODO ed add channel notify here | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this done? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should also be added to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes |
||
err := s.trie.Put(key, value) | ||
if err != nil { | ||
return err | ||
} | ||
s.notifyChanged(kv) | ||
return nil | ||
} | ||
|
||
// ClearPrefix not implemented | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
// Copyright 2020 ChainSafe Systems (ON) Corp. | ||
// This file is part of gossamer. | ||
// | ||
// The gossamer library is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU Lesser General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
// | ||
// The gossamer library is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU Lesser General Public License for more details. | ||
// | ||
// You should have received a copy of the GNU Lesser General Public License | ||
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>. | ||
package state | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
// KeyValue struct to hold key value pairs | ||
type KeyValue struct { | ||
Key []byte | ||
Value []byte | ||
} | ||
|
||
// RegisterStorageChangeChannel function to register storage change channels | ||
func (s *StorageState) RegisterStorageChangeChannel(ch chan<- *KeyValue) (byte, error) { | ||
s.changedLock.RLock() | ||
|
||
if len(s.changed) == 256 { | ||
return 0, errors.New("channel limit reached") | ||
} | ||
|
||
var id byte | ||
for { | ||
id = generateID() | ||
if s.changed[id] == nil { | ||
break | ||
} | ||
} | ||
|
||
s.changedLock.RUnlock() | ||
|
||
s.changedLock.Lock() | ||
s.changed[id] = ch | ||
s.changedLock.Unlock() | ||
return id, nil | ||
} | ||
|
||
// UnregisterStorageChangeChannel removes the storage change notification channel with the given ID. | ||
// A channel must be unregistered before closing it. | ||
func (s *StorageState) UnregisterStorageChangeChannel(id byte) { | ||
s.changedLock.Lock() | ||
defer s.changedLock.Unlock() | ||
|
||
delete(s.changed, id) | ||
} | ||
|
||
func (s *StorageState) notifyChanged(change *KeyValue) { | ||
s.changedLock.RLock() | ||
defer s.changedLock.RUnlock() | ||
|
||
if len(s.changed) == 0 { | ||
return | ||
} | ||
|
||
logger.Trace("notifying changed storage chans...", "chans", s.changed) | ||
|
||
for _, ch := range s.changed { | ||
go func(ch chan<- *KeyValue) { | ||
ch <- change | ||
}(ch) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*websocket