Skip to content

Commit

Permalink
Merge pull request #8 from Stebalien/dag-service
Browse files Browse the repository at this point in the history
Extract dagservice and friends from go-ipfs
  • Loading branch information
Stebalien authored Dec 11, 2017
2 parents fd7023e + 6cf32cc commit 9d8c846
Show file tree
Hide file tree
Showing 6 changed files with 518 additions and 4 deletions.
177 changes: 177 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package format

import (
"context"
"errors"
"runtime"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2

// ErrNotCommited is returned when closing a batch that hasn't been successfully
// committed.
var ErrNotCommited = errors.New("error: batch not commited")

// ErrClosed is returned when operating on a batch that has already been closed.
var ErrClosed = errors.New("error: batch closed")

// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
func NewBatch(ctx context.Context, ds DAGService) *Batch {
ctx, cancel := context.WithCancel(ctx)
return &Batch{
ds: ds,
ctx: ctx,
cancel: cancel,
commitResults: make(chan error, ParallelBatchCommits),
MaxSize: 8 << 20,

// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
// descriptors at the same time for the optimized batch write.
MaxNodes: 128,
}
}

// Batch is a buffer for batching adds to a dag.
type Batch struct {
ds DAGService

ctx context.Context
cancel func()

activeCommits int
err error
commitResults chan error

nodes []Node
size int

MaxSize int
MaxNodes int
}

func (t *Batch) processResults() {
for t.activeCommits > 0 {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.setError(err)
return
}
default:
return
}
}
}

func (t *Batch) asyncCommit() {
numBlocks := len(t.nodes)
if numBlocks == 0 {
return
}
if t.activeCommits >= ParallelBatchCommits {
select {
case err := <-t.commitResults:
t.activeCommits--

if err != nil {
t.setError(err)
return
}
case <-t.ctx.Done():
t.setError(t.ctx.Err())
return
}
}
go func(ctx context.Context, b []Node, result chan error, ds DAGService) {
select {
case result <- ds.AddMany(ctx, b):
case <-ctx.Done():
}
}(t.ctx, t.nodes, t.commitResults, t.ds)

t.activeCommits++
t.nodes = make([]Node, 0, numBlocks)
t.size = 0

return
}

// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd Node) error {
if t.err != nil {
return t.err
}
// Not strictly necessary but allows us to catch errors early.
t.processResults()

if t.err != nil {
return t.err
}

t.nodes = append(t.nodes, nd)
t.size += len(nd.RawData())

if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
t.asyncCommit()
}
return t.err
}

// Commit commits batched nodes.
func (t *Batch) Commit() error {
if t.err != nil {
return t.err
}

t.asyncCommit()

loop:
for t.activeCommits > 0 {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.setError(err)
break loop
}
case <-t.ctx.Done():
t.setError(t.ctx.Err())
break loop
}
}

return t.err
}

func (t *Batch) setError(err error) {
t.err = err

t.cancel()

// Drain as much as we can without blocking.
loop:
for {
select {
case <-t.commitResults:
default:
break loop
}
}

// Be nice and cleanup. These can take a *lot* of memory.
t.commitResults = nil
t.ds = nil
t.ctx = nil
t.nodes = nil
t.size = 0
t.activeCommits = 0
}
109 changes: 109 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package format

import (
"context"
"sync"
"testing"

cid "github.com/ipfs/go-cid"
)

// Test dag
type testDag struct {
mu sync.Mutex
nodes map[string]Node
}

func newTestDag() *testDag {
return &testDag{nodes: make(map[string]Node)}
}

func (d *testDag) Get(ctx context.Context, cid *cid.Cid) (Node, error) {
d.mu.Lock()
defer d.mu.Unlock()
if n, ok := d.nodes[cid.KeyString()]; ok {
return n, nil
}
return nil, ErrNotFound
}

func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption {
d.mu.Lock()
defer d.mu.Unlock()
out := make(chan *NodeOption, len(cids))
for _, c := range cids {
if n, ok := d.nodes[c.KeyString()]; ok {
out <- &NodeOption{Node: n}
} else {
out <- &NodeOption{Err: ErrNotFound}
}
}
return out
}

func (d *testDag) Add(ctx context.Context, node Node) error {
d.mu.Lock()
defer d.mu.Unlock()
d.nodes[node.Cid().KeyString()] = node
return nil
}

func (d *testDag) AddMany(ctx context.Context, nodes []Node) error {
d.mu.Lock()
defer d.mu.Unlock()
for _, n := range nodes {
d.nodes[n.Cid().KeyString()] = n
}
return nil
}

func (d *testDag) Remove(ctx context.Context, c *cid.Cid) error {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.nodes, c.KeyString())
return nil
}

func (d *testDag) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
d.mu.Lock()
defer d.mu.Unlock()
for _, c := range cids {
delete(d.nodes, c.KeyString())
}
return nil
}

var _ DAGService = new(testDag)

func TestBatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := newTestDag()
b := NewBatch(ctx, d)
for i := 0; i < 1000; i++ {
// It would be great if we could use *many* different nodes here
// but we can't add any dependencies and I don't feel like adding
// any more testing code.
if err := b.Add(new(EmptyNode)); err != nil {
t.Fatal(err)
}
}
if err := b.Commit(); err != nil {
t.Fatal(err)
}

n, err := d.Get(ctx, new(EmptyNode).Cid())
if err != nil {
t.Fatal(err)
}
switch n.(type) {
case *EmptyNode:
default:
t.Fatal("expected the node to exist in the dag")
}

if len(d.nodes) != 1 {
t.Fatal("should have one node")
}
}
99 changes: 99 additions & 0 deletions daghelpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package format

import (
"context"

cid "github.com/ipfs/go-cid"
)

// GetLinks returns the CIDs of the children of the given node. Prefer this
// method over looking up the node itself and calling `Links()` on it as this
// method may be able to use a link cache.
func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) {
if c.Type() == cid.Raw {
return nil, nil
}
if gl, ok := ng.(LinkGetter); ok {
return gl.GetLinks(ctx, c)
}
node, err := ng.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}

// GetDAG will fill out all of the links of the given Node.
// It returns an array of NodePromise with the linked nodes all in the proper
// order.
func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise {
var cids []*cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
}

return GetNodes(ctx, ds, cids)
}

// GetNodes returns an array of 'FutureNode' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds NodeGetter, keys []*cid.Cid) []*NodePromise {

// Early out if no work to do
if len(keys) == 0 {
return nil
}

promises := make([]*NodePromise, len(keys))
for i := range keys {
promises[i] = NewNodePromise(ctx)
}

dedupedKeys := dedupeKeys(keys)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

nodechan := ds.GetMany(ctx, dedupedKeys)

for count := 0; count < len(keys); {
select {
case opt, ok := <-nodechan:
if !ok {
for _, p := range promises {
p.Fail(ErrNotFound)
}
return
}

if opt.Err != nil {
for _, p := range promises {
p.Fail(opt.Err)
}
return
}

nd := opt.Node
c := nd.Cid()
for i, lnk_c := range keys {
if c.Equals(lnk_c) {
count++
promises[i].Send(nd)
}
}
case <-ctx.Done():
return
}
}
}()
return promises
}

// Remove duplicates from a list of keys
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
set := cid.NewSet()
for _, c := range cids {
set.Add(c)
}
return set.Keys()
}
Loading

0 comments on commit 9d8c846

Please sign in to comment.