diff --git a/go.mod b/go.mod index bf6d5b7..db6e741 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ require ( github.com/ipfs/go-log v0.0.1 github.com/libp2p/go-buffer-pool v0.0.1 github.com/libp2p/go-libp2p-blankhost v0.0.1 - github.com/libp2p/go-libp2p-host v0.0.1 + github.com/libp2p/go-libp2p-host v0.0.3 github.com/libp2p/go-libp2p-net v0.0.2 github.com/libp2p/go-libp2p-peer v0.0.1 github.com/libp2p/go-libp2p-peerstore v0.0.1 diff --git a/go.sum b/go.sum index d491a45..0282ecb 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-semver v0.2.1-0.20180108230905-e214231b295a h1:U0BbGfKnviqVBJQB4etvm+mKx53KfkumNLBt6YeF/0Q= +github.com/coreos/go-semver v0.2.1-0.20180108230905-e214231b295a/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -76,8 +78,12 @@ github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE= github.com/libp2p/go-libp2p-host v0.0.1 h1:dnqusU+DheGcdxrE718kG4XgHNuL2n9eEv8Rg5zy8hQ= github.com/libp2p/go-libp2p-host v0.0.1/go.mod h1:qWd+H1yuU0m5CwzAkvbSjqKairayEHdR5MMl7Cwa7Go= +github.com/libp2p/go-libp2p-host v0.0.3 h1:BB/1Z+4X0rjKP5lbQTmjEjLbDVbrcmLOlA6QDsN5/j4= +github.com/libp2p/go-libp2p-host v0.0.3/go.mod h1:Y/qPyA6C8j2coYyos1dfRm0I8+nvd4TGrDGt4tA7JR8= github.com/libp2p/go-libp2p-interface-connmgr v0.0.1 h1:Q9EkNSLAOF+u90L88qmE9z/fTdjLh8OsJwGw74mkwk4= github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= +github.com/libp2p/go-libp2p-interface-connmgr v0.0.4 h1:/LngXETpII5qOD7YjAcQiIxhVtdAk/NQe5t9sC6BR0E= +github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-interface-pnet v0.0.1 h1:7GnzRrBTJHEsofi1ahFdPN9Si6skwXQE9UqR2S+Pkh8= github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k= github.com/libp2p/go-libp2p-loggables v0.0.1 h1:HVww9oAnINIxbt69LJNkxD8lnbfgteXR97Xm4p3l9ps= diff --git a/relay.go b/relay.go index 81d5d0a..0afcf04 100644 --- a/relay.go +++ b/relay.go @@ -118,10 +118,15 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts .. func (r *Relay) addLiveHop(from, to peer.ID) { atomic.AddInt32(&r.liveHopCount, 1) + r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v + 1 }) + r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v + 1 }) } func (r *Relay) rmLiveHop(from, to peer.ID) { atomic.AddInt32(&r.liveHopCount, -1) + r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v - 1 }) + r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v - 1 }) + } func (r *Relay) GetActiveHops() int32 { @@ -364,10 +369,18 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { r.addLiveHop(src.ID, dst.ID) + goroutines := new(int32) + *goroutines = 2 + done := func() { + if atomic.AddInt32(goroutines, -1) == 0 { + r.rmLiveHop(src.ID, dst.ID) + } + } + // Don't reset streams after finishing or the other side will get an // error, not an EOF. go func() { - defer r.rmLiveHop(src.ID, dst.ID) + defer done() buf := pool.Get(HopStreamBufferSize) defer pool.Put(buf) @@ -386,6 +399,8 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { }() go func() { + defer done() + buf := pool.Get(HopStreamBufferSize) defer pool.Put(buf)