Skip to content

Commit

Permalink
chore(lint): fix lint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Sep 16, 2022
1 parent a7d2b67 commit 395f14f
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 21 deletions.
21 changes: 12 additions & 9 deletions loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ func (lb *LoadBalancer) handleForwarding(s network.Stream) {

// only accept outbound requests
if request.Kind != messages.ForwardingOutbound {
messages.WriteForwardingResponseError(s, ErrNoInboundRequests)
err = messages.WriteForwardingResponseError(s, ErrNoInboundRequests)
if err != nil {
log.Warnf("writing forwarding response: %s", err)
}
return
}

Expand Down Expand Up @@ -223,24 +226,24 @@ func (lb *LoadBalancer) bridgeStreams(s1, s2 network.Stream) {
defer wg.Done()
_, err := io.Copy(s2, s1)
if err != nil {
s1.Reset()
_ = s1.Reset()
return
}
err = s2.CloseWrite()
if err != nil {
s1.Reset()
_ = s1.Reset()
}
}()
go func() {
// pipe reads on s2 to writes on s1
defer wg.Done()
_, err := io.Copy(s1, s2)
if err != nil {
s2.Reset()
_ = s2.Reset()
}
err = s1.CloseWrite()
if err != nil {
s2.Reset()
_ = s2.Reset()
}
}()
wg.Wait()
Expand All @@ -259,15 +262,15 @@ func (lb *LoadBalancer) handleIncoming(s network.Stream) {
if !ok {
// if none exists, return
log.Warnf("received protocol request for protocol '%s' with no router peer", s.Protocol())
s.Reset()
_ = s.Reset()
return
}

// open a forwarding stream
routedStream, err := lb.h.NewStream(lb.ctx, routedPeer, ForwardingProtocolID)
if err != nil {
log.Warnf("unable to open forwarding stream for protocol '%s' with peer %s", s.Protocol(), routedPeer)
s.Reset()
_ = s.Reset()
return
}

Expand All @@ -276,8 +279,8 @@ func (lb *LoadBalancer) handleIncoming(s network.Stream) {
err = messages.WriteInboundForwardingRequest(routedStream, s.Conn().RemotePeer(), s.Protocol())
if err != nil {
log.Warnf("writing forwarding request: %s", err)
routedStream.Reset()
s.Reset()
_ = routedStream.Reset()
_ = s.Reset()
return
}

Expand Down
12 changes: 7 additions & 5 deletions loadbalancer/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func TestOutboundForwarding(t *testing.T) {
if response.Code == messages.ResponseOk {
_, err := s.Write([]byte("request"))
require.NoError(t, err)
s.CloseWrite()
err = s.CloseWrite()
require.NoError(t, err)
streamResponse, err := io.ReadAll(s)
require.NoError(t, err)
require.Equal(t, "response", string(streamResponse))
Expand Down Expand Up @@ -196,7 +197,7 @@ func TestInboundForwarding(t *testing.T) {
}, request)
require.NoError(tn.t, err)
if testCase.rejectResponse {
s.Reset()
_ = s.Reset()
} else {
userRequest, err := ioutil.ReadAll(s)
require.NoError(t, err)
Expand All @@ -208,12 +209,13 @@ func TestInboundForwarding(t *testing.T) {
tn.serviceNode.SetStreamHandler(ForwardingProtocolID, handler)
}
if !testCase.doNotConnect {
tn.serviceNode.Connect(ctx, peer.AddrInfo{
err = tn.serviceNode.Connect(ctx, peer.AddrInfo{
ID: peers.loadBalancer.id,
Addrs: []multiaddr.Multiaddr{
peers.loadBalancer.multiAddr,
},
})
require.NoError(t, err)
}
s, err := tn.publicNode.NewStream(tn.ctx, tn.loadBalancer.ID(), testCase.protocols...)
if testCase.willErrorOpening {
Expand All @@ -223,7 +225,8 @@ func TestInboundForwarding(t *testing.T) {
_, err := s.Write([]byte("request"))

require.NoError(t, err)
s.CloseWrite()
_ = s.CloseWrite()
require.NoError(t, err)
streamResponse, err := io.ReadAll(s)
if testCase.willErrorReading {
require.Error(t, err)
Expand All @@ -239,7 +242,6 @@ func TestInboundForwarding(t *testing.T) {
}
}

type forwardingRequestHandler func(*messages.ForwardingRequest, io.Writer) error
type testNet struct {
ctx context.Context
t *testing.T
Expand Down
3 changes: 0 additions & 3 deletions loadbalancer/messages/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"io"
"testing"
"time"

"github.com/filecoin-project/boost/loadbalancer/messages"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
Expand Down Expand Up @@ -48,8 +47,6 @@ func TestRoundtripForwardingResponse(t *testing.T) {
func TestReadWriteFunctions(t *testing.T) {
singleTestProtocol := protocol.ID("test/me")
testProtocols := []protocol.ID{"applesauce/cheese", "big/face"}
expiry := time.Now().Add(time.Hour)
expiry = time.Unix(0, expiry.UnixNano())
responseErr := errors.New("something went wrong")
remote := peer.ID("something")
_, pubKey, err := crypto.GenerateECDSAKeyPair(rand.Reader)
Expand Down
6 changes: 3 additions & 3 deletions loadbalancer/servicenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ func (wc *wrappedConn) RemotePeer() peer.ID {
func (sn *ServiceNode) handleForwarding(s network.Stream) {
// only accept requests from the load balancer
if s.Conn().RemotePeer() != sn.balancer {
s.Reset()
_ = s.Reset()
return
}

// read the forwarding request
request, err := messages.ReadForwardingRequest(s)
if err != nil {
log.Warnf("reading forwarding request: %s", err)
s.Reset()
_ = s.Reset()
return
}

Expand All @@ -104,7 +104,7 @@ func (sn *ServiceNode) handleForwarding(s network.Stream) {

if responseErr != nil {
log.Infof("rejected forwarding request: %s", responseErr)
s.Reset()
_ = s.Reset()
return
}

Expand Down
2 changes: 1 addition & 1 deletion node/modules/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewSetBitswapPeerIDFunc(r lotus_repo.LockedRepo, lb *loadbalancer.LoadBalan
network.ProtocolBitswapOneOne,
network.ProtocolBitswapOneZero,
}
lb.UpdatePeerConfig(peerConfig)
err = lb.UpdatePeerConfig(peerConfig)
return
}, nil
}
Expand Down

0 comments on commit 395f14f

Please sign in to comment.