Skip to content

Commit

Permalink
BCFR-1064 Fix WS Test Server (#14891)
Browse files Browse the repository at this point in the history
* Prevent WS server from closing connection right after handling batch request

* fix lint issues
  • Loading branch information
dhaidashenko authored and KMontag42 committed Oct 24, 2024
1 parent 2d6d1f9 commit 7786128
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 85 deletions.
5 changes: 5 additions & 0 deletions .changeset/perfect-peaches-work.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix flaky WS test server #internal
171 changes: 86 additions & 85 deletions core/chains/evm/testutils/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (ts *testWSServer) newWSHandler(chainID *big.Int, callback JSONRPCHandler)
ts.mu.Unlock()

for {
_, data, err := conn.ReadMessage()
err := ts.handleNewMsg(chainID, conn, callback)
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
ts.t.Log("Websocket closing")
Expand All @@ -146,97 +146,98 @@ func (ts *testWSServer) newWSHandler(chainID *big.Int, callback JSONRPCHandler)
ts.t.Logf("Failed to read message: %v", err)
return
}
ts.t.Log("Received message", string(data))

req := gjson.ParseBytes(data)

if req.IsArray() { // Handle batch request
ts.t.Log("Received batch request")
responses := []string{}
for _, reqElem := range req.Array() {
m := reqElem.Get("method")
if m.Type != gjson.String {
ts.t.Logf("Method must be string: %v", m.Type)
continue
}

var resp JSONRPCResponse
if chainID != nil && m.String() == "eth_chainId" {
resp.Result = `"0x` + chainID.Text(16) + `"`
} else if m.String() == "eth_syncing" {
resp.Result = "false"
} else {
resp = callback(m.String(), reqElem.Get("params"))
}
id := reqElem.Get("id")
var msg string
if resp.Error.Message != "" {
msg = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"error":{"code":%d,"message":"%s"}}`, id, resp.Error.Code, resp.Error.Message)
} else {
msg = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`, id, resp.Result)
}
responses = append(responses, msg)
}
responseBatch := fmt.Sprintf("[%s]", strings.Join(responses, ","))
ts.t.Logf("Sending batch response: %v", responseBatch)
ts.mu.Lock()
err = conn.WriteMessage(websocket.BinaryMessage, []byte(responseBatch))
ts.mu.Unlock()
if err != nil {
ts.t.Logf("Failed to write message: %v", err)
}
return
}
// Handle single request
if e := req.Get("error"); e.Exists() {
ts.t.Logf("Received jsonrpc error: %v", e)
continue
}
}
}
}

m := req.Get("method")
if m.Type != gjson.String {
ts.t.Logf("Method must be string: %v", m.Type)
return
}
func (ts *testWSServer) handleNewMsg(chainID *big.Int, conn *websocket.Conn, callback JSONRPCHandler) error {
_, data, err := conn.ReadMessage()
if err != nil {
return err
}

var resp JSONRPCResponse
if chainID != nil && m.String() == "eth_chainId" {
resp.Result = `"0x` + chainID.Text(16) + `"`
} else if m.String() == "eth_syncing" {
resp.Result = "false"
} else {
resp = callback(m.String(), req.Get("params"))
}
id := req.Get("id")
var msg string
if resp.Error.Message != "" {
msg = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"error":{"code":%d,"message":"%s"}}`, id, resp.Error.Code, resp.Error.Message)
} else {
msg = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`, id, resp.Result)
}
ts.t.Logf("Sending message: %v", msg)
ts.mu.Lock()
err = conn.WriteMessage(websocket.BinaryMessage, []byte(msg))
ts.mu.Unlock()
ts.t.Log("Received message", string(data))

req := gjson.ParseBytes(data)

if req.IsArray() { // Handle batch request
ts.t.Log("Received batch request")
var responses []string
for i, reqElem := range req.Array() {
var response string
response, _, err = ts.handleRequest(chainID, callback, reqElem)
if err != nil {
ts.t.Logf("Failed to write message: %v", err)
return
return fmt.Errorf("failed to handle elem %d of batch request: %w", i, err)
}
responses = append(responses, response)
}

if resp.Notify != "" {
time.Sleep(100 * time.Millisecond)
msg := fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x00","result":%s}}`, resp.Notify)
ts.t.Log("Sending message", msg)
ts.mu.Lock()
err = conn.WriteMessage(websocket.BinaryMessage, []byte(msg))
ts.mu.Unlock()
if err != nil {
ts.t.Logf("Failed to write message: %v", err)
return
}
}
return ts.writeMsg(conn, fmt.Sprintf("[%s]", strings.Join(responses, ",")))
}
// Handle single request
response, asyncResponse, err := ts.handleRequest(chainID, callback, req)
if err != nil {
return fmt.Errorf("failed to handle request: %w", err)
}

if response != "" {
err = ts.writeMsg(conn, response)
if err != nil {
return err
}
}

if asyncResponse != "" {
time.Sleep(100 * time.Millisecond)
return ts.writeMsg(conn, asyncResponse)
}

return nil
}

func (ts *testWSServer) handleRequest(chainID *big.Int, callback JSONRPCHandler, req gjson.Result) (response, asyncResponse string, err error) {
if e := req.Get("error"); e.Exists() {
ts.t.Logf("Received jsonrpc error: %v", e)
return
}

m := req.Get("method")
if m.Type != gjson.String {
err = fmt.Errorf("method must be string: %v", m.Type)
return
}

var resp JSONRPCResponse
if chainID != nil && m.String() == "eth_chainId" {
resp.Result = `"0x` + chainID.Text(16) + `"`
} else if m.String() == "eth_syncing" {
resp.Result = "false"
} else {
resp = callback(m.String(), req.Get("params"))
}
id := req.Get("id")
if resp.Error.Message != "" {
response = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"error":{"code":%d,"message":"%s"}}`, id, resp.Error.Code, resp.Error.Message)
} else {
response = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`, id, resp.Result)
}

if resp.Notify != "" {
asyncResponse = fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x00","result":%s}}`, resp.Notify)
}

return
}

func (ts *testWSServer) writeMsg(conn *websocket.Conn, msg string) error {
ts.t.Logf("Sending message: %v", msg)
ts.mu.Lock()
err := conn.WriteMessage(websocket.BinaryMessage, []byte(msg))
ts.mu.Unlock()
if err != nil {
return fmt.Errorf("failed to write msg: %w", err)
}
return nil
}

type MockEth struct {
Expand Down

0 comments on commit 7786128

Please sign in to comment.