diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 13934a47fa0..5c737e6a9be 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -55,6 +55,7 @@ const ( enablePubSubKwd = "enable-pubsub-experiment" enableIPNSPubSubKwd = "enable-namesys-pubsub" enableMultiplexKwd = "enable-mplex-experiment" + enableFollowKwd = "enable-follow-experiment" // apiAddrKwd = "address-api" // swarmAddrKwd = "address-swarm" ) @@ -167,6 +168,7 @@ Headers. cmdkit.BoolOption(enablePubSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."), cmdkit.BoolOption(enableIPNSPubSubKwd, "Enable IPNS record distribution through pubsub; enables pubsub."), cmdkit.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").WithDefault(true), + cmdkit.BoolOption(enableFollowKwd, "Enable IPNS name following."), // TODO: add way to override addresses. tricky part: updating the config if also --init. // cmdkit.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"), @@ -287,6 +289,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment ipnsps, _ := req.Options[enableIPNSPubSubKwd].(bool) pubsub, _ := req.Options[enablePubSubKwd].(bool) mplex, _ := req.Options[enableMultiplexKwd].(bool) + follow, _ := req.Options[enableFollowKwd].(bool) // Start assembling node config ncfg := &core.BuildCfg{ @@ -298,6 +301,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment "pubsub": pubsub, "ipnsps": ipnsps, "mplex": mplex, + "follow": follow, }, //TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral } diff --git a/core/builder.go b/core/builder.go index e089bae838e..8e8c94e8973 100644 --- a/core/builder.go +++ b/core/builder.go @@ -256,7 +256,11 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { if cfg.Online { do := setupDiscoveryOption(rcfg.Discovery) - if err := n.startOnlineServices(ctx, cfg.Routing, hostOption, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil { + pubsub := cfg.getOpt("pubsub") + ipnsps := cfg.getOpt("ipnsps") + mplex := cfg.getOpt("mplex") + follow := cfg.getOpt("follow") + if err := n.startOnlineServices(ctx, cfg.Routing, hostOption, do, pubsub, ipnsps, mplex, follow); err != nil { return err } } else { diff --git a/core/commands/name/follow.go b/core/commands/name/follow.go new file mode 100644 index 00000000000..fb5b7495543 --- /dev/null +++ b/core/commands/name/follow.go @@ -0,0 +1,143 @@ +package name + +import ( + "fmt" + "io" + "time" + + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + nc "github.com/ipfs/go-ipfs/namecache" + + "gx/ipfs/QmR77mMvvh8mJBBWQmBfQBu8oD38NUN4KE9SL2gDgAQNc6/go-ipfs-cmds" + "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" +) + +type ipnsFollowResult struct { + Result string +} + +var IpnsFollowCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Follow IPNS names.", + ShortDescription: ` +Periodically resolve and optionally pin IPNS names in the background. +`, + }, + Subcommands: map[string]*cmds.Command{ + "add": ipnsFollowAddCmd, + "list": ipnsFollowListCmd, + "cancel": ipnsFollowCancelCmd, + }, +} + +var ipnsFollowAddCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Follow one or more names", + ShortDescription: ` +Follows an IPNS name by periodically resolving in the backround. +`, + }, + Arguments: []cmdkit.Argument{ + cmdkit.StringArg("name", true, true, "IPNS Name to follow."), + }, + Options: []cmdkit.Option{ + cmdkit.BoolOption("pin", "Recursively pin the resolved pointer"), + cmdkit.StringOption("refresh-interval", "Follow refresh interval; defaults to 1hr."), + }, + + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if n.Namecache == nil { + return cmdkit.Errorf(cmdkit.ErrClient, "IPNS Namecache is not available") + } + + prefetch, _ := req.Options["prefetch"].(bool) + refrS, _ := req.Options["refresh-interval"].(string) + refr := nc.DefaultFollowInterval + + if refrS != "" { + refr, err = time.ParseDuration(refrS) + if err != nil { + return err + } + } + + for _, name := range req.Arguments { + err = n.Namecache.Follow(name, prefetch, refr) + if err != nil { + return err + } + } + + return cmds.EmitOnce(res, &ipnsFollowResult{"ok"}) + }, + Type: ipnsFollowResult{}, + Encoders: cmds.EncoderMap{ + cmds.Text: encodeFollowResult(), + }, +} + +var ipnsFollowListCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "List names followed by the daemon", + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if n.Namecache == nil { + return cmdkit.Errorf(cmdkit.ErrClient, "IPNS Namecache is not available") + } + + return cmds.EmitOnce(res, &stringList{n.Namecache.ListFollows()}) + }, + Type: stringList{}, + Encoders: cmds.EncoderMap{ + cmds.Text: stringListEncoder(), + }, +} + +var ipnsFollowCancelCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Cancels a follow", + }, + Arguments: []cmdkit.Argument{ + cmdkit.StringArg("name", true, true, "Name follow to cancel."), + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if n.Namecache == nil { + return cmdkit.Errorf(cmdkit.ErrClient, "IPNS Namecache is not available") + } + + for _, name := range req.Arguments { + err = n.Namecache.Unfollow(name) + if err != nil { + return err + } + } + + return cmds.EmitOnce(res, &ipnsFollowResult{"ok"}) + }, + Type: ipnsFollowResult{}, + Encoders: cmds.EncoderMap{ + cmds.Text: encodeFollowResult(), + }, +} + +func encodeFollowResult() cmds.EncoderFunc { + return cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, list *ipnsFollowResult) error { + _, err := fmt.Fprintf(w, "%s\n", list.Result) + return err + }) +} diff --git a/core/commands/name/name.go b/core/commands/name/name.go index 7dbae11f2b2..8ec960b4c53 100644 --- a/core/commands/name/name.go +++ b/core/commands/name/name.go @@ -63,5 +63,6 @@ Resolve the value of a dnslink: "publish": PublishCmd, "resolve": IpnsCmd, "pubsub": IpnsPubsubCmd, + "follow": IpnsFollowCmd, }, } diff --git a/core/core.go b/core/core.go index 5f24f453d06..86f51b348aa 100644 --- a/core/core.go +++ b/core/core.go @@ -24,6 +24,7 @@ import ( rp "github.com/ipfs/go-ipfs/exchange/reprovide" filestore "github.com/ipfs/go-ipfs/filestore" mount "github.com/ipfs/go-ipfs/fuse/mount" + namecache "github.com/ipfs/go-ipfs/namecache" namesys "github.com/ipfs/go-ipfs/namesys" ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" p2p "github.com/ipfs/go-ipfs/p2p" @@ -133,7 +134,8 @@ type IpfsNode struct { Routing routing.IpfsRouting // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes - Reprovider *rp.Reprovider // the value reprovider system + Namecache namecache.NameCache // the name system follower cache + Reprovider *rp.Reprovider // the value reprovider system IpnsRepub *ipnsrp.Republisher AutoNAT *autonat.AutoNATService @@ -157,7 +159,7 @@ type Mounts struct { Ipns mount.Mount } -func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error { +func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex, follow bool) error { if n.PeerHost != nil { // already online. return errors.New("node already online") } @@ -293,6 +295,14 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore) + if follow { + n.Namecache = namecache.NewNameCache(ctx, n.Namesys, n.DAG) + n.Namecache, err = namecache.NewPersistentCache(n.Namecache, n.Repo.Datastore()) + if err != nil { + return err + } + } + // setup local discovery if do != nil { service, err := do(ctx, n.PeerHost) diff --git a/dagutils/diff.go b/dagutils/diff.go index 94ab974729d..6a9f2d65d35 100644 --- a/dagutils/diff.go +++ b/dagutils/diff.go @@ -99,7 +99,7 @@ func ApplyChange(ctx context.Context, ds ipld.DAGService, nd *dag.ProtoNode, cs // 1. two node's links number are greater than 0. // 2. both of two nodes are ProtoNode. // Otherwise, it compares the cid and emits a Mod change object. -func Diff(ctx context.Context, ds ipld.DAGService, a, b ipld.Node) ([]*Change, error) { +func Diff(ctx context.Context, ds ipld.NodeGetter, a, b ipld.Node) ([]*Change, error) { // Base case where both nodes are leaves, just compare // their CIDs. if len(a.Links()) == 0 && len(b.Links()) == 0 { diff --git a/namecache/namecache.go b/namecache/namecache.go new file mode 100644 index 00000000000..458db258343 --- /dev/null +++ b/namecache/namecache.go @@ -0,0 +1,207 @@ +// Package namecache implements background following (resolution and pinning) of names +package namecache + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/ipfs/go-ipfs/core/coreapi/interface" + "github.com/ipfs/go-ipfs/dagutils" + "github.com/ipfs/go-ipfs/namesys" + + "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" + ipld "gx/ipfs/QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s/go-ipld-format" + "gx/ipfs/QmWqh9oob7ZHQRwU5CdTqpnC8ip8BEkFNrwXRxeNo5Y7vA/go-path" + dag "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag" + logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log" +) + +const ( + DefaultFollowInterval = 1 * time.Hour + resolveTimeout = 1 * time.Minute +) + +var log = logging.Logger("namecache") + +// NameCache represents a following cache of names +type NameCache interface { + // Follow starts following name + Follow(name string, prefetch bool, followInterval time.Duration) error + // Unofollow cancels a follow + Unfollow(name string) error + // ListFollows returns a list of followed names + ListFollows() []string +} + +type nameCache struct { + nsys namesys.NameSystem + dag ipld.NodeGetter + + ctx context.Context + follows map[string]func() + mx sync.Mutex +} + +func NewNameCache(ctx context.Context, nsys namesys.NameSystem, dag ipld.NodeGetter) NameCache { + return &nameCache{ + ctx: ctx, + nsys: nsys, + dag: dag, + follows: make(map[string]func()), + } +} + +// Follow spawns a goroutine that periodically resolves a name +// and (when dopin is true) pins it in the background +func (nc *nameCache) Follow(name string, prefetch bool, followInterval time.Duration) error { + nc.mx.Lock() + defer nc.mx.Unlock() + + if !strings.HasPrefix(name, "/ipns/") { + name = "/ipns/" + name + } + + if _, ok := nc.follows[name]; ok { + return fmt.Errorf("already following %s", name) + } + + ctx, cancel := context.WithCancel(nc.ctx) + go nc.followName(ctx, name, prefetch, followInterval) + nc.follows[name] = cancel + + return nil +} + +// Unfollow cancels a follow +func (nc *nameCache) Unfollow(name string) error { + nc.mx.Lock() + defer nc.mx.Unlock() + + if !strings.HasPrefix(name, "/ipns/") { + name = "/ipns/" + name + } + + cancel, ok := nc.follows[name] + if !ok { + return fmt.Errorf("unknown name %s", name) + } + + cancel() + delete(nc.follows, name) + return nil +} + +// ListFollows returns a list of names currently being followed +func (nc *nameCache) ListFollows() []string { + nc.mx.Lock() + defer nc.mx.Unlock() + + follows := make([]string, 0, len(nc.follows)) + for name := range nc.follows { + follows = append(follows, name) + } + + return follows +} + +func (nc *nameCache) followName(ctx context.Context, name string, prefetch bool, followInterval time.Duration) { + emptynode := new(dag.ProtoNode) + + c, err := nc.resolveAndUpdate(ctx, name, prefetch, emptynode.Cid()) + if err != nil { + log.Errorf("Error following %s: %s", name, err.Error()) + } + + ticker := time.NewTicker(followInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c, err = nc.resolveAndUpdate(ctx, name, prefetch, c) + + if err != nil { + log.Errorf("Error following %s: %s", name, err.Error()) + } + + case <-ctx.Done(): + return + } + } +} + +func (nc *nameCache) resolveAndUpdate(ctx context.Context, name string, prefetch bool, oldcid cid.Cid) (cid.Cid, error) { + ptr, err := nc.resolve(ctx, name) + if err != nil { + return cid.Undef, err + } + + newcid, err := pathToCid(ptr) + if err != nil { + return cid.Undef, err + } + + if newcid.Equals(oldcid) || !prefetch { + return newcid, nil + } + + oldnd, err := nc.dag.Get(ctx, oldcid) + if err != nil { + return cid.Undef, err + } + + newnd, err := nc.dag.Get(ctx, newcid) + if err != nil { + return cid.Undef, err + } + + changes, err := dagutils.Diff(ctx, nc.dag, oldnd, newnd) + if err != nil { + return cid.Undef, err + } + + log.Debugf("fetching changes in %s (%s -> %s)", name, oldcid, newcid) + for _, change := range changes { + if change.Type == iface.DiffRemove { + continue + } + + toFetch, err := nc.dag.Get(ctx, change.After) + if err != nil { + return cid.Undef, err + } + + // just iterate over all nodes + walker := ipld.NewWalker(ctx, ipld.NewNavigableIPLDNode(toFetch, nc.dag)) + if err := walker.Iterate(func(node ipld.NavigableNode) error { + return nil + }); err != ipld.EndOfDag { + return cid.Undef, fmt.Errorf("unexpected error when prefetching followed name: %s", err) + } + } + + return newcid, err +} + +func (nc *nameCache) resolve(ctx context.Context, name string) (path.Path, error) { + log.Debugf("resolving %s", name) + + rctx, cancel := context.WithTimeout(ctx, resolveTimeout) + defer cancel() + + p, err := nc.nsys.Resolve(rctx, name) + if err != nil { + return "", err + } + + log.Debugf("resolved %s to %s", name, p) + + return p, nil +} + +func pathToCid(p path.Path) (cid.Cid, error) { + return cid.Decode(p.Segments()[1]) +} diff --git a/namecache/persistent.go b/namecache/persistent.go new file mode 100644 index 00000000000..6503a682874 --- /dev/null +++ b/namecache/persistent.go @@ -0,0 +1,72 @@ +package namecache + +import ( + "encoding/json" + "time" + + ds "gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore" + nsds "gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/namespace" + dsq "gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query" +) + +var dsPrefix = ds.NewKey("/namecache") + +// persistent is a cache layer which persists followed names between node +// restarts +type persistent struct { + NameCache + + ds ds.Datastore +} + +type follow struct { + Prefetch bool + Deadline time.Time +} + +func NewPersistentCache(base NameCache, d ds.Datastore) (NameCache, error) { + d = nsds.Wrap(d, dsPrefix) + + q ,err := d.Query(dsq.Query{}) + if err != nil { + return nil, err + } + defer q.Close() + for e := range q.Next() { + var f follow + if err := json.Unmarshal(e.Value, &f); err != nil { + return nil, err + } + if err := base.Follow(e.Key, f.Prefetch, time.Now().Sub(f.Deadline)); err != nil { + return nil, err + } + } + + + return &persistent{ + NameCache: base, + ds: d, + }, nil +} + +func (p *persistent) Follow(name string, prefetch bool, followInterval time.Duration) error { + b, err := json.Marshal(&follow{ + Prefetch: prefetch, + Deadline: time.Now().Add(followInterval), + }) + if err != nil { + return err + } + + if err := p.NameCache.Follow(name, prefetch, followInterval); err != nil { + return err + } + return p.ds.Put(ds.NewKey(name), b) +} + +func (p *persistent) Unfollow(name string) error { + if err := p.NameCache.Unfollow(name); err != nil { + return err + } + return p.ds.Delete(ds.NewKey(name)) +} \ No newline at end of file