-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
autobatch: thread-safe, debounce, max delay and implement Batching #180
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,30 @@ | ||
// Package autobatch provides a go-datastore implementation that | ||
// automatically batches together writes by holding puts in memory until | ||
// a certain threshold is met. | ||
// a certain threshold is met. It also acts as a debounce. | ||
package autobatch | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
|
||
ds "github.com/ipfs/go-datastore" | ||
dsq "github.com/ipfs/go-datastore/query" | ||
logging "github.com/ipfs/go-log" | ||
) | ||
|
||
var log = logging.Logger("datastore/autobatch") | ||
|
||
// Datastore implements a go-datastore. | ||
type Datastore struct { | ||
child ds.Batching | ||
|
||
// TODO: discuss making ds.Batch implement the full ds.Datastore interface | ||
buffer map[ds.Key]op | ||
maxBufferEntries int | ||
mu sync.RWMutex | ||
buffer map[ds.Key]op | ||
|
||
maxWrite int | ||
maxDelay time.Duration | ||
newWrite chan struct{} | ||
exit chan struct{} | ||
} | ||
|
||
type op struct { | ||
|
@@ -23,28 +33,79 @@ type op struct { | |
} | ||
|
||
// NewAutoBatching returns a new datastore that automatically | ||
// batches writes using the given Batching datastore. The size | ||
// of the memory pool is given by size. | ||
func NewAutoBatching(d ds.Batching, size int) *Datastore { | ||
return &Datastore{ | ||
child: d, | ||
buffer: make(map[ds.Key]op, size), | ||
maxBufferEntries: size, | ||
// batches writes using the given Batching datastore. The maximum number of | ||
// writes before triggering a batch is given by maxWrite. The maximum delay | ||
// before triggering a batch is given by maxDelay. | ||
func NewAutoBatching(child ds.Batching, maxWrite int, maxDelay time.Duration) *Datastore { | ||
d := &Datastore{ | ||
child: child, | ||
buffer: make(map[ds.Key]op, maxWrite), | ||
maxWrite: maxWrite, | ||
maxDelay: maxDelay, | ||
newWrite: make(chan struct{}), | ||
exit: make(chan struct{}), | ||
} | ||
go d.runBatcher() | ||
return d | ||
} | ||
|
||
func (d *Datastore) addOp(key ds.Key, op op) { | ||
d.mu.Lock() | ||
d.buffer[key] = op | ||
d.mu.Unlock() | ||
d.newWrite <- struct{}{} | ||
} | ||
|
||
func (d *Datastore) runBatcher() { | ||
var timer <-chan time.Time | ||
|
||
write := func() { | ||
timer = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This leaks the timer until it fires, e.g. if there is a high rate of ops that hit the maxWrite threshold really fast, we could end up with a lot of concurrent no-op timers running. I think it'd be better to use a proper timer and stop it here. |
||
|
||
b, err := d.prepareBatch(nil) | ||
if err != nil { | ||
log.Error(err) | ||
return | ||
} | ||
err = b.Commit() | ||
if err != nil { | ||
log.Error(err) | ||
return | ||
} | ||
} | ||
|
||
for { | ||
select { | ||
case <-d.exit: | ||
return | ||
case <-timer: | ||
write() | ||
case <-d.newWrite: | ||
d.mu.RLock() | ||
ready := len(d.buffer) | ||
d.mu.RUnlock() | ||
if ready > d.maxWrite { | ||
write() | ||
} | ||
if timer == nil { | ||
timer = time.After(d.maxDelay) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Delete deletes a key/value | ||
func (d *Datastore) Delete(k ds.Key) error { | ||
d.buffer[k] = op{delete: true} | ||
if len(d.buffer) > d.maxBufferEntries { | ||
return d.Flush() | ||
} | ||
d.addOp(k, op{delete: true}) | ||
return nil | ||
} | ||
|
||
// Get retrieves a value given a key. | ||
func (d *Datastore) Get(k ds.Key) ([]byte, error) { | ||
d.mu.RLock() | ||
o, ok := d.buffer[k] | ||
d.mu.RUnlock() | ||
|
||
if ok { | ||
if o.delete { | ||
return nil, ds.ErrNotFound | ||
|
@@ -57,69 +118,67 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) { | |
|
||
// Put stores a key/value. | ||
func (d *Datastore) Put(k ds.Key, val []byte) error { | ||
d.buffer[k] = op{value: val} | ||
if len(d.buffer) > d.maxBufferEntries { | ||
return d.Flush() | ||
} | ||
d.addOp(k, op{value: val}) | ||
return nil | ||
} | ||
|
||
// Sync flushes all operations on keys at or under the prefix | ||
// from the current batch to the underlying datastore | ||
func (d *Datastore) Sync(prefix ds.Key) error { | ||
b, err := d.child.Batch() | ||
b, err := d.prepareBatch(&prefix) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for k, o := range d.buffer { | ||
if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) { | ||
continue | ||
} | ||
|
||
var err error | ||
if o.delete { | ||
err = b.Delete(k) | ||
} else { | ||
err = b.Put(k, o.value) | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
|
||
delete(d.buffer, k) | ||
} | ||
|
||
return b.Commit() | ||
} | ||
|
||
// Flush flushes the current batch to the underlying datastore. | ||
func (d *Datastore) Flush() error { | ||
b, err := d.child.Batch() | ||
b, err := d.prepareBatch(nil) | ||
if err != nil { | ||
return err | ||
} | ||
return b.Commit() | ||
} | ||
|
||
func (d *Datastore) prepareBatch(prefix *ds.Key) (ds.Batch, error) { | ||
b, err := d.child.Batch() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
d.mu.Lock() | ||
|
||
for k, o := range d.buffer { | ||
if prefix != nil && !(k.Equal(*prefix) || k.IsDescendantOf(*prefix)) { | ||
continue | ||
} | ||
|
||
var err error | ||
if o.delete { | ||
err = b.Delete(k) | ||
} else { | ||
err = b.Put(k, o.value) | ||
} | ||
if err != nil { | ||
return err | ||
d.mu.Unlock() | ||
return nil, err | ||
} | ||
|
||
delete(d.buffer, k) | ||
} | ||
// clear out buffer | ||
d.buffer = make(map[ds.Key]op, d.maxBufferEntries) | ||
|
||
return b.Commit() | ||
d.mu.Unlock() | ||
|
||
return b, nil | ||
} | ||
|
||
// Has checks if a key is stored. | ||
func (d *Datastore) Has(k ds.Key) (bool, error) { | ||
d.mu.RLock() | ||
o, ok := d.buffer[k] | ||
d.mu.RUnlock() | ||
|
||
if ok { | ||
return !o.delete, nil | ||
} | ||
|
@@ -129,7 +188,10 @@ func (d *Datastore) Has(k ds.Key) (bool, error) { | |
|
||
// GetSize implements Datastore.GetSize | ||
func (d *Datastore) GetSize(k ds.Key) (int, error) { | ||
d.mu.RLock() | ||
o, ok := d.buffer[k] | ||
d.mu.RUnlock() | ||
|
||
if ok { | ||
if o.delete { | ||
return -1, ds.ErrNotFound | ||
|
@@ -155,6 +217,18 @@ func (d *Datastore) DiskUsage() (uint64, error) { | |
return ds.DiskUsage(d.child) | ||
} | ||
|
||
func (d *Datastore) Batch() (ds.Batch, error) { | ||
b, err := d.child.Batch() | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &batch{ | ||
parent: d, | ||
child: b, | ||
toDelete: make(map[ds.Key]struct{}), | ||
}, nil | ||
} | ||
|
||
func (d *Datastore) Close() error { | ||
err1 := d.Flush() | ||
err2 := d.child.Close() | ||
|
@@ -164,5 +238,32 @@ func (d *Datastore) Close() error { | |
if err2 != nil { | ||
return err2 | ||
} | ||
close(d.exit) | ||
close(d.newWrite) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this might be racy if the timer fires and prepares a batch, then this flushes and closes the underlying datastore, then the async writer tries to commit the batch but the underlying DS is now closed Also this could return before the async writer finishes flushing things, which is surprising. |
||
return nil | ||
} | ||
|
||
type batch struct { | ||
parent *Datastore | ||
child ds.Batch | ||
toDelete map[ds.Key]struct{} | ||
} | ||
|
||
func (b *batch) Put(key ds.Key, value []byte) error { | ||
delete(b.toDelete, key) | ||
return b.child.Put(key, value) | ||
} | ||
|
||
func (b *batch) Delete(key ds.Key) error { | ||
b.toDelete[key] = struct{}{} | ||
return b.child.Delete(key) | ||
} | ||
|
||
func (b *batch) Commit() error { | ||
b.parent.mu.Lock() | ||
for key := range b.toDelete { | ||
delete(b.parent.buffer, key) | ||
} | ||
b.parent.mu.Unlock() | ||
return b.child.Commit() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,18 +4,20 @@ import ( | |
"bytes" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
ds "github.com/ipfs/go-datastore" | ||
"github.com/ipfs/go-datastore/sync" | ||
dstest "github.com/ipfs/go-datastore/test" | ||
) | ||
|
||
func TestAutobatch(t *testing.T) { | ||
dstest.SubtestAll(t, NewAutoBatching(ds.NewMapDatastore(), 16)) | ||
dstest.SubtestAll(t, NewAutoBatching(sync.MutexWrap(ds.NewMapDatastore()), 16, time.Second)) | ||
} | ||
|
||
func TestFlushing(t *testing.T) { | ||
child := ds.NewMapDatastore() | ||
d := NewAutoBatching(child, 16) | ||
child := sync.MutexWrap(ds.NewMapDatastore()) | ||
d := NewAutoBatching(child, 16, 500*time.Millisecond) | ||
|
||
var keys []ds.Key | ||
for i := 0; i < 16; i++ { | ||
|
@@ -70,6 +72,9 @@ func TestFlushing(t *testing.T) { | |
t.Fatal(err) | ||
} | ||
|
||
// flushing is async so we can't rely on it happening immediately | ||
time.Sleep(100 * time.Millisecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we make these deterministic to avoid flaky tests? (stub out time fns, inject mock clock. etc) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a library you would recommend for that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Recommend https://github.com/benbjohnson/clock |
||
|
||
// should be flushed now, try to get keys from child datastore | ||
for _, k := range keys[:14] { | ||
val, err := child.Get(k) | ||
|
@@ -102,11 +107,29 @@ func TestFlushing(t *testing.T) { | |
if !bytes.Equal(val, v) { | ||
t.Fatal("wrong value") | ||
} | ||
|
||
// let's test the maximum delay with a single key | ||
key17 := ds.NewKey("test17") | ||
err = d.Put(key17, v) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
time.Sleep(600 * time.Millisecond) | ||
|
||
val, err = child.Get(key17) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if !bytes.Equal(val, v) { | ||
t.Fatal("wrong value") | ||
} | ||
} | ||
|
||
func TestSync(t *testing.T) { | ||
child := ds.NewMapDatastore() | ||
d := NewAutoBatching(child, 100) | ||
d := NewAutoBatching(child, 100, time.Second) | ||
|
||
put := func(key ds.Key) { | ||
if err := d.Put(key, []byte(key.String())); err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strictly speaking this does break compatibility, although it's an easy change and I'm not sure many people use it.
It would be possible to maintain the previous signature with
maxDelay
set to infinity and have a second constructor with the new parameter.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to use varargs functional opts here to a) preserve backwards compat and b) give us a way to extend this in the future? (I think backwards compat would also require disabling the async flushing if maxDelay=0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That can be done but that would be a bit weird to keep maxWrite as a parameter and maxDelay as a functional arg.
Also, not sure how much time I can find to work on this.