Skip to content

Commit

Permalink
Allow for gc during adds
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Dec 6, 2015
1 parent 742f6da commit 7341486
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 13 deletions.
15 changes: 14 additions & 1 deletion blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package blockstore
import (
"errors"
"sync"
"sync/atomic"

ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsns "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
Expand Down Expand Up @@ -49,6 +50,10 @@ type GCBlockstore interface {
// at the same time, but no GC should not happen simulatenously.
// Reading during Pinning is safe, and requires no lock.
PinLock() func()

// GcRequested returns true if GCLock has been called and is waiting to
// take the lock
GCRequested() bool
}

func NewBlockstore(d ds.Batching) *blockstore {
Expand All @@ -63,7 +68,9 @@ func NewBlockstore(d ds.Batching) *blockstore {
type blockstore struct {
datastore ds.Batching

lk sync.RWMutex
lk sync.RWMutex
gcreq int32
gcreqlk sync.Mutex
}

func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
Expand Down Expand Up @@ -192,11 +199,17 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
}

func (bs *blockstore) GCLock() func() {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return bs.lk.Unlock
}

func (bs *blockstore) PinLock() func() {
bs.lk.RLock()
return bs.lk.RUnlock
}

func (bs *blockstore) GCRequested() bool {
return atomic.LoadInt32(&bs.gcreq) > 0
}
4 changes: 4 additions & 0 deletions blocks/blockstore/write_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ func (w *writecache) GCLock() func() {
func (w *writecache) PinLock() func() {
return w.blockstore.(GCBlockstore).PinLock()
}

func (w *writecache) GCRequested() bool {
return w.blockstore.(GCBlockstore).GCRequested()
}
58 changes: 47 additions & 11 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
bserv "github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/exchange/offline"
importer "github.com/ipfs/go-ipfs/importer"
Expand Down Expand Up @@ -99,6 +100,8 @@ type Adder struct {
Chunker string
root *dag.Node
mr *mfs.Root
unlock func()
tempRoot key.Key
}

// Perform the actual add & pin locally, outputting results to reader
Expand Down Expand Up @@ -157,6 +160,14 @@ func (params *Adder) PinRoot() error {
return err
}

if params.tempRoot != "" {
err := params.node.Pinning.Unpin(params.ctx, params.tempRoot, true)
if err != nil {
return err
}
params.tempRoot = rnk
}

params.node.Pinning.PinWithMode(rnk, pin.Recursive)
return params.node.Pinning.Flush()
}
Expand Down Expand Up @@ -256,7 +267,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
return "", err
}

err = fileAdder.AddFile(f)
err = fileAdder.addFile(f)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -289,7 +300,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No
unlock := n.Blockstore.PinLock()
defer unlock()

err = fileAdder.AddFile(file)
err = fileAdder.addFile(file)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -330,12 +341,24 @@ func (params *Adder) addNode(node *dag.Node, path string) error {

// Add the given file while respecting the params.
func (params *Adder) AddFile(file files.File) error {
params.unlock = params.node.Blockstore.PinLock()
defer params.unlock()

return params.addFile(file)
}

func (adder *Adder) addFile(file files.File) error {
err := adder.maybePauseForGC()
if err != nil {
return err
}

switch {
case files.IsHidden(file) && !params.Hidden:
case files.IsHidden(file) && !adder.Hidden:
log.Debugf("%s is hidden, skipping", file.FileName())
return &hiddenFileError{file.FileName()}
case file.IsDirectory():
return params.addDir(file)
return adder.addDir(file)
}

// case for symlink
Expand All @@ -346,29 +369,29 @@ func (params *Adder) AddFile(file files.File) error {
}

dagnode := &dag.Node{Data: sdata}
_, err = params.node.DAG.Add(dagnode)
_, err = adder.node.DAG.Add(dagnode)
if err != nil {
return err
}

return params.addNode(dagnode, s.FileName())
return adder.addNode(dagnode, s.FileName())
}

// case for regular file
// if the progress flag was specified, wrap the file so that we can send
// progress updates to the client (over the output channel)
var reader io.Reader = file
if params.Progress {
reader = &progressReader{file: file, out: params.out}
if adder.Progress {
reader = &progressReader{file: file, out: adder.out}
}

dagnode, err := params.add(reader)
dagnode, err := adder.add(reader)
if err != nil {
return err
}

// patch it into the root
return params.addNode(dagnode, file.FileName())
return adder.addNode(dagnode, file.FileName())
}

func (params *Adder) addDir(dir files.File) error {
Expand All @@ -388,7 +411,7 @@ func (params *Adder) addDir(dir files.File) error {
break
}

err = params.AddFile(file)
err = params.addFile(file)
if _, ok := err.(*hiddenFileError); ok {
// hidden file error, skip file
continue
Expand All @@ -400,6 +423,19 @@ func (params *Adder) addDir(dir files.File) error {
return nil
}

func (adder *Adder) maybePauseForGC() error {
if adder.node.Blockstore.GCRequested() {
err := adder.PinRoot()
if err != nil {
return err
}

adder.unlock()
adder.unlock = adder.node.Blockstore.PinLock()
}
return nil
}

// outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn *dag.Node) error {
if out == nil {
Expand Down
2 changes: 1 addition & 1 deletion pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var log = logging.Logger("gc")
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) {
unlock := bs.GCLock()
defer unlock()

bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
Expand All @@ -42,6 +41,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.
output := make(chan key.Key)
go func() {
defer close(output)
defer unlock()

This comment has been minimized.

Copy link
@jbenet

jbenet Dec 6, 2015

Member

hahha whoops!

This comment has been minimized.

Copy link
@jbenet

jbenet Dec 6, 2015

Member

glad this is in dev0.4.0

This comment has been minimized.

Copy link
@whyrusleeping

whyrusleeping Dec 6, 2015

Author Member

yeeeeaaahhhh, i'm going to pretend this wasnt me.

for {
select {
case k, ok := <-keychan:
Expand Down

0 comments on commit 7341486

Please sign in to comment.