Stream.Write() waits indefinitely if stream is full. Cancellations apply to entire stream, not messages. #1229

Closed
mandarjog opened this issue May 10, 2017 · 13 comments

@mandarjog

mandarjog commented May 10, 2017

We ran into an indefinite wait issue:
https://github.com/istio/mixer/blob/a15700f0423e8fc0d96c154675bc37ff70aae0aa/pkg/api/grpcServer.go#L138

The problem was that the gRPC stream was full. The stream was protected by a mutex, and the goroutine actively writing a message blocked indefinitely.
The only cancellation available applies to the entire stream.

It would be useful to have
stream.Write(msg, timeout)

@mehrdada
Member

It's not clear how we could cancel a Send once it has started without resetting the stream. This sounds like a protocol-level limitation.

@louiscryan
Contributor

+ejona86

I don't think this is a protocol issue at all. This is an issue with blocking writes in general: if the application can't be unblocked, then we have a problem.

I do agree that if the write can't complete, possibly for flow-control reasons, an RST_STREAM might be necessary to unblock it if partial writes have occurred, but that's already allowed by the spec.

This problem is exacerbated for streaming RPCs, where the semantics of call deadlines are ill-defined.

@mehrdada
Member

@louiscryan Right, that is absolutely correct. I was referring to the OP's requirement of cancelling writes without resetting the stream, not to having a cancellable API.

However, I think doing that goes more than halfway toward a fully async API, so if we were to do it, we might as well go all the way and make it return a Future.

@mehrdada mehrdada reopened this May 11, 2017
@mandarjog
Author

Returning a Future would be even better.

@dfawley
Member

dfawley commented May 11, 2017

Sorry for closing this issue too quickly. We thought the whole issue was about trying to salvage the stream even though the receiver isn't reading off the channel. It does seem reasonable to have a way of cancelling a send without relying upon the context's deadline, which the server can't influence.

If I understand your objective correctly, it's possible to achieve the behavior of adding a deadline to an individual SendMsg call as follows:

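done := make(chan struct{})
go func() {
  stream.SendMsg(response) // may block while the flow-control window is full
  close(done)
}()
t := time.NewTimer(5*time.Second)
select {
  case <-t.C:
    // Returning from the handler ends the RPC, which also unblocks SendMsg.
    return status.Errorf(codes.DeadlineExceeded, "too slow")
  case <-done:
    // Sent in time: stop the timer, draining it if it already fired.
    if !t.Stop() {
      <-t.C
    }
}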

This is clearly too much boilerplate for sending a message with a timeout. We could simplify this significantly if we added an explicit Cancel method to the ServerStream type. The resulting code would look something like this:

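t := time.AfterFunc(5*time.Second, stream.Cancel) // Cancel is the proposed new method
err := stream.SendMsg(response)
t.Stop() // AfterFunc timers have a nil C channel, so there is nothing to drain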

Would adding the ability to cancel the stream without needing to return from the handler be sufficient to resolve this issue? (Note that the client side can already implement this pattern, because it created the context used to perform the call, so it can use the associated cancel function.)

@mandarjog
Author

The last option is more usable, and I agree that cancellation is very general-purpose.

However, for this specific use case, something like
stream := createStream().withWriteTimeout("1s")
would be even more usable.

@dfawley
Member

dfawley commented May 18, 2017

> However for the specific use case,
> stream := createStream().withWriteTimeout("1s")
> would be even more usable.

Only the client creates the stream, so this wouldn't work for the server.

If we do something to make timeouts first-class, they should really be set per send. Unfortunately, our hands are a bit tied here: any change will break the API, because {Client,Server}Stream are interfaces. That includes even adding a variadic SendOptions-type parameter to Send, which would be backward compatible if the stream weren't an interface.
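The compatibility problem can be demonstrated in a few lines. `OldStream` and `NewStream` below are simplified, hypothetical stand-ins for the real stream interfaces, and `SendOption` is a hypothetical version of the option type mentioned above. Adding a variadic parameter keeps every existing *call* `SendMsg(m)` compiling, but it changes the method's signature, so a type written against the old interface (such as a user's mock or wrapper) no longer implements the new one.

```go
package main

import "fmt"

// SendOption is a hypothetical per-send option (e.g. a timeout).
type SendOption struct{}

// OldStream mirrors the existing shape of the method.
type OldStream interface {
	SendMsg(m any) error
}

// NewStream is the same method with a variadic options parameter added.
type NewStream interface {
	SendMsg(m any, opts ...SendOption) error
}

// userMock is a user-written implementation of the old interface,
// like the mocks and wrappers applications commonly write.
type userMock struct{}

func (userMock) SendMsg(m any) error { return nil }

func main() {
	var v any = userMock{}
	_, implementsOld := v.(OldStream)
	_, implementsNew := v.(NewStream)
	fmt.Println(implementsOld, implementsNew) // true false
}
```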

@mwitkow
Contributor

mwitkow commented May 19, 2017

@dfawley, could we get your option but using a context.Context for timeouts? E.g.:

ctx, cancel := context.WithTimeout(parentCtx, 1*time.Second)
defer cancel()
err := stream.SendMsg(ctx, msg) // proposed signature; SendMsg takes no context today

with err possibly being context.DeadlineExceeded, which the sender could retry.

This also means that cancelling parentCtx (and any cleanup tied to it) still works.

@dfawley
Member

dfawley commented May 19, 2017

Retrying on a stream after a timeout is basically impossible.

Technically it is possible in certain circumstances, but if only part of the message was sent on the wire before the timeout, the stream must be reset (i.e. closed). Because of this, we would need to tell the caller whether or not that happened, which is confusing. And if a timeout ever happens, it means the receiver isn't reading its pending data for whatever reason, so what would be the goal of continuing to use the stream in that case? End the stream and let the client initiate a new one if desired. It's also somewhat arbitrary to give up in this case if the goal is not to end the stream: if there is sufficient flow-control window for the message, the send will return quickly, even though there's no guarantee the client will ever request the data.

Using a context to specify timeouts is fine, but SendMsg doesn't currently accept a context. Backward compatibility must be broken to support per-send timeouts. For this reason, unfortunately, it's unlikely anything will happen for this any time soon.

I think for now the best idea is to wrap the code in #1229 (comment) in a function and call it where desired. It can be written fairly generically, too:

// DoWithTimeout runs f and returns its error.  If the deadline d elapses first,
// it returns a grpc DeadlineExceeded error instead.
func DoWithTimeout(f func() error, d time.Duration) error {
  errChan := make(chan error, 1)
  go func () {
    errChan <- f()
    close(errChan)
  }()
  t := time.NewTimer(d)
  select {
    case <-t.C:
      return status.Errorf(codes.DeadlineExceeded, "too slow")
    case err := <-errChan:
      if !t.Stop() {
        <-t.C
      }
      return err
  }
}

func (s *server) MyHandler(stream pb.Service_MyHandlerServer) error {
  for <work to do> {
    if err := DoWithTimeout(func() error { return stream.SendMsg(<data>) }, 5*time.Second); err != nil {
      return err
    }
  }
  return nil
}

Or with a context instead:

// DoWithContext runs f and returns its error.  If the context is cancelled or
// times out first, it returns the context's error instead.
func DoWithContext(ctx context.Context, f func() error) error {
  errChan := make(chan error, 1) // buffered, so f's goroutine can always exit
  go func() {
    errChan <- f()
    close(errChan)
  }()
  select {
    case <-ctx.Done():
      return ctx.Err()
    case err := <-errChan:
      return err // no timer here: the deadline lives in ctx
  }
}

tl;dr: this is something we would really like to support, but it requires breaking the API, and there is a fairly reasonable workaround. This makes it both hard and low priority, so I'm going to close this for now.

@disksing

Just in case someone runs into this issue: the done := make(chan struct{}) in the code snippet above should be done := make(chan struct{}, 1), or the sender goroutine will leak if the receiver times out. /cc @dfawley

@dfawley
Member

dfawley commented Sep 26, 2017

Good catch, thanks. I updated the examples.

@rfarnham

rfarnham commented Sep 7, 2018

I have adopted the workaround but it still doesn't solve the problem completely:

  errChan := make(chan error, 1)
  go func () {
    // conn is a gRPC server stream
    err := conn.Send(message)
    errChan <- err
    close(errChan)
  }()

If the client has become unresponsive, the goroutine remains blocked indefinitely on conn.Send. Is there a way to forcibly close the connection from the server side, so that conn.Send will return abnormally and the sender goroutine can exit?

@dfawley
Member

dfawley commented Sep 7, 2018

> Is there a way to forcibly close the connection from the server side, so that conn.Send will return abnormally and be able to bounce the sender goroutine?

You can use the examples in this comment as ways to add a timeout to your send attempt. Note the important bits of starting a timer and exiting the RPC handler. When the handler returns, the RPC ends and all Send/Recv calls in any goroutines will unblock with errors indicating the stream is closed.

@lock lock bot locked as resolved and limited conversation to collaborators Mar 6, 2019