Alternate strategies when buffer is full #10

Open
eric opened this issue May 20, 2022 · 9 comments

Comments

@eric

eric commented May 20, 2022

We've run into situations where having Write() block when the capped buffer is full is not desirable.

It would be nice to have alternative strategies for handling this scenario. The ones that have come to mind are:

  1. Moving the offset forward on the reader(s) holding the most unread buffered data (and ideally counting the dropped bytes)
  2. Closing the reader (ideally while providing an error response to Read()) as long as there is more than one reader
  3. Closing the reader (with the error reporting above) regardless of whether it's the last reader

I investigated what it would take to handle this, and something along the lines of the diff below felt like the right direction. I would appreciate feedback on whether this is a good avenue to pursue and whether it's something you would be interested in:

diff --git a/bufit.go b/bufit.go
index ec3a955..2c68f0f 100644
--- a/bufit.go
+++ b/bufit.go
@@ -50,6 +50,14 @@ type Writer interface {
 	io.Writer
 }
 
+type BufferFullBehavior int
+
+const (
+	BufferFullBehaviorWait BufferFullBehavior = iota
+	BufferFullBehaviorDropExtraReader
+	BufferFullBehaviorDropReader
+)
+
 // Buffer is used to provide multiple readers with access to a shared buffer.
 // Readers may join/leave at any time, however a joining reader will only
 // see whats currently in the buffer onwards. Data is evicted from the buffer
@@ -63,6 +71,7 @@ type Buffer struct {
 	buf   Writer
 	cap   int
 	keep  int
+	bfb   BufferFullBehavior
 	life
 	callback atomic.Value
 }
@@ -225,7 +234,24 @@ func (b *Buffer) Write(p []byte) (int, error) {
 	for len(p[n:]) > 0 && err == nil { // bytes left to write
 
 		for b.cap > 0 && b.buf.Len() == b.cap && b.alive() { // wait for space
-			b.wwait.Wait()
+			switch b.bfb {
+			case BufferFullBehaviorWait:
+				b.wwait.Wait()
+			case BufferFullBehaviorDropExtraReader:
+				if len(b.rh) > 1 {
+					r := b.rh.Peek()
+					heap.Remove(&b.rh, r.i)
+					b.shift() // shift to next peek
+
+					continue
+				}
+			case BufferFullBehaviorDropReader:
+				r := b.rh.Peek()
+				heap.Remove(&b.rh, r.i)
+				b.shift() // shift to next peek
+
+				continue
+			}
 		}
 
 		if !b.alive() {
@djherbis
Owner

Hey @eric, sorry for the slow response, things have been super busy for me lately!

So basically you'd like to be able to drop slow readers when they impede the flow. I think an idea like this is interesting.
I think when dropping a reader you'll want to do b.rh.Peek().Close() to properly disconnect the reader (it will also clean up the heap by dropping the reader).
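
For illustration, the drop branches in the proposed diff might then look roughly like this (untested sketch; it assumes Close() both disconnects the reader and removes it from the heap, as described above):

	case BufferFullBehaviorDropExtraReader:
		if len(b.rh) > 1 {
			b.rh.Peek().Close() // disconnects the slowest reader and drops it from the heap
			continue
		}
		b.wwait.Wait() // only one reader left: fall back to waiting so we don't spin
	case BufferFullBehaviorDropReader:
		b.rh.Peek().Close()
		continue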

I'll ponder for a bit about what the API should look like, but I think it's a reasonable ask.

As an aside, what's your use case like? I'm always curious how these libs are used.

@eric
Author

eric commented May 31, 2022

I think when dropping a reader you'll want to do b.rh.Peek().Close() to properly disconnect the reader (it will also clean up the heap by dropping the reader).

One of the things I struggled with was how to indicate to the reader that it had been closed for a reason other than a normal EOF.

As an aside, what's your use case like? I'm always curious how these libs are used.

We are using it in Channels DVR to send video data to multiple clients. If one of the clients was running too slowly, it prevented the others from receiving the latest video, which turned out not to be what we wanted.

I've been really impressed by this library — it's far more elaborate and efficient than I had anticipated before I dug into the code.

@tmm1
Contributor

tmm1 commented Jun 2, 2022

We are using it in Channels DVR to send video data to multiple clients.

The clients are connected over HTTP. We explored trying to detect the stalled writes at that layer, so we could close those connections and let the buffer move forward as expected. However, it seems there is no way to do so with net/http (golang/go#16100 (comment)).

@djherbis
Owner

djherbis commented Jun 6, 2022

Hey, just wanted to say I started looking into this over the weekend and ran into a couple of complications.
Mainly that NextReader() currently returns a reader which is not concurrent-safe with being closed while it's being read.
The currently suggested solution would free memory that readers may still be actively reading.

I'm still thinking about the right way to handle it (e.g. a lock in the reader when touching the buffer, so that the Reader can be removed safely from the Writer goroutine).

Currently, the readers basically work like this:

  • Fetch the currently available data, marking that section of data as being held by a reader in the heap
  • Read the entire section of data
  • Fetch the next available data, and at the same time mark that last section of data as read by this reader (possibly freeing space in the shared memory).
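
As a rough mental model only (the names below are made up for illustration, not the actual internals), each reader behaves something like:

	// Hypothetical sketch of the lifecycle above; fetchAndRelease and held
	// are illustrative names, not bufit's real API.
	func (r *reader) Read(p []byte) (int, error) {
		if r.held.Len() == 0 {
			// In one step under the buffer's lock: mark the previously held
			// section as consumed (possibly freeing space in the capped,
			// shared buffer) and grab the next available section, recording
			// in the heap that this reader now holds it.
			r.held = r.buf.fetchAndRelease(r)
		}
		return r.held.Read(p) // read the privately held section without locking
	}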

Technically, we could "free" memory after every Read call instead of once the entire current buffer has been read, but I'm not sure yet if the increased contention on the writer lock would be worth the potential extra space in the buffer (for capped buffers).

In the case of your slow readers, are they "fully stalled" (no progress) or just much slower than the other readers?

@eric
Author

eric commented Jun 6, 2022

In the case of your slow readers, are they "fully stalled" (no progress) or just much slower than the other readers?

I believe it's some of each. The test cases we've been provided are fully stalled, but we suspect that in real usage it'll more likely be slow readers. It seems we will need to solve the fully stalled case regardless.

@mstaack

mstaack commented Mar 10, 2023

I have also seen issues with stalled/dead readers. Hopefully a solution will make it into this repo sometime :)

@tmm1 gave me the idea of using the Go 1.20 http.ResponseController feature. Is this a way to go?

	controller := http.NewResponseController(writer)
	for {
		controller.SetWriteDeadline(time.Now().Add(time.Second * 3))
		_, err := io.CopyN(writer, reader, 1024*128) // 128KB
		if err != nil {
			break
		}
	}

From the net/http source:

// SetWriteDeadline sets the deadline for writing the response.
// Writes to the response body after the deadline has been exceeded will not block,
// but may succeed if the data has been buffered.
// A zero value means no deadline.
//
// Setting the write deadline after it has been exceeded will not extend it.
func (c *ResponseController) SetWriteDeadline(deadline time.Time) error {
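
For context, that loop would presumably live inside the HTTP handler that serves each client from its own bufit reader, roughly like this (illustrative only; the handler shape, 3s deadline and 128KB chunk size are just examples, and it needs io, net/http and time imported):

	func serveStream(buf *bufit.Buffer) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			reader := buf.NextReader()
			defer reader.Close()

			rc := http.NewResponseController(w)
			for {
				// Refresh the deadline before each chunk so that only a client
				// which stalls completely gets disconnected, not one that is
				// merely slow.
				rc.SetWriteDeadline(time.Now().Add(3 * time.Second))
				if _, err := io.CopyN(w, reader, 128*1024); err != nil {
					return // deadline exceeded, client gone, or buffer closed
				}
			}
		}
	}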

@djherbis
Owner

@mstaack That sounds like a good solution (probably even the "right" solution more generally).

Looking back at this, a stalled reader isn't actually stalled inside bufit itself. It's stalled because the application has stopped calling Read, most likely because of a stalled Write in an io.Copy from the bufit Reader to somewhere else. So breaking that stalled Write is the better fix: not only does it unblock the stalled Reader (you can then Close it), it also unblocks the stalled goroutine. Nearly any other approach to "killing" a stalled Reader would still leave that reader's goroutine hanging, because it's not blocked on the Read call, it's blocked on the Write call.

@eric
Author

eric commented Mar 20, 2023

For us, not blocking the writer was the primary objective. Because of that, we ended up implementing a much simpler (and less efficient) buffering mechanism that provides strategies for dropping some or all of the buffer for a slow reader (and stores statistics on how much was lost), which also lets us distinguish slow readers from stalled ones. It has let us spot slow/failing disks in some instances.
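
It's not our actual implementation, but the core idea is roughly this (illustrative sketch; droppingBuffer and its fields are made-up names, and it needs "sync" imported):

	// One of these per reader: drop the oldest data instead of blocking the
	// writer, and count the bytes that were lost.
	type droppingBuffer struct {
		mu      sync.Mutex
		data    []byte
		max     int   // cap on buffered bytes
		dropped int64 // bytes discarded because this reader fell behind
	}

	func (b *droppingBuffer) Write(p []byte) (int, error) {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.data = append(b.data, p...)
		if over := len(b.data) - b.max; over > 0 {
			b.dropped += int64(over) // record the loss instead of blocking
			b.data = b.data[over:]   // drop the oldest bytes
		}
		return len(p), nil
	}

	func (b *droppingBuffer) Read(p []byte) (int, error) {
		b.mu.Lock()
		defer b.mu.Unlock()
		n := copy(p, b.data)
		b.data = b.data[n:]
		return n, nil // a real reader would block or return an error when empty
	}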

@djherbis
Owner

@eric I'm glad you found a workaround.

Just to clarify, the other suggestion here should mostly unblock the bufit.Buffer.Write call as well, since it would unblock any "stuck" reader copies.

Something like this (untested):

package main

import (
	"io"
	"io/ioutil"
	"net"
	"time"

	"github.com/djherbis/bufit"
)

func main() {
	buf := bufit.NewCapped(1)
	r1, r2 := buf.NextReader(), buf.NextReader()

	// This reader should never stall because its destination is always writable.
	go io.Copy(ioutil.Discard, r1)

	// Create a pipe which will stall because we never read the other side.
	c, _ := net.Pipe()

	// Wrap the connection so that every write carries a timeout.
	cWithTimeout := &writerWithTimeout{w: c, to: 5 * time.Second}

	// This copy would normally stall, but the timeout will cause it to break
	// if any write stalls for 5s. Closing the reader afterwards releases its
	// unread data so the writer can make progress.
	go func() {
		defer r2.Close()
		io.Copy(cWithTimeout, r2)
	}()

	// This write would block if one reader were "stuck", but won't block for
	// long because we break stalled copies.
	io.WriteString(buf, "Hello World\n")
}

type writerWithTimeout struct {
	w interface {
		io.Writer
		SetWriteDeadline(time.Time) error
	}
	to time.Duration
}

func (w *writerWithTimeout) Write(p []byte) (int, error) {
	w.w.SetWriteDeadline(time.Now().Add(w.to))
	return w.w.Write(p)
}
