Skip to content

Commit

Permalink
Add AddAddresses method to DHT (#879)
Browse files Browse the repository at this point in the history
* Add AddAddresses method to DHT

* Add AddAddresses method to DHT

* go mod tidy

* Rename Query Skip errors

* go fmt coordinator.go

* Fix test flakes

* Fix lint errors
  • Loading branch information
iand authored Sep 7, 2023
1 parent 6170d5d commit 46ff621
Show file tree
Hide file tree
Showing 22 changed files with 410 additions and 126 deletions.
4 changes: 2 additions & 2 deletions v2/coord/behaviour.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type WorkQueue[E BehaviourEvent] struct {

func NewWorkQueue[E BehaviourEvent](fn WorkQueueFunc[E]) *WorkQueue[E] {
w := &WorkQueue[E]{
pending: make(chan pendingEvent[E], 16),
pending: make(chan pendingEvent[E], 1),
fn: fn,
}
return w
Expand Down Expand Up @@ -113,7 +113,7 @@ var _ Notify[BehaviourEvent] = (*Waiter[BehaviourEvent])(nil)

func NewWaiter[E BehaviourEvent]() *Waiter[E] {
w := &Waiter[E]{
pending: make(chan WaiterEvent[E], 16),
pending: make(chan WaiterEvent[E], 1),
}
return w
}
Expand Down
5 changes: 5 additions & 0 deletions v2/coord/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func NodeIDToAddrInfo(id kad.NodeID[KadKey]) peer.AddrInfo {
}
}

// AddrInfoToNodeID converts a peer.AddrInfo to a kad.NodeID.
func AddrInfoToNodeID(ai peer.AddrInfo) kad.NodeID[KadKey] {
return kadt.PeerID(ai.ID)
}

// SliceOfNodeInfoToSliceOfAddrInfo converts a kad.NodeInfo to a peer.AddrInfo.
// This function will panic if any info.ID() does not return a kadt.PeerID
func SliceOfNodeInfoToSliceOfAddrInfo(infos []kad.NodeInfo[KadKey, ma.Multiaddr]) []peer.AddrInfo {
Expand Down
60 changes: 38 additions & 22 deletions v2/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Coordinator struct {
// self is the peer id of the system the dht is running on
self peer.ID

// cancel is used to cancel all running goroutines when the coordinator is cleaning up
cancel context.CancelFunc

// cfg is a copy of the optional configuration supplied to the dht
cfg CoordinatorConfig

Expand All @@ -50,8 +53,6 @@ type Coordinator struct {
queryBehaviour Behaviour[BehaviourEvent, BehaviourEvent]
}

const DefaultChanqueueCapacity = 1024

type CoordinatorConfig struct {
PeerstoreTTL time.Duration // duration for which a peer is kept in the peerstore

Expand Down Expand Up @@ -120,14 +121,14 @@ func (cfg *CoordinatorConfig) Validate() error {
return nil
}

func DefaultConfig() (*CoordinatorConfig, error) {
func DefaultCoordinatorConfig() (*CoordinatorConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}

return &CoordinatorConfig{
Clock: clock.New(), // use standard time
Clock: clock.New(),
PeerstoreTTL: 10 * time.Minute,
QueryConcurrency: 3,
QueryTimeout: 5 * time.Minute,
Expand All @@ -140,7 +141,7 @@ func DefaultConfig() (*CoordinatorConfig, error) {

func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, kad.NodeID[KadKey]], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
c, err := DefaultConfig()
c, err := DefaultCoordinatorConfig()
if err != nil {
return nil, fmt.Errorf("default config: %w", err)
}
Expand Down Expand Up @@ -200,25 +201,34 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey,

routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, cfg.Logger, cfg.Tele.Tracer)

networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger)
networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, cfg.Tele.Tracer)

ctx, cancel := context.WithCancel(context.Background())

d := &Coordinator{
self: self,
cfg: *cfg,
rtr: rtr,
rt: rt,
self: self,
cfg: *cfg,
rtr: rtr,
rt: rt,
cancel: cancel,

networkBehaviour: networkBehaviour,
routingBehaviour: routingBehaviour,
queryBehaviour: queryBehaviour,

routingNotifications: make(chan RoutingNotification, 20),
routingNotifications: make(chan RoutingNotification, 20), // buffered mainly to allow tests to read the channel after running an operation
}
go d.eventLoop()
go d.eventLoop(ctx)

return d, nil
}

// Close cleans up all resources associated with this Coordinator.
func (c *Coordinator) Close() error {
c.cancel()
return nil
}

func (c *Coordinator) ID() peer.ID {
return c.self
}
Expand All @@ -237,13 +247,16 @@ func (c *Coordinator) RoutingNotifications() <-chan RoutingNotification {
return c.routingNotifications
}

func (c *Coordinator) eventLoop() {
ctx := context.Background()

func (c *Coordinator) eventLoop(ctx context.Context) {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.eventLoop")
defer span.End()
for {
var ev BehaviourEvent
var ok bool
select {
case <-ctx.Done():
// coordinator is closing
return
case <-c.networkBehaviour.Ready():
ev, ok = c.networkBehaviour.Perform(ctx)
case <-c.routingBehaviour.Ready():
Expand Down Expand Up @@ -339,7 +352,7 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q
}

waiter := NewWaiter[BehaviourEvent]()
queryID := query.QueryID("foo")
queryID := query.QueryID("foo") // TODO: choose query ID

cmd := &EventStartQuery{
QueryID: queryID,
Expand Down Expand Up @@ -375,12 +388,12 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q
}

err = fn(ctx, nh, lastStats)
if errors.Is(err, SkipRemaining) {
if errors.Is(err, ErrSkipRemaining) {
// done
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
return lastStats, nil
}
if errors.Is(err, SkipNode) {
if errors.Is(err, ErrSkipNode) {
// TODO: don't add closer nodes from this node
break
}
Expand All @@ -405,17 +418,20 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q
// AddNodes suggests new DHT nodes and their associated addresses to be added to the routing table.
// If the routing table is updated as a result of this operation an EventRoutingUpdated notification
// is emitted on the routing notification channel.
func (c *Coordinator) AddNodes(ctx context.Context, infos []peer.AddrInfo) error {
func (c *Coordinator) AddNodes(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.AddNodes")
defer span.End()
for _, info := range infos {
if info.ID == c.self {
for _, ai := range ais {
if ai.ID == c.self {
// skip self
continue
}

// TODO: apply address filter

c.routingBehaviour.Notify(ctx, &EventAddAddrInfo{
NodeInfo: info,
NodeInfo: ai,
TTL: ttl,
})

}
Expand Down
Loading

0 comments on commit 46ff621

Please sign in to comment.