From 12955c2751b629a180069b7e7a610fe5038b5a1b Mon Sep 17 00:00:00 2001 From: Fabian Holler Date: Wed, 13 Apr 2022 15:45:52 +0200 Subject: [PATCH] connection: fix: reader go-routine is leaked on connection close When a message was sent and it's response was received while the connection was closed or an error happened, the reader go-routine could get stuck and be leaked. The reader go routine tries to send a received message to the unbuffered c.rpc channel via the dispatch0() and dispatchN() methods. The call() method reads from the rpc channel. If an error happened while the dispatch method sends a message to the rpc channel, the call() method could terminate because it read an error from c.errors or because c.errors was closed. To prevent the scenario: - the reader go-routine now closes c.rpc when it terminates, - The call() method, reads from c.rpc until a message was received or it is closed. When c.rpc is closed, it reads an error from c.errors or wait until c.errors is closed. When it reads an error, it returns it. If it is closed it returns ErrClosed. This ensures that the messages is read from c.rpc before call() returns. It also ensures that when a message was received that it is processed. Previously it could happen that the message was silently ignored because c.errors returned an error or was closed. --- connection.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/connection.go b/connection.go index 83ed165..f529020 100644 --- a/connection.go +++ b/connection.go @@ -539,6 +539,8 @@ func (c *Connection) reader(r io.Reader) { frames := &reader{buf} conn, haveDeadliner := r.(readDeadliner) + defer close(c.rpc) + for { frame, err := frames.ReadFrame() @@ -689,27 +691,26 @@ func (c *Connection) call(req message, res ...message) error { } } - select { - case err, ok := <-c.errors: - if !ok { + msg, ok := <-c.rpc + if !ok { + err, errorsChanIsOpen := <-c.errors + if !errorsChanIsOpen { return ErrClosed } return err + } - case msg := <-c.rpc: - // Try to match one of the result types - for _, try := range res { - if reflect.TypeOf(msg) == reflect.TypeOf(try) { - // *res = *msg - vres := reflect.ValueOf(try).Elem() - vmsg := reflect.ValueOf(msg).Elem() - vres.Set(vmsg) - return nil - } + // Try to match one of the result types + for _, try := range res { + if reflect.TypeOf(msg) == reflect.TypeOf(try) { + // *res = *msg + vres := reflect.ValueOf(try).Elem() + vmsg := reflect.ValueOf(msg).Elem() + vres.Set(vmsg) + return nil } - return ErrCommandInvalid } - // unreachable + return ErrCommandInvalid } // Connection = open-Connection *use-Connection close-Connection