-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract dagservice and friends from go-ipfs #8
Changes from 16 commits
ac092e5
b3a1f4b
0f9e9ed
2696405
5837bec
c0311d7
339e9ea
5bc8a07
30aecc4
2f52265
3380389
f2fc6ce
07869d6
5225978
1af7e81
0408f8d
6f9115b
b403323
f10b5dd
44a7801
6cf32cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
package format | ||
|
||
import ( | ||
"runtime" | ||
|
||
cid "github.com/ipfs/go-cid" | ||
) | ||
|
||
// ParallelBatchCommits caps how many batch commits may be in flight at once;
// starting a further commit waits until one of the running commits reports.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = 2 * runtime.NumCPU()
|
||
// NewBatch returns a node buffer (Batch) that buffers nodes internally and | ||
// commits them to the underlying DAGService in batches. Use this if you intend | ||
// to add a lot of nodes all at once. | ||
func NewBatch(ds DAGService) *Batch { | ||
return &Batch{ | ||
ds: ds, | ||
commitResults: make(chan error, ParallelBatchCommits), | ||
MaxSize: 8 << 20, | ||
|
||
// By default, only batch up to 128 nodes at a time. | ||
// The current implementation of flatfs opens this many file | ||
// descriptors at the same time for the optimized batch write. | ||
MaxNodes: 128, | ||
} | ||
} | ||
|
||
// Batch is a buffer for batching adds to a dag. | ||
type Batch struct { | ||
ds DAGService | ||
|
||
activeCommits int | ||
commitError error | ||
commitResults chan error | ||
|
||
nodes []Node | ||
size int | ||
|
||
MaxSize int | ||
MaxNodes int | ||
} | ||
|
||
func (t *Batch) processResults() { | ||
for t.activeCommits > 0 && t.commitError == nil { | ||
select { | ||
case err := <-t.commitResults: | ||
t.activeCommits-- | ||
if err != nil { | ||
t.commitError = err | ||
} | ||
default: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (t *Batch) asyncCommit() { | ||
numBlocks := len(t.nodes) | ||
if numBlocks == 0 || t.commitError != nil { | ||
return | ||
} | ||
if t.activeCommits >= ParallelBatchCommits { | ||
err := <-t.commitResults | ||
t.activeCommits-- | ||
|
||
if err != nil { | ||
t.commitError = err | ||
return | ||
} | ||
} | ||
go func(b []Node) { | ||
_, err := t.ds.AddMany(b) | ||
t.commitResults <- err | ||
}(t.nodes) | ||
|
||
t.activeCommits++ | ||
t.nodes = make([]Node, 0, numBlocks) | ||
t.size = 0 | ||
|
||
return | ||
} | ||
|
||
// Add adds a node to the batch and commits the batch if necessary. | ||
func (t *Batch) Add(nd Node) (*cid.Cid, error) { | ||
// Not strictly necessary but allows us to catch errors early. | ||
t.processResults() | ||
if t.commitError != nil { | ||
return nil, t.commitError | ||
} | ||
|
||
t.nodes = append(t.nodes, nd) | ||
t.size += len(nd.RawData()) | ||
if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes { | ||
t.asyncCommit() | ||
} | ||
return nd.Cid(), t.commitError | ||
} | ||
|
||
// Commit commits batched nodes. | ||
func (t *Batch) Commit() error { | ||
t.asyncCommit() | ||
for t.activeCommits > 0 && t.commitError == nil { | ||
err := <-t.commitResults | ||
t.activeCommits-- | ||
if err != nil { | ||
t.commitError = err | ||
} | ||
} | ||
|
||
return t.commitError | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package format | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"testing" | ||
|
||
cid "github.com/ipfs/go-cid" | ||
) | ||
|
||
// Test dag | ||
type testDag struct { | ||
mu sync.Mutex | ||
nodes map[string]Node | ||
} | ||
|
||
func newTestDag() *testDag { | ||
return &testDag{nodes: make(map[string]Node)} | ||
} | ||
|
||
func (d *testDag) Get(ctx context.Context, cid *cid.Cid) (Node, error) { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
if n, ok := d.nodes[cid.KeyString()]; ok { | ||
return n, nil | ||
} | ||
return nil, ErrNotFound | ||
} | ||
|
||
func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
out := make(chan *NodeOption, len(cids)) | ||
for _, c := range cids { | ||
if n, ok := d.nodes[c.KeyString()]; ok { | ||
out <- &NodeOption{Node: n} | ||
} else { | ||
out <- &NodeOption{Err: ErrNotFound} | ||
} | ||
} | ||
return out | ||
} | ||
|
||
func (d *testDag) Add(node Node) (*cid.Cid, error) { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
c := node.Cid() | ||
d.nodes[c.KeyString()] = node | ||
return c, nil | ||
} | ||
|
||
func (d *testDag) AddMany(nodes []Node) ([]*cid.Cid, error) { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
cids := make([]*cid.Cid, len(nodes)) | ||
for i, n := range nodes { | ||
c := n.Cid() | ||
d.nodes[c.KeyString()] = n | ||
cids[i] = c | ||
} | ||
return cids, nil | ||
} | ||
|
||
func (d *testDag) Remove(c *cid.Cid) error { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
key := c.KeyString() | ||
if _, exists := d.nodes[key]; !exists { | ||
return ErrNotFound | ||
} | ||
delete(d.nodes, key) | ||
return nil | ||
} | ||
|
||
var _ DAGService = new(testDag) | ||
|
||
func TestBatch(t *testing.T) { | ||
d := newTestDag() | ||
b := NewBatch(d) | ||
for i := 0; i < 1000; i++ { | ||
// It would be great if we could use *many* different nodes here | ||
// but we can't add any dependencies and I don't feel like adding | ||
// any more testing code. | ||
if _, err := b.Add(new(EmptyNode)); err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
if err := b.Commit(); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
n, err := d.Get(context.Background(), new(EmptyNode).Cid()) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
switch n.(type) { | ||
case *EmptyNode: | ||
default: | ||
t.Fatal("expected the node to exist in the dag") | ||
} | ||
|
||
if len(d.nodes) != 1 { | ||
t.Fatal("should have one node") | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package format | ||
|
||
import ( | ||
"context" | ||
|
||
cid "github.com/ipfs/go-cid" | ||
) | ||
|
||
// GetLinks returns the CIDs of the children of the given node. Prefer this | ||
// method over looking up the node itself and calling `Links()` on it as this | ||
// method may be able to use a link cache. | ||
func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { | ||
if c.Type() == cid.Raw { | ||
return nil, nil | ||
} | ||
if gl, ok := ng.(LinkGetter); ok { | ||
return gl.GetLinks(ctx, c) | ||
} | ||
node, err := ng.Get(ctx, c) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return node.Links(), nil | ||
} | ||
|
||
// GetDAG will fill out all of the links of the given Node. | ||
// It returns an array of NodePromise with the linked nodes all in the proper | ||
// order. | ||
func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise { | ||
var cids []*cid.Cid | ||
for _, lnk := range root.Links() { | ||
cids = append(cids, lnk.Cid) | ||
} | ||
|
||
return GetNodes(ctx, ds, cids) | ||
} | ||
|
||
// GetNodes returns an array of 'FutureNode' promises, with each corresponding | ||
// to the key with the same index as the passed in keys | ||
func GetNodes(ctx context.Context, ds NodeGetter, keys []*cid.Cid) []*NodePromise { | ||
|
||
// Early out if no work to do | ||
if len(keys) == 0 { | ||
return nil | ||
} | ||
|
||
promises := make([]*NodePromise, len(keys)) | ||
for i := range keys { | ||
promises[i] = NewNodePromise(ctx) | ||
} | ||
|
||
dedupedKeys := dedupeKeys(keys) | ||
go func() { | ||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
|
||
nodechan := ds.GetMany(ctx, dedupedKeys) | ||
|
||
for count := 0; count < len(keys); { | ||
select { | ||
case opt, ok := <-nodechan: | ||
if !ok { | ||
for _, p := range promises { | ||
p.Fail(ErrNotFound) | ||
} | ||
return | ||
} | ||
|
||
if opt.Err != nil { | ||
for _, p := range promises { | ||
p.Fail(opt.Err) | ||
} | ||
return | ||
} | ||
|
||
nd := opt.Node | ||
c := nd.Cid() | ||
for i, lnk_c := range keys { | ||
if c.Equals(lnk_c) { | ||
count++ | ||
promises[i].Send(nd) | ||
} | ||
} | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
}() | ||
return promises | ||
} | ||
|
||
// Remove duplicates from a list of keys | ||
func dedupeKeys(cids []*cid.Cid) []*cid.Cid { | ||
set := cid.NewSet() | ||
for _, c := range cids { | ||
set.Add(c) | ||
} | ||
return set.Keys() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,10 @@ type Resolver interface { | |
Tree(path string, depth int) []string | ||
} | ||
|
||
// Node is the base interface all IPLD nodes must implement. | ||
// | ||
// Nodes are **Immutable** and all methods defined on the interface are | ||
// **Thread Safe**. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Nodes are immutable, what is the reason for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. None. I was planning on fixing that in this PR but have punted on fixing Nodes till a future one. |
||
type Node interface { | ||
blocks.Block | ||
Resolver | ||
|
@@ -40,10 +44,6 @@ type Node interface { | |
Size() (uint64, error) | ||
} | ||
|
||
type NodeGetter interface { | ||
Get(context.Context, *cid.Cid) (Node, error) | ||
} | ||
|
||
// Link represents an IPFS Merkle DAG Link between Nodes. | ||
type Link struct { | ||
// utf string name. should be unique per object | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a lot of code for an interface package. Unfortunately, the interface needs this (unless we decide to introduce a `Batch` interface).