diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f62337e522..6eaa93b8c7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Kubo Changelogs +- [v0.14](docs/changelogs/v0.14.md) - [v0.13](docs/changelogs/v0.13.md) - [v0.12](docs/changelogs/v0.12.md) - [v0.11](docs/changelogs/v0.11.md) diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 8d337931590..4b2328f5d13 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -400,10 +400,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment routingOption, _ := req.Options[routingOptionKwd].(string) if routingOption == routingOptionDefaultKwd { - routingOption = cfg.Routing.Type - if routingOption == "" { - routingOption = routingOptionDHTKwd - } + routingOption = cfg.Routing.Type.WithDefault(routingOptionDHTKwd) } switch routingOption { case routingOptionSupernodeKwd: diff --git a/config/init.go b/config/init.go index d6dcace7431..edf64465cf6 100644 --- a/config/init.go +++ b/config/init.go @@ -48,7 +48,7 @@ func InitWithIdentity(identity Identity) (*Config, error) { }, Routing: Routing{ - Type: "dht", + Type: NewOptionalString("dht"), }, // setup the node mount points. diff --git a/config/profile.go b/config/profile.go index cbc7c976453..8252d1ab91e 100644 --- a/config/profile.go +++ b/config/profile.go @@ -174,7 +174,7 @@ functionality - performance of content discovery and data fetching may be degraded. `, Transform: func(c *Config) error { - c.Routing.Type = "dhtclient" + c.Routing.Type = NewOptionalString("dhtclient") c.AutoNAT.ServiceMode = AutoNATServiceDisabled c.Reprovider.Interval = "0" diff --git a/config/routing.go b/config/routing.go index c6157ec9637..4a96589daad 100644 --- a/config/routing.go +++ b/config/routing.go @@ -5,5 +5,40 @@ type Routing struct { // Type sets default daemon routing mode. // // Can be one of "dht", "dhtclient", "dhtserver", "none", or unset. + Type *OptionalString `json:",omitempty"` + + Routers map[string]Router +} + +type Router struct { + + // Currenly only supported Type is "reframe". + // Reframe type allows to add other resolvers using the Reframe spec: + // https://github.com/ipfs/specs/tree/main/reframe + // In the future we will support "dht" and other Types here. Type string + + Enabled Flag `json:",omitempty"` + + // Parameters are extra configuration that this router might need. + // A common one for reframe router is "Endpoint". + Parameters map[string]string } + +// Type is the routing type. +// Depending of the type we need to instantiate different Routing implementations. +type RouterType string + +const ( + RouterTypeReframe RouterType = "reframe" +) + +type RouterParam string + +const ( + // RouterParamEndpoint is the URL where the routing implementation will point to get the information. + // Usually used for reframe Routers. + RouterParamEndpoint RouterParam = "Endpoint" + + RouterParamPriority RouterParam = "Priority" +) diff --git a/config/types.go b/config/types.go index c33689c5b2a..f27c5fa13e9 100644 --- a/config/types.go +++ b/config/types.go @@ -321,6 +321,11 @@ type OptionalString struct { value *string } +// NewOptionalString returns an OptionalString from a string +func NewOptionalString(s string) *OptionalString { + return &OptionalString{value: &s} +} + // WithDefault resolves the integer with the given default. func (p *OptionalString) WithDefault(defaultValue string) (value string) { if p == nil || p.value == nil { diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go index ea5c6ac8468..562b17022ce 100644 --- a/core/commands/commands_test.go +++ b/core/commands/commands_test.go @@ -119,6 +119,12 @@ func TestCommands(t *testing.T) { "/dht/provide", "/dht/put", "/dht/query", + "/routing", + "/routing/put", + "/routing/get", + "/routing/findpeer", + "/routing/findprovs", + "/routing/provide", "/diag", "/diag/cmds", "/diag/cmds/clear", diff --git a/core/commands/dht.go b/core/commands/dht.go index 79e8580a1ed..1f6f50449ca 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -2,28 +2,18 @@ package commands import ( "context" - "encoding/base64" "errors" "fmt" "io" - "time" - cmdenv "github.com/ipfs/kubo/core/commands/cmdenv" - - cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" - ipld "github.com/ipfs/go-ipld-format" - dag "github.com/ipfs/go-merkledag" - path "github.com/ipfs/go-path" + "github.com/ipfs/kubo/core/commands/cmdenv" peer "github.com/libp2p/go-libp2p-core/peer" routing "github.com/libp2p/go-libp2p-core/routing" ) var ErrNotDHT = errors.New("routing service is not a DHT") -// TODO: Factor into `ipfs dht` and `ipfs routing`. -// Everything *except `query` goes into `ipfs routing`. - var DhtCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Issue commands directly through the DHT.", @@ -40,9 +30,55 @@ var DhtCmd = &cmds.Command{ }, } -const ( - dhtVerboseOptionName = "verbose" -) +var findProvidersDhtCmd = &cmds.Command{ + Helptext: findProvidersRoutingCmd.Helptext, + Arguments: findProvidersRoutingCmd.Arguments, + Options: findProvidersRoutingCmd.Options, + Run: findProvidersRoutingCmd.Run, + Encoders: findProvidersRoutingCmd.Encoders, + Type: findProvidersRoutingCmd.Type, + Status: cmds.Deprecated, +} + +var findPeerDhtCmd = &cmds.Command{ + Helptext: findPeerRoutingCmd.Helptext, + Arguments: findPeerRoutingCmd.Arguments, + Options: findPeerRoutingCmd.Options, + Run: findPeerRoutingCmd.Run, + Encoders: findPeerRoutingCmd.Encoders, + Type: findPeerRoutingCmd.Type, + Status: cmds.Deprecated, +} + +var getValueDhtCmd = &cmds.Command{ + Helptext: getValueRoutingCmd.Helptext, + Arguments: getValueRoutingCmd.Arguments, + Options: getValueRoutingCmd.Options, + Run: getValueRoutingCmd.Run, + Encoders: getValueRoutingCmd.Encoders, + Type: getValueRoutingCmd.Type, + Status: cmds.Deprecated, +} + +var putValueDhtCmd = &cmds.Command{ + Helptext: putValueRoutingCmd.Helptext, + Arguments: putValueRoutingCmd.Arguments, + Options: putValueRoutingCmd.Options, + Run: putValueRoutingCmd.Run, + Encoders: putValueRoutingCmd.Encoders, + Type: putValueRoutingCmd.Type, + Status: cmds.Deprecated, +} + +var provideRefDhtCmd = &cmds.Command{ + Helptext: provideRefRoutingCmd.Helptext, + Arguments: provideRefRoutingCmd.Arguments, + Options: provideRefRoutingCmd.Options, + Run: provideRefRoutingCmd.Run, + Encoders: provideRefRoutingCmd.Encoders, + Type: provideRefRoutingCmd.Type, + Status: cmds.Deprecated, +} // kademlia extends the routing interface with a command to get the peers closest to the target type kademlia interface { @@ -133,568 +169,3 @@ var queryDhtCmd = &cmds.Command{ }, Type: routing.QueryEvent{}, } - -const ( - numProvidersOptionName = "num-providers" -) - -var findProvidersDhtCmd = &cmds.Command{ - Helptext: cmds.HelpText{ - Tagline: "Find peers that can provide a specific value, given a key.", - ShortDescription: "Outputs a list of newline-delimited provider Peer IDs.", - }, - - Arguments: []cmds.Argument{ - cmds.StringArg("key", true, true, "The key to find providers for."), - }, - Options: []cmds.Option{ - cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), - cmds.IntOption(numProvidersOptionName, "n", "The number of providers to find.").WithDefault(20), - }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) - if err != nil { - return err - } - - if !n.IsOnline { - return ErrNotOnline - } - - numProviders, _ := req.Options[numProvidersOptionName].(int) - if numProviders < 1 { - return fmt.Errorf("number of providers must be greater than 0") - } - - c, err := cid.Parse(req.Arguments[0]) - - if err != nil { - return err - } - - ctx, cancel := context.WithCancel(req.Context) - ctx, events := routing.RegisterForQueryEvents(ctx) - - pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders) - - go func() { - defer cancel() - for p := range pchan { - np := p - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.Provider, - Responses: []*peer.AddrInfo{&np}, - }) - } - }() - for e := range events { - if err := res.Emit(e); err != nil { - return err - } - } - - return nil - }, - Encoders: cmds.EncoderMap{ - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { - pfm := pfuncMap{ - routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { - if verbose { - fmt.Fprintf(out, "* closest peer %s\n", obj.ID) - } - return nil - }, - routing.Provider: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { - prov := obj.Responses[0] - if verbose { - fmt.Fprintf(out, "provider: ") - } - fmt.Fprintf(out, "%s\n", prov.ID.Pretty()) - if verbose { - for _, a := range prov.Addrs { - fmt.Fprintf(out, "\t%s\n", a) - } - } - return nil - }, - } - - verbose, _ := req.Options[dhtVerboseOptionName].(bool) - return printEvent(out, w, verbose, pfm) - }), - }, - Type: routing.QueryEvent{}, -} - -const ( - recursiveOptionName = "recursive" -) - -var provideRefDhtCmd = &cmds.Command{ - Helptext: cmds.HelpText{ - Tagline: "Announce to the network that you are providing given values.", - }, - - Arguments: []cmds.Argument{ - cmds.StringArg("key", true, true, "The key[s] to send provide records for.").EnableStdin(), - }, - Options: []cmds.Option{ - cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), - cmds.BoolOption(recursiveOptionName, "r", "Recursively provide entire graph."), - }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - nd, err := cmdenv.GetNode(env) - if err != nil { - return err - } - - if !nd.IsOnline { - return ErrNotOnline - } - - if len(nd.PeerHost.Network().Conns()) == 0 { - return errors.New("cannot provide, no connected peers") - } - - // Needed to parse stdin args. - // TODO: Lazy Load - err = req.ParseBodyArgs() - if err != nil { - return err - } - - rec, _ := req.Options[recursiveOptionName].(bool) - - var cids []cid.Cid - for _, arg := range req.Arguments { - c, err := cid.Decode(arg) - if err != nil { - return err - } - - has, err := nd.Blockstore.Has(req.Context, c) - if err != nil { - return err - } - - if !has { - return fmt.Errorf("block %s not found locally, cannot provide", c) - } - - cids = append(cids, c) - } - - ctx, cancel := context.WithCancel(req.Context) - ctx, events := routing.RegisterForQueryEvents(ctx) - - var provideErr error - go func() { - defer cancel() - if rec { - provideErr = provideKeysRec(ctx, nd.Routing, nd.DAG, cids) - } else { - provideErr = provideKeys(ctx, nd.Routing, cids) - } - if provideErr != nil { - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.QueryError, - Extra: provideErr.Error(), - }) - } - }() - - for e := range events { - if err := res.Emit(e); err != nil { - return err - } - } - - return provideErr - }, - Encoders: cmds.EncoderMap{ - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { - pfm := pfuncMap{ - routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { - if verbose { - fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID) - } - return nil - }, - } - - verbose, _ := req.Options[dhtVerboseOptionName].(bool) - return printEvent(out, w, verbose, pfm) - }), - }, - Type: routing.QueryEvent{}, -} - -func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error { - for _, c := range cids { - err := r.Provide(ctx, c, true) - if err != nil { - return err - } - } - return nil -} - -func provideKeysRec(ctx context.Context, r routing.Routing, dserv ipld.DAGService, cids []cid.Cid) error { - provided := cid.NewSet() - for _, c := range cids { - kset := cid.NewSet() - - err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) - if err != nil { - return err - } - - for _, k := range kset.Keys() { - if provided.Has(k) { - continue - } - - err = r.Provide(ctx, k, true) - if err != nil { - return err - } - provided.Add(k) - } - } - - return nil -} - -var findPeerDhtCmd = &cmds.Command{ - Helptext: cmds.HelpText{ - Tagline: "Find the multiaddresses associated with a Peer ID.", - ShortDescription: "Outputs a list of newline-delimited multiaddresses.", - }, - - Arguments: []cmds.Argument{ - cmds.StringArg("peerID", true, true, "The ID of the peer to search for."), - }, - Options: []cmds.Option{ - cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), - }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - nd, err := cmdenv.GetNode(env) - if err != nil { - return err - } - - if !nd.IsOnline { - return ErrNotOnline - } - - pid, err := peer.Decode(req.Arguments[0]) - if err != nil { - return err - } - - ctx, cancel := context.WithCancel(req.Context) - ctx, events := routing.RegisterForQueryEvents(ctx) - - var findPeerErr error - go func() { - defer cancel() - var pi peer.AddrInfo - pi, findPeerErr = nd.Routing.FindPeer(ctx, pid) - if findPeerErr != nil { - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.QueryError, - Extra: findPeerErr.Error(), - }) - return - } - - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.FinalPeer, - Responses: []*peer.AddrInfo{&pi}, - }) - }() - - for e := range events { - if err := res.Emit(e); err != nil { - return err - } - } - - return findPeerErr - }, - Encoders: cmds.EncoderMap{ - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { - pfm := pfuncMap{ - routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { - pi := obj.Responses[0] - for _, a := range pi.Addrs { - fmt.Fprintf(out, "%s\n", a) - } - return nil - }, - } - - verbose, _ := req.Options[dhtVerboseOptionName].(bool) - return printEvent(out, w, verbose, pfm) - }), - }, - Type: routing.QueryEvent{}, -} - -var getValueDhtCmd = &cmds.Command{ - Helptext: cmds.HelpText{ - Tagline: "Given a key, query the routing system for its best value.", - ShortDescription: ` -Outputs the best value for the given key. - -There may be several different values for a given key stored in the routing -system; in this context 'best' means the record that is most desirable. There is -no one metric for 'best': it depends entirely on the key type. For IPNS, 'best' -is the record that is both valid and has the highest sequence number (freshest). -Different key types can specify other 'best' rules. -`, - }, - - Arguments: []cmds.Argument{ - cmds.StringArg("key", true, true, "The key to find a value for."), - }, - Options: []cmds.Option{ - cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), - }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - nd, err := cmdenv.GetNode(env) - if err != nil { - return err - } - - if !nd.IsOnline { - return ErrNotOnline - } - - dhtkey, err := escapeDhtKey(req.Arguments[0]) - if err != nil { - return err - } - - ctx, cancel := context.WithCancel(req.Context) - ctx, events := routing.RegisterForQueryEvents(ctx) - - var getErr error - go func() { - defer cancel() - var val []byte - val, getErr = nd.Routing.GetValue(ctx, dhtkey) - if getErr != nil { - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.QueryError, - Extra: getErr.Error(), - }) - } else { - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.Value, - Extra: base64.StdEncoding.EncodeToString(val), - }) - } - }() - - for e := range events { - if err := res.Emit(e); err != nil { - return err - } - } - - return getErr - }, - Encoders: cmds.EncoderMap{ - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { - pfm := pfuncMap{ - routing.Value: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { - if verbose { - _, err := fmt.Fprintf(out, "got value: '%s'\n", obj.Extra) - return err - } - res, err := base64.StdEncoding.DecodeString(obj.Extra) - if err != nil { - return err - } - _, err = out.Write(res) - return err - }, - } - - verbose, _ := req.Options[dhtVerboseOptionName].(bool) - return printEvent(out, w, verbose, pfm) - }), - }, - Type: routing.QueryEvent{}, -} - -var putValueDhtCmd = &cmds.Command{ - Helptext: cmds.HelpText{ - Tagline: "Write a key/value pair to the routing system.", - ShortDescription: ` -Given a key of the form /foo/bar and a valid value for that key, this will write -that value to the routing system with that key. - -Keys have two parts: a keytype (foo) and the key name (bar). IPNS uses the -/ipns keytype, and expects the key name to be a Peer ID. IPNS entries are -specifically formatted (protocol buffer). - -You may only use keytypes that are supported in your ipfs binary: currently -this is only /ipns. Unless you have a relatively deep understanding of the -go-ipfs routing internals, you likely want to be using 'ipfs name publish' instead -of this. - -The value must be a valid value for the given key type. For example, if the key -is /ipns/QmFoo, the value must be IPNS record (protobuf) signed with the key -identified by QmFoo. -`, - }, - - Arguments: []cmds.Argument{ - cmds.StringArg("key", true, false, "The key to store the value at."), - cmds.FileArg("value-file", true, false, "A path to a file containing the value to store.").EnableStdin(), - }, - Options: []cmds.Option{ - cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), - }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - nd, err := cmdenv.GetNode(env) - if err != nil { - return err - } - - if !nd.IsOnline { - return ErrNotOnline - } - - key, err := escapeDhtKey(req.Arguments[0]) - if err != nil { - return err - } - - file, err := cmdenv.GetFileArg(req.Files.Entries()) - if err != nil { - return err - } - defer file.Close() - - data, err := io.ReadAll(file) - if err != nil { - return err - } - - ctx, cancel := context.WithCancel(req.Context) - ctx, events := routing.RegisterForQueryEvents(ctx) - - var putErr error - go func() { - defer cancel() - putErr = nd.Routing.PutValue(ctx, key, []byte(data)) - if putErr != nil { - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.QueryError, - Extra: putErr.Error(), - }) - } - }() - - for e := range events { - if err := res.Emit(e); err != nil { - return err - } - } - - return putErr - }, - Encoders: cmds.EncoderMap{ - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { - pfm := pfuncMap{ - routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { - if verbose { - fmt.Fprintf(out, "* closest peer %s\n", obj.ID) - } - return nil - }, - routing.Value: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { - fmt.Fprintf(out, "%s\n", obj.ID.Pretty()) - return nil - }, - } - - verbose, _ := req.Options[dhtVerboseOptionName].(bool) - - return printEvent(out, w, verbose, pfm) - }), - }, - Type: routing.QueryEvent{}, -} - -type printFunc func(obj *routing.QueryEvent, out io.Writer, verbose bool) error -type pfuncMap map[routing.QueryEventType]printFunc - -func printEvent(obj *routing.QueryEvent, out io.Writer, verbose bool, override pfuncMap) error { - if verbose { - fmt.Fprintf(out, "%s: ", time.Now().Format("15:04:05.000")) - } - - if override != nil { - if pf, ok := override[obj.Type]; ok { - return pf(obj, out, verbose) - } - } - - switch obj.Type { - case routing.SendingQuery: - if verbose { - fmt.Fprintf(out, "* querying %s\n", obj.ID) - } - case routing.Value: - if verbose { - fmt.Fprintf(out, "got value: '%s'\n", obj.Extra) - } else { - fmt.Fprint(out, obj.Extra) - } - case routing.PeerResponse: - if verbose { - fmt.Fprintf(out, "* %s says use ", obj.ID) - for _, p := range obj.Responses { - fmt.Fprintf(out, "%s ", p.ID) - } - fmt.Fprintln(out) - } - case routing.QueryError: - if verbose { - fmt.Fprintf(out, "error: %s\n", obj.Extra) - } - case routing.DialingPeer: - if verbose { - fmt.Fprintf(out, "dialing peer: %s\n", obj.ID) - } - case routing.AddingPeer: - if verbose { - fmt.Fprintf(out, "adding peer to query: %s\n", obj.ID) - } - case routing.FinalPeer: - default: - if verbose { - fmt.Fprintf(out, "unrecognized event type: %d\n", obj.Type) - } - } - return nil -} - -func escapeDhtKey(s string) (string, error) { - parts := path.SplitList(s) - if len(parts) != 3 || - parts[0] != "" || - !(parts[1] == "ipns" || parts[1] == "pk") { - return "", errors.New("invalid key") - } - - k, err := peer.Decode(parts[2]) - if err != nil { - return "", err - } - return path.Join(append(parts[:2], string(k))), nil -} diff --git a/core/commands/root.go b/core/commands/root.go index 47fe1f09558..33c0cc04f35 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -135,6 +135,7 @@ var rootSubcommands = map[string]*cmds.Command{ "config": ConfigCmd, "dag": dag.DagCmd, "dht": DhtCmd, + "routing": RoutingCmd, "diag": DiagCmd, "dns": DNSCmd, "id": IDCmd, diff --git a/core/commands/routing.go b/core/commands/routing.go new file mode 100644 index 00000000000..63d06940d5e --- /dev/null +++ b/core/commands/routing.go @@ -0,0 +1,604 @@ +package commands + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "time" + + cmdenv "github.com/ipfs/kubo/core/commands/cmdenv" + + cid "github.com/ipfs/go-cid" + cmds "github.com/ipfs/go-ipfs-cmds" + ipld "github.com/ipfs/go-ipld-format" + dag "github.com/ipfs/go-merkledag" + path "github.com/ipfs/go-path" + peer "github.com/libp2p/go-libp2p-core/peer" + routing "github.com/libp2p/go-libp2p-core/routing" +) + +var RoutingCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Issue routing commands.", + ShortDescription: ``, + }, + + Subcommands: map[string]*cmds.Command{ + "findprovs": findProvidersRoutingCmd, + "findpeer": findPeerRoutingCmd, + "get": getValueRoutingCmd, + "put": putValueRoutingCmd, + "provide": provideRefRoutingCmd, + }, +} + +const ( + dhtVerboseOptionName = "verbose" +) + +const ( + numProvidersOptionName = "num-providers" +) + +var findProvidersRoutingCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Find peers that can provide a specific value, given a key.", + ShortDescription: "Outputs a list of newline-delimited provider Peer IDs.", + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, true, "The key to find providers for."), + }, + Options: []cmds.Option{ + cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), + cmds.IntOption(numProvidersOptionName, "n", "The number of providers to find.").WithDefault(20), + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !n.IsOnline { + return ErrNotOnline + } + + numProviders, _ := req.Options[numProvidersOptionName].(int) + if numProviders < 1 { + return fmt.Errorf("number of providers must be greater than 0") + } + + c, err := cid.Parse(req.Arguments[0]) + + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(req.Context) + ctx, events := routing.RegisterForQueryEvents(ctx) + + pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders) + + go func() { + defer cancel() + for p := range pchan { + np := p + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.Provider, + Responses: []*peer.AddrInfo{&np}, + }) + } + }() + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { + pfm := pfuncMap{ + routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { + if verbose { + fmt.Fprintf(out, "* closest peer %s\n", obj.ID) + } + return nil + }, + routing.Provider: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { + prov := obj.Responses[0] + if verbose { + fmt.Fprintf(out, "provider: ") + } + fmt.Fprintf(out, "%s\n", prov.ID.Pretty()) + if verbose { + for _, a := range prov.Addrs { + fmt.Fprintf(out, "\t%s\n", a) + } + } + return nil + }, + } + + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + return printEvent(out, w, verbose, pfm) + }), + }, + Type: routing.QueryEvent{}, +} + +const ( + recursiveOptionName = "recursive" +) + +var provideRefRoutingCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Announce to the network that you are providing given values.", + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, true, "The key[s] to send provide records for.").EnableStdin(), + }, + Options: []cmds.Option{ + cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), + cmds.BoolOption(recursiveOptionName, "r", "Recursively provide entire graph."), + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !nd.IsOnline { + return ErrNotOnline + } + + if len(nd.PeerHost.Network().Conns()) == 0 { + return errors.New("cannot provide, no connected peers") + } + + // Needed to parse stdin args. + // TODO: Lazy Load + err = req.ParseBodyArgs() + if err != nil { + return err + } + + rec, _ := req.Options[recursiveOptionName].(bool) + + var cids []cid.Cid + for _, arg := range req.Arguments { + c, err := cid.Decode(arg) + if err != nil { + return err + } + + has, err := nd.Blockstore.Has(req.Context, c) + if err != nil { + return err + } + + if !has { + return fmt.Errorf("block %s not found locally, cannot provide", c) + } + + cids = append(cids, c) + } + + ctx, cancel := context.WithCancel(req.Context) + ctx, events := routing.RegisterForQueryEvents(ctx) + + var provideErr error + go func() { + defer cancel() + if rec { + provideErr = provideKeysRec(ctx, nd.Routing, nd.DAG, cids) + } else { + provideErr = provideKeys(ctx, nd.Routing, cids) + } + if provideErr != nil { + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.QueryError, + Extra: provideErr.Error(), + }) + } + }() + + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return provideErr + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { + pfm := pfuncMap{ + routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { + if verbose { + fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID) + } + return nil + }, + } + + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + return printEvent(out, w, verbose, pfm) + }), + }, + Type: routing.QueryEvent{}, +} + +func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error { + for _, c := range cids { + err := r.Provide(ctx, c, true) + if err != nil { + return err + } + } + return nil +} + +func provideKeysRec(ctx context.Context, r routing.Routing, dserv ipld.DAGService, cids []cid.Cid) error { + provided := cid.NewSet() + for _, c := range cids { + kset := cid.NewSet() + + err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) + if err != nil { + return err + } + + for _, k := range kset.Keys() { + if provided.Has(k) { + continue + } + + err = r.Provide(ctx, k, true) + if err != nil { + return err + } + provided.Add(k) + } + } + + return nil +} + +var findPeerRoutingCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Find the multiaddresses associated with a Peer ID.", + ShortDescription: "Outputs a list of newline-delimited multiaddresses.", + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("peerID", true, true, "The ID of the peer to search for."), + }, + Options: []cmds.Option{ + cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !nd.IsOnline { + return ErrNotOnline + } + + pid, err := peer.Decode(req.Arguments[0]) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(req.Context) + ctx, events := routing.RegisterForQueryEvents(ctx) + + var findPeerErr error + go func() { + defer cancel() + var pi peer.AddrInfo + pi, findPeerErr = nd.Routing.FindPeer(ctx, pid) + if findPeerErr != nil { + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.QueryError, + Extra: findPeerErr.Error(), + }) + return + } + + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.FinalPeer, + Responses: []*peer.AddrInfo{&pi}, + }) + }() + + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return findPeerErr + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { + pfm := pfuncMap{ + routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { + pi := obj.Responses[0] + for _, a := range pi.Addrs { + fmt.Fprintf(out, "%s\n", a) + } + return nil + }, + } + + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + return printEvent(out, w, verbose, pfm) + }), + }, + Type: routing.QueryEvent{}, +} + +var getValueRoutingCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Given a key, query the routing system for its best value.", + ShortDescription: ` +Outputs the best value for the given key. + +There may be several different values for a given key stored in the routing +system; in this context 'best' means the record that is most desirable. There is +no one metric for 'best': it depends entirely on the key type. For IPNS, 'best' +is the record that is both valid and has the highest sequence number (freshest). +Different key types can specify other 'best' rules. +`, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, true, "The key to find a value for."), + }, + Options: []cmds.Option{ + cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !nd.IsOnline { + return ErrNotOnline + } + + dhtkey, err := escapeDhtKey(req.Arguments[0]) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(req.Context) + ctx, events := routing.RegisterForQueryEvents(ctx) + + var getErr error + go func() { + defer cancel() + var val []byte + val, getErr = nd.Routing.GetValue(ctx, dhtkey) + if getErr != nil { + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.QueryError, + Extra: getErr.Error(), + }) + } else { + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.Value, + Extra: base64.StdEncoding.EncodeToString(val), + }) + } + }() + + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return getErr + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { + pfm := pfuncMap{ + routing.Value: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { + if verbose { + _, err := fmt.Fprintf(out, "got value: '%s'\n", obj.Extra) + return err + } + res, err := base64.StdEncoding.DecodeString(obj.Extra) + if err != nil { + return err + } + _, err = out.Write(res) + return err + }, + } + + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + return printEvent(out, w, verbose, pfm) + }), + }, + Type: routing.QueryEvent{}, +} + +var putValueRoutingCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Write a key/value pair to the routing system.", + ShortDescription: ` +Given a key of the form /foo/bar and a valid value for that key, this will write +that value to the routing system with that key. + +Keys have two parts: a keytype (foo) and the key name (bar). IPNS uses the +/ipns keytype, and expects the key name to be a Peer ID. IPNS entries are +specifically formatted (protocol buffer). + +You may only use keytypes that are supported in your ipfs binary: currently +this is only /ipns. Unless you have a relatively deep understanding of the +go-ipfs routing internals, you likely want to be using 'ipfs name publish' instead +of this. + +The value must be a valid value for the given key type. For example, if the key +is /ipns/QmFoo, the value must be IPNS record (protobuf) signed with the key +identified by QmFoo. +`, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, false, "The key to store the value at."), + cmds.FileArg("value-file", true, false, "A path to a file containing the value to store.").EnableStdin(), + }, + Options: []cmds.Option{ + cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !nd.IsOnline { + return ErrNotOnline + } + + key, err := escapeDhtKey(req.Arguments[0]) + if err != nil { + return err + } + + file, err := cmdenv.GetFileArg(req.Files.Entries()) + if err != nil { + return err + } + defer file.Close() + + data, err := io.ReadAll(file) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(req.Context) + ctx, events := routing.RegisterForQueryEvents(ctx) + + var putErr error + go func() { + defer cancel() + putErr = nd.Routing.PutValue(ctx, key, []byte(data)) + if putErr != nil { + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.QueryError, + Extra: putErr.Error(), + }) + } + }() + + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return putErr + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { + pfm := pfuncMap{ + routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { + if verbose { + fmt.Fprintf(out, "* closest peer %s\n", obj.ID) + } + return nil + }, + routing.Value: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { + fmt.Fprintf(out, "%s\n", obj.ID.Pretty()) + return nil + }, + } + + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + + return printEvent(out, w, verbose, pfm) + }), + }, + Type: routing.QueryEvent{}, +} + +type printFunc func(obj *routing.QueryEvent, out io.Writer, verbose bool) error +type pfuncMap map[routing.QueryEventType]printFunc + +func printEvent(obj *routing.QueryEvent, out io.Writer, verbose bool, override pfuncMap) error { + if verbose { + fmt.Fprintf(out, "%s: ", time.Now().Format("15:04:05.000")) + } + + if override != nil { + if pf, ok := override[obj.Type]; ok { + return pf(obj, out, verbose) + } + } + + switch obj.Type { + case routing.SendingQuery: + if verbose { + fmt.Fprintf(out, "* querying %s\n", obj.ID) + } + case routing.Value: + if verbose { + fmt.Fprintf(out, "got value: '%s'\n", obj.Extra) + } else { + fmt.Fprint(out, obj.Extra) + } + case routing.PeerResponse: + if verbose { + fmt.Fprintf(out, "* %s says use ", obj.ID) + for _, p := range obj.Responses { + fmt.Fprintf(out, "%s ", p.ID) + } + fmt.Fprintln(out) + } + case routing.QueryError: + if verbose { + fmt.Fprintf(out, "error: %s\n", obj.Extra) + } + case routing.DialingPeer: + if verbose { + fmt.Fprintf(out, "dialing peer: %s\n", obj.ID) + } + case routing.AddingPeer: + if verbose { + fmt.Fprintf(out, "adding peer to query: %s\n", obj.ID) + } + case routing.FinalPeer: + default: + if verbose { + fmt.Fprintf(out, "unrecognized event type: %d\n", obj.Type) + } + } + return nil +} + +func escapeDhtKey(s string) (string, error) { + parts := path.SplitList(s) + if len(parts) != 3 || + parts[0] != "" || + !(parts[1] == "ipns" || parts[1] == "pk") { + return "", errors.New("invalid key") + } + + k, err := peer.Decode(parts[2]) + if err != nil { + return "", err + } + return path.Join(append(parts[:2], string(k))), nil +} diff --git a/core/core.go b/core/core.go index 5f95de69233..da1fcb30253 100644 --- a/core/core.go +++ b/core/core.go @@ -14,14 +14,14 @@ import ( "io" "github.com/ipfs/go-filestore" - "github.com/ipfs/go-ipfs-pinner" + pin "github.com/ipfs/go-ipfs-pinner" bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-fetcher" "github.com/ipfs/go-graphsync" bstore "github.com/ipfs/go-ipfs-blockstore" exchange "github.com/ipfs/go-ipfs-exchange-interface" - "github.com/ipfs/go-ipfs-provider" + provider "github.com/ipfs/go-ipfs-provider" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" mfs "github.com/ipfs/go-mfs" @@ -52,6 +52,7 @@ import ( "github.com/ipfs/kubo/p2p" "github.com/ipfs/kubo/peering" "github.com/ipfs/kubo/repo" + irouting "github.com/ipfs/kubo/routing" ) var log = logging.Logger("core") @@ -90,7 +91,7 @@ type IpfsNode struct { Peering *peering.PeeringService `optional:"true"` Filters *ma.Filters `optional:"true"` Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper - Routing routing.Routing `optional:"true"` // the routing system. recommend ipfs-dht + Routing irouting.TieredRouter `optional:"true"` // the routing system. recommend ipfs-dht DNSResolver *madns.Resolver // the DNS resolver Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes diff --git a/core/core_test.go b/core/core_test.go index 42dd543d767..6a614af3fa4 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -1,14 +1,28 @@ package core import ( + "crypto/rand" + "errors" + "fmt" + "net/http/httptest" + "path" "testing" + "time" context "context" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-delegated-routing/client" + "github.com/ipfs/go-ipns" + "github.com/ipfs/kubo/core/node/libp2p" "github.com/ipfs/kubo/repo" + "github.com/libp2p/go-libp2p-core/crypto" + peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/require" datastore "github.com/ipfs/go-datastore" syncds "github.com/ipfs/go-datastore/sync" + drs "github.com/ipfs/go-delegated-routing/server" config "github.com/ipfs/kubo/config" ) @@ -65,3 +79,235 @@ var testIdentity = config.Identity{ PeerID: "QmNgdzLieYi8tgfo2WfTUzNVH5hQK9oAYGVf6dxN12NrHt", PrivKey: "CAASrRIwggkpAgEAAoICAQCwt67GTUQ8nlJhks6CgbLKOx7F5tl1r9zF4m3TUrG3Pe8h64vi+ILDRFd7QJxaJ/n8ux9RUDoxLjzftL4uTdtv5UXl2vaufCc/C0bhCRvDhuWPhVsD75/DZPbwLsepxocwVWTyq7/ZHsCfuWdoh/KNczfy+Gn33gVQbHCnip/uhTVxT7ARTiv8Qa3d7qmmxsR+1zdL/IRO0mic/iojcb3Oc/PRnYBTiAZFbZdUEit/99tnfSjMDg02wRayZaT5ikxa6gBTMZ16Yvienq7RwSELzMQq2jFA4i/TdiGhS9uKywltiN2LrNDBcQJSN02pK12DKoiIy+wuOCRgs2NTQEhU2sXCk091v7giTTOpFX2ij9ghmiRfoSiBFPJA5RGwiH6ansCHtWKY1K8BS5UORM0o3dYk87mTnKbCsdz4bYnGtOWafujYwzueGx8r+IWiys80IPQKDeehnLW6RgoyjszKgL/2XTyP54xMLSW+Qb3BPgDcPaPO0hmop1hW9upStxKsefW2A2d46Ds4HEpJEry7PkS5M4gKL/zCKHuxuXVk14+fZQ1rstMuvKjrekpAC2aVIKMI9VRA3awtnje8HImQMdj+r+bPmv0N8rTTr3eS4J8Yl7k12i95LLfK+fWnmUh22oTNzkRlaiERQrUDyE4XNCtJc0xs1oe1yXGqazCIAQIDAQABAoICAQCk1N/ftahlRmOfAXk//8wNl7FvdJD3le6+YSKBj0uWmN1ZbUSQk64chr12iGCOM2WY180xYjy1LOS44PTXaeW5bEiTSnb3b3SH+HPHaWCNM2EiSogHltYVQjKW+3tfH39vlOdQ9uQ+l9Gh6iTLOqsCRyszpYPqIBwi1NMLY2Ej8PpVU7ftnFWouHZ9YKS7nAEiMoowhTu/7cCIVwZlAy3AySTuKxPMVj9LORqC32PVvBHZaMPJ+X1Xyijqg6aq39WyoztkXg3+Xxx5j5eOrK6vO/Lp6ZUxaQilHDXoJkKEJjgIBDZpluss08UPfOgiWAGkW+L4fgUxY0qDLDAEMhyEBAn6KOKVL1JhGTX6GjhWziI94bddSpHKYOEIDzUy4H8BXnKhtnyQV6ELS65C2hj9D0IMBTj7edCF1poJy0QfdK0cuXgMvxHLeUO5uc2YWfbNosvKxqygB9rToy4b22YvNwsZUXsTY6Jt+p9V2OgXSKfB5VPeRbjTJL6xqvvUJpQytmII/C9JmSDUtCbYceHj6X9jgigLk20VV6nWHqCTj3utXD6NPAjoycVpLKDlnWEgfVELDIk0gobxUqqSm3jTPEKRPJgxkgPxbwxYumtw++1UY2y35w3WRDc2xYPaWKBCQeZy+mL6ByXp9bWlNvxS3Knb6oZp36/ovGnf2pGvdQKCAQEAyKpipz2lIUySDyE0avVWAmQb2tWGKXALPohzj7AwkcfEg2GuwoC6GyVE2sTJD1HRazIjOKn3yQORg2uOPeG7sx7EKHxSxCKDrbPawkvLCq8JYSy9TLvhqKUVVGYPqMBzu2POSLEA81QXas+aYjKOFWA2Zrjq26zV9ey3+6Lc6WULePgRQybU8+RHJc6fdjUCCfUxgOrUO2IQOuTJ+FsDpVnrMUGlokmWn23OjL4qTL9wGDnWGUs2pjSzNbj3qA0d8iqaiMUyHX/D/VS0wpeT1osNBSm8suvSibYBn+7wbIApbwXUxZaxMv2OHGz3empae4ckvNZs7r8wsI9UwFt8mwKCAQEA4XK6gZkv9t+3YCcSPw2ensLvL/xU7i2bkC9tfTGdjnQfzZXIf5KNdVuj/SerOl2S1s45NMs3ysJbADwRb4ahElD/V71nGzV8fpFTitC20ro9fuX4J0+twmBolHqeH9pmeGTjAeL1rvt6vxs4FkeG/yNft7GdXpXTtEGaObn8Mt0tPY+aB3UnKrnCQoQAlPyGHFrVRX0UEcp6wyyNGhJCNKeNOvqCHTFObhbhO+KWpWSN0MkVHnqaIBnIn1Te8FtvP/iTwXGnKc0YXJUG6+LM6LmOguW6tg8ZqiQeYyyR+e9eCFH4csLzkrTl1GxCxwEsoSLIMm7UDcjttW6tYEghkwKCAQEAmeCO5lCPYImnN5Lu71ZTLmI2OgmjaANTnBBnDbi+hgv61gUCToUIMejSdDCTPfwv61P3TmyIZs0luPGxkiKYHTNqmOE9Vspgz8Mr7fLRMNApESuNvloVIY32XVImj/GEzh4rAfM6F15U1sN8T/EUo6+0B/Glp+9R49QzAfRSE2g48/rGwgf1JVHYfVWFUtAzUA+GdqWdOixo5cCsYJbqpNHfWVZN/bUQnBFIYwUwysnC29D+LUdQEQQ4qOm+gFAOtrWU62zMkXJ4iLt8Ify6kbrvsRXgbhQIzzGS7WH9XDarj0eZciuslr15TLMC1Azadf+cXHLR9gMHA13mT9vYIQKCAQA/DjGv8cKCkAvf7s2hqROGYAs6Jp8yhrsN1tYOwAPLRhtnCs+rLrg17M2vDptLlcRuI/vIElamdTmylRpjUQpX7yObzLO73nfVhpwRJVMdGU394iBIDncQ+JoHfUwgqJskbUM40dvZdyjbrqc/Q/4z+hbZb+oN/GXb8sVKBATPzSDMKQ/xqgisYIw+wmDPStnPsHAaIWOtni47zIgilJzD0WEk78/YjmPbUrboYvWziK5JiRRJFA1rkQqV1c0M+OXixIm+/yS8AksgCeaHr0WUieGcJtjT9uE8vyFop5ykhRiNxy9wGaq6i7IEecsrkd6DqxDHWkwhFuO1bSE83q/VAoIBAEA+RX1i/SUi08p71ggUi9WFMqXmzELp1L3hiEjOc2AklHk2rPxsaTh9+G95BvjhP7fRa/Yga+yDtYuyjO99nedStdNNSg03aPXILl9gs3r2dPiQKUEXZJ3FrH6tkils/8BlpOIRfbkszrdZIKTO9GCdLWQ30dQITDACs8zV/1GFGrHFrqnnMe/NpIFHWNZJ0/WZMi8wgWO6Ik8jHEpQtVXRiXLqy7U6hk170pa4GHOzvftfPElOZZjy9qn7KjdAQqy6spIrAE94OEL+fBgbHQZGLpuTlj6w6YGbMtPU8uo7sXKoc6WOCb68JWft3tejGLDa1946HAWqVM9B/UcneNc=", } + +var errNotSupported = errors.New("method not supported") + +func TestDelegatedRoutingSingle(t *testing.T) { + require := require.New(t) + + pId1, priv1, err := GeneratePeerID() + require.NoError(err) + + pId2, _, err := GeneratePeerID() + require.NoError(err) + + theID := path.Join("/ipns", string(pId1)) + theErrorID := path.Join("/ipns", string(pId2)) + + d := &delegatedRoutingService{ + goodPeerID: pId1, + badPeerID: pId2, + pk1: priv1, + } + + url := StartRoutingServer(t, d) + n := GetNode(t, url) + + ctx := context.Background() + + v, err := n.Routing.GetValue(ctx, theID) + require.NoError(err) + require.NotNil(v) + require.Contains(string(v), "RECORD FROM SERVICE 0") + + v, err = n.Routing.GetValue(ctx, theErrorID) + require.Nil(v) + require.Error(err) + + err = n.Routing.PutValue(ctx, theID, v) + require.NoError(err) + + err = n.Routing.PutValue(ctx, theErrorID, v) + require.Error(err) +} + +func TestDelegatedRoutingMulti(t *testing.T) { + require := require.New(t) + + pId1, priv1, err := GeneratePeerID() + require.NoError(err) + + pId2, priv2, err := GeneratePeerID() + require.NoError(err) + + theID1 := path.Join("/ipns", string(pId1)) + theID2 := path.Join("/ipns", string(pId2)) + + d1 := &delegatedRoutingService{ + goodPeerID: pId1, + badPeerID: pId2, + pk1: priv1, + serviceID: 1, + } + + url1 := StartRoutingServer(t, d1) + + d2 := &delegatedRoutingService{ + goodPeerID: pId2, + badPeerID: pId1, + pk1: priv2, + serviceID: 2, + } + + url2 := StartRoutingServer(t, d2) + + n := GetNode(t, url1, url2) + + ctx := context.Background() + + v, err := n.Routing.GetValue(ctx, theID1) + require.NoError(err) + require.NotNil(v) + require.Contains(string(v), "RECORD FROM SERVICE 1") + + v, err = n.Routing.GetValue(ctx, theID2) + require.NoError(err) + require.NotNil(v) + require.Contains(string(v), "RECORD FROM SERVICE 2") + + err = n.Routing.PutValue(ctx, theID1, v) + require.Error(err) + + err = n.Routing.PutValue(ctx, theID2, v) + require.Error(err) +} + +func StartRoutingServer(t *testing.T, d drs.DelegatedRoutingService) string { + t.Helper() + + f := drs.DelegatedRoutingAsyncHandler(d) + svr := httptest.NewServer(f) + t.Cleanup(func() { + svr.Close() + }) + + return svr.URL +} + +func GetNode(t *testing.T, reframeURLs ...string) *IpfsNode { + t.Helper() + + routers := make(map[string]config.Router) + for i, ru := range reframeURLs { + routers[fmt.Sprintf("reframe-%d", i)] = config.Router{ + Type: string(config.RouterTypeReframe), + Parameters: map[string]string{ + string(config.RouterParamEndpoint): ru, + }, + } + } + + cfg := config.Config{ + Identity: testIdentity, + Addresses: config.Addresses{ + Swarm: []string{"/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic"}, + API: []string{"/ip4/127.0.0.1/tcp/0"}, + }, + Routing: config.Routing{ + Type: config.NewOptionalString("none"), + Routers: routers, + }, + } + + r := &repo.Mock{ + C: cfg, + D: syncds.MutexWrap(datastore.NewMapDatastore()), + } + + n, err := NewNode(context.Background(), &BuildCfg{Repo: r, Online: true, Routing: libp2p.NilRouterOption}) + require.NoError(t, err) + + return n +} + +func GeneratePeerID() (peer.ID, crypto.PrivKey, error) { + priv, pk, err := crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + return peer.ID(""), nil, err + } + + pid, err := peer.IDFromPublicKey(pk) + return pid, priv, err +} + +type delegatedRoutingService struct { + goodPeerID, badPeerID peer.ID + pk1 crypto.PrivKey + serviceID int +} + +func (drs *delegatedRoutingService) FindProviders(ctx context.Context, key cid.Cid) (<-chan client.FindProvidersAsyncResult, error) { + return nil, errNotSupported +} + +func (drs *delegatedRoutingService) GetIPNS(ctx context.Context, id []byte) (<-chan client.GetIPNSAsyncResult, error) { + ctx, cancel := context.WithCancel(ctx) + ch := make(chan client.GetIPNSAsyncResult) + go func() { + defer close(ch) + defer cancel() + + var out client.GetIPNSAsyncResult + switch peer.ID(id) { + case drs.goodPeerID: + ie, err := ipns.Create(drs.pk1, []byte(fmt.Sprintf("RECORD FROM SERVICE %d", drs.serviceID)), 0, time.Now().Add(10*time.Hour), 100*time.Hour) + if err != nil { + log.Fatal(err) + } + ieb, err := ie.Marshal() + if err != nil { + log.Fatal(err) + } + + out = client.GetIPNSAsyncResult{ + Record: ieb, + Err: nil, + } + case drs.badPeerID: + out = client.GetIPNSAsyncResult{ + Record: nil, + Err: errors.New("THE ERROR"), + } + default: + return + } + + select { + case <-ctx.Done(): + return + case ch <- out: + } + }() + + return ch, nil + +} + +func (drs *delegatedRoutingService) PutIPNS(ctx context.Context, id []byte, record []byte) (<-chan client.PutIPNSAsyncResult, error) { + ctx, cancel := context.WithCancel(ctx) + ch := make(chan client.PutIPNSAsyncResult) + go func() { + defer close(ch) + defer cancel() + + var out client.PutIPNSAsyncResult + switch peer.ID(id) { + case drs.goodPeerID: + out = client.PutIPNSAsyncResult{} + case drs.badPeerID: + out = client.PutIPNSAsyncResult{ + Err: fmt.Errorf("THE ERROR %d", drs.serviceID), + } + default: + return + } + + select { + case <-ctx.Done(): + return + case ch <- out: + } + }() + + return ch, nil +} diff --git a/core/node/bitswap.go b/core/node/bitswap.go index 10d1bd331ba..1369243a301 100644 --- a/core/node/bitswap.go +++ b/core/node/bitswap.go @@ -8,8 +8,8 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" exchange "github.com/ipfs/go-ipfs-exchange-interface" config "github.com/ipfs/kubo/config" + irouting "github.com/ipfs/kubo/routing" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/routing" "go.uber.org/fx" "github.com/ipfs/kubo/core/node/helpers" @@ -25,7 +25,7 @@ const ( // OnlineExchange creates new LibP2P backed block exchange (BitSwap) func OnlineExchange(cfg *config.Config, provide bool) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs blockstore.GCBlockstore) exchange.Interface { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt irouting.TieredRouter, bs blockstore.GCBlockstore) exchange.Interface { bitswapNetwork := network.NewFromIpfsHost(host, rt) var internalBsCfg config.InternalBitswap diff --git a/core/node/groups.go b/core/node/groups.go index 937c73da131..bb27a24f3eb 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -17,7 +17,6 @@ import ( "github.com/ipfs/kubo/p2p" offline "github.com/ipfs/go-ipfs-exchange-offline" - offroute "github.com/ipfs/go-ipfs-routing/offline" uio "github.com/ipfs/go-unixfs/io" "github.com/dustin/go-humanize" @@ -166,7 +165,10 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)), fx.Provide(libp2p.Routing), + fx.Provide(libp2p.ContentRouting), + fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)), + fx.Provide(libp2p.DelegatedRouting(cfg.Routing.Routers)), maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")), maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics), @@ -313,7 +315,9 @@ func Offline(cfg *config.Config) fx.Option { fx.Provide(offline.Exchange), fx.Provide(DNSResolver), fx.Provide(Namesys(0)), - fx.Provide(offroute.NewOfflineRouter), + fx.Provide(libp2p.Routing), + fx.Provide(libp2p.ContentRouting), + fx.Provide(libp2p.OfflineRouting), OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), ) } diff --git a/core/node/ipns.go b/core/node/ipns.go index d27085421da..3c565839a32 100644 --- a/core/node/ipns.go +++ b/core/node/ipns.go @@ -4,14 +4,15 @@ import ( "fmt" "time" - "github.com/ipfs/go-ipfs-util" + util "github.com/ipfs/go-ipfs-util" "github.com/ipfs/go-ipns" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p-core/routing" - "github.com/libp2p/go-libp2p-record" + record "github.com/libp2p/go-libp2p-record" madns "github.com/multiformats/go-multiaddr-dns" + irouting "github.com/ipfs/kubo/routing" + "github.com/ipfs/go-namesys" "github.com/ipfs/go-namesys/republisher" "github.com/ipfs/kubo/repo" @@ -28,8 +29,8 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator { } // Namesys creates new name system -func Namesys(cacheSize int) func(rt routing.Routing, rslv *madns.Resolver, repo repo.Repo) (namesys.NameSystem, error) { - return func(rt routing.Routing, rslv *madns.Resolver, repo repo.Repo) (namesys.NameSystem, error) { +func Namesys(cacheSize int) func(rt irouting.TieredRouter, rslv *madns.Resolver, repo repo.Repo) (namesys.NameSystem, error) { + return func(rt irouting.TieredRouter, rslv *madns.Resolver, repo repo.Repo) (namesys.NameSystem, error) { opts := []namesys.Option{ namesys.WithDatastore(repo.Datastore()), namesys.WithDNSResolver(rslv), diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go index a5f1de3a7cc..8a77c48f1db 100644 --- a/core/node/libp2p/routing.go +++ b/core/node/libp2p/routing.go @@ -8,7 +8,10 @@ import ( "time" "github.com/ipfs/kubo/core/node/helpers" + irouting "github.com/ipfs/kubo/routing" + ds "github.com/ipfs/go-datastore" + offroute "github.com/ipfs/go-ipfs-routing/offline" config "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/repo" "github.com/libp2p/go-libp2p-core/host" @@ -26,8 +29,6 @@ import ( "go.uber.org/fx" ) -type BaseIpfsRouting routing.Routing - type Router struct { routing.Routing @@ -54,16 +55,17 @@ type processInitialRoutingIn struct { type processInitialRoutingOut struct { fx.Out - Router Router `group:"routers"` + Router Router `group:"routers"` + ContentRouter routing.ContentRouting `group:"content-routers"` + DHT *ddht.DHT DHTClient routing.Routing `name:"dhtc"` - BaseRT BaseIpfsRouting } type AddrInfoChan chan peer.AddrInfo func BaseRouting(experimentalDHTClient bool) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) { + return func(lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) { var dr *ddht.DHT if dht, ok := in.Router.(*ddht.DHT); ok { dr = dht @@ -109,9 +111,9 @@ func BaseRouting(experimentalDHTClient bool) interface{} { Routing: expClient, Priority: 1000, }, - DHT: dr, - DHTClient: expClient, - BaseRT: expClient, + DHT: dr, + DHTClient: expClient, + ContentRouter: expClient, }, nil } @@ -120,13 +122,69 @@ func BaseRouting(experimentalDHTClient bool) interface{} { Priority: 1000, Routing: in.Router, }, - DHT: dr, - DHTClient: dr, - BaseRT: in.Router, + DHT: dr, + DHTClient: dr, + ContentRouter: in.Router, }, nil } } +type delegatedRouterOut struct { + fx.Out + + Routers []Router `group:"routers,flatten"` + ContentRouter []routing.ContentRouting `group:"content-routers,flatten"` +} + +func DelegatedRouting(routers map[string]config.Router) interface{} { + return func() (delegatedRouterOut, error) { + out := delegatedRouterOut{} + + for _, v := range routers { + if !v.Enabled.WithDefault(true) { + continue + } + + r, err := irouting.RoutingFromConfig(v) + if err != nil { + return out, err + } + + out.Routers = append(out.Routers, Router{ + Routing: r, + Priority: irouting.GetPriority(v.Parameters), + }) + + out.ContentRouter = append(out.ContentRouter, r) + } + + return out, nil + } +} + +type p2pOnlineContentRoutingIn struct { + fx.In + + ContentRouter []routing.ContentRouting `group:"content-routers"` +} + +// ContentRouting will get all routers that can do contentRouting and add them +// all together using a TieredRouter. It will be used for topic discovery. +func ContentRouting(in p2pOnlineContentRoutingIn) routing.ContentRouting { + var routers []routing.Routing + for _, cr := range in.ContentRouter { + routers = append(routers, + &routinghelpers.Compose{ + ContentRouting: cr, + }, + ) + } + + return routinghelpers.Tiered{ + Routers: routers, + } +} + type p2pOnlineRoutingIn struct { fx.In @@ -134,7 +192,10 @@ type p2pOnlineRoutingIn struct { Validator record.Validator } -func Routing(in p2pOnlineRoutingIn) routing.Routing { +// Routing will get all routers obtained from different methods +// (delegated routers, pub-sub, and so on) and add them all together +// using a TieredRouter. +func Routing(in p2pOnlineRoutingIn) irouting.TieredRouter { routers := in.Routers sort.SliceStable(routers, func(i, j int) bool { @@ -146,19 +207,30 @@ func Routing(in p2pOnlineRoutingIn) routing.Routing { irouters[i] = v.Routing } - return routinghelpers.Tiered{ - Routers: irouters, - Validator: in.Validator, + return irouting.Tiered{ + Tiered: routinghelpers.Tiered{ + Routers: irouters, + Validator: in.Validator, + }, + } +} + +// OfflineRouting provides a special Router to the routers list when we are creating a offline node. +func OfflineRouting(dstore ds.Datastore, validator record.Validator) p2pRouterOut { + return p2pRouterOut{ + Router: Router{ + Routing: offroute.NewOfflineRouter(dstore, validator), + Priority: 10000, + }, } } type p2pPSRoutingIn struct { fx.In - BaseIpfsRouting BaseIpfsRouting - Validator record.Validator - Host host.Host - PubSub *pubsub.PubSub `optional:"true"` + Validator record.Validator + Host host.Host + PubSub *pubsub.PubSub `optional:"true"` } func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) { diff --git a/core/node/libp2p/topicdiscovery.go b/core/node/libp2p/topicdiscovery.go index bdf5166a02b..cdb3986d111 100644 --- a/core/node/libp2p/topicdiscovery.go +++ b/core/node/libp2p/topicdiscovery.go @@ -9,12 +9,11 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" disc "github.com/libp2p/go-libp2p/p2p/discovery/routing" - "github.com/ipfs/kubo/core/node/helpers" - "go.uber.org/fx" + "github.com/libp2p/go-libp2p-core/routing" ) func TopicDiscovery() interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr BaseIpfsRouting) (service discovery.Discovery, err error) { + return func(host host.Host, cr routing.ContentRouting) (service discovery.Discovery, err error) { baseDisc := disc.NewRoutingDiscovery(cr) minBackoff, maxBackoff := time.Second*60, time.Hour rng := rand.New(rand.NewSource(rand.Int63())) diff --git a/core/node/provider.go b/core/node/provider.go index 2c3836d8fb8..52cc0b074d7 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -6,18 +6,16 @@ import ( "time" "github.com/ipfs/go-fetcher" - "github.com/ipfs/go-ipfs-pinner" - "github.com/ipfs/go-ipfs-provider" + pin "github.com/ipfs/go-ipfs-pinner" + provider "github.com/ipfs/go-ipfs-provider" "github.com/ipfs/go-ipfs-provider/batched" q "github.com/ipfs/go-ipfs-provider/queue" "github.com/ipfs/go-ipfs-provider/simple" - "github.com/libp2p/go-libp2p-core/routing" - "github.com/multiformats/go-multihash" "go.uber.org/fx" "github.com/ipfs/kubo/core/node/helpers" - "github.com/ipfs/kubo/core/node/libp2p" "github.com/ipfs/kubo/repo" + irouting "github.com/ipfs/kubo/routing" ) const kReprovideFrequency = time.Hour * 12 @@ -30,13 +28,13 @@ func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q } // SimpleProvider creates new record provider -func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt routing.Routing) provider.Provider { +func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt irouting.TieredRouter) provider.Provider { return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt) } // SimpleReprovider creates new reprovider func SimpleReprovider(reproviderInterval time.Duration) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.Routing, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt irouting.TieredRouter, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) { return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil } } @@ -62,16 +60,11 @@ func SimpleProviderSys(isOnline bool) interface{} { } } -type provideMany interface { - ProvideMany(ctx context.Context, keys []multihash.Multihash) error - Ready() bool -} - // BatchedProviderSys creates new provider system func BatchedProviderSys(isOnline bool, reprovideInterval string) interface{} { - return func(lc fx.Lifecycle, cr libp2p.BaseIpfsRouting, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) { - r, ok := (cr).(provideMany) - if !ok { + return func(lc fx.Lifecycle, cr irouting.TieredRouter, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) { + r := cr.ProvideMany() + if r == nil { return nil, fmt.Errorf("BatchedProviderSys requires a content router that supports provideMany") } diff --git a/docs/changelogs/v0.14.md b/docs/changelogs/v0.14.md new file mode 100644 index 00000000000..4ecaee605d1 --- /dev/null +++ b/docs/changelogs/v0.14.md @@ -0,0 +1,31 @@ +# Kubo changelog + +## v0.14.0 TBD + +### Overview + +Below is an outline of all that is in this release, so you get a sense of all that's included. + +- [🔦 Highlights](#---highlights) + * [🛣️ Delegated Routing](#---Delegated-Routing) + +### 🔦 Highlights + +#### 🛣️ Delegated Routing + +Content routing is the a term used to describe the problem of finding providers for a given piece of content. +If you have a hash, or CID of some data, how do you find who has it? +In IPFS, until now, only a DHT was used as a decentralized answer to content routing. +Now, content routing can be handled by clients implementing the [Reframe protocol](https://github.com/ipfs/specs/tree/main/reframe#readme). + +Example configuration usage using the [Filecoin Network Indexer](https://docs.cid.contact/filecoin-network-indexer/overview): + +``` +ipfs config Routing.Routers.CidContact --json '{ + "Type": "reframe", + "Parameters": { + "Endpoint": "https://cid.contact/reframe" + } +}' + +``` diff --git a/docs/config.md b/docs/config.md index 29e76af239b..5878e92b42a 100644 --- a/docs/config.md +++ b/docs/config.md @@ -102,6 +102,10 @@ config file at runtime. - [`Reprovider.Interval`](#reproviderinterval) - [`Reprovider.Strategy`](#reproviderstrategy) - [`Routing`](#routing) + - [`Routing.Routers`](#routingrouters) + - [`Routing.Routers: Type`](#routingrouters-type) + - [`Routing.Routers: Enabled`](#routingrouters-enabled) + - [`Routing.Routers: Parameters`](#routingrouters-parameters) - [`Routing.Type`](#routingtype) - [`Swarm`](#swarm) - [`Swarm.AddrFilters`](#swarmaddrfilters) @@ -1289,9 +1293,73 @@ Type: `string` (or unset for the default, which is "all") Contains options for content, peer, and IPNS routing mechanisms. -### `Routing.Type` +### `Routing.Routers` + +**EXPERIMENTAL: `Routing.Routers` configuration may change in future release** + +Map of additional Routers. + +Allows for extending the default routing (DHT) with alternative Router +implementations, such as custom DHTs and delegated routing based +on the [reframe protocol](https://github.com/ipfs/specs/tree/main/reframe#readme). + +The map key is a name of a Router, and the value is its configuration. + +Default: `{}` + +Type: `object[string->object]` + +#### `Routing.Routers: Type` + +**EXPERIMENTAL: `Routing.Routers` configuration may change in future release** + +It specifies the routing type that will be created. + +Currently supported types: + +- `reframe` (delegated routing based on the [reframe protocol](https://github.com/ipfs/specs/tree/main/reframe#readme)) +- `dht` (WIP, custom DHT will be added in a future release) + +Type: `string` + +#### `Routing.Routers: Enabled` + +**EXPERIMENTAL: `Routing.Routers` configuration may change in future release** + +Optional flag to disable the specified router without removing it from the configuration file. -Content routing mode. Can be overridden with daemon `--routing` flag. +Default: `true` + +Type: `flag` (`null`/missing will apply the default) + +#### `Routing.Routers: Parameters` + +**EXPERIMENTAL: `Routing.Routers` configuration may change in future release** + +Parameters needed to create the specified router. Supported params per router type: + +Reframe: + - `Endpoint` (mandatory): URL that will be used to connect to a specified router. + - `Priority` (optional): Priority is used when making a routing request. Small numbers represent more important routers. The default priority is 100000. + +**Example:** + +To add router provided by _Store the Index_ team at [cid.contact](https://cid.contact): + +```console +$ ipfs config Routing.Routers.CidContact --json '{ + "Type": "reframe", + "Parameters": { + "Endpoint": "https://cid.contact/reframe" + } +}' +``` + +Default: `{}` (use the safe implicit defaults) + +Type: `object[string->string]` + +### `Routing.Type` There are two core routing options: "none" and "dht" (default). @@ -1326,9 +1394,9 @@ unless you're sure your node is reachable from the public network. } ``` -Default: dht +Default: `dht` -Type: `string` (or unset for the default) +Type: `optionalString` (`null`/missing means the default) ## `Swarm` diff --git a/go.mod b/go.mod index 50007399ee1..60cc216b30d 100644 --- a/go.mod +++ b/go.mod @@ -63,7 +63,7 @@ require ( github.com/ipld/go-car v0.4.0 github.com/ipld/go-car/v2 v2.4.0 github.com/ipld/go-codec-dagpb v1.4.0 - github.com/ipld/go-ipld-prime v0.16.0 + github.com/ipld/go-ipld-prime v0.17.0 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jbenet/go-temp-err-catcher v0.1.0 github.com/jbenet/goprocess v0.1.4 @@ -118,6 +118,7 @@ require ( require ( github.com/benbjohnson/clock v1.3.0 + github.com/ipfs/go-delegated-routing v0.3.0 github.com/ipfs/go-log/v2 v2.5.1 ) @@ -171,6 +172,7 @@ require ( github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.2 // indirect github.com/ipfs/go-peertaskqueue v0.7.1 // indirect + github.com/ipld/edelweiss v0.1.4 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/klauspost/compress v1.15.1 // indirect github.com/klauspost/cpuid/v2 v2.0.12 // indirect diff --git a/go.sum b/go.sum index ce2d1af50de..0d700aec239 100644 --- a/go.sum +++ b/go.sum @@ -252,8 +252,9 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= -github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns= github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= +github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= +github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= @@ -355,8 +356,9 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -500,6 +502,8 @@ github.com/ipfs/go-datastore v0.4.5/go.mod h1:eXTcaaiN6uOlVCLS9GjJUJtlvJfM3xk23w github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.5.1 h1:WkRhLuISI+XPD0uk3OskB0fYFSyqK8Ob5ZYew9Qa1nQ= github.com/ipfs/go-datastore v0.5.1/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= +github.com/ipfs/go-delegated-routing v0.3.0 h1:pF5apOJ/xdQkj22mRahW9GmSuCkgMLparKZWKJBO4CE= +github.com/ipfs/go-delegated-routing v0.3.0/go.mod h1:2w79E1/G9YOaxyJJQgqIFSQaa/GdS2zSATEpK8aJUBM= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= @@ -655,6 +659,8 @@ github.com/ipfs/interface-go-ipfs-core v0.7.0 h1:7tb+2upz8oCcjIyjo1atdMk+P+u7wPm github.com/ipfs/interface-go-ipfs-core v0.7.0/go.mod h1:lF27E/nnSPbylPqKVXGZghal2hzifs3MmjyiEjnc9FY= github.com/ipfs/tar-utils v0.0.2 h1:UNgHB4x/PPzbMkmJi+7EqC9LNMPDztOVSnx1HAqSNg4= github.com/ipfs/tar-utils v0.0.2/go.mod h1:4qlnRWgTVljIMhSG2SqRYn66NT+3wrv/kZt9V+eqxDM= +github.com/ipld/edelweiss v0.1.4 h1:g4+C2Ph+8SV2MCJBG3oRtetvxJYAS2WzlNGgsOY95iM= +github.com/ipld/edelweiss v0.1.4/go.mod h1:JX1MR06BPcTOF+5xCYDLnylYkXS15iUN0/RXVSiUIQs= github.com/ipld/go-car v0.4.0 h1:U6W7F1aKF/OJMHovnOVdst2cpQE5GhmHibQkAixgNcQ= github.com/ipld/go-car v0.4.0/go.mod h1:Uslcn4O9cBKK9wqHm/cLTFacg6RAPv6LZx2mxd2Ypl4= github.com/ipld/go-car/v2 v2.1.1/go.mod h1:+2Yvf0Z3wzkv7NeI69i8tuZ+ft7jyjPYIWZzeVNeFcI= @@ -669,8 +675,9 @@ github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvB github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8= github.com/ipld/go-ipld-prime v0.14.0/go.mod h1:9ASQLwUFLptCov6lIYc70GRB4V7UTyLD0IJtrDJe6ZM= github.com/ipld/go-ipld-prime v0.14.1/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0= -github.com/ipld/go-ipld-prime v0.16.0 h1:RS5hhjB/mcpeEPJvfyj0qbOj/QL+/j05heZ0qa97dVo= github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA= +github.com/ipld/go-ipld-prime v0.17.0 h1:+U2peiA3aQsE7mrXjD2nYZaZrCcakoz2Wge8K42Ld8g= +github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDaD+o0bOowhgs= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 h1:TsyATB2ZRRQGTwafJdgEUQkmjOExRV0DNokcihZxbnQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= @@ -1523,8 +1530,9 @@ github.com/wI2L/jsondiff v0.2.0 h1:dE00WemBa1uCjrzQUUTE/17I6m5qAaN0EMFOg2Ynr/k= github.com/wI2L/jsondiff v0.2.0/go.mod h1:axTcwtBkY4TsKuV+RgoMhHyHKKFRI6nnjRLi8LLYQnA= github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/warpfork/go-testmark v0.3.0/go.mod h1:jhEf8FVxd+F17juRubpmut64NEG6I2rgkUhlcqqXwE0= -github.com/warpfork/go-testmark v0.9.0 h1:nc+uaCiv5lFQLYjhuC2LTYeJ7JaC+gdDmsz9r0ISy0Y= github.com/warpfork/go-testmark v0.9.0/go.mod h1:jhEf8FVxd+F17juRubpmut64NEG6I2rgkUhlcqqXwE0= +github.com/warpfork/go-testmark v0.10.0 h1:E86YlUMYfwIacEsQGlnTvjk1IgYkyTGjPhF0RnwTCmw= +github.com/warpfork/go-testmark v0.10.0/go.mod h1:jhEf8FVxd+F17juRubpmut64NEG6I2rgkUhlcqqXwE0= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w= diff --git a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go index 5ce723fb3b3..6328e1ff4da 100644 --- a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go +++ b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go @@ -188,7 +188,7 @@ func initTempNode(ctx context.Context, bootstrap []string, peers []peer.AddrInfo } // configure the temporary node - cfg.Routing.Type = "dhtclient" + cfg.Routing.Type = config.NewOptionalString("dhtclient") // Disable listening for inbound connections cfg.Addresses.Gateway = []string{} diff --git a/routing/delegated.go b/routing/delegated.go new file mode 100644 index 00000000000..b8bf95e5cd6 --- /dev/null +++ b/routing/delegated.go @@ -0,0 +1,93 @@ +package routing + +import ( + "strconv" + + drc "github.com/ipfs/go-delegated-routing/client" + drp "github.com/ipfs/go-delegated-routing/gen/proto" + "github.com/ipfs/kubo/config" + "github.com/libp2p/go-libp2p-core/routing" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" +) + +type TieredRouter interface { + routing.Routing + ProvideMany() ProvideMany +} + +var _ TieredRouter = &Tiered{} + +// Tiered is a routing Tiered implementation providing some extra methods to fill +// some special use cases when initializing the client. +type Tiered struct { + routinghelpers.Tiered +} + +// ProvideMany returns a ProvideMany implementation including all Routers that +// implements ProvideMany +func (ds Tiered) ProvideMany() ProvideMany { + var pms []ProvideMany + for _, r := range ds.Tiered.Routers { + pm, ok := r.(ProvideMany) + if !ok { + continue + } + pms = append(pms, pm) + } + + if len(pms) == 0 { + return nil + } + + return &ProvideManyWrapper{pms: pms} +} + +const defaultPriority = 100000 + +// GetPriority extract priority from config params. +// Small numbers represent more important routers. +func GetPriority(params map[string]string) int { + param := params[string(config.RouterParamPriority)] + if param == "" { + return defaultPriority + } + + p, err := strconv.Atoi(param) + if err != nil { + return defaultPriority + } + + return p +} + +// RoutingFromConfig creates a Routing instance from the specified configuration. +func RoutingFromConfig(c config.Router) (routing.Routing, error) { + switch { + case c.Type == string(config.RouterTypeReframe): + return reframeRoutingFromConfig(c) + default: + return nil, &RouterTypeNotFoundError{c.Type} + } +} + +func reframeRoutingFromConfig(conf config.Router) (routing.Routing, error) { + var dr drp.DelegatedRouting_Client + + param := string(config.RouterParamEndpoint) + addr, ok := conf.Parameters[param] + if !ok { + return nil, NewParamNeededErr(param, conf.Type) + } + + dr, err := drp.New_DelegatedRouting_Client(addr) + if err != nil { + return nil, err + } + + c := drc.NewClient(dr) + crc := drc.NewContentRoutingClient(c) + return &reframeRoutingWrapper{ + Client: c, + ContentRoutingClient: crc, + }, nil +} diff --git a/routing/delegated_test.go b/routing/delegated_test.go new file mode 100644 index 00000000000..2e96b63665d --- /dev/null +++ b/routing/delegated_test.go @@ -0,0 +1,121 @@ +package routing + +import ( + "context" + "testing" + + "github.com/ipfs/go-cid" + "github.com/ipfs/kubo/config" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/routing" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +func TestPriority(t *testing.T) { + require := require.New(t) + params := make(map[string]string) + p := GetPriority(params) + + require.Equal(defaultPriority, p) + + params[string(config.RouterParamPriority)] = "101" + + p = GetPriority(params) + + require.Equal(101, p) + + params[string(config.RouterParamPriority)] = "NAN" + + p = GetPriority(params) + + require.Equal(defaultPriority, p) +} + +func TestRoutingFromConfig(t *testing.T) { + require := require.New(t) + + r, err := RoutingFromConfig(config.Router{ + Type: "unknown", + }) + + require.Nil(r) + require.EqualError(err, "router type unknown is not supported") + + r, err = RoutingFromConfig(config.Router{ + Type: string(config.RouterTypeReframe), + Parameters: make(map[string]string), + }) + + require.Nil(r) + require.EqualError(err, "configuration param 'Endpoint' is needed for reframe delegated routing types") + + r, err = RoutingFromConfig(config.Router{ + Type: string(config.RouterTypeReframe), + Parameters: map[string]string{ + string(config.RouterParamEndpoint): "test", + }, + }) + + require.NotNil(r) + require.NoError(err) +} + +func TestTieredRouter(t *testing.T) { + require := require.New(t) + + tr := &Tiered{ + Tiered: routinghelpers.Tiered{ + Routers: []routing.Routing{routinghelpers.Null{}}, + }, + } + + pm := tr.ProvideMany() + require.Nil(pm) + + tr.Tiered.Routers = append(tr.Tiered.Routers, &dummyRouter{}) + + pm = tr.ProvideMany() + require.NotNil(pm) +} + +type dummyRouter struct { +} + +func (dr *dummyRouter) Provide(context.Context, cid.Cid, bool) error { + panic("not implemented") + +} + +func (dr *dummyRouter) FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo { + panic("not implemented") +} + +func (dr *dummyRouter) FindPeer(context.Context, peer.ID) (peer.AddrInfo, error) { + panic("not implemented") +} + +func (dr *dummyRouter) PutValue(context.Context, string, []byte, ...routing.Option) error { + panic("not implemented") +} + +func (dr *dummyRouter) GetValue(context.Context, string, ...routing.Option) ([]byte, error) { + panic("not implemented") +} + +func (dr *dummyRouter) SearchValue(context.Context, string, ...routing.Option) (<-chan []byte, error) { + panic("not implemented") +} + +func (dr *dummyRouter) Bootstrap(context.Context) error { + panic("not implemented") +} + +func (dr *dummyRouter) ProvideMany(ctx context.Context, keys []multihash.Multihash) error { + panic("not implemented") +} + +func (dr *dummyRouter) Ready() bool { + panic("not implemented") +} diff --git a/routing/error.go b/routing/error.go new file mode 100644 index 00000000000..15016a0e4e6 --- /dev/null +++ b/routing/error.go @@ -0,0 +1,27 @@ +package routing + +import "fmt" + +type ParamNeededError struct { + ParamName string + RouterType string +} + +func NewParamNeededErr(param, routing string) error { + return &ParamNeededError{ + ParamName: param, + RouterType: routing, + } +} + +func (e *ParamNeededError) Error() string { + return fmt.Sprintf("configuration param '%v' is needed for %v delegated routing types", e.ParamName, e.RouterType) +} + +type RouterTypeNotFoundError struct { + RouterType string +} + +func (e *RouterTypeNotFoundError) Error() string { + return fmt.Sprintf("router type %v is not supported", e.RouterType) +} diff --git a/routing/wrapper.go b/routing/wrapper.go new file mode 100644 index 00000000000..b3dea9b8c5e --- /dev/null +++ b/routing/wrapper.go @@ -0,0 +1,66 @@ +package routing + +import ( + "context" + + "github.com/ipfs/go-cid" + drc "github.com/ipfs/go-delegated-routing/client" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/routing" + "github.com/multiformats/go-multihash" + "golang.org/x/sync/errgroup" +) + +var _ routing.Routing = &reframeRoutingWrapper{} + +// reframeRoutingWrapper is a wrapper needed to construct the routing.Routing interface from +// delegated-routing library. +type reframeRoutingWrapper struct { + *drc.Client + *drc.ContentRoutingClient +} + +func (c *reframeRoutingWrapper) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo { + return c.ContentRoutingClient.FindProvidersAsync(ctx, cid, count) +} + +func (c *reframeRoutingWrapper) Bootstrap(ctx context.Context) error { + return nil +} + +func (c *reframeRoutingWrapper) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + return peer.AddrInfo{}, routing.ErrNotSupported +} + +type ProvideMany interface { + ProvideMany(ctx context.Context, keys []multihash.Multihash) error + Ready() bool +} + +var _ ProvideMany = &ProvideManyWrapper{} + +type ProvideManyWrapper struct { + pms []ProvideMany +} + +func (pmw *ProvideManyWrapper) ProvideMany(ctx context.Context, keys []multihash.Multihash) error { + var g errgroup.Group + for _, pm := range pmw.pms { + pm := pm + g.Go(func() error { + return pm.ProvideMany(ctx, keys) + }) + } + + return g.Wait() +} + +// Ready is ready if all providers are ready +func (pmw *ProvideManyWrapper) Ready() bool { + out := true + for _, pm := range pmw.pms { + out = out && pm.Ready() + } + + return out +} diff --git a/routing/wrapper_test.go b/routing/wrapper_test.go new file mode 100644 index 00000000000..dd5f2f44690 --- /dev/null +++ b/routing/wrapper_test.go @@ -0,0 +1,101 @@ +package routing + +import ( + "context" + "errors" + "testing" + + "github.com/multiformats/go-multihash" +) + +func TestProvideManyWrapper_ProvideMany(t *testing.T) { + type fields struct { + pms []ProvideMany + } + tests := []struct { + name string + fields fields + wantErr bool + ready bool + }{ + { + name: "one provider", + fields: fields{ + pms: []ProvideMany{ + newDummyProvideMany(true, false), + }, + }, + wantErr: false, + ready: true, + }, + { + name: "two providers, no errors and ready", + fields: fields{ + pms: []ProvideMany{ + newDummyProvideMany(true, false), + newDummyProvideMany(true, false), + }, + }, + wantErr: false, + ready: true, + }, + { + name: "two providers, no ready, no error", + fields: fields{ + pms: []ProvideMany{ + newDummyProvideMany(true, false), + newDummyProvideMany(false, false), + }, + }, + wantErr: false, + ready: false, + }, + { + name: "two providers, no ready, and one erroing", + fields: fields{ + pms: []ProvideMany{ + newDummyProvideMany(true, false), + newDummyProvideMany(false, true), + }, + }, + wantErr: true, + ready: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pmw := &ProvideManyWrapper{ + pms: tt.fields.pms, + } + if err := pmw.ProvideMany(context.Background(), nil); (err != nil) != tt.wantErr { + t.Errorf("ProvideManyWrapper.ProvideMany() error = %v, wantErr %v", err, tt.wantErr) + } + + if ready := pmw.Ready(); ready != tt.ready { + t.Errorf("ProvideManyWrapper.Ready() unexpected output = %v, want %v", ready, tt.ready) + } + }) + } +} + +func newDummyProvideMany(ready, failProviding bool) *dummyProvideMany { + return &dummyProvideMany{ + ready: ready, + failProviding: failProviding, + } +} + +type dummyProvideMany struct { + ready, failProviding bool +} + +func (dpm *dummyProvideMany) ProvideMany(ctx context.Context, keys []multihash.Multihash) error { + if dpm.failProviding { + return errors.New("error providing many") + } + + return nil +} +func (dpm *dummyProvideMany) Ready() bool { + return dpm.ready +} diff --git a/test/sharness/t0170-dht.sh b/test/sharness/t0170-legacy-dht.sh similarity index 98% rename from test/sharness/t0170-dht.sh rename to test/sharness/t0170-legacy-dht.sh index e76b8586089..c18735b5b18 100755 --- a/test/sharness/t0170-dht.sh +++ b/test/sharness/t0170-legacy-dht.sh @@ -1,5 +1,6 @@ #!/usr/bin/env bash +# Legacy / deprecated, see: t0170-routing-dht.sh test_description="Test dht command" . lib/test-lib.sh diff --git a/test/sharness/t0170-routing-dht.sh b/test/sharness/t0170-routing-dht.sh new file mode 100755 index 00000000000..2eef692a8cb --- /dev/null +++ b/test/sharness/t0170-routing-dht.sh @@ -0,0 +1,118 @@ +#!/usr/bin/env bash + +# This file does the same tests as t0170-dht.sh but uses 'routing' commands instead +# (only exception is query, which lives only under dht) +test_description="Test routing command" + +. lib/test-lib.sh + +test_dht() { + NUM_NODES=5 + + test_expect_success 'init iptb' ' + rm -rf .iptb/ && + iptb testbed create -type localipfs -count $NUM_NODES -init + ' + + startup_cluster $NUM_NODES $@ + + test_expect_success 'peer ids' ' + PEERID_0=$(iptb attr get 0 id) && + PEERID_2=$(iptb attr get 2 id) + ' + + # ipfs routing findpeer + test_expect_success 'findpeer' ' + ipfsi 1 routing findpeer $PEERID_0 | sort >actual && + ipfsi 0 id -f "" | cut -d / -f 1-5 | sort >expected && + test_cmp actual expected + ' + + # ipfs routing get + test_expect_success 'get with good keys works' ' + HASH="$(echo "hello world" | ipfsi 2 add -q)" && + ipfsi 2 name publish "/ipfs/$HASH" && + ipfsi 1 routing get "/ipns/$PEERID_2" >get_result + ' + + test_expect_success 'get with good keys contains the right value' ' + cat get_result | grep -aq "/ipfs/$HASH" + ' + + test_expect_success 'put round trips (#3124)' ' + ipfsi 0 routing put "/ipns/$PEERID_2" get_result | sort >putted && + [ -s putted ] || + test_fsh cat putted + ' + + test_expect_success 'put with bad keys fails (issue #5113)' ' + ipfsi 0 routing put "foo" <<putted + ipfsi 0 routing put "/pk/foo" <<>putted + ipfsi 0 routing put "/ipns/foo" <<>putted + [ ! -s putted ] || + test_fsh cat putted + ' + + test_expect_success 'put with bad keys returns error (issue #4611)' ' + test_must_fail ipfsi 0 routing put "foo" << afile && + HASH=$(ipfsi 3 add -q afile) + ' + + # ipfs routing findprovs + test_expect_success 'findprovs' ' + ipfsi 4 routing findprovs $HASH > provs && + iptb attr get 3 id > expected && + test_cmp provs expected + ' + + + # ipfs dht query + # + # We test all nodes. 4 nodes should see the same peer ID, one node (the + # closest) should see a different one. + + for i in $(test_seq 0 4); do + test_expect_success "dht query from $i" ' + ipfsi "$i" dht query "$HASH" | head -1 >closest-$i + ' + done + + test_expect_success "collecting results" ' + cat closest-* | sort | uniq -c | sed -e "s/ *\([0-9]\+\) .*/\1/g" | sort -g > actual && + echo 1 > expected && + echo 4 >> expected + ' + + test_expect_success "checking results" ' + test_cmp actual expected + ' + + test_expect_success 'stop iptb' ' + iptb stop + ' + + test_expect_success "dht commands fail when offline" ' + test_must_fail ipfsi 0 routing findprovs "$HASH" 2>err_findprovs && + test_must_fail ipfsi 0 routing findpeer "$HASH" 2>err_findpeer && + test_must_fail ipfsi 0 routing put "/ipns/$PEERID_2" "get_result" 2>err_put && + test_should_contain "this command must be run in online mode" err_findprovs && + test_should_contain "this command must be run in online mode" err_findpeer && + test_should_contain "this command must be run in online mode" err_put + ' +} + +test_dht +test_dht --enable-pubsub-experiment --enable-namesys-pubsub + +test_done diff --git a/test/sharness/t0701-delegated-routing-reframe.sh b/test/sharness/t0701-delegated-routing-reframe.sh new file mode 100755 index 00000000000..424be6ef55e --- /dev/null +++ b/test/sharness/t0701-delegated-routing-reframe.sh @@ -0,0 +1,103 @@ +#!/usr/bin/env bash + +test_description="Test delegated routing via reframe endpoint" + +. lib/test-lib.sh + +if ! test_have_prereq SOCAT; then + skip_all="skipping '$test_description': socat is not available" + test_done +fi + +# simple reframe server mock +# local endpoint responds with deterministic application/vnd.ipfs.rpc+dag-json; version=1 +REFRAME_PORT=5098 +function start_reframe_mock_endpoint() { + REMOTE_SERVER_LOG="reframe-server.log" + rm -f $REMOTE_SERVER_LOG + + touch response + socat tcp-listen:$REFRAME_PORT,fork,bind=127.0.0.1,reuseaddr 'SYSTEM:cat response'!!CREATE:$REMOTE_SERVER_LOG & + REMOTE_SERVER_PID=$! + + socat /dev/null tcp:127.0.0.1:$REFRAME_PORT,retry=10 + return $? +} +function serve_reframe_response() { + local body=$1 + local status_code=${2:-"200 OK"} + local length=$((1 + ${#body})) + echo -e "HTTP/1.1 $status_code\nContent-Type: application/vnd.ipfs.rpc+dag-json; version=1\nContent-Length: $length\n\n$body" > response +} +function stop_reframe_mock_endpoint() { + exec 7<&- + kill $REMOTE_SERVER_PID > /dev/null 2>&1 + wait $REMOTE_SERVER_PID || true +} + +# daemon running in online mode to ensure Pin.origins/PinStatus.delegates work +test_init_ipfs + +# based on static, synthetic reframe messages: +# t0701-delegated-routing-reframe/FindProvidersRequest +# t0701-delegated-routing-reframe/FindProvidersResponse +FINDPROV_CID="bafybeigvgzoolc3drupxhlevdp2ugqcrbcsqfmcek2zxiw5wctk3xjpjwy" +EXPECTED_PROV="QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC" + +test_expect_success "default Routing config has no Routers defined" ' + echo null > expected && + ipfs config show | jq .Routing.Routers > actual && + test_cmp expected actual +' + +# turn off all implicit routers +ipfs config Routing.Type none || exit 1 +test_launch_ipfs_daemon +test_expect_success "disabling default router (dht) works" ' + ipfs config Routing.Type > actual && + echo none > expected && + test_cmp expected actual +' +test_expect_success "no routers means findprovs returns no results" ' + ipfs routing findprovs "$FINDPROV_CID" > actual && + echo -n > expected && + test_cmp expected actual +' + +test_kill_ipfs_daemon + +# set Routing config to only use delegated routing via mocked reframe endpoint +ipfs config Routing.Routers.TestDelegatedRouter --json '{ + "Type": "reframe", + "Parameters": { + "Endpoint": "http://127.0.0.1:5098/reframe" + } +}' || exit 1 + +test_expect_success "adding reframe endpoint to Routing.Routers config works" ' + echo "http://127.0.0.1:5098/reframe" > expected && + ipfs config Routing.Routers.TestDelegatedRouter.Parameters.Endpoint > actual && + test_cmp expected actual +' + +test_launch_ipfs_daemon + +test_expect_success "start_reframe_mock_endpoint" ' + start_reframe_mock_endpoint +' + +test_expect_success "'ipfs routing findprovs' returns result from delegated reframe router" ' + serve_reframe_response "$(<../t0701-delegated-routing-reframe/FindProvidersResponse)" && + echo "$EXPECTED_PROV" > expected && + ipfs routing findprovs "$FINDPROV_CID" > actual && + test_cmp expected actual +' + +test_expect_success "stop_reframe_mock_endpoint" ' + stop_reframe_mock_endpoint +' + + +test_kill_ipfs_daemon +test_done +# vim: ts=2 sw=2 sts=2 et: diff --git a/test/sharness/t0701-delegated-routing-reframe/FindProvidersRequest b/test/sharness/t0701-delegated-routing-reframe/FindProvidersRequest new file mode 100644 index 00000000000..c162296fc39 --- /dev/null +++ b/test/sharness/t0701-delegated-routing-reframe/FindProvidersRequest @@ -0,0 +1 @@ +{"FindProvidersRequest":{"Key":{"/":"bafybeigvgzoolc3drupxhlevdp2ugqcrbcsqfmcek2zxiw5wctk3xjpjwy"}}} diff --git a/test/sharness/t0701-delegated-routing-reframe/FindProvidersResponse b/test/sharness/t0701-delegated-routing-reframe/FindProvidersResponse new file mode 100644 index 00000000000..da8ddfe625d --- /dev/null +++ b/test/sharness/t0701-delegated-routing-reframe/FindProvidersResponse @@ -0,0 +1 @@ +{"FindProvidersResponse":{"Providers":[{"Node":{"peer":{"ID":{"/":{"bytes":"EiAngCqwSSL46hQ5+DWaJsZ1SPV2RwrqwID/OEuj5Rdgqw"}},"Multiaddresses":[{"/":{"bytes":"NiJwZWVyLmlwZnMtZWxhc3RpYy1wcm92aWRlci1hd3MuY29tBgu43QM"}}]}},"Proto":[{"2304":{}}]}]}}