Skip to content

Commit

Permalink
Add PeerConnection.GracefulClose
Browse files Browse the repository at this point in the history
  • Loading branch information
edaniels committed Aug 2, 2024
1 parent cbe3465 commit 9dc532b
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 11 deletions.
62 changes: 62 additions & 0 deletions datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type DataChannel struct {
readyState atomic.Value // DataChannelState
bufferedAmountLowThreshold uint64
detachCalled bool
readLoopActive chan struct{}
isUserClosed bool

// The binaryType represents attribute MUST, on getting, return the value to
// which it was last set. On setting, if the new value is either the string
Expand Down Expand Up @@ -225,6 +227,10 @@ func (d *DataChannel) OnOpen(f func()) {
func (d *DataChannel) onOpen() {
d.mu.RLock()
handler := d.onOpenHandler
if d.isUserClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler != nil {
Expand Down Expand Up @@ -252,6 +258,10 @@ func (d *DataChannel) OnDial(f func()) {
func (d *DataChannel) onDial() {
d.mu.RLock()
handler := d.onDialHandler
if d.isUserClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler != nil {
Expand All @@ -261,6 +271,10 @@ func (d *DataChannel) onDial() {

// OnClose sets an event handler which is invoked when
// the underlying data transport has been closed.
// Note: Due to backwards compatibility, there is a chance that
// OnClose can be called, even if the GracefulClose is used.
// If this is the case for you, you can deregister OnClose
// prior to GracefulClose.
func (d *DataChannel) OnClose(f func()) {
d.mu.Lock()
defer d.mu.Unlock()
Expand Down Expand Up @@ -292,6 +306,10 @@ func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
func (d *DataChannel) onMessage(msg DataChannelMessage) {
d.mu.RLock()
handler := d.onMessageHandler
if d.isUserClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler == nil {
Expand All @@ -302,6 +320,10 @@ func (d *DataChannel) onMessage(msg DataChannelMessage) {

func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlreadyNegotiated bool) {
d.mu.Lock()
if d.isUserClosed {
d.mu.Unlock()
return
}
d.dataChannel = dc
bufferedAmountLowThreshold := d.bufferedAmountLowThreshold
onBufferedAmountLow := d.onBufferedAmountLow
Expand All @@ -326,7 +348,12 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread
d.mu.Lock()
defer d.mu.Unlock()

if d.isUserClosed {
return
}

if !d.api.settingEngine.detach.DataChannels {
d.readLoopActive = make(chan struct{})
go d.readLoop()
}
}
Expand All @@ -342,6 +369,10 @@ func (d *DataChannel) OnError(f func(err error)) {
func (d *DataChannel) onError(err error) {
d.mu.RLock()
handler := d.onErrorHandler
if d.isUserClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler != nil {
Expand All @@ -350,6 +381,12 @@ func (d *DataChannel) onError(err error) {
}

func (d *DataChannel) readLoop() {
defer func() {
d.mu.Lock()
readLoopActive := d.readLoopActive
d.mu.Unlock()
defer close(readLoopActive)
}()
buffer := make([]byte, dataChannelBufferSize)
for {
n, isString, err := d.dataChannel.ReadDataChannel(buffer)
Expand Down Expand Up @@ -449,7 +486,32 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
// Close Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer.
func (d *DataChannel) Close() error {
return d.close(false)
}

// GracefulClose Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// DataChannel callbacks or if in a callback, in its own goroutine.
func (d *DataChannel) GracefulClose() error {
return d.close(true)
}

// Normally, close only stops writes from happening, so graceful=true
// will wait for reads to be finished based on underlying SCTP association
// closure or a SCTP reset stream from the other side. This is safe to call
// with graceful=true after tearing down a PeerConnection but not
// necessarily before. For example, if you used a vnet and dropped all packets
// right before closing the DataChannel, you'd need never see a reset stream.
func (d *DataChannel) close(graceful shouldGracefullyClose) error {
d.mu.Lock()
d.isUserClosed = true
readLoopActive := d.readLoopActive
if graceful && readLoopActive != nil {
defer func() {
<-readLoopActive
}()
}
haveSctpTransport := d.dataChannel != nil
d.mu.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/pion/datachannel v1.5.8
github.com/pion/dtls/v3 v3.0.0
github.com/pion/ice/v3 v3.0.13
github.com/pion/ice/v3 v3.0.15
github.com/pion/interceptor v0.1.29
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
Expand All @@ -15,7 +15,7 @@ require (
github.com/pion/sdp/v3 v3.0.9
github.com/pion/srtp/v3 v3.0.3
github.com/pion/stun/v2 v2.0.0
github.com/pion/transport/v3 v3.0.6
github.com/pion/transport/v3 v3.0.7
github.com/sclevine/agouti v3.0.0+incompatible
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.27.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/dtls/v3 v3.0.0 h1:m2hzwPkzqoBjVKXm5ymNuX01OAjht82TdFL6LoTzgi4=
github.com/pion/dtls/v3 v3.0.0/go.mod h1:tiX7NaneB0wNoRaUpaMVP7igAlkMCTQkbpiY+OfeIi0=
github.com/pion/ice/v3 v3.0.13 h1:tPi5fh2xbWhS0DBcs7LTEG0SOUTHLVDjTlFwBy3hXfw=
github.com/pion/ice/v3 v3.0.13/go.mod h1:q2M/RnfpgGhC4HcluxPpD1wImaqFqU0Z1PE2eeOPrIs=
github.com/pion/ice/v3 v3.0.15 h1:6FFM1k1Ei36keZN1drl8/xaEs+NpMMG6M+MsVRchXho=
github.com/pion/ice/v3 v3.0.15/go.mod h1:SdmubtIsCcvdb1ZInrTUz7Iaqi90/rYd1pzbzlMxsZg=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
Expand All @@ -69,8 +69,8 @@ github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLh
github.com/pion/transport/v2 v2.2.8 h1:HzsqGBChgtF4Cj47gu51l5hONuK/NwgbZL17CMSuwS0=
github.com/pion/transport/v2 v2.2.8/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E=
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
github.com/pion/transport/v3 v3.0.6 h1:k1mQU06bmmX143qSWgXFqSH1KUJceQvIUuVH/K5ELWw=
github.com/pion/transport/v3 v3.0.6/go.mod h1:HvJr2N/JwNJAfipsRleqwFoR3t/pWyHeZUs89v3+t5s=
github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0=
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pion/turn/v3 v3.0.3 h1:1e3GVk8gHZLPBA5LqadWYV60lmaKUaHCkm9DX9CkGcE=
github.com/pion/turn/v3 v3.0.3/go.mod h1:vw0Dz420q7VYAF3J4wJKzReLHIo2LGp4ev8nXQexYsc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
22 changes: 20 additions & 2 deletions icegatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,31 @@ func (g *ICEGatherer) Gather() error {

// Close prunes all local candidates, and closes the ports.
func (g *ICEGatherer) Close() error {
return g.close(shouldGracefullyCloseNo)
}

// GracefulClose prunes all local candidates, and closes the ports. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// ICEGatherer callbacks or if in a callback, in its own goroutine.
func (g *ICEGatherer) GracefulClose() error {
return g.close(shouldGracefullyCloseYes)
}

func (g *ICEGatherer) close(graceful shouldGracefullyClose) error {
g.lock.Lock()
defer g.lock.Unlock()

if g.agent == nil {
return nil
} else if err := g.agent.Close(); err != nil {
return err
}
if graceful {
if err := g.agent.GracefulClose(); err != nil {
return err
}
} else {
if err := g.agent.Close(); err != nil {
return err
}
}

g.agent = nil
Expand Down
14 changes: 14 additions & 0 deletions icetransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,17 @@ func (t *ICETransport) restart() error {

// Stop irreversibly stops the ICETransport.
func (t *ICETransport) Stop() error {
return t.stop(shouldGracefullyCloseNo)
}

// GracefulStop irreversibly stops the ICETransport. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// ICETransport callbacks or if in a callback, in its own goroutine.
func (t *ICETransport) GracefulStop() error {
return t.stop(shouldGracefullyCloseYes)
}

func (t *ICETransport) stop(graceful shouldGracefullyClose) error {
t.lock.Lock()
defer t.lock.Unlock()

Expand All @@ -199,6 +210,9 @@ func (t *ICETransport) Stop() error {
if t.mux != nil {
return t.mux.Close()
} else if t.gatherer != nil {
if graceful {
return t.gatherer.GracefulClose()
}
return t.gatherer.Close()
}
return nil
Expand Down
55 changes: 52 additions & 3 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type PeerConnection struct {
idpLoginURL *string

isClosed *atomicBool
isGracefulClosed *atomicBool
isGracefulClosedDone chan struct{}
isNegotiationNeeded *atomicBool
updateNegotiationNeededFlagOnEmptyChain *atomicBool

Expand Down Expand Up @@ -117,6 +119,8 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
ICECandidatePoolSize: 0,
},
isClosed: &atomicBool{},
isGracefulClosed: &atomicBool{},
isGracefulClosedDone: make(chan struct{}),
isNegotiationNeeded: &atomicBool{},
updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
lastOffer: "",
Expand Down Expand Up @@ -2092,13 +2096,41 @@ func (pc *PeerConnection) writeRTCP(pkts []rtcp.Packet, _ interceptor.Attributes
return pc.dtlsTransport.WriteRTCP(pkts)
}

// Close ends the PeerConnection
type shouldGracefullyClose bool

const (
shouldGracefullyCloseNo shouldGracefullyClose = false
shouldGracefullyCloseYes shouldGracefullyClose = false
)

// Close ends the PeerConnection.
func (pc *PeerConnection) Close() error {
return pc.close(shouldGracefullyCloseNo)
}

// GracefulClose ends the PeerConnection. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// PeerConnection callbacks or if in a callback, in its own goroutine.
func (pc *PeerConnection) GracefulClose() error {
return pc.close(shouldGracefullyCloseYes)
}

func (pc *PeerConnection) close(graceful shouldGracefullyClose) error {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
alreadyGracefullyClosed := graceful == shouldGracefullyCloseYes && pc.isGracefulClosed.swap(true)
if pc.isClosed.swap(true) {
if alreadyGracefullyClosed {
// similar but distinct condition where we may be waiting for some
// other graceful close to finish. Incorrectly using isClosed may
// leak a goroutine.
<-pc.isGracefulClosedDone
}
return nil
}
if graceful == shouldGracefullyCloseYes && !alreadyGracefullyClosed {
defer close(pc.isGracefulClosedDone)
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
pc.signalingState.Set(SignalingStateClosed)
Expand Down Expand Up @@ -2142,12 +2174,26 @@ func (pc *PeerConnection) Close() error {

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
if pc.iceTransport != nil {
closeErrs = append(closeErrs, pc.iceTransport.Stop())
if graceful {
// note that it isn't canon to stop gracefully
closeErrs = append(closeErrs, pc.iceTransport.GracefulStop())
} else {
closeErrs = append(closeErrs, pc.iceTransport.Stop())
}
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())

if graceful {
// note that it isn't canon to stop gracefully
pc.sctpTransport.lock.Lock()
for _, d := range pc.sctpTransport.dataChannels {
closeErrs = append(closeErrs, d.GracefulClose())
}
pc.sctpTransport.lock.Unlock()
}

return util.FlattenErrs(closeErrs)
}

Expand Down Expand Up @@ -2321,8 +2367,11 @@ func (pc *PeerConnection) startTransports(iceRole ICERole, dtlsRole DTLSRole, re
}

pc.dtlsTransport.internalOnCloseHandler = func() {
pc.log.Info("Closing PeerConnection from DTLS CloseNotify")
if pc.isClosed.get() {
return
}

pc.log.Info("Closing PeerConnection from DTLS CloseNotify")
go func() {
if pcClosErr := pc.Close(); pcClosErr != nil {
pc.log.Warnf("Failed to close PeerConnection from DTLS CloseNotify: %s", pcClosErr)
Expand Down
Loading

0 comments on commit 9dc532b

Please sign in to comment.