Skip to content

Commit

Permalink
connection: fix: reader go-routine is leaked on connection close
Browse files Browse the repository at this point in the history
When a message was sent and it's response was received while the
connection was closed or an error happened, the reader go-routine could
get stuck and be leaked.

The reader go routine tries to send a received message to the unbuffered
c.rpc channel via the dispatch0() and dispatchN() methods.
The call() method reads from the rpc channel.
If an error happened while the dispatch method sends a message to the
rpc channel, the call() method could terminate because it read an error
from c.errors or because c.errors was closed.

To prevent the scenario:
- the reader go-routine now closes c.rpc when it terminates,
- The call() method, reads from c.rpc until a message was received or it
  is closed. When c.rpc is closed, it reads an error from c.errors or
  wait until c.errors is closed.
  When it reads an error, it returns it. If it is closed it returns
  ErrClosed.

This ensures that the messages is read from c.rpc before call() returns.
It also ensures that when a message was received that it is processed.
Previously it could happen that the message was silently ignored because
c.errors returned an error or was closed.
  • Loading branch information
fho committed Apr 13, 2022
1 parent 6cac2fa commit 12955c2
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ func (c *Connection) reader(r io.Reader) {
frames := &reader{buf}
conn, haveDeadliner := r.(readDeadliner)

defer close(c.rpc)

for {
frame, err := frames.ReadFrame()

Expand Down Expand Up @@ -689,27 +691,26 @@ func (c *Connection) call(req message, res ...message) error {
}
}

select {
case err, ok := <-c.errors:
if !ok {
msg, ok := <-c.rpc
if !ok {
err, errorsChanIsOpen := <-c.errors
if !errorsChanIsOpen {
return ErrClosed
}
return err
}

case msg := <-c.rpc:
// Try to match one of the result types
for _, try := range res {
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
// *res = *msg
vres := reflect.ValueOf(try).Elem()
vmsg := reflect.ValueOf(msg).Elem()
vres.Set(vmsg)
return nil
}
// Try to match one of the result types
for _, try := range res {
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
// *res = *msg
vres := reflect.ValueOf(try).Elem()
vmsg := reflect.ValueOf(msg).Elem()
vres.Set(vmsg)
return nil
}
return ErrCommandInvalid
}
// unreachable
return ErrCommandInvalid
}

// Connection = open-Connection *use-Connection close-Connection
Expand Down

0 comments on commit 12955c2

Please sign in to comment.