Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WS URL can be optional when LogBroadcaster is disabled #14364

Merged
merged 55 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
b05a1a7
WS URL can be optional
huangzhen1997 Sep 5, 2024
71c84c6
add changeset
huangzhen1997 Sep 6, 2024
e99aae0
change
huangzhen1997 Sep 6, 2024
6737f0c
make WSURL optional
huangzhen1997 Sep 6, 2024
8df1659
fix test, and enforce SubscribeFilterLogs to fail when ws url not pro…
huangzhen1997 Sep 6, 2024
e1fa795
add comments
huangzhen1997 Sep 6, 2024
7f59b07
update changeset
huangzhen1997 Sep 6, 2024
48c7f1a
update dial logic and make ws optional not required
huangzhen1997 Sep 6, 2024
515888c
fix test
huangzhen1997 Sep 6, 2024
9ecc624
fix
huangzhen1997 Sep 9, 2024
01e9e08
fix lint
huangzhen1997 Sep 9, 2024
4b860a1
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
huangzhen1997 Sep 9, 2024
fdc5c8c
address comments
huangzhen1997 Sep 9, 2024
2660903
update comments
huangzhen1997 Sep 9, 2024
b3e270d
fix test
huangzhen1997 Sep 9, 2024
ba73ef6
add check when both ws and http missing
huangzhen1997 Sep 9, 2024
5bec775
add test and add restrictions
huangzhen1997 Sep 9, 2024
9a841ce
add comment
huangzhen1997 Sep 9, 2024
670ebb8
revert outdated change
huangzhen1997 Sep 9, 2024
799dc6c
remove extra line
huangzhen1997 Sep 9, 2024
427a8a0
fix test
huangzhen1997 Sep 9, 2024
7738ff0
revert changes from rpc client
huangzhen1997 Sep 10, 2024
795ee7b
unintended change
huangzhen1997 Sep 10, 2024
3b27171
remove unused
huangzhen1997 Sep 10, 2024
d3f523c
update verification logic
huangzhen1997 Sep 10, 2024
a71015d
add test fix
huangzhen1997 Sep 10, 2024
7908e2a
modify unit test to cover logbroadcaster enabled false
huangzhen1997 Sep 10, 2024
ae09d35
update doc
huangzhen1997 Sep 12, 2024
fe19568
udpate changeset
huangzhen1997 Sep 12, 2024
cc05a3f
address PR comments
huangzhen1997 Sep 13, 2024
d7a7719
address pr comments
huangzhen1997 Sep 13, 2024
3e92077
update invalid toml config
huangzhen1997 Sep 13, 2024
d7af044
fix test
huangzhen1997 Sep 13, 2024
4d83e3f
ws required for primary nodes when logbroadcaster enabled
huangzhen1997 Sep 16, 2024
d5dbade
minor
huangzhen1997 Sep 16, 2024
e8901e7
Dmytro's comments
huangzhen1997 Sep 19, 2024
2713252
fix nil ptr, more fix to come
huangzhen1997 Sep 19, 2024
8a71a49
resolve merge conflicts
huangzhen1997 Sep 23, 2024
85f56ca
Merge branch 'develop' into BCFR-451/make-ws-url-optional
huangzhen1997 Sep 23, 2024
774d025
fix make
huangzhen1997 Sep 24, 2024
5b3918d
refactor function sig
huangzhen1997 Sep 24, 2024
691f0d1
Merge branch 'develop' into BCFR-451/make-ws-url-optional
huangzhen1997 Sep 25, 2024
971e300
fix test
huangzhen1997 Sep 25, 2024
7cf0ba5
fix
huangzhen1997 Sep 25, 2024
4502d4d
make ws pointer
dhaidashenko Sep 26, 2024
5e9e9b7
Merge pull request #14573 from smartcontractkit/ws-optional
huangzhen1997 Sep 26, 2024
615248b
fix
huangzhen1997 Sep 26, 2024
e87e3c1
fix make
huangzhen1997 Sep 26, 2024
d689696
address comments
huangzhen1997 Sep 27, 2024
b267e7a
fix lint
huangzhen1997 Sep 27, 2024
51c5d96
fix make
huangzhen1997 Sep 27, 2024
c5f6b90
fix make
huangzhen1997 Sep 27, 2024
4c81cac
fix test conflicts
huangzhen1997 Sep 30, 2024
e2d7a8e
fix make
huangzhen1997 Sep 30, 2024
580795e
fix rpc disconnect with optional ws url
dhaidashenko Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/silly-lies-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": minor
---

