Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat: remove the context from the donthavetimeoutmanager (#303)
Browse files Browse the repository at this point in the history
This removes one goroutine per peer which tends to be a pretty big deal. This
brings go-ipfs down from 5.5 to 4.5 goroutines per peer.
  • Loading branch information
Stebalien authored Mar 17, 2020
1 parent 694d2f8 commit 5a278ff
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 31 deletions.
20 changes: 4 additions & 16 deletions internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ type dontHaveTimeoutMgr struct {

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(ctx context.Context, pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(ctx, pc, onDontHaveTimeout, dontHaveTimeout,
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout,
latencyMultiplier, maxExpectedWantProcessTime)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(ctx context.Context, pc PeerConnection, onDontHaveTimeout func([]cid.Cid),
func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration, latencyMultiplier int,
maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr {

ctx, shutdown := context.WithCancel(ctx)
ctx, shutdown := context.WithCancel(context.Background())
mqp := &dontHaveTimeoutMgr{
ctx: ctx,
shutdown: shutdown,
Expand All @@ -101,10 +101,7 @@ func newDontHaveTimeoutMgrWithParams(ctx context.Context, pc PeerConnection, onD
// Shutdown the dontHaveTimeoutMgr. Any subsequent call to Start() will be ignored
func (dhtm *dontHaveTimeoutMgr) Shutdown() {
dhtm.shutdown()
}

// onShutdown is called when the dontHaveTimeoutMgr shuts down
func (dhtm *dontHaveTimeoutMgr) onShutdown() {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()

Expand All @@ -114,13 +111,6 @@ func (dhtm *dontHaveTimeoutMgr) onShutdown() {
}
}

// closeAfterContext is called when the dontHaveTimeoutMgr starts.
// It monitors for the context being cancelled.
func (dhtm *dontHaveTimeoutMgr) closeAfterContext() {
<-dhtm.ctx.Done()
dhtm.onShutdown()
}

// Start the dontHaveTimeoutMgr. This method is idempotent
func (dhtm *dontHaveTimeoutMgr) Start() {
dhtm.lk.Lock()
Expand All @@ -132,8 +122,6 @@ func (dhtm *dontHaveTimeoutMgr) Start() {
}
dhtm.started = true

go dhtm.closeAfterContext()

// If we already have a measure of latency to the peer, use it to
// calculate a reasonable timeout
latency := dhtm.peerConn.Latency()
Expand Down
28 changes: 14 additions & 14 deletions internal/messagequeue/donthavetimeoutmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
latMultiplier := 2
expProcessTime := 5 * time.Millisecond
expectedTimeout := expProcessTime + latency*time.Duration(latMultiplier)
ctx := context.Background()
pc := &mockPeerConn{latency: latency}
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add first set of keys
dhtm.AddPending(firstks)
Expand Down Expand Up @@ -125,13 +125,13 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
latMultiplier := 1
expProcessTime := time.Duration(0)
expectedTimeout := latency
ctx := context.Background()
pc := &mockPeerConn{latency: latency}
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)
Expand All @@ -156,13 +156,13 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
latMultiplier := 1
expProcessTime := time.Duration(0)
expectedTimeout := latency
ctx := context.Background()
pc := &mockPeerConn{latency: latency}
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)
Expand Down Expand Up @@ -200,13 +200,13 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
latency := time.Millisecond * 5
latMultiplier := 1
expProcessTime := time.Duration(0)
ctx := context.Background()
pc := &mockPeerConn{latency: latency}
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys repeatedly
for _, c := range ks {
Expand All @@ -230,12 +230,12 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
defaultTimeout := 10 * time.Millisecond
expectedTimeout := expProcessTime + defaultTimeout
tr := timeoutRecorder{}
ctx := context.Background()
pc := &mockPeerConn{latency: latency, err: fmt.Errorf("ping error")}

dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)
Expand Down Expand Up @@ -264,12 +264,12 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
expProcessTime := time.Duration(0)
defaultTimeout := 10 * time.Millisecond
tr := timeoutRecorder{}
ctx := context.Background()
pc := &mockPeerConn{latency: latency}

dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)
Expand Down Expand Up @@ -297,12 +297,12 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
latMultiplier := 1
expProcessTime := time.Duration(0)
tr := timeoutRecorder{}
ctx := context.Background()
pc := &mockPeerConn{latency: latency}

dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)
Expand Down
2 changes: 1 addition & 1 deletion internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p)
onDontHaveTimeout(p, ks)
}
dhTimeoutMgr := newDontHaveTimeoutMgr(ctx, newPeerConnection(p, network), onTimeout)
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr)
}

Expand Down

0 comments on commit 5a278ff

Please sign in to comment.