Alternate strategies when buffer is full #10
Hey @eric, sorry for the slow response, things have been super busy for me lately! So basically you'd like to be able to drop slow readers when they impede the flow. I think an idea like this is interesting. I'll ponder for a bit about what the API should look like, but I think it's a reasonable ask. As an aside, what's your use case like? I'm always curious how these libs are used.
One of the things I struggled with was how to indicate to the reader that it had been closed for a reason other than a normal EOF.
We are using it in Channels DVR to send video data to multiple clients. When one of the clients was running too slowly, it was preventing the others from receiving the latest video, which turned out not to be what we were looking for. I've been really impressed by this library; it's far more elaborate and efficient than I had anticipated before I dug into the code.
The clients are connected over HTTP. We explored trying to detect the stalled writes at that layer, so we could close those connections and let the buffer move forward as expected. However, it seems there is no way to do so with net/http (golang/go#16100 (comment)).
Hey, just wanted to say I started looking into this over the weekend and ran into a couple of complications. I'm still thinking about the right way to handle it (e.g. locking in the reader when touching the buffer, so that we can remove the Reader safely from the Writer goroutine). Currently, the readers basically work like this:
Technically, we could "free" memory after every Read call instead of once the entire current buffer has been read, but I'm not sure yet whether the increased contention on the writer lock would be worth the potential extra space in the buffer (for capped buffers). In the case of your slow readers, are they "fully stalled" (no progress) or just much slower than the other readers?
I believe it's some of each. The test cases that have been provided to us are fully stalled, but we suspect that in real usage slow readers will be more common. It seems we will need to solve the fully stalled case regardless.
I have also seen issues with stalled/dead readers; hopefully a solution will make it into this repo sometime :) @tmm1 gave me the idea of using the Go 1.20 http.ResponseController feature. Is this a way to go?

```go
controller := http.NewResponseController(writer)
for {
	controller.SetWriteDeadline(time.Now().Add(time.Second * 3))
	_, err := io.CopyN(writer, reader, 1024*128) // copy in 128KB chunks
	if err != nil {
		break
	}
}
```

From the source:

```go
// SetWriteDeadline sets the deadline for writing the response.
// Writes to the response body after the deadline has been exceeded will not block,
// but may succeed if the data has been buffered.
// A zero value means no deadline.
//
// Setting the write deadline after it has been exceeded will not extend it.
func (c *ResponseController) SetWriteDeadline(deadline time.Time) error {
```
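For anyone wanting to see that end to end, here's a rough, untested sketch of an HTTP handler built around the same idea. It assumes Go 1.20+ and the import path github.com/djherbis/bufit; the route, port, chunk size, and 3-second deadline are placeholders, not anything this library prescribes.

```go
package main

import (
	"io"
	"log"
	"net/http"
	"time"

	"github.com/djherbis/bufit" // assumed import path for this library
)

// streamHandler streams everything written into buf out to each HTTP client,
// giving up on clients that stall for more than a few seconds.
func streamHandler(buf *bufit.Buffer) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		r := buf.NextReader() // each client gets its own reader
		defer r.Close()

		rc := http.NewResponseController(w)
		for {
			// Refresh the write deadline before each chunk so a stalled client
			// makes this copy fail instead of holding up the shared buffer.
			if err := rc.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil {
				return // the underlying ResponseWriter does not support deadlines
			}
			// io.EOF means the buffer was closed; any other error means the
			// client stalled past the deadline or disconnected.
			if _, err := io.CopyN(w, r, 128*1024); err != nil {
				return
			}
		}
	}
}

func main() {
	buf := bufit.NewCapped(4 * 1024 * 1024) // 4MB cap, for example
	// ... some other goroutine writes video data into buf ...
	http.Handle("/stream", streamHandler(buf))
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```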
@mstaack That sounds like a good solution (probably even the "right" solution more generally). Looking back at this, a stalled reader isn't stalled in bufit itself. It's stalled because the application has stopped calling Read. That's happening most likely because of a stalled "Write" in an io.Copy from the bufit Reader to somewhere else. So breaking that stalled Write is a better fix: not only will this unblock the stalled Reader (you can Close it), but it will unblock that stalled goroutine as well. Nearly any other solution for "killing" a stalled Reader would still leave that stalled Reader goroutine hanging, because it's not blocked on the Read call, it's blocked on the Write call.
For us, not blocking the writer was our primary objective. Because of that, we ended up implementing a much simpler (and less efficient) buffering mechanism that provides strategies to drop some or all of the buffer for a slow reader (and stores statistics on how much was lost), which also allowed us to identify slow vs. stalled readers. It let us see slow/failing disks in some instances.
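Purely for illustration, here is a toy sketch of what a drop-on-full per-reader buffer with loss statistics might look like (this is not the Channels DVR implementation; every name here is made up):

```go
package main

import (
	"fmt"
	"sync"
)

// dropBuffer is a toy per-reader buffer: Write never blocks, and when the
// buffer would exceed its limit the oldest bytes are discarded and counted.
type dropBuffer struct {
	mu      sync.Mutex
	data    []byte
	limit   int
	dropped int64 // bytes discarded because this reader fell behind
}

func (b *dropBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.data = append(b.data, p...)
	if over := len(b.data) - b.limit; over > 0 {
		b.data = b.data[over:] // drop the oldest bytes
		b.dropped += int64(over)
	}
	return len(p), nil
}

// Read drains whatever is currently buffered; a real implementation would
// block (or signal EOF) when empty instead of returning 0, nil.
func (b *dropBuffer) Read(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	n := copy(p, b.data)
	b.data = b.data[n:]
	return n, nil
}

// Dropped reports how far behind this reader has fallen, which is what lets
// you tell a merely slow reader apart from a fully stalled one.
func (b *dropBuffer) Dropped() int64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.dropped
}

func main() {
	b := &dropBuffer{limit: 5}
	b.Write([]byte("hello world")) // only "world" fits; 6 bytes are dropped
	out := make([]byte, 16)
	n, _ := b.Read(out)
	fmt.Printf("read %q, dropped %d bytes\n", out[:n], b.Dropped())
}
```

In this shape the writer just calls Write on every reader's buffer and can never be blocked, which matches the primary objective described above.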
@eric I'm glad you found a workaround. Just to clarify, the other suggestion here should also mostly unblock the bufit.Buffer.Write call, since it would unblock any "stuck" reader copies. Something like this (untested):

```go
package main

import (
	"io"
	"io/ioutil"
	"net"
	"time"

	"github.com/djherbis/bufit" // assumed import path for this library
)

func main() {
	buf := bufit.NewCapped(1)
	r1, r2 := buf.NextReader(), buf.NextReader()
	// This reader should never stall because the writer is always available.
	go io.Copy(ioutil.Discard, r1)
	// Create a pipe which will stall because we won't ever read the other side.
	c, _ := net.Pipe()
	// Wrap the connection with a write timeout on every write:
	cWithTimeout := &writerWithTimeout{w: c, to: 5 * time.Second}
	// This copy would normally stall, but the timeout will cause it to break if any write stalls for 5s.
	go io.Copy(cWithTimeout, r2)
	// This write would block if one reader were "stuck", but won't block for long because we break stalled copies.
	io.WriteString(buf, "Hello World\n")
}

// ...definitions...

type writerWithTimeout struct {
	w interface {
		io.Writer
		SetWriteDeadline(time.Time) error
	}
	to time.Duration
}

func (w *writerWithTimeout) Write(p []byte) (int, error) {
	w.w.SetWriteDeadline(time.Now().Add(w.to))
	return w.w.Write(p)
}
```
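One note on the shape of that wrapper: any net.Conn already satisfies the io.Writer plus SetWriteDeadline(time.Time) error interface it requires, and for HTTP responses the Go 1.20 http.ResponseController mentioned earlier exposes the equivalent SetWriteDeadline call alongside the http.ResponseWriter, so the same per-write deadline idea applies in both cases.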
We've run into situations where having Write() block when the capped buffer is full is not desirable. It would be nice to have alternative strategies for handling this scenario. The ones that have come to mind are:

- return an error (for example, an error response to Read())
- drop the slowest reader(s), as long as there is more than one reader

I investigated what it would take to handle this, and it felt like something along the lines of this would be the right direction. I would appreciate feedback on whether this is a good avenue to pursue and whether it is something you would be interested in:
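Purely as a hypothetical illustration of the kind of API these strategies might suggest (this is not the proposal referenced above, and none of these names exist in bufit today):

```go
// Hypothetical API sketch only.
package bufit

// OverflowStrategy would control what a capped Buffer does when a Write
// would otherwise block because the slowest reader has not caught up.
type OverflowStrategy int

const (
	// BlockWriter keeps the current behavior: Write waits for the slowest reader.
	BlockWriter OverflowStrategy = iota
	// ErrorReader returns an error from Read on the reader that fell behind,
	// releasing its unread data so Write can proceed.
	ErrorReader
	// DropSlowest silently drops the slowest reader's unread data, as long as
	// there is more than one reader.
	DropSlowest
)

// A capped constructor taking a strategy might then look like:
// func NewCappedWithStrategy(cap int, s OverflowStrategy) *Buffer
```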