package bufit

import (
	"container/heap"
	"io"
	"sync"
	"sync/atomic"
)

// Reader provides an io.Reader whose methods MUST be concurrent-safe
// with the Write method of the Writer from which it was generated.
// It also MUST be safe for concurrent calls to Writer.Discard
// for bytes which have already been read by this Reader.
type Reader interface {

	// Len returns the unread # of bytes in this Reader
	Len() int

	// Discard drops the next n bytes from the Reader, as if they had been Read.
	// It returns the # of bytes actually dropped, and may return io.EOF
	// if all remaining bytes have been discarded.
	Discard(int) (int, error)

	// Read bytes into the provided buffer.
	io.Reader
}

// Writer accepts bytes and generates Readers which consume those bytes.
// Generated Readers' methods must be concurrent-safe with the Write method.
type Writer interface {

	// Len returns the # of bytes buffered for Readers
	Len() int

	// Discard drops the next n buffered bytes. It returns the actual number of
	// bytes dropped and may return io.EOF if all remaining bytes have been
	// discarded. Discard must be concurrent-safe with method calls
	// on generated Readers, when discarding bytes that have been read
	// by all Readers.
	Discard(int) (int, error)

	// NextReader returns a Reader which reads a "snapshot" of the current written bytes
	// (excluding discarded bytes). The Reader should work independently of the Writer
	// and be concurrent-safe with the Write method on the Writer.
	NextReader() Reader

	// Write writes the given bytes into the Writer's underlying buffer, which will
	// be available for reading using NextReader() to grab a snapshot of the current
	// written bytes.
	io.Writer
}

// Buffer is used to provide multiple readers with access to a shared buffer.
// Readers may join/leave at any time; however, a joining reader will only
// see what is currently in the buffer and data written from then on. Data is
// evicted from the buffer once all active readers have read that section.
type Buffer struct {
	mu    sync.Mutex
	rwait *sync.Cond
	wwait *sync.Cond
	off   int
	rh    readerHeap
	buf   Writer
	cap   int
	keep  int
	life
	callback atomic.Value
}
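
// A minimal usage sketch: one goroutine writes, two readers each receive
// the full stream, and Close unblocks readers with io.EOF:
//
//	buf := New()
//	r1, r2 := buf.NextReader(), buf.NextReader()
//	go func() {
//		io.WriteString(buf, "hello world")
//		buf.Close()
//	}()
//	b1, _ := io.ReadAll(r1) // "hello world"
//	b2, _ := io.ReadAll(r2) // "hello world"
//	r1.Close()
//	r2.Close()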

type life struct {
	state int32
}

func (l *life) alive() bool { return atomic.LoadInt32(&l.state) == 0 }
func (l *life) kill()       { atomic.AddInt32(&l.state, 1) }

// Keep sets the minimum number of bytes to keep in the buffer even if all
// other current readers have read those bytes. This allows new readers
// to join slightly behind.
// Keep is safe to call concurrently with other methods.
// Fewer than keep bytes may be in the buffer if fewer than keep bytes have
// been written since keep was set.
// If this buffer has a cap, it is invalid to call this method with a keep >= cap
// since the buffer would never be able to write new bytes once it reached
// the cap.
func (b *Buffer) Keep(keep int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if keep >= 0 && (b.cap == 0 || keep < b.cap) {
		b.keep = keep
	}
}
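
// A short sketch of Keep (sizes are illustrative). Note that on a capped
// buffer a keep >= cap is silently ignored, since the writer could never
// make progress once the buffer filled:
//
//	buf := NewCapped(10)
//	buf.Keep(5)  // retain at least the last 5 bytes for late-joining readers
//	buf.Keep(10) // ignored: keep must be < cap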

// fetch advances the reader past the bytes it has already consumed (allowing
// the buffer to evict them), blocks until unread data is available or the
// buffer/reader is closed, then grabs a fresh snapshot for the reader.
func (b *Buffer) fetch(r *reader) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if r.alive() {
		r.off += r.size
		r.size = 0
		heap.Fix(&b.rh, r.i)
		b.shift()
	}

	for r.off == b.off+b.buf.Len() && b.alive() && r.alive() {
		b.rwait.Wait()
	}

	if !r.alive() {
		return
	}

	r.data = b.buf.NextReader()
	r.data.Discard(r.off - b.off)
	r.size = r.data.Len()
}

// drop removes the reader from the set of active readers, evicting any bytes
// no remaining reader still needs, and runs the registered callback if this
// was the last reader.
func (b *Buffer) drop(r *reader) {
	b.mu.Lock()
	if len(b.rh) == 1 { // this is the last reader
		if call := b.callback.Load(); call != nil { // a callback is registered
			defer call.(func() error)() // run this after we've unlocked
		}
	}
	defer b.rwait.Broadcast() // wake up any blocking reads
	defer b.mu.Unlock()
	b.shift() // remove bytes read if this reader was the peek
	heap.Remove(&b.rh, r.i)
	b.shift() // shift to the next peek
}

// shift evicts bytes which every active reader has already read, retaining
// at least b.keep bytes for late-joining readers.
func (b *Buffer) shift() {
	l := b.buf.Len()
	if l == 0 || l <= b.keep || b.rh.Len() == 0 {
		return
	}
	if diff := b.rh.Peek().off - b.off; diff > 0 {
		if l < b.keep+diff {
			diff = l - b.keep
		}
		b.buf.Discard(diff)
		b.off += diff
		b.wwait.Broadcast() // freed space may unblock a capped writer
	}
}

// NumReaders returns the number of readers returned by NextReader() which have not called Reader.Close().
// This method is safe to call concurrently with all methods.
func (b *Buffer) NumReaders() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.rh)
}

// OnLastReaderClose registers the passed callback to be run after any call to
// Reader.Close() which drops the NumReaders() to 0.
// This method is safe to call concurrently with all other methods and Reader
// methods, however it's only guaranteed to be triggered if it completes before
// the Reader.Close call which would trigger it.
func (b *Buffer) OnLastReaderClose(runOnLastClose func() error) {
	b.callback.Store(runOnLastClose)
}
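
// A minimal sketch of the callback: tear down an upstream producer once the
// last reader goes away (the log call is illustrative):
//
//	buf := New()
//	buf.OnLastReaderClose(func() error {
//		log.Println("last reader closed; stopping producer")
//		return nil
//	})
//	r := buf.NextReader()
//	r.Close() // NumReaders() drops to 0, so the callback runs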

// NextReader returns a new io.ReadCloser for this shared buffer.
// Read/Close are safe to call concurrently with the buffer's Write/Close methods.
// Read calls will block if the Buffer is not Closed and contains no data.
// Note that the returned reader sees all data that is currently in the buffer;
// data is only dropped out of the buffer once all active readers point to
// locations in the buffer after that section.
func (b *Buffer) NextReader() io.ReadCloser {
	b.mu.Lock()
	defer b.mu.Unlock()
	r := &reader{
		buf:  b,
		size: b.buf.Len(),
		off:  b.off,
		data: b.buf.NextReader(),
	}
	heap.Push(&b.rh, r)
	return r
}

// NextReaderFromNow returns a new io.ReadCloser for this shared buffer.
// Unlike NextReader(), the returned reader will only see writes which occur
// after it is created, even if there is other data in the buffer. In other
// words, this reader points to the end of the buffer.
func (b *Buffer) NextReaderFromNow() io.ReadCloser {
	b.mu.Lock()
	defer b.mu.Unlock()
	l := b.buf.Len()
	r := &reader{
		buf:  b,
		off:  b.off + l,
		data: b.buf.NextReader(),
	}
	r.data.Discard(l)
	heap.Push(&b.rh, r)
	return r
}
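
// Contrasting the two reader constructors (a minimal sketch):
//
//	buf := New()
//	io.WriteString(buf, "old")
//	all := buf.NextReader()        // will read "old" plus later writes
//	now := buf.NextReaderFromNow() // skips "old", sees only later writes
//	io.WriteString(buf, "new")
//	buf.Close()
//	// reading all to EOF yields "oldnew"; reading now yields "new"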

// Len returns the current size of the buffer. This is safe to call concurrently with all other methods.
func (b *Buffer) Len() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.Len()
}

// Write appends the given data to the buffer. All active readers will
// see this write.
func (b *Buffer) Write(p []byte) (int, error) {
	if !b.alive() { // quick check before taking the lock
		return 0, io.ErrClosedPipe
	}
	b.mu.Lock()
	defer b.rwait.Broadcast()
	defer b.mu.Unlock()
	if !b.alive() { // re-check now that we hold the lock
		return 0, io.ErrClosedPipe
	}
	var m, n int
	var err error
	for len(p[n:]) > 0 && err == nil { // bytes left to write

		for b.cap > 0 && b.buf.Len() == b.cap && b.alive() { // wait for space
			b.wwait.Wait()
		}
		if !b.alive() {
			return n, io.ErrClosedPipe
		}

		if b.cap == 0 || b.cap-b.buf.Len() > len(p[n:]) { // remaining bytes fit in the gap, or no cap
			m, err := b.buf.Write(p[n:])
			return n + m, err
		}

		gap := b.cap - b.buf.Len() // there is a cap, and we didn't fit in the gap
		m, err = b.buf.Write(p[n : n+gap])
		n += m
		b.rwait.Broadcast() // wake up readers to read the partial write
	}
	return n, err
}

// Close marks the buffer as complete. Readers will return io.EOF instead of blocking
// when they reach the end of the buffer.
func (b *Buffer) Close() error {
	b.mu.Lock()
	defer b.rwait.Broadcast() // readers should wake up since there will be no more writes
	defer b.wwait.Broadcast() // writers should wake up since blocking writes should unblock
	defer b.mu.Unlock()
	b.kill()
	return nil
}

// NewBuffer creates and returns a new Buffer backed by the passed Writer.
func NewBuffer(w Writer) *Buffer {
	return NewCappedBuffer(w, 0)
}

// New creates and returns a new in-memory Buffer.
func New() *Buffer {
	return NewBuffer(NewMemoryWriter(nil))
}

// NewCapped creates a new in-memory Buffer whose Write() call blocks to prevent Len() from exceeding
// the passed capacity.
func NewCapped(cap int) *Buffer {
	return NewCappedBuffer(NewMemoryWriter(nil), cap)
}
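
// A minimal backpressure sketch: with a cap, Write blocks once Len() reaches
// the cap and resumes as a reader consumes data (os.Stdout is illustrative):
//
//	buf := NewCapped(4)
//	r := buf.NextReader()
//	go io.Copy(os.Stdout, r)          // consuming frees space in the buffer
//	io.WriteString(buf, "0123456789") // proceeds in chunks of <= 4 bytes
//	buf.Close()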

// NewCappedBuffer creates a new Buffer whose Write() call blocks to prevent Len() from exceeding
// the passed capacity.
func NewCappedBuffer(w Writer, cap int) *Buffer {
	buf := Buffer{
		buf: w,
		cap: cap,
	}
	buf.rwait = sync.NewCond(&buf.mu)
	buf.wwait = sync.NewCond(&buf.mu)
	return &buf
}