Skip to content

Commit

Permalink
add configurable dht fallback type
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Oct 6, 2023
1 parent 00fdfa1 commit fbe11ac
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 11 deletions.
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func main() {
Value: false,
Usage: "If using an Amino DHT client should the libp2p host be shared with the data downloading host",
},
&cli.StringFlag{
Name: "dht-fallback-type",
Value: "combined",
Usage: "the type of Amino client to be used as a fallback (standard, accelerated, combined)",
},
}

app.Name = "rainbow"
Expand Down
106 changes: 95 additions & 11 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ipfs/boxo/namesys"
routingv1client "github.com/ipfs/boxo/routing/http/client"
httpcontentrouter "github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
flatfs "github.com/ipfs/go-ds-flatfs"
levelds "github.com/ipfs/go-ds-leveldb"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -59,6 +61,14 @@ type Node struct {
bwc *metrics.BandwidthCounter
}

type DHTType int

const (
Combined DHTType = iota
Standard
Accelerated
)

type Config struct {
ListenAddrs []string
AnnounceAddrs []string
Expand All @@ -76,6 +86,7 @@ type Config struct {
RoutingV1 string
KuboRPCURLs []string
DHTSharedHost bool
DHTType DHTType
}

func Setup(ctx context.Context, cfg *Config) (*Node, error) {
Expand Down Expand Up @@ -165,18 +176,49 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) {
}
}

fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
var standardClient *dht.IpfsDHT
var fullRTClient *fullrt.FullRT

if cfg.DHTType == Combined || cfg.DHTType == Standard {
standardClient, err = dht.New(ctx, dhtHost,
dht.Datastore(memDS),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
dht.Mode(dht.ModeClient),
)
if err != nil {
return nil, err
}
}

if cfg.DHTType == Combined || cfg.DHTType == Accelerated {
fullRTClient, err = fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
dht.Datastore(memDS),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
}
}

var dhtRouter routing.Routing
switch cfg.DHTType {
case Combined:
dhtRouter = &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,
}
case Standard:
dhtRouter = standardClient
case Accelerated:
dhtRouter = fullRTClient
default:
return nil, fmt.Errorf("unsupported DHT type")
}

// we want to also use the default HTTP routers, so wrap the FullRT client
Expand All @@ -187,7 +229,7 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) {
}
routers := []*routinghelpers.ParallelRouter{
{
Router: fullRTClient,
Router: dhtRouter,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: false,
Expand Down Expand Up @@ -242,6 +284,48 @@ func Setup(ctx context.Context, cfg *Config) (*Node, error) {
}, nil
}

type bundledDHT struct {
standard *dht.IpfsDHT
fullRT *fullrt.FullRT
}

func (b *bundledDHT) getDHT() routing.Routing {
if b.fullRT.Ready() {
return b.fullRT
}
return b.standard
}

func (b *bundledDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
return b.getDHT().Provide(ctx, c, brdcst)
}

func (b *bundledDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo {
return b.getDHT().FindProvidersAsync(ctx, c, i)
}

func (b *bundledDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
return b.getDHT().FindPeer(ctx, id)
}

func (b *bundledDHT) PutValue(ctx context.Context, k string, v []byte, option ...routing.Option) error {
return b.getDHT().PutValue(ctx, k, v, option...)
}

func (b *bundledDHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
return b.getDHT().GetValue(ctx, s, option...)
}

func (b *bundledDHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
return b.getDHT().SearchValue(ctx, s, option...)
}

func (b *bundledDHT) Bootstrap(ctx context.Context) error {
return b.standard.Bootstrap(ctx)
}

var _ routing.Routing = (*bundledDHT)(nil)

func delegatedHTTPContentRouter(endpoint string, rv1Opts ...routingv1client.Option) (routing.Routing, error) {
// Increase per-host connection pool since we are making lots of concurrent requests.
transport := http.DefaultTransport.(*http.Transport).Clone()
Expand Down

0 comments on commit fbe11ac

Please sign in to comment.