Skip to content

Commit

Permalink
Handle network errors/stalls (#101)
Browse files Browse the repository at this point in the history
* feat(datatransfer): handle network errors

Handle errors when network messages don't go through

* fix(datatransfer): cleanups

lint, remove unused status
  • Loading branch information
hannahhoward authored Oct 13, 2020
1 parent ad43f2d commit eee8d1c
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 30 deletions.
10 changes: 8 additions & 2 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ var log = logging.Logger("data-transfer")
var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
fsm.Event(datatransfer.Restart).FromAny().To(datatransfer.Ongoing),
fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
return nil
}),

fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),

Expand Down Expand Up @@ -46,7 +49,10 @@ var ChannelEvents = fsm.Events{
return nil
}),

fsm.Event(datatransfer.Disconnected).FromAny().To(datatransfer.PeerDisconnected),
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = datatransfer.ErrDisconnected.Error()
return nil
}),

fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
Expand Down
2 changes: 1 addition & 1 deletion channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestChannels(t *testing.T) {
err = channelList.Disconnected(chid)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.Disconnected)
require.Equal(t, datatransfer.PeerDisconnected, state.Status())
require.Equal(t, datatransfer.ErrDisconnected.Error(), state.Message())
})

t.Run("test self peer and other peer", func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ const ErrRejected = errorType("response rejected")

// ErrUnsupported indicates an operation is not supported by the transport protocol
const ErrUnsupported = errorType("unsupported")

// ErrDisconnected indicates the other peer may have hung up and you should try restarting the channel.
const ErrDisconnected = errorType("other peer appears to have hung up. restart Channel")

// ErrRemoved indicates the channel was inactive long enough that it was put in a permaneant error state
const ErrRemoved = errorType("channel removed due to inactivity")
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-graphsync v0.2.1
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.2.1 h1:MdehhqBSuTI2LARfKLkpYnt0mUrqHs/mtuDnESXHBfU=
github.com/ipfs/go-graphsync v0.2.1/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c h1:De/AZGvRa3WMyw5zdMMhcvRcho46BVo+C0NRud+T4io=
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down
38 changes: 33 additions & 5 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/filecoin-project/go-data-transfer/registry"
)

var ChannelRemoveTimeout = 1 * time.Hour

