Skip to content

Commit

Permalink
reuse deadline contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed May 22, 2019
1 parent 54e6e16 commit 05c1acd
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 26 deletions.
4 changes: 3 additions & 1 deletion multiplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,10 @@ func (mp *Multiplex) handleIncoming() {
if !isClosed {
msch.doCloseLocal()
}

msch.clLock.Unlock()

msch.cancelDeadlines()

mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()
Expand All @@ -435,6 +436,7 @@ func (mp *Multiplex) handleIncoming() {
msch.clLock.Unlock()

if cleanup {
msch.cancelDeadlines()
mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()
Expand Down
97 changes: 72 additions & 25 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
streammux "github.com/libp2p/go-stream-muxer"
)

var errStreamClosed = errors.New("closed stream")

// streamID is a convenience type for operating on stream IDs
type streamID struct {
id uint64
Expand Down Expand Up @@ -38,9 +40,9 @@ type Stream struct {
// for later memory pool freeing
exbuf []byte

deadlineLock sync.Mutex
wDeadline time.Time
rDeadline time.Time
deadlineLock sync.Mutex
wDeadlineCtx, rDeadlineCtx context.Context
wDeadlineCancel, rDeadlineCancel func()

clLock sync.Mutex
closedRemote bool
Expand Down Expand Up @@ -70,12 +72,13 @@ func (s *Stream) preloadData() {
}
}

func (s *Stream) waitForData(ctx context.Context) error {
func (s *Stream) waitForData() error {
var ctx context.Context
s.deadlineLock.Lock()
if !s.rDeadline.IsZero() {
dctx, cancel := context.WithDeadline(ctx, s.rDeadline)
defer cancel()
ctx = dctx
if s.rDeadlineCtx != nil {
ctx = s.rDeadlineCtx
} else {
ctx = context.Background()
}
s.deadlineLock.Unlock()

Expand Down Expand Up @@ -125,7 +128,7 @@ func (s *Stream) Read(b []byte) (int, error) {
default:
}
if s.extra == nil {
err := s.waitForData(context.Background())
err := s.waitForData()
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -172,21 +175,16 @@ func (s *Stream) write(b []byte) (int, error) {
return 0, errors.New("cannot write to closed stream")
}

var ctx context.Context
s.deadlineLock.Lock()
wDeadlineCtx, cleanup := func(s *Stream) (context.Context, context.CancelFunc) {
if s.wDeadline.IsZero() {
return s.closedLocal, nil
} else {
return context.WithDeadline(s.closedLocal, s.wDeadline)
}
}(s)
if s.wDeadlineCtx != nil {
ctx = s.wDeadlineCtx
} else {
ctx = s.closedLocal
}
s.deadlineLock.Unlock()

err := s.mp.sendMsg(wDeadlineCtx, s.id.header(messageTag), b)

if cleanup != nil {
cleanup()
}
err := s.mp.sendMsg(ctx, s.id.header(messageTag), b)

if err != nil {
if err == context.Canceled {
Expand Down Expand Up @@ -219,6 +217,7 @@ func (s *Stream) Close() error {
s.doCloseLocal()

if remote {
s.cancelDeadlines()
s.mp.chLock.Lock()
delete(s.mp.channels, s.id)
s.mp.chLock.Unlock()
Expand Down Expand Up @@ -252,6 +251,7 @@ func (s *Stream) Reset() error {
close(s.reset)
s.doCloseLocal()
s.closedRemote = true
s.cancelDeadlines()

go s.mp.sendResetMsg(s.id.header(resetTag), true)

Expand All @@ -264,24 +264,71 @@ func (s *Stream) Reset() error {
return nil
}

func (s *Stream) cancelDeadlines() {
s.deadlineLock.Lock()
defer s.deadlineLock.Unlock()

if s.rDeadlineCancel != nil {
s.rDeadlineCancel()
s.rDeadlineCtx = nil
s.rDeadlineCancel = nil
}

if s.wDeadlineCancel != nil {
s.wDeadlineCancel()
s.wDeadlineCtx = nil
s.wDeadlineCancel = nil
}
}

func (s *Stream) SetDeadline(t time.Time) error {
s.deadlineLock.Lock()
defer s.deadlineLock.Unlock()
s.rDeadline = t
s.wDeadline = t
if s.isClosed() {
return errStreamClosed
}
s.setReadDeadline(t)
s.setWriteDeadline(t)
return nil
}

func (s *Stream) SetReadDeadline(t time.Time) error {
s.deadlineLock.Lock()
defer s.deadlineLock.Unlock()
s.rDeadline = t
s.setReadDeadline(t)
return nil
}

func (s *Stream) setReadDeadline(t time.Time) {
if s.rDeadlineCancel != nil {
s.rDeadlineCancel()
}
if t.IsZero() {
s.rDeadlineCtx = nil
s.rDeadlineCancel = nil
} else {
s.rDeadlineCtx, s.rDeadlineCancel = context.WithDeadline(context.Background(), t)
}
}

func (s *Stream) SetWriteDeadline(t time.Time) error {
s.deadlineLock.Lock()
defer s.deadlineLock.Unlock()
s.wDeadline = t
if s.isClosed() {
return errStreamClosed
}
s.setWriteDeadline(t)
return nil
}

func (s *Stream) setWriteDeadline(t time.Time) {
if s.wDeadlineCancel != nil {
s.wDeadlineCancel()
}
if t.IsZero() {
s.wDeadlineCtx = nil
s.wDeadlineCancel = nil
} else {
s.wDeadlineCtx, s.wDeadlineCancel = context.WithDeadline(s.closedLocal, t)
}
}

0 comments on commit 05c1acd

Please sign in to comment.