diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 4f5bb45e721..4ba9e179fa4 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -1,8 +1,6 @@ package bitswap import ( - "errors" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" @@ -120,14 +118,16 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { // TODO(brian): handle errors func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage, error) { + *peer.Peer, bsmsg.BitSwapMessage) { u.DOut("ReceiveMessage from %v\n", p.Key().Pretty()) if p == nil { - return nil, nil, errors.New("Received nil Peer") + // TODO propagate the error upward + return nil, nil } if incoming == nil { - return nil, nil, errors.New("Received nil Message") + // TODO propagate the error upward + return nil, nil } bs.strategy.MessageReceived(p, incoming) // FIRST @@ -157,7 +157,12 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs } } defer bs.strategy.MessageSent(p, message) - return p, message, nil + return p, message +} + +func (bs *bitswap) ReceiveError(err error) { + // TODO log the network error + // TODO bubble the network error up to the parent context/error logger } // send strives to ensure that accounting is always performed when a message is diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 15fa9c89e21..611dea8cbcc 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -33,7 +33,9 @@ type Adapter interface { type Receiver interface { ReceiveMessage( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( - destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error) + destination *peer.Peer, outgoing bsmsg.BitSwapMessage) + + ReceiveError(error) } // TODO(brian): move this to go-ipfs/net package diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index 603317afb31..fe3bd6a36e6 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -1,8 +1,6 @@ package network import ( - "errors" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" @@ -31,33 +29,32 @@ type impl struct { // HandleMessage marshals and unmarshals net messages, forwarding them to the // BitSwapMessage receiver func (adapter *impl) HandleMessage( - ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) { + ctx context.Context, incoming netmsg.NetMessage) netmsg.NetMessage { if adapter.receiver == nil { - return nil, errors.New("No receiver. NetMessage dropped") + return nil } received, err := bsmsg.FromNet(incoming) if err != nil { - return nil, err + go adapter.receiver.ReceiveError(err) + return nil } - p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received) - if err != nil { - return nil, err - } + p, bsmsg := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received) // TODO(brian): put this in a helper function if bsmsg == nil || p == nil { - return nil, nil + return nil } outgoing, err := bsmsg.ToNet(p) if err != nil { - return nil, err + go adapter.receiver.ReceiveError(err) + return nil } - return outgoing, nil + return outgoing } func (adapter *impl) SendMessage( diff --git a/exchange/bitswap/testnet/network.go b/exchange/bitswap/testnet/network.go index 5039e730be4..4d5f8c35ea4 100644 --- a/exchange/bitswap/testnet/network.go +++ b/exchange/bitswap/testnet/network.go @@ -76,18 +76,7 @@ func (n *network) deliver( return errors.New("Invalid input") } - nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message) - if err != nil { - - // TODO should this error be returned across network boundary? - - // TODO this raises an interesting question about network contract. How - // can the network be expected to behave under different failure - // conditions? What if peer is unreachable? Will we know if messages - // aren't delivered? - - return err - } + nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message) if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) { return errors.New("Malformed client request") @@ -119,15 +108,12 @@ func (n *network) SendRequest( if !ok { return nil, errors.New("Cannot locate peer on network") } - nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message) - if err != nil { - return nil, err - // TODO return nil, NoResponse - } + nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message) // TODO dedupe code if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) { - return nil, errors.New("Malformed client request") + r.ReceiveError(errors.New("Malformed client request")) + return nil, nil } // TODO dedupe code @@ -144,7 +130,7 @@ func (n *network) SendRequest( } n.deliver(nextReceiver, nextPeer, nextMsg) }() - return nil, NoResponse + return nil, nil } return nextMsg, nil } diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 70b0615dbbe..15502783eac 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -26,7 +26,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { ctx context.Context, from *peer.Peer, incoming bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage, error) { + *peer.Peer, bsmsg.BitSwapMessage) { t.Log("Recipient received a message from the network") @@ -35,7 +35,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { m := bsmsg.New() m.AppendBlock(testutil.NewBlockOrFail(t, expectedStr)) - return from, m, nil + return from, m })) t.Log("Build a message and send a synchronous request to recipient") @@ -74,19 +74,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ctx context.Context, fromWaiter *peer.Peer, msgFromWaiter bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage, error) { + *peer.Peer, bsmsg.BitSwapMessage) { msgToWaiter := bsmsg.New() msgToWaiter.AppendBlock(testutil.NewBlockOrFail(t, expectedStr)) - return fromWaiter, msgToWaiter, nil + return fromWaiter, msgToWaiter })) waiter.SetDelegate(lambda(func( ctx context.Context, fromResponder *peer.Peer, msgFromResponder bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage, error) { + *peer.Peer, bsmsg.BitSwapMessage) { // TODO assert that this came from the correct peer and that the message contents are as expected ok := false @@ -101,7 +101,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { t.Fatal("Message not received from the responder") } - return nil, nil, nil + return nil, nil })) messageSentAsync := bsmsg.New() @@ -116,7 +116,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { } type receiverFunc func(ctx context.Context, p *peer.Peer, - incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error) + incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage) // lambda returns a Receiver instance given a receiver function func lambda(f receiverFunc) bsnet.Receiver { @@ -126,13 +126,16 @@ func lambda(f receiverFunc) bsnet.Receiver { } type lambdaImpl struct { - f func(ctx context.Context, p *peer.Peer, - incoming bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage, error) + f func(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( + *peer.Peer, bsmsg.BitSwapMessage) } func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage, error) { + *peer.Peer, bsmsg.BitSwapMessage) { return lam.f(ctx, p, incoming) } + +func (lam *lambdaImpl) ReceiveError(err error) { + // TODO log error +} diff --git a/net/service/service.go b/net/service/service.go index 91ee53c071c..f3d4ba5aa5d 100644 --- a/net/service/service.go +++ b/net/service/service.go @@ -20,7 +20,7 @@ type Handler interface { // HandleMessage receives an incoming message, and potentially returns // a response message to send back. - HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error) + HandleMessage(context.Context, msg.NetMessage) msg.NetMessage } // Service is a networking component that protocols can use to multiplex @@ -181,11 +181,7 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) { } // should this be "go HandleMessage ... ?" - r1, err := s.Handler.HandleMessage(ctx, m2) - if err != nil { - u.PErr("handled message yielded error %v\n", err) - return - } + r1 := s.Handler.HandleMessage(ctx, m2) // if handler gave us a response, send it back out! if r1 != nil { diff --git a/net/service/service_test.go b/net/service/service_test.go index 6642117f30f..251be311e55 100644 --- a/net/service/service_test.go +++ b/net/service/service_test.go @@ -15,15 +15,14 @@ import ( // ReverseHandler reverses all Data it receives and sends it back. type ReverseHandler struct{} -func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) ( - msg.NetMessage, error) { +func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) msg.NetMessage { d := m.Data() for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 { d[i], d[j] = d[j], d[i] } - return msg.New(m.Peer(), d), nil + return msg.New(m.Peer(), d) } func newPeer(t *testing.T, id string) *peer.Peer { diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 507c19c3f5b..8ebecd5bd88 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -103,23 +103,26 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, } // HandleMessage implements the inet.Handler interface. -func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) { +func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage { mData := mes.Data() if mData == nil { - return nil, errors.New("message did not include Data") + // TODO handle/log err + return nil } mPeer := mes.Peer() if mPeer == nil { - return nil, errors.New("message did not include a Peer") + // TODO handle/log err + return nil } // deserialize msg pmes := new(Message) err := proto.Unmarshal(mData, pmes) if err != nil { - return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err) + // TODO handle/log err + return nil } // update the peer (on valid msgs only) @@ -133,27 +136,30 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg. // get handler for this msg type. handler := dht.handlerForMsgType(pmes.GetType()) if handler == nil { - return nil, errors.New("Recieved invalid message type") + // TODO handle/log err + return nil } // dispatch handler. rpmes, err := handler(mPeer, pmes) if err != nil { - return nil, err + // TODO handle/log err + return nil } // if nil response, return it before serializing if rpmes == nil { - return nil, nil + return nil } // serialize response msg rmes, err := msg.FromObject(mPeer, rpmes) if err != nil { - return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err) + // TODO handle/log err + return nil } - return rmes, nil + return rmes } // sendRequest sends out a request using dht.sender, but also makes sure to diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 07999e651af..f8b9293a870 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -161,10 +161,7 @@ func TestGetFailures(t *testing.T) { t.Error(err) } - mes, err = d.HandleMessage(ctx, mes) - if err != nil { - t.Error(err) - } + mes = d.HandleMessage(ctx, mes) pmes := new(Message) err = proto.Unmarshal(mes.Data(), pmes)