Skip to content

Commit

Permalink
upgrade the websocket in xdpos (ethereum#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjrjerome authored Nov 6, 2023
1 parent 64a2c84 commit 39f6e6e
Show file tree
Hide file tree
Showing 57 changed files with 3,724 additions and 2,419 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ coverage.txt
go.sum
cicd/devnet/terraform/.terraform*
cicd/devnet/tmp
.env
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

GOBIN = $(shell pwd)/build/bin
GOFMT = gofmt
GO ?= 1.13.1
GO ?= 1.14
GO_PACKAGES = .
GO_FILES := $(shell find $(shell go list -f '{{.Dir}}' $(GO_PACKAGES)) -name \*.go)

Expand Down
74 changes: 44 additions & 30 deletions cmd/faucet/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p/discv5"
"github.com/XinFinOrg/XDPoSChain/p2p/nat"
"github.com/XinFinOrg/XDPoSChain/params"
"golang.org/x/net/websocket"
"github.com/gorilla/websocket"
)

var (
Expand Down Expand Up @@ -204,14 +204,21 @@ type faucet struct {
nonce uint64 // Current pending nonce of the faucet
price *big.Int // Current gas price to issue funds with

conns []*websocket.Conn // Currently live websocket connections
conns []*wsConn // Currently live websocket connections
timeouts map[string]time.Time // History of users and their funding timeouts
reqs []*request // Currently pending funding requests
update chan struct{} // Channel to signal request updates

lock sync.RWMutex // Lock protecting the faucet's internals
}

// wsConn wraps a websocket connection with a write mutex as the underlying
// websocket library does not synchronize access to the stream.
type wsConn struct {
conn *websocket.Conn
wlock sync.Mutex
}

func newFaucet(genesis *core.Genesis, port int, enodes []*discv5.Node, network uint64, stats string, ks *keystore.KeyStore, index []byte) (*faucet, error) {
// Assemble the raw devp2p protocol stack
stack, err := node.New(&node.Config{
Expand Down Expand Up @@ -289,7 +296,7 @@ func (f *faucet) listenAndServe(port int) error {
go f.loop()

http.HandleFunc("/", f.webHandler)
http.Handle("/api", websocket.Handler(f.apiHandler))
http.HandleFunc("/api", f.apiHandler)

return http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}
Expand All @@ -301,18 +308,24 @@ func (f *faucet) webHandler(w http.ResponseWriter, r *http.Request) {
}

// apiHandler handles requests for Ether grants and transaction statuses.
func (f *faucet) apiHandler(conn *websocket.Conn) {
func (f *faucet) apiHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
// Start tracking the connection and drop at the end
defer conn.Close()

f.lock.Lock()
f.conns = append(f.conns, conn)
wsconn := &wsConn{conn: conn}
f.conns = append(f.conns, wsconn)
f.lock.Unlock()

defer func() {
f.lock.Lock()
for i, c := range f.conns {
if c == conn {
if c.conn == conn {
f.conns = append(f.conns[:i], f.conns[i+1:]...)
break
}
Expand All @@ -324,7 +337,6 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
head *types.Header
balance *big.Int
nonce uint64
err error
)
for {
// Attempt to retrieve the stats, may error on no faucet connectivity
Expand All @@ -340,7 +352,7 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {

// If stats retrieval failed, wait a bit and retry
if err != nil {
if err = sendError(conn, errors.New("Faucet offline: "+err.Error())); err != nil {
if err = sendError(wsconn, errors.New("Faucet offline: "+err.Error())); err != nil {
log.Warn("Failed to send faucet error to client", "err", err)
return
}
Expand All @@ -351,7 +363,7 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
break
}
// Send over the initial stats and the latest header
if err = send(conn, map[string]interface{}{
if err = send(wsconn, map[string]interface{}{
"funds": balance.Div(balance, ether),
"funded": nonce,
"peers": f.stack.Server().PeerCount(),
Expand All @@ -360,7 +372,7 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
log.Warn("Failed to send initial stats to client", "err", err)
return
}
if err = send(conn, head, 3*time.Second); err != nil {
if err = send(wsconn, head, 3*time.Second); err != nil {
log.Warn("Failed to send initial header to client", "err", err)
return
}
Expand All @@ -372,19 +384,19 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
Tier uint `json:"tier"`
Captcha string `json:"captcha"`
}
if err = websocket.JSON.Receive(conn, &msg); err != nil {
if err = conn.ReadJSON(&msg); err != nil {
return
}
if !*noauthFlag && !strings.HasPrefix(msg.URL, "https://gist.github.com/") && !strings.HasPrefix(msg.URL, "https://twitter.com/") &&
!strings.HasPrefix(msg.URL, "https://plus.google.com/") && !strings.HasPrefix(msg.URL, "https://www.facebook.com/") {
if err = sendError(conn, errors.New("URL doesn't link to supported services")); err != nil {
if err = sendError(wsconn, errors.New("URL doesn't link to supported services")); err != nil {
log.Warn("Failed to send URL error to client", "err", err)
return
}
continue
}
if msg.Tier >= uint(*tiersFlag) {
if err = sendError(conn, errors.New("Invalid funding tier requested")); err != nil {
if err = sendError(wsconn, errors.New("Invalid funding tier requested")); err != nil {
log.Warn("Failed to send tier error to client", "err", err)
return
}
Expand All @@ -400,7 +412,7 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {

res, err := http.PostForm("https://www.google.com/recaptcha/api/siteverify", form)
if err != nil {
if err = sendError(conn, err); err != nil {
if err = sendError(wsconn, err); err != nil {
log.Warn("Failed to send captcha post error to client", "err", err)
return
}
Expand All @@ -413,15 +425,15 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
err = json.NewDecoder(res.Body).Decode(&result)
res.Body.Close()
if err != nil {
if err = sendError(conn, err); err != nil {
if err = sendError(wsconn, err); err != nil {
log.Warn("Failed to send captcha decode error to client", "err", err)
return
}
continue
}
if !result.Success {
log.Warn("Captcha verification failed", "err", string(result.Errors))
if err = sendError(conn, errors.New("Beep-bop, you're a robot!")); err != nil {
if err = sendError(wsconn, errors.New("Beep-bop, you're a robot!")); err != nil {
log.Warn("Failed to send captcha failure to client", "err", err)
return
}
Expand All @@ -436,7 +448,7 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
)
switch {
case strings.HasPrefix(msg.URL, "https://gist.github.com/"):
if err = sendError(conn, errors.New("GitHub authentication discontinued at the official request of GitHub")); err != nil {
if err = sendError(wsconn, errors.New("GitHub authentication discontinued at the official request of GitHub")); err != nil {
log.Warn("Failed to send GitHub deprecation to client", "err", err)
return
}
Expand All @@ -453,7 +465,7 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
err = errors.New("Something funky happened, please open an issue at https://github.com/XinFinOrg/XDPoSChain/issues")
}
if err != nil {
if err = sendError(conn, err); err != nil {
if err = sendError(wsconn, err); err != nil {
log.Warn("Failed to send prefix error to client", "err", err)
return
}
Expand All @@ -477,7 +489,7 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
signed, err := f.keystore.SignTx(f.account, tx, f.config.ChainId)
if err != nil {
f.lock.Unlock()
if err = sendError(conn, err); err != nil {
if err = sendError(wsconn, err); err != nil {
log.Warn("Failed to send transaction creation error to client", "err", err)
return
}
Expand All @@ -486,7 +498,7 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {
// Submit the transaction and mark as funded if successful
if err := f.client.SendTransaction(context.Background(), signed); err != nil {
f.lock.Unlock()
if err = sendError(conn, err); err != nil {
if err = sendError(wsconn, err); err != nil {
log.Warn("Failed to send transaction transmission error to client", "err", err)
return
}
Expand All @@ -505,13 +517,13 @@ func (f *faucet) apiHandler(conn *websocket.Conn) {

// Send an error if too frequent funding, othewise a success
if !fund {
if err = sendError(conn, fmt.Errorf("%s left until next allowance", common.PrettyDuration(timeout.Sub(time.Now())))); err != nil { // nolint: gosimple
if err = sendError(wsconn, fmt.Errorf("%s left until next allowance", common.PrettyDuration(timeout.Sub(time.Now())))); err != nil { // nolint: gosimple
log.Warn("Failed to send funding error to client", "err", err)
return
}
continue
}
if err = sendSuccess(conn, fmt.Sprintf("Funding request accepted for %s into %s", username, address.Hex())); err != nil {
if err = sendSuccess(wsconn, fmt.Sprintf("Funding request accepted for %s into %s", username, address.Hex())); err != nil {
log.Warn("Failed to send funding success to client", "err", err)
return
}
Expand Down Expand Up @@ -581,12 +593,12 @@ func (f *faucet) loop() {
"requests": f.reqs,
}, time.Second); err != nil {
log.Warn("Failed to send stats to client", "err", err)
conn.Close()
conn.conn.Close()
continue
}
if err := send(conn, head, time.Second); err != nil {
log.Warn("Failed to send header to client", "err", err)
conn.Close()
conn.conn.Close()
}
}
f.lock.RUnlock()
Expand All @@ -608,7 +620,7 @@ func (f *faucet) loop() {
for _, conn := range f.conns {
if err := send(conn, map[string]interface{}{"requests": f.reqs}, time.Second); err != nil {
log.Warn("Failed to send requests to client", "err", err)
conn.Close()
conn.conn.Close()
}
}
f.lock.RUnlock()
Expand All @@ -618,23 +630,25 @@ func (f *faucet) loop() {

// sends transmits a data packet to the remote end of the websocket, but also
// setting a write deadline to prevent waiting forever on the node.
func send(conn *websocket.Conn, value interface{}, timeout time.Duration) error {
func send(conn *wsConn, value interface{}, timeout time.Duration) error {
if timeout == 0 {
timeout = 60 * time.Second
}
conn.SetWriteDeadline(time.Now().Add(timeout))
return websocket.JSON.Send(conn, value)
conn.wlock.Lock()
defer conn.wlock.Unlock()
conn.conn.SetWriteDeadline(time.Now().Add(timeout))
return conn.conn.WriteJSON(value)
}

// sendError transmits an error to the remote end of the websocket, also setting
// the write deadline to 1 second to prevent waiting forever.
func sendError(conn *websocket.Conn, err error) error {
func sendError(conn *wsConn, err error) error {
return send(conn, map[string]string{"error": err.Error()}, time.Second)
}

// sendSuccess transmits a success message to the remote end of the websocket, also
// setting the write deadline to 1 second to prevent waiting forever.
func sendSuccess(conn *websocket.Conn, msg string) error {
func sendSuccess(conn *wsConn, msg string) error {
return send(conn, map[string]string{"success": msg}, time.Second)
}

Expand Down
Loading

0 comments on commit 39f6e6e

Please sign in to comment.