Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
feat!: add and connect missing context, remove RemovePinWithMode (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelMure authored Feb 22, 2023
1 parent 7e1406b commit 9abb80f
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 144 deletions.
165 changes: 68 additions & 97 deletions dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dsindex"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-merkledag/dagutils"
"github.com/polydawn/refmt/cbor"
"github.com/polydawn/refmt/obj/atlas"

ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dsindex"
)

const (
Expand Down Expand Up @@ -179,23 +180,30 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
return err
}

c := node.Cid()
if recurse {
return p.doPinRecursive(ctx, node.Cid(), true)
} else {
return p.doPinDirect(ctx, node.Cid())
}
}

func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) error {
cidKey := c.KeyString()

p.lock.Lock()
defer p.lock.Unlock()

if recurse {
found, err := p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
return nil
}
found, err := p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
return nil
}

dirtyBefore := p.dirty
dirtyBefore := p.dirty

if fetch {
// temporary unlock to fetch the entire graph
p.lock.Unlock()
// Fetch graph starting at node identified by cid
Expand All @@ -204,54 +212,63 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
if err != nil {
return err
}
}

// If autosyncing, sync dag service before making any change to pins
err = p.flushDagService(ctx, false)
if err != nil {
return err
}

// Only look again if something has changed.
if p.dirty != dirtyBefore {
found, err = p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
return nil
}
}
// If autosyncing, sync dag service before making any change to pins
err = p.flushDagService(ctx, false)
if err != nil {
return err
}

// TODO: remove this to support multiple pins per CID
found, err = p.cidDIndex.HasAny(ctx, cidKey)
// Only look again if something has changed.
if p.dirty != dirtyBefore {
found, err = p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
_, err = p.removePinsForCid(ctx, c, ipfspinner.Direct)
if err != nil {
return err
}
return nil
}
}

_, err = p.addPin(ctx, c, ipfspinner.Recursive, "")
if err != nil {
return err
}
} else {
found, err := p.cidRIndex.HasAny(ctx, cidKey)
// TODO: remove this to support multiple pins per CID
found, err = p.cidDIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
_, err = p.removePinsForCid(ctx, c, ipfspinner.Direct)
if err != nil {
return err
}
if found {
return fmt.Errorf("%s already pinned recursively", c.String())
}
}

_, err = p.addPin(ctx, c, ipfspinner.Direct, "")
if err != nil {
return err
}
_, err = p.addPin(ctx, c, ipfspinner.Recursive, "")
if err != nil {
return err
}
return p.flushPins(ctx, false)
}

func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error {
cidKey := c.KeyString()

p.lock.Lock()
defer p.lock.Unlock()

found, err := p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
return fmt.Errorf("%s already pinned recursively", c.String())
}

_, err = p.addPin(ctx, c, ipfspinner.Direct, "")
if err != nil {
return err
}

return p.flushPins(ctx, false)
}

Expand Down Expand Up @@ -555,35 +572,6 @@ func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]ipfspinn
return pinned, nil
}

// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
func (p *pinner) RemovePinWithMode(c cid.Cid, mode ipfspinner.Mode) {
ctx := context.TODO()
// Check cache to see if CID is pinned
switch mode {
case ipfspinner.Direct, ipfspinner.Recursive:
default:
// programmer error, panic OK
panic("unrecognized pin type")
}

p.lock.Lock()
defer p.lock.Unlock()

removed, err := p.removePinsForCid(ctx, c, mode)
if err != nil {
log.Error("cound not remove pins: %s", err)
return
}
if !removed {
return
}
if err = p.flushPins(ctx, false); err != nil {
log.Error("cound not remove pins: %s", err)
}
}

// removePinsForCid removes all pins for a cid that has the specified mode.
// Returns true if any pins, and all corresponding CID index entries, were
// removed. Otherwise, returns false.
Expand Down Expand Up @@ -826,32 +814,15 @@ func (p *pinner) Flush(ctx context.Context) error {

// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(c cid.Cid, mode ipfspinner.Mode) {
ctx := context.TODO()

p.lock.Lock()
defer p.lock.Unlock()

func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) error {
// TODO: remove his to support multiple pins per CID
switch mode {
case ipfspinner.Recursive:
if has, _ := p.cidRIndex.HasAny(ctx, c.KeyString()); has {
return // already a recursive pin for this CID
}
return p.doPinRecursive(ctx, c, false)
case ipfspinner.Direct:
if has, _ := p.cidDIndex.HasAny(ctx, c.KeyString()); has {
return // already a direct pin for this CID
}
return p.doPinDirect(ctx, c)
default:
panic("unrecognized pin mode")
}

_, err := p.addPin(ctx, c, mode, "")
if err != nil {
return
}
if err = p.flushPins(ctx, false); err != nil {
log.Errorf("failed to create %s pin: %s", mode, err)
return fmt.Errorf("unrecognized pin mode")
}
}

Expand Down
44 changes: 3 additions & 41 deletions dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
lds "github.com/ipfs/go-ds-leveldb"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipfspin "github.com/ipfs/go-ipfs-pinner"
util "github.com/ipfs/go-ipfs-util"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"

ipfspin "github.com/ipfs/go-ipfs-pinner"
)

var rand = util.NewTimeSeededRand()
Expand Down Expand Up @@ -375,45 +376,6 @@ func TestAddLoadPin(t *testing.T) {
}
}

func TestRemovePinWithMode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))

dserv := mdag.NewDAGService(bserv)

p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}

a, ak := randNode()
err = dserv.Add(ctx, a)
if err != nil {
panic(err)
}

err = p.Pin(ctx, a, false)
if err != nil {
t.Fatal(err)
}

ok, err := p.removePinsForCid(ctx, ak, ipfspin.Recursive)
if err != nil {
t.Fatal(err)
}
if ok {
t.Error("pin should not have been removed")
}

p.RemovePinWithMode(ak, ipfspin.Direct)

assertUnpinned(t, p, ak, "pin was not removed")
}

func TestIsPinnedLookup(t *testing.T) {
// Test that lookups work in pins which share
// the same branches. For that construct this tree:
Expand Down Expand Up @@ -523,7 +485,7 @@ func TestFlush(t *testing.T) {
}
_, k := randNode()

p.PinWithMode(k, ipfspin.Recursive)
p.PinWithMode(ctx, k, ipfspin.Recursive)
if err = p.Flush(ctx); err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 3 additions & 6 deletions pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type Pinner interface {
IsPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error)

// Pin the given node, optionally recursively.
// Pin will make sure that the given node and its children if recursive is set
// are stored locally.
Pin(ctx context.Context, node ipld.Node, recursive bool) error

// Unpin the given cid. If recursive is true, removes either a recursive or
Expand All @@ -111,12 +113,7 @@ type Pinner interface {
// PinWithMode is for manually editing the pin structure. Use with
// care! If used improperly, garbage collection may not be
// successful.
PinWithMode(cid.Cid, Mode)

// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
RemovePinWithMode(cid.Cid, Mode)
PinWithMode(context.Context, cid.Cid, Mode) error

// Flush writes the pin state to the backing datastore
Flush(ctx context.Context) error
Expand Down

0 comments on commit 9abb80f

Please sign in to comment.