Make websocket URL `WSURL` for `EVM.Nodes` optional, and apply logic so that:
* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error
* If WS URL was not provided LogBroadcaster should be disabled
#nops
17 changes: 11 additions & 6 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ type node[
services.StateMachine
lfcLog logger.Logger
name string
id int32
id int
chainID CHAIN_ID
nodePoolCfg NodeConfig
chainCfg ChainConfig
order int32
chainFamily string

ws url.URL
ws *url.URL
http *url.URL

rpc RPC
Expand All @@ -121,10 +121,10 @@ func NewNode[
nodeCfg NodeConfig,
chainCfg ChainConfig,
lggr logger.Logger,
wsuri url.URL,
wsuri *url.URL,
httpuri *url.URL,
name string,
id int32,
id int,
chainID CHAIN_ID,
nodeOrder int32,
rpc RPC,
Expand All @@ -136,8 +136,10 @@ func NewNode[
n.chainID = chainID
n.nodePoolCfg = nodeCfg
n.chainCfg = chainCfg
n.ws = wsuri
n.order = nodeOrder
if wsuri != nil {
n.ws = wsuri
}
if httpuri != nil {
n.http = httpuri
}
Expand All @@ -157,7 +159,10 @@ func NewNode[
}

func (n *node[CHAIN_ID, HEAD, RPC]) String() string {
s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String())
s := fmt.Sprintf("(%s)%s", Primary.String(), n.name)
if n.ws != nil {
s = s + fmt.Sprintf(":%s", n.ws.String())
}
if n.http != nil {
s = s + fmt.Sprintf(":%s", n.http.String())
}
Expand Down
4 changes: 2 additions & 2 deletions common/client/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ type testNodeOpts struct {
config testNodeConfig
chainConfig clientMocks.ChainConfig
lggr logger.Logger
wsuri url.URL
wsuri *url.URL
httpuri *url.URL
name string
id int32
id int
chainID types.ID
nodeOrder int32
rpc *mockNodeClient[types.ID, Head]
Expand Down
18 changes: 12 additions & 6 deletions core/chains/evm/client/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,21 @@ func NewClientConfigs(
func parseNodeConfigs(nodeCfgs []NodeConfig) ([]*toml.Node, error) {
nodes := make([]*toml.Node, len(nodeCfgs))
for i, nodeCfg := range nodeCfgs {
if nodeCfg.WSURL == nil || nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing WS or HTTP URL", i)
var wsURL, httpURL *commonconfig.URL
// wsUrl requirement will be checked in EVMConfig validation
if nodeCfg.WSURL != nil {
wsURL = commonconfig.MustParseURL(*nodeCfg.WSURL)
}
wsUrl := commonconfig.MustParseURL(*nodeCfg.WSURL)
httpUrl := commonconfig.MustParseURL(*nodeCfg.HTTPURL)

if nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing HTTP URL", i)
}

httpURL = commonconfig.MustParseURL(*nodeCfg.HTTPURL)
node := &toml.Node{
Name: nodeCfg.Name,
WSURL: wsUrl,
HTTPURL: httpUrl,
WSURL: wsURL,
HTTPURL: httpURL,
SendOnly: nodeCfg.SendOnly,
Order: nodeCfg.Order,
}
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ func TestNodeConfigs(t *testing.T) {
require.Len(t, tomlNodes, len(nodeConfigs))
})

t.Run("parsing missing ws url fails", func(t *testing.T) {
t.Run("ws can be optional", func(t *testing.T) {
nodeConfigs := []client.NodeConfig{
{
Name: ptr("foo1"),
HTTPURL: ptr("http://foo1.test"),
},
}
_, err := client.ParseTestNodeConfigs(nodeConfigs)
require.Error(t, err)
require.Nil(t, err)
})

t.Run("parsing missing http url fails", func(t *testing.T) {
Expand Down
11 changes: 7 additions & 4 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@ import (
)

func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, clientErrors evmconfig.ClientErrors, lggr logger.Logger, chainID *big.Int, nodes []*toml.Node, chainType chaintype.ChainType) Client {
var empty url.URL
var primaries []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]
var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
largePayloadRPCTimeout, defaultRPCTimeout := getRPCTimeouts(chainType)
for i, node := range nodes {
var ws *url.URL
if node.WSURL != nil {
ws = (*url.URL)(node.WSURL)
}
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
rpc := NewRPCClient(lggr, nil, (*url.URL)(node.HTTPURL), *node.Name, i, chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL),
*node.Name, chainID, rpc)
sendonlys = append(sendonlys, sendonly)
} else {
rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i),
rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i,
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
primaryNode := commonclient.NewNode(cfg, chainCfg,
lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order,
lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, *node.Order,
rpc, "EVM")
primaries = append(primaries, primaryNode)
}
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func NewChainClientWithTestNode(
rpcUrl string,
rpcHTTPURL *url.URL,
sendonlyRPCURLs []url.URL,
id int32,
id int,
chainID *big.Int,
) (Client, error) {
parsed, err := url.ParseRequestURI(rpcUrl)
Expand All @@ -148,10 +148,10 @@ func NewChainClientWithTestNode(
}

lggr := logger.Test(t)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}

var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
Expand All @@ -160,7 +160,7 @@ func NewChainClientWithTestNode(
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, &empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
s := commonclient.NewSendOnlyNode[*big.Int, RPCClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
Expand Down Expand Up @@ -206,7 +206,7 @@ func NewChainClientWithMockedRpc(
parsed, _ := url.ParseRequestURI("ws://test")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}
clientErrors := NewTestClientErrors()
c := NewChainClient(lggr, selectionMode, leaseDuration, noNewHeadsThreshold, primaries, nil, chainID, chainType, &clientErrors, 0)
Expand Down
74 changes: 47 additions & 27 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type rawclient struct {
type rpcClient struct {
rpcLog logger.SugaredLogger
name string
id int32
id int
chainID *big.Int
tier commonclient.NodeTier
largePayloadRpcTimeout time.Duration
Expand All @@ -126,7 +126,7 @@ type rpcClient struct {
newHeadsPollInterval time.Duration
chainType chaintype.ChainType

ws rawclient
ws *rawclient
http *rawclient

stateMu sync.RWMutex // protects state* fields
Expand Down Expand Up @@ -154,10 +154,10 @@ type rpcClient struct {
// NewRPCCLient returns a new *rpcClient as commonclient.RPC
func NewRPCClient(
lggr logger.Logger,
wsuri url.URL,
wsuri *url.URL,
httpuri *url.URL,
name string,
id int32,
id int,
chainID *big.Int,
tier commonclient.NodeTier,
finalizedBlockPollInterval time.Duration,
Expand All @@ -175,9 +175,11 @@ func NewRPCClient(
r.id = id
r.chainID = chainID
r.tier = tier
r.ws.uri = wsuri
r.finalizedBlockPollInterval = finalizedBlockPollInterval
r.newHeadsPollInterval = newHeadsPollInterval
if wsuri != nil {
r.ws = &rawclient{uri: *wsuri}
}
if httpuri != nil {
dhaidashenko marked this conversation as resolved.
Show resolved Hide resolved
r.http = &rawclient{uri: *httpuri}
}
Expand All @@ -199,30 +201,33 @@ func (r *rpcClient) Dial(callerCtx context.Context) error {
ctx, cancel := r.makeQueryCtx(callerCtx, r.rpcTimeout)
defer cancel()

promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc()
lggr := r.rpcLog.With("wsuri", r.ws.uri.Redacted())
if r.http != nil {
lggr = lggr.With("httpuri", r.http.uri.Redacted())
if r.ws == nil && r.http == nil {
return errors.New("cannot dial rpc client when both ws and http info are missing")
}
lggr.Debugw("RPC dial: evmclient.Client#dial")

wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "")
if err != nil {
promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc()
return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted()))
}
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc()
lggr := r.rpcLog
if r.ws != nil {
lggr = lggr.With("wsuri", r.ws.uri.Redacted())
wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "")
if err != nil {
promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc()
return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted()))
}

r.ws.rpc = wsrpc
r.ws.geth = ethclient.NewClient(wsrpc)
r.ws.rpc = wsrpc
r.ws.geth = ethclient.NewClient(wsrpc)
}

if r.http != nil {
lggr = lggr.With("httpuri", r.http.uri.Redacted())
if err := r.DialHTTP(); err != nil {
return err
}
}

lggr.Debugw("RPC dial: evmclient.Client#dial")
promEVMPoolRPCNodeDialsSuccess.WithLabelValues(r.chainID.String(), r.name).Inc()

return nil
}

Expand All @@ -231,7 +236,7 @@ func (r *rpcClient) Dial(callerCtx context.Context) error {
// It can only return error if the URL is malformed.
func (r *rpcClient) DialHTTP() error {
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc()
lggr := r.rpcLog.With("httpuri", r.ws.uri.Redacted())
lggr := r.rpcLog.With("httpuri", r.http.uri.Redacted())
lggr.Debugw("RPC dial: evmclient.Client#dial")

var httprpc *rpc.Client
Expand All @@ -251,7 +256,7 @@ func (r *rpcClient) DialHTTP() error {

func (r *rpcClient) Close() {
defer func() {
if r.ws.rpc != nil {
if r.ws != nil && r.ws.rpc != nil {
r.ws.rpc.Close()
}
}()
Expand All @@ -270,7 +275,10 @@ func (r *rpcClient) cancelInflightRequests() {
}

func (r *rpcClient) String() string {
s := fmt.Sprintf("(%s)%s:%s", r.tier.String(), r.name, r.ws.uri.Redacted())
s := fmt.Sprintf("(%s)%s", r.tier.String(), r.name)
if r.ws != nil {
s = s + fmt.Sprintf(":%s", r.ws.uri.Redacted())
}
if r.http != nil {
s = s + fmt.Sprintf(":%s", r.http.uri.Redacted())
}
Expand Down Expand Up @@ -336,7 +344,7 @@ func (r *rpcClient) registerSub(sub ethereum.Subscription, stopInFLightCh chan s
// DisconnectAll disconnects all clients connected to the rpcClient
func (r *rpcClient) DisconnectAll() {
r.stateMu.Lock()
if r.ws.rpc != nil {
if r.ws != nil && r.ws.rpc != nil {
r.ws.rpc.Close()
}
r.cancelInflightRequests()
Expand Down Expand Up @@ -497,7 +505,6 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
defer cancel()
args := []interface{}{"newHeads"}
lggr := r.newRqLggr().With("args", args)

if r.newHeadsPollInterval > 0 {
interval := r.newHeadsPollInterval
timeout := interval
Expand Down Expand Up @@ -529,6 +536,10 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
return &poller, nil
}

if ws == nil {
return nil, errors.New("SubscribeNewHead is not allowed without ws url")
}

lggr.Debug("RPC call: evmclient.Client#EthSubscribe")
start := time.Now()
defer func() {
Expand Down Expand Up @@ -557,7 +568,6 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.Head, sub commontypes.Subscription, err error) {
ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout)
defer cancel()

args := []interface{}{rpcSubscriptionMethodNewHeads}
start := time.Now()
lggr := r.newRqLggr().With("args", args)
Expand All @@ -580,6 +590,10 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H
return channel, &poller, nil
}

if ws == nil {
return nil, nil, errors.New("SubscribeNewHead is not allowed without ws url")
}

lggr.Debug("RPC call: evmclient.Client#EthSubscribe")
defer func() {
duration := time.Since(start)
Expand Down Expand Up @@ -1286,6 +1300,9 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro
func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (_ ethereum.Subscription, err error) {
ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout)
defer cancel()
if ws == nil {
return nil, errors.New("SubscribeFilterLogs is not allowed without ws url")
}
lggr := r.newRqLggr().With("q", q)

lggr.Debug("RPC call: evmclient.Client#SubscribeFilterLogs")
Expand Down Expand Up @@ -1390,18 +1407,21 @@ func (r *rpcClient) wrapHTTP(err error) error {
}

// makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx
func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) {
func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws *rawclient, http *rawclient) {
ctx, cancel, _, ws, http = r.acquireQueryCtx(parentCtx, timeout)
return
}

func (r *rpcClient) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
chStopInFlight chan struct{}, ws rawclient, http *rawclient) {
chStopInFlight chan struct{}, ws *rawclient, http *rawclient) {
// Need to wrap in mutex because state transition can cancel and replace the
// context
r.stateMu.RLock()
chStopInFlight = r.chStopInFlight
ws = r.ws
if r.ws != nil {
cp := *r.ws
ws = &cp
}
if r.http != nil {
cp := *r.http
http = &cp
Expand Down
Loading
Loading