Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refac(net, exchange, dht) network service errors #101

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package bitswap

import (
"errors"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

Expand Down Expand Up @@ -120,14 +118,16 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {

// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())

if p == nil {
return nil, nil, errors.New("Received nil Peer")
// TODO propagate the error upward
return nil, nil
}
if incoming == nil {
return nil, nil, errors.New("Received nil Message")
// TODO propagate the error upward
return nil, nil
}

bs.strategy.MessageReceived(p, incoming) // FIRST
Expand Down Expand Up @@ -157,7 +157,12 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
}
}
defer bs.strategy.MessageSent(p, message)
return p, message, nil
return p, message
}

func (bs *bitswap) ReceiveError(err error) {
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}

// send strives to ensure that accounting is always performed when a message is
Expand Down
4 changes: 3 additions & 1 deletion exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ type Adapter interface {
type Receiver interface {
ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error)
destination *peer.Peer, outgoing bsmsg.BitSwapMessage)

ReceiveError(error)
}

// TODO(brian): move this to go-ipfs/net package
Expand Down
21 changes: 9 additions & 12 deletions exchange/bitswap/network/net_message_adapter.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package network

import (
"errors"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Expand Down Expand Up @@ -31,33 +29,32 @@ type impl struct {
// HandleMessage marshals and unmarshals net messages, forwarding them to the
// BitSwapMessage receiver
func (adapter *impl) HandleMessage(
ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) {
ctx context.Context, incoming netmsg.NetMessage) netmsg.NetMessage {

if adapter.receiver == nil {
return nil, errors.New("No receiver. NetMessage dropped")
return nil
}

received, err := bsmsg.FromNet(incoming)
if err != nil {
return nil, err
go adapter.receiver.ReceiveError(err)
return nil
}

p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
if err != nil {
return nil, err
}
p, bsmsg := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)

// TODO(brian): put this in a helper function
if bsmsg == nil || p == nil {
return nil, nil
return nil
}

outgoing, err := bsmsg.ToNet(p)
if err != nil {
return nil, err
go adapter.receiver.ReceiveError(err)
return nil
}

return outgoing, nil
return outgoing
}

func (adapter *impl) SendMessage(
Expand Down
24 changes: 5 additions & 19 deletions exchange/bitswap/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,7 @@ func (n *network) deliver(
return errors.New("Invalid input")
}

nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {

// TODO should this error be returned across network boundary?

// TODO this raises an interesting question about network contract. How
// can the network be expected to behave under different failure
// conditions? What if peer is unreachable? Will we know if messages
// aren't delivered?

return err
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)

if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return errors.New("Malformed client request")
Expand Down Expand Up @@ -119,15 +108,12 @@ func (n *network) SendRequest(
if !ok {
return nil, errors.New("Cannot locate peer on network")
}
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {
return nil, err
// TODO return nil, NoResponse
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)

// TODO dedupe code
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return nil, errors.New("Malformed client request")
r.ReceiveError(errors.New("Malformed client request"))
return nil, nil
}

// TODO dedupe code
Expand All @@ -144,7 +130,7 @@ func (n *network) SendRequest(
}
n.deliver(nextReceiver, nextPeer, nextMsg)
}()
return nil, NoResponse
return nil, nil
}
return nextMsg, nil
}
Expand Down
25 changes: 14 additions & 11 deletions exchange/bitswap/testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
ctx context.Context,
from *peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

t.Log("Recipient received a message from the network")

Expand All @@ -35,7 +35,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
m := bsmsg.New()
m.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))

return from, m, nil
return from, m
}))

t.Log("Build a message and send a synchronous request to recipient")
Expand Down Expand Up @@ -74,19 +74,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
ctx context.Context,
fromWaiter *peer.Peer,
msgFromWaiter bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

msgToWaiter := bsmsg.New()
msgToWaiter.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))

return fromWaiter, msgToWaiter, nil
return fromWaiter, msgToWaiter
}))

