Skip to content

Commit

Permalink
Merge remote-tracking branch 'p2p/discovery/routing/master' into disc…
Browse files Browse the repository at this point in the history
…overy-routing
  • Loading branch information
marten-seemann committed Jan 9, 2022
2 parents 5039dc0 + 6443c8b commit b5056a8
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 0 deletions.
113 changes: 113 additions & 0 deletions p2p/discovery/routing/routing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package discovery

import (
"context"
"github.com/libp2p/go-libp2p-core/discovery"
"time"

"github.com/ipfs/go-cid"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

mh "github.com/multiformats/go-multihash"
)

// RoutingDiscovery is an implementation of discovery using ContentRouting.
// Namespaces are translated to Cids using the SHA256 hash.
type RoutingDiscovery struct {
routing.ContentRouting
}

func NewRoutingDiscovery(router routing.ContentRouting) *RoutingDiscovery {
return &RoutingDiscovery{router}
}

func (d *RoutingDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return 0, err
}

ttl := options.Ttl
if ttl == 0 || ttl > 3*time.Hour {
// the DHT provider record validity is 24hrs, but it is recommnded to republish at least every 6hrs
// we go one step further and republish every 3hrs
ttl = 3 * time.Hour
}

cid, err := nsToCid(ns)
if err != nil {
return 0, err
}

// this context requires a timeout; it determines how long the DHT looks for
// closest peers to the key/CID before it goes on to provide the record to them.
// Not setting a timeout here will make the DHT wander forever.
pctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

err = d.Provide(pctx, cid, true)
if err != nil {
return 0, err
}

return ttl, nil
}

func (d *RoutingDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return nil, err
}

limit := options.Limit
if limit == 0 {
limit = 100 // that's just arbitrary, but FindProvidersAsync needs a count
}

cid, err := nsToCid(ns)
if err != nil {
return nil, err
}

return d.FindProvidersAsync(ctx, cid, limit), nil
}

func nsToCid(ns string) (cid.Cid, error) {
h, err := mh.Sum([]byte(ns), mh.SHA2_256, -1)
if err != nil {
return cid.Undef, err
}

return cid.NewCidV1(cid.Raw, h), nil
}

func NewDiscoveryRouting(disc discovery.Discovery, opts ...discovery.Option) *DiscoveryRouting {
return &DiscoveryRouting{disc, opts}
}

type DiscoveryRouting struct {
discovery.Discovery
opts []discovery.Option
}

func (r *DiscoveryRouting) Provide(ctx context.Context, c cid.Cid, bcast bool) error {
if !bcast {
return nil
}

_, err := r.Advertise(ctx, cidToNs(c), r.opts...)
return err
}

func (r *DiscoveryRouting) FindProvidersAsync(ctx context.Context, c cid.Cid, limit int) <-chan peer.AddrInfo {
ch, _ := r.FindPeers(ctx, cidToNs(c), append([]discovery.Option{discovery.Limit(limit)}, r.opts...)...)
return ch
}

func cidToNs(c cid.Cid) string {
return "/provider/" + c.String()
}
146 changes: 146 additions & 0 deletions p2p/discovery/routing/routing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package discovery

import (
"context"
"sync"
"testing"
"time"

"github.com/ipfs/go-cid"
bhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
)

type mockRoutingTable struct {
mx sync.Mutex
providers map[string]map[peer.ID]peer.AddrInfo
}

type mockRouting struct {
h host.Host
tab *mockRoutingTable
}

func NewMockRoutingTable() *mockRoutingTable {
return &mockRoutingTable{providers: make(map[string]map[peer.ID]peer.AddrInfo)}
}

func NewMockRouting(h host.Host, tab *mockRoutingTable) *mockRouting {
return &mockRouting{h: h, tab: tab}
}

func (m *mockRouting) Provide(ctx context.Context, cid cid.Cid, bcast bool) error {
m.tab.mx.Lock()
defer m.tab.mx.Unlock()

pmap, ok := m.tab.providers[cid.String()]
if !ok {
pmap = make(map[peer.ID]peer.AddrInfo)
m.tab.providers[cid.String()] = pmap
}

pmap[m.h.ID()] = peer.AddrInfo{ID: m.h.ID(), Addrs: m.h.Addrs()}

return nil
}

func (m *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, limit int) <-chan peer.AddrInfo {
ch := make(chan peer.AddrInfo)
go func() {
defer close(ch)
m.tab.mx.Lock()
defer m.tab.mx.Unlock()

pmap, ok := m.tab.providers[cid.String()]
if !ok {
return
}

for _, pi := range pmap {
select {
case ch <- pi:
case <-ctx.Done():
return
}
}
}()

return ch
}

func TestRoutingDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))

mtab := NewMockRoutingTable()
mr1 := NewMockRouting(h1, mtab)
mr2 := NewMockRouting(h2, mtab)

d1 := NewRoutingDiscovery(mr1)
d2 := NewRoutingDiscovery(mr2)

_, err := d1.Advertise(ctx, "/test")
if err != nil {
t.Fatal(err)
}

pis, err := FindPeers(ctx, d2, "/test", discovery.Limit(20))
if err != nil {
t.Fatal(err)
}

if len(pis) != 1 {
t.Fatalf("Expected 1 peer, got %d", len(pis))
}

pi := pis[0]
if pi.ID != h1.ID() {
t.Fatalf("Unexpected peer: %s", pi.ID)
}
}

func TestDiscoveryRouting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))

dserver := newDiscoveryServer()
d1 := &mockDiscoveryClient{h1, dserver}
d2 := &mockDiscoveryClient{h2, dserver}

r1 := NewDiscoveryRouting(d1, discovery.TTL(time.Hour))
r2 := NewDiscoveryRouting(d2, discovery.TTL(time.Hour))

c, err := nsToCid("/test")
if err != nil {
t.Fatal(err)
}

if err := r1.Provide(ctx, c, true); err != nil {
t.Fatal(err)
}

pch := r2.FindProvidersAsync(ctx, c, 20)

var allAIs []peer.AddrInfo
for ai := range pch {
allAIs = append(allAIs, ai)
}

if len(allAIs) != 1 {
t.Fatalf("Expected 1 peer, got %d", len(allAIs))
}

ai := allAIs[0]
if ai.ID != h1.ID() {
t.Fatalf("Unexpected peer: %s", ai.ID)
}
}

0 comments on commit b5056a8

Please sign in to comment.