func (m *manager) OnChannelOpened(chid datatransfer.ChannelID) error {
has, err := m.channels.HasChannel(chid)
if err != nil {
Expand Down Expand Up @@ -170,12 +168,42 @@ func (m *manager) OnRequestTimedOut(ctx context.Context, chid datatransfer.Chann
go func() {
select {
case <-ctx.Done():
case <-time.After(ChannelRemoveTimeout):
case <-time.After(m.channelRemoveTimeout):
channel, err := m.channels.GetByID(ctx, chid)
if err == nil {
if !(channels.IsChannelTerminated(channel.Status()) ||
channels.IsChannelCleaningUp(channel.Status())) {
if err := m.channels.Error(chid, datatransfer.ErrRemoved); err != nil {
log.Errorf("failed to cancel timed-out channel: %v", err)
return
}
log.Warnf("channel %+v has ben cancelled because of timeout", chid)
}
}
}
}()

return nil
}

func (m *manager) OnRequestDisconnected(ctx context.Context, chid datatransfer.ChannelID) error {
log.Warnf("channel %+v has stalled or disconnected", chid)

// mark peer disconnected for informational purposes
err := m.channels.Disconnected(chid)
if err != nil {
return err
}

go func() {
select {
case <-ctx.Done():
case <-time.After(m.channelRemoveTimeout):
channel, err := m.channels.GetByID(ctx, chid)
if err == nil {
if !(channels.IsChannelTerminated(channel.Status()) ||
channels.IsChannelCleaningUp(channel.Status())) {
if err := m.channels.Cancel(chid); err != nil {
if err := m.channels.Error(chid, datatransfer.ErrRemoved); err != nil {
log.Errorf("failed to cancel timed-out channel: %v", err)
return
}
Expand All @@ -198,7 +226,7 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
if msg != nil {
if err := m.dataTransferNetwork.SendMessage(context.TODO(), chid.Initiator, msg); err != nil {
log.Warnf("failed to send completion message, err : %v", err)
return m.channels.Disconnected(chid)
return m.OnRequestDisconnected(context.TODO(), chid)
}
}
if msg.Accepted() {
Expand Down
26 changes: 22 additions & 4 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/hannahhoward/go-pubsub"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -39,6 +40,7 @@ type manager struct {
peerID peer.ID
transport datatransfer.Transport
storedCounter *storedcounter.StoredCounter
channelRemoveTimeout time.Duration
}

type internalEvent struct {
Expand Down Expand Up @@ -72,8 +74,20 @@ func readyDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
return nil
}

// DataTransferOption configures the data transfer manager
type DataTransferOption func(*manager)

// ChannelRemoveTimeout sets the timeout after which channels are removed from the manager
func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
return func(m *manager) {
m.channelRemoveTimeout = timeout
}
}

const defaultChannelRemoveTimeout = 1 * time.Hour

// NewDataTransfer initializes a new instance of a data transfer manager
func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter) (datatransfer.Manager, error) {
func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter, options ...DataTransferOption) (datatransfer.Manager, error) {
m := &manager{
dataTransferNetwork: dataTransferNetwork,
validatedTypes: registry.NewRegistry(),
Expand All @@ -85,12 +99,16 @@ func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTran
peerID: dataTransferNetwork.ID(),
transport: transport,
storedCounter: storedCounter,
channelRemoveTimeout: defaultChannelRemoveTimeout,
}
channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, &channelEnvironment{m}, dataTransferNetwork.ID())
if err != nil {
return nil, err
}
m.channels = channels
for _, option := range options {
option(m)
}
return m, nil
}

Expand Down Expand Up @@ -230,7 +248,7 @@ func (m *manager) SendVoucher(ctx context.Context, channelID datatransfer.Channe
}
if err := m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), updateRequest); err != nil {
err = fmt.Errorf("Unable to send request: %w", err)
_ = m.channels.Error(channelID, err)
_ = m.OnRequestDisconnected(ctx, channelID)
return err
}
return m.channels.NewVoucher(channelID, voucher)
Expand All @@ -249,7 +267,7 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe

if err := m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid)); err != nil {
err = fmt.Errorf("Unable to send cancel message: %w", err)
_ = m.channels.Error(chid, err)
_ = m.OnRequestDisconnected(ctx, chid)
return err
}

Expand All @@ -271,7 +289,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe

if err := m.dataTransferNetwork.SendMessage(ctx, chid.OtherParty(m.peerID), m.pauseMessage(chid)); err != nil {
err = fmt.Errorf("Unable to send pause message: %w", err)
_ = m.channels.Error(chid, err)
_ = m.OnRequestDisconnected(ctx, chid)
return err
}

Expand Down
23 changes: 15 additions & 8 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestDataTransferInitiating(t *testing.T) {
ctx := context.Background()
testCases := map[string]struct {
expectedEvents []datatransfer.EventCode
options []DataTransferOption
verify func(t *testing.T, h *harness)
}{
"OpenPushDataTransfer": {
Expand Down Expand Up @@ -83,21 +84,27 @@ func TestDataTransferInitiating(t *testing.T) {
},
},
"Remove Timed-out request": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Cancel, datatransfer.CleanupComplete},
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Error, datatransfer.CleanupComplete},
options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)},
verify: func(t *testing.T, h *harness) {
orig := ChannelRemoveTimeout
ChannelRemoveTimeout = 10 * time.Millisecond
defer func() {
ChannelRemoveTimeout = orig
}()

channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
require.NoError(t, err)
require.NoError(t, h.transport.EventHandler.OnRequestTimedOut(ctx, channelID))
// need time for the events to take place
time.Sleep(1 * time.Second)
},
},
"Remove disconnected request": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Disconnected, datatransfer.Error, datatransfer.CleanupComplete},
options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)},
verify: func(t *testing.T, h *harness) {
channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
require.NoError(t, err)
require.NoError(t, h.transport.EventHandler.OnRequestDisconnected(ctx, channelID))
// need time for the events to take place
time.Sleep(1 * time.Second)
},
},
"SendVoucher with no channel open": {
verify: func(t *testing.T, h *harness) {
err := h.dt.SendVoucher(h.ctx, datatransfer.ChannelID{Initiator: h.peers[1], Responder: h.peers[0], ID: 999999}, h.voucher)
Expand Down Expand Up @@ -344,7 +351,7 @@ func TestDataTransferInitiating(t *testing.T) {
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
dt, err := NewDataTransfer(h.ds, h.network, h.transport, h.storedCounter)
dt, err := NewDataTransfer(h.ds, h.network, h.transport, h.storedCounter, verify.options...)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down
5 changes: 3 additions & 2 deletions impl/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/go-data-transfer/message"
)

// ChannelDataTransferType identifies the type of a data transfer channel for the purposes of a restart
type ChannelDataTransferType int

const (
Expand All @@ -23,10 +24,10 @@ const (
// ManagerPeerCreatePush is the type of a channel wherein the manager peer created a Push Data Transfer
ManagerPeerCreatePush

// ManagerPeerCreatePush is the type of a channel wherein the manager peer received a Pull Data Transfer Request
// ManagerPeerReceivePull is the type of a channel wherein the manager peer received a Pull Data Transfer Request
ManagerPeerReceivePull

// ManagerPeerCreatePush is the type of a channel wherein the manager peer received a Push Data Transfer Request
// ManagerPeerReceivePush is the type of a channel wherein the manager peer received a Push Data Transfer Request
ManagerPeerReceivePush
)

Expand Down
3 changes: 0 additions & 3 deletions statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ const (

// ChannelNotFoundError means the searched for data transfer does not exist
ChannelNotFoundError

// PeerDisconnected means that we do NOT have a connection to the other peer
PeerDisconnected
)

// Statuses are human readable names for data transfer states
Expand Down
14 changes: 14 additions & 0 deletions testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ type FakeGraphSync struct {
RequestUpdatedHook graphsync.OnRequestUpdatedHook
IncomingResponseHook graphsync.OnIncomingResponseHook
RequestorCancelledListener graphsync.OnRequestorCancelledListener
BlockSentListener graphsync.OnBlockSentListener
NetworkErrorListener graphsync.OnNetworkErrorListener
}

// NewFakeGraphSync returns a new fake graphsync implementation
Expand Down Expand Up @@ -352,6 +354,18 @@ func (fgs *FakeGraphSync) RegisterRequestorCancelledListener(listener graphsync.
return nil
}

// RegisterBlockSentListener adds a listener on the responder as blocks go out
func (fgs *FakeGraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
fgs.BlockSentListener = listener
return nil
}

// RegisterNetworkErrorListener adds a listener on the responder as blocks go out
func (fgs *FakeGraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
fgs.NetworkErrorListener = listener
return nil
}

var _ graphsync.GraphExchange = &FakeGraphSync{}

type fakeBlkData struct {
Expand Down
4 changes: 4 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type EventsHandler interface {
// OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out.
// Error returns are logged but otherwise have no effect
OnRequestTimedOut(ctx context.Context, chid ChannelID) error

// OnRequestDisconnected is called when a network error occurs in a graphsync request
// or we appear to stall while receiving data
OnRequestDisconnected(ctx context.Context, chid ChannelID) error
}

/*
Expand Down
14 changes: 14 additions & 0 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
t.gs.RegisterIncomingResponseHook(t.gsIncomingResponseHook)
t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook)
t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener)
t.gs.RegisterNetworkErrorListener(t.gsNetworkErrorListener)
return nil
}

Expand Down Expand Up @@ -597,3 +598,16 @@ func (t *Transport) gsRequestorCancelledListener(p peer.ID, request graphsync.Re
t.requestorCancelledMap[chid] = struct{}{}
}
}

func (t *Transport) gsNetworkErrorListener(p peer.ID, request graphsync.RequestData, err error) {
t.dataLock.Lock()
defer t.dataLock.Unlock()

chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
if ok {
err := t.events.OnRequestDisconnected(context.TODO(), chid)
if err != nil {
log.Error(err)
}
}
}
Loading

0 comments on commit eee8d1c

Please sign in to comment.