From 61782c904c2d4b13ce42d526cc322ce564af7c37 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 28 Jun 2018 01:02:23 +0200 Subject: [PATCH 1/3] Reset streams when no reading is going to be done anymore --- p2p/net/gostream/conn.go | 2 +- p2p/net/gostream/listener.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/net/gostream/conn.go b/p2p/net/gostream/conn.go index 1351beec60..5d28cd3047 100644 --- a/p2p/net/gostream/conn.go +++ b/p2p/net/gostream/conn.go @@ -35,7 +35,7 @@ func (c *conn) Write(b []byte) (n int, err error) { // Close closes the connection. // Any blocked Read or Write operations will be unblocked and return errors. func (c *conn) Close() error { - return c.s.Close() + return c.s.Reset() } // LocalAddr returns the local network address. diff --git a/p2p/net/gostream/listener.go b/p2p/net/gostream/listener.go index 9f37a956ca..e66c262a70 100644 --- a/p2p/net/gostream/listener.go +++ b/p2p/net/gostream/listener.go @@ -63,7 +63,7 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { select { case l.streamCh <- s: case <-ctx.Done(): - s.Close() + s.Reset() } }) From d33d7e8c28500b79d3bac836d1329ff62593dca9 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 28 Jun 2018 01:17:33 +0200 Subject: [PATCH 2/3] Reset streams: do not close server while reading --- p2p/net/gostream/gostream_test.go | 40 ++++++++++++++++++------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/p2p/net/gostream/gostream_test.go b/p2p/net/gostream/gostream_test.go index d2e6937c9e..cdbd5be124 100644 --- a/p2p/net/gostream/gostream_test.go +++ b/p2p/net/gostream/gostream_test.go @@ -3,7 +3,6 @@ package gostream import ( "bufio" "context" - "io/ioutil" "testing" "time" @@ -39,8 +38,10 @@ func TestServerClient(t *testing.T) { clientHost.Peerstore().AddAddrs(srvHost.ID(), srvHost.Addrs(), peerstore.PermanentAddrTTL) var tag protocol.ID = "/testitytest" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - go func() { + go func(ctx context.Context) { listener, err := Listen(srvHost, tag) if err != nil { t.Fatal(err) @@ -58,19 +59,25 @@ func TestServerClient(t *testing.T) { defer servConn.Close() reader := bufio.NewReader(servConn) - msg, err := reader.ReadString('\n') - if err != nil { - t.Fatal(err) - } - if string(msg) != "is libp2p awesome?\n" { - t.Fatalf("Bad incoming message: %s", msg) - } - - _, err = servConn.Write([]byte("yes it is")) - if err != nil { - t.Fatal(err) + for { + msg, err := reader.ReadString('\n') + if err != nil { + t.Fatal(err) + } + if string(msg) != "is libp2p awesome?\n" { + t.Fatalf("Bad incoming message: %s", msg) + } + + _, err = servConn.Write([]byte("yes it is\n")) + if err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + return + } } - }() + }(ctx) clientConn, err := Dial(clientHost, srvHost.ID(), tag) if err != nil { @@ -109,12 +116,13 @@ func TestServerClient(t *testing.T) { t.Fatal(err) } - resp, err := ioutil.ReadAll(clientConn) + reader := bufio.NewReader(clientConn) + resp, err := reader.ReadString('\n') if err != nil { t.Fatal(err) } - if string(resp) != "yes it is" { + if string(resp) != "yes it is\n" { t.Errorf("Bad response: %s", resp) } From d0d288b9d2fc599f5fb50dd019dc3fd63e135b8a Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 28 Jun 2018 02:13:42 +0200 Subject: [PATCH 3/3] Do not reset streams on close immediately. Close first. The reset after 1 min --- p2p/net/gostream/conn.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/p2p/net/gostream/conn.go b/p2p/net/gostream/conn.go index 5d28cd3047..dc5050ffaa 100644 --- a/p2p/net/gostream/conn.go +++ b/p2p/net/gostream/conn.go @@ -35,7 +35,12 @@ func (c *conn) Write(b []byte) (n int, err error) { // Close closes the connection. // Any blocked Read or Write operations will be unblocked and return errors. func (c *conn) Close() error { - return c.s.Reset() + if err := c.s.Close(); err != nil { + c.s.Reset() + return err + } + go pnet.AwaitEOF(c.s) + return nil } // LocalAddr returns the local network address.