Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract dagservice and friends from go-ipfs #8

Merged
merged 21 commits into from
Dec 11, 2017
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package format
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a lot of code for an interface package. Unfortunately, the interface needs this (unless we decide to introduce a Batch interface.


import (
"runtime"

cid "github.com/ipfs/go-cid"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2

// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add a lot of nodes all at once.
func NewBatch(ds DAGService) *Batch {
return &Batch{
ds: ds,
commitResults: make(chan error, ParallelBatchCommits),
MaxSize: 8 << 20,

// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
// descriptors at the same time for the optimized batch write.
MaxNodes: 128,
}
}

// Batch is a buffer for batching adds to a dag.
type Batch struct {
ds DAGService

activeCommits int
commitError error
commitResults chan error

nodes []Node
size int

MaxSize int
MaxNodes int
}

func (t *Batch) processResults() {
for t.activeCommits > 0 && t.commitError == nil {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.commitError = err
}
default:
return
}
}
}

func (t *Batch) asyncCommit() {
numBlocks := len(t.nodes)
if numBlocks == 0 || t.commitError != nil {
return
}
if t.activeCommits >= ParallelBatchCommits {
err := <-t.commitResults
t.activeCommits--

if err != nil {
t.commitError = err
return
}
}
go func(b []Node) {
_, err := t.ds.AddMany(b)
t.commitResults <- err
}(t.nodes)

t.activeCommits++
t.nodes = make([]Node, 0, numBlocks)
t.size = 0

return
}

// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd Node) (*cid.Cid, error) {
// Not strictly necessary but allows us to catch errors early.
t.processResults()
if t.commitError != nil {
return nil, t.commitError
}

t.nodes = append(t.nodes, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
t.asyncCommit()
}
return nd.Cid(), t.commitError
}

// Commit commits batched nodes.
func (t *Batch) Commit() error {
t.asyncCommit()
for t.activeCommits > 0 && t.commitError == nil {
err := <-t.commitResults
t.activeCommits--
if err != nil {
t.commitError = err
}
}

return t.commitError
}
105 changes: 105 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package format

import (
"context"
"sync"
"testing"

cid "github.com/ipfs/go-cid"
)

// Test dag
type testDag struct {
mu sync.Mutex
nodes map[string]Node
}

func newTestDag() *testDag {
return &testDag{nodes: make(map[string]Node)}
}

func (d *testDag) Get(ctx context.Context, cid *cid.Cid) (Node, error) {
d.mu.Lock()
defer d.mu.Unlock()
if n, ok := d.nodes[cid.KeyString()]; ok {
return n, nil
}
return nil, ErrNotFound
}

func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption {
d.mu.Lock()
defer d.mu.Unlock()
out := make(chan *NodeOption, len(cids))
for _, c := range cids {
if n, ok := d.nodes[c.KeyString()]; ok {
out <- &NodeOption{Node: n}
} else {
out <- &NodeOption{Err: ErrNotFound}
}
}
return out
}

func (d *testDag) Add(node Node) (*cid.Cid, error) {
d.mu.Lock()
defer d.mu.Unlock()
c := node.Cid()
d.nodes[c.KeyString()] = node
return c, nil
}

func (d *testDag) AddMany(nodes []Node) ([]*cid.Cid, error) {
d.mu.Lock()
defer d.mu.Unlock()
cids := make([]*cid.Cid, len(nodes))
for i, n := range nodes {
c := n.Cid()
d.nodes[c.KeyString()] = n
cids[i] = c
}
return cids, nil
}

func (d *testDag) Remove(c *cid.Cid) error {
d.mu.Lock()
defer d.mu.Unlock()
key := c.KeyString()
if _, exists := d.nodes[key]; !exists {
return ErrNotFound
}
delete(d.nodes, key)
return nil
}

var _ DAGService = new(testDag)

func TestBatch(t *testing.T) {
d := newTestDag()
b := NewBatch(d)
for i := 0; i < 1000; i++ {
// It would be great if we could use *many* different nodes here
// but we can't add any dependencies and I don't feel like adding
// any more testing code.
if _, err := b.Add(new(EmptyNode)); err != nil {
t.Fatal(err)
}
}
if err := b.Commit(); err != nil {
t.Fatal(err)
}

n, err := d.Get(context.Background(), new(EmptyNode).Cid())
if err != nil {
t.Fatal(err)
}
switch n.(type) {
case *EmptyNode:
default:
t.Fatal("expected the node to exist in the dag")
}

if len(d.nodes) != 1 {
t.Fatal("should have one node")
}
}
99 changes: 99 additions & 0 deletions daghelpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package format

import (
"context"

cid "github.com/ipfs/go-cid"
)

// GetLinks returns the CIDs of the children of the given node. Prefer this
// method over looking up the node itself and calling `Links()` on it as this
// method may be able to use a link cache.
func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) {
if c.Type() == cid.Raw {
return nil, nil
}
if gl, ok := ng.(LinkGetter); ok {
return gl.GetLinks(ctx, c)
}
node, err := ng.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}

// GetDAG will fill out all of the links of the given Node.
// It returns an array of NodePromise with the linked nodes all in the proper
// order.
func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise {
var cids []*cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
}

return GetNodes(ctx, ds, cids)
}

// GetNodes returns an array of 'FutureNode' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds NodeGetter, keys []*cid.Cid) []*NodePromise {

// Early out if no work to do
if len(keys) == 0 {
return nil
}

promises := make([]*NodePromise, len(keys))
for i := range keys {
promises[i] = NewNodePromise(ctx)
}

dedupedKeys := dedupeKeys(keys)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

nodechan := ds.GetMany(ctx, dedupedKeys)

for count := 0; count < len(keys); {
select {
case opt, ok := <-nodechan:
if !ok {
for _, p := range promises {
p.Fail(ErrNotFound)
}
return
}

if opt.Err != nil {
for _, p := range promises {
p.Fail(opt.Err)
}
return
}

nd := opt.Node
c := nd.Cid()
for i, lnk_c := range keys {
if c.Equals(lnk_c) {
count++
promises[i].Send(nd)
}
}
case <-ctx.Done():
return
}
}
}()
return promises
}

// Remove duplicates from a list of keys
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
set := cid.NewSet()
for _, c := range cids {
set.Add(c)
}
return set.Keys()
}
8 changes: 4 additions & 4 deletions format.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type Resolver interface {
Tree(path string, depth int) []string
}

// Node is the base interface all IPLD nodes must implement.
//
// Nodes are **Immutable** and all methods defined on the interface are
// **Thread Safe**.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Nodes are immutable, what is the reason for Copy method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None. I was planning on fixing that in this PR but have punted on fixing Nodes till a future one.

type Node interface {
blocks.Block
Resolver
Expand All @@ -40,10 +44,6 @@ type Node interface {
Size() (uint64, error)
}

type NodeGetter interface {
Get(context.Context, *cid.Cid) (Node, error)
}

// Link represents an IPFS Merkle DAG Link between Nodes.
type Link struct {
// utf string name. should be unique per object
Expand Down
Loading