Skip to content

Commit

Permalink
Separate possible ContentRouters for TopicDiscovery.
Browse files Browse the repository at this point in the history
If we don't do this, we have a ciclic dependency creating TieredRouter.
Now we can create first all possible content routers, and after that,
create Routers.
  • Loading branch information
ajnavarro committed Jun 29, 2022
1 parent 47aae79 commit 4f31953
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 9 deletions.
3 changes: 3 additions & 0 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),

fx.Provide(libp2p.Routing),
fx.Provide(libp2p.ContentRouting),

fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
fx.Provide(libp2p.DelegatedRouting(cfg.Routing.Routers)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
Expand Down Expand Up @@ -314,6 +316,7 @@ func Offline(cfg *config.Config) fx.Option {
fx.Provide(DNSResolver),
fx.Provide(Namesys(0)),
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.ContentRouting),
fx.Provide(libp2p.OfflineRouting),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
Expand Down
29 changes: 23 additions & 6 deletions core/node/libp2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ type processInitialRoutingIn struct {
type processInitialRoutingOut struct {
fx.Out

Router Router `group:"routers"`
Router Router `group:"routers"`
ContentRouter routing.ContentRouting `group:"content-routers"`

DHT *ddht.DHT
DHTClient routing.Routing `name:"dhtc"`
}
Expand Down Expand Up @@ -109,8 +111,9 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
Routing: expClient,
Priority: 1000,
},
DHT: dr,
DHTClient: expClient,
DHT: dr,
DHTClient: expClient,
ContentRouter: expClient,
}, nil
}

Expand All @@ -119,16 +122,18 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
Priority: 1000,
Routing: in.Router,
},
DHT: dr,
DHTClient: dr,
DHT: dr,
DHTClient: dr,
ContentRouter: in.Router,
}, nil
}
}

type delegatedRouterOut struct {
fx.Out

Routers []Router `group:"routers,flatten"`
Routers []Router `group:"routers,flatten"`
ContentRouter []routing.ContentRouting `group:"content-routers,flatten"`
}

func DelegatedRouting(routers map[string]config.Router) interface{} {
Expand All @@ -149,12 +154,24 @@ func DelegatedRouting(routers map[string]config.Router) interface{} {
Routing: r,
Priority: irouting.GetPriority(v.Parameters),
})

out.ContentRouter = append(out.ContentRouter, r)
}

return out, nil
}
}

type p2pOnlineContentRoutingIn struct {
fx.In

ContentRouter []routing.ContentRouting `group:"content-routers"`
}

func ContentRouting(in p2pOnlineContentRoutingIn) irouting.TieredContentRouter {
return &irouting.ContentRoutingWrapper{ContentRoutings: in.ContentRouter}
}

type p2pOnlineRoutingIn struct {
fx.In

Expand Down
4 changes: 1 addition & 3 deletions core/node/libp2p/topicdiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import (
"github.com/libp2p/go-libp2p-core/host"
disc "github.com/libp2p/go-libp2p-discovery"

"github.com/ipfs/go-ipfs/core/node/helpers"
irouting "github.com/ipfs/go-ipfs/routing"
"go.uber.org/fx"
)

func TopicDiscovery() interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr irouting.TieredRouter) (service discovery.Discovery, err error) {
return func(host host.Host, cr irouting.TieredContentRouter) (service discovery.Discovery, err error) {
baseDisc := disc.NewRoutingDiscovery(cr)
minBackoff, maxBackoff := time.Second*60, time.Hour
rng := rand.New(rand.NewSource(rand.Int63()))
Expand Down
4 changes: 4 additions & 0 deletions routing/delegated.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type TieredRouter interface {
ProviderManyWrapper() ProvideMany
}

type TieredContentRouter interface {
routing.ContentRouting
}

var _ TieredRouter = &Tiered{}

// Tiered is a routing Tiered implementation providing some extra methods to fill
Expand Down
97 changes: 97 additions & 0 deletions routing/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package routing

import (
"context"
"sync"

"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
drc "github.com/ipfs/go-delegated-routing/client"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -64,3 +66,98 @@ func (pmw *ProvideManyWrapper) Ready() bool {

return out
}

var _ TieredContentRouter = &ContentRoutingWrapper{}

type ContentRoutingWrapper struct {
ContentRoutings []routing.ContentRouting
}

// Provide adds the given cid to the content routing system. If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
func (crw *ContentRoutingWrapper) Provide(ctx context.Context, cid cid.Cid, announce bool) error {
c := len(crw.ContentRoutings)
wg := sync.WaitGroup{}
wg.Add(c)

errors := make([]error, c)

for i, cr := range crw.ContentRoutings {
go func(cr routing.ContentRouting, i int) {
errors[i] = cr.Provide(ctx, cid, announce)
wg.Done()
}(cr, i)
}

wg.Wait()

var out []error
success := false
for _, e := range errors {
switch e {
case nil:
success = true
case routing.ErrNotSupported:
default:
out = append(out, e)
}
}
switch len(out) {
case 0:
if success {
// No errors and at least one router succeeded.
return nil
}
// No routers supported this operation.
return routing.ErrNotSupported
case 1:
return out[0]
default:
return &multierror.Error{Errors: out}
}
}

// Search for peers who are able to provide a given key
//
// When count is 0, this method will return an unbounded number of
// results.
func (crw *ContentRoutingWrapper) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
subCtx, cancel := context.WithCancel(ctx)

aich := make(chan peer.AddrInfo)

for _, ri := range crw.ContentRoutings {
fpch := ri.FindProvidersAsync(subCtx, cid, count)
go func() {
for ai := range fpch {
aich <- ai
}
}()
}

out := make(chan peer.AddrInfo)

go func() {
defer cancel()
c := 0
doCount := true
if count <= 0 {
doCount = false
}

for ai := range aich {
if c >= count && doCount {
return
}

out <- ai

if doCount {
c++
}
}
}()

return out
}

0 comments on commit 4f31953

Please sign in to comment.