autobatch: thread-safe, debounce, max delay and implement Batching #180

Draft · wants to merge 2 commits into base: master
189 changes: 145 additions & 44 deletions autobatch/autobatch.go
@@ -1,20 +1,30 @@
// Package autobatch provides a go-datastore implementation that
// automatically batches together writes by holding puts in memory until
// a certain threshold is met.
// a certain threshold is met. It also acts as a debounce.
package autobatch

import (
"sync"
"time"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log"
)

var log = logging.Logger("datastore/autobatch")

// Datastore implements a go-datastore.
type Datastore struct {
child ds.Batching

// TODO: discuss making ds.Batch implement the full ds.Datastore interface
buffer map[ds.Key]op
maxBufferEntries int
mu sync.RWMutex
buffer map[ds.Key]op

maxWrite int
maxDelay time.Duration
newWrite chan struct{}
exit chan struct{}
}

type op struct {
@@ -23,28 +33,79 @@ type op struct {
}

// NewAutoBatching returns a new datastore that automatically
// batches writes using the given Batching datastore. The size
// of the memory pool is given by size.
func NewAutoBatching(d ds.Batching, size int) *Datastore {
return &Datastore{
child: d,
buffer: make(map[ds.Key]op, size),
maxBufferEntries: size,
// batches writes using the given Batching datastore. The maximum number of
// writes before triggering a batch is given by maxWrite. The maximum delay
// before triggering a batch is given by maxDelay.
func NewAutoBatching(child ds.Batching, maxWrite int, maxDelay time.Duration) *Datastore {
[Comment from Contributor Author]
Strictly speaking this does break compatibility, although it's an easy change and I'm not sure many people use it.
It would be possible to maintain the previous signature with maxDelay set to infinity and have a second constructor with the new parameter.

[Comment from Contributor]
Would it make sense to use varargs functional opts here to a) preserve backwards compat and b) give us a way to extend this in the future? (I think backwards compat would also require disabling the async flushing if maxDelay=0.)

[Comment from Contributor Author]
That can be done, but it would be a bit weird to keep maxWrite as a positional parameter and maxDelay as a functional option.
Also, I'm not sure how much time I can find to work on this.

d := &Datastore{
child: child,
buffer: make(map[ds.Key]op, maxWrite),
maxWrite: maxWrite,
maxDelay: maxDelay,
newWrite: make(chan struct{}),
exit: make(chan struct{}),
}
go d.runBatcher()
return d
}

func (d *Datastore) addOp(key ds.Key, op op) {
d.mu.Lock()
d.buffer[key] = op
d.mu.Unlock()
d.newWrite <- struct{}{}
}

func (d *Datastore) runBatcher() {
var timer <-chan time.Time

write := func() {
timer = nil
[Comment from Contributor]
This leaks the timer until it fires, e.g. if there is a high rate of ops that hit the maxWrite threshold really fast, we could end up with a lot of concurrent no-op timers running. I think it'd be better to use a proper timer and stop it here.


b, err := d.prepareBatch(nil)
if err != nil {
log.Error(err)
return
}
err = b.Commit()
if err != nil {
log.Error(err)
return
}
}

for {
select {
case <-d.exit:
return
case <-timer:
write()
case <-d.newWrite:
d.mu.RLock()
ready := len(d.buffer)
d.mu.RUnlock()
if ready > d.maxWrite {
write()
}
if timer == nil {
timer = time.After(d.maxDelay)
}
}
}
}

// Delete deletes a key/value
func (d *Datastore) Delete(k ds.Key) error {
d.buffer[k] = op{delete: true}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
}
d.addOp(k, op{delete: true})
return nil
}

// Get retrieves a value given a key.
func (d *Datastore) Get(k ds.Key) ([]byte, error) {
d.mu.RLock()
o, ok := d.buffer[k]
d.mu.RUnlock()

if ok {
if o.delete {
return nil, ds.ErrNotFound
@@ -57,69 +118,67 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) {

// Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val []byte) error {
d.buffer[k] = op{value: val}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
}
d.addOp(k, op{value: val})
return nil
}

// Sync flushes all operations on keys at or under the prefix
// from the current batch to the underlying datastore
func (d *Datastore) Sync(prefix ds.Key) error {
b, err := d.child.Batch()
b, err := d.prepareBatch(&prefix)
if err != nil {
return err
}

for k, o := range d.buffer {
if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) {
continue
}

var err error
if o.delete {
err = b.Delete(k)
} else {
err = b.Put(k, o.value)
}
if err != nil {
return err
}

delete(d.buffer, k)
}

return b.Commit()
}

// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
b, err := d.child.Batch()
b, err := d.prepareBatch(nil)
if err != nil {
return err
}
return b.Commit()
}

func (d *Datastore) prepareBatch(prefix *ds.Key) (ds.Batch, error) {
b, err := d.child.Batch()
if err != nil {
return nil, err
}

d.mu.Lock()

for k, o := range d.buffer {
if prefix != nil && !(k.Equal(*prefix) || k.IsDescendantOf(*prefix)) {
continue
}

var err error
if o.delete {
err = b.Delete(k)
} else {
err = b.Put(k, o.value)
}
if err != nil {
return err
d.mu.Unlock()
return nil, err
}

delete(d.buffer, k)
}
// clear out buffer
d.buffer = make(map[ds.Key]op, d.maxBufferEntries)

return b.Commit()
d.mu.Unlock()

return b, nil
}

// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
d.mu.RLock()
o, ok := d.buffer[k]
d.mu.RUnlock()

if ok {
return !o.delete, nil
}
@@ -129,7 +188,10 @@ func (d *Datastore) Has(k ds.Key) (bool, error) {

// GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(k ds.Key) (int, error) {
d.mu.RLock()
o, ok := d.buffer[k]
d.mu.RUnlock()

if ok {
if o.delete {
return -1, ds.ErrNotFound
@@ -155,6 +217,18 @@ func (d *Datastore) DiskUsage() (uint64, error) {
return ds.DiskUsage(d.child)
}

func (d *Datastore) Batch() (ds.Batch, error) {
b, err := d.child.Batch()
if err != nil {
return nil, err
}
return &batch{
parent: d,
child: b,
toDelete: make(map[ds.Key]struct{}),
}, nil
}

func (d *Datastore) Close() error {
err1 := d.Flush()
err2 := d.child.Close()
@@ -164,5 +238,32 @@ func (d *Datastore) Close() error {
if err2 != nil {
return err2
}
close(d.exit)
close(d.newWrite)
[Comment from Contributor]
I think this might be racy: if the timer fires and prepares a batch, then this flushes and closes the underlying datastore, then the async writer tries to commit the batch, but the underlying DS is now closed.
Also, this could return before the async writer finishes flushing things, which is surprising.

return nil
}
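One shape for the shutdown ordering the reviewer suggests is to stop the batcher goroutine first, wait for it to return via a done channel, and only then flush and close the child. This is a sketch under the assumption that `runBatcher` closes a `done` channel on return; `shutdown` and `batcherDone` are hypothetical names, not code from this PR.

```go
package main

import "fmt"

// shutdown sketches a race-free ordering for Close: signal the batcher
// to stop, wait until any in-flight commit has finished, then flush the
// remaining buffer synchronously and close the child store.
func shutdown(exit chan<- struct{}, batcherDone <-chan struct{},
	flush, closeChild func() error) error {

	close(exit)   // 1. stop timer-driven flushes
	<-batcherDone // 2. wait for the async writer to return
	if err := flush(); err != nil { // 3. drain what is left
		return err
	}
	return closeChild() // 4. only now is it safe to close the child
}

func main() {
	exit := make(chan struct{})
	done := make(chan struct{})
	go func() { // stand-in for runBatcher
		<-exit
		close(done)
	}()
	err := shutdown(exit, done,
		func() error { fmt.Println("flush"); return nil },
		func() error { fmt.Println("close child"); return nil })
	fmt.Println("err:", err)
}
```

With this ordering, Close cannot return while the batcher still holds a prepared batch, and the child is never closed underneath a pending commit.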

type batch struct {
parent *Datastore
child ds.Batch
toDelete map[ds.Key]struct{}
}

func (b *batch) Put(key ds.Key, value []byte) error {
delete(b.toDelete, key)
return b.child.Put(key, value)
}

func (b *batch) Delete(key ds.Key) error {
b.toDelete[key] = struct{}{}
return b.child.Delete(key)
}

func (b *batch) Commit() error {
b.parent.mu.Lock()
for key := range b.toDelete {
delete(b.parent.buffer, key)
}
b.parent.mu.Unlock()
return b.child.Commit()
}
31 changes: 27 additions & 4 deletions autobatch/autobatch_test.go
@@ -4,18 +4,20 @@ import (
"bytes"
"fmt"
"testing"
"time"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
dstest "github.com/ipfs/go-datastore/test"
)

func TestAutobatch(t *testing.T) {
dstest.SubtestAll(t, NewAutoBatching(ds.NewMapDatastore(), 16))
dstest.SubtestAll(t, NewAutoBatching(sync.MutexWrap(ds.NewMapDatastore()), 16, time.Second))
}

func TestFlushing(t *testing.T) {
child := ds.NewMapDatastore()
d := NewAutoBatching(child, 16)
child := sync.MutexWrap(ds.NewMapDatastore())
d := NewAutoBatching(child, 16, 500*time.Millisecond)

var keys []ds.Key
for i := 0; i < 16; i++ {
@@ -70,6 +72,9 @@ func TestFlushing(t *testing.T) {
t.Fatal(err)
}

// flushing is async, so we can't rely on it happening immediately
time.Sleep(100 * time.Millisecond)
[Comment from Contributor]
Can we make these deterministic to avoid flaky tests? (Stub out time fns, inject a mock clock, etc.)

[Comment from Contributor Author]
Is there a library you would recommend for that?
// should be flushed now, try to get keys from child datastore
for _, k := range keys[:14] {
val, err := child.Get(k)
@@ -102,11 +107,29 @@ func TestFlushing(t *testing.T) {
if !bytes.Equal(val, v) {
t.Fatal("wrong value")
}

// let's test the maximum delay with a single key
key17 := ds.NewKey("test17")
err = d.Put(key17, v)
if err != nil {
t.Fatal(err)
}

time.Sleep(600 * time.Millisecond)

val, err = child.Get(key17)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(val, v) {
t.Fatal("wrong value")
}
}

func TestSync(t *testing.T) {
child := ds.NewMapDatastore()
d := NewAutoBatching(child, 100)
d := NewAutoBatching(child, 100, time.Second)

put := func(key ds.Key) {
if err := d.Put(key, []byte(key.String())); err != nil {
5 changes: 3 additions & 2 deletions go.mod
@@ -4,10 +4,11 @@ require (
github.com/google/uuid v1.1.1
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8
github.com/ipfs/go-log v1.0.5
github.com/jbenet/goprocess v0.1.4
github.com/kr/pretty v0.2.0 // indirect
go.uber.org/multierr v1.5.0
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7
go.uber.org/multierr v1.6.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
)
