Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AddAddresses method to DHT #879

Merged
merged 9 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions v2/coord/behaviour.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type WorkQueue[E BehaviourEvent] struct {

func NewWorkQueue[E BehaviourEvent](fn WorkQueueFunc[E]) *WorkQueue[E] {
w := &WorkQueue[E]{
pending: make(chan pendingEvent[E], 16),
pending: make(chan pendingEvent[E], 1),
fn: fn,
}
return w
Expand Down Expand Up @@ -113,7 +113,7 @@ var _ Notify[BehaviourEvent] = (*Waiter[BehaviourEvent])(nil)

func NewWaiter[E BehaviourEvent]() *Waiter[E] {
w := &Waiter[E]{
pending: make(chan WaiterEvent[E], 16),
pending: make(chan WaiterEvent[E], 1),
}
return w
}
Expand Down
5 changes: 5 additions & 0 deletions v2/coord/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func NodeIDToAddrInfo(id kad.NodeID[KadKey]) peer.AddrInfo {
}
}

// AddrInfoToNodeID converts a peer.AddrInfo to a kad.NodeID.
func AddrInfoToNodeID(ai peer.AddrInfo) kad.NodeID[KadKey] {
return kadt.PeerID(ai.ID)
}

// SliceOfNodeInfoToSliceOfAddrInfo converts a kad.NodeInfo to a peer.AddrInfo.
// This function will panic if any info.ID() does not return a kadt.PeerID
func SliceOfNodeInfoToSliceOfAddrInfo(infos []kad.NodeInfo[KadKey, ma.Multiaddr]) []peer.AddrInfo {
Expand Down
60 changes: 38 additions & 22 deletions v2/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Coordinator struct {
// self is the peer id of the system the dht is running on
self peer.ID

// cancel is used to cancel all running goroutines when the coordinator is cleaning up
cancel context.CancelFunc

// cfg is a copy of the optional configuration supplied to the dht
cfg CoordinatorConfig

Expand All @@ -50,8 +53,6 @@ type Coordinator struct {
queryBehaviour Behaviour[BehaviourEvent, BehaviourEvent]
}

const DefaultChanqueueCapacity = 1024

type CoordinatorConfig struct {
PeerstoreTTL time.Duration // duration for which a peer is kept in the peerstore

Expand Down Expand Up @@ -120,14 +121,14 @@ func (cfg *CoordinatorConfig) Validate() error {
return nil
}

func DefaultConfig() (*CoordinatorConfig, error) {
func DefaultCoordinatorConfig() (*CoordinatorConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}

return &CoordinatorConfig{
Clock: clock.New(), // use standard time
Clock: clock.New(),
PeerstoreTTL: 10 * time.Minute,
QueryConcurrency: 3,
QueryTimeout: 5 * time.Minute,
Expand All @@ -140,7 +141,7 @@ func DefaultConfig() (*CoordinatorConfig, error) {

func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, kad.NodeID[KadKey]], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
c, err := DefaultConfig()
c, err := DefaultCoordinatorConfig()
if err != nil {
return nil, fmt.Errorf("default config: %w", err)
}
Expand Down Expand Up @@ -200,25 +201,34 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey,

routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, cfg.Logger, cfg.Tele.Tracer)

networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger)
networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, cfg.Tele.Tracer)

ctx, cancel := context.WithCancel(context.Background())

d := &Coordinator{
self: self,
cfg: *cfg,
rtr: rtr,
rt: rt,
self: self,
cfg: *cfg,
rtr: rtr,
rt: rt,
cancel: cancel,

networkBehaviour: networkBehaviour,
routingBehaviour: routingBehaviour,
queryBehaviour: queryBehaviour,

routingNotifications: make(chan RoutingNotification, 20),
routingNotifications: make(chan RoutingNotification, 20), // buffered mainly to allow tests to read the channel after running an operation
}
go d.eventLoop()
go d.eventLoop(ctx)

return d, nil
}

// Close cleans up all resources associated with this Coordinator.
func (c *Coordinator) Close() error {
c.cancel()
return nil
}

func (c *Coordinator) ID() peer.ID {
return c.self
}
Expand All @@ -237,13 +247,16 @@ func (c *Coordinator) RoutingNotifications() <-chan RoutingNotification {
return c.routingNotifications
}

func (c *Coordinator) eventLoop() {
ctx := context.Background()

func (c *Coordinator) eventLoop(ctx context.Context) {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.eventLoop")
defer span.End()
for {
var ev BehaviourEvent
var ok bool
select {
case <-ctx.Done():
// coordinator is closing
return
case <-c.networkBehaviour.Ready():
ev, ok = c.networkBehaviour.Perform(ctx)
case <-c.routingBehaviour.Ready():
Expand Down Expand Up @@ -339,7 +352,7 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q
}

waiter := NewWaiter[BehaviourEvent]()
queryID := query.QueryID("foo")
queryID := query.QueryID("foo") // TODO: choose query ID

cmd := &EventStartQuery{
QueryID: queryID,
Expand Down Expand Up @@ -375,12 +388,12 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q
}

err = fn(ctx, nh, lastStats)
if errors.Is(err, SkipRemaining) {
if errors.Is(err, ErrSkipRemaining) {
// done
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
return lastStats, nil
}
if errors.Is(err, SkipNode) {
if errors.Is(err, ErrSkipNode) {
// TODO: don't add closer nodes from this node
break
}
Expand All @@ -405,17 +418,20 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q
// AddNodes suggests new DHT nodes and their associated addresses to be added to the routing table.
// If the routing table is updated as a result of this operation an EventRoutingUpdated notification
// is emitted on the routing notification channel.
func (c *Coordinator) AddNodes(ctx context.Context, infos []peer.AddrInfo) error {
func (c *Coordinator) AddNodes(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.AddNodes")
defer span.End()
for _, info := range infos {
if info.ID == c.self {
for _, ai := range ais {
if ai.ID == c.self {
// skip self
continue
}

// TODO: apply address filter

c.routingBehaviour.Notify(ctx, &EventAddAddrInfo{
NodeInfo: info,
NodeInfo: ai,
TTL: ttl,
})

}
Expand Down
Loading