waiter.SetDelegate(lambda(func(
ctx context.Context,
fromResponder *peer.Peer,
msgFromResponder bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

// TODO assert that this came from the correct peer and that the message contents are as expected
ok := false
Expand All @@ -101,7 +101,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder")

}
return nil, nil, nil
return nil, nil
}))

messageSentAsync := bsmsg.New()
Expand All @@ -116,7 +116,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}

type receiverFunc func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error)
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage)

// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
Expand All @@ -126,13 +126,16 @@ func lambda(f receiverFunc) bsnet.Receiver {
}

type lambdaImpl struct {
f func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error)
f func(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage)
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {
return lam.f(ctx, p, incoming)
}

func (lam *lambdaImpl) ReceiveError(err error) {
// TODO log error
}
8 changes: 2 additions & 6 deletions net/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Handler interface {

// HandleMessage receives an incoming message, and potentially returns
// a response message to send back.
HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error)
HandleMessage(context.Context, msg.NetMessage) msg.NetMessage
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious: why couldnt the same error propagation work, and then have Service be the only one to send it over to through a channel?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g.

func (w *whatever) HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error) {
  // do whatever
  return nil, errors.New("beep boop")
}

// in service.go
func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {

  ...

  r1, err := s.Handler.HandleMessage(ctx, m2)
    if err != nil {
        go something.ReceiveError(err)
        return
    }

  ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are based on this discussion:

#69 (comment)

It's that it's weird to send errors back down to the service. What does it mean to return error in a handler function? What is the service meant to do with this error?

It would appear that the better thing to do is for the handler to log and perform error handling internally.

}

// Service is a networking component that protocols can use to multiplex
Expand Down Expand Up @@ -181,11 +181,7 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
}

// should this be "go HandleMessage ... ?"
r1, err := s.Handler.HandleMessage(ctx, m2)
if err != nil {
u.PErr("handled message yielded error %v\n", err)
return
}
r1 := s.Handler.HandleMessage(ctx, m2)

// if handler gave us a response, send it back out!
if r1 != nil {
Expand Down
5 changes: 2 additions & 3 deletions net/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ import (
// ReverseHandler reverses all Data it receives and sends it back.
type ReverseHandler struct{}

func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) (
msg.NetMessage, error) {
func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) msg.NetMessage {

d := m.Data()
for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 {
d[i], d[j] = d[j], d[i]
}

return msg.New(m.Peer(), d), nil
return msg.New(m.Peer(), d)
}

func newPeer(t *testing.T, id string) *peer.Peer {
Expand Down
24 changes: 15 additions & 9 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,26 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer,
}

// HandleMessage implements the inet.Handler interface.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {

mData := mes.Data()
if mData == nil {
return nil, errors.New("message did not include Data")
// TODO handle/log err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prob should keep the old error text here commented to know what to modify it to

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going through and adding logging now.

return nil
}

mPeer := mes.Peer()
if mPeer == nil {
return nil, errors.New("message did not include a Peer")
// TODO handle/log err
return nil
}

// deserialize msg
pmes := new(Message)
err := proto.Unmarshal(mData, pmes)
if err != nil {
return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
// TODO handle/log err
return nil
}

// update the peer (on valid msgs only)
Expand All @@ -133,27 +136,30 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil {
return nil, errors.New("Recieved invalid message type")
// TODO handle/log err
return nil
}

// dispatch handler.
rpmes, err := handler(mPeer, pmes)
if err != nil {
return nil, err
// TODO handle/log err
return nil
}

// if nil response, return it before serializing
if rpmes == nil {
return nil, nil
return nil
}

// serialize response msg
rmes, err := msg.FromObject(mPeer, rpmes)
if err != nil {
return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
// TODO handle/log err
return nil
}

return rmes, nil
return rmes
}

// sendRequest sends out a request using dht.sender, but also makes sure to
Expand Down
5 changes: 1 addition & 4 deletions routing/dht/ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ func TestGetFailures(t *testing.T) {
t.Error(err)
}

mes, err = d.HandleMessage(ctx, mes)
if err != nil {
t.Error(err)
}
mes = d.HandleMessage(ctx, mes)

pmes := new(Message)
err = proto.Unmarshal(mes.Data(), pmes)
Expand Down