Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libp2phttp: workaround for ResponseWriter's CloseNotifier #2821

Merged
merged 2 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/http/libp2phttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type WellKnownHandler struct {

// streamHostListen returns a net.Listener that listens on libp2p streams for HTTP/1.1 messages.
func streamHostListen(streamHost host.Host) (net.Listener, error) {
return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect)
return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect, gostream.IgnoreEOF())
}

func (h *WellKnownHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down
36 changes: 36 additions & 0 deletions p2p/http/libp2phttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,3 +719,39 @@ func TestServerLegacyWellKnownResource(t *testing.T) {
}

}

func TestResponseWriterShouldNotHaveCancelledContext(t *testing.T) {
h, err := libp2p.New()
require.NoError(t, err)
defer h.Close()
httpHost := libp2phttp.Host{StreamHost: h}
go httpHost.Serve()
defer httpHost.Close()

closeNotifyCh := make(chan bool, 1)
httpHost.SetHTTPHandlerAtPath("/test", "/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Legacy code uses this to check if the connection was closed
//lint:ignore SA1019 This is a test to assert we do the right thing since Go HTTP stdlib depends on this.
ch := w.(http.CloseNotifier).CloseNotify()
select {
case <-ch:
closeNotifyCh <- true
case <-time.After(100 * time.Millisecond):
closeNotifyCh <- false
}
w.WriteHeader(http.StatusOK)
}))

clientH, err := libp2p.New()
require.NoError(t, err)
defer clientH.Close()
clientHost := libp2phttp.Host{StreamHost: clientH}

rt, err := clientHost.NewConstrainedRoundTripper(peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
require.NoError(t, err)
httpClient := &http.Client{Transport: rt}
_, err = httpClient.Get("/")
require.NoError(t, err)

require.False(t, <-closeNotifyCh)
}
16 changes: 13 additions & 3 deletions p2p/net/gostream/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gostream

import (
"context"
"io"
"net"

"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -14,11 +15,20 @@ import (
// libp2p streams.
type conn struct {
network.Stream
ignoreEOF bool
}

func (c *conn) Read(b []byte) (int, error) {
n, err := c.Stream.Read(b)
if err != nil && c.ignoreEOF && err == io.EOF {
return n, nil
}
return n, err
}

// newConn creates a conn given a libp2p stream
func newConn(s network.Stream) net.Conn {
return &conn{s}
func newConn(s network.Stream, ignoreEOF bool) net.Conn {
return &conn{s, ignoreEOF}
}

// LocalAddr returns the local network address.
Expand All @@ -39,5 +49,5 @@ func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.C
if err != nil {
return nil, err
}
return newConn(s), nil
return newConn(s, false), nil
}
22 changes: 20 additions & 2 deletions p2p/net/gostream/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type listener struct {
tag protocol.ID
cancel func()
streamCh chan network.Stream
// ignoreEOF is a flag that tells the listener to return conns that ignore EOF errors.
// Necessary because the default responsewriter will consider a connection closed if it reads EOF.
// But when on streams, it's fine for us to read EOF, but still be able to write.
ignoreEOF bool
}

// Accept returns the next a connection to this listener.
Expand All @@ -26,7 +30,7 @@ type listener struct {
func (l *listener) Accept() (net.Conn, error) {
select {
case s := <-l.streamCh:
return newConn(s), nil
return newConn(s, l.ignoreEOF), nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
Expand All @@ -48,7 +52,7 @@ func (l *listener) Addr() net.Addr {
// Listen provides a standard net.Listener ready to accept "connections".
// Under the hood, these connections are libp2p streams tagged with the
// given protocol.ID.
func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
func Listen(h host.Host, tag protocol.ID, opts ...ListenerOption) (net.Listener, error) {
ctx, cancel := context.WithCancel(context.Background())

l := &listener{
Expand All @@ -58,6 +62,11 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
tag: tag,
streamCh: make(chan network.Stream),
}
for _, opt := range opts {
if err := opt(l); err != nil {
return nil, err
}
}

h.SetStreamHandler(tag, func(s network.Stream) {
select {
Expand All @@ -69,3 +78,12 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {

return l, nil
}

type ListenerOption func(*listener) error

func IgnoreEOF() ListenerOption {
return func(l *listener) error {
l.ignoreEOF = true
return nil
}
}
